diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 4c754e0..6ba2eba 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,5 +1,9 @@ use anyhow::Context; +use buffers::ByteString; use dht::{Dht, DhtStats}; +use librqbit_core::id20::Id20; +use librqbit_core::torrent_metainfo::TorrentMetaV1Info; +use log::warn; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; @@ -9,7 +13,7 @@ use warp::hyper::body::Bytes; use warp::hyper::Body; use warp::Filter; -use crate::session::{AddTorrentOptions, Session}; +use crate::session::{AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session}; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; @@ -70,16 +74,17 @@ struct TorrentListResponse { torrents: Vec, } -#[derive(Serialize)] -struct TorrentDetailsResponseFile { - name: Option, - length: u64, +#[derive(Serialize, Deserialize)] +pub struct TorrentDetailsResponseFile { + pub name: String, + pub length: u64, + pub included: bool, } -#[derive(Serialize)] -struct TorrentDetailsResponse { - info_hash: String, - files: Vec, +#[derive(Serialize, Deserialize)] +pub struct TorrentDetailsResponse { + pub info_hash: String, + pub files: Vec, } #[derive(Serialize)] @@ -91,6 +96,43 @@ struct StatsResponse { time_remaining: Option, } +#[derive(Serialize, Deserialize)] +pub struct ApiAddTorrentResponse { + pub id: Option, + pub details: TorrentDetailsResponse, +} + +fn make_torrent_details( + info_hash: &Id20, + info: &TorrentMetaV1Info, + only_files: Option<&[usize]>, +) -> TorrentDetailsResponse { + let files = info + .iter_filenames_and_lengths() + .unwrap() + .enumerate() + .map(|(idx, (filename_it, length))| { + let name = match filename_it.to_string() { + Ok(s) => s, + Err(err) => { + warn!("error reading filename: {:?}", err); + "".to_string() + } + }; + let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); + TorrentDetailsResponseFile { + name, + length, + included, + } + }) + .collect(); + TorrentDetailsResponse { + info_hash: info_hash.as_string(), + files, + } +} + impl ApiInternal { fn mgr_handle(&self, idx: usize) -> Option { self.torrent_managers.read().get(idx).cloned() @@ -113,32 +155,53 @@ impl ApiInternal { fn api_torrent_details(&self, idx: usize) -> Option { let handle = self.mgr_handle(idx)?; - let info_hash = handle.torrent_state().info_hash().as_string(); - let files = handle - .torrent_state() - .info() - .iter_filenames_and_lengths() - .unwrap() - .map(|(filename_it, length)| { - let name = filename_it.to_string().ok(); - TorrentDetailsResponseFile { name, length } - }) - .collect(); - Some(TorrentDetailsResponse { info_hash, files }) + let info_hash = handle.torrent_state().info_hash(); + let only_files = handle.only_files(); + Some(make_torrent_details( + &info_hash, + handle.torrent_state().info(), + only_files, + )) } async fn api_add_torrent( &self, url: String, opts: Option, - ) -> anyhow::Result { - let handle = self + ) -> anyhow::Result { + let response = match self .session .add_torrent(&url, opts) .await .context("error adding torrent")? - .context("expected session.add_torrent() to return a handle")?; - Ok(self.add_mgr(handle)) + { + AddTorrentResponse::AlreadyManaged(managed) => anyhow::bail!( + "{:?} is already managed, downloaded to {:?}", + managed.info_hash, + managed.output_folder + ), + AddTorrentResponse::ListOnly(ListOnlyResponse { + info_hash, + info, + only_files, + }) => ApiAddTorrentResponse { + id: None, + details: make_torrent_details(&info_hash, &info, only_files.as_deref()), + }, + AddTorrentResponse::Added(handle) => { + let details = make_torrent_details( + &handle.torrent_state().info_hash(), + handle.torrent_state().info(), + handle.only_files(), + ); + let id = self.add_mgr(handle); + ApiAddTorrentResponse { + id: Some(id), + details, + } + } + }; + Ok(response) } fn api_dht_stats(&self) -> Option { @@ -214,6 +277,7 @@ pub struct TorrentAddQueryParams { pub overwrite: Option, pub output_folder: Option, pub only_files_regex: Option, + pub list_only: Option, } impl HttpApi { @@ -279,7 +343,7 @@ impl HttpApi { .and_then({ let inner = inner.clone(); use warp::http::Response; - fn make_response(status: u16, body: String) -> Response { + fn make_response(status: u16, body: T) -> Response { Response::builder().status(status).body(body).unwrap() } move |body: Bytes, params: TorrentAddQueryParams| { @@ -298,17 +362,16 @@ impl HttpApi { overwrite: params.overwrite.unwrap_or(false), only_files_regex: params.only_files_regex, output_folder: params.output_folder, + list_only: params.list_only.unwrap_or(false), ..Default::default() }; - let idx = inner + match inner .api_add_torrent(url, Some(opts)) .await - .context("error calling HttpApi::api_add_torrent"); - match idx { - Ok(idx) => { - return Ok(make_response(200, format!("{}", idx))); - } - Err(e) => return Ok(make_response(400, format!("{:#}", e))), + .context("error calling HttpApi::api_add_torrent") + { + Ok(response) => Ok(json_response(response)), + Err(err) => Ok(make_response(400, format!("{:#?}", err).into())), } } } diff --git a/crates/librqbit/src/http_api_client.rs b/crates/librqbit/src/http_api_client.rs index e6a2bc5..569e2f4 100644 --- a/crates/librqbit/src/http_api_client.rs +++ b/crates/librqbit/src/http_api_client.rs @@ -30,9 +30,9 @@ struct ApiRoot { } async fn json_response( - url: &reqwest::Url, response: reqwest::Response, ) -> anyhow::Result { + let url = response.url().clone(); let response = check_response(response).await?; let body = response.bytes().await?; let response: T = serde_json::from_slice(&body).with_context(|| { @@ -59,7 +59,7 @@ impl HttpApiClient { pub async fn validate_rqbit_server(&self) -> anyhow::Result<()> { let response = self.client.get(self.base_url.clone()).send().await?; - let root: ApiRoot = json_response(&self.base_url, response).await?; + let root: ApiRoot = json_response(response).await?; if root.server == "rqbit" { return Ok(()); } @@ -70,12 +70,13 @@ impl HttpApiClient { &self, torrent: &str, opts: Option, - ) -> anyhow::Result { + ) -> anyhow::Result { let opts = opts.unwrap_or_default(); let params = TorrentAddQueryParams { overwrite: Some(opts.overwrite), only_files_regex: opts.only_files_regex, output_folder: opts.output_folder, + list_only: Some(opts.list_only), }; let qs = serde_urlencoded::to_string(¶ms).unwrap(); let url = format!("{}torrents?{}", &self.base_url, qs); @@ -87,6 +88,6 @@ impl HttpApiClient { .await?, ) .await?; - Ok(response.text().await?.parse::()?) + json_response(response).await } } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index bba13f9..da9705b 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -8,13 +8,11 @@ use librqbit_core::{ peer_id::generate_peer_id, torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, }; -use log::{info, warn}; +use log::{debug, info, warn}; use parking_lot::RwLock; use reqwest::Url; use tokio_stream::StreamExt; -use size_format::SizeFormatterBinary as SF; - use crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, peer_connection::PeerConnectionOptions, @@ -46,14 +44,19 @@ pub struct SessionLocked { torrents: Vec, } +enum SessionLockedAddTorrentResult { + AlreadyManaged(ManagedTorrent), + Added(usize), +} + impl SessionLocked { - fn add_torrent(&mut self, torrent: ManagedTorrent) -> Option { - if self.torrents.contains(&torrent) { - return None; + fn add_torrent(&mut self, torrent: ManagedTorrent) -> SessionLockedAddTorrentResult { + if let Some(handle) = self.torrents.iter().find(|t| **t == torrent) { + return SessionLockedAddTorrentResult::AlreadyManaged(handle.clone()); } let idx = self.torrents.len(); self.torrents.push(torrent); - Some(idx) + SessionLockedAddTorrentResult::Added(idx) } } @@ -125,6 +128,18 @@ pub struct AddTorrentOptions { pub force_tracker_interval: Option, } +pub struct ListOnlyResponse { + pub info_hash: Id20, + pub info: TorrentMetaV1Info, + pub only_files: Option>, +} + +pub enum AddTorrentResponse { + AlreadyManaged(ManagedTorrent), + ListOnly(ListOnlyResponse), + Added(TorrentManagerHandle), +} + #[derive(Default)] pub struct SessionOptions { pub disable_dht: bool, @@ -179,7 +194,7 @@ impl Session { &self, url: &str, opts: Option, - ) -> anyhow::Result> { + ) -> anyhow::Result { // Magnet links are different in that we first need to discover the metadata. let opts = opts.unwrap_or_default(); if url.starts_with("magnet:") { @@ -278,15 +293,17 @@ impl Session { initial_peers: Vec, trackers: Vec, opts: AddTorrentOptions, - ) -> anyhow::Result> { - info!("Torrent info: {:#?}", &info); + ) -> anyhow::Result { + debug!("Torrent info: {:#?}", &info); let only_files = if let Some(filename_re) = opts.only_files_regex { let only_files = compute_only_files(&info, &filename_re)?; for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() { if !only_files.contains(&idx) { continue; } - info!("Will download {:?}", filename); + if !opts.list_only { + info!("Will download {:?}", filename); + } } Some(only_files) } else { @@ -294,20 +311,11 @@ impl Session { }; if opts.list_only { - for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() { - let included = match &only_files { - Some(files) => files.contains(&idx), - None => true, - }; - info!( - "File {}, size {}{}", - filename.to_string()?, - SF::new(len), - if included { "" } else { "will skip" } - ) - } - info!("--list was passed, nothing to do, exiting."); - return Ok(None); + return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse { + info_hash, + info, + only_files, + })); } let output_folder = opts @@ -321,13 +329,12 @@ impl Session { state: ManagedTorrentState::Initializing, }; - if self.locked.write().add_torrent(managed_torrent).is_none() { - anyhow::bail!( - "torrent with info_hash {:?} that is downloaded to {:?} is already managed", - info_hash, - &output_folder - ); - }; + match self.locked.write().add_torrent(managed_torrent) { + SessionLockedAddTorrentResult::AlreadyManaged(managed) => { + return Ok(AddTorrentResponse::AlreadyManaged(managed)) + } + SessionLockedAddTorrentResult::Added(_) => {} + } let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder.clone()); builder @@ -400,6 +407,6 @@ impl Session { }); } - Ok(Some(handle)) + Ok(AddTorrentResponse::Added(handle)) } } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 8a6965c..6bb8dc7 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -118,6 +118,9 @@ impl TorrentManagerHandle { false } } + pub fn only_files(&self) -> Option<&[usize]> { + self.manager.options.only_files.as_deref() + } pub fn add_peer(&self, addr: SocketAddr) -> bool { self.manager.state.add_peer_if_not_seen(addr) } @@ -234,7 +237,7 @@ impl TorrentManager { name, length, err ); } else { - info!( + debug!( "Set length for file {:?} to {} in {:?}", name, SF::new(length), diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index ad5dea1..ac5e6cd 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -3,10 +3,13 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc, time::Duratio use anyhow::Context; use clap::{ArgEnum, Parser}; use librqbit::{ - http_api::HttpApi, + http_api::{ApiAddTorrentResponse, HttpApi}, http_api_client, peer_connection::PeerConnectionOptions, - session::{AddTorrentOptions, ManagedTorrentState, Session, SessionOptions}, + session::{ + AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, ManagedTorrentState, Session, + SessionOptions, + }, spawn_utils::{spawn, BlockingSpawner}, }; use log::{error, info, warn}; @@ -273,18 +276,29 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> true } Err(err) => { - info!( - "HTTP API at {} returned {:?}, will start the server within this process", - client.base_url(), - err - ); + warn!("Error checking HTTP API at {}: {:}", client.base_url(), err); false } }; if connect_to_existing { for torrent_url in &download_opts.torrent_path { - match client.add_torrent(torrent_url, Some(torrent_opts.clone())).await { - Ok(id) => info!("{} added to the server with index {}. Query {}/torrents/{}/(stats/haves) for details", torrent_url, id, http_api_url, id), + match client + .add_torrent(torrent_url, Some(torrent_opts.clone())) + .await + { + Ok(ApiAddTorrentResponse { id, details }) => { + if let Some(id) = id { + info!("{} added to the server with index {}. Query {}/torrents/{}/(stats/haves) for details", details.info_hash, id, http_api_url, id) + } + for file in details.files { + info!( + "file {:?}, size {}{}", + file.name, + SF::new(file.length), + if file.included { "" } else { ", will skip" } + ) + } + } Err(err) => warn!("error adding {}: {:?}", torrent_url, err), } } @@ -317,11 +331,40 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> for path in &download_opts.torrent_path { let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await { - Ok(Some(handle)) => { - added = true; - handle - } - Ok(None) => continue, + Ok(v) => match v { + AddTorrentResponse::AlreadyManaged(handle) => { + info!( + "torrent {:?} is already managed, downloaded to {:?}", + handle.info_hash, handle.output_folder + ); + continue; + } + AddTorrentResponse::ListOnly(ListOnlyResponse { + info_hash: _, + info, + only_files, + }) => { + for (idx, (filename, len)) in + info.iter_filenames_and_lengths()?.enumerate() + { + let included = match &only_files { + Some(files) => files.contains(&idx), + None => true, + }; + info!( + "File {}, size {}{}", + filename.to_string()?, + SF::new(len), + if included { "" } else { ", will skip" } + ) + } + continue; + } + AddTorrentResponse::Added(handle) => { + added = true; + handle + } + }, Err(err) => { error!("error adding {:?}: {:?}", &path, err); continue; @@ -331,7 +374,9 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> http_api.add_mgr(handle.clone()); } - if added { + if download_opts.list { + Ok(()) + } else if added { loop { tokio::time::sleep(Duration::from_secs(60)).await; }