split up librqbit torrent state into smaller files

This commit is contained in:
Igor Katson 2023-11-23 16:27:55 +00:00
parent 7cd6102c6a
commit edaf3e0997
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
17 changed files with 502 additions and 440 deletions

View file

@ -19,12 +19,12 @@ use tracing::{info, warn};
use axum::Router;
use crate::http_api_error::{ApiError, ApiErrorExt};
use crate::peer_state::PeerStatsFilter;
use crate::session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session,
};
use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::StatsSnapshot;
use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot};
use crate::torrent_state::stats::snapshot::StatsSnapshot;
// Public API
#[derive(Clone)]
@ -386,11 +386,7 @@ impl ApiInternal {
make_torrent_details(&info_hash, handle.torrent_state().info(), only_files)
}
fn api_peer_stats(
&self,
idx: usize,
filter: PeerStatsFilter,
) -> Result<crate::peer_state::PeerStatsSnapshot> {
fn api_peer_stats(&self, idx: usize, filter: PeerStatsFilter) -> Result<PeerStatsSnapshot> {
let handle = self.mgr_handle(idx)?;
Ok(handle.torrent_state().per_peer_stats_snapshot(filter))
}

View file

@ -6,7 +6,6 @@ pub mod http_api_client;
mod http_api_error;
pub mod peer_connection;
pub mod peer_info_reader;
pub mod peer_state;
pub mod session;
pub mod spawn_utils;
pub mod torrent_manager;

View file

@ -39,6 +39,10 @@
// > so don't lock them both at the same time at all, or at the worst lock them in the
// > same order (peers one first, then the global one).
pub mod peer;
pub mod peers;
pub mod stats;
use std::{
collections::HashMap,
fs::File,
@ -55,7 +59,6 @@ use anyhow::{bail, Context};
use backoff::backoff::Backoff;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::{
id20::Id20,
@ -66,7 +69,6 @@ use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request,
};
use serde::Serialize;
use sha1w::Sha1;
use tokio::{
sync::{
@ -83,140 +85,29 @@ use crate::{
peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
peer_state::{
atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerCounters,
PeerRx, PeerState, PeerStatsFilter, PeerStatsSnapshot, PeerTx, SendMany,
},
spawn_utils::{spawn, BlockingSpawner},
type_aliases::{PeerHandle, BF},
};
use self::{
peer::{
stats::{
atomic::PeerCounters as AtomicPeerCounters,
snapshot::{PeerStatsFilter, PeerStatsSnapshot},
},
InflightRequest, PeerState, PeerTx, SendMany,
},
peers::PeerStates,
stats::{atomic::AtomicStats, snapshot::StatsSnapshot},
};
use super::utils::{timeit, TimedExistence};
pub struct InflightPiece {
pub peer: PeerHandle,
pub started: Instant,
}
#[derive(Default)]
pub struct PeerStates {
stats: AggregatePeerStatsAtomic,
states: DashMap<PeerHandle, Peer>,
}
#[derive(Debug, Default, Serialize, PartialEq, Eq)]
pub struct AggregatePeerStats {
pub queued: usize,
pub connecting: usize,
pub live: usize,
pub seen: usize,
pub dead: usize,
pub not_needed: usize,
}
impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats {
fn from(s: &'a AggregatePeerStatsAtomic) -> Self {
let ordering = Ordering::Relaxed;
Self {
queued: s.queued.load(ordering) as usize,
connecting: s.connecting.load(ordering) as usize,
live: s.live.load(ordering) as usize,
seen: s.seen.load(ordering) as usize,
dead: s.dead.load(ordering) as usize,
not_needed: s.not_needed.load(ordering) as usize,
}
}
}
impl PeerStates {
pub fn stats(&self) -> AggregatePeerStats {
AggregatePeerStats::from(&self.stats)
}
pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option<PeerHandle> {
use dashmap::mapref::entry::Entry;
match self.states.entry(addr) {
Entry::Occupied(_) => None,
Entry::Vacant(vac) => {
vac.insert(Default::default());
atomic_inc(&self.stats.queued);
atomic_inc(&self.stats.seen);
Some(addr)
}
}
}
pub fn with_peer<R>(&self, addr: PeerHandle, f: impl FnOnce(&Peer) -> R) -> Option<R> {
self.states.get(&addr).map(|e| f(e.value()))
}
pub fn with_peer_mut<R>(
&self,
addr: PeerHandle,
reason: &'static str,
f: impl FnOnce(&mut Peer) -> R,
) -> Option<R> {
timeit(reason, || self.states.get_mut(&addr))
.map(|e| f(TimedExistence::new(e, reason).value_mut()))
}
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.states
.get(&addr)
.and_then(|e| match &e.value().state.get() {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
}
pub fn with_live_mut<R>(
&self,
addr: PeerHandle,
reason: &'static str,
f: impl FnOnce(&mut LivePeerState) -> R,
) -> Option<R> {
self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f))
.flatten()
}
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
let p = self.states.remove(&handle).map(|r| r.1)?;
self.stats.dec(p.state.get());
Some(p)
}
pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_peer_interested", |live| {
let prev = live.peer_interested;
live.peer_interested = is_interested;
prev
})
}
pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec<u8>) -> Option<()> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
live.bitfield = BF::from_vec(bitfield);
})
}
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> {
let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state
.queued_to_connecting(&self.stats)
.context("invalid peer state")
})
.context("peer not found in states")??;
Ok(rx)
}
fn reset_peer_backoff(&self, handle: PeerHandle) {
self.with_peer_mut(handle, "reset_peer_backoff", |p| {
p.stats.backoff.reset();
});
}
fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state.to_not_needed(&self.stats)
})?;
Some(prev)
}
}
pub struct TorrentStateLocked {
// What chunks we have and need.
pub chunks: ChunkTracker,
@ -226,54 +117,6 @@ pub struct TorrentStateLocked {
pub inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
}
#[derive(Default, Debug)]
struct AtomicStats {
have_bytes: AtomicU64,
downloaded_and_checked_bytes: AtomicU64,
downloaded_and_checked_pieces: AtomicU64,
uploaded_bytes: AtomicU64,
fetched_bytes: AtomicU64,
total_piece_download_ms: AtomicU64,
}
impl AtomicStats {
fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire);
let t = self.total_piece_download_ms.load(Ordering::Acquire);
if d == 0 {
return None;
}
Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64))
}
}
#[derive(Debug, Serialize)]
pub struct StatsSnapshot {
pub have_bytes: u64,
pub downloaded_and_checked_bytes: u64,
pub downloaded_and_checked_pieces: u64,
pub fetched_bytes: u64,
pub uploaded_bytes: u64,
pub initially_needed_bytes: u64,
pub remaining_bytes: u64,
pub total_bytes: u64,
#[serde(skip)]
pub time: Instant,
pub total_piece_download_ms: u64,
pub peer_stats: AggregatePeerStats,
}
impl StatsSnapshot {
pub fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_and_checked_pieces;
let t = self.total_piece_download_ms;
if d == 0 {
return None;
}
Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64))
}
}
#[derive(Default)]
pub struct TorrentStateOptions {
pub peer_connect_timeout: Option<Duration>,
@ -303,105 +146,6 @@ pub struct TorrentState {
finished_notify: Notify,
}
// Used during debugging to see if some locks take too long.
#[cfg(not(feature = "timed_existence"))]
mod timed_existence {
use std::ops::{Deref, DerefMut};
pub struct TimedExistence<T>(T);
impl<T> TimedExistence<T> {
#[inline(always)]
pub fn new(object: T, _reason: &'static str) -> Self {
Self(object)
}
}
impl<T> Deref for TimedExistence<T> {
type Target = T;
#[inline(always)]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for TimedExistence<T> {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[inline(always)]
pub fn timeit<R>(_n: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
f()
}
}
#[cfg(feature = "timed_existence")]
mod timed_existence {
use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant};
use tracing::warn;
const MAX: Duration = Duration::from_millis(1);
// Prints if the object exists for too long.
// This is used to track long-lived locks for debugging.
pub struct TimedExistence<T> {
object: T,
reason: &'static str,
started: Instant,
}
impl<T> TimedExistence<T> {
pub fn new(object: T, reason: &'static str) -> Self {
Self {
object,
reason,
started: Instant::now(),
}
}
}
impl<T> Drop for TimedExistence<T> {
fn drop(&mut self) {
let elapsed = self.started.elapsed();
let reason = self.reason;
if elapsed > MAX {
warn!("elapsed on lock {reason:?}: {elapsed:?}")
}
}
}
impl<T> Deref for TimedExistence<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.object
}
}
impl<T> DerefMut for TimedExistence<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.object
}
}
pub fn timeit<R>(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
let now = Instant::now();
let r = f();
let elapsed = now.elapsed();
if elapsed > MAX {
warn!("elapsed on \"{name:}\": {elapsed:?}")
}
r
}
}
pub use timed_existence::{timeit, TimedExistence};
impl TorrentState {
#[allow(clippy::too_many_arguments)]
pub fn new(
@ -734,7 +478,7 @@ struct PeerHandlerLocked {
// This state tracks a live peer.
struct PeerHandler {
state: Arc<TorrentState>,
counters: Arc<PeerCounters>,
counters: Arc<AtomicPeerCounters>,
// Semantically, we don't need an RwLock here, as this is only requested from
// one future (requester + manage_peer).
//

View file

@ -1,3 +1,5 @@
pub mod stats;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
@ -13,6 +15,8 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::peer_connection::WriterRequest;
use crate::type_aliases::BF;
use super::peers::stats::atomic::AggregatePeerStatsAtomic;
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct InflightRequest {
pub piece: ValidPieceIndex,
@ -45,85 +49,10 @@ impl SendMany for PeerTx {
}
}
#[derive(Default, Debug)]
pub struct PeerCounters {
pub fetched_bytes: AtomicU64,
pub total_time_connecting_ms: AtomicU64,
pub connection_attempts: AtomicU32,
pub connections: AtomicU32,
pub errors: AtomicU32,
pub fetched_chunks: AtomicU32,
pub downloaded_and_checked_pieces: AtomicU32,
pub downloaded_and_checked_bytes: AtomicU64,
}
#[derive(Debug)]
pub struct PeerStats {
pub counters: Arc<PeerCounters>,
pub backoff: ExponentialBackoff,
}
impl Default for PeerStats {
fn default() -> Self {
Self {
counters: Arc::new(Default::default()),
backoff: ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(10))
.with_multiplier(6.)
.with_max_interval(Duration::from_secs(3600))
.with_max_elapsed_time(Some(Duration::from_secs(86400)))
.build(),
}
}
}
#[derive(Debug, Default)]
pub struct Peer {
pub state: PeerStateNoMut,
pub stats: PeerStats,
}
#[derive(Debug, Default, Serialize)]
pub struct AggregatePeerStatsAtomic {
pub queued: AtomicU32,
pub connecting: AtomicU32,
pub live: AtomicU32,
pub seen: AtomicU32,
pub dead: AtomicU32,
pub not_needed: AtomicU32,
}
pub fn atomic_inc(c: &AtomicU32) -> u32 {
c.fetch_add(1, Ordering::Relaxed)
}
pub fn atomic_dec(c: &AtomicU32) -> u32 {
c.fetch_sub(1, Ordering::Relaxed)
}
impl AggregatePeerStatsAtomic {
pub fn counter(&self, state: &PeerState) -> &AtomicU32 {
match state {
PeerState::Connecting(_) => &self.connecting,
PeerState::Live(_) => &self.live,
PeerState::Queued => &self.queued,
PeerState::Dead => &self.dead,
PeerState::NotNeeded => &self.not_needed,
}
}
pub fn inc(&self, state: &PeerState) {
atomic_inc(self.counter(state));
}
pub fn dec(&self, state: &PeerState) {
atomic_dec(self.counter(state));
}
pub fn incdec(&self, old: &PeerState, new: &PeerState) {
self.dec(old);
self.inc(new);
}
pub stats: stats::atomic::PeerStats,
}
#[derive(Debug, Default)]
@ -268,82 +197,3 @@ impl LivePeerState {
.map_or(false, |s| s.all())
}
}
mod peer_stats_snapshot {
use std::{collections::HashMap, sync::atomic::Ordering};
use serde::{Deserialize, Serialize};
use crate::peer_state::PeerState;
#[derive(Serialize, Deserialize)]
pub struct PeerCounters {
pub fetched_bytes: u64,
pub total_time_connecting_ms: u64,
pub connection_attempts: u32,
pub connections: u32,
pub errors: u32,
pub fetched_chunks: u32,
pub downloaded_and_checked_pieces: u32,
}
#[derive(Serialize, Deserialize)]
pub struct PeerStats {
pub counters: PeerCounters,
pub state: &'static str,
}
impl From<&crate::peer_state::PeerCounters> for PeerCounters {
fn from(counters: &crate::peer_state::PeerCounters) -> Self {
Self {
fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed),
total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed),
connection_attempts: counters.connection_attempts.load(Ordering::Relaxed),
connections: counters.connections.load(Ordering::Relaxed),
errors: counters.errors.load(Ordering::Relaxed),
fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed),
downloaded_and_checked_pieces: counters
.downloaded_and_checked_pieces
.load(Ordering::Relaxed),
}
}
}
impl From<&crate::peer_state::Peer> for PeerStats {
fn from(peer: &crate::peer_state::Peer) -> Self {
Self {
counters: peer.stats.counters.as_ref().into(),
state: peer.state.get().name(),
}
}
}
#[derive(Serialize)]
pub struct PeerStatsSnapshot {
pub peers: HashMap<String, PeerStats>,
}
#[derive(Clone, Copy, Default, Deserialize)]
pub enum PeerStatsFilterState {
All,
#[default]
Live,
}
impl PeerStatsFilterState {
pub fn matches(&self, s: &PeerState) -> bool {
match (self, s) {
(Self::All, _) => true,
(Self::Live, PeerState::Live(_)) => true,
_ => false,
}
}
}
#[derive(Default, Deserialize)]
pub struct PeerStatsFilter {
pub state: PeerStatsFilterState,
}
}
pub use peer_stats_snapshot::{PeerStatsFilter, PeerStatsSnapshot};

View file

@ -0,0 +1,41 @@
use std::{
sync::{
atomic::{AtomicU32, AtomicU64},
Arc,
},
time::Duration,
};
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
#[derive(Default, Debug)]
pub struct PeerCounters {
pub fetched_bytes: AtomicU64,
pub total_time_connecting_ms: AtomicU64,
pub connection_attempts: AtomicU32,
pub connections: AtomicU32,
pub errors: AtomicU32,
pub fetched_chunks: AtomicU32,
pub downloaded_and_checked_pieces: AtomicU32,
pub downloaded_and_checked_bytes: AtomicU64,
}
#[derive(Debug)]
pub struct PeerStats {
pub counters: Arc<PeerCounters>,
pub backoff: ExponentialBackoff,
}
impl Default for PeerStats {
fn default() -> Self {
Self {
counters: Arc::new(Default::default()),
backoff: ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(10))
.with_multiplier(6.)
.with_max_interval(Duration::from_secs(3600))
.with_max_elapsed_time(Some(Duration::from_secs(86400)))
.build(),
}
}
}

View file

@ -0,0 +1,2 @@
pub mod atomic;
pub mod snapshot;

View file

@ -0,0 +1,74 @@
use std::{collections::HashMap, sync::atomic::Ordering};
use serde::{Deserialize, Serialize};
use crate::torrent_state::live::peer::{Peer, PeerState};
#[derive(Serialize, Deserialize)]
pub struct PeerCounters {
pub fetched_bytes: u64,
pub total_time_connecting_ms: u64,
pub connection_attempts: u32,
pub connections: u32,
pub errors: u32,
pub fetched_chunks: u32,
pub downloaded_and_checked_pieces: u32,
}
#[derive(Serialize, Deserialize)]
pub struct PeerStats {
pub counters: PeerCounters,
pub state: &'static str,
}
impl From<&super::atomic::PeerCounters> for PeerCounters {
fn from(counters: &super::atomic::PeerCounters) -> Self {
Self {
fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed),
total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed),
connection_attempts: counters.connection_attempts.load(Ordering::Relaxed),
connections: counters.connections.load(Ordering::Relaxed),
errors: counters.errors.load(Ordering::Relaxed),
fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed),
downloaded_and_checked_pieces: counters
.downloaded_and_checked_pieces
.load(Ordering::Relaxed),
}
}
}
impl From<&Peer> for PeerStats {
fn from(peer: &Peer) -> Self {
Self {
counters: peer.stats.counters.as_ref().into(),
state: peer.state.get().name(),
}
}
}
#[derive(Serialize)]
pub struct PeerStatsSnapshot {
pub peers: HashMap<String, PeerStats>,
}
#[derive(Clone, Copy, Default, Deserialize)]
pub enum PeerStatsFilterState {
All,
#[default]
Live,
}
impl PeerStatsFilterState {
pub fn matches(&self, s: &PeerState) -> bool {
match (self, s) {
(Self::All, _) => true,
(Self::Live, PeerState::Live(_)) => true,
_ => false,
}
}
}
#[derive(Default, Deserialize)]
pub struct PeerStatsFilter {
pub state: PeerStatsFilterState,
}

View file

@ -0,0 +1,114 @@
use std::net::SocketAddr;
use anyhow::Context;
use backoff::backoff::Backoff;
use dashmap::DashMap;
use crate::{
torrent_state::utils::{atomic_inc, TimedExistence},
type_aliases::{PeerHandle, BF},
};
use self::stats::{atomic::AggregatePeerStatsAtomic, snapshot::AggregatePeerStats};
use super::peer::{LivePeerState, Peer, PeerRx, PeerState, PeerTx};
pub mod stats;
#[derive(Default)]
pub struct PeerStates {
pub stats: AggregatePeerStatsAtomic,
pub states: DashMap<PeerHandle, Peer>,
}
impl PeerStates {
pub fn stats(&self) -> AggregatePeerStats {
AggregatePeerStats::from(&self.stats)
}
pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option<PeerHandle> {
use dashmap::mapref::entry::Entry;
match self.states.entry(addr) {
Entry::Occupied(_) => None,
Entry::Vacant(vac) => {
vac.insert(Default::default());
atomic_inc(&self.stats.queued);
atomic_inc(&self.stats.seen);
Some(addr)
}
}
}
pub fn with_peer<R>(&self, addr: PeerHandle, f: impl FnOnce(&Peer) -> R) -> Option<R> {
self.states.get(&addr).map(|e| f(e.value()))
}
pub fn with_peer_mut<R>(
&self,
addr: PeerHandle,
reason: &'static str,
f: impl FnOnce(&mut Peer) -> R,
) -> Option<R> {
use crate::torrent_state::utils::timeit;
timeit(reason, || self.states.get_mut(&addr))
.map(|e| f(TimedExistence::new(e, reason).value_mut()))
}
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.states
.get(&addr)
.and_then(|e| match &e.value().state.get() {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
}
pub fn with_live_mut<R>(
&self,
addr: PeerHandle,
reason: &'static str,
f: impl FnOnce(&mut LivePeerState) -> R,
) -> Option<R> {
self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f))
.flatten()
}
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
let p = self.states.remove(&handle).map(|r| r.1)?;
self.stats.dec(p.state.get());
Some(p)
}
pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_peer_interested", |live| {
let prev = live.peer_interested;
live.peer_interested = is_interested;
prev
})
}
pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec<u8>) -> Option<()> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
live.bitfield = BF::from_vec(bitfield);
})
}
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> {
let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state
.queued_to_connecting(&self.stats)
.context("invalid peer state")
})
.context("peer not found in states")??;
Ok(rx)
}
pub fn reset_peer_backoff(&self, handle: PeerHandle) {
self.with_peer_mut(handle, "reset_peer_backoff", |p| {
p.stats.backoff.reset();
});
}
pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state.to_not_needed(&self.stats)
})?;
Some(prev)
}
}

View file

@ -0,0 +1,43 @@
use std::sync::atomic::AtomicU32;
use serde::Serialize;
use crate::torrent_state::{
live::peer::PeerState,
utils::{atomic_dec, atomic_inc},
};
#[derive(Debug, Default, Serialize)]
pub struct AggregatePeerStatsAtomic {
pub queued: AtomicU32,
pub connecting: AtomicU32,
pub live: AtomicU32,
pub seen: AtomicU32,
pub dead: AtomicU32,
pub not_needed: AtomicU32,
}
impl AggregatePeerStatsAtomic {
pub fn counter(&self, state: &PeerState) -> &AtomicU32 {
match state {
PeerState::Connecting(_) => &self.connecting,
PeerState::Live(_) => &self.live,
PeerState::Queued => &self.queued,
PeerState::Dead => &self.dead,
PeerState::NotNeeded => &self.not_needed,
}
}
pub fn inc(&self, state: &PeerState) {
atomic_inc(self.counter(state));
}
pub fn dec(&self, state: &PeerState) {
atomic_dec(self.counter(state));
}
pub fn incdec(&self, old: &PeerState, new: &PeerState) {
self.dec(old);
self.inc(new);
}
}

View file

@ -0,0 +1,2 @@
pub mod atomic;
pub mod snapshot;

View file

@ -0,0 +1,29 @@
use std::sync::atomic::Ordering;
use serde::Serialize;
use super::atomic::AggregatePeerStatsAtomic;
#[derive(Debug, Default, Serialize, PartialEq, Eq)]
pub struct AggregatePeerStats {
pub queued: usize,
pub connecting: usize,
pub live: usize,
pub seen: usize,
pub dead: usize,
pub not_needed: usize,
}
impl<'a> From<&'a AggregatePeerStatsAtomic> for AggregatePeerStats {
fn from(s: &'a AggregatePeerStatsAtomic) -> Self {
let ordering = Ordering::Relaxed;
Self {
queued: s.queued.load(ordering) as usize,
connecting: s.connecting.load(ordering) as usize,
live: s.live.load(ordering) as usize,
seen: s.seen.load(ordering) as usize,
dead: s.dead.load(ordering) as usize,
not_needed: s.not_needed.load(ordering) as usize,
}
}
}

View file

@ -0,0 +1,25 @@
use std::{
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
#[derive(Default, Debug)]
pub struct AtomicStats {
pub have_bytes: AtomicU64,
pub downloaded_and_checked_bytes: AtomicU64,
pub downloaded_and_checked_pieces: AtomicU64,
pub uploaded_bytes: AtomicU64,
pub fetched_bytes: AtomicU64,
pub total_piece_download_ms: AtomicU64,
}
impl AtomicStats {
pub fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire);
let t = self.total_piece_download_ms.load(Ordering::Acquire);
if d == 0 {
return None;
}
Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64))
}
}

View file

@ -0,0 +1,2 @@
pub mod atomic;
pub mod snapshot;

View file

@ -0,0 +1,32 @@
use std::time::{Duration, Instant};
use serde::Serialize;
use crate::torrent_state::live::peers::stats::snapshot::AggregatePeerStats;
#[derive(Debug, Serialize)]
pub struct StatsSnapshot {
pub have_bytes: u64,
pub downloaded_and_checked_bytes: u64,
pub downloaded_and_checked_pieces: u64,
pub fetched_bytes: u64,
pub uploaded_bytes: u64,
pub initially_needed_bytes: u64,
pub remaining_bytes: u64,
pub total_bytes: u64,
#[serde(skip)]
pub time: Instant,
pub total_piece_download_ms: u64,
pub peer_stats: AggregatePeerStats,
}
impl StatsSnapshot {
pub fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_and_checked_pieces;
let t = self.total_piece_download_ms;
if d == 0 {
return None;
}
Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64))
}
}

View file

@ -1,3 +1,5 @@
pub mod utils;
pub mod live;
pub use live::*;

View file

@ -0,0 +1,108 @@
use std::sync::atomic::{AtomicU32, Ordering};
pub fn atomic_inc(c: &AtomicU32) -> u32 {
c.fetch_add(1, Ordering::Relaxed)
}
pub fn atomic_dec(c: &AtomicU32) -> u32 {
c.fetch_sub(1, Ordering::Relaxed)
}
// Used during debugging to see if some locks take too long.
#[cfg(not(feature = "timed_existence"))]
mod timed_existence {
use std::ops::{Deref, DerefMut};
pub struct TimedExistence<T>(T);
impl<T> TimedExistence<T> {
#[inline(always)]
pub fn new(object: T, _reason: &'static str) -> Self {
Self(object)
}
}
impl<T> Deref for TimedExistence<T> {
type Target = T;
#[inline(always)]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for TimedExistence<T> {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[inline(always)]
pub fn timeit<R>(_n: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
f()
}
}
#[cfg(feature = "timed_existence")]
mod timed_existence {
use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant};
use tracing::warn;
const MAX: Duration = Duration::from_millis(1);
// Prints if the object exists for too long.
// This is used to track long-lived locks for debugging.
pub struct TimedExistence<T> {
object: T,
reason: &'static str,
started: Instant,
}
impl<T> TimedExistence<T> {
pub fn new(object: T, reason: &'static str) -> Self {
Self {
object,
reason,
started: Instant::now(),
}
}
}
impl<T> Drop for TimedExistence<T> {
fn drop(&mut self) {
let elapsed = self.started.elapsed();
let reason = self.reason;
if elapsed > MAX {
warn!("elapsed on lock {reason:?}: {elapsed:?}")
}
}
}
impl<T> Deref for TimedExistence<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.object
}
}
impl<T> DerefMut for TimedExistence<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.object
}
}
pub fn timeit<R>(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
let now = Instant::now();
let r = f();
let elapsed = now.elapsed();
if elapsed > MAX {
warn!("elapsed on \"{name:}\": {elapsed:?}")
}
r
}
}
pub use timed_existence::{timeit, TimedExistence};

View file

@ -11,7 +11,6 @@ use librqbit::{
Session, SessionOptions,
},
spawn_utils::{spawn, BlockingSpawner},
torrent_state::timeit,
};
use size_format::SizeFormatterBinary as SF;
use tracing::{error, info, span, warn, Level};
@ -244,7 +243,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
info!("[{}] initializing", idx);
},
ManagedTorrentState::Running(handle) => {
let stats = timeit("stats_snapshot", || handle.torrent_state().stats_snapshot());
let stats = handle.torrent_state().stats_snapshot();
let speed = handle.speed_estimator();
let total = stats.total_bytes;
let progress = stats.total_bytes - stats.remaining_bytes;