diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index c439df2..72a7db5 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1011,10 +1011,15 @@ impl PeerHandler { } } + let n_opt = match n_opt { + Some(n_opt) => n_opt, + None => return Ok(None), + }; + self.state .lengths - .validate_piece_index(n_opt.context("invalid n_opt")? as u32) - .context("invalid piece")? + .validate_piece_index(n_opt as u32) + .context("bug: invalid piece")? }; g.inflight_pieces.insert( n, @@ -1030,18 +1035,15 @@ impl PeerHandler { .map(|r| r.flatten()) } + /// Try to steal a piece from a slower peer. Threshold is + /// "how many times is my average download speed faster to be able to steal". + /// + /// If this returns, an existing in-flight piece was marked to be ours. fn try_steal_old_slow_piece(&self, threshold: f64) -> Option { - let total = self - .state - .stats - .downloaded_and_checked_pieces - .load(Ordering::Acquire); - - // heuristic for not enough precision in average time - if total < 20 { - return None; - } - let avg_time = self.state.stats.average_piece_download_time()?; + let my_avg_time = match self.counters.average_piece_download_time() { + Some(t) => t, + None => return None, + }; let mut g = self.state.lock_write("try_steal_old_slow_piece"); let (idx, elapsed, piece_req) = g @@ -1053,10 +1055,10 @@ impl PeerHandler { .max_by_key(|(_, e, _)| *e)?; // heuristic for "too slow peer" - if elapsed.as_secs_f64() > avg_time.as_secs_f64() * threshold { + if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold { debug!( - "will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", - idx, piece_req.peer, elapsed, avg_time + "will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}", + idx, piece_req.peer, elapsed, my_avg_time ); piece_req.peer = self.addr; piece_req.started = Instant::now(); @@ -1217,8 +1219,8 @@ impl PeerHandler { // Afterwards means we are close to completion, try stealing more aggressively. let next = match self .try_steal_old_slow_piece(10.) - .or_else(|| self.reserve_next_needed_piece().ok().flatten()) - .or_else(|| self.try_steal_old_slow_piece(2.)) + .map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))? + .or_else(|| self.try_steal_old_slow_piece(3.)) { Some(next) => next, None => { @@ -1469,17 +1471,12 @@ impl PeerHandler { .fetch_add(piece_len, Ordering::Relaxed); self.state.stats.total_piece_download_ms.fetch_add( full_piece_download_time.as_millis() as u64, - Ordering::Release, + Ordering::Relaxed, ); // Per-peer piece counters. self.counters - .downloaded_and_checked_pieces - .fetch_add(1, Ordering::Relaxed); - self.counters - .downloaded_and_checked_bytes - .fetch_add(piece_len, Ordering::Relaxed); - + .on_piece_downloaded(piece_len, full_piece_download_time); self.state.peers.reset_peer_backoff(self.addr); debug!("piece={} successfully downloaded and verified", index); diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs index 2933d07..5ab7cda 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs @@ -1,6 +1,6 @@ use std::{ sync::{ - atomic::{AtomicU32, AtomicU64}, + atomic::{AtomicU32, AtomicU64, Ordering}, Arc, }, time::Duration, @@ -19,6 +19,30 @@ pub(crate) struct PeerCountersAtomic { pub fetched_chunks: AtomicU32, pub downloaded_and_checked_pieces: AtomicU32, pub downloaded_and_checked_bytes: AtomicU64, + pub total_piece_download_ms: AtomicU64, +} + +impl PeerCountersAtomic { + pub(crate) fn on_piece_downloaded(&self, piece_len: u64, elapsed: Duration) { + let elapsed = elapsed.as_millis() as u64; + self.total_piece_download_ms + .fetch_add(elapsed, Ordering::Release); + self.downloaded_and_checked_pieces + .fetch_add(1, Ordering::Release); + self.downloaded_and_checked_bytes + .fetch_add(piece_len, Ordering::Relaxed); + } + + pub(crate) fn average_piece_download_time(&self) -> Option { + let downloaded_pieces = self.downloaded_and_checked_pieces.load(Ordering::Acquire); + let total_download_time = self.total_piece_download_ms.load(Ordering::Acquire); + if total_download_time == 0 || downloaded_pieces == 0 { + return None; + } + Some(Duration::from_millis( + total_download_time / downloaded_pieces as u64, + )) + } } #[derive(Debug)] diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs index df18007..1e0b2bd 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -14,6 +14,7 @@ pub struct PeerCounters { pub errors: u32, pub fetched_chunks: u32, pub downloaded_and_checked_pieces: u32, + pub total_piece_download_ms: u64, } #[derive(Serialize, Deserialize)] @@ -37,6 +38,7 @@ impl From<&super::atomic::PeerCountersAtomic> for PeerCounters { downloaded_and_checked_pieces: counters .downloaded_and_checked_pieces .load(Ordering::Relaxed), + total_piece_download_ms: counters.total_piece_download_ms.load(Ordering::Relaxed), } } } diff --git a/crates/librqbit/src/torrent_state/live/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/stats/atomic.rs index 4e3c024..c16ed50 100644 --- a/crates/librqbit/src/torrent_state/live/stats/atomic.rs +++ b/crates/librqbit/src/torrent_state/live/stats/atomic.rs @@ -1,7 +1,4 @@ -use std::{ - sync::atomic::{AtomicU64, Ordering}, - time::Duration, -}; +use std::sync::atomic::AtomicU64; #[derive(Default, Debug)] pub struct AtomicStats { @@ -12,14 +9,3 @@ pub struct AtomicStats { pub fetched_bytes: AtomicU64, pub total_piece_download_ms: AtomicU64, } - -impl AtomicStats { - pub fn average_piece_download_time(&self) -> Option { - let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire); - let t = self.total_piece_download_ms.load(Ordering::Acquire); - if d == 0 { - return None; - } - Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) - } -}