diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ff9a3a6..3024e7a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -176,6 +176,8 @@ pub struct TorrentStateLive { up_speed_estimator: SpeedEstimator, cancellation_token: CancellationToken, + disk_work_queue_tx: tokio::sync::mpsc::Sender>, + pub(crate) streams: Arc, } @@ -207,6 +209,8 @@ impl TorrentStateLive { pri }; + let (disk_work_queue_tx, mut disk_work_queue_rx) = tokio::sync::mpsc::channel(2048); + let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), peers: Default::default(), @@ -229,8 +233,21 @@ impl TorrentStateLive { up_speed_estimator, cancellation_token, streams: paused.streams, + disk_work_queue_tx, }); + if paused.info.options.defer_writes { + state.spawn( + error_span!(parent: state.meta.span.clone(), "disk_writer"), + async move { + while let Some(work_unit) = disk_work_queue_rx.recv().await { + tokio::runtime::Handle::current().spawn_blocking(work_unit); + } + Ok(()) + }, + ); + } + state.spawn( error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), { @@ -1482,6 +1499,7 @@ impl PeerHandler { let counters = self.counters.clone(); let piece = piece.clone_to_owned(); let tx = self.tx.clone(); + let disk_work_queue_tx = state.disk_work_queue_tx.clone(); let work = move || { if let Err(e) = write_to_disk( @@ -1495,7 +1513,7 @@ impl PeerHandler { let _ = tx.send(WriterRequest::Disconnect(Err(e))); } }; - tokio::runtime::Handle::current().spawn_blocking(work); + disk_work_queue_tx.blocking_send(Box::new(work))?; } else { self.state .meta