From 3beac77e5d7ca1b607dc97f8c42c3b26514611a1 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 8 Jul 2021 23:03:58 +0100 Subject: [PATCH] JSON HTTP API --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/http_api.rs | 247 ++++++++++++++++++--------- crates/librqbit/src/torrent_state.rs | 4 +- 4 files changed, 167 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3523cac..8912f13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -684,6 +684,7 @@ dependencies = [ "rand 0.8.4", "reqwest", "serde", + "serde_json", "sha1", "sha1w", "size_format", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 37034e0..d058412 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -22,6 +22,7 @@ sha1w = {path = "../sha1w"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} serde = {version = "1", features=["derive"]} +serde_json = "1" anyhow = "1" reqwest = "0.11" diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 6b8bce8..627cb8f 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -2,54 +2,190 @@ use std::net::SocketAddr; use std::sync::Arc; use parking_lot::RwLock; +use serde::Serialize; use std::io::Write; -use std::time::Instant; +use std::time::{Duration, Instant}; use warp::Filter; use crate::torrent_manager::TorrentManagerHandle; +use crate::torrent_state::StatsSnapshot; -enum Response { - NotFound(usize), - OkVec(Vec), - OkString(String), +// enum Response { +// NotFound(usize), +// OkJson(T), +// Ok(B), +// } + +// impl warp::Reply for Response +// where T: Serialize + Send, +// B: Into + Send +// { +// fn into_response(self) -> warp::reply::Response + +// { +// match self { +// Response::NotFound(idx) => { +// let mut response = warp::reply::Response::new(warp::hyper::Body::from(format!( +// "torrent {} not found", +// idx +// ))); +// *response.status_mut() = warp::http::StatusCode::NOT_FOUND; +// response +// } +// Response::OkJson(body) => { +// match serde_json::to_vec_pretty(&body) { +// Ok(body) => { +// let mut response = warp::reply::Response::new(warp::hyper::Body::from(body)); +// response.headers_mut().insert("content-type", warp::http::HeaderValue::from_static("application/json")); +// response +// } +// Err(e) => { +// todo!() +// } +// } + +// }, +// Response::Ok(body) => warp::reply::Response::new(body.into()), +// } +// } +// } + +struct Inner { + startup_time: Instant, + torrent_managers: RwLock>, } -impl warp::Reply for Response { - fn into_response(self) -> warp::reply::Response { - match self { - Response::NotFound(idx) => { - let mut response = warp::reply::Response::new(warp::hyper::Body::from(format!( - "torrent {} not found", - idx - ))); - *response.status_mut() = warp::http::StatusCode::NOT_FOUND; - response - } - Response::OkVec(body) => warp::reply::Response::new(warp::hyper::Body::from(body)), - Response::OkString(body) => warp::reply::Response::new(warp::hyper::Body::from(body)), +impl Inner { + fn new() -> Self { + Self { + startup_time: Instant::now(), + torrent_managers: RwLock::new(Vec::new()), } } } -#[derive(Default)] -struct Inner { - torrent_managers: RwLock>, +#[derive(Serialize)] +struct Speed { + mbps: f64, + human_readable: String, +} + +impl Speed { + fn new(mbps: f64) -> Self { + Self { + mbps, + human_readable: format!("{:.2}Mbps", mbps), + } + } +} + +impl From for Speed { + fn from(mbps: f64) -> Self { + Self::new(mbps) + } +} + +#[derive(Serialize)] +struct TorrentListResponseItem { + id: usize, + info_hash: String, +} + +#[derive(Serialize)] +struct TorrentListResponse { + torrents: Vec, +} + +#[derive(Serialize)] +struct StatsResponse { + snapshot: StatsSnapshot, + average_piece_download_time: Option, + download_speed: Speed, + all_time_download_speed: Speed, + time_remaining: Option, } impl Inner { fn mgr_handle(&self, idx: usize) -> Option { self.torrent_managers.read().get(idx).cloned() } + + fn api_torrent_list(&self) -> TorrentListResponse { + TorrentListResponse { + torrents: self + .torrent_managers + .read() + .iter() + .enumerate() + .map(|(id, mgr)| TorrentListResponseItem { + id, + info_hash: hex::encode(mgr.torrent_state().info_hash()), + }) + .collect(), + } + } + + fn api_stats(&self, idx: usize) -> Option { + let mgr = self.mgr_handle(idx)?; + let snapshot = mgr.torrent_state().stats_snapshot(); + let estimator = mgr.speed_estimator(); + + // Poor mans download speed computation + let elapsed = self.startup_time.elapsed(); + let downloaded_bytes = snapshot.downloaded_and_checked_bytes; + let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; + + Some(StatsResponse { + average_piece_download_time: snapshot.average_piece_download_time(), + snapshot, + all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(), + download_speed: estimator.download_mbps().into(), + time_remaining: estimator.time_remaining(), + }) + } + + fn api_dump_haves(&self, idx: usize) -> Option { + let mgr = self.mgr_handle(idx)?; + Some(format!( + "{:?}", + mgr.torrent_state().lock_read().chunks.get_have_pieces(), + )) + } } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct HttpApi { inner: Arc, } +fn json_response(v: T) -> warp::reply::Response { + let body = serde_json::to_string_pretty(&v).unwrap(); + let mut response = warp::reply::Response::new(body.into()); + response.headers_mut().insert( + "content-type", + warp::http::HeaderValue::from_static("application/json"), + ); + response +} + +fn not_found_response(idx: usize) -> warp::reply::Response { + let mut response = warp::reply::Response::new(format!("torrent {} not found", idx).into()); + *response.status_mut() = warp::http::StatusCode::NOT_FOUND; + response +} + +fn json_or_404(idx: usize, v: Option) -> warp::reply::Response { + match v { + Some(v) => json_response(v), + None => not_found_response(idx), + } +} + impl HttpApi { pub fn new() -> Self { - Default::default() + Self { + inner: Arc::new(Inner::new()), + } } pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { let mut g = self.inner.torrent_managers.write(); @@ -65,76 +201,17 @@ impl HttpApi { let list = warp::path::end().map({ let inner = inner.clone(); - move || { - let mut buf = Vec::::new(); - for (idx, handle) in inner.torrent_managers.read().iter().enumerate() { - writeln!( - buf, - "{}: {}\n", - idx, - hex::encode(handle.torrent_state().info_hash()) - ) - .unwrap(); - } - Response::OkVec(buf) - } + move || json_response(inner.api_torrent_list()) }); let dump_haves = warp::path!(usize / "haves").map({ let inner = inner.clone(); - move |idx| { - let mgr = match inner.mgr_handle(idx) { - Some(mgr) => mgr, - None => return Response::NotFound(idx), - }; - return Response::OkString(format!( - "{:?}", - mgr.torrent_state().lock_read().chunks.get_have_pieces(), - )); - } + move |idx| json_or_404(idx, inner.api_dump_haves(idx)) }); let dump_stats = warp::path!(usize / "stats").map({ let inner = inner.clone(); - let start_time = Instant::now(); - move |idx| { - let mgr = match inner.mgr_handle(idx) { - Some(mgr) => mgr, - None => return Response::NotFound(idx), - }; - let snapshot = mgr.torrent_state().stats_snapshot(); - let estimator = mgr.speed_estimator(); - let mut buf = Vec::new(); - writeln!(buf, "{:#?}", &snapshot).unwrap(); - writeln!( - buf, - "Average download time: {:?}", - snapshot.average_piece_download_time() - ) - .unwrap(); - - // Poor mans download speed computation - let elapsed = start_time.elapsed(); - let downloaded_bytes = snapshot.downloaded_and_checked_bytes; - let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; - writeln!( - buf, - "Total download speed over all time: {:.2}Mbps", - downloaded_mb / elapsed.as_secs_f64() - ) - .unwrap(); - - writeln!(buf, "Download speed: {:.2}Mbps", estimator.download_mbps()).unwrap(); - match estimator.time_remaining() { - Some(time) => { - writeln!(buf, "Time remaining: {:?}", time).unwrap(); - } - None => { - writeln!(buf, "Time remaining: unknown").unwrap(); - } - } - Response::OkVec(buf) - } + move |idx| json_or_404(idx, inner.api_stats(idx)) }); let router = list.or(dump_haves).or(dump_stats); diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 5aad406..2b96efe 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -23,6 +23,7 @@ use parking_lot::{Mutex, RwLock, RwLockReadGuard}; use peer_binary_protocol::{ extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, }; +use serde::Serialize; use sha1w::Sha1; use tokio::{ sync::{ @@ -185,7 +186,7 @@ impl AtomicStats { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct StatsSnapshot { pub have_bytes: u64, pub downloaded_and_checked_bytes: u64, @@ -197,6 +198,7 @@ pub struct StatsSnapshot { pub live_peers: u32, pub seen_peers: u32, pub connecting_peers: u32, + #[serde(skip)] pub time: Instant, pub queued_peers: u32, total_piece_download_ms: u64,