From adc2ca97b3cb4beabd62cd2b9ccc77c8e658d167 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:03:08 +0000 Subject: [PATCH] A bit of refactoring to store live_peers --- crates/librqbit/src/torrent_state/live/mod.rs | 33 +++++++++---------- .../src/torrent_state/live/peer/mod.rs | 29 ++++++++-------- .../src/torrent_state/live/peers/mod.rs | 9 ++--- 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ca2922a..0712cb3 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -73,7 +73,6 @@ use peer_binary_protocol::{ }, Handshake, Message, MessageOwned, Piece, Request, }; -use peers::stats::atomic::AggregatePeerStatsAtomic; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, Notify, OwnedSemaphorePermit, Semaphore, @@ -258,6 +257,7 @@ impl TorrentStateLive { session_stats: session_stats.clone(), stats: Default::default(), states: Default::default(), + live_peers: Default::default(), }, locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), @@ -337,10 +337,6 @@ impl TorrentStateLive { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } - fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] { - [&self.peers.stats, &self.peers.session_stats.peers] - } - pub fn down_speed_estimator(&self) -> &SpeedEstimator { &self.down_speed_estimator } @@ -377,7 +373,7 @@ impl TorrentStateLive { .incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peer_stats(), + &self.peers, ) .context("peer already existed")?; peer.stats.counters.clone() @@ -387,7 +383,7 @@ impl TorrentStateLive { let peer = Peer::new_live_for_incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peer_stats(), + &self.peers, ); let counters = peer.stats.counters.clone(); vac.insert(peer); @@ -620,7 +616,7 @@ impl TorrentStateLive { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state - .connecting_to_live(Id20::new(h.peer_id), &self.peer_stats()); + .connecting_to_live(Id20::new(h.peer_id), &self.peers); }); } @@ -814,7 +810,7 @@ impl TorrentStateLive { for mut pe in self.peers.states.iter_mut() { if let PeerState::Live(l) = pe.value().state.get() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { - let prev = pe.value_mut().state.set_not_needed(&self.peer_stats()); + let prev = pe.value_mut().state.set_not_needed(&self.peers); let _ = prev .take_live_no_counters() .unwrap() @@ -832,7 +828,7 @@ impl TorrentStateLive { .filter_map(|mut p| { let known_addr = *p.key(); p.value_mut() - .reconnect_not_needed_peer(known_addr, &self.peer_stats()) + .reconnect_not_needed_peer(known_addr, &self.peers) }) .map(|socket_addr| self.peer_queue_tx.send(socket_addr)) .take_while(|r| r.is_ok()) @@ -1113,7 +1109,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { impl PeerHandler { fn on_peer_died(self, error: Option) -> anyhow::Result<()> { let peers = &self.state.peers; - let pstats = self.state.peer_stats(); + let pstats = { + let this = &self.state; + &this.peers + }; let handle = self.addr; let mut pe = match peers.states.get_mut(&handle) { Some(peer) => TimedExistence::new(peer, "on_peer_died"), @@ -1122,7 +1121,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take(&pstats); + let prev = pe.value_mut().state.take(pstats); match prev { PeerState::Connecting(_) => {} @@ -1141,7 +1140,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -1157,7 +1156,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } }; @@ -1166,11 +1165,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } - pe.value_mut().state.set(PeerState::Dead, &pstats); + pe.value_mut().state.set(PeerState::Dead, pstats); if self.incoming { // do not retry incoming peers @@ -1203,7 +1202,7 @@ impl PeerHandler { .with_peer_mut(handle, "dead_to_queued", |peer| { match peer.state.get() { PeerState::Dead => { - peer.state.set(PeerState::Queued, &self.state.peer_stats()) + peer.state.set(PeerState::Queued, &self.state.peers) } other => bail!( "peer is in unexpected state: {}. Expected dead", diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 3c6e6af..94e4daf 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -12,7 +12,7 @@ use tracing::debug; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; -use super::peers::stats::atomic::AggregatePeerStatsAtomic; +use super::PeerStates; pub(crate) type InflightRequest = ChunkInfo; pub(crate) type PeerRx = UnboundedReceiver; @@ -29,10 +29,10 @@ impl Peer { pub fn new_live_for_incoming_connection( peer_id: Id20, tx: PeerTx, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Self { let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); - for counter in counters { + for counter in [&counters.session_stats.peers, &counters.stats] { counter.inc(&state.0); } Self { @@ -52,7 +52,7 @@ impl Peer { pub(crate) fn reconnect_not_needed_peer( &mut self, known_address: SocketAddr, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Option { if let PeerState::NotNeeded = self.state.get() { match self.outgoing_address { @@ -124,18 +124,18 @@ impl PeerStateNoMut { &self.0 } - pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { + pub fn take(&mut self, counters: &PeerStates) -> PeerState { self.set(Default::default(), counters) } - pub fn destroy(self, counters: &[&AggregatePeerStatsAtomic]) { - for counter in counters { + pub fn destroy(self, counters: &PeerStates) { + for counter in [&counters.session_stats.peers, &counters.stats] { counter.dec(&self.0); } } - pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { - for counter in counters { + pub fn set(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { + for counter in [&counters.session_stats.peers, &counters.stats] { counter.incdec(&self.0, &new); } std::mem::replace(&mut self.0, new) @@ -159,10 +159,7 @@ impl PeerStateNoMut { } } - pub fn idle_to_connecting( - &mut self, - counters: &[&AggregatePeerStatsAtomic], - ) -> Option<(PeerRx, PeerTx)> { + pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { match &self.0 { PeerState::Queued | PeerState::NotNeeded => { let (tx, rx) = unbounded_channel(); @@ -178,7 +175,7 @@ impl PeerStateNoMut { &mut self, peer_id: Id20, tx: PeerTx, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> anyhow::Result<()> { if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); @@ -198,7 +195,7 @@ impl PeerStateNoMut { pub fn connecting_to_live( &mut self, peer_id: Id20, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Option<&mut LivePeerState> { if let PeerState::Connecting(_) = &self.0 { let tx = match self.take(counters) { @@ -215,7 +212,7 @@ impl PeerStateNoMut { } } - pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { + pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState { self.set(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 110df73..e68dafb 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -4,6 +4,7 @@ use anyhow::Context; use backoff::backoff::Backoff; use dashmap::DashMap; use librqbit_core::lengths::ValidPieceIndex; +use parking_lot::RwLock; use peer_binary_protocol::{Message, Request}; use crate::{ @@ -21,6 +22,7 @@ pub mod stats; pub(crate) struct PeerStates { pub session_stats: Arc, + pub live_peers: RwLock>, pub stats: AggregatePeerStatsAtomic, pub states: DashMap, } @@ -28,7 +30,7 @@ pub(crate) struct PeerStates { impl Drop for PeerStates { fn drop(&mut self) { for (_, p) in std::mem::take(&mut self.states).into_iter() { - p.state.destroy(&[&self.session_stats.peers]); + p.state.destroy(self); } } } @@ -115,7 +117,7 @@ impl PeerStates { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { peer.state - .idle_to_connecting(&[&self.stats, &self.session_stats.peers]) + .idle_to_connecting(self) .context("invalid peer state") }) .context("peer not found in states")??; @@ -130,8 +132,7 @@ impl PeerStates { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state - .set_not_needed(&[&self.stats, &self.session_stats.peers]) + peer.state.set_not_needed(self) })?; Some(prev) }