From e214dd47a583ff44b5608fcf6811b2beee96facc Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 13:55:42 +0000 Subject: [PATCH] Exposing counters through HTTP --- crates/librqbit/src/http_api.rs | 21 +++++- crates/librqbit/src/torrent_state.rs | 98 ++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index e63156f..dca4264 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -19,7 +19,7 @@ use axum::Router; use crate::http_api_error::{ApiError, ApiErrorExt}; use crate::session::{AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session}; use crate::torrent_manager::TorrentManagerHandle; -use crate::torrent_state::StatsSnapshot; +use crate::torrent_state::{PeerStatsFilter, StatsSnapshot}; // Public API #[derive(Clone)] @@ -50,6 +50,7 @@ impl HttpApi { "GET /torrents/{index}": "Torrent details", "GET /torrents/{index}/haves": "The bitfield of have pieces", "GET /torrents/{index}/stats": "Torrent stats", + "GET /torrents/{index}/peer_stats": "Per peer stats", // This is kind of not secure as it just reads any local file that it has access to, // or any URL, but whatever, ok for our purposes / threat model. "POST /torrents": "Add a torrent here. magnet: or http:// or a local file." @@ -100,6 +101,14 @@ impl HttpApi { state.api_stats(idx).map(axum::Json) } + async fn peer_stats( + State(state): State, + Path(idx): Path, + Query(filter): Query, + ) -> Result { + state.api_peer_stats(idx, filter).map(axum::Json) + } + let app = Router::new() .route("/", get(api_root)) .route("/dht/stats", get(dht_stats)) @@ -108,6 +117,7 @@ impl HttpApi { .route("/torrents/:id", get(torrent_details)) .route("/torrents/:id/haves", get(torrent_haves)) .route("/torrents/:id/stats", get(torrent_stats)) + .route("/torrents/:id/peer_stats", get(peer_stats)) .with_state(state); info!("starting HTTP server on {}", addr); @@ -260,6 +270,15 @@ impl ApiInternal { make_torrent_details(&info_hash, handle.torrent_state().info(), only_files) } + fn api_peer_stats( + &self, + idx: usize, + filter: PeerStatsFilter, + ) -> Result { + let handle = self.mgr_handle(idx)?; + Ok(handle.torrent_state().per_peer_stats_snapshot(filter)) + } + pub async fn api_add_torrent( &self, url: String, diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index c5d5d4b..272a8d5 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -399,6 +399,88 @@ mod timed_existence { } } +mod peer_stats_snapshot { + use std::{collections::HashMap, sync::atomic::Ordering}; + + use serde::{Deserialize, Serialize}; + + use crate::peer_state::PeerState; + + use super::AggregatePeerStats; + + #[derive(Serialize, Deserialize)] + pub struct PeerCounters { + pub fetched_bytes: u64, + pub total_time_connecting_ms: u64, + pub connection_attempts: u32, + pub connections: u32, + pub errors: u32, + pub fetched_chunks: u32, + pub downloaded_and_checked_pieces: u32, + } + + #[derive(Serialize, Deserialize)] + pub struct PeerStats { + pub counters: PeerCounters, + pub state: &'static str, + } + + impl From<&crate::peer_state::PeerCounters> for PeerCounters { + fn from(counters: &crate::peer_state::PeerCounters) -> Self { + Self { + fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed), + total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed), + connection_attempts: counters.connection_attempts.load(Ordering::Relaxed), + connections: counters.connections.load(Ordering::Relaxed), + errors: counters.errors.load(Ordering::Relaxed), + fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed), + downloaded_and_checked_pieces: counters + .downloaded_and_checked_pieces + .load(Ordering::Relaxed), + } + } + } + + impl From<&crate::peer_state::Peer> for PeerStats { + fn from(peer: &crate::peer_state::Peer) -> Self { + Self { + counters: peer.stats.counters.as_ref().into(), + state: peer.state.get().name(), + } + } + } + + #[derive(Serialize)] + pub struct PeerStatsSnapshot { + pub peers: HashMap, + pub aggregate: AggregatePeerStats, + } + + #[derive(Clone, Copy, Default, Deserialize)] + pub enum PeerStatsFilterState { + All, + #[default] + Live, + } + + impl PeerStatsFilterState { + pub fn matches(&self, s: &PeerState) -> bool { + match (self, s) { + (Self::All, _) => true, + (Self::Live, PeerState::Live(_)) => true, + _ => false, + } + } + } + + #[derive(Default, Deserialize)] + pub struct PeerStatsFilter { + pub state: PeerStatsFilterState, + } +} + +pub use peer_stats_snapshot::{PeerStatsFilter, PeerStatsSnapshot}; + pub use timed_existence::{timeit, TimedExistence}; impl TorrentState { @@ -701,6 +783,22 @@ impl TorrentState { } } + pub fn per_peer_stats_snapshot( + &self, + filter: PeerStatsFilter, + ) -> peer_stats_snapshot::PeerStatsSnapshot { + peer_stats_snapshot::PeerStatsSnapshot { + peers: self + .peers + .states + .iter() + .filter(|e| filter.state.matches(e.value().state.get())) + .map(|e| (e.key().to_string(), e.value().into())) + .collect(), + aggregate: self.peers.stats(), + } + } + pub async fn wait_until_completed(&self) { if self.is_finished() { return;