Another disk indirectio attempt

This commit is contained in:
Igor Katson 2024-05-01 15:37:22 +01:00
parent d25309e358
commit d03943db8d

View file

@ -176,6 +176,8 @@ pub struct TorrentStateLive {
up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken,
disk_work_queue_tx: tokio::sync::mpsc::Sender<Box<dyn FnOnce() + Send + Sync>>,
pub(crate) streams: Arc<TorrentStreams>,
}
@ -207,6 +209,8 @@ impl TorrentStateLive {
pri
};
let (disk_work_queue_tx, mut disk_work_queue_rx) = tokio::sync::mpsc::channel(2048);
let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(),
peers: Default::default(),
@ -229,8 +233,21 @@ impl TorrentStateLive {
up_speed_estimator,
cancellation_token,
streams: paused.streams,
disk_work_queue_tx,
});
if paused.info.options.defer_writes {
state.spawn(
error_span!(parent: state.meta.span.clone(), "disk_writer"),
async move {
while let Some(work_unit) = disk_work_queue_rx.recv().await {
tokio::runtime::Handle::current().spawn_blocking(work_unit);
}
Ok(())
},
);
}
state.spawn(
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
{
@ -1482,6 +1499,7 @@ impl PeerHandler {
let counters = self.counters.clone();
let piece = piece.clone_to_owned();
let tx = self.tx.clone();
let disk_work_queue_tx = state.disk_work_queue_tx.clone();
let work = move || {
if let Err(e) = write_to_disk(
@ -1495,7 +1513,7 @@ impl PeerHandler {
let _ = tx.send(WriterRequest::Disconnect(Err(e)));
}
};
tokio::runtime::Handle::current().spawn_blocking(work);
disk_work_queue_tx.blocking_send(Box::new(work))?;
} else {
self.state
.meta