From aa99872e522760c1e6de3012674b80de5764d175 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 00:55:31 +0000 Subject: [PATCH] WTF is going on with counters --- crates/librqbit/src/peer_state.rs | 145 ++++++++++++++++++++------- crates/librqbit/src/torrent_state.rs | 139 +++++++++++++------------ 2 files changed, 181 insertions(+), 103 deletions(-) diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 4e217d6..02189b6 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use std::{collections::HashSet, sync::Arc}; @@ -5,8 +6,10 @@ use anyhow::Context; use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; +use serde::Serialize; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::{Notify, Semaphore}; +use tracing::trace; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; @@ -63,10 +66,61 @@ impl Default for PeerStats { #[derive(Debug, Default)] pub struct Peer { - pub state: PeerState, + pub state: PeerStateNoMut, pub stats: PeerStats, } +#[derive(Debug, Default, Serialize)] +pub struct AggregatePeerStatsAtomic { + pub queued: AtomicU32, + pub connecting: AtomicU32, + pub live: AtomicU32, + pub seen: AtomicU32, + pub dead: AtomicU32, + pub not_needed: AtomicU32, +} + +pub fn atomic_inc(c: &AtomicU32) -> u32 { + c.fetch_add(1, Ordering::Relaxed) +} + +pub fn atomic_dec(c: &AtomicU32) -> u32 { + c.fetch_sub(1, Ordering::Relaxed) +} + +impl AggregatePeerStatsAtomic { + pub fn counter(&self, state: &PeerState) -> &AtomicU32 { + match state { + PeerState::Connecting(_) => &self.connecting, + PeerState::Live(_) => &self.live, + PeerState::Queued => &self.queued, + PeerState::Dead => &self.dead, + PeerState::NotNeeded => &self.not_needed, + } + } + + pub fn inc(&self, state: &PeerState) { + trace!( + "inc, new value = {}, state = {}", + atomic_inc(self.counter(state)), + state + ); + } + + pub fn dec(&self, state: &PeerState) { + trace!( + "dec, new value = {}, state = {}", + atomic_dec(self.counter(state)), + state + ); + } + + pub fn incdec(&self, old: &PeerState, new: &PeerState) { + self.dec(old); + self.inc(new); + } +} + #[derive(Debug, Default)] pub enum PeerState { #[default] @@ -82,6 +136,12 @@ pub enum PeerState { NotNeeded, } +impl std::fmt::Display for PeerState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} + impl PeerState { pub fn name(&self) -> &'static str { match self { @@ -93,70 +153,77 @@ impl PeerState { } } - fn take_connecting(&mut self) -> Option { - if let PeerState::Connecting(_) = self { - match std::mem::take(self) { - PeerState::Connecting(tx) => Some(tx), - _ => unreachable!(), - } - } else { - None + pub fn take_live_no_counters(self) -> Option { + match self { + PeerState::Live(l) => Some(l), + _ => None, } } +} - pub fn take_live(&mut self) -> Option { - if let PeerState::Live(_) = self { - match std::mem::take(self) { - PeerState::Live(l) => Some(l), - _ => unreachable!(), - } - } else { - None - } +#[derive(Debug, Default)] +pub struct PeerStateNoMut(PeerState); + +impl PeerStateNoMut { + pub fn get(&self) -> &PeerState { + &self.0 + } + + pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + self.set(Default::default(), counters) + } + + pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState { + counters.incdec(&self.0, &new); + std::mem::replace(&mut self.0, new) } pub fn get_live(&self) -> Option<&LivePeerState> { - match self { + match &self.0 { PeerState::Live(l) => Some(l), _ => None, } } pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { - match self { + match &mut self.0 { PeerState::Live(l) => Some(l), _ => None, } } - pub fn queued_to_connecting(&mut self) -> Option { - if let PeerState::Queued = self { + pub fn queued_to_connecting(&mut self, counters: &AggregatePeerStatsAtomic) -> Option { + if let PeerState::Queued = &self.0 { let (tx, rx) = unbounded_channel(); - *self = PeerState::Connecting(tx); + self.set(PeerState::Connecting(tx), counters); Some(rx) } else { None } } - pub fn connecting_to_live(&mut self, peer_id: Id20) -> Option<&mut LivePeerState> { - let tx = self.take_connecting()?; - *self = PeerState::Live(LivePeerState::new(peer_id, tx)); - self.get_live_mut() - } - - pub fn to_dead(&mut self) -> Option> { - match std::mem::replace(self, PeerState::Dead) { - PeerState::Live(l) => Some(Some(l)), - PeerState::Connecting(_) => Some(None), - _ => None, + pub fn connecting_to_live( + &mut self, + peer_id: Id20, + counters: &AggregatePeerStatsAtomic, + ) -> Option<&mut LivePeerState> { + if let PeerState::Connecting(_) = &self.0 { + let tx = match self.take(counters) { + PeerState::Connecting(tx) => tx, + _ => unreachable!(), + }; + self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters); + self.get_live_mut() + } else { + None } } - pub fn to_not_needed(&mut self) -> Option { - match std::mem::replace(self, PeerState::NotNeeded) { - PeerState::Live(l) => Some(l), - _ => None, - } + pub fn to_dead(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + self.set(PeerState::Dead, counters) + } + + pub fn to_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + self.set(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index c9f0d1e..86fa8b3 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -14,7 +14,7 @@ use std::{ net::SocketAddr, path::PathBuf, sync::{ - atomic::{AtomicU32, AtomicU64, Ordering}, + atomic::{AtomicU64, Ordering}, Arc, }, time::{Duration, Instant}, @@ -52,7 +52,10 @@ use crate::{ peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, - peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx, SendMany}, + peer_state::{ + atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerRx, + PeerState, PeerTx, SendMany, + }, spawn_utils::{spawn, BlockingSpawner}, type_aliases::{PeerHandle, BF}, }; @@ -68,29 +71,7 @@ pub struct PeerStates { states: DashMap, } -#[derive(Debug, Default, Serialize)] -pub struct AggregatePeerStatsAtomic { - pub queued: AtomicU32, - pub connecting: AtomicU32, - pub live: AtomicU32, - pub seen: AtomicU32, - pub dead: AtomicU32, - pub not_needed: AtomicU32, -} - -impl AggregatePeerStatsAtomic { - fn counter(&self, state: &PeerState) -> &AtomicU32 { - match state { - PeerState::Connecting(_) => &self.connecting, - PeerState::Live(_) => &self.live, - PeerState::Queued => &self.queued, - PeerState::Dead => &self.dead, - PeerState::NotNeeded => &self.not_needed, - } - } -} - -#[derive(Debug, Default, Serialize)] +#[derive(Debug, Default, Serialize, PartialEq, Eq)] pub struct AggregatePeerStats { pub queued: usize, pub connecting: usize, @@ -123,7 +104,7 @@ impl PeerStates { .iter() .fold(AggregatePeerStats::default(), |mut s, p| { s.seen += 1; - match &p.value().state { + match &p.value().state.get() { PeerState::Connecting(_) => s.connecting += 1, PeerState::Live(_) => s.live += 1, PeerState::Queued => s.queued += 1, @@ -143,7 +124,8 @@ impl PeerStates { Entry::Occupied(_) => None, Entry::Vacant(vac) => { vac.insert(Default::default()); - self.stats.queued.fetch_add(1, Ordering::Relaxed); + atomic_inc(&self.stats.queued); + atomic_inc(&self.stats.seen); Some(addr) } } @@ -162,10 +144,12 @@ impl PeerStates { .map(|e| f(TimedExistence::new(e, reason).value_mut())) } pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { - self.states.get(&addr).and_then(|e| match &e.value().state { - PeerState::Live(l) => Some(f(l)), - _ => None, - }) + self.states + .get(&addr) + .and_then(|e| match &e.value().state.get() { + PeerState::Live(l) => Some(f(l)), + _ => None, + }) } pub fn with_live_mut( &self, @@ -173,20 +157,19 @@ impl PeerStates { reason: &'static str, f: impl FnOnce(&mut LivePeerState) -> R, ) -> Option { - self.with_peer_mut(addr, reason, |peer| match &mut peer.state { - PeerState::Live(l) => Some(f(l)), - _ => None, - }) - .flatten() + self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) + .flatten() } pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option> { - self.with_peer_mut(handle, "mark_peer_dead", |peer| peer.state.to_dead()) - .flatten() + let prev = self.with_peer_mut(handle, "mark_peer_dead", |peer| { + peer.state.to_dead(&self.stats) + })?; + Some(prev.take_live_no_counters()) } pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; - self.stats.counter(&p.state).fetch_sub(1, Ordering::Relaxed); + self.stats.dec(p.state.get()); Some(p) } pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option { @@ -216,12 +199,14 @@ impl PeerStates { }) } pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result { - self.with_peer_mut(h, "mark_peer_connecting", |peer| { - peer.state - .queued_to_connecting() - .context("invalid peer state") - }) - .context("peer not found in states")? + let rx = self + .with_peer_mut(h, "mark_peer_connecting", |peer| { + peer.state + .queued_to_connecting(&self.stats) + .context("invalid peer state") + }) + .context("peer not found in states")??; + Ok(rx) } pub fn clone_tx(&self, handle: PeerHandle) -> Option { @@ -234,11 +219,11 @@ impl PeerStates { }); } - fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { - self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state.to_not_needed() - }) - .flatten() + fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { + let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { + peer.state.to_not_needed(&self.stats) + })?; + Some(prev) } } @@ -289,6 +274,7 @@ pub struct StatsSnapshot { pub time: Instant, pub total_piece_download_ms: u64, pub peer_stats: AggregatePeerStats, + pub new_peer_stats: AggregatePeerStats, } impl StatsSnapshot { @@ -677,10 +663,14 @@ impl TorrentState { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| { - p.state.connecting_to_live(Id20(h.peer_id)).is_some() + p.state + .connecting_to_live(Id20(h.peer_id), &self.peers.stats) + .is_some() }); match result { - Some(true) => debug!("set peer to live"), + Some(true) => { + debug!("set peer to live") + } Some(false) => debug!("can't set peer live, it was in wrong state"), None => debug!("can't set peer live, it disappeared"), } @@ -694,7 +684,9 @@ impl TorrentState { return; } }; - match std::mem::take(&mut pe.value_mut().state) { + let prev = pe.value_mut().state.take(&self.peers.stats); + + match prev { PeerState::Connecting(_) => {} PeerState::Live(live) => { let mut g = self.lock_write("mark_chunk_requests_canceled"); @@ -709,7 +701,9 @@ impl TorrentState { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state = PeerState::NotNeeded; + pe.value_mut() + .state + .set(PeerState::NotNeeded, &self.peers.stats); return; } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -723,17 +717,21 @@ impl TorrentState { if error.is_none() { debug!("peer died without errors, not re-queueing"); - pe.value_mut().state = PeerState::NotNeeded; + pe.value_mut() + .state + .set(PeerState::NotNeeded, &self.peers.stats); return; } if self.is_finished() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state = PeerState::NotNeeded; + pe.value_mut() + .state + .set(PeerState::NotNeeded, &self.peers.stats); return; } - pe.value_mut().state = PeerState::Dead; + pe.value_mut().state.set(PeerState::Dead, &self.peers.stats); let backoff = pe.value_mut().stats.backoff.next_backoff(); // Prevent deadlocks. @@ -754,8 +752,10 @@ impl TorrentState { state .peers .with_peer_mut(handle, "dead_to_queued", |peer| { - match &peer.state { - PeerState::Dead => peer.state = PeerState::Queued, + match peer.state.get() { + PeerState::Dead => { + peer.state.set(PeerState::Queued, &state.peers.stats) + } other => bail!( "peer is in unexpected state: {}. Expected dead", other.name() @@ -793,7 +793,7 @@ impl TorrentState { let mut futures = Vec::new(); for pe in self.peers.states.iter() { - match &pe.value().state { + match &pe.value().state.get() { PeerState::Live(live) => { if !live.peer_interested { continue; @@ -856,11 +856,17 @@ impl TorrentState { pub fn stats_snapshot(&self, with_peer_stats: bool) -> StatsSnapshot { use Ordering::*; + let new_peer_stats = self.peers.stats_from_atomic(); let peer_stats = if with_peer_stats { - self.peers.stats() + let old_stats = self.peers.stats(); + if old_stats != new_peer_stats { + warn!("old != new: {old_stats:?} != {new_peer_stats:?}") + } + old_stats } else { Default::default() }; + let downloaded = self.stats.downloaded_and_checked.load(Relaxed); let remaining = self.needed - downloaded; StatsSnapshot { @@ -875,6 +881,7 @@ impl TorrentState { remaining_bytes: remaining, total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), peer_stats, + new_peer_stats, } } @@ -1367,10 +1374,14 @@ impl PeerHandler { fn disconnect_all_peers_that_have_full_torrent(&self) { for mut pe in self.state.peers.states.iter_mut() { - if let PeerState::Live(l) = &pe.value().state { + if let PeerState::Live(l) = pe.value().state.get() { if l.has_full_torrent(self.state.lengths.total_pieces() as usize) { - let live = pe.value_mut().state.to_not_needed().unwrap(); - let _ = live.tx.send(WriterRequest::Disconnect); + let prev = pe.value_mut().state.to_not_needed(&self.state.peers.stats); + let _ = prev + .take_live_no_counters() + .unwrap() + .tx + .send(WriterRequest::Disconnect); } } }