From fa557cedd9e85ae3f09b98b783aefc2ce306893f Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 1 May 2024 16:23:52 +0100 Subject: [PATCH] another attempt (sucks) --- crates/librqbit/src/torrent_state/live/mod.rs | 31 ++++++------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 0b252b7..535d123 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -176,7 +176,7 @@ pub struct TorrentStateLive { up_speed_estimator: SpeedEstimator, cancellation_token: CancellationToken, - disk_work_queue_tx: tokio::sync::mpsc::Sender>, + disk_work_queue_tx: std::sync::mpsc::SyncSender>, pub(crate) streams: Arc, } @@ -209,7 +209,7 @@ impl TorrentStateLive { pri }; - let (disk_work_queue_tx, mut disk_work_queue_rx) = tokio::sync::mpsc::channel(2048); + let (disk_work_queue_tx, mut disk_work_queue_rx) = std::sync::mpsc::sync_channel(2048); let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), @@ -237,15 +237,11 @@ impl TorrentStateLive { }); if paused.info.options.defer_writes { - state.spawn( - error_span!(parent: state.meta.span.clone(), "disk_writer"), - async move { - while let Some(work_unit) = disk_work_queue_rx.recv().await { - tokio::runtime::Handle::current().spawn_blocking(work_unit); - } - Ok(()) - }, - ); + tokio::runtime::Handle::current().spawn_blocking(move || { + while let Ok(work_unit) = disk_work_queue_rx.recv() { + work_unit(); + } + }); } state.spawn( @@ -1395,11 +1391,10 @@ impl PeerHandler { self.state .stats .fetched_bytes - .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed); // By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would // have fallen off above in one of the defensive checks. - // fn write_to_disk( state: &TorrentStateLive, @@ -1421,12 +1416,6 @@ impl PeerHandler { } } - // Global chunk/byte counters. - state - .stats - .fetched_bytes - .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed); - let full_piece_download_time = match full_piece_download_time { Some(t) => t, None => return Ok(()), @@ -1513,9 +1502,7 @@ impl PeerHandler { let _ = tx.send(WriterRequest::Disconnect(Err(e))); } }; - self.state.spawn(error_span!("disk_work"), async move { - Ok(disk_work_queue_tx.send(Box::new(work)).await?) - }); + disk_work_queue_tx.send(Box::new(work))?; } else { self.state .meta