From 22ea146ff608f681d05dd5febaeab6943b0fd40a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 19 Nov 2023 23:23:31 +0000 Subject: [PATCH] Starting to implement aggregate peer stats --- crates/librqbit/src/torrent_state.rs | 51 ++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index b28ed6e..c9f0d1e 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -4,7 +4,9 @@ // NOTE: deadlock notice: // peers and stateLocked are behind 2 different locks. // if you lock them in different order, this may deadlock. -// so always lock the peers one first, and unlock it before stateLocked is locked. +// +// so don't lock them both at the same time at all, or at the worst lock them in the +// same order (peers one first, then the global one). use std::{ collections::HashMap, @@ -12,7 +14,7 @@ use std::{ net::SocketAddr, path::PathBuf, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicU32, AtomicU64, Ordering}, Arc, }, time::{Duration, Instant}, @@ -62,9 +64,32 @@ pub struct InflightPiece { #[derive(Default)] pub struct PeerStates { + stats: AggregatePeerStatsAtomic, states: DashMap, } +#[derive(Debug, Default, Serialize)] +pub struct AggregatePeerStatsAtomic { + pub queued: AtomicU32, + pub connecting: AtomicU32, + pub live: AtomicU32, + pub seen: AtomicU32, + pub dead: AtomicU32, + pub not_needed: AtomicU32, +} + +impl AggregatePeerStatsAtomic { + fn counter(&self, state: &PeerState) -> &AtomicU32 { + match state { + PeerState::Connecting(_) => &self.connecting, + PeerState::Live(_) => &self.live, + PeerState::Queued => &self.queued, + PeerState::Dead => &self.dead, + PeerState::NotNeeded => &self.not_needed, + } + } +} + #[derive(Debug, Default, Serialize)] pub struct AggregatePeerStats { pub queued: usize, @@ -75,6 +100,20 @@ pub struct AggregatePeerStats { pub not_needed: usize, } +impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats { + fn from(s: &'a AggregatePeerStatsAtomic) -> Self { + let ordering = Ordering::Relaxed; + Self { + queued: s.queued.load(ordering) as usize, + connecting: s.connecting.load(ordering) as usize, + live: s.live.load(ordering) as usize, + seen: s.seen.load(ordering) as usize, + dead: s.dead.load(ordering) as usize, + not_needed: s.not_needed.load(ordering) as usize, + } + } +} + impl PeerStates { pub fn stats(&self) -> AggregatePeerStats { // TODO: it would be better to store these as atomic not to lock needlessly. @@ -95,12 +134,16 @@ impl PeerStates { }) }) } + pub fn stats_from_atomic(&self) -> AggregatePeerStats { + AggregatePeerStats::from(&self.stats) + } pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option { use dashmap::mapref::entry::Entry; match self.states.entry(addr) { Entry::Occupied(_) => None, Entry::Vacant(vac) => { vac.insert(Default::default()); + self.stats.queued.fetch_add(1, Ordering::Relaxed); Some(addr) } } @@ -142,7 +185,9 @@ impl PeerStates { .flatten() } pub fn drop_peer(&self, handle: PeerHandle) -> Option { - self.states.remove(&handle).map(|r| r.1) + let p = self.states.remove(&handle).map(|r| r.1)?; + self.stats.counter(&p.state).fetch_sub(1, Ordering::Relaxed); + Some(p) } pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option { self.with_live_mut(handle, "mark_i_am_choked", |live| {