Fix the ordering bug in deferred writes, but it's a slow fix
This commit is contained in:
parent
438392da1d
commit
e53f5a7f23
3 changed files with 75 additions and 79 deletions
|
|
@ -121,6 +121,7 @@ fn compute_queued_pieces(have_pieces: &BF, selected_pieces: &BF) -> anyhow::Resu
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum ChunkMarkingResult {
|
pub enum ChunkMarkingResult {
|
||||||
PreviouslyCompleted,
|
PreviouslyCompleted,
|
||||||
NotCompleted,
|
NotCompleted,
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ macro_rules! timeit {
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
let r = $op;
|
let r = $op;
|
||||||
let elapsed = start.elapsed();
|
let elapsed = start.elapsed();
|
||||||
tracing::debug!(name = $name, $($rest),*, elapsed_micros=elapsed.as_micros(), "timeit");
|
tracing::debug!(name = $name, $($rest),*, elapsed_micros=elapsed.as_micros());
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -1328,80 +1328,84 @@ impl PeerHandler {
|
||||||
})
|
})
|
||||||
.context("peer not found")??;
|
.context("peer not found")??;
|
||||||
|
|
||||||
let full_piece_download_time = {
|
|
||||||
let mut g = self.state.lock_write("mark_chunk_downloaded");
|
|
||||||
|
|
||||||
match g.inflight_pieces.get(&chunk_info.piece_index) {
|
|
||||||
Some(InflightPiece { peer, .. }) if *peer == self.addr => {}
|
|
||||||
Some(InflightPiece { peer, .. }) => {
|
|
||||||
debug!(
|
|
||||||
"in-flight piece {} was stolen by {}, ignoring",
|
|
||||||
chunk_info.piece_index, peer
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
debug!(
|
|
||||||
"in-flight piece {} not found. it was probably completed by someone else",
|
|
||||||
chunk_info.piece_index
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) {
|
|
||||||
Some(ChunkMarkingResult::Completed) => {
|
|
||||||
trace!("piece={} done, will write and checksum", piece.index,);
|
|
||||||
// This will prevent others from stealing it.
|
|
||||||
{
|
|
||||||
let piece = chunk_info.piece_index;
|
|
||||||
g.inflight_pieces.remove(&piece)
|
|
||||||
}
|
|
||||||
.map(|t| t.started.elapsed())
|
|
||||||
}
|
|
||||||
Some(ChunkMarkingResult::PreviouslyCompleted) => {
|
|
||||||
// TODO: we might need to send cancellations here.
|
|
||||||
debug!("piece={} was done by someone else, ignoring", piece.index,);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
Some(ChunkMarkingResult::NotCompleted) => None,
|
|
||||||
None => {
|
|
||||||
anyhow::bail!(
|
|
||||||
"bogus data received: {:?}, cannot map this to a chunk, dropping peer",
|
|
||||||
piece
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// This one is used to calculate download speed.
|
// This one is used to calculate download speed.
|
||||||
self.state
|
self.state
|
||||||
.stats
|
.stats
|
||||||
.fetched_bytes
|
.fetched_bytes
|
||||||
.fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
|
.fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
|
||||||
|
|
||||||
// By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would
|
|
||||||
// have fallen off above in one of the defensive checks.
|
|
||||||
|
|
||||||
fn write_to_disk(
|
fn write_to_disk(
|
||||||
state: &TorrentStateLive,
|
state: &TorrentStateLive,
|
||||||
addr: PeerHandle,
|
addr: PeerHandle,
|
||||||
counters: &AtomicPeerCounters,
|
counters: &AtomicPeerCounters,
|
||||||
piece: &Piece<impl AsRef<[u8]>>,
|
piece: &Piece<impl AsRef<[u8]> + std::fmt::Debug>,
|
||||||
chunk_info: &ChunkInfo,
|
chunk_info: &ChunkInfo,
|
||||||
full_piece_download_time: Option<Duration>,
|
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let index = piece.index;
|
let index = piece.index;
|
||||||
|
|
||||||
// Not being able to write to storage is a fatal error. You need to unpause the
|
let full_piece_download_time = {
|
||||||
// torrent to recover from it.
|
let mut g = state.lock_write("mark_chunk_downloaded");
|
||||||
match state.file_ops().write_chunk(addr, piece, chunk_info) {
|
|
||||||
Ok(()) => {}
|
match g.inflight_pieces.get(&chunk_info.piece_index) {
|
||||||
Err(e) => {
|
Some(InflightPiece { peer, .. }) if *peer == addr => {}
|
||||||
error!("FATAL: error writing chunk to disk: {:?}", e);
|
Some(InflightPiece { peer, .. }) => {
|
||||||
return state.on_fatal_error(e);
|
debug!(
|
||||||
}
|
"in-flight piece {} was stolen by {}, ignoring",
|
||||||
}
|
chunk_info.piece_index, peer
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
debug!(
|
||||||
|
"in-flight piece {} not found. it was probably completed by someone else",
|
||||||
|
chunk_info.piece_index
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let chunk_marking_result = g.get_chunks_mut()?.mark_chunk_downloaded(piece);
|
||||||
|
trace!(?piece, chunk_marking_result=?chunk_marking_result);
|
||||||
|
|
||||||
|
let full_piece_download_time = match chunk_marking_result {
|
||||||
|
Some(ChunkMarkingResult::Completed) => {
|
||||||
|
trace!("piece={} done, will write and checksum", piece.index);
|
||||||
|
// This will prevent others from stealing it.
|
||||||
|
{
|
||||||
|
let piece = chunk_info.piece_index;
|
||||||
|
g.inflight_pieces.remove(&piece)
|
||||||
|
}
|
||||||
|
.map(|t| t.started.elapsed())
|
||||||
|
}
|
||||||
|
Some(ChunkMarkingResult::PreviouslyCompleted) => {
|
||||||
|
// TODO: we might need to send cancellations here.
|
||||||
|
debug!("piece={} was done by someone else, ignoring", piece.index);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Some(ChunkMarkingResult::NotCompleted) => None,
|
||||||
|
None => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"bogus data received: {:?}, cannot map this to a chunk, dropping peer",
|
||||||
|
piece
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// By this time we reach here, no other peer can request this piece. All others, even if they steal pieces would
|
||||||
|
// have fallen off above in one of the defensive checks.
|
||||||
|
|
||||||
|
// Not being able to write to storage is a fatal error. You need to unpause the
|
||||||
|
// torrent to recover from it.
|
||||||
|
match state.file_ops().write_chunk(addr, piece, chunk_info) {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) => {
|
||||||
|
error!("FATAL: error writing chunk to disk: {:?}", e);
|
||||||
|
return state.on_fatal_error(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
full_piece_download_time
|
||||||
|
};
|
||||||
|
|
||||||
let full_piece_download_time = match full_piece_download_time {
|
let full_piece_download_time = match full_piece_download_time {
|
||||||
Some(t) => t,
|
Some(t) => t,
|
||||||
|
|
@ -1470,23 +1474,21 @@ impl PeerHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.state.meta().options.defer_writes {
|
if self.state.meta().options.defer_writes {
|
||||||
|
// TODO: shove all this into one thing to .clone() once rather than 5 times.
|
||||||
let state = self.state.clone();
|
let state = self.state.clone();
|
||||||
let addr = self.addr;
|
let addr = self.addr;
|
||||||
let counters = self.counters.clone();
|
let counters = self.counters.clone();
|
||||||
let piece = piece.clone_to_owned();
|
let piece = piece.clone_to_owned();
|
||||||
let tx = self.tx.clone();
|
let tx = self.tx.clone();
|
||||||
|
|
||||||
|
let span = tracing::error_span!("deferred_write");
|
||||||
|
|
||||||
let work = move || {
|
let work = move || {
|
||||||
if let Err(e) = write_to_disk(
|
span.in_scope(|| {
|
||||||
&state,
|
if let Err(e) = write_to_disk(&state, addr, &counters, &piece, &chunk_info) {
|
||||||
addr,
|
let _ = tx.send(WriterRequest::Disconnect(Err(e)));
|
||||||
&counters,
|
}
|
||||||
&piece,
|
})
|
||||||
&chunk_info,
|
|
||||||
full_piece_download_time,
|
|
||||||
) {
|
|
||||||
let _ = tx.send(WriterRequest::Disconnect(Err(e)));
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
tokio::task::spawn_blocking(work);
|
tokio::task::spawn_blocking(work);
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -1494,14 +1496,7 @@ impl PeerHandler {
|
||||||
.meta
|
.meta
|
||||||
.spawner
|
.spawner
|
||||||
.spawn_block_in_place(|| {
|
.spawn_block_in_place(|| {
|
||||||
write_to_disk(
|
write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info)
|
||||||
&self.state,
|
|
||||||
self.addr,
|
|
||||||
&self.counters,
|
|
||||||
&piece,
|
|
||||||
&chunk_info,
|
|
||||||
full_piece_download_time,
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
.with_context(|| format!("error processing received chunk {chunk_info:?}"))?;
|
.with_context(|| format!("error processing received chunk {chunk_info:?}"))?;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue