From d25309e358877f1bec761151fde41be6116d3f61 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 1 May 2024 15:23:11 +0100 Subject: [PATCH] Defer writes command line flag --- crates/librqbit/src/peer_connection.rs | 15 +- crates/librqbit/src/session.rs | 12 + crates/librqbit/src/tests/e2e.rs | 1 + crates/librqbit/src/torrent_state/live/mod.rs | 206 +++++++++++------- crates/librqbit/src/torrent_state/mod.rs | 9 + crates/rqbit/src/main.rs | 13 +- 6 files changed, 166 insertions(+), 90 deletions(-) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 34028ca..d0fac39 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -37,7 +37,7 @@ pub trait PeerConnectionHandler { pub enum WriterRequest { Message(MessageOwned), ReadChunkRequest(ChunkInfo), - Disconnect, + Disconnect(anyhow::Result<()>), } #[serde_as] @@ -270,7 +270,8 @@ impl PeerConnection { let mut uploaded_add = None; - let len = match &req { + trace!("about to send: {:?}", &req); + let len = match req { WriterRequest::Message(msg) => msg.serialize(&mut write_buf, &|| { extended_handshake_ref .read() @@ -307,14 +308,14 @@ impl PeerConnection { // this whole section is an optimization write_buf.resize(PIECE_MESSAGE_DEFAULT_LEN, 0); - let preamble_len = serialize_piece_preamble(chunk, &mut write_buf); + let preamble_len = serialize_piece_preamble(&chunk, &mut write_buf); let full_len = preamble_len + chunk.size as usize; write_buf.resize(full_len, 0); if !skip_reading_for_e2e_tests { self.spawner .spawn_block_in_place(|| { self.handler - .read_chunk(chunk, &mut write_buf[preamble_len..]) + .read_chunk(&chunk, &mut write_buf[preamble_len..]) }) .with_context(|| format!("error reading chunk {chunk:?}"))?; } @@ -322,14 +323,12 @@ impl PeerConnection { uploaded_add = Some(chunk.size); full_len } - WriterRequest::Disconnect => { + WriterRequest::Disconnect(res) => { trace!("disconnect requested, closing writer"); - return Ok(()); + return res; } }; - trace!("sending: {:?}, length={}", &req, len); - with_timeout(rwtimeout, write_half.write_all(&write_buf[..len])) .await .context("error writing the message to peer")?; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index a7e660b..9374420 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -186,6 +186,8 @@ pub struct Session { cancellation_token: CancellationToken, + default_defer_writes: bool, + // This is stored for all tasks to stop when session is dropped. _cancellation_token_drop_guard: DropGuard, } @@ -306,6 +308,10 @@ pub struct AddTorrentOptions { pub preferred_id: Option, pub storage_factory: Option, + + // If true, will write to disk in separate threads. The downside is additional allocations. + // May be useful if the disk is slow. + pub defer_writes: Option, } pub struct ListOnlyResponse { @@ -413,6 +419,10 @@ pub struct SessionOptions { pub listen_port_range: Option>, pub enable_upnp_port_forwarding: bool, + + // If true, will write to disk in separate threads. The downside is additional allocations. + // May be useful if the disk is slow. + pub default_defer_writes: bool, } async fn create_tcp_listener( @@ -511,6 +521,7 @@ impl Session { _cancellation_token_drop_guard: token.clone().drop_guard(), cancellation_token: token, tcp_listen_port, + default_defer_writes: opts.default_defer_writes, }); if let Some(tcp_listener) = tcp_listener { @@ -1022,6 +1033,7 @@ impl Session { .allow_overwrite(opts.overwrite) .spawner(self.spawner) .trackers(trackers) + .defer_writes(opts.defer_writes.unwrap_or(self.default_defer_writes)) .peer_id(self.peer_id); if let Some(only_files) = only_files { diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 938ca11..86f615b 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -72,6 +72,7 @@ async fn test_e2e() { peer_opts: None, listen_port_range: Some(15100..17000), enable_upnp_port_forwarding: false, + default_defer_writes: false, }, ) .await diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 6373e07..ff9a3a6 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -747,7 +747,7 @@ impl TorrentStateLive { .take_live_no_counters() .unwrap() .tx - .send(WriterRequest::Disconnect); + .send(WriterRequest::Disconnect(Ok(()))); } } } @@ -1189,7 +1189,7 @@ impl PeerHandler { .unwrap_or_default() { debug!("both peer and us have full torrent, disconnecting"); - self.tx.send(WriterRequest::Disconnect)?; + self.tx.send(WriterRequest::Disconnect(Ok(())))?; // Sleep a bit to ensure this gets written to the network by manage_peer tokio::time::sleep(Duration::from_millis(100)).await; return Ok(()); @@ -1382,93 +1382,137 @@ impl PeerHandler { // By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would // have fallen off above in one of the defensive checks. + // - let work = { + fn write_to_disk( + state: &TorrentStateLive, + addr: PeerHandle, + counters: &AtomicPeerCounters, + piece: &Piece>, + chunk_info: &ChunkInfo, + full_piece_download_time: Option, + ) -> anyhow::Result<()> { + let index = piece.index; + + // Not being able to write to storage is a fatal error. You need to unpause the + // torrent to recover from it. + match state.file_ops().write_chunk(addr, piece, chunk_info) { + Ok(()) => {} + Err(e) => { + error!("FATAL: error writing chunk to disk: {:?}", e); + return state.on_fatal_error(e); + } + } + + // Global chunk/byte counters. + state + .stats + .fetched_bytes + .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed); + + let full_piece_download_time = match full_piece_download_time { + Some(t) => t, + None => return Ok(()), + }; + + match state + .file_ops() + .check_piece(addr, chunk_info.piece_index, chunk_info) + .with_context(|| format!("error checking piece={index}"))? + { + true => { + { + let mut g = state.lock_write("mark_piece_downloaded"); + g.get_chunks_mut()? + .mark_piece_downloaded(chunk_info.piece_index); + } + + // Global piece counters. + let piece_len = state.lengths.piece_length(chunk_info.piece_index) as u64; + state + .stats + .downloaded_and_checked_bytes + // This counter is used to compute "is_finished", so using + // stronger ordering. + .fetch_add(piece_len, Ordering::Release); + state + .stats + .downloaded_and_checked_pieces + // This counter is used to compute "is_finished", so using + // stronger ordering. + .fetch_add(1, Ordering::Release); + state + .stats + .have_bytes + .fetch_add(piece_len, Ordering::Relaxed); + #[allow(clippy::cast_possible_truncation)] + state.stats.total_piece_download_ms.fetch_add( + full_piece_download_time.as_millis() as u64, + Ordering::Relaxed, + ); + + // Per-peer piece counters. + counters.on_piece_completed(piece_len, full_piece_download_time); + state.peers.reset_peer_backoff(addr); + + debug!("piece={} successfully downloaded and verified", index); + + state.on_piece_completed(chunk_info.piece_index)?; + + state.maybe_transmit_haves(chunk_info.piece_index); + } + false => { + warn!( + "checksum for piece={} did not validate. disconecting peer.", + index + ); + state + .lock_write("mark_piece_broken") + .get_chunks_mut()? + .mark_piece_broken_if_not_have(chunk_info.piece_index); + anyhow::bail!("i am probably a bogus peer. dying.") + } + }; + Ok(()) + } + + if self.state.meta().options.defer_writes { let state = self.state.clone(); let addr = self.addr; let counters = self.counters.clone(); let piece = piece.clone_to_owned(); - move || { - let index = piece.index; + let tx = self.tx.clone(); - // Not being able to write to storage is a fatal error. You need to unpause the - // torrent to recover from it. - match state.file_ops().write_chunk(addr, &piece, &chunk_info) { - Ok(()) => {} - Err(e) => { - error!("FATAL: error writing chunk to disk: {:?}", e); - return state.on_fatal_error(e); - } + let work = move || { + if let Err(e) = write_to_disk( + &state, + addr, + &counters, + &piece, + &chunk_info, + full_piece_download_time, + ) { + let _ = tx.send(WriterRequest::Disconnect(Err(e))); } + }; + tokio::runtime::Handle::current().spawn_blocking(work); + } else { + self.state + .meta + .spawner + .spawn_block_in_place(|| { + write_to_disk( + &self.state, + self.addr, + &self.counters, + &piece, + &chunk_info, + full_piece_download_time, + ) + }) + .with_context(|| format!("error processing received chunk {chunk_info:?}"))?; + } - let full_piece_download_time = match full_piece_download_time { - Some(t) => t, - None => return Ok(()), - }; - - match state - .file_ops() - .check_piece(addr, chunk_info.piece_index, &chunk_info) - .with_context(|| format!("error checking piece={index}"))? - { - true => { - { - let mut g = state.lock_write("mark_piece_downloaded"); - g.get_chunks_mut()? - .mark_piece_downloaded(chunk_info.piece_index); - } - - // Global piece counters. - let piece_len = state.lengths.piece_length(chunk_info.piece_index) as u64; - state - .stats - .downloaded_and_checked_bytes - // This counter is used to compute "is_finished", so using - // stronger ordering. - .fetch_add(piece_len, Ordering::Release); - state - .stats - .downloaded_and_checked_pieces - // This counter is used to compute "is_finished", so using - // stronger ordering. - .fetch_add(1, Ordering::Release); - state - .stats - .have_bytes - .fetch_add(piece_len, Ordering::Relaxed); - #[allow(clippy::cast_possible_truncation)] - state.stats.total_piece_download_ms.fetch_add( - full_piece_download_time.as_millis() as u64, - Ordering::Relaxed, - ); - - // Per-peer piece counters. - counters.on_piece_completed(piece_len, full_piece_download_time); - state.peers.reset_peer_backoff(addr); - - debug!("piece={} successfully downloaded and verified", index); - - state.on_piece_completed(chunk_info.piece_index)?; - - state.maybe_transmit_haves(chunk_info.piece_index); - } - false => { - warn!( - "checksum for piece={} did not validate. disconecting peer.", - index - ); - state - .lock_write("mark_piece_broken") - .get_chunks_mut()? - .mark_piece_broken_if_not_have(chunk_info.piece_index); - anyhow::bail!("i am probably a bogus peer. dying.") - } - }; - Ok::<_, anyhow::Error>(()) - } - }; - - tokio::runtime::Handle::current().spawn_blocking(work); Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 65b806d..1471f2b 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -92,6 +92,7 @@ pub(crate) struct ManagedTorrentOptions { pub peer_read_write_timeout: Option, pub allow_overwrite: bool, pub output_folder: PathBuf, + pub defer_writes: bool, } pub struct ManagedTorrentInfo { @@ -505,6 +506,7 @@ pub(crate) struct ManagedTorrentBuilder { spawner: Option, allow_overwrite: bool, storage_factory: BoxStorageFactory, + defer_writes: bool, } impl ManagedTorrentBuilder { @@ -527,6 +529,7 @@ impl ManagedTorrentBuilder { allow_overwrite: false, output_folder, storage_factory, + defer_writes: false, } } @@ -570,6 +573,11 @@ impl ManagedTorrentBuilder { self } + pub fn defer_writes(&mut self, value: bool) -> &mut Self { + self.defer_writes = value; + self + } + pub fn build(self, span: tracing::Span) -> anyhow::Result { let lengths = Lengths::from_torrent(&self.info)?; let file_infos = self @@ -600,6 +608,7 @@ impl ManagedTorrentBuilder { peer_read_write_timeout: self.peer_read_write_timeout, allow_overwrite: self.allow_overwrite, output_folder: self.output_folder, + defer_writes: self.defer_writes, }, }); diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 40d1ca5..6698259 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -97,6 +97,16 @@ struct Opts { #[command(subcommand)] subcommand: SubCommand, + + /// How many blocking tokio threads to spawn to process disk reads/writes. + /// Might want to increase if the disk is slow. + #[arg(long = "max-blocking-threads", default_value = "16")] + max_blocking_threads: u16, + + /// If set, will write to disk in background and not inline with peer. + /// Might be useful if the disk is slow. + #[arg(long = "defer-writes", default_value = "false")] + defer_writes: bool, } #[derive(Parser)] @@ -239,7 +249,7 @@ fn main() -> anyhow::Result<()> { // note: we aren't using spawn_blocking() anymore, so this doesn't apply, // however I'm still messing around, so in case we do, let's block the number of // spawned threads. - .max_blocking_threads(8) + .max_blocking_threads(opts.max_blocking_threads as usize) .build()?; rt.block_on(async_main(opts)) @@ -282,6 +292,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { None }, enable_upnp_port_forwarding: !opts.disable_upnp, + default_defer_writes: opts.defer_writes, }; let stats_printer = |session: Arc| async move {