Less work when defer writes is disabled

This commit is contained in:
Igor Katson 2024-05-03 13:28:31 +01:00
parent 573185bcfa
commit 43008f2857

View file

@ -217,9 +217,14 @@ impl TorrentStateLive {
pri pri
}; };
let defer_writes = paused.info.options.defer_writes;
// 8MB per torrent of disk buffering. // 8MB per torrent of disk buffering.
let (disk_work_tx, mut disk_work_rx) = let (disk_work_tx, mut disk_work_rx) = tokio::sync::mpsc::channel(if defer_writes {
tokio::sync::mpsc::channel(8 * 1024 * 1024 / CHUNK_SIZE as usize); 8 * 1024 * 1024 / CHUNK_SIZE as usize
} else {
1
});
let state = Arc::new(TorrentStateLive { let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(), meta: paused.info.clone(),
@ -244,21 +249,27 @@ impl TorrentStateLive {
up_speed_estimator, up_speed_estimator,
cancellation_token, cancellation_token,
streams: paused.streams, streams: paused.streams,
per_piece_locks: (0..lengths.total_pieces()) per_piece_locks: if defer_writes {
.map(|_| RwLock::new(())) (0..lengths.total_pieces())
.collect(), .map(|_| RwLock::new(()))
.collect()
} else {
vec![]
},
disk_work_tx, disk_work_tx,
}); });
state.spawn( if defer_writes {
error_span!(parent: state.meta.span.clone(), "disk_writer"), state.spawn(
async move { error_span!(parent: state.meta.span.clone(), "disk_writer"),
while let Some(work_item) = disk_work_rx.recv().await { async move {
tokio::task::spawn_blocking(work_item.work); while let Some(work_item) = disk_work_rx.recv().await {
} tokio::task::spawn_blocking(work_item.work);
Ok(()) }
}, Ok(())
); },
);
}
state.spawn( state.spawn(
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
@ -309,6 +320,10 @@ impl TorrentStateLive {
&self.up_speed_estimator &self.up_speed_estimator
} }
fn defer_writes(&self) -> bool {
self.meta.options.defer_writes
}
pub(crate) fn add_incoming_peer( pub(crate) fn add_incoming_peer(
self: &Arc<Self>, self: &Arc<Self>,
checked_peer: CheckedIncomingConnection, checked_peer: CheckedIncomingConnection,
@ -1078,8 +1093,13 @@ impl PeerHandler {
// heuristic for "too slow peer" // heuristic for "too slow peer"
if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold { if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold {
// If the piece is locked and someone is active writing to disk, don't steal it. // If the piece is locked and someone is actively writing to disk, don't steal it.
if let Some(_g) = self.state.per_piece_locks[idx.get_usize()].try_write() { if let Some(_g) = self
.state
.per_piece_locks
.get(idx.get_usize())
.and_then(|l| l.try_write())
{
debug!( debug!(
"will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}", "will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}",
idx, piece_req.peer, elapsed, my_avg_time idx, piece_req.peer, elapsed, my_avg_time
@ -1385,7 +1405,10 @@ impl PeerHandler {
let ppl_guard = { let ppl_guard = {
let g = state.lock_read("check_steal"); let g = state.lock_read("check_steal");
let ppl = state.per_piece_locks[piece.index as usize].read(); let ppl = state
.per_piece_locks
.get(piece.index as usize)
.map(|l| l.read());
match g.inflight_pieces.get(&chunk_info.piece_index) { match g.inflight_pieces.get(&chunk_info.piece_index) {
Some(InflightPiece { peer, .. }) if *peer == addr => {} Some(InflightPiece { peer, .. }) if *peer == addr => {}
@ -1519,7 +1542,7 @@ impl PeerHandler {
Ok(()) Ok(())
} }
if self.state.meta().options.defer_writes { if self.state.defer_writes() {
// TODO: shove all this into one thing to .clone() once rather than 5 times. // TODO: shove all this into one thing to .clone() once rather than 5 times.
let state = self.state.clone(); let state = self.state.clone();
let addr = self.addr; let addr = self.addr;