Write through cache impl
This commit is contained in:
parent
427f490a61
commit
54b17d5ee1
6 changed files with 203 additions and 34 deletions
52
Cargo.lock
generated
52
Cargo.lock
generated
|
|
@ -17,6 +17,18 @@ version = "1.0.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ahash"
|
||||||
|
version = "0.8.11"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"once_cell",
|
||||||
|
"version_check",
|
||||||
|
"zerocopy",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "aho-corasick"
|
name = "aho-corasick"
|
||||||
version = "1.1.3"
|
version = "1.1.3"
|
||||||
|
|
@ -26,6 +38,12 @@ dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "allocator-api2"
|
||||||
|
version = "0.2.18"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "android-tzdata"
|
name = "android-tzdata"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
|
|
@ -904,6 +922,10 @@ name = "hashbrown"
|
||||||
version = "0.14.3"
|
version = "0.14.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
|
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
|
||||||
|
dependencies = [
|
||||||
|
"ahash",
|
||||||
|
"allocator-api2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hdrhistogram"
|
name = "hdrhistogram"
|
||||||
|
|
@ -1293,6 +1315,7 @@ dependencies = [
|
||||||
"librqbit-sha1-wrapper",
|
"librqbit-sha1-wrapper",
|
||||||
"librqbit-tracker-comms",
|
"librqbit-tracker-comms",
|
||||||
"librqbit-upnp",
|
"librqbit-upnp",
|
||||||
|
"lru",
|
||||||
"memmap2",
|
"memmap2",
|
||||||
"openssl",
|
"openssl",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
|
|
@ -1471,6 +1494,15 @@ version = "0.4.21"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lru"
|
||||||
|
version = "0.12.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc"
|
||||||
|
dependencies = [
|
||||||
|
"hashbrown 0.14.3",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "matchers"
|
name = "matchers"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|
@ -3173,6 +3205,26 @@ version = "0.8.20"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "791978798f0597cfc70478424c2b4fdc2b7a8024aaff78497ef00f24ef674193"
|
checksum = "791978798f0597cfc70478424c2b4fdc2b7a8024aaff78497ef00f24ef674193"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "zerocopy"
|
||||||
|
version = "0.7.32"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be"
|
||||||
|
dependencies = [
|
||||||
|
"zerocopy-derive",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "zerocopy-derive"
|
||||||
|
version = "0.7.32"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "zeroize"
|
name = "zeroize"
|
||||||
version = "1.7.0"
|
version = "1.7.0"
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,7 @@ rlimit = "0.10.1"
|
||||||
async-stream = "0.3.5"
|
async-stream = "0.3.5"
|
||||||
memmap2 = "0.9.4"
|
memmap2 = "0.9.4"
|
||||||
rand_distr = "0.4.3"
|
rand_distr = "0.4.3"
|
||||||
|
lru = "0.12.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
futures = { version = "0.3" }
|
futures = { version = "0.3" }
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,3 @@
|
||||||
pub mod slow;
|
pub mod slow;
|
||||||
pub mod timing;
|
pub mod timing;
|
||||||
|
pub mod write_through_cache;
|
||||||
|
|
|
||||||
112
crates/librqbit/src/storage/middleware/write_through_cache.rs
Normal file
112
crates/librqbit/src/storage/middleware/write_through_cache.rs
Normal file
|
|
@ -0,0 +1,112 @@
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use librqbit_core::lengths::{Lengths, ValidPieceIndex};
|
||||||
|
use lru::LruCache;
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
|
||||||
|
FileInfos,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
pub struct WriteThroughCacheStorageFactory<U> {
|
||||||
|
max_cache_bytes: u64,
|
||||||
|
underlying: U,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<U> WriteThroughCacheStorageFactory<U> {
|
||||||
|
pub fn new(max_cache_bytes: u64, underlying: U) -> Self {
|
||||||
|
Self {
|
||||||
|
max_cache_bytes,
|
||||||
|
underlying,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<U: StorageFactory + Clone> StorageFactory for WriteThroughCacheStorageFactory<U> {
|
||||||
|
type Storage = WriteThroughCacheStorage<U::Storage>;
|
||||||
|
|
||||||
|
fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
||||||
|
let pieces = self
|
||||||
|
.max_cache_bytes
|
||||||
|
.div_ceil(info.lengths.default_piece_length() as u64)
|
||||||
|
.try_into()?;
|
||||||
|
let pieces = NonZeroUsize::new(pieces).context("bug: pieces == 0")?;
|
||||||
|
let lru = RwLock::new(LruCache::new(pieces));
|
||||||
|
Ok(WriteThroughCacheStorage {
|
||||||
|
lru,
|
||||||
|
underlying: self.underlying.init_storage(info)?,
|
||||||
|
lengths: info.lengths,
|
||||||
|
file_infos: info.file_infos.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clone_box(&self) -> crate::storage::BoxStorageFactory {
|
||||||
|
self.clone().boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WriteThroughCacheStorage<U> {
|
||||||
|
lru: RwLock<LruCache<ValidPieceIndex, Box<[u8]>>>,
|
||||||
|
lengths: Lengths,
|
||||||
|
file_infos: FileInfos,
|
||||||
|
underlying: U,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<U: TorrentStorage> TorrentStorage for WriteThroughCacheStorage<U> {
|
||||||
|
fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> {
|
||||||
|
let file = self.file_infos.get(file_id).context("wrong file")?;
|
||||||
|
let current = self
|
||||||
|
.lengths
|
||||||
|
.compute_current_piece(offset, file.offset_in_torrent)
|
||||||
|
.context("wrong piece")?;
|
||||||
|
let mut g = self.lru.write();
|
||||||
|
if let Some(p) = g.get(¤t.id) {
|
||||||
|
let start = current.piece_offset as usize;
|
||||||
|
let end = start + buf.len();
|
||||||
|
let pbuf = p.get(start..end).context("bugged length")?;
|
||||||
|
buf.copy_from_slice(pbuf);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.underlying.pread_exact(file_id, offset, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> {
|
||||||
|
let file = self.file_infos.get(file_id).context("wrong file")?;
|
||||||
|
let current = self
|
||||||
|
.lengths
|
||||||
|
.compute_current_piece(offset, file.offset_in_torrent)
|
||||||
|
.context("wrong piece")?;
|
||||||
|
let mut g = self.lru.write();
|
||||||
|
let pbuf = g.get_or_insert_mut(current.id, || {
|
||||||
|
vec![0; self.lengths.piece_length(current.id) as usize].into_boxed_slice()
|
||||||
|
});
|
||||||
|
let start = current.piece_offset as usize;
|
||||||
|
let end = start + buf.len();
|
||||||
|
pbuf.get_mut(start..end)
|
||||||
|
.context("bugged range")?
|
||||||
|
.copy_from_slice(buf);
|
||||||
|
self.underlying.pwrite_all(file_id, offset, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_file(&self, file_id: usize, filename: &std::path::Path) -> anyhow::Result<()> {
|
||||||
|
self.underlying.remove_file(file_id, filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> {
|
||||||
|
self.underlying.ensure_file_length(file_id, length)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {
|
||||||
|
let replacement_cache = LruCache::new(NonZeroUsize::new(1).context("unreachable")?);
|
||||||
|
let lru = std::mem::replace(&mut *self.lru.write(), replacement_cache);
|
||||||
|
Ok(Box::new(WriteThroughCacheStorage {
|
||||||
|
lru: RwLock::new(lru),
|
||||||
|
underlying: self.underlying.take()?,
|
||||||
|
lengths: self.lengths,
|
||||||
|
file_infos: self.file_infos.clone(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -11,7 +11,7 @@ use std::{
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
|
||||||
use librqbit_core::lengths::{Lengths, ValidPieceIndex};
|
use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex};
|
||||||
use tokio::io::{AsyncRead, AsyncSeek};
|
use tokio::io::{AsyncRead, AsyncSeek};
|
||||||
use tracing::{debug, trace};
|
use tracing::{debug, trace};
|
||||||
|
|
||||||
|
|
@ -34,7 +34,7 @@ struct StreamState {
|
||||||
|
|
||||||
impl StreamState {
|
impl StreamState {
|
||||||
fn current_piece(&self, lengths: &Lengths) -> Option<CurrentPiece> {
|
fn current_piece(&self, lengths: &Lengths) -> Option<CurrentPiece> {
|
||||||
compute_current_piece(lengths, self.position, self.file_abs_offset)
|
lengths.compute_current_piece(self.position, self.file_abs_offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn queue<'a>(&self, lengths: &'a Lengths) -> impl Iterator<Item = ValidPieceIndex> + 'a {
|
fn queue<'a>(&self, lengths: &'a Lengths) -> impl Iterator<Item = ValidPieceIndex> + 'a {
|
||||||
|
|
@ -53,32 +53,6 @@ pub(crate) struct TorrentStreams {
|
||||||
streams: DashMap<StreamId, StreamState>,
|
streams: DashMap<StreamId, StreamState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CurrentPiece {
|
|
||||||
id: ValidPieceIndex,
|
|
||||||
piece_remaining: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn compute_current_piece(
|
|
||||||
lengths: &Lengths,
|
|
||||||
file_pos: u64,
|
|
||||||
file_torrent_abs_offset: u64,
|
|
||||||
) -> Option<CurrentPiece> {
|
|
||||||
let dpl = lengths.default_piece_length();
|
|
||||||
|
|
||||||
let abs_pos = file_torrent_abs_offset + file_pos;
|
|
||||||
let piece_id = abs_pos / dpl as u64;
|
|
||||||
let piece_id: u32 = piece_id.try_into().ok()?;
|
|
||||||
|
|
||||||
let piece_id = lengths.validate_piece_index(piece_id)?;
|
|
||||||
let piece_len = lengths.piece_length(piece_id);
|
|
||||||
Some(CurrentPiece {
|
|
||||||
id: piece_id,
|
|
||||||
piece_remaining: (piece_len as u64 - (abs_pos % dpl as u64))
|
|
||||||
.try_into()
|
|
||||||
.ok()?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TorrentStreams {
|
impl TorrentStreams {
|
||||||
fn next_id(&self) -> usize {
|
fn next_id(&self) -> usize {
|
||||||
self.next_stream_id.fetch_add(1, Ordering::Relaxed)
|
self.next_stream_id.fetch_add(1, Ordering::Relaxed)
|
||||||
|
|
@ -199,12 +173,12 @@ impl AsyncRead for FileStream {
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let current = poll_try_io!(compute_current_piece(
|
let current = poll_try_io!(self
|
||||||
&self.torrent.info().lengths,
|
.torrent
|
||||||
self.position,
|
.info()
|
||||||
self.file_torrent_abs_offset
|
.lengths
|
||||||
)
|
.compute_current_piece(self.position, self.file_torrent_abs_offset)
|
||||||
.context("invalid position"));
|
.context("invalid position"));
|
||||||
|
|
||||||
// if the piece is not there, register to wake when it is
|
// if the piece is not there, register to wake when it is
|
||||||
// check if we have the piece for real
|
// check if we have the piece for real
|
||||||
|
|
|
||||||
|
|
@ -265,6 +265,35 @@ impl Lengths {
|
||||||
|
|
||||||
end.saturating_sub(offset)
|
end.saturating_sub(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn compute_current_piece(
|
||||||
|
self: &Lengths,
|
||||||
|
file_pos: u64,
|
||||||
|
file_torrent_abs_offset: u64,
|
||||||
|
) -> Option<CurrentPiece> {
|
||||||
|
let dpl = self.default_piece_length();
|
||||||
|
|
||||||
|
let abs_pos = file_torrent_abs_offset + file_pos;
|
||||||
|
let piece_id = abs_pos / dpl as u64;
|
||||||
|
let piece_id: u32 = piece_id.try_into().ok()?;
|
||||||
|
|
||||||
|
let piece_id = self.validate_piece_index(piece_id)?;
|
||||||
|
let piece_len = self.piece_length(piece_id);
|
||||||
|
let piece_offset = (abs_pos / dpl as u64).try_into().ok()?;
|
||||||
|
Some(CurrentPiece {
|
||||||
|
id: piece_id,
|
||||||
|
piece_offset,
|
||||||
|
piece_remaining: (piece_len as u64 - (abs_pos % dpl as u64))
|
||||||
|
.try_into()
|
||||||
|
.ok()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CurrentPiece {
|
||||||
|
pub id: ValidPieceIndex,
|
||||||
|
pub piece_remaining: u32,
|
||||||
|
pub piece_offset: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue