diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 43077fe..11395d8 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -19,12 +19,12 @@ use tracing::{info, warn}; use axum::Router; use crate::http_api_error::{ApiError, ApiErrorExt}; -use crate::peer_state::PeerStatsFilter; use crate::session::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, }; use crate::torrent_manager::TorrentManagerHandle; -use crate::torrent_state::StatsSnapshot; +use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; +use crate::torrent_state::stats::snapshot::StatsSnapshot; // Public API #[derive(Clone)] @@ -386,11 +386,7 @@ impl ApiInternal { make_torrent_details(&info_hash, handle.torrent_state().info(), only_files) } - fn api_peer_stats( - &self, - idx: usize, - filter: PeerStatsFilter, - ) -> Result { + fn api_peer_stats(&self, idx: usize, filter: PeerStatsFilter) -> Result { let handle = self.mgr_handle(idx)?; Ok(handle.torrent_state().per_peer_stats_snapshot(filter)) } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 097d45e..78d44aa 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -6,7 +6,6 @@ pub mod http_api_client; mod http_api_error; pub mod peer_connection; pub mod peer_info_reader; -pub mod peer_state; pub mod session; pub mod spawn_utils; pub mod torrent_manager; diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index c0a84d3..fa49040 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -39,6 +39,10 @@ // > so don't lock them both at the same time at all, or at the worst lock them in the // > same order (peers one first, then the global one). +pub mod peer; +pub mod peers; +pub mod stats; + use std::{ collections::HashMap, fs::File, @@ -55,7 +59,6 @@ use anyhow::{bail, Context}; use backoff::backoff::Backoff; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; -use dashmap::DashMap; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ id20::Id20, @@ -66,7 +69,6 @@ use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, }; -use serde::Serialize; use sha1w::Sha1; use tokio::{ sync::{ @@ -83,140 +85,29 @@ use crate::{ peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, - peer_state::{ - atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerCounters, - PeerRx, PeerState, PeerStatsFilter, PeerStatsSnapshot, PeerTx, SendMany, - }, spawn_utils::{spawn, BlockingSpawner}, type_aliases::{PeerHandle, BF}, }; +use self::{ + peer::{ + stats::{ + atomic::PeerCounters as AtomicPeerCounters, + snapshot::{PeerStatsFilter, PeerStatsSnapshot}, + }, + InflightRequest, PeerState, PeerTx, SendMany, + }, + peers::PeerStates, + stats::{atomic::AtomicStats, snapshot::StatsSnapshot}, +}; + +use super::utils::{timeit, TimedExistence}; + pub struct InflightPiece { pub peer: PeerHandle, pub started: Instant, } -#[derive(Default)] -pub struct PeerStates { - stats: AggregatePeerStatsAtomic, - states: DashMap, -} - -#[derive(Debug, Default, Serialize, PartialEq, Eq)] -pub struct AggregatePeerStats { - pub queued: usize, - pub connecting: usize, - pub live: usize, - pub seen: usize, - pub dead: usize, - pub not_needed: usize, -} - -impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats { - fn from(s: &'a AggregatePeerStatsAtomic) -> Self { - let ordering = Ordering::Relaxed; - Self { - queued: s.queued.load(ordering) as usize, - connecting: s.connecting.load(ordering) as usize, - live: s.live.load(ordering) as usize, - seen: s.seen.load(ordering) as usize, - dead: s.dead.load(ordering) as usize, - not_needed: s.not_needed.load(ordering) as usize, - } - } -} - -impl PeerStates { - pub fn stats(&self) -> AggregatePeerStats { - AggregatePeerStats::from(&self.stats) - } - - pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option { - use dashmap::mapref::entry::Entry; - match self.states.entry(addr) { - Entry::Occupied(_) => None, - Entry::Vacant(vac) => { - vac.insert(Default::default()); - atomic_inc(&self.stats.queued); - atomic_inc(&self.stats.seen); - Some(addr) - } - } - } - pub fn with_peer(&self, addr: PeerHandle, f: impl FnOnce(&Peer) -> R) -> Option { - self.states.get(&addr).map(|e| f(e.value())) - } - - pub fn with_peer_mut( - &self, - addr: PeerHandle, - reason: &'static str, - f: impl FnOnce(&mut Peer) -> R, - ) -> Option { - timeit(reason, || self.states.get_mut(&addr)) - .map(|e| f(TimedExistence::new(e, reason).value_mut())) - } - pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { - self.states - .get(&addr) - .and_then(|e| match &e.value().state.get() { - PeerState::Live(l) => Some(f(l)), - _ => None, - }) - } - pub fn with_live_mut( - &self, - addr: PeerHandle, - reason: &'static str, - f: impl FnOnce(&mut LivePeerState) -> R, - ) -> Option { - self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) - .flatten() - } - - pub fn drop_peer(&self, handle: PeerHandle) -> Option { - let p = self.states.remove(&handle).map(|r| r.1)?; - self.stats.dec(p.state.get()); - Some(p) - } - - pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option { - self.with_live_mut(handle, "mark_peer_interested", |live| { - let prev = live.peer_interested; - live.peer_interested = is_interested; - prev - }) - } - pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec) -> Option<()> { - self.with_live_mut(handle, "update_bitfield_from_vec", |live| { - live.bitfield = BF::from_vec(bitfield); - }) - } - pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> { - let rx = self - .with_peer_mut(h, "mark_peer_connecting", |peer| { - peer.state - .queued_to_connecting(&self.stats) - .context("invalid peer state") - }) - .context("peer not found in states")??; - Ok(rx) - } - - fn reset_peer_backoff(&self, handle: PeerHandle) { - self.with_peer_mut(handle, "reset_peer_backoff", |p| { - p.stats.backoff.reset(); - }); - } - - fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { - let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state.to_not_needed(&self.stats) - })?; - Some(prev) - } -} - pub struct TorrentStateLocked { // What chunks we have and need. pub chunks: ChunkTracker, @@ -226,54 +117,6 @@ pub struct TorrentStateLocked { pub inflight_pieces: HashMap, } -#[derive(Default, Debug)] -struct AtomicStats { - have_bytes: AtomicU64, - downloaded_and_checked_bytes: AtomicU64, - downloaded_and_checked_pieces: AtomicU64, - uploaded_bytes: AtomicU64, - fetched_bytes: AtomicU64, - total_piece_download_ms: AtomicU64, -} - -impl AtomicStats { - fn average_piece_download_time(&self) -> Option { - let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire); - let t = self.total_piece_download_ms.load(Ordering::Acquire); - if d == 0 { - return None; - } - Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) - } -} - -#[derive(Debug, Serialize)] -pub struct StatsSnapshot { - pub have_bytes: u64, - pub downloaded_and_checked_bytes: u64, - pub downloaded_and_checked_pieces: u64, - pub fetched_bytes: u64, - pub uploaded_bytes: u64, - pub initially_needed_bytes: u64, - pub remaining_bytes: u64, - pub total_bytes: u64, - #[serde(skip)] - pub time: Instant, - pub total_piece_download_ms: u64, - pub peer_stats: AggregatePeerStats, -} - -impl StatsSnapshot { - pub fn average_piece_download_time(&self) -> Option { - let d = self.downloaded_and_checked_pieces; - let t = self.total_piece_download_ms; - if d == 0 { - return None; - } - Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) - } -} - #[derive(Default)] pub struct TorrentStateOptions { pub peer_connect_timeout: Option, @@ -303,105 +146,6 @@ pub struct TorrentState { finished_notify: Notify, } -// Used during debugging to see if some locks take too long. -#[cfg(not(feature = "timed_existence"))] -mod timed_existence { - use std::ops::{Deref, DerefMut}; - - pub struct TimedExistence(T); - - impl TimedExistence { - #[inline(always)] - pub fn new(object: T, _reason: &'static str) -> Self { - Self(object) - } - } - - impl Deref for TimedExistence { - type Target = T; - - #[inline(always)] - fn deref(&self) -> &Self::Target { - &self.0 - } - } - - impl DerefMut for TimedExistence { - #[inline(always)] - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } - } - - #[inline(always)] - pub fn timeit(_n: impl std::fmt::Display, f: impl FnOnce() -> R) -> R { - f() - } -} - -#[cfg(feature = "timed_existence")] -mod timed_existence { - use std::ops::{Deref, DerefMut}; - use std::time::{Duration, Instant}; - use tracing::warn; - - const MAX: Duration = Duration::from_millis(1); - - // Prints if the object exists for too long. - // This is used to track long-lived locks for debugging. - pub struct TimedExistence { - object: T, - reason: &'static str, - started: Instant, - } - - impl TimedExistence { - pub fn new(object: T, reason: &'static str) -> Self { - Self { - object, - reason, - started: Instant::now(), - } - } - } - - impl Drop for TimedExistence { - fn drop(&mut self) { - let elapsed = self.started.elapsed(); - let reason = self.reason; - if elapsed > MAX { - warn!("elapsed on lock {reason:?}: {elapsed:?}") - } - } - } - - impl Deref for TimedExistence { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.object - } - } - - impl DerefMut for TimedExistence { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.object - } - } - - pub fn timeit(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R { - let now = Instant::now(); - let r = f(); - let elapsed = now.elapsed(); - if elapsed > MAX { - warn!("elapsed on \"{name:}\": {elapsed:?}") - } - r - } -} - -pub use timed_existence::{timeit, TimedExistence}; - impl TorrentState { #[allow(clippy::too_many_arguments)] pub fn new( @@ -734,7 +478,7 @@ struct PeerHandlerLocked { // This state tracks a live peer. struct PeerHandler { state: Arc, - counters: Arc, + counters: Arc, // Semantically, we don't need an RwLock here, as this is only requested from // one future (requester + manage_peer). // diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs similarity index 54% rename from crates/librqbit/src/peer_state.rs rename to crates/librqbit/src/torrent_state/live/peer/mod.rs index a34633b..773be14 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -1,3 +1,5 @@ +pub mod stats; + use std::collections::HashSet; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; @@ -13,6 +15,8 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; +use super::peers::stats::atomic::AggregatePeerStatsAtomic; + #[derive(Debug, Hash, PartialEq, Eq)] pub struct InflightRequest { pub piece: ValidPieceIndex, @@ -45,85 +49,10 @@ impl SendMany for PeerTx { } } -#[derive(Default, Debug)] -pub struct PeerCounters { - pub fetched_bytes: AtomicU64, - pub total_time_connecting_ms: AtomicU64, - pub connection_attempts: AtomicU32, - pub connections: AtomicU32, - pub errors: AtomicU32, - pub fetched_chunks: AtomicU32, - pub downloaded_and_checked_pieces: AtomicU32, - pub downloaded_and_checked_bytes: AtomicU64, -} - -#[derive(Debug)] -pub struct PeerStats { - pub counters: Arc, - pub backoff: ExponentialBackoff, -} - -impl Default for PeerStats { - fn default() -> Self { - Self { - counters: Arc::new(Default::default()), - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_secs(10)) - .with_multiplier(6.) - .with_max_interval(Duration::from_secs(3600)) - .with_max_elapsed_time(Some(Duration::from_secs(86400))) - .build(), - } - } -} - #[derive(Debug, Default)] pub struct Peer { pub state: PeerStateNoMut, - pub stats: PeerStats, -} - -#[derive(Debug, Default, Serialize)] -pub struct AggregatePeerStatsAtomic { - pub queued: AtomicU32, - pub connecting: AtomicU32, - pub live: AtomicU32, - pub seen: AtomicU32, - pub dead: AtomicU32, - pub not_needed: AtomicU32, -} - -pub fn atomic_inc(c: &AtomicU32) -> u32 { - c.fetch_add(1, Ordering::Relaxed) -} - -pub fn atomic_dec(c: &AtomicU32) -> u32 { - c.fetch_sub(1, Ordering::Relaxed) -} - -impl AggregatePeerStatsAtomic { - pub fn counter(&self, state: &PeerState) -> &AtomicU32 { - match state { - PeerState::Connecting(_) => &self.connecting, - PeerState::Live(_) => &self.live, - PeerState::Queued => &self.queued, - PeerState::Dead => &self.dead, - PeerState::NotNeeded => &self.not_needed, - } - } - - pub fn inc(&self, state: &PeerState) { - atomic_inc(self.counter(state)); - } - - pub fn dec(&self, state: &PeerState) { - atomic_dec(self.counter(state)); - } - - pub fn incdec(&self, old: &PeerState, new: &PeerState) { - self.dec(old); - self.inc(new); - } + pub stats: stats::atomic::PeerStats, } #[derive(Debug, Default)] @@ -268,82 +197,3 @@ impl LivePeerState { .map_or(false, |s| s.all()) } } - -mod peer_stats_snapshot { - use std::{collections::HashMap, sync::atomic::Ordering}; - - use serde::{Deserialize, Serialize}; - - use crate::peer_state::PeerState; - - #[derive(Serialize, Deserialize)] - pub struct PeerCounters { - pub fetched_bytes: u64, - pub total_time_connecting_ms: u64, - pub connection_attempts: u32, - pub connections: u32, - pub errors: u32, - pub fetched_chunks: u32, - pub downloaded_and_checked_pieces: u32, - } - - #[derive(Serialize, Deserialize)] - pub struct PeerStats { - pub counters: PeerCounters, - pub state: &'static str, - } - - impl From<&crate::peer_state::PeerCounters> for PeerCounters { - fn from(counters: &crate::peer_state::PeerCounters) -> Self { - Self { - fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed), - total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed), - connection_attempts: counters.connection_attempts.load(Ordering::Relaxed), - connections: counters.connections.load(Ordering::Relaxed), - errors: counters.errors.load(Ordering::Relaxed), - fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed), - downloaded_and_checked_pieces: counters - .downloaded_and_checked_pieces - .load(Ordering::Relaxed), - } - } - } - - impl From<&crate::peer_state::Peer> for PeerStats { - fn from(peer: &crate::peer_state::Peer) -> Self { - Self { - counters: peer.stats.counters.as_ref().into(), - state: peer.state.get().name(), - } - } - } - - #[derive(Serialize)] - pub struct PeerStatsSnapshot { - pub peers: HashMap, - } - - #[derive(Clone, Copy, Default, Deserialize)] - pub enum PeerStatsFilterState { - All, - #[default] - Live, - } - - impl PeerStatsFilterState { - pub fn matches(&self, s: &PeerState) -> bool { - match (self, s) { - (Self::All, _) => true, - (Self::Live, PeerState::Live(_)) => true, - _ => false, - } - } - } - - #[derive(Default, Deserialize)] - pub struct PeerStatsFilter { - pub state: PeerStatsFilterState, - } -} - -pub use peer_stats_snapshot::{PeerStatsFilter, PeerStatsSnapshot}; diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs new file mode 100644 index 0000000..bca260f --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs @@ -0,0 +1,41 @@ +use std::{ + sync::{ + atomic::{AtomicU32, AtomicU64}, + Arc, + }, + time::Duration, +}; + +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; + +#[derive(Default, Debug)] +pub struct PeerCounters { + pub fetched_bytes: AtomicU64, + pub total_time_connecting_ms: AtomicU64, + pub connection_attempts: AtomicU32, + pub connections: AtomicU32, + pub errors: AtomicU32, + pub fetched_chunks: AtomicU32, + pub downloaded_and_checked_pieces: AtomicU32, + pub downloaded_and_checked_bytes: AtomicU64, +} + +#[derive(Debug)] +pub struct PeerStats { + pub counters: Arc, + pub backoff: ExponentialBackoff, +} + +impl Default for PeerStats { + fn default() -> Self { + Self { + counters: Arc::new(Default::default()), + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_secs(10)) + .with_multiplier(6.) + .with_max_interval(Duration::from_secs(3600)) + .with_max_elapsed_time(Some(Duration::from_secs(86400))) + .build(), + } + } +} diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/mod.rs b/crates/librqbit/src/torrent_state/live/peer/stats/mod.rs new file mode 100644 index 0000000..1f2b657 --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/peer/stats/mod.rs @@ -0,0 +1,2 @@ +pub mod atomic; +pub mod snapshot; diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs new file mode 100644 index 0000000..81c00cb --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -0,0 +1,74 @@ +use std::{collections::HashMap, sync::atomic::Ordering}; + +use serde::{Deserialize, Serialize}; + +use crate::torrent_state::live::peer::{Peer, PeerState}; + +#[derive(Serialize, Deserialize)] +pub struct PeerCounters { + pub fetched_bytes: u64, + pub total_time_connecting_ms: u64, + pub connection_attempts: u32, + pub connections: u32, + pub errors: u32, + pub fetched_chunks: u32, + pub downloaded_and_checked_pieces: u32, +} + +#[derive(Serialize, Deserialize)] +pub struct PeerStats { + pub counters: PeerCounters, + pub state: &'static str, +} + +impl From<&super::atomic::PeerCounters> for PeerCounters { + fn from(counters: &super::atomic::PeerCounters) -> Self { + Self { + fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed), + total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed), + connection_attempts: counters.connection_attempts.load(Ordering::Relaxed), + connections: counters.connections.load(Ordering::Relaxed), + errors: counters.errors.load(Ordering::Relaxed), + fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed), + downloaded_and_checked_pieces: counters + .downloaded_and_checked_pieces + .load(Ordering::Relaxed), + } + } +} + +impl From<&Peer> for PeerStats { + fn from(peer: &Peer) -> Self { + Self { + counters: peer.stats.counters.as_ref().into(), + state: peer.state.get().name(), + } + } +} + +#[derive(Serialize)] +pub struct PeerStatsSnapshot { + pub peers: HashMap, +} + +#[derive(Clone, Copy, Default, Deserialize)] +pub enum PeerStatsFilterState { + All, + #[default] + Live, +} + +impl PeerStatsFilterState { + pub fn matches(&self, s: &PeerState) -> bool { + match (self, s) { + (Self::All, _) => true, + (Self::Live, PeerState::Live(_)) => true, + _ => false, + } + } +} + +#[derive(Default, Deserialize)] +pub struct PeerStatsFilter { + pub state: PeerStatsFilterState, +} diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs new file mode 100644 index 0000000..7973a27 --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -0,0 +1,114 @@ +use std::net::SocketAddr; + +use anyhow::Context; +use backoff::backoff::Backoff; +use dashmap::DashMap; + +use crate::{ + torrent_state::utils::{atomic_inc, TimedExistence}, + type_aliases::{PeerHandle, BF}, +}; + +use self::stats::{atomic::AggregatePeerStatsAtomic, snapshot::AggregatePeerStats}; + +use super::peer::{LivePeerState, Peer, PeerRx, PeerState, PeerTx}; + +pub mod stats; + +#[derive(Default)] +pub struct PeerStates { + pub stats: AggregatePeerStatsAtomic, + pub states: DashMap, +} + +impl PeerStates { + pub fn stats(&self) -> AggregatePeerStats { + AggregatePeerStats::from(&self.stats) + } + + pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option { + use dashmap::mapref::entry::Entry; + match self.states.entry(addr) { + Entry::Occupied(_) => None, + Entry::Vacant(vac) => { + vac.insert(Default::default()); + atomic_inc(&self.stats.queued); + atomic_inc(&self.stats.seen); + Some(addr) + } + } + } + pub fn with_peer(&self, addr: PeerHandle, f: impl FnOnce(&Peer) -> R) -> Option { + self.states.get(&addr).map(|e| f(e.value())) + } + + pub fn with_peer_mut( + &self, + addr: PeerHandle, + reason: &'static str, + f: impl FnOnce(&mut Peer) -> R, + ) -> Option { + use crate::torrent_state::utils::timeit; + timeit(reason, || self.states.get_mut(&addr)) + .map(|e| f(TimedExistence::new(e, reason).value_mut())) + } + pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { + self.states + .get(&addr) + .and_then(|e| match &e.value().state.get() { + PeerState::Live(l) => Some(f(l)), + _ => None, + }) + } + pub fn with_live_mut( + &self, + addr: PeerHandle, + reason: &'static str, + f: impl FnOnce(&mut LivePeerState) -> R, + ) -> Option { + self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) + .flatten() + } + + pub fn drop_peer(&self, handle: PeerHandle) -> Option { + let p = self.states.remove(&handle).map(|r| r.1)?; + self.stats.dec(p.state.get()); + Some(p) + } + + pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option { + self.with_live_mut(handle, "mark_peer_interested", |live| { + let prev = live.peer_interested; + live.peer_interested = is_interested; + prev + }) + } + pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec) -> Option<()> { + self.with_live_mut(handle, "update_bitfield_from_vec", |live| { + live.bitfield = BF::from_vec(bitfield); + }) + } + pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> { + let rx = self + .with_peer_mut(h, "mark_peer_connecting", |peer| { + peer.state + .queued_to_connecting(&self.stats) + .context("invalid peer state") + }) + .context("peer not found in states")??; + Ok(rx) + } + + pub fn reset_peer_backoff(&self, handle: PeerHandle) { + self.with_peer_mut(handle, "reset_peer_backoff", |p| { + p.stats.backoff.reset(); + }); + } + + pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { + let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { + peer.state.to_not_needed(&self.stats) + })?; + Some(prev) + } +} diff --git a/crates/librqbit/src/torrent_state/live/peers/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/peers/stats/atomic.rs new file mode 100644 index 0000000..a9188c3 --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/peers/stats/atomic.rs @@ -0,0 +1,43 @@ +use std::sync::atomic::AtomicU32; + +use serde::Serialize; + +use crate::torrent_state::{ + live::peer::PeerState, + utils::{atomic_dec, atomic_inc}, +}; + +#[derive(Debug, Default, Serialize)] +pub struct AggregatePeerStatsAtomic { + pub queued: AtomicU32, + pub connecting: AtomicU32, + pub live: AtomicU32, + pub seen: AtomicU32, + pub dead: AtomicU32, + pub not_needed: AtomicU32, +} + +impl AggregatePeerStatsAtomic { + pub fn counter(&self, state: &PeerState) -> &AtomicU32 { + match state { + PeerState::Connecting(_) => &self.connecting, + PeerState::Live(_) => &self.live, + PeerState::Queued => &self.queued, + PeerState::Dead => &self.dead, + PeerState::NotNeeded => &self.not_needed, + } + } + + pub fn inc(&self, state: &PeerState) { + atomic_inc(self.counter(state)); + } + + pub fn dec(&self, state: &PeerState) { + atomic_dec(self.counter(state)); + } + + pub fn incdec(&self, old: &PeerState, new: &PeerState) { + self.dec(old); + self.inc(new); + } +} diff --git a/crates/librqbit/src/torrent_state/live/peers/stats/mod.rs b/crates/librqbit/src/torrent_state/live/peers/stats/mod.rs new file mode 100644 index 0000000..1f2b657 --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/peers/stats/mod.rs @@ -0,0 +1,2 @@ +pub mod atomic; +pub mod snapshot; diff --git a/crates/librqbit/src/torrent_state/live/peers/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peers/stats/snapshot.rs new file mode 100644 index 0000000..a42ad2c --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/peers/stats/snapshot.rs @@ -0,0 +1,29 @@ +use std::sync::atomic::Ordering; + +use serde::Serialize; + +use super::atomic::AggregatePeerStatsAtomic; + +#[derive(Debug, Default, Serialize, PartialEq, Eq)] +pub struct AggregatePeerStats { + pub queued: usize, + pub connecting: usize, + pub live: usize, + pub seen: usize, + pub dead: usize, + pub not_needed: usize, +} + +impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats { + fn from(s: &'a AggregatePeerStatsAtomic) -> Self { + let ordering = Ordering::Relaxed; + Self { + queued: s.queued.load(ordering) as usize, + connecting: s.connecting.load(ordering) as usize, + live: s.live.load(ordering) as usize, + seen: s.seen.load(ordering) as usize, + dead: s.dead.load(ordering) as usize, + not_needed: s.not_needed.load(ordering) as usize, + } + } +} diff --git a/crates/librqbit/src/torrent_state/live/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/stats/atomic.rs new file mode 100644 index 0000000..4e3c024 --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/stats/atomic.rs @@ -0,0 +1,25 @@ +use std::{ + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; + +#[derive(Default, Debug)] +pub struct AtomicStats { + pub have_bytes: AtomicU64, + pub downloaded_and_checked_bytes: AtomicU64, + pub downloaded_and_checked_pieces: AtomicU64, + pub uploaded_bytes: AtomicU64, + pub fetched_bytes: AtomicU64, + pub total_piece_download_ms: AtomicU64, +} + +impl AtomicStats { + pub fn average_piece_download_time(&self) -> Option { + let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire); + let t = self.total_piece_download_ms.load(Ordering::Acquire); + if d == 0 { + return None; + } + Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) + } +} diff --git a/crates/librqbit/src/torrent_state/live/stats/mod.rs b/crates/librqbit/src/torrent_state/live/stats/mod.rs new file mode 100644 index 0000000..1f2b657 --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/stats/mod.rs @@ -0,0 +1,2 @@ +pub mod atomic; +pub mod snapshot; diff --git a/crates/librqbit/src/torrent_state/live/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs new file mode 100644 index 0000000..f22b8be --- /dev/null +++ b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs @@ -0,0 +1,32 @@ +use std::time::{Duration, Instant}; + +use serde::Serialize; + +use crate::torrent_state::live::peers::stats::snapshot::AggregatePeerStats; + +#[derive(Debug, Serialize)] +pub struct StatsSnapshot { + pub have_bytes: u64, + pub downloaded_and_checked_bytes: u64, + pub downloaded_and_checked_pieces: u64, + pub fetched_bytes: u64, + pub uploaded_bytes: u64, + pub initially_needed_bytes: u64, + pub remaining_bytes: u64, + pub total_bytes: u64, + #[serde(skip)] + pub time: Instant, + pub total_piece_download_ms: u64, + pub peer_stats: AggregatePeerStats, +} + +impl StatsSnapshot { + pub fn average_piece_download_time(&self) -> Option { + let d = self.downloaded_and_checked_pieces; + let t = self.total_piece_download_ms; + if d == 0 { + return None; + } + Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) + } +} diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0ac1567..c1c0d09 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -1,3 +1,5 @@ +pub mod utils; + pub mod live; pub use live::*; diff --git a/crates/librqbit/src/torrent_state/utils.rs b/crates/librqbit/src/torrent_state/utils.rs new file mode 100644 index 0000000..3323cba --- /dev/null +++ b/crates/librqbit/src/torrent_state/utils.rs @@ -0,0 +1,108 @@ +use std::sync::atomic::{AtomicU32, Ordering}; + +pub fn atomic_inc(c: &AtomicU32) -> u32 { + c.fetch_add(1, Ordering::Relaxed) +} + +pub fn atomic_dec(c: &AtomicU32) -> u32 { + c.fetch_sub(1, Ordering::Relaxed) +} + +// Used during debugging to see if some locks take too long. +#[cfg(not(feature = "timed_existence"))] +mod timed_existence { + use std::ops::{Deref, DerefMut}; + + pub struct TimedExistence(T); + + impl TimedExistence { + #[inline(always)] + pub fn new(object: T, _reason: &'static str) -> Self { + Self(object) + } + } + + impl Deref for TimedExistence { + type Target = T; + + #[inline(always)] + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl DerefMut for TimedExistence { + #[inline(always)] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } + + #[inline(always)] + pub fn timeit(_n: impl std::fmt::Display, f: impl FnOnce() -> R) -> R { + f() + } +} + +#[cfg(feature = "timed_existence")] +mod timed_existence { + use std::ops::{Deref, DerefMut}; + use std::time::{Duration, Instant}; + use tracing::warn; + + const MAX: Duration = Duration::from_millis(1); + + // Prints if the object exists for too long. + // This is used to track long-lived locks for debugging. + pub struct TimedExistence { + object: T, + reason: &'static str, + started: Instant, + } + + impl TimedExistence { + pub fn new(object: T, reason: &'static str) -> Self { + Self { + object, + reason, + started: Instant::now(), + } + } + } + + impl Drop for TimedExistence { + fn drop(&mut self) { + let elapsed = self.started.elapsed(); + let reason = self.reason; + if elapsed > MAX { + warn!("elapsed on lock {reason:?}: {elapsed:?}") + } + } + } + + impl Deref for TimedExistence { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.object + } + } + + impl DerefMut for TimedExistence { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.object + } + } + + pub fn timeit(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R { + let now = Instant::now(); + let r = f(); + let elapsed = now.elapsed(); + if elapsed > MAX { + warn!("elapsed on \"{name:}\": {elapsed:?}") + } + r + } +} + +pub use timed_existence::{timeit, TimedExistence}; diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 33fd911..e457e36 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -11,7 +11,6 @@ use librqbit::{ Session, SessionOptions, }, spawn_utils::{spawn, BlockingSpawner}, - torrent_state::timeit, }; use size_format::SizeFormatterBinary as SF; use tracing::{error, info, span, warn, Level}; @@ -244,7 +243,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> info!("[{}] initializing", idx); }, ManagedTorrentState::Running(handle) => { - let stats = timeit("stats_snapshot", || handle.torrent_state().stats_snapshot()); + let stats = handle.torrent_state().stats_snapshot(); let speed = handle.speed_estimator(); let total = stats.total_bytes; let progress = stats.total_bytes - stats.remaining_bytes;