another attempt (sucks)

This commit is contained in:
Igor Katson 2024-05-01 16:23:52 +01:00
parent 7e71e4188f
commit fa557cedd9

View file

@ -176,7 +176,7 @@ pub struct TorrentStateLive {
up_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
disk_work_queue_tx: tokio::sync::mpsc::Sender<Box<dyn FnOnce() + Send + Sync>>, disk_work_queue_tx: std::sync::mpsc::SyncSender<Box<dyn FnOnce() + Send + Sync>>,
pub(crate) streams: Arc<TorrentStreams>, pub(crate) streams: Arc<TorrentStreams>,
} }
@ -209,7 +209,7 @@ impl TorrentStateLive {
pri pri
}; };
let (disk_work_queue_tx, mut disk_work_queue_rx) = tokio::sync::mpsc::channel(2048); let (disk_work_queue_tx, mut disk_work_queue_rx) = std::sync::mpsc::sync_channel(2048);
let state = Arc::new(TorrentStateLive { let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(), meta: paused.info.clone(),
@ -237,15 +237,11 @@ impl TorrentStateLive {
}); });
if paused.info.options.defer_writes { if paused.info.options.defer_writes {
state.spawn( tokio::runtime::Handle::current().spawn_blocking(move || {
error_span!(parent: state.meta.span.clone(), "disk_writer"), while let Ok(work_unit) = disk_work_queue_rx.recv() {
async move { work_unit();
while let Some(work_unit) = disk_work_queue_rx.recv().await { }
tokio::runtime::Handle::current().spawn_blocking(work_unit); });
}
Ok(())
},
);
} }
state.spawn( state.spawn(
@ -1395,11 +1391,10 @@ impl PeerHandler {
self.state self.state
.stats .stats
.fetched_bytes .fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed); .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
// By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would // By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would
// have fallen off above in one of the defensive checks. // have fallen off above in one of the defensive checks.
//
fn write_to_disk( fn write_to_disk(
state: &TorrentStateLive, state: &TorrentStateLive,
@ -1421,12 +1416,6 @@ impl PeerHandler {
} }
} }
// Global chunk/byte counters.
state
.stats
.fetched_bytes
.fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
let full_piece_download_time = match full_piece_download_time { let full_piece_download_time = match full_piece_download_time {
Some(t) => t, Some(t) => t,
None => return Ok(()), None => return Ok(()),
@ -1513,9 +1502,7 @@ impl PeerHandler {
let _ = tx.send(WriterRequest::Disconnect(Err(e))); let _ = tx.send(WriterRequest::Disconnect(Err(e)));
} }
}; };
self.state.spawn(error_span!("disk_work"), async move { disk_work_queue_tx.send(Box::new(work))?;
Ok(disk_work_queue_tx.send(Box::new(work)).await?)
});
} else { } else {
self.state self.state
.meta .meta