diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index b2f5d9d..40104ae 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -143,6 +143,17 @@ impl ChunkTracker { piece.index, chunk_info, chunk_range, ); + // TODO: remove me, it's for debugging + // { + // use std::io::Write; + // let mut f = std::fs::OpenOptions::new() + // .write(true) + // .create(true) + // .open("/tmp/chunks") + // .unwrap(); + // write!(f, "{:?}", &self.have).unwrap(); + // } + if chunk_range.all() { return Some(ChunkMarkingResult::Completed); } diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 908b062..34f0758 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -403,27 +403,31 @@ impl PeerConnection { None => return Ok(()), } - let next = match self.state.reserve_next_needed_piece(handle) { + let next = match self.state.try_steal_old_slow_piece(handle) { Some(next) => next, - None => { - if self.state.get_left_to_download() == 0 { - debug!("{}: nothing left to download, closing requester", handle); - return Ok(()); - } - - if let Some(piece) = self.state.try_steal_piece(handle) { - debug!("{}: stole a piece {}", handle, piece); - piece - } else { - debug!("no pieces to request from {}", handle); - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), notify.notified()).await; + None => match self.state.reserve_next_needed_piece(handle) { + Some(next) => next, + None => { + if self.state.get_left_to_download() == 0 { + debug!("{}: nothing left to download, closing requester", handle); + return Ok(()); + } + + if let Some(piece) = self.state.try_steal_piece(handle) { + debug!("{}: stole a piece {}", handle, piece); + piece + } else { + debug!("no pieces to request from {}", handle); + #[allow(unused_must_use)] + { + timeout(Duration::from_secs(60), notify.notified()).await; + } + continue; } - continue; } - } + }, }; + let tx = match self.state.locked.read().peers.clone_tx(handle) { Some(tx) => tx, None => return Ok(()), @@ -508,15 +512,16 @@ impl PeerConnection { ); } - let should_checksum = match g.chunks.mark_chunk_downloaded(&piece) { + let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) { Some(ChunkMarkingResult::Completed) => { debug!( "piece={} done by {}, will write and checksum", piece.index, handle ); // This will prevent others from stealing it. - g.peers.remove_inflight_piece(chunk_info.piece_index); - true + g.peers + .remove_inflight_piece(chunk_info.piece_index) + .map(|t| t.started.elapsed()) } Some(ChunkMarkingResult::PreviouslyCompleted) => { // TODO: we might need to send cancellations here. @@ -526,7 +531,7 @@ impl PeerConnection { ); return Ok(()); } - Some(ChunkMarkingResult::NotCompleted) => false, + Some(ChunkMarkingResult::NotCompleted) => None, None => { anyhow::bail!( "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", @@ -551,9 +556,10 @@ impl PeerConnection { .write_chunk(handle, &piece, &chunk_info) .expect("expected to be able to write to disk"); - if !should_checksum { - return Ok(()); - } + let full_piece_download_time = match full_piece_download_time { + Some(t) => t, + None => return Ok(()), + }; match self .state @@ -571,6 +577,18 @@ impl PeerConnection { .stats .have .fetch_add(piece_len, Ordering::Relaxed); + self.state + .stats + .downloaded_pieces + .fetch_add(1, Ordering::Relaxed); + self.state + .stats + .downloaded_pieces + .fetch_add(1, Ordering::Relaxed); + self.state.stats.total_piece_download_ms.fetch_add( + full_piece_download_time.as_millis() as u64, + Ordering::Relaxed, + ); self.state .locked .write() diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 86a29dd..b49481d 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -179,9 +179,7 @@ impl TorrentManager { files, stats: AtomicStats { have: AtomicU64::new(initial_check_results.have_bytes), - downloaded_and_checked: Default::default(), - fetched_bytes: Default::default(), - uploaded: Default::default(), + ..Default::default() }, needed: initial_check_results.needed_bytes, lengths, diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 3105a99..0e51caa 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -6,11 +6,11 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, - time::Instant, + time::{Duration, Instant}, }; use futures::{stream::FuturesUnordered, StreamExt}; -use log::{debug, trace, warn}; +use log::{debug, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use tokio::sync::mpsc::{channel, Sender}; @@ -146,11 +146,26 @@ pub struct TorrentStateLocked { pub chunks: ChunkTracker, } +#[derive(Default)] pub struct AtomicStats { pub have: AtomicU64, pub downloaded_and_checked: AtomicU64, pub uploaded: AtomicU64, pub fetched_bytes: AtomicU64, + + pub downloaded_pieces: AtomicU64, + pub total_piece_download_ms: AtomicU64, +} + +impl AtomicStats { + pub fn average_piece_download_time(&self) -> Option { + let d = self.downloaded_pieces.load(Ordering::Relaxed); + let t = self.total_piece_download_ms.load(Ordering::Relaxed); + if d == 0 { + return None; + } + Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) + } } pub struct TorrentState { @@ -222,6 +237,38 @@ impl TorrentState { self.get_next_needed_piece(handle).is_some() } + pub fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option { + let total = self.stats.downloaded_pieces.load(Ordering::Relaxed); + + // heuristic for not enough precision in average time + if total < 20 { + return None; + } + let avg_time = self.stats.average_piece_download_time()?; + + let mut g = self.locked.write(); + let (idx, elapsed, piece_req) = g + .peers + .inflight_pieces + .iter_mut() + // don't steal from myself + .filter(|(_, r)| r.peer != handle) + .map(|(p, r)| (p, r.started.elapsed(), r)) + .max_by_key(|(_, e, _)| *e)?; + + // heuristic for "too slow peer" + if elapsed > avg_time * 10 { + debug!( + "{} will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", + handle, idx, piece_req.peer, elapsed, avg_time + ); + piece_req.peer = handle; + piece_req.started = Instant::now(); + return Some(*idx); + } + None + } + pub fn try_steal_piece(&self, handle: PeerHandle) -> Option { let mut rng = rand::thread_rng(); use rand::seq::IteratorRandom;