From 67afdb0aa5bb927cc47216ea584dc5d67bfdea4e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 11:22:37 +0100 Subject: [PATCH] Add a struct for session stats + bump the counters --- crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/session.rs | 6 +++ crates/librqbit/src/session_stats/atomic.rs | 10 ++++ crates/librqbit/src/session_stats/mod.rs | 1 + crates/librqbit/src/torrent_state/live/mod.rs | 48 ++++++++++++++----- .../src/torrent_state/live/peer/mod.rs | 24 ++++++---- .../src/torrent_state/live/peers/mod.rs | 19 ++++++-- crates/librqbit/src/torrent_state/mod.rs | 18 +++++-- 8 files changed, 96 insertions(+), 31 deletions(-) create mode 100644 crates/librqbit/src/session_stats/atomic.rs create mode 100644 crates/librqbit/src/session_stats/mod.rs diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index b6028c7..defe1d1 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -57,6 +57,7 @@ mod peer_info_reader; mod read_buf; mod session; mod session_persistence; +mod session_stats; mod spawn_utils; pub mod storage; mod stream_connect; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 01ac3ef..af1a892 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -16,6 +16,7 @@ use crate::{ peer_connection::PeerConnectionOptions, read_buf::ReadBuf, session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore}, + session_stats::atomic::AtomicSessionStats, spawn_utils::BlockingSpawner, storage::{ filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, @@ -116,6 +117,8 @@ pub struct Session { root_span: Option, + stats: Arc, + // This is stored for all tasks to stop when session is dropped. _cancellation_token_drop_guard: DropGuard, } @@ -602,6 +605,7 @@ impl Session { reqwest_client, connector: stream_connector, root_span: opts.root_span, + stats: Default::default(), concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) }); @@ -1147,6 +1151,7 @@ impl Session { self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), self.bitv_factory.clone(), + self.stats.clone(), ) .context("error starting torrent")?; } @@ -1303,6 +1308,7 @@ impl Session { self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), self.bitv_factory.clone(), + self.stats.clone(), )?; self.try_update_persistence_metadata(handle).await; Ok(()) diff --git a/crates/librqbit/src/session_stats/atomic.rs b/crates/librqbit/src/session_stats/atomic.rs new file mode 100644 index 0000000..fb2ae1c --- /dev/null +++ b/crates/librqbit/src/session_stats/atomic.rs @@ -0,0 +1,10 @@ +use std::sync::atomic::AtomicU64; + +use crate::torrent_state::live::peers::stats::atomic::AggregatePeerStatsAtomic; + +#[derive(Default, Debug)] +pub struct AtomicSessionStats { + pub fetched_bytes: AtomicU64, + pub uploaded_bytes: AtomicU64, + pub peers: AggregatePeerStatsAtomic, +} diff --git a/crates/librqbit/src/session_stats/mod.rs b/crates/librqbit/src/session_stats/mod.rs new file mode 100644 index 0000000..652223f --- /dev/null +++ b/crates/librqbit/src/session_stats/mod.rs @@ -0,0 +1 @@ +pub mod atomic; diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index a537fd0..e9da863 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -70,6 +70,7 @@ use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, Handshake, Message, MessageOwned, Piece, Request, }; +use peers::stats::atomic::AggregatePeerStatsAtomic; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, Notify, OwnedSemaphorePermit, Semaphore, @@ -84,6 +85,7 @@ use crate::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, session::CheckedIncomingConnection, + session_stats::atomic::AtomicSessionStats, torrent_state::{peer::Peer, utils::atomic_inc}, type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF}, }; @@ -201,6 +203,8 @@ pub struct TorrentStateLive { up_speed_estimator: SpeedEstimator, cancellation_token: CancellationToken, + session_stats: Arc, + pub(crate) streams: Arc, have_broadcast_tx: tokio::sync::broadcast::Sender, } @@ -210,6 +214,7 @@ impl TorrentStateLive { paused: TorrentStatePaused, fatal_errors_tx: tokio::sync::oneshot::Sender, cancellation_token: CancellationToken, + session_stats: Arc, ) -> anyhow::Result> { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); @@ -237,7 +242,11 @@ impl TorrentStateLive { let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), - peers: Default::default(), + peers: PeerStates { + session_stats: session_stats.clone(), + stats: Default::default(), + states: Default::default(), + }, locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), // TODO: move under per_piece_locks? @@ -260,6 +269,7 @@ impl TorrentStateLive { up_speed_estimator, cancellation_token, have_broadcast_tx, + session_stats, streams: paused.streams, per_piece_locks: (0..lengths.total_pieces()) .map(|_| RwLock::new(())) @@ -307,6 +317,10 @@ impl TorrentStateLive { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } + fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] { + [&self.peers.stats, &self.peers.session_stats.peers] + } + pub fn down_speed_estimator(&self) -> &SpeedEstimator { &self.down_speed_estimator } @@ -343,7 +357,7 @@ impl TorrentStateLive { .incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peers.stats, + &self.peer_stats(), ) .context("peer already existed")?; peer.stats.counters.clone() @@ -353,7 +367,7 @@ impl TorrentStateLive { let peer = Peer::new_live_for_incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peers.stats, + &self.peer_stats(), ); let counters = peer.stats.counters.clone(); vac.insert(peer); @@ -562,7 +576,7 @@ impl TorrentStateLive { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state - .connecting_to_live(Id20::new(h.peer_id), &self.peers.stats); + .connecting_to_live(Id20::new(h.peer_id), &self.peer_stats()); }); } @@ -750,7 +764,7 @@ impl TorrentStateLive { for mut pe in self.peers.states.iter_mut() { if let PeerState::Live(l) = pe.value().state.get() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { - let prev = pe.value_mut().state.set_not_needed(&self.peers.stats); + let prev = pe.value_mut().state.set_not_needed(&self.peer_stats()); let _ = prev .take_live_no_counters() .unwrap() @@ -763,7 +777,7 @@ impl TorrentStateLive { pub(crate) fn reconnect_all_not_needed_peers(&self) { for mut pe in self.peers.states.iter_mut() { - if pe.state.not_needed_to_queued(&self.peers.stats) + if pe.state.not_needed_to_queued(&self.peer_stats()) && self.peer_queue_tx.send(*pe.key()).is_err() { return; @@ -883,6 +897,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { .stats .uploaded_bytes .fetch_add(bytes as u64, Ordering::Relaxed); + self.state + .session_stats + .uploaded_bytes + .fetch_add(bytes as u64, Ordering::Relaxed); } fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> { @@ -925,7 +943,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { impl PeerHandler { fn on_peer_died(self, error: Option) -> anyhow::Result<()> { let peers = &self.state.peers; - let pstats = &peers.stats; + let pstats = self.state.peer_stats(); let handle = self.addr; let mut pe = match peers.states.get_mut(&handle) { Some(peer) => TimedExistence::new(peer, "on_peer_died"), @@ -934,7 +952,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take(pstats); + let prev = pe.value_mut().state.take(&pstats); match prev { PeerState::Connecting(_) => {} @@ -953,7 +971,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().state.set(PeerState::NotNeeded, &pstats); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -969,7 +987,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().state.set(PeerState::NotNeeded, &pstats); return Ok(()); } }; @@ -978,11 +996,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().state.set(PeerState::NotNeeded, &pstats); return Ok(()); } - pe.value_mut().state.set(PeerState::Dead, pstats); + pe.value_mut().state.set(PeerState::Dead, &pstats); let backoff = pe.value_mut().stats.backoff.next_backoff(); @@ -1006,7 +1024,7 @@ impl PeerHandler { .with_peer_mut(handle, "dead_to_queued", |peer| { match peer.state.get() { PeerState::Dead => { - peer.state.set(PeerState::Queued, &self.state.peers.stats) + peer.state.set(PeerState::Queued, &self.state.peer_stats()) } other => bail!( "peer is in unexpected state: {}. Expected dead", @@ -1415,6 +1433,10 @@ impl PeerHandler { .stats .fetched_bytes .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed); + self.state + .session_stats + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); fn write_to_disk( state: &TorrentStateLive, diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 8299fc1..0c6b0ca 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -26,10 +26,12 @@ impl Peer { pub fn new_live_for_incoming_connection( peer_id: Id20, tx: PeerTx, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> Self { let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); - counters.inc(&state.0); + for counter in counters { + counter.inc(&state.0); + } Self { state, stats: Default::default(), @@ -85,12 +87,14 @@ impl PeerStateNoMut { &self.0 } - pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { self.set(Default::default(), counters) } - pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState { - counters.incdec(&self.0, &new); + pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { + for counter in counters { + counter.incdec(&self.0, &new); + } std::mem::replace(&mut self.0, new) } @@ -110,7 +114,7 @@ impl PeerStateNoMut { pub fn idle_to_connecting( &mut self, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> Option<(PeerRx, PeerTx)> { match &self.0 { PeerState::Queued | PeerState::NotNeeded => { @@ -123,7 +127,7 @@ impl PeerStateNoMut { } } - pub fn not_needed_to_queued(&mut self, counters: &AggregatePeerStatsAtomic) -> bool { + pub fn not_needed_to_queued(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> bool { if let PeerState::NotNeeded = &self.0 { self.set(PeerState::Queued, counters); return true; @@ -135,7 +139,7 @@ impl PeerStateNoMut { &mut self, peer_id: Id20, tx: PeerTx, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> anyhow::Result<()> { if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); @@ -155,7 +159,7 @@ impl PeerStateNoMut { pub fn connecting_to_live( &mut self, peer_id: Id20, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> Option<&mut LivePeerState> { if let PeerState::Connecting(_) = &self.0 { let tx = match self.take(counters) { @@ -172,7 +176,7 @@ impl PeerStateNoMut { } } - pub fn set_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { self.set(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index edbd8ff..fc4d6ea 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; use backoff::backoff::Backoff; @@ -8,6 +8,7 @@ use peer_binary_protocol::{Message, Request}; use crate::{ peer_connection::WriterRequest, + session_stats::atomic::AtomicSessionStats, torrent_state::utils::{atomic_inc, TimedExistence}, type_aliases::{PeerHandle, BF}, }; @@ -18,8 +19,8 @@ use super::peer::{LivePeerState, Peer, PeerRx, PeerState, PeerTx}; pub mod stats; -#[derive(Default)] pub(crate) struct PeerStates { + pub session_stats: Arc, pub stats: AggregatePeerStatsAtomic, pub states: DashMap, } @@ -36,7 +37,10 @@ impl PeerStates { Entry::Vacant(vac) => { vac.insert(Default::default()); atomic_inc(&self.stats.queued); + atomic_inc(&self.session_stats.peers.queued); + atomic_inc(&self.stats.seen); + atomic_inc(&self.session_stats.peers.queued); Some(addr) } } @@ -73,7 +77,10 @@ impl PeerStates { pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; - self.stats.dec(p.state.get()); + let s = p.state.get(); + self.stats.dec(s); + self.session_stats.peers.dec(s); + Some(p) } @@ -99,7 +106,7 @@ impl PeerStates { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { peer.state - .idle_to_connecting(&self.stats) + .idle_to_connecting(&[&self.stats, &self.session_stats.peers]) .context("invalid peer state") }) .context("peer not found in states")??; @@ -114,7 +121,8 @@ impl PeerStates { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state.set_not_needed(&self.stats) + peer.state + .set_not_needed(&[&self.stats, &self.session_stats.peers]) })?; Some(prev) } @@ -132,6 +140,7 @@ impl PeerStates { atomic_inc(&p.stats.counters.times_stolen_from_me); }); self.stats.inc_steals(); + self.session_stats.peers.inc_steals(); self.with_live_mut(from_peer, "send_cancellations", |live| { let to_remove = live diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 2002924..0b506ab 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -38,6 +38,7 @@ use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; use crate::file_info::FileInfo; use crate::session::TorrentId; +use crate::session_stats::atomic::AtomicSessionStats; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; use crate::stream_connect::StreamConnector; @@ -211,6 +212,7 @@ impl ManagedTorrent { live_cancellation_token: CancellationToken, init_semaphore: Arc, bitv_factory: Arc, + session_stats: Arc, ) -> anyhow::Result<()> { let mut g = self.locked.write(); @@ -319,8 +321,12 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = - TorrentStateLive::new(paused, tx, live_cancellation_token)?; + let live = TorrentStateLive::new( + paused, + tx, + live_cancellation_token, + session_stats, + )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -345,7 +351,12 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?; + let live = TorrentStateLive::new( + paused, + tx, + live_cancellation_token.clone(), + session_stats, + )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -371,6 +382,7 @@ impl ManagedTorrent { live_cancellation_token, init_semaphore, bitv_factory, + session_stats, ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"),