diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index ba0ce27..6b8bce8 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,65 +1,145 @@ +use std::net::SocketAddr; use std::sync::Arc; -use librqbit_core::speed_estimator::SpeedEstimator; +use parking_lot::RwLock; use std::io::Write; use std::time::Instant; use warp::Filter; -use crate::torrent_state::TorrentState; +use crate::torrent_manager::TorrentManagerHandle; -// This is just a stub for debugging. -// A real http api would know about ALL torrents we are downloading, not just one. -pub async fn make_and_run_http_api( - state: Arc, - estimator: Arc, -) -> anyhow::Result<()> { - let dump_haves = warp::path("haves").map({ - let state = state.clone(); - move || format!("{:?}", state.lock_read().chunks.get_have_pieces()) - }); - - let dump_stats = warp::path("stats").map({ - let state = state.clone(); - let start_time = Instant::now(); - let initial_downloaded_and_checked = state.stats_snapshot().downloaded_and_checked_bytes; - move || { - let snapshot = state.stats_snapshot(); - let mut buf = Vec::new(); - writeln!(buf, "{:#?}", state.stats_snapshot()).unwrap(); - writeln!( - buf, - "Average download time: {:?}", - snapshot.average_piece_download_time() - ) - .unwrap(); - - // Poor mans download speed computation - let elapsed = start_time.elapsed(); - let downloaded_bytes = - snapshot.downloaded_and_checked_bytes - initial_downloaded_and_checked; - let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; - writeln!( - buf, - "Total download speed over all time: {:.2}Mbps", - downloaded_mb / elapsed.as_secs_f64() - ) - .unwrap(); - - writeln!(buf, "Download speed: {:.2}Mbps", estimator.download_mbps()).unwrap(); - match estimator.time_remaining() { - Some(time) => { - writeln!(buf, "Time remaining: {:?}", time).unwrap(); - } - None => { - writeln!(buf, "Time remaining: unknown").unwrap(); - } - } - buf - } - }); - - let router = dump_haves.or(dump_stats); - - warp::serve(router).run(([127, 0, 0, 1], 3030)).await; - Ok(()) +enum Response { + NotFound(usize), + OkVec(Vec), + OkString(String), +} + +impl warp::Reply for Response { + fn into_response(self) -> warp::reply::Response { + match self { + Response::NotFound(idx) => { + let mut response = warp::reply::Response::new(warp::hyper::Body::from(format!( + "torrent {} not found", + idx + ))); + *response.status_mut() = warp::http::StatusCode::NOT_FOUND; + response + } + Response::OkVec(body) => warp::reply::Response::new(warp::hyper::Body::from(body)), + Response::OkString(body) => warp::reply::Response::new(warp::hyper::Body::from(body)), + } + } +} + +#[derive(Default)] +struct Inner { + torrent_managers: RwLock>, +} + +impl Inner { + fn mgr_handle(&self, idx: usize) -> Option { + self.torrent_managers.read().get(idx).cloned() + } +} + +#[derive(Clone, Default)] +pub struct HttpApi { + inner: Arc, +} + +impl HttpApi { + pub fn new() -> Self { + Default::default() + } + pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { + let mut g = self.inner.torrent_managers.write(); + let idx = g.len(); + g.push(handle); + idx + } + + // TODO: this is all for debugging, not even JSON. + // After using this for a bit, not a big fan of warp. + pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { + let inner = self.inner; + + let list = warp::path::end().map({ + let inner = inner.clone(); + move || { + let mut buf = Vec::::new(); + for (idx, handle) in inner.torrent_managers.read().iter().enumerate() { + writeln!( + buf, + "{}: {}\n", + idx, + hex::encode(handle.torrent_state().info_hash()) + ) + .unwrap(); + } + Response::OkVec(buf) + } + }); + + let dump_haves = warp::path!(usize / "haves").map({ + let inner = inner.clone(); + move |idx| { + let mgr = match inner.mgr_handle(idx) { + Some(mgr) => mgr, + None => return Response::NotFound(idx), + }; + return Response::OkString(format!( + "{:?}", + mgr.torrent_state().lock_read().chunks.get_have_pieces(), + )); + } + }); + + let dump_stats = warp::path!(usize / "stats").map({ + let inner = inner.clone(); + let start_time = Instant::now(); + move |idx| { + let mgr = match inner.mgr_handle(idx) { + Some(mgr) => mgr, + None => return Response::NotFound(idx), + }; + let snapshot = mgr.torrent_state().stats_snapshot(); + let estimator = mgr.speed_estimator(); + let mut buf = Vec::new(); + writeln!(buf, "{:#?}", &snapshot).unwrap(); + writeln!( + buf, + "Average download time: {:?}", + snapshot.average_piece_download_time() + ) + .unwrap(); + + // Poor mans download speed computation + let elapsed = start_time.elapsed(); + let downloaded_bytes = snapshot.downloaded_and_checked_bytes; + let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; + writeln!( + buf, + "Total download speed over all time: {:.2}Mbps", + downloaded_mb / elapsed.as_secs_f64() + ) + .unwrap(); + + writeln!(buf, "Download speed: {:.2}Mbps", estimator.download_mbps()).unwrap(); + match estimator.time_remaining() { + Some(time) => { + writeln!(buf, "Time remaining: {:?}", time).unwrap(); + } + None => { + writeln!(buf, "Time remaining: unknown").unwrap(); + } + } + Response::OkVec(buf) + } + }); + + let router = list.or(dump_haves).or(dump_stats); + + warp::serve(router).run(addr).await; + Ok(()) + } } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 5c4f1b0..78d467d 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -23,7 +23,6 @@ use size_format::SizeFormatterBinary as SF; use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, - http_api::make_and_run_http_api, spawn_utils::{spawn, BlockingSpawner}, torrent_state::TorrentState, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, @@ -116,10 +115,12 @@ impl TorrentManagerHandle { pub fn add_peer(&self, addr: SocketAddr) -> bool { self.manager.state.add_peer_if_not_seen(addr) } - // Not sure why anyone would need that, but as this is a library... pub fn torrent_state(&self) -> &TorrentState { &self.manager.state } + pub fn speed_estimator(&self) -> &Arc { + &self.manager.speed_estimator + } pub async fn cancel(&self) -> anyhow::Result<()> { todo!() } @@ -237,10 +238,6 @@ impl TorrentManager { let this = mgr.clone(); async move { this.stats_printer().await } }); - spawn( - "http api", - make_and_run_http_api(mgr.state.clone(), estimator.clone()), - ); spawn("speed estimator updater", { let state = mgr.state.clone(); async move { diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index f9b640c..27aad84 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -84,6 +84,10 @@ struct Opts { #[clap(short = 'i', long = "tracker-refresh-interval")] force_tracker_interval: Option, + /// The listen address for (debugging) HTTP API + #[clap(long = "http-api-listen-addr", default_value = "127.0.0.1:3030")] + http_api_listen_addr: SocketAddr, + /// Set this flag if you want to use tokio's single threaded runtime. /// It MAY perform better, but the main purpose is easier debugging, as time /// profilers work better with this one. @@ -101,11 +105,7 @@ fn compute_only_files>( let full_path = filename .to_pathbuf() .with_context(|| format!("filename of file {} is not valid utf8", idx))?; - if filename_re.is_match( - full_path - .to_str() - .ok_or_else(|| anyhow::anyhow!("filename of file {} is not valid utf8", idx))?, - ) { + if filename_re.is_match(full_path.to_str().unwrap()) { only_files.push(idx); } } @@ -265,6 +265,9 @@ async fn main_info( } else { None }; + + let http_api_listen_addr = opts.http_api_listen_addr; + let mut builder = TorrentManagerBuilder::new(info, info_hash, opts.output_folder); builder .overwrite(opts.overwrite) @@ -276,7 +279,16 @@ async fn main_info( if let Some(interval) = opts.force_tracker_interval { builder.force_tracker_interval(Duration::from_secs(interval)); } + + let http_api = librqbit::http_api::HttpApi::new(); + spawn("HTTP API", { + let http_api = http_api.clone(); + async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + }); + let handle = builder.start_manager()?; + http_api.add_mgr(handle.clone()); + for url in trackers { handle.add_tracker(url); }