From 43008f28571e304066842fd896ef96ef9f9fe5d2 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 3 May 2024 13:28:31 +0100 Subject: [PATCH] Less work when defer writes is disabled --- crates/librqbit/src/torrent_state/live/mod.rs | 59 +++++++++++++------ 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 5a1b31c..3e42bd7 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -217,9 +217,14 @@ impl TorrentStateLive { pri }; + let defer_writes = paused.info.options.defer_writes; + // 8MB per torrent of disk buffering. - let (disk_work_tx, mut disk_work_rx) = - tokio::sync::mpsc::channel(8 * 1024 * 1024 / CHUNK_SIZE as usize); + let (disk_work_tx, mut disk_work_rx) = tokio::sync::mpsc::channel(if defer_writes { + 8 * 1024 * 1024 / CHUNK_SIZE as usize + } else { + 1 + }); let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), @@ -244,21 +249,27 @@ impl TorrentStateLive { up_speed_estimator, cancellation_token, streams: paused.streams, - per_piece_locks: (0..lengths.total_pieces()) - .map(|_| RwLock::new(())) - .collect(), + per_piece_locks: if defer_writes { + (0..lengths.total_pieces()) + .map(|_| RwLock::new(())) + .collect() + } else { + vec![] + }, disk_work_tx, }); - state.spawn( - error_span!(parent: state.meta.span.clone(), "disk_writer"), - async move { - while let Some(work_item) = disk_work_rx.recv().await { - tokio::task::spawn_blocking(work_item.work); - } - Ok(()) - }, - ); + if defer_writes { + state.spawn( + error_span!(parent: state.meta.span.clone(), "disk_writer"), + async move { + while let Some(work_item) = disk_work_rx.recv().await { + tokio::task::spawn_blocking(work_item.work); + } + Ok(()) + }, + ); + } state.spawn( error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), @@ -309,6 +320,10 @@ impl TorrentStateLive { &self.up_speed_estimator } + fn defer_writes(&self) -> bool { + self.meta.options.defer_writes + } + pub(crate) fn add_incoming_peer( self: &Arc, checked_peer: CheckedIncomingConnection, @@ -1078,8 +1093,13 @@ impl PeerHandler { // heuristic for "too slow peer" if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold { - // If the piece is locked and someone is active writing to disk, don't steal it. - if let Some(_g) = self.state.per_piece_locks[idx.get_usize()].try_write() { + // If the piece is locked and someone is actively writing to disk, don't steal it. + if let Some(_g) = self + .state + .per_piece_locks + .get(idx.get_usize()) + .and_then(|l| l.try_write()) + { debug!( "will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}", idx, piece_req.peer, elapsed, my_avg_time @@ -1385,7 +1405,10 @@ impl PeerHandler { let ppl_guard = { let g = state.lock_read("check_steal"); - let ppl = state.per_piece_locks[piece.index as usize].read(); + let ppl = state + .per_piece_locks + .get(piece.index as usize) + .map(|l| l.read()); match g.inflight_pieces.get(&chunk_info.piece_index) { Some(InflightPiece { peer, .. }) if *peer == addr => {} @@ -1519,7 +1542,7 @@ impl PeerHandler { Ok(()) } - if self.state.meta().options.defer_writes { + if self.state.defer_writes() { // TODO: shove all this into one thing to .clone() once rather than 5 times. let state = self.state.clone(); let addr = self.addr;