From 1e525eb91e1ecd568f8956eb30007ee6bfcbab3f Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 25 Dec 2023 08:58:42 -0500 Subject: [PATCH 1/4] Move per-peer piece stats into methods --- crates/librqbit/src/torrent_state/live/mod.rs | 7 +---- .../torrent_state/live/peer/stats/atomic.rs | 26 ++++++++++++++++++- .../torrent_state/live/peer/stats/snapshot.rs | 2 ++ 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index c439df2..411e817 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1474,12 +1474,7 @@ impl PeerHandler { // Per-peer piece counters. self.counters - .downloaded_and_checked_pieces - .fetch_add(1, Ordering::Relaxed); - self.counters - .downloaded_and_checked_bytes - .fetch_add(piece_len, Ordering::Relaxed); - + .on_piece_downloaded(piece_len, full_piece_download_time); self.state.peers.reset_peer_backoff(self.addr); debug!("piece={} successfully downloaded and verified", index); diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs index 2933d07..5ab7cda 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs @@ -1,6 +1,6 @@ use std::{ sync::{ - atomic::{AtomicU32, AtomicU64}, + atomic::{AtomicU32, AtomicU64, Ordering}, Arc, }, time::Duration, @@ -19,6 +19,30 @@ pub(crate) struct PeerCountersAtomic { pub fetched_chunks: AtomicU32, pub downloaded_and_checked_pieces: AtomicU32, pub downloaded_and_checked_bytes: AtomicU64, + pub total_piece_download_ms: AtomicU64, +} + +impl PeerCountersAtomic { + pub(crate) fn on_piece_downloaded(&self, piece_len: u64, elapsed: Duration) { + let elapsed = elapsed.as_millis() as u64; + self.total_piece_download_ms + .fetch_add(elapsed, Ordering::Release); + self.downloaded_and_checked_pieces + .fetch_add(1, Ordering::Release); + self.downloaded_and_checked_bytes + .fetch_add(piece_len, Ordering::Relaxed); + } + + pub(crate) fn average_piece_download_time(&self) -> Option { + let downloaded_pieces = self.downloaded_and_checked_pieces.load(Ordering::Acquire); + let total_download_time = self.total_piece_download_ms.load(Ordering::Acquire); + if total_download_time == 0 || downloaded_pieces == 0 { + return None; + } + Some(Duration::from_millis( + total_download_time / downloaded_pieces as u64, + )) + } } #[derive(Debug)] diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs index df18007..1e0b2bd 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -14,6 +14,7 @@ pub struct PeerCounters { pub errors: u32, pub fetched_chunks: u32, pub downloaded_and_checked_pieces: u32, + pub total_piece_download_ms: u64, } #[derive(Serialize, Deserialize)] @@ -37,6 +38,7 @@ impl From<&super::atomic::PeerCountersAtomic> for PeerCounters { downloaded_and_checked_pieces: counters .downloaded_and_checked_pieces .load(Ordering::Relaxed), + total_piece_download_ms: counters.total_piece_download_ms.load(Ordering::Relaxed), } } } From 5d4ffae6de93dee6f1167f2b5d807c0a11615196 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 25 Dec 2023 09:08:16 -0500 Subject: [PATCH 2/4] When stealing pieces, only steal from slower peers --- crates/librqbit/src/torrent_state/live/mod.rs | 27 +++++++++---------- .../src/torrent_state/live/stats/atomic.rs | 16 +---------- 2 files changed, 13 insertions(+), 30 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 411e817..e2d2baa 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1030,18 +1030,15 @@ impl PeerHandler { .map(|r| r.flatten()) } + /// Try to steal a piece from a slower peer. Threshold is + /// "how many times is my average download speed faster to be able to steal". + /// + /// If this returns, an existing in-flight piece was marked to be ours. fn try_steal_old_slow_piece(&self, threshold: f64) -> Option { - let total = self - .state - .stats - .downloaded_and_checked_pieces - .load(Ordering::Acquire); - - // heuristic for not enough precision in average time - if total < 20 { - return None; - } - let avg_time = self.state.stats.average_piece_download_time()?; + let my_avg_time = match self.counters.average_piece_download_time() { + Some(t) => t, + None => return None, + }; let mut g = self.state.lock_write("try_steal_old_slow_piece"); let (idx, elapsed, piece_req) = g @@ -1053,10 +1050,10 @@ impl PeerHandler { .max_by_key(|(_, e, _)| *e)?; // heuristic for "too slow peer" - if elapsed.as_secs_f64() > avg_time.as_secs_f64() * threshold { + if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold { debug!( - "will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", - idx, piece_req.peer, elapsed, avg_time + "will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}", + idx, piece_req.peer, elapsed, my_avg_time ); piece_req.peer = self.addr; piece_req.started = Instant::now(); @@ -1469,7 +1466,7 @@ impl PeerHandler { .fetch_add(piece_len, Ordering::Relaxed); self.state.stats.total_piece_download_ms.fetch_add( full_piece_download_time.as_millis() as u64, - Ordering::Release, + Ordering::Relaxed, ); // Per-peer piece counters. diff --git a/crates/librqbit/src/torrent_state/live/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/stats/atomic.rs index 4e3c024..c16ed50 100644 --- a/crates/librqbit/src/torrent_state/live/stats/atomic.rs +++ b/crates/librqbit/src/torrent_state/live/stats/atomic.rs @@ -1,7 +1,4 @@ -use std::{ - sync::atomic::{AtomicU64, Ordering}, - time::Duration, -}; +use std::sync::atomic::AtomicU64; #[derive(Default, Debug)] pub struct AtomicStats { @@ -12,14 +9,3 @@ pub struct AtomicStats { pub fetched_bytes: AtomicU64, pub total_piece_download_ms: AtomicU64, } - -impl AtomicStats { - pub fn average_piece_download_time(&self) -> Option { - let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire); - let t = self.total_piece_download_ms.load(Ordering::Acquire); - if d == 0 { - return None; - } - Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) - } -} From 8cdf44c4fd0eb8f0ba5472db5c39cfa165707810 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 25 Dec 2023 09:09:52 -0500 Subject: [PATCH 3/4] Make final stealing less aggressive --- crates/librqbit/src/torrent_state/live/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index e2d2baa..46e3b85 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1215,7 +1215,7 @@ impl PeerHandler { let next = match self .try_steal_old_slow_piece(10.) .or_else(|| self.reserve_next_needed_piece().ok().flatten()) - .or_else(|| self.try_steal_old_slow_piece(2.)) + .or_else(|| self.try_steal_old_slow_piece(3.)) { Some(next) => next, None => { From dfdb6b5fae46800f4e0b3759a2b3a0593a88f5f7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 25 Dec 2023 09:24:57 -0500 Subject: [PATCH 4/4] Do not ignore errors in reserve_next_needed_piece() --- crates/librqbit/src/torrent_state/live/mod.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 46e3b85..72a7db5 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1011,10 +1011,15 @@ impl PeerHandler { } } + let n_opt = match n_opt { + Some(n_opt) => n_opt, + None => return Ok(None), + }; + self.state .lengths - .validate_piece_index(n_opt.context("invalid n_opt")? as u32) - .context("invalid piece")? + .validate_piece_index(n_opt as u32) + .context("bug: invalid piece")? }; g.inflight_pieces.insert( n, @@ -1214,7 +1219,7 @@ impl PeerHandler { // Afterwards means we are close to completion, try stealing more aggressively. let next = match self .try_steal_old_slow_piece(10.) - .or_else(|| self.reserve_next_needed_piece().ok().flatten()) + .map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))? .or_else(|| self.try_steal_old_slow_piece(3.)) { Some(next) => next,