From 54b17d5ee14b0155df471e44f56461ba56da592c Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 2 May 2024 20:59:09 +0100 Subject: [PATCH] Write through cache impl --- Cargo.lock | 52 ++++++++ crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/storage/middleware/mod.rs | 1 + .../storage/middleware/write_through_cache.rs | 112 ++++++++++++++++++ .../librqbit/src/torrent_state/streaming.rs | 42 ++----- crates/librqbit_core/src/lengths.rs | 29 +++++ 6 files changed, 203 insertions(+), 34 deletions(-) create mode 100644 crates/librqbit/src/storage/middleware/write_through_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 4bcb33f..5f02ebe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -26,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -904,6 +922,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "hdrhistogram" @@ -1293,6 +1315,7 @@ dependencies = [ "librqbit-sha1-wrapper", "librqbit-tracker-comms", "librqbit-upnp", + "lru", "memmap2", "openssl", "parking_lot", @@ -1471,6 +1494,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "matchers" version = "0.1.0" @@ -3173,6 +3205,26 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "791978798f0597cfc70478424c2b4fdc2b7a8024aaff78497ef00f24ef674193" +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 9a237b0..3f1e1a0 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -72,6 +72,7 @@ rlimit = "0.10.1" async-stream = "0.3.5" memmap2 = "0.9.4" rand_distr = "0.4.3" +lru = "0.12.3" [dev-dependencies] futures = { version = "0.3" } diff --git a/crates/librqbit/src/storage/middleware/mod.rs b/crates/librqbit/src/storage/middleware/mod.rs index e1b0fab..fb4bb3e 100644 --- a/crates/librqbit/src/storage/middleware/mod.rs +++ b/crates/librqbit/src/storage/middleware/mod.rs @@ -1,2 +1,3 @@ pub mod slow; pub mod timing; +pub mod write_through_cache; diff --git a/crates/librqbit/src/storage/middleware/write_through_cache.rs b/crates/librqbit/src/storage/middleware/write_through_cache.rs new file mode 100644 index 0000000..7b064b2 --- /dev/null +++ b/crates/librqbit/src/storage/middleware/write_through_cache.rs @@ -0,0 +1,112 @@ +use std::num::NonZeroUsize; + +use anyhow::Context; +use librqbit_core::lengths::{Lengths, ValidPieceIndex}; +use lru::LruCache; +use parking_lot::RwLock; + +use crate::{ + storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + FileInfos, +}; + +#[derive(Clone, Copy)] +pub struct WriteThroughCacheStorageFactory { + max_cache_bytes: u64, + underlying: U, +} + +impl WriteThroughCacheStorageFactory { + pub fn new(max_cache_bytes: u64, underlying: U) -> Self { + Self { + max_cache_bytes, + underlying, + } + } +} + +impl StorageFactory for WriteThroughCacheStorageFactory { + type Storage = WriteThroughCacheStorage; + + fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + let pieces = self + .max_cache_bytes + .div_ceil(info.lengths.default_piece_length() as u64) + .try_into()?; + let pieces = NonZeroUsize::new(pieces).context("bug: pieces == 0")?; + let lru = RwLock::new(LruCache::new(pieces)); + Ok(WriteThroughCacheStorage { + lru, + underlying: self.underlying.init_storage(info)?, + lengths: info.lengths, + file_infos: info.file_infos.clone(), + }) + } + + fn clone_box(&self) -> crate::storage::BoxStorageFactory { + self.clone().boxed() + } +} + +pub struct WriteThroughCacheStorage { + lru: RwLock>>, + lengths: Lengths, + file_infos: FileInfos, + underlying: U, +} + +impl TorrentStorage for WriteThroughCacheStorage { + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + let file = self.file_infos.get(file_id).context("wrong file")?; + let current = self + .lengths + .compute_current_piece(offset, file.offset_in_torrent) + .context("wrong piece")?; + let mut g = self.lru.write(); + if let Some(p) = g.get(¤t.id) { + let start = current.piece_offset as usize; + let end = start + buf.len(); + let pbuf = p.get(start..end).context("bugged length")?; + buf.copy_from_slice(pbuf); + return Ok(()); + } + self.underlying.pread_exact(file_id, offset, buf) + } + + fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + let file = self.file_infos.get(file_id).context("wrong file")?; + let current = self + .lengths + .compute_current_piece(offset, file.offset_in_torrent) + .context("wrong piece")?; + let mut g = self.lru.write(); + let pbuf = g.get_or_insert_mut(current.id, || { + vec![0; self.lengths.piece_length(current.id) as usize].into_boxed_slice() + }); + let start = current.piece_offset as usize; + let end = start + buf.len(); + pbuf.get_mut(start..end) + .context("bugged range")? + .copy_from_slice(buf); + self.underlying.pwrite_all(file_id, offset, buf) + } + + fn remove_file(&self, file_id: usize, filename: &std::path::Path) -> anyhow::Result<()> { + self.underlying.remove_file(file_id, filename) + } + + fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { + self.underlying.ensure_file_length(file_id, length) + } + + fn take(&self) -> anyhow::Result> { + let replacement_cache = LruCache::new(NonZeroUsize::new(1).context("unreachable")?); + let lru = std::mem::replace(&mut *self.lru.write(), replacement_cache); + Ok(Box::new(WriteThroughCacheStorage { + lru: RwLock::new(lru), + underlying: self.underlying.take()?, + lengths: self.lengths, + file_infos: self.file_infos.clone(), + })) + } +} diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index e197290..48b4b3e 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -11,7 +11,7 @@ use std::{ use anyhow::Context; use dashmap::DashMap; -use librqbit_core::lengths::{Lengths, ValidPieceIndex}; +use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; @@ -34,7 +34,7 @@ struct StreamState { impl StreamState { fn current_piece(&self, lengths: &Lengths) -> Option { - compute_current_piece(lengths, self.position, self.file_abs_offset) + lengths.compute_current_piece(self.position, self.file_abs_offset) } fn queue<'a>(&self, lengths: &'a Lengths) -> impl Iterator + 'a { @@ -53,32 +53,6 @@ pub(crate) struct TorrentStreams { streams: DashMap, } -struct CurrentPiece { - id: ValidPieceIndex, - piece_remaining: u32, -} - -fn compute_current_piece( - lengths: &Lengths, - file_pos: u64, - file_torrent_abs_offset: u64, -) -> Option { - let dpl = lengths.default_piece_length(); - - let abs_pos = file_torrent_abs_offset + file_pos; - let piece_id = abs_pos / dpl as u64; - let piece_id: u32 = piece_id.try_into().ok()?; - - let piece_id = lengths.validate_piece_index(piece_id)?; - let piece_len = lengths.piece_length(piece_id); - Some(CurrentPiece { - id: piece_id, - piece_remaining: (piece_len as u64 - (abs_pos % dpl as u64)) - .try_into() - .ok()?, - }) -} - impl TorrentStreams { fn next_id(&self) -> usize { self.next_stream_id.fetch_add(1, Ordering::Relaxed) @@ -199,12 +173,12 @@ impl AsyncRead for FileStream { return Poll::Ready(Ok(())); } - let current = poll_try_io!(compute_current_piece( - &self.torrent.info().lengths, - self.position, - self.file_torrent_abs_offset - ) - .context("invalid position")); + let current = poll_try_io!(self + .torrent + .info() + .lengths + .compute_current_piece(self.position, self.file_torrent_abs_offset) + .context("invalid position")); // if the piece is not there, register to wake when it is // check if we have the piece for real diff --git a/crates/librqbit_core/src/lengths.rs b/crates/librqbit_core/src/lengths.rs index 80c1204..1f06053 100644 --- a/crates/librqbit_core/src/lengths.rs +++ b/crates/librqbit_core/src/lengths.rs @@ -265,6 +265,35 @@ impl Lengths { end.saturating_sub(offset) } + + pub fn compute_current_piece( + self: &Lengths, + file_pos: u64, + file_torrent_abs_offset: u64, + ) -> Option { + let dpl = self.default_piece_length(); + + let abs_pos = file_torrent_abs_offset + file_pos; + let piece_id = abs_pos / dpl as u64; + let piece_id: u32 = piece_id.try_into().ok()?; + + let piece_id = self.validate_piece_index(piece_id)?; + let piece_len = self.piece_length(piece_id); + let piece_offset = (abs_pos / dpl as u64).try_into().ok()?; + Some(CurrentPiece { + id: piece_id, + piece_offset, + piece_remaining: (piece_len as u64 - (abs_pos % dpl as u64)) + .try_into() + .ok()?, + }) + } +} + +pub struct CurrentPiece { + pub id: ValidPieceIndex, + pub piece_remaining: u32, + pub piece_offset: u32, } #[cfg(test)]