From 98dff76c404d416263fc3e4516b3745e46c00353 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 14 Jul 2021 13:40:56 +0100 Subject: [PATCH] DHT stats in HTTP API --- Cargo.lock | 1 + crates/dht/src/dht.rs | 1 + crates/dht/src/lib.rs | 2 + crates/dht/src/routing_table.rs | 17 +++------ crates/dht/src/utils.rs | 9 +++++ crates/librqbit/src/http_api.rs | 52 +++++++++++++++++++------- crates/librqbit/src/torrent_manager.rs | 38 +++---------------- crates/librqbit/src/torrent_state.rs | 12 +++++- crates/rqbit/Cargo.toml | 1 + crates/rqbit/src/main.rs | 40 +++++++++++++++++++- 10 files changed, 111 insertions(+), 62 deletions(-) create mode 100644 crates/dht/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index af966d6..55e824b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1325,6 +1325,7 @@ dependencies = [ "pretty_env_logger", "regex", "reqwest", + "size_format", "tokio", ] diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 89cbf40..16b45e2 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -28,6 +28,7 @@ use tokio_stream::wrappers::BroadcastStream; #[derive(Debug, Serialize)] pub struct DhtStats { + #[serde(serialize_with = "crate::utils::serialize_id20")] pub id: Id20, pub outstanding_requests: usize, pub seen_peers: usize, diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index 193af3d..ff5fee1 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -1,8 +1,10 @@ mod bprotocol; mod dht; mod routing_table; +mod utils; pub use dht::Dht; +pub use dht::DhtStats; pub use librqbit_core::id20::Id20; pub static DHT_BOOTSTRAP: &[&str] = &["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"]; diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 2e45dc0..c7fe2d3 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -5,14 +5,7 @@ use std::{ use librqbit_core::id20::Id20; use log::debug; -use serde::{Serialize, Serializer}; - -fn serialize_id20(id: &Id20, ser: S) -> Result -where - S: Serializer, -{ - ser.serialize_str(&id.as_string()) -} +use serde::Serialize; #[derive(Debug, Clone, Serialize)] enum BucketTreeNode { @@ -23,9 +16,9 @@ enum BucketTreeNode { #[derive(Debug, Clone, Serialize)] pub struct BucketTree { bits: u8, - #[serde(serialize_with = "serialize_id20")] + #[serde(serialize_with = "crate::utils::serialize_id20")] start: Id20, - #[serde(serialize_with = "serialize_id20")] + #[serde(serialize_with = "crate::utils::serialize_id20")] end_inclusive: Id20, data: BucketTreeNode, } @@ -314,7 +307,7 @@ impl Default for BucketTree { #[derive(Debug, Clone, Serialize)] pub struct RoutingTableNode { - #[serde(serialize_with = "serialize_id20")] + #[serde(serialize_with = "crate::utils::serialize_id20")] id: Id20, addr: SocketAddr, #[serde(skip)] @@ -372,7 +365,7 @@ impl RoutingTableNode { #[derive(Debug, Clone, Serialize)] pub struct RoutingTable { - #[serde(serialize_with = "serialize_id20")] + #[serde(serialize_with = "crate::utils::serialize_id20")] id: Id20, size: usize, buckets: BucketTree, diff --git a/crates/dht/src/utils.rs b/crates/dht/src/utils.rs new file mode 100644 index 0000000..59d7236 --- /dev/null +++ b/crates/dht/src/utils.rs @@ -0,0 +1,9 @@ +use librqbit_core::id20::Id20; +use serde::Serializer; + +pub fn serialize_id20(id: &Id20, ser: S) -> Result +where + S: Serializer, +{ + ser.serialize_str(&id.as_string()) +} diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index a783963..c58b1bc 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use dht::{Dht, DhtStats}; use parking_lot::RwLock; use serde::Serialize; use std::time::{Duration, Instant}; @@ -10,13 +11,15 @@ use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; struct ApiInternal { + dht: Option, startup_time: Instant, torrent_managers: RwLock>, } impl ApiInternal { - fn new() -> Self { + fn new(dht: Option) -> Self { Self { + dht, startup_time: Instant::now(), torrent_managers: RwLock::new(Vec::new()), } @@ -111,6 +114,10 @@ impl ApiInternal { Some(TorrentDetailsResponse { info_hash, files }) } + fn api_dht_stats(&self) -> Option { + self.dht.as_ref().map(|d| d.stats()) + } + fn api_stats(&self, idx: usize) -> Option { let mgr = self.mgr_handle(idx)?; let snapshot = mgr.torrent_state().stats_snapshot(); @@ -154,23 +161,27 @@ fn json_response(v: T) -> warp::reply::Response { response } -fn not_found_response(idx: usize) -> warp::reply::Response { - let mut response = warp::reply::Response::new(format!("torrent {} not found", idx).into()); +fn not_found_response(body: String) -> warp::reply::Response { + let mut response = warp::reply::Response::new(body.into()); *response.status_mut() = warp::http::StatusCode::NOT_FOUND; response } +fn torrent_not_found_response(idx: usize) -> warp::reply::Response { + not_found_response(format!("torrent {} not found", idx)) +} + fn json_or_404(idx: usize, v: Option) -> warp::reply::Response { match v { Some(v) => json_response(v), - None => not_found_response(idx), + None => torrent_not_found_response(idx), } } impl HttpApi { - pub fn new() -> Self { + pub fn new(dht: Option) -> Self { Self { - inner: Arc::new(ApiInternal::new()), + inner: Arc::new(ApiInternal::new(dht)), } } pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { @@ -188,6 +199,22 @@ impl HttpApi { move || json_response(inner.api_torrent_list()) }); + let dht_stats = warp::path!("dht" / "stats").map({ + let inner = inner.clone(); + move || match inner.api_dht_stats() { + Some(stats) => json_response(stats), + None => not_found_response("DHT is off".into()), + } + }); + + let dht_routing_table = warp::path!("dht" / "table").map({ + let inner = inner.clone(); + move || match inner.dht.as_ref() { + Some(dht) => dht.with_routing_table(|r| json_response(r)), + None => not_found_response("DHT is off".into()), + } + }); + let torrent_details = warp::path!(usize).map({ let inner = inner.clone(); move |idx| json_or_404(idx, inner.api_torrent_details(idx)) @@ -203,15 +230,14 @@ impl HttpApi { move |idx| json_or_404(idx, inner.api_stats(idx)) }); - let router = list.or(torrent_details).or(dump_haves).or(dump_stats); + let router = list + .or(dht_stats) + .or(dht_routing_table) + .or(torrent_details) + .or(dump_haves) + .or(dump_stats); warp::serve(router).run(addr).await; Ok(()) } } - -impl Default for HttpApi { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index e05b14b..5d1ad8d 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -244,10 +244,11 @@ impl TorrentManager { options, }); - spawn("stats printer", { - let this = mgr.clone(); - async move { this.stats_printer().await } - }); + // spawn("stats printer", { + // let this = mgr.clone(); + // async move { this.stats_printer().await } + // }); + spawn("speed estimator updater", { let state = mgr.state.clone(); async move { @@ -264,35 +265,6 @@ impl TorrentManager { Ok(mgr.into_handle()) } - async fn stats_printer(&self) -> anyhow::Result<()> { - loop { - let live_peer_stats = self.state.lock_read().peers.stats(); - let seen_peers_count = self.state.lock_read().peers.seen().len(); - let stats = self.state.stats_snapshot(); - let needed = self.state.initially_needed(); - let downloaded_pct = if stats.remaining_bytes == 0 { - 100f64 - } else { - (stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64 - }; - info!( - "Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, queued: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}", - downloaded_pct, - SF::new(stats.downloaded_and_checked_bytes), - live_peer_stats.live, - live_peer_stats.connecting, - live_peer_stats.queued, - seen_peers_count, - SF::new(stats.fetched_bytes), - SF::new(stats.remaining_bytes), - SF::new(needed), - SF::new(stats.uploaded_bytes), - SF::new(stats.have_bytes) - ); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - fn into_handle(self: Arc) -> TorrentManagerHandle { TorrentManagerHandle { manager: self } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index e092826..e26d270 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -62,11 +62,13 @@ pub struct AggregatePeerStats { pub queued: usize, pub connecting: usize, pub live: usize, + pub seen: usize, } impl PeerStates { pub fn stats(&self) -> AggregatePeerStats { - self.states + let mut stats = self + .states .values() .fold(AggregatePeerStats::default(), |mut s, p| { match p { @@ -75,7 +77,9 @@ impl PeerStates { PeerState::Queued => s.queued += 1, }; s - }) + }); + stats.seen = self.seen.len(); + stats } pub fn add_if_not_seen( &mut self, @@ -547,6 +551,10 @@ impl TorrentState { true } + pub fn peer_stats_snapshot(&self) -> AggregatePeerStats { + self.locked.read().peers.stats() + } + pub fn stats_snapshot(&self) -> StatsSnapshot { let g = self.locked.read(); use Ordering::*; diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 405917d..a7f0c51 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -20,6 +20,7 @@ reqwest = "0.11" regex = "1" futures = "0.3" parse_duration = "2" +size_format = "1" [dev-dependencies] futures = {version = "0.3"} \ No newline at end of file diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 43b3e1d..8c818e0 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -15,6 +15,7 @@ use librqbit::{ }; use log::{info, warn}; use reqwest::Url; +use size_format::SizeFormatterBinary as SF; async fn torrent_from_url(url: &str) -> anyhow::Result { let response = reqwest::get(url) @@ -206,6 +207,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; let dht_rx = dht + .as_ref() .ok_or_else(|| anyhow::anyhow!("magnet links without DHT are not supported"))? .get_peers(info_hash) .await?; @@ -236,6 +238,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> info_hash, info, peer_id, + dht, Some(dht_rx), initial_peers.into_iter().collect(), trackers, @@ -250,7 +253,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> } else { torrent_from_file(&opts.torrent_path)? }; - let dht_rx = match dht { + let dht_rx = match dht.as_ref() { Some(dht) => Some(flatten_dht_peers_stream( dht.get_peers(torrent.info_hash).await?, )), @@ -280,6 +283,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> torrent.info_hash, torrent.info, peer_id, + dht, dht_rx, Vec::new(), trackers, @@ -310,6 +314,7 @@ async fn main_torrent_info( info_hash: Id20, info: TorrentMetaV1Info, peer_id: Id20, + dht: Option, dht_peer_rx: Option + Unpin + Send + Sync + 'static>, initial_peers: Vec, trackers: Vec, @@ -342,7 +347,7 @@ async fn main_torrent_info( builder.peer_connect_timeout(t.0); } - let http_api = librqbit::http_api::HttpApi::new(); + let http_api = librqbit::http_api::HttpApi::new(dht.clone()); spawn("HTTP API", { let http_api = http_api.clone(); async move { http_api.make_http_api_and_run(http_api_listen_addr).await } @@ -357,6 +362,37 @@ async fn main_torrent_info( for peer in initial_peers { handle.add_peer(peer); } + + spawn("Stats printer", { + let handle = handle.clone(); + async move { + loop { + let dht_stats = dht.as_ref().map(|d| d.stats()); + let peer_stats = handle.torrent_state().peer_stats_snapshot(); + let stats = handle.torrent_state().stats_snapshot(); + let needed = stats.initially_needed_bytes; + let downloaded_pct = if stats.remaining_bytes == 0 { + 100f64 + } else { + (stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64 + }; + info!( + "Stats: downloaded {:.2}% ({:.2}), fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, have {:.2}, peers: {:?}, dht: {:?}", + downloaded_pct, + SF::new(stats.downloaded_and_checked_bytes), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), + SF::new(needed), + SF::new(stats.uploaded_bytes), + SF::new(stats.have_bytes), + peer_stats, + dht_stats, + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + if let Some(mut dht_peer_rx) = dht_peer_rx { spawn("DHT peer adder", { let handle = handle.clone();