Starting to implement aggregate peer stats

This commit is contained in:
Igor Katson 2023-11-19 23:23:31 +00:00
parent 0c89ee9248
commit 22ea146ff6
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -4,7 +4,9 @@
// NOTE: deadlock notice:
// peers and stateLocked are behind 2 different locks.
// if you lock them in different order, this may deadlock.
// so always lock the peers one first, and unlock it before stateLocked is locked.
//
// so don't lock them both at the same time at all, or at the worst lock them in the
// same order (peers one first, then the global one).
use std::{
collections::HashMap,
@ -12,7 +14,7 @@ use std::{
net::SocketAddr,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicU32, AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
@ -62,9 +64,32 @@ pub struct InflightPiece {
#[derive(Default)]
pub struct PeerStates {
stats: AggregatePeerStatsAtomic,
states: DashMap<PeerHandle, Peer>,
}
#[derive(Debug, Default, Serialize)]
pub struct AggregatePeerStatsAtomic {
pub queued: AtomicU32,
pub connecting: AtomicU32,
pub live: AtomicU32,
pub seen: AtomicU32,
pub dead: AtomicU32,
pub not_needed: AtomicU32,
}
impl AggregatePeerStatsAtomic {
fn counter(&self, state: &PeerState) -> &AtomicU32 {
match state {
PeerState::Connecting(_) => &self.connecting,
PeerState::Live(_) => &self.live,
PeerState::Queued => &self.queued,
PeerState::Dead => &self.dead,
PeerState::NotNeeded => &self.not_needed,
}
}
}
#[derive(Debug, Default, Serialize)]
pub struct AggregatePeerStats {
pub queued: usize,
@ -75,6 +100,20 @@ pub struct AggregatePeerStats {
pub not_needed: usize,
}
impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats {
fn from(s: &'a AggregatePeerStatsAtomic) -> Self {
let ordering = Ordering::Relaxed;
Self {
queued: s.queued.load(ordering) as usize,
connecting: s.connecting.load(ordering) as usize,
live: s.live.load(ordering) as usize,
seen: s.seen.load(ordering) as usize,
dead: s.dead.load(ordering) as usize,
not_needed: s.not_needed.load(ordering) as usize,
}
}
}
impl PeerStates {
pub fn stats(&self) -> AggregatePeerStats {
// TODO: it would be better to store these as atomic not to lock needlessly.
@ -95,12 +134,16 @@ impl PeerStates {
})
})
}
pub fn stats_from_atomic(&self) -> AggregatePeerStats {
AggregatePeerStats::from(&self.stats)
}
pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option<PeerHandle> {
use dashmap::mapref::entry::Entry;
match self.states.entry(addr) {
Entry::Occupied(_) => None,
Entry::Vacant(vac) => {
vac.insert(Default::default());
self.stats.queued.fetch_add(1, Ordering::Relaxed);
Some(addr)
}
}
@ -142,7 +185,9 @@ impl PeerStates {
.flatten()
}
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
self.states.remove(&handle).map(|r| r.1)
let p = self.states.remove(&handle).map(|r| r.1)?;
self.stats.counter(&p.state).fetch_sub(1, Ordering::Relaxed);
Some(p)
}
pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_i_am_choked", |live| {