From eafd274a0b72a9fb19673712eccd5ca97360ff3b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 3 May 2024 14:55:31 +0100 Subject: [PATCH] --defer-writes-up-to --- crates/librqbit/src/session.rs | 38 +++++++++++--- crates/librqbit/src/tests/e2e.rs | 2 +- crates/librqbit/src/torrent_state/live/mod.rs | 49 +++---------------- crates/librqbit/src/torrent_state/mod.rs | 13 ++--- crates/librqbit/src/type_aliases.rs | 3 ++ crates/rqbit/src/main.rs | 12 +++-- 6 files changed, 55 insertions(+), 62 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 97f927a..88dcc5e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -20,7 +20,7 @@ use crate::{ torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, - type_aliases::PeerStream, + type_aliases::{DiskWorkQueueSender, PeerStream}, }; use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; @@ -34,6 +34,7 @@ use futures::{ }; use itertools::Itertools; use librqbit_core::{ + constants::CHUNK_SIZE, directories::get_configuration_directory, magnet::Magnet, peer_id::generate_peer_id, @@ -188,7 +189,7 @@ pub struct Session { cancellation_token: CancellationToken, - default_defer_writes: bool, + disk_write_tx: Option, default_storage_factory: Option, @@ -424,9 +425,9 @@ pub struct SessionOptions { pub listen_port_range: Option>, pub enable_upnp_port_forwarding: bool, - // If true, will write to disk in separate threads. The downside is additional allocations. - // May be useful if the disk is slow. - pub default_defer_writes: bool, + // If you set this to something, all writes to disk will happen in background and be + // buffered in memory up to approximately the given number of megabytes. + pub defer_writes_up_to: Option, pub default_storage_factory: Option, } @@ -516,6 +517,16 @@ impl Session { }; let spawner = BlockingSpawner::default(); + let (disk_write_tx, disk_write_rx) = opts + .defer_writes_up_to + .map(|mb| { + const DISK_WRITE_APPROX_WORK_ITEM_SIZE: usize = CHUNK_SIZE as usize + 300; + let count = mb * 1024 * 1024 / DISK_WRITE_APPROX_WORK_ITEM_SIZE; + let (tx, rx) = tokio::sync::mpsc::channel(count); + (Some(tx), Some(rx)) + }) + .unwrap_or_default(); + let session = Arc::new(Self { persistence_filename, peer_id, @@ -527,10 +538,20 @@ impl Session { _cancellation_token_drop_guard: token.clone().drop_guard(), cancellation_token: token, tcp_listen_port, - default_defer_writes: opts.default_defer_writes, + disk_write_tx, default_storage_factory: opts.default_storage_factory, }); + if let Some(mut disk_write_rx) = disk_write_rx { + session.spawn(error_span!("disk_writer"), async move { + while let Some(work) = disk_write_rx.recv().await { + trace!(disk_write_rx_queue_len = disk_write_rx.len()); + spawner.spawn_block_in_place(work); + } + Ok(()) + }); + } + if let Some(tcp_listener) = tcp_listener { session.spawn( error_span!("tcp_listen", port = tcp_listen_port), @@ -1041,9 +1062,12 @@ impl Session { .allow_overwrite(opts.overwrite) .spawner(self.spawner) .trackers(trackers) - .defer_writes(opts.defer_writes.unwrap_or(self.default_defer_writes)) .peer_id(self.peer_id); + if let Some(d) = self.disk_write_tx.clone() { + builder.disk_writer(d); + } + if let Some(only_files) = only_files { builder.only_files(only_files); } diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index d994a78..cb28906 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -72,8 +72,8 @@ async fn test_e2e() { peer_opts: None, listen_port_range: Some(15100..17000), enable_upnp_port_forwarding: false, - default_defer_writes: false, default_storage_factory: None, + defer_writes_up_to: None, }, ) .await diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index f673cd8..1fc0da1 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -59,7 +59,6 @@ use buffers::{ByteBuf, ByteBufOwned}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ - constants::CHUNK_SIZE, hash_id::Id20, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, spawn_utils::spawn_with_cancel, @@ -88,7 +87,7 @@ use crate::{ }, session::CheckedIncomingConnection, torrent_state::{peer::Peer, utils::atomic_inc}, - type_aliases::{FilePriorities, FileStorage, PeerHandle, BF}, + type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF}, }; use self::{ @@ -156,10 +155,6 @@ pub struct TorrentStateOptions { pub peer_read_write_timeout: Option, } -struct DiskWriteWorkItem { - work: Box, -} - pub struct TorrentStateLive { peers: PeerStates, meta: Arc, @@ -184,8 +179,6 @@ pub struct TorrentStateLive { up_speed_estimator: SpeedEstimator, cancellation_token: CancellationToken, - disk_work_tx: tokio::sync::mpsc::Sender, - pub(crate) streams: Arc, } @@ -217,15 +210,7 @@ impl TorrentStateLive { pri }; - let defer_writes = paused.info.options.defer_writes; - - // 8MB per torrent of disk buffering. - let (disk_work_tx, mut disk_work_rx) = tokio::sync::mpsc::channel(if defer_writes { - const APPROX_WORK_ITEM_SIZE: usize = CHUNK_SIZE as usize + 300; - 8 * 1024 * 1024 / APPROX_WORK_ITEM_SIZE - } else { - 1 - }); + let defer_writes = paused.info.options.disk_write_queue.is_some(); let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), @@ -257,24 +242,8 @@ impl TorrentStateLive { } else { vec![] }, - disk_work_tx, }); - if defer_writes { - state.spawn( - error_span!(parent: state.meta.span.clone(), "disk_writer"), - { - let spawner = state.meta.spawner; - async move { - while let Some(work_item) = disk_work_rx.recv().await { - spawner.spawn_block_in_place(work_item.work); - } - Ok(()) - } - }, - ); - } - state.spawn( error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), { @@ -324,8 +293,8 @@ impl TorrentStateLive { &self.up_speed_estimator } - fn defer_writes(&self) -> bool { - self.meta.options.defer_writes + fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> { + self.meta.options.disk_write_queue.as_ref() } pub(crate) fn add_incoming_peer( @@ -1546,7 +1515,7 @@ impl PeerHandler { Ok(()) } - if self.state.defer_writes() { + if let Some(dtx) = self.state.disk_work_tx() { // TODO: shove all this into one thing to .clone() once rather than 5 times. let state = self.state.clone(); let addr = self.addr; @@ -1555,7 +1524,6 @@ impl PeerHandler { let tx = self.tx.clone(); let span = tracing::error_span!("deferred_write"); - let work = move || { span.in_scope(|| { if let Err(e) = write_to_disk(&state, addr, &counters, &piece, &chunk_info) { @@ -1563,12 +1531,7 @@ impl PeerHandler { } }) }; - self.state - .disk_work_tx - .send(DiskWriteWorkItem { - work: Box::new(work), - }) - .await?; + dtx.send(Box::new(work)).await?; } else { self.state .meta diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 1471f2b..b3438f2 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -38,6 +38,7 @@ use crate::file_info::FileInfo; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; use crate::torrent_state::stats::LiveStats; +use crate::type_aliases::DiskWorkQueueSender; use crate::type_aliases::FileInfos; use crate::type_aliases::PeerStream; @@ -92,7 +93,7 @@ pub(crate) struct ManagedTorrentOptions { pub peer_read_write_timeout: Option, pub allow_overwrite: bool, pub output_folder: PathBuf, - pub defer_writes: bool, + pub disk_write_queue: Option, } pub struct ManagedTorrentInfo { @@ -506,7 +507,7 @@ pub(crate) struct ManagedTorrentBuilder { spawner: Option, allow_overwrite: bool, storage_factory: BoxStorageFactory, - defer_writes: bool, + disk_writer: Option, } impl ManagedTorrentBuilder { @@ -529,7 +530,7 @@ impl ManagedTorrentBuilder { allow_overwrite: false, output_folder, storage_factory, - defer_writes: false, + disk_writer: None, } } @@ -573,8 +574,8 @@ impl ManagedTorrentBuilder { self } - pub fn defer_writes(&mut self, value: bool) -> &mut Self { - self.defer_writes = value; + pub fn disk_writer(&mut self, value: DiskWorkQueueSender) -> &mut Self { + self.disk_writer = Some(value); self } @@ -608,7 +609,7 @@ impl ManagedTorrentBuilder { peer_read_write_timeout: self.peer_read_write_timeout, allow_overwrite: self.allow_overwrite, output_folder: self.output_folder, - defer_writes: self.defer_writes, + disk_write_queue: self.disk_writer, }, }); diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index 089e968..be5741e 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -11,3 +11,6 @@ pub type PeerStream = BoxStream<'static, SocketAddr>; pub type FileInfos = Vec; pub(crate) type FileStorage = Box; pub(crate) type FilePriorities = Vec; + +pub(crate) type DiskWorkQueueItem = Box; +pub(crate) type DiskWorkQueueSender = tokio::sync::mpsc::Sender; diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index e11602f..77b52fc 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -104,10 +104,12 @@ struct Opts { #[arg(long = "max-blocking-threads", default_value = "8")] max_blocking_threads: u16, - /// If set, will write to disk in background and not inline with peer. - /// Useful if the disk is slow or its latency is very unstable (e.g. HDD or old SSD). - #[arg(long = "defer-writes", default_value = "false")] - defer_writes: bool, + // If you set this to something, all writes to disk will happen in background and be + // buffered in memory up to approximately the given number of megabytes. + // + // Might be useful for slow disks. + #[arg(long = "defer-writes-up-to")] + defer_writes_up_to: Option, /// Use mmap (file-backed) for storage. Any advantages are questionable and unproven. /// If you use it, you know what you are doing. @@ -298,7 +300,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { None }, enable_upnp_port_forwarding: !opts.disable_upnp, - default_defer_writes: opts.defer_writes, + defer_writes_up_to: opts.defer_writes_up_to, default_storage_factory: Some({ fn wrap(s: S) -> impl StorageFactory { #[cfg(feature = "debug_slow_disk")]