diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index b753552..6172330 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,13 +1,15 @@ -use std::net::SocketAddr; -use std::sync::Arc; - +use anyhow::Context; use dht::{Dht, DhtStats}; use parking_lot::RwLock; use serde::Serialize; +use std::net::SocketAddr; +use std::sync::Arc; use std::time::{Duration, Instant}; +use warp::hyper::body::Bytes; use warp::hyper::Body; use warp::Filter; +use crate::session::Session; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; @@ -15,16 +17,25 @@ struct ApiInternal { dht: Option, startup_time: Instant, torrent_managers: RwLock>, + session: Arc, } impl ApiInternal { - fn new(dht: Option) -> Self { + fn new(session: Arc) -> Self { Self { - dht, + dht: session.get_dht(), startup_time: Instant::now(), torrent_managers: RwLock::new(Vec::new()), + session, } } + + fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { + let mut g = self.torrent_managers.write(); + let idx = g.len(); + g.push(handle); + idx + } } #[derive(Serialize)] @@ -116,6 +127,16 @@ impl ApiInternal { Some(TorrentDetailsResponse { info_hash, files }) } + async fn api_add_torrent(&self, url: String) -> anyhow::Result { + let handle = self + .session + .add_torrent(url, None) + .await + .context("error adding torrent")? + .context("expected session.add_torrent() to return a handle")?; + Ok(self.add_mgr(handle)) + } + fn api_dht_stats(&self) -> Option { self.dht.as_ref().map(|d| d.stats()) } @@ -185,16 +206,13 @@ fn json_or_404(idx: usize, v: Option) -> warp::reply::Response } impl HttpApi { - pub fn new(dht: Option) -> Self { + pub fn new(session: Arc) -> Self { Self { - inner: Arc::new(ApiInternal::new(dht)), + inner: Arc::new(ApiInternal::new(session)), } } pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { - let mut g = self.inner.torrent_managers.write(); - let idx = g.len(); - g.push(handle); - idx + self.inner.add_mgr(handle) } pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { @@ -235,11 +253,46 @@ impl HttpApi { } }); - let torrent_list = warp::path!("torrents").map({ + let torrent_list = warp::get().and(warp::path("torrents")).map({ let inner = inner.clone(); move || json_response(inner.api_torrent_list()) }); + let torrent_add = warp::post() + .and(warp::path("torrents")) + .and(warp::body::bytes()) + .and_then({ + let inner = inner.clone(); + use warp::http::Response; + fn make_response(status: u16, body: String) -> Response { + Response::builder().status(status).body(body).unwrap() + } + move |body: Bytes| { + let inner = inner.clone(); + async move { + let url = match String::from_utf8(body.to_vec()) { + Ok(str) => str, + Err(_) => { + return Ok::<_, warp::Rejection>(make_response( + 400, + "invalid utf-8".into(), + )) + } + }; + let idx = inner + .api_add_torrent(url) + .await + .context("error calling HttpApi::api_add_torrent"); + match idx { + Ok(idx) => { + return Ok(make_response(200, format!("{}", idx))); + } + Err(e) => return Ok(make_response(400, format!("{}", e))), + } + } + } + }); + let torrent_details = warp::path!("torrents" / usize).map({ let inner = inner.clone(); move |idx| json_or_404(idx, inner.api_torrent_details(idx)) @@ -264,7 +317,8 @@ impl HttpApi { .or(dht_routing_table) .or(torrent_details) .or(torrent_dump_haves) - .or(torrent_dump_stats); + .or(torrent_dump_stats) + .or(torrent_add); warp::serve(router).run(addr).await; Ok(()) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index b1ffc8c..4f90843 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -25,7 +25,6 @@ use crate::{ pub struct Session { peer_id: Id20, dht: Option, - http_api: Option, peer_opts: PeerConnectionOptions, spawner: BlockingSpawner, output_folder: PathBuf, @@ -95,8 +94,6 @@ pub struct SessionOptions { pub disable_dht: bool, pub disable_dht_persistence: bool, pub dht_config: Option, - pub disable_http_api: bool, - pub http_api_listen_addr: Option, pub peer_id: Option, pub peer_opts: Option, } @@ -124,29 +121,17 @@ impl Session { }; let peer_opts = opts.peer_opts.unwrap_or_default(); - let http_api = if opts.disable_http_api { - None - } else { - let http_api_listen_addr = opts - .http_api_listen_addr - .unwrap_or_else(|| "127.0.0.1:3001".parse().unwrap()); - let http_api = HttpApi::new(dht.clone()); - spawn("HTTP API", { - let http_api = http_api.clone(); - async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - }); - Some(http_api) - }; - Ok(Self { peer_id, dht, - http_api, peer_opts, spawner, output_folder, }) } + pub fn get_dht(&self) -> Option { + self.dht.clone() + } pub async fn add_torrent( &self, url: String, @@ -303,16 +288,7 @@ impl Session { builder.peer_connect_timeout(t); } - // let http_api = HttpApi::new(self.dht.clone()); - // spawn("HTTP API", { - // let http_api = http_api.clone(); - // async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - // }); - let handle = builder.start_manager()?; - if let Some(http_api) = self.http_api.as_ref() { - http_api.add_mgr(handle.clone()); - } for url in trackers { handle.add_tracker(url); diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index df48ff4..a18165d 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,8 +1,9 @@ -use std::{net::SocketAddr, str::FromStr, time::Duration}; +use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use anyhow::Context; use clap::Clap; use librqbit::{ + http_api::HttpApi, peer_connection::PeerConnectionOptions, session::{AddTorrentOptions, Session, SessionOptions}, spawn_utils::{spawn, BlockingSpawner}, @@ -141,17 +142,26 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> disable_dht: opts.disable_dht, disable_dht_persistence: opts.disable_dht_persistence, dht_config: None, - disable_http_api: false, - http_api_listen_addr: Some(opts.http_api_listen_addr), peer_id: None, peer_opts: Some(PeerConnectionOptions { connect_timeout: opts.peer_connect_timeout.map(|d| d.0), ..Default::default() }), }; - let session = Session::new_with_opts(opts.output_folder.into(), spawner, sopts) - .await - .context("error initializing rqbit session")?; + + let session = Arc::new( + Session::new_with_opts(opts.output_folder.into(), spawner, sopts) + .await + .context("error initializing rqbit session")?, + ); + + { + let http_api = HttpApi::new(session.clone()); + spawn("HTTP API", { + let http_api_listen_addr = opts.http_api_listen_addr; + async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + }); + }; let torrent_opts = AddTorrentOptions { only_files_regex: opts.only_files_matching_regex,