diff --git a/Cargo.lock b/Cargo.lock index 4c4aa43..27c993e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -683,6 +683,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "httparse" version = "1.8.0" @@ -862,6 +868,7 @@ dependencies = [ "size_format", "tokio", "tokio-stream", + "tower-http", "tracing", "tracing-subscriber", "url", @@ -1944,6 +1951,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.4.1", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.2" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 8386fdd..3f23817 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -31,6 +31,7 @@ dht = {path = "../dht", package="librqbit-dht", version="3.0.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} axum = {version = "0.6"} +tower-http = {version = "0.4", features = ["cors", "trace"]} tokio-stream = "0.1" serde = {version = "1", features=["derive"]} serde_json = "1" diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 0b96dc3..d073ab6 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,4 +1,5 @@ use anyhow::Context; +use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::get; @@ -12,13 +13,16 @@ use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; +use tower_http::cors::{AllowHeaders, AllowOrigin}; use tracing::{info, warn}; use axum::Router; use crate::http_api_error::{ApiError, ApiErrorExt}; use crate::peer_state::PeerStatsFilter; -use crate::session::{AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session}; +use crate::session::{ + AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, +}; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; @@ -75,10 +79,14 @@ impl HttpApi { async fn torrents_post( State(state): State, Query(params): Query, - url: String, + data: Bytes, ) -> Result { let opts = params.into_add_torrent_options(); - state.api_add_torrent(url, Some(opts)).await.map(axum::Json) + let add = match String::from_utf8(data.to_vec()) { + Ok(s) => AddTorrent::from(s), + Err(e) => AddTorrent::from(e.into_bytes()), + }; + state.api_add_torrent(add, Some(opts)).await.map(axum::Json) } async fn torrent_details( @@ -119,6 +127,12 @@ impl HttpApi { .route("/torrents/:id/haves", get(torrent_haves)) .route("/torrents/:id/stats", get(torrent_stats)) .route("/torrents/:id/peer_stats", get(peer_stats)) + .layer( + tower_http::cors::CorsLayer::default() + .allow_origin(AllowOrigin::predicate(|_, _| true)) + .allow_headers(AllowHeaders::any()), + ) + .layer(tower_http::trace::TraceLayer::new_for_http()) .with_state(state); info!("starting HTTP server on {}", addr); @@ -177,13 +191,33 @@ pub struct TorrentDetailsResponse { pub files: Vec, } +struct DurationWithHumanReadable(Duration); + +impl Serialize for DurationWithHumanReadable { + fn serialize(&self, serializer: S) -> core::result::Result + where + S: serde::Serializer, + { + #[derive(Serialize)] + struct Tmp { + duration: Duration, + human_readable: String, + } + Tmp { + duration: self.0, + human_readable: format!("{:?}", self.0), + } + .serialize(serializer) + } +} + #[derive(Serialize)] struct StatsResponse { snapshot: StatsSnapshot, average_piece_download_time: Option, download_speed: Speed, all_time_download_speed: Speed, - time_remaining: Option, + time_remaining: Option, } #[derive(Serialize, Deserialize)] @@ -282,12 +316,12 @@ impl ApiInternal { pub async fn api_add_torrent( &self, - url: String, + add: AddTorrent<'_>, opts: Option, ) -> Result { let response = match self .session - .add_torrent(&url, opts) + .add_torrent(add, opts) .await .context("error adding torrent") .with_error_status_code(StatusCode::BAD_REQUEST)? @@ -353,7 +387,7 @@ impl ApiInternal { snapshot, all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(), download_speed: estimator.download_mbps().into(), - time_remaining: estimator.time_remaining(), + time_remaining: estimator.time_remaining().map(DurationWithHumanReadable), }) } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 8033572..394e8bc 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1,4 +1,4 @@ -use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf, time::Duration}; +use std::{borrow::Cow, fs::File, io::Read, net::SocketAddr, path::PathBuf, time::Duration}; use anyhow::Context; use buffers::ByteString; @@ -141,6 +141,29 @@ pub enum AddTorrentResponse { Added(TorrentManagerHandle), } +pub enum AddTorrent<'a> { + Url(Cow<'a, str>), + TorrentFileBytes(Vec), +} + +impl<'a> From<&'a str> for AddTorrent<'a> { + fn from(s: &'a str) -> Self { + Self::Url(Cow::Borrowed(s)) + } +} + +impl<'a> From for AddTorrent<'a> { + fn from(s: String) -> Self { + Self::Url(Cow::Owned(s)) + } +} + +impl<'a> From> for AddTorrent<'a> { + fn from(b: Vec) -> Self { + Self::TorrentFileBytes(b) + } +} + #[derive(Default)] pub struct SessionOptions { pub disable_dht: bool, @@ -193,99 +216,110 @@ impl Session { } pub async fn add_torrent( &self, - url: &str, + add: impl Into>, opts: Option, ) -> anyhow::Result { // Magnet links are different in that we first need to discover the metadata. let opts = opts.unwrap_or_default(); - if url.starts_with("magnet:") { - let Magnet { - info_hash, - trackers, - } = Magnet::parse(url).context("provided path is not a valid magnet URL")?; - let dht_rx = self - .dht - .as_ref() - .context("magnet links without DHT are not supported")? - .get_peers(info_hash) - .await?; + let (info_hash, info, dht_rx, trackers, initial_peers) = match add.into() { + AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { + let Magnet { + info_hash, + trackers, + } = Magnet::parse(&*magnet).context("provided path is not a valid magnet URL")?; - let trackers = trackers - .into_iter() - .filter_map(|url| match reqwest::Url::parse(&url) { - Ok(url) => Some(url), - Err(e) => { - warn!("error parsing tracker {} as url: {}", url, e); - None - } - }) - .collect(); + let dht_rx = self + .dht + .as_ref() + .context("magnet links without DHT are not supported")? + .get_peers(info_hash) + .await?; - let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( - self.peer_id, - info_hash, - dht_rx, - Some(self.peer_opts), - ) - .await - { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), - ReadMetainfoResult::ChannelClosed { .. } => { - anyhow::bail!("DHT died, no way to discover torrent metainfo") - } - }; - self.main_torrent_info( - info_hash, - info, - Some(dht_rx), - initial_peers.into_iter().collect(), - trackers, - opts, - ) - .await - } else { - let torrent = if url.starts_with("http://") || url.starts_with("https://") { - torrent_from_url(url).await? - } else { - torrent_from_file(url)? - }; - let dht_rx = match self.dht.as_ref() { - Some(dht) => { - debug!("reading peers for {:?} from DHT", torrent.info_hash); - Some(dht.get_peers(torrent.info_hash).await?) - } - None => None, - }; - let trackers = torrent - .iter_announce() - .filter_map(|tracker| { - let url = match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => url, - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - return None; - } - }; - match Url::parse(url) { + let trackers = trackers + .into_iter() + .filter_map(|url| match reqwest::Url::parse(&url) { Ok(url) => Some(url), Err(e) => { - warn!("cannot parse tracker URL {}: {}", url, e); + warn!("error parsing tracker {} as url: {}", url, e); None } + }) + .collect(); + + let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + dht_rx, + Some(self.peer_opts), + ) + .await + { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") } - }) - .collect::>(); - self.main_torrent_info( - torrent.info_hash, - torrent.info, - dht_rx, - Vec::new(), - trackers, - opts, - ) - .await - } + }; + (info_hash, info, Some(dht_rx), trackers, initial_peers) + } + other => { + let torrent = match other { + AddTorrent::Url(url) + if url.starts_with("http://") || url.starts_with("https://") => + { + torrent_from_url(&*url).await? + } + AddTorrent::Url(filename) => torrent_from_file(&*filename)?, + AddTorrent::TorrentFileBytes(bytes) => { + torrent_from_bytes(&bytes).context("error decoding torrent")? + } + }; + + let dht_rx = match self.dht.as_ref() { + Some(dht) => { + debug!("reading peers for {:?} from DHT", torrent.info_hash); + Some(dht.get_peers(torrent.info_hash).await?) + } + None => None, + }; + let trackers = torrent + .iter_announce() + .filter_map(|tracker| { + let url = match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => url, + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + return None; + } + }; + match Url::parse(url) { + Ok(url) => Some(url), + Err(e) => { + warn!("cannot parse tracker URL {}: {}", url, e); + None + } + } + }) + .collect::>(); + ( + torrent.info_hash, + torrent.info, + dht_rx, + trackers, + Default::default(), + ) + } + }; + + self.main_torrent_info( + info_hash, + info, + dht_rx, + initial_peers.into_iter().collect(), + trackers, + opts, + ) + .await } #[allow(clippy::too_many_arguments)] diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 29addef..d0745f3 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -381,7 +381,10 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> let mut handles = Vec::new(); for path in &download_opts.torrent_path { - let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await { + let handle = match session + .add_torrent(path.as_str(), Some(torrent_opts.clone())) + .await + { Ok(v) => match v { AddTorrentResponse::AlreadyManaged(handle) => { info!(