DHT stats in HTTP API

This commit is contained in:
Igor Katson 2021-07-14 13:40:56 +01:00
parent f00f522767
commit 98dff76c40
10 changed files with 111 additions and 62 deletions

1
Cargo.lock generated
View file

@ -1325,6 +1325,7 @@ dependencies = [
"pretty_env_logger", "pretty_env_logger",
"regex", "regex",
"reqwest", "reqwest",
"size_format",
"tokio", "tokio",
] ]

View file

@ -28,6 +28,7 @@ use tokio_stream::wrappers::BroadcastStream;
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub struct DhtStats { pub struct DhtStats {
#[serde(serialize_with = "crate::utils::serialize_id20")]
pub id: Id20, pub id: Id20,
pub outstanding_requests: usize, pub outstanding_requests: usize,
pub seen_peers: usize, pub seen_peers: usize,

View file

@ -1,8 +1,10 @@
mod bprotocol; mod bprotocol;
mod dht; mod dht;
mod routing_table; mod routing_table;
mod utils;
pub use dht::Dht; pub use dht::Dht;
pub use dht::DhtStats;
pub use librqbit_core::id20::Id20; pub use librqbit_core::id20::Id20;
pub static DHT_BOOTSTRAP: &[&str] = &["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"]; pub static DHT_BOOTSTRAP: &[&str] = &["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"];

View file

@ -5,14 +5,7 @@ use std::{
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
use log::debug; use log::debug;
use serde::{Serialize, Serializer}; use serde::Serialize;
fn serialize_id20<S>(id: &Id20, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
ser.serialize_str(&id.as_string())
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
enum BucketTreeNode { enum BucketTreeNode {
@ -23,9 +16,9 @@ enum BucketTreeNode {
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct BucketTree { pub struct BucketTree {
bits: u8, bits: u8,
#[serde(serialize_with = "serialize_id20")] #[serde(serialize_with = "crate::utils::serialize_id20")]
start: Id20, start: Id20,
#[serde(serialize_with = "serialize_id20")] #[serde(serialize_with = "crate::utils::serialize_id20")]
end_inclusive: Id20, end_inclusive: Id20,
data: BucketTreeNode, data: BucketTreeNode,
} }
@ -314,7 +307,7 @@ impl Default for BucketTree {
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct RoutingTableNode { pub struct RoutingTableNode {
#[serde(serialize_with = "serialize_id20")] #[serde(serialize_with = "crate::utils::serialize_id20")]
id: Id20, id: Id20,
addr: SocketAddr, addr: SocketAddr,
#[serde(skip)] #[serde(skip)]
@ -372,7 +365,7 @@ impl RoutingTableNode {
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct RoutingTable { pub struct RoutingTable {
#[serde(serialize_with = "serialize_id20")] #[serde(serialize_with = "crate::utils::serialize_id20")]
id: Id20, id: Id20,
size: usize, size: usize,
buckets: BucketTree, buckets: BucketTree,

9
crates/dht/src/utils.rs Normal file
View file

@ -0,0 +1,9 @@
use librqbit_core::id20::Id20;
use serde::Serializer;
pub fn serialize_id20<S>(id: &Id20, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
ser.serialize_str(&id.as_string())
}

View file

@ -1,6 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use dht::{Dht, DhtStats};
use parking_lot::RwLock; use parking_lot::RwLock;
use serde::Serialize; use serde::Serialize;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -10,13 +11,15 @@ use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::StatsSnapshot; use crate::torrent_state::StatsSnapshot;
struct ApiInternal { struct ApiInternal {
dht: Option<Dht>,
startup_time: Instant, startup_time: Instant,
torrent_managers: RwLock<Vec<TorrentManagerHandle>>, torrent_managers: RwLock<Vec<TorrentManagerHandle>>,
} }
impl ApiInternal { impl ApiInternal {
fn new() -> Self { fn new(dht: Option<Dht>) -> Self {
Self { Self {
dht,
startup_time: Instant::now(), startup_time: Instant::now(),
torrent_managers: RwLock::new(Vec::new()), torrent_managers: RwLock::new(Vec::new()),
} }
@ -111,6 +114,10 @@ impl ApiInternal {
Some(TorrentDetailsResponse { info_hash, files }) Some(TorrentDetailsResponse { info_hash, files })
} }
fn api_dht_stats(&self) -> Option<DhtStats> {
self.dht.as_ref().map(|d| d.stats())
}
fn api_stats(&self, idx: usize) -> Option<StatsResponse> { fn api_stats(&self, idx: usize) -> Option<StatsResponse> {
let mgr = self.mgr_handle(idx)?; let mgr = self.mgr_handle(idx)?;
let snapshot = mgr.torrent_state().stats_snapshot(); let snapshot = mgr.torrent_state().stats_snapshot();
@ -154,23 +161,27 @@ fn json_response<T: Serialize>(v: T) -> warp::reply::Response {
response response
} }
fn not_found_response(idx: usize) -> warp::reply::Response { fn not_found_response(body: String) -> warp::reply::Response {
let mut response = warp::reply::Response::new(format!("torrent {} not found", idx).into()); let mut response = warp::reply::Response::new(body.into());
*response.status_mut() = warp::http::StatusCode::NOT_FOUND; *response.status_mut() = warp::http::StatusCode::NOT_FOUND;
response response
} }
fn torrent_not_found_response(idx: usize) -> warp::reply::Response {
not_found_response(format!("torrent {} not found", idx))
}
fn json_or_404<T: Serialize>(idx: usize, v: Option<T>) -> warp::reply::Response { fn json_or_404<T: Serialize>(idx: usize, v: Option<T>) -> warp::reply::Response {
match v { match v {
Some(v) => json_response(v), Some(v) => json_response(v),
None => not_found_response(idx), None => torrent_not_found_response(idx),
} }
} }
impl HttpApi { impl HttpApi {
pub fn new() -> Self { pub fn new(dht: Option<Dht>) -> Self {
Self { Self {
inner: Arc::new(ApiInternal::new()), inner: Arc::new(ApiInternal::new(dht)),
} }
} }
pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize {
@ -188,6 +199,22 @@ impl HttpApi {
move || json_response(inner.api_torrent_list()) move || json_response(inner.api_torrent_list())
}); });
let dht_stats = warp::path!("dht" / "stats").map({
let inner = inner.clone();
move || match inner.api_dht_stats() {
Some(stats) => json_response(stats),
None => not_found_response("DHT is off".into()),
}
});
let dht_routing_table = warp::path!("dht" / "table").map({
let inner = inner.clone();
move || match inner.dht.as_ref() {
Some(dht) => dht.with_routing_table(|r| json_response(r)),
None => not_found_response("DHT is off".into()),
}
});
let torrent_details = warp::path!(usize).map({ let torrent_details = warp::path!(usize).map({
let inner = inner.clone(); let inner = inner.clone();
move |idx| json_or_404(idx, inner.api_torrent_details(idx)) move |idx| json_or_404(idx, inner.api_torrent_details(idx))
@ -203,15 +230,14 @@ impl HttpApi {
move |idx| json_or_404(idx, inner.api_stats(idx)) move |idx| json_or_404(idx, inner.api_stats(idx))
}); });
let router = list.or(torrent_details).or(dump_haves).or(dump_stats); let router = list
.or(dht_stats)
.or(dht_routing_table)
.or(torrent_details)
.or(dump_haves)
.or(dump_stats);
warp::serve(router).run(addr).await; warp::serve(router).run(addr).await;
Ok(()) Ok(())
} }
} }
impl Default for HttpApi {
fn default() -> Self {
Self::new()
}
}

View file

@ -244,10 +244,11 @@ impl TorrentManager {
options, options,
}); });
spawn("stats printer", { // spawn("stats printer", {
let this = mgr.clone(); // let this = mgr.clone();
async move { this.stats_printer().await } // async move { this.stats_printer().await }
}); // });
spawn("speed estimator updater", { spawn("speed estimator updater", {
let state = mgr.state.clone(); let state = mgr.state.clone();
async move { async move {
@ -264,35 +265,6 @@ impl TorrentManager {
Ok(mgr.into_handle()) Ok(mgr.into_handle())
} }
async fn stats_printer(&self) -> anyhow::Result<()> {
loop {
let live_peer_stats = self.state.lock_read().peers.stats();
let seen_peers_count = self.state.lock_read().peers.seen().len();
let stats = self.state.stats_snapshot();
let needed = self.state.initially_needed();
let downloaded_pct = if stats.remaining_bytes == 0 {
100f64
} else {
(stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64
};
info!(
"Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, queued: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}",
downloaded_pct,
SF::new(stats.downloaded_and_checked_bytes),
live_peer_stats.live,
live_peer_stats.connecting,
live_peer_stats.queued,
seen_peers_count,
SF::new(stats.fetched_bytes),
SF::new(stats.remaining_bytes),
SF::new(needed),
SF::new(stats.uploaded_bytes),
SF::new(stats.have_bytes)
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
fn into_handle(self: Arc<Self>) -> TorrentManagerHandle { fn into_handle(self: Arc<Self>) -> TorrentManagerHandle {
TorrentManagerHandle { manager: self } TorrentManagerHandle { manager: self }
} }

View file

@ -62,11 +62,13 @@ pub struct AggregatePeerStats {
pub queued: usize, pub queued: usize,
pub connecting: usize, pub connecting: usize,
pub live: usize, pub live: usize,
pub seen: usize,
} }
impl PeerStates { impl PeerStates {
pub fn stats(&self) -> AggregatePeerStats { pub fn stats(&self) -> AggregatePeerStats {
self.states let mut stats = self
.states
.values() .values()
.fold(AggregatePeerStats::default(), |mut s, p| { .fold(AggregatePeerStats::default(), |mut s, p| {
match p { match p {
@ -75,7 +77,9 @@ impl PeerStates {
PeerState::Queued => s.queued += 1, PeerState::Queued => s.queued += 1,
}; };
s s
}) });
stats.seen = self.seen.len();
stats
} }
pub fn add_if_not_seen( pub fn add_if_not_seen(
&mut self, &mut self,
@ -547,6 +551,10 @@ impl TorrentState {
true true
} }
pub fn peer_stats_snapshot(&self) -> AggregatePeerStats {
self.locked.read().peers.stats()
}
pub fn stats_snapshot(&self) -> StatsSnapshot { pub fn stats_snapshot(&self) -> StatsSnapshot {
let g = self.locked.read(); let g = self.locked.read();
use Ordering::*; use Ordering::*;

View file

@ -20,6 +20,7 @@ reqwest = "0.11"
regex = "1" regex = "1"
futures = "0.3" futures = "0.3"
parse_duration = "2" parse_duration = "2"
size_format = "1"
[dev-dependencies] [dev-dependencies]
futures = {version = "0.3"} futures = {version = "0.3"}

View file

@ -15,6 +15,7 @@ use librqbit::{
}; };
use log::{info, warn}; use log::{info, warn};
use reqwest::Url; use reqwest::Url;
use size_format::SizeFormatterBinary as SF;
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> { async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
let response = reqwest::get(url) let response = reqwest::get(url)
@ -206,6 +207,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
} = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?;
let dht_rx = dht let dht_rx = dht
.as_ref()
.ok_or_else(|| anyhow::anyhow!("magnet links without DHT are not supported"))? .ok_or_else(|| anyhow::anyhow!("magnet links without DHT are not supported"))?
.get_peers(info_hash) .get_peers(info_hash)
.await?; .await?;
@ -236,6 +238,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
info_hash, info_hash,
info, info,
peer_id, peer_id,
dht,
Some(dht_rx), Some(dht_rx),
initial_peers.into_iter().collect(), initial_peers.into_iter().collect(),
trackers, trackers,
@ -250,7 +253,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
} else { } else {
torrent_from_file(&opts.torrent_path)? torrent_from_file(&opts.torrent_path)?
}; };
let dht_rx = match dht { let dht_rx = match dht.as_ref() {
Some(dht) => Some(flatten_dht_peers_stream( Some(dht) => Some(flatten_dht_peers_stream(
dht.get_peers(torrent.info_hash).await?, dht.get_peers(torrent.info_hash).await?,
)), )),
@ -280,6 +283,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
torrent.info_hash, torrent.info_hash,
torrent.info, torrent.info,
peer_id, peer_id,
dht,
dht_rx, dht_rx,
Vec::new(), Vec::new(),
trackers, trackers,
@ -310,6 +314,7 @@ async fn main_torrent_info(
info_hash: Id20, info_hash: Id20,
info: TorrentMetaV1Info<ByteString>, info: TorrentMetaV1Info<ByteString>,
peer_id: Id20, peer_id: Id20,
dht: Option<Dht>,
dht_peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>, dht_peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
trackers: Vec<reqwest::Url>, trackers: Vec<reqwest::Url>,
@ -342,7 +347,7 @@ async fn main_torrent_info(
builder.peer_connect_timeout(t.0); builder.peer_connect_timeout(t.0);
} }
let http_api = librqbit::http_api::HttpApi::new(); let http_api = librqbit::http_api::HttpApi::new(dht.clone());
spawn("HTTP API", { spawn("HTTP API", {
let http_api = http_api.clone(); let http_api = http_api.clone();
async move { http_api.make_http_api_and_run(http_api_listen_addr).await } async move { http_api.make_http_api_and_run(http_api_listen_addr).await }
@ -357,6 +362,37 @@ async fn main_torrent_info(
for peer in initial_peers { for peer in initial_peers {
handle.add_peer(peer); handle.add_peer(peer);
} }
spawn("Stats printer", {
let handle = handle.clone();
async move {
loop {
let dht_stats = dht.as_ref().map(|d| d.stats());
let peer_stats = handle.torrent_state().peer_stats_snapshot();
let stats = handle.torrent_state().stats_snapshot();
let needed = stats.initially_needed_bytes;
let downloaded_pct = if stats.remaining_bytes == 0 {
100f64
} else {
(stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64
};
info!(
"Stats: downloaded {:.2}% ({:.2}), fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, have {:.2}, peers: {:?}, dht: {:?}",
downloaded_pct,
SF::new(stats.downloaded_and_checked_bytes),
SF::new(stats.fetched_bytes),
SF::new(stats.remaining_bytes),
SF::new(needed),
SF::new(stats.uploaded_bytes),
SF::new(stats.have_bytes),
peer_stats,
dht_stats,
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
if let Some(mut dht_peer_rx) = dht_peer_rx { if let Some(mut dht_peer_rx) = dht_peer_rx {
spawn("DHT peer adder", { spawn("DHT peer adder", {
let handle = handle.clone(); let handle = handle.clone();