Exposing counters through HTTP

This commit is contained in:
Igor Katson 2023-11-20 13:55:42 +00:00
parent 3797a91be9
commit e214dd47a5
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 118 additions and 1 deletions

View file

@ -19,7 +19,7 @@ use axum::Router;
use crate::http_api_error::{ApiError, ApiErrorExt};
use crate::session::{AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session};
use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::StatsSnapshot;
use crate::torrent_state::{PeerStatsFilter, StatsSnapshot};
// Public API
#[derive(Clone)]
@ -50,6 +50,7 @@ impl HttpApi {
"GET /torrents/{index}": "Torrent details",
"GET /torrents/{index}/haves": "The bitfield of have pieces",
"GET /torrents/{index}/stats": "Torrent stats",
"GET /torrents/{index}/peer_stats": "Per peer stats",
// This is kind of not secure as it just reads any local file that it has access to,
// or any URL, but whatever, ok for our purposes / threat model.
"POST /torrents": "Add a torrent here. magnet: or http:// or a local file."
@ -100,6 +101,14 @@ impl HttpApi {
state.api_stats(idx).map(axum::Json)
}
async fn peer_stats(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Query(filter): Query<PeerStatsFilter>,
) -> Result<impl IntoResponse> {
state.api_peer_stats(idx, filter).map(axum::Json)
}
let app = Router::new()
.route("/", get(api_root))
.route("/dht/stats", get(dht_stats))
@ -108,6 +117,7 @@ impl HttpApi {
.route("/torrents/:id", get(torrent_details))
.route("/torrents/:id/haves", get(torrent_haves))
.route("/torrents/:id/stats", get(torrent_stats))
.route("/torrents/:id/peer_stats", get(peer_stats))
.with_state(state);
info!("starting HTTP server on {}", addr);
@ -260,6 +270,15 @@ impl ApiInternal {
make_torrent_details(&info_hash, handle.torrent_state().info(), only_files)
}
fn api_peer_stats(
&self,
idx: usize,
filter: PeerStatsFilter,
) -> Result<crate::torrent_state::PeerStatsSnapshot> {
let handle = self.mgr_handle(idx)?;
Ok(handle.torrent_state().per_peer_stats_snapshot(filter))
}
pub async fn api_add_torrent(
&self,
url: String,

View file

@ -399,6 +399,88 @@ mod timed_existence {
}
}
mod peer_stats_snapshot {
use std::{collections::HashMap, sync::atomic::Ordering};
use serde::{Deserialize, Serialize};
use crate::peer_state::PeerState;
use super::AggregatePeerStats;
#[derive(Serialize, Deserialize)]
pub struct PeerCounters {
pub fetched_bytes: u64,
pub total_time_connecting_ms: u64,
pub connection_attempts: u32,
pub connections: u32,
pub errors: u32,
pub fetched_chunks: u32,
pub downloaded_and_checked_pieces: u32,
}
#[derive(Serialize, Deserialize)]
pub struct PeerStats {
pub counters: PeerCounters,
pub state: &'static str,
}
impl From<&crate::peer_state::PeerCounters> for PeerCounters {
fn from(counters: &crate::peer_state::PeerCounters) -> Self {
Self {
fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed),
total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed),
connection_attempts: counters.connection_attempts.load(Ordering::Relaxed),
connections: counters.connections.load(Ordering::Relaxed),
errors: counters.errors.load(Ordering::Relaxed),
fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed),
downloaded_and_checked_pieces: counters
.downloaded_and_checked_pieces
.load(Ordering::Relaxed),
}
}
}
impl From<&crate::peer_state::Peer> for PeerStats {
fn from(peer: &crate::peer_state::Peer) -> Self {
Self {
counters: peer.stats.counters.as_ref().into(),
state: peer.state.get().name(),
}
}
}
#[derive(Serialize)]
pub struct PeerStatsSnapshot {
pub peers: HashMap<String, PeerStats>,
pub aggregate: AggregatePeerStats,
}
#[derive(Clone, Copy, Default, Deserialize)]
pub enum PeerStatsFilterState {
All,
#[default]
Live,
}
impl PeerStatsFilterState {
pub fn matches(&self, s: &PeerState) -> bool {
match (self, s) {
(Self::All, _) => true,
(Self::Live, PeerState::Live(_)) => true,
_ => false,
}
}
}
#[derive(Default, Deserialize)]
pub struct PeerStatsFilter {
pub state: PeerStatsFilterState,
}
}
pub use peer_stats_snapshot::{PeerStatsFilter, PeerStatsSnapshot};
pub use timed_existence::{timeit, TimedExistence};
impl TorrentState {
@ -701,6 +783,22 @@ impl TorrentState {
}
}
pub fn per_peer_stats_snapshot(
&self,
filter: PeerStatsFilter,
) -> peer_stats_snapshot::PeerStatsSnapshot {
peer_stats_snapshot::PeerStatsSnapshot {
peers: self
.peers
.states
.iter()
.filter(|e| filter.state.matches(e.value().state.get()))
.map(|e| (e.key().to_string(), e.value().into()))
.collect(),
aggregate: self.peers.stats(),
}
}
pub async fn wait_until_completed(&self) {
if self.is_finished() {
return;