From 123859328fa41f05bb0180990714a6f7aed4a344 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 01:19:24 +0000 Subject: [PATCH] Remove old slow peer stats computation --- crates/librqbit/examples/ubuntu.rs | 2 +- crates/librqbit/src/http_api.rs | 2 +- crates/librqbit/src/torrent_manager.rs | 2 +- crates/librqbit/src/torrent_state.rs | 43 +++++--------------------- crates/rqbit/src/main.rs | 2 +- 5 files changed, 11 insertions(+), 40 deletions(-) diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs index 782dbbc..6813043 100644 --- a/crates/librqbit/examples/ubuntu.rs +++ b/crates/librqbit/examples/ubuntu.rs @@ -55,7 +55,7 @@ async fn main() -> Result<(), anyhow::Error> { async move { loop { tokio::time::sleep(Duration::from_secs(1)).await; - let stats = handle.torrent_state().stats_snapshot(true); + let stats = handle.torrent_state().stats_snapshot(); info!("stats: {stats:?}"); } } diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 6fdebb5..e63156f 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -320,7 +320,7 @@ impl ApiInternal { fn api_stats(&self, idx: usize) -> Result { let mgr = self.mgr_handle(idx)?; - let snapshot = mgr.torrent_state().stats_snapshot(true); + let snapshot = mgr.torrent_state().stats_snapshot(); let estimator = mgr.speed_estimator(); // Poor mans download speed computation diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 3f160dc..c8157d4 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -294,7 +294,7 @@ impl TorrentManager { let state = mgr.state.clone(); async move { loop { - let stats = state.stats_snapshot(false); + let stats = state.stats_snapshot(); let fetched = stats.fetched_bytes; let needed = state.initially_needed(); // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 86fa8b3..b7b0080 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -97,27 +97,9 @@ impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats { impl PeerStates { pub fn stats(&self) -> AggregatePeerStats { - // TODO: it would be better to store these as atomic not to lock needlessly. - // However this would probably cause even more spaghetti. - timeit("PeerStates::stats", || { - self.states - .iter() - .fold(AggregatePeerStats::default(), |mut s, p| { - s.seen += 1; - match &p.value().state.get() { - PeerState::Connecting(_) => s.connecting += 1, - PeerState::Live(_) => s.live += 1, - PeerState::Queued => s.queued += 1, - PeerState::Dead => s.dead += 1, - PeerState::NotNeeded => s.not_needed += 1, - }; - s - }) - }) - } - pub fn stats_from_atomic(&self) -> AggregatePeerStats { AggregatePeerStats::from(&self.stats) } + pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option { use dashmap::mapref::entry::Entry; match self.states.entry(addr) { @@ -274,7 +256,6 @@ pub struct StatsSnapshot { pub time: Instant, pub total_piece_download_ms: u64, pub peer_stats: AggregatePeerStats, - pub new_peer_stats: AggregatePeerStats, } impl StatsSnapshot { @@ -778,7 +759,7 @@ impl TorrentState { self.stats.uploaded.load(Ordering::Relaxed) } pub fn get_downloaded(&self) -> u64 { - self.stats.downloaded_and_checked.load(Ordering::Relaxed) + self.stats.downloaded_and_checked.load(Ordering::Acquire) } pub fn is_finished(&self) -> bool { @@ -854,19 +835,8 @@ impl TorrentState { true } - pub fn stats_snapshot(&self, with_peer_stats: bool) -> StatsSnapshot { + pub fn stats_snapshot(&self) -> StatsSnapshot { use Ordering::*; - let new_peer_stats = self.peers.stats_from_atomic(); - let peer_stats = if with_peer_stats { - let old_stats = self.peers.stats(); - if old_stats != new_peer_stats { - warn!("old != new: {old_stats:?} != {new_peer_stats:?}") - } - old_stats - } else { - Default::default() - }; - let downloaded = self.stats.downloaded_and_checked.load(Relaxed); let remaining = self.needed - downloaded; StatsSnapshot { @@ -880,8 +850,7 @@ impl TorrentState { initially_needed_bytes: self.needed, remaining_bytes: remaining, total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), - peer_stats, - new_peer_stats, + peer_stats: self.peers.stats(), } } @@ -1324,7 +1293,9 @@ impl PeerHandler { self.state .stats .downloaded_and_checked - .fetch_add(piece_len, Ordering::Relaxed); + // This counter is used to compute "is_finished", so using + // stronger ordering. + .fetch_add(piece_len, Ordering::Release); self.state .stats .have diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 49f7686..83cbb5f 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -244,7 +244,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> info!("[{}] initializing", idx); }, ManagedTorrentState::Running(handle) => { - let stats = timeit("stats_snapshot", || handle.torrent_state().stats_snapshot(true)); + let stats = timeit("stats_snapshot", || handle.torrent_state().stats_snapshot()); let speed = handle.speed_estimator(); let total = stats.total_bytes; let progress = stats.total_bytes - stats.remaining_bytes;