Enable elapsed stats calculation and slow piece stealing

This commit is contained in:
Igor Katson 2021-06-30 00:32:38 +01:00
parent ccf19b7921
commit 1cb7a7bbc6
4 changed files with 103 additions and 29 deletions

View file

@ -143,6 +143,17 @@ impl ChunkTracker {
piece.index, chunk_info, chunk_range, piece.index, chunk_info, chunk_range,
); );
// TODO: remove me, it's for debugging
// {
// use std::io::Write;
// let mut f = std::fs::OpenOptions::new()
// .write(true)
// .create(true)
// .open("/tmp/chunks")
// .unwrap();
// write!(f, "{:?}", &self.have).unwrap();
// }
if chunk_range.all() { if chunk_range.all() {
return Some(ChunkMarkingResult::Completed); return Some(ChunkMarkingResult::Completed);
} }

View file

@ -403,27 +403,31 @@ impl PeerConnection {
None => return Ok(()), None => return Ok(()),
} }
let next = match self.state.reserve_next_needed_piece(handle) { let next = match self.state.try_steal_old_slow_piece(handle) {
Some(next) => next, Some(next) => next,
None => { None => match self.state.reserve_next_needed_piece(handle) {
if self.state.get_left_to_download() == 0 { Some(next) => next,
debug!("{}: nothing left to download, closing requester", handle); None => {
return Ok(()); if self.state.get_left_to_download() == 0 {
} debug!("{}: nothing left to download, closing requester", handle);
return Ok(());
if let Some(piece) = self.state.try_steal_piece(handle) { }
debug!("{}: stole a piece {}", handle, piece);
piece if let Some(piece) = self.state.try_steal_piece(handle) {
} else { debug!("{}: stole a piece {}", handle, piece);
debug!("no pieces to request from {}", handle); piece
#[allow(unused_must_use)] } else {
{ debug!("no pieces to request from {}", handle);
timeout(Duration::from_secs(60), notify.notified()).await; #[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), notify.notified()).await;
}
continue;
} }
continue;
} }
} },
}; };
let tx = match self.state.locked.read().peers.clone_tx(handle) { let tx = match self.state.locked.read().peers.clone_tx(handle) {
Some(tx) => tx, Some(tx) => tx,
None => return Ok(()), None => return Ok(()),
@ -508,15 +512,16 @@ impl PeerConnection {
); );
} }
let should_checksum = match g.chunks.mark_chunk_downloaded(&piece) { let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => { Some(ChunkMarkingResult::Completed) => {
debug!( debug!(
"piece={} done by {}, will write and checksum", "piece={} done by {}, will write and checksum",
piece.index, handle piece.index, handle
); );
// This will prevent others from stealing it. // This will prevent others from stealing it.
g.peers.remove_inflight_piece(chunk_info.piece_index); g.peers
true .remove_inflight_piece(chunk_info.piece_index)
.map(|t| t.started.elapsed())
} }
Some(ChunkMarkingResult::PreviouslyCompleted) => { Some(ChunkMarkingResult::PreviouslyCompleted) => {
// TODO: we might need to send cancellations here. // TODO: we might need to send cancellations here.
@ -526,7 +531,7 @@ impl PeerConnection {
); );
return Ok(()); return Ok(());
} }
Some(ChunkMarkingResult::NotCompleted) => false, Some(ChunkMarkingResult::NotCompleted) => None,
None => { None => {
anyhow::bail!( anyhow::bail!(
"bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer",
@ -551,9 +556,10 @@ impl PeerConnection {
.write_chunk(handle, &piece, &chunk_info) .write_chunk(handle, &piece, &chunk_info)
.expect("expected to be able to write to disk"); .expect("expected to be able to write to disk");
if !should_checksum { let full_piece_download_time = match full_piece_download_time {
return Ok(()); Some(t) => t,
} None => return Ok(()),
};
match self match self
.state .state
@ -571,6 +577,18 @@ impl PeerConnection {
.stats .stats
.have .have
.fetch_add(piece_len, Ordering::Relaxed); .fetch_add(piece_len, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
self.state self.state
.locked .locked
.write() .write()

View file

@ -179,9 +179,7 @@ impl TorrentManager {
files, files,
stats: AtomicStats { stats: AtomicStats {
have: AtomicU64::new(initial_check_results.have_bytes), have: AtomicU64::new(initial_check_results.have_bytes),
downloaded_and_checked: Default::default(), ..Default::default()
fetched_bytes: Default::default(),
uploaded: Default::default(),
}, },
needed: initial_check_results.needed_bytes, needed: initial_check_results.needed_bytes,
lengths, lengths,

View file

@ -6,11 +6,11 @@ use std::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Arc,
}, },
time::Instant, time::{Duration, Instant},
}; };
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, trace, warn}; use log::{debug, info, trace, warn};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::{channel, Sender};
@ -146,11 +146,26 @@ pub struct TorrentStateLocked {
pub chunks: ChunkTracker, pub chunks: ChunkTracker,
} }
#[derive(Default)]
pub struct AtomicStats { pub struct AtomicStats {
pub have: AtomicU64, pub have: AtomicU64,
pub downloaded_and_checked: AtomicU64, pub downloaded_and_checked: AtomicU64,
pub uploaded: AtomicU64, pub uploaded: AtomicU64,
pub fetched_bytes: AtomicU64, pub fetched_bytes: AtomicU64,
pub downloaded_pieces: AtomicU64,
pub total_piece_download_ms: AtomicU64,
}
impl AtomicStats {
pub fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_pieces.load(Ordering::Relaxed);
let t = self.total_piece_download_ms.load(Ordering::Relaxed);
if d == 0 {
return None;
}
Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64))
}
} }
pub struct TorrentState { pub struct TorrentState {
@ -222,6 +237,38 @@ impl TorrentState {
self.get_next_needed_piece(handle).is_some() self.get_next_needed_piece(handle).is_some()
} }
pub fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let total = self.stats.downloaded_pieces.load(Ordering::Relaxed);
// heuristic for not enough precision in average time
if total < 20 {
return None;
}
let avg_time = self.stats.average_piece_download_time()?;
let mut g = self.locked.write();
let (idx, elapsed, piece_req) = g
.peers
.inflight_pieces
.iter_mut()
// don't steal from myself
.filter(|(_, r)| r.peer != handle)
.map(|(p, r)| (p, r.started.elapsed(), r))
.max_by_key(|(_, e, _)| *e)?;
// heuristic for "too slow peer"
if elapsed > avg_time * 10 {
debug!(
"{} will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}",
handle, idx, piece_req.peer, elapsed, avg_time
);
piece_req.peer = handle;
piece_req.started = Instant::now();
return Some(*idx);
}
None
}
pub fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> { pub fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
use rand::seq::IteratorRandom; use rand::seq::IteratorRandom;