Debugging
This commit is contained in:
parent
ca9afa8535
commit
640d2c31bc
2 changed files with 46 additions and 29 deletions
|
|
@ -109,6 +109,7 @@ use super::{
|
||||||
ManagedTorrentInfo,
|
ManagedTorrentInfo,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct InflightPiece {
|
struct InflightPiece {
|
||||||
peer: PeerHandle,
|
peer: PeerHandle,
|
||||||
started: Instant,
|
started: Instant,
|
||||||
|
|
@ -214,7 +215,7 @@ impl TorrentStateLive {
|
||||||
peers: Default::default(),
|
peers: Default::default(),
|
||||||
locked: RwLock::new(TorrentStateLocked {
|
locked: RwLock::new(TorrentStateLocked {
|
||||||
chunks: Some(paused.chunk_tracker),
|
chunks: Some(paused.chunk_tracker),
|
||||||
// TODO: move under per_piece_locks
|
// TODO: move under per_piece_locks?
|
||||||
inflight_pieces: Default::default(),
|
inflight_pieces: Default::default(),
|
||||||
file_priorities,
|
file_priorities,
|
||||||
fatal_errors_tx: Some(fatal_errors_tx),
|
fatal_errors_tx: Some(fatal_errors_tx),
|
||||||
|
|
@ -1051,14 +1052,20 @@ impl PeerHandler {
|
||||||
|
|
||||||
// heuristic for "too slow peer"
|
// heuristic for "too slow peer"
|
||||||
if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold {
|
if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold {
|
||||||
debug!(
|
// If the piece is locked and someone is active writing to disk, don't steal it.
|
||||||
"will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}",
|
if let Some(_g) = self.state.per_piece_locks[idx.get_usize()].try_write() {
|
||||||
idx, piece_req.peer, elapsed, my_avg_time
|
debug!(
|
||||||
);
|
"will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}",
|
||||||
let old = piece_req.peer;
|
idx, piece_req.peer, elapsed, my_avg_time
|
||||||
piece_req.peer = self.addr;
|
);
|
||||||
piece_req.started = Instant::now();
|
let old = piece_req.peer;
|
||||||
(*idx, old)
|
piece_req.peer = self.addr;
|
||||||
|
piece_req.started = Instant::now();
|
||||||
|
(*idx, old)
|
||||||
|
} else {
|
||||||
|
warn!(?idx, ?piece_req, "attempted to steal but peer was writing");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
@ -1349,8 +1356,10 @@ impl PeerHandler {
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let index = piece.index;
|
let index = piece.index;
|
||||||
|
|
||||||
let full_piece_download_time = {
|
let ppl_guard = {
|
||||||
let mut g = state.lock_write("mark_chunk_downloaded");
|
let g = state.lock_read("check_steal");
|
||||||
|
|
||||||
|
let ppl = state.per_piece_locks[piece.index as usize].read();
|
||||||
|
|
||||||
match g.inflight_pieces.get(&chunk_info.piece_index) {
|
match g.inflight_pieces.get(&chunk_info.piece_index) {
|
||||||
Some(InflightPiece { peer, .. }) if *peer == addr => {}
|
Some(InflightPiece { peer, .. }) if *peer == addr => {}
|
||||||
|
|
@ -1370,10 +1379,26 @@ impl PeerHandler {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
ppl
|
||||||
|
};
|
||||||
|
|
||||||
|
// While we hold per piece lock, noone can steal it.
|
||||||
|
// So we can proceed writing knowing that the piece is ours now and will still be by the time
|
||||||
|
// the write is finished.
|
||||||
|
match state.file_ops().write_chunk(addr, piece, chunk_info) {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) => {
|
||||||
|
error!("FATAL: error writing chunk to disk: {:?}", e);
|
||||||
|
return state.on_fatal_error(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let full_piece_download_time = {
|
||||||
|
let mut g = state.lock_write("mark_chunk_downloaded");
|
||||||
let chunk_marking_result = g.get_chunks_mut()?.mark_chunk_downloaded(piece);
|
let chunk_marking_result = g.get_chunks_mut()?.mark_chunk_downloaded(piece);
|
||||||
trace!(?piece, chunk_marking_result=?chunk_marking_result);
|
trace!(?piece, chunk_marking_result=?chunk_marking_result);
|
||||||
|
|
||||||
let full_piece_download_time = match chunk_marking_result {
|
match chunk_marking_result {
|
||||||
Some(ChunkMarkingResult::Completed) => {
|
Some(ChunkMarkingResult::Completed) => {
|
||||||
trace!("piece={} done, will write and checksum", piece.index);
|
trace!("piece={} done, will write and checksum", piece.index);
|
||||||
// This will prevent others from stealing it.
|
// This will prevent others from stealing it.
|
||||||
|
|
@ -1395,24 +1420,13 @@ impl PeerHandler {
|
||||||
piece
|
piece
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
// By this time we reach here, no other peer can request this piece. All others, even if they steal pieces would
|
|
||||||
// have fallen off above in one of the defensive checks.
|
|
||||||
|
|
||||||
// Not being able to write to storage is a fatal error. You need to unpause the
|
|
||||||
// torrent to recover from it.
|
|
||||||
match state.file_ops().write_chunk(addr, piece, chunk_info) {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(e) => {
|
|
||||||
error!("FATAL: error writing chunk to disk: {:?}", e);
|
|
||||||
return state.on_fatal_error(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
full_piece_download_time
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// We don't care about per piece lock anymore, as it's removed from inflight pieces.
|
||||||
|
// It shouldn't impact perf anyway, but dropping just in case.
|
||||||
|
drop(ppl_guard);
|
||||||
|
|
||||||
let full_piece_download_time = match full_piece_download_time {
|
let full_piece_download_time = match full_piece_download_time {
|
||||||
Some(t) => t,
|
Some(t) => t,
|
||||||
None => return Ok(()),
|
None => return Ok(()),
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,7 @@ mod timed_existence {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let elapsed = self.started.elapsed();
|
let elapsed = self.started.elapsed();
|
||||||
let reason = self.reason;
|
let reason = self.reason;
|
||||||
|
tracing::trace!(name=%self.reason, ?elapsed, "dropping guard");
|
||||||
if elapsed > MAX {
|
if elapsed > MAX {
|
||||||
warn!("elapsed on lock {reason:?}: {elapsed:?}")
|
warn!("elapsed on lock {reason:?}: {elapsed:?}")
|
||||||
}
|
}
|
||||||
|
|
@ -96,10 +97,12 @@ mod timed_existence {
|
||||||
|
|
||||||
pub fn timeit<R>(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
|
pub fn timeit<R>(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
tracing::trace!(%name, "starting");
|
||||||
let r = f();
|
let r = f();
|
||||||
|
tracing::trace!(%name, "done");
|
||||||
let elapsed = now.elapsed();
|
let elapsed = now.elapsed();
|
||||||
if elapsed > MAX {
|
if elapsed > MAX {
|
||||||
warn!("elapsed on \"{name:}\": {elapsed:?}")
|
warn!(%name, ?elapsed, max = ?MAX, "elapsed > MAX");
|
||||||
}
|
}
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue