From e53f5a7f2337adf8da15a17bb3dc796b48b3ddb4 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 3 May 2024 09:45:32 +0100 Subject: [PATCH] Fix the ordering bug in deferred writes, but its a slow fix --- crates/librqbit/src/chunk_tracker.rs | 1 + .../librqbit/src/storage/middleware/timing.rs | 2 +- crates/librqbit/src/torrent_state/live/mod.rs | 151 +++++++++--------- 3 files changed, 75 insertions(+), 79 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 794febd..254a831 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -121,6 +121,7 @@ fn compute_queued_pieces(have_pieces: &BF, selected_pieces: &BF) -> anyhow::Resu )) } +#[derive(Debug)] pub enum ChunkMarkingResult { PreviouslyCompleted, NotCompleted, diff --git a/crates/librqbit/src/storage/middleware/timing.rs b/crates/librqbit/src/storage/middleware/timing.rs index 081a83b..8419694 100644 --- a/crates/librqbit/src/storage/middleware/timing.rs +++ b/crates/librqbit/src/storage/middleware/timing.rs @@ -49,7 +49,7 @@ macro_rules! timeit { let start = std::time::Instant::now(); let r = $op; let elapsed = start.elapsed(); - tracing::debug!(name = $name, $($rest),*, elapsed_micros=elapsed.as_micros(), "timeit"); + tracing::debug!(name = $name, $($rest),*, elapsed_micros=elapsed.as_micros()); r } }; diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 2cffbcc..21289a6 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1328,80 +1328,84 @@ impl PeerHandler { }) .context("peer not found")??; - let full_piece_download_time = { - let mut g = self.state.lock_write("mark_chunk_downloaded"); - - match g.inflight_pieces.get(&chunk_info.piece_index) { - Some(InflightPiece { peer, .. }) if *peer == self.addr => {} - Some(InflightPiece { peer, .. }) => { - debug!( - "in-flight piece {} was stolen by {}, ignoring", - chunk_info.piece_index, peer - ); - return Ok(()); - } - None => { - debug!( - "in-flight piece {} not found. it was probably completed by someone else", - chunk_info.piece_index - ); - return Ok(()); - } - }; - - match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) { - Some(ChunkMarkingResult::Completed) => { - trace!("piece={} done, will write and checksum", piece.index,); - // This will prevent others from stealing it. - { - let piece = chunk_info.piece_index; - g.inflight_pieces.remove(&piece) - } - .map(|t| t.started.elapsed()) - } - Some(ChunkMarkingResult::PreviouslyCompleted) => { - // TODO: we might need to send cancellations here. - debug!("piece={} was done by someone else, ignoring", piece.index,); - return Ok(()); - } - Some(ChunkMarkingResult::NotCompleted) => None, - None => { - anyhow::bail!( - "bogus data received: {:?}, cannot map this to a chunk, dropping peer", - piece - ); - } - } - }; - // This one is used to calculate download speed. self.state .stats .fetched_bytes .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed); - // By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would - // have fallen off above in one of the defensive checks. - fn write_to_disk( state: &TorrentStateLive, addr: PeerHandle, counters: &AtomicPeerCounters, - piece: &Piece>, + piece: &Piece + std::fmt::Debug>, chunk_info: &ChunkInfo, - full_piece_download_time: Option, ) -> anyhow::Result<()> { let index = piece.index; - // Not being able to write to storage is a fatal error. You need to unpause the - // torrent to recover from it. - match state.file_ops().write_chunk(addr, piece, chunk_info) { - Ok(()) => {} - Err(e) => { - error!("FATAL: error writing chunk to disk: {:?}", e); - return state.on_fatal_error(e); - } - } + let full_piece_download_time = { + let mut g = state.lock_write("mark_chunk_downloaded"); + + match g.inflight_pieces.get(&chunk_info.piece_index) { + Some(InflightPiece { peer, .. }) if *peer == addr => {} + Some(InflightPiece { peer, .. }) => { + debug!( + "in-flight piece {} was stolen by {}, ignoring", + chunk_info.piece_index, peer + ); + return Ok(()); + } + None => { + debug!( + "in-flight piece {} not found. it was probably completed by someone else", + chunk_info.piece_index + ); + return Ok(()); + } + }; + + let chunk_marking_result = g.get_chunks_mut()?.mark_chunk_downloaded(piece); + trace!(?piece, chunk_marking_result=?chunk_marking_result); + + let full_piece_download_time = match chunk_marking_result { + Some(ChunkMarkingResult::Completed) => { + trace!("piece={} done, will write and checksum", piece.index); + // This will prevent others from stealing it. + { + let piece = chunk_info.piece_index; + g.inflight_pieces.remove(&piece) + } + .map(|t| t.started.elapsed()) + } + Some(ChunkMarkingResult::PreviouslyCompleted) => { + // TODO: we might need to send cancellations here. + debug!("piece={} was done by someone else, ignoring", piece.index); + return Ok(()); + } + Some(ChunkMarkingResult::NotCompleted) => None, + None => { + anyhow::bail!( + "bogus data received: {:?}, cannot map this to a chunk, dropping peer", + piece + ); + } + }; + + // By this time we reach here, no other peer can request this piece. All others, even if they steal pieces would + // have fallen off above in one of the defensive checks. + + // Not being able to write to storage is a fatal error. You need to unpause the + // torrent to recover from it. + match state.file_ops().write_chunk(addr, piece, chunk_info) { + Ok(()) => {} + Err(e) => { + error!("FATAL: error writing chunk to disk: {:?}", e); + return state.on_fatal_error(e); + } + }; + + full_piece_download_time + }; let full_piece_download_time = match full_piece_download_time { Some(t) => t, @@ -1470,23 +1474,21 @@ impl PeerHandler { } if self.state.meta().options.defer_writes { + // TODO: shove all this into one thing to .clone() once rather than 5 times. let state = self.state.clone(); let addr = self.addr; let counters = self.counters.clone(); let piece = piece.clone_to_owned(); let tx = self.tx.clone(); + let span = tracing::error_span!("deferred_write"); + let work = move || { - if let Err(e) = write_to_disk( - &state, - addr, - &counters, - &piece, - &chunk_info, - full_piece_download_time, - ) { - let _ = tx.send(WriterRequest::Disconnect(Err(e))); - } + span.in_scope(|| { + if let Err(e) = write_to_disk(&state, addr, &counters, &piece, &chunk_info) { + let _ = tx.send(WriterRequest::Disconnect(Err(e))); + } + }) }; tokio::task::spawn_blocking(work); } else { @@ -1494,14 +1496,7 @@ impl PeerHandler { .meta .spawner .spawn_block_in_place(|| { - write_to_disk( - &self.state, - self.addr, - &self.counters, - &piece, - &chunk_info, - full_piece_download_time, - ) + write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info) }) .with_context(|| format!("error processing received chunk {chunk_info:?}"))?; }