diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs new file mode 100644 index 0000000..1e1574a --- /dev/null +++ b/crates/librqbit/src/api.rs @@ -0,0 +1,262 @@ +use std::{net::SocketAddr, sync::Arc}; + +use anyhow::Context; +use buffers::ByteString; +use dht::{DhtStats, Id20}; +use http::StatusCode; +use librqbit_core::torrent_metainfo::TorrentMetaV1Info; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::UnboundedSender; +use tracing::warn; + +use crate::{ + api_error::{ApiError, ApiErrorExt}, + session::{ + AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId, + }, + torrent_state::{ + peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, + stats::{LiveStats, TorrentStats}, + ManagedTorrentHandle, + }, +}; + +pub type Result = std::result::Result; + +// Library API for use in different web frameworks. +// Contains all methods you might want to expose with (de)serializable inputs/outputs. +pub struct Api { + session: Arc, + rust_log_reload_tx: Option>, +} + +impl Api { + pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { + Self { + session, + rust_log_reload_tx, + } + } + + pub fn mgr_handle(&self, idx: TorrentId) -> Result { + self.session + .get(idx) + .ok_or(ApiError::torrent_not_found(idx)) + } + + pub fn api_torrent_list(&self) -> TorrentListResponse { + let items = self.session.with_torrents(|torrents| { + torrents + .map(|(id, mgr)| TorrentListResponseItem { + id, + info_hash: mgr.info().info_hash.as_string(), + }) + .collect() + }); + TorrentListResponse { torrents: items } + } + + pub fn api_torrent_details(&self, idx: TorrentId) -> Result { + let handle = self.mgr_handle(idx)?; + let info_hash = handle.info().info_hash; + let only_files = handle.only_files(); + make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) + } + + pub fn api_peer_stats( + &self, + idx: TorrentId, + filter: PeerStatsFilter, + ) -> Result { + let handle = self.mgr_handle(idx)?; + Ok(handle + .live() + .context("not live")? + .per_peer_stats_snapshot(filter)) + } + + pub fn api_torrent_action_pause(&self, idx: TorrentId) -> Result { + let handle = self.mgr_handle(idx)?; + handle + .pause() + .context("error pausing torrent") + .with_error_status_code(StatusCode::BAD_REQUEST)?; + Ok(Default::default()) + } + + pub fn api_torrent_action_start(&self, idx: TorrentId) -> Result { + let handle = self.mgr_handle(idx)?; + self.session + .unpause(&handle) + .context("error unpausing torrent") + .with_error_status_code(StatusCode::BAD_REQUEST)?; + Ok(Default::default()) + } + + pub fn api_torrent_action_forget(&self, idx: TorrentId) -> Result { + self.session + .delete(idx, false) + .context("error forgetting torrent")?; + Ok(Default::default()) + } + + pub fn api_torrent_action_delete(&self, idx: TorrentId) -> Result { + self.session + .delete(idx, true) + .context("error deleting torrent with files")?; + Ok(Default::default()) + } + + pub fn api_set_rust_log(&self, new_value: String) -> Result { + let tx = self + .rust_log_reload_tx + .as_ref() + .context("rust_log_reload_tx was not set")?; + tx.send(new_value) + .context("noone is listening to RUST_LOG changes")?; + Ok(Default::default()) + } + + pub async fn api_add_torrent( + &self, + add: AddTorrent<'_>, + opts: Option, + ) -> Result { + let response = match self + .session + .add_torrent(add, opts) + .await + .context("error adding torrent") + .with_error_status_code(StatusCode::BAD_REQUEST)? + { + AddTorrentResponse::AlreadyManaged(id, managed) => { + return Err(anyhow::anyhow!( + "{:?} is already managed, id={}, downloaded to {:?}", + managed.info_hash(), + id, + &managed.info().out_dir + )) + .with_error_status_code(StatusCode::CONFLICT); + } + AddTorrentResponse::ListOnly(ListOnlyResponse { + info_hash, + info, + only_files, + seen_peers, + }) => ApiAddTorrentResponse { + id: None, + seen_peers: Some(seen_peers), + details: make_torrent_details(&info_hash, &info, only_files.as_deref()) + .context("error making torrent details")?, + }, + AddTorrentResponse::Added(id, handle) => { + let details = make_torrent_details( + &handle.info_hash(), + &handle.info().info, + handle.only_files().as_deref(), + ) + .context("error making torrent details")?; + ApiAddTorrentResponse { + id: Some(id), + details, + seen_peers: None, + } + } + }; + Ok(response) + } + + pub fn api_dht_stats(&self) -> Result { + self.session + .get_dht() + .as_ref() + .map(|d| d.stats()) + .ok_or(ApiError::dht_disabled()) + } + + pub fn api_dht_table(&self) -> Result { + let dht = self.session.get_dht().ok_or(ApiError::dht_disabled())?; + Ok(dht.with_routing_table(|r| r.clone())) + } + + pub fn api_stats_v0(&self, idx: TorrentId) -> Result { + let mgr = self.mgr_handle(idx)?; + let live = mgr.live().context("torrent not live")?; + Ok(LiveStats::from(&*live)) + } + + pub fn api_stats_v1(&self, idx: TorrentId) -> Result { + let mgr = self.mgr_handle(idx)?; + Ok(mgr.stats()) + } + + pub fn api_dump_haves(&self, idx: usize) -> Result { + let mgr = self.mgr_handle(idx)?; + Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) + } +} + +#[derive(Serialize)] +pub struct TorrentListResponseItem { + pub id: usize, + pub info_hash: String, +} + +#[derive(Serialize)] +pub struct TorrentListResponse { + pub torrents: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct TorrentDetailsResponseFile { + pub name: String, + pub length: u64, + pub included: bool, +} + +#[derive(Default, Serialize)] +pub struct EmptyJsonResponse {} + +#[derive(Serialize, Deserialize)] +pub struct TorrentDetailsResponse { + pub info_hash: String, + pub files: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct ApiAddTorrentResponse { + pub id: Option, + pub details: TorrentDetailsResponse, + pub seen_peers: Option>, +} + +fn make_torrent_details( + info_hash: &Id20, + info: &TorrentMetaV1Info, + only_files: Option<&[usize]>, +) -> Result { + let files = info + .iter_filenames_and_lengths() + .context("error iterating filenames and lengths")? + .enumerate() + .map(|(idx, (filename_it, length))| { + let name = match filename_it.to_string() { + Ok(s) => s, + Err(err) => { + warn!("error reading filename: {:?}", err); + "".to_string() + } + }; + let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); + TorrentDetailsResponseFile { + name, + length, + included, + } + }) + .collect(); + Ok(TorrentDetailsResponse { + info_hash: info_hash.as_string(), + files, + }) +} diff --git a/crates/librqbit/src/http_api_error.rs b/crates/librqbit/src/api_error.rs similarity index 100% rename from crates/librqbit/src/http_api_error.rs rename to crates/librqbit/src/api_error.rs diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index b85d630..5a682f7 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -3,36 +3,31 @@ use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; -use buffers::ByteString; -use dht::DhtStats; -use http::StatusCode; use itertools::Itertools; -use librqbit_core::id20::Id20; -use librqbit_core::torrent_metainfo::TorrentMetaV1Info; + use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; -use tracing::{info, warn}; +use tracing::info; use axum::Router; -use crate::http_api_error::{ApiError, ApiErrorExt}; +use crate::api::Api; use crate::peer_connection::PeerConnectionOptions; -use crate::session::{ - AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId, - SUPPORTED_SCHEMES, -}; -use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; -use crate::torrent_state::stats::{LiveStats, TorrentStats}; -use crate::torrent_state::ManagedTorrentHandle; +use crate::session::{AddTorrent, AddTorrentOptions, Session, SUPPORTED_SCHEMES}; +use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter; + +type ApiState = Arc; + +use crate::api::Result; // Public API #[derive(Clone)] pub struct HttpApi { - inner: Arc, + inner: ApiState, } impl HttpApi { @@ -210,6 +205,7 @@ impl HttpApi { #[cfg(feature = "webui")] { + use tracing::warn; let webui_router = Router::new() .route( "/", @@ -271,42 +267,6 @@ impl HttpApi { } } -type Result = std::result::Result; - -#[derive(Serialize)] -pub struct TorrentListResponseItem { - pub id: usize, - pub info_hash: String, -} - -#[derive(Serialize)] -pub struct TorrentListResponse { - pub torrents: Vec, -} - -#[derive(Serialize, Deserialize)] -pub struct TorrentDetailsResponseFile { - pub name: String, - pub length: u64, - pub included: bool, -} - -#[derive(Default, Serialize)] -pub struct EmptyJsonResponse {} - -#[derive(Serialize, Deserialize)] -pub struct TorrentDetailsResponse { - pub info_hash: String, - pub files: Vec, -} - -#[derive(Serialize, Deserialize)] -pub struct ApiAddTorrentResponse { - pub id: Option, - pub details: TorrentDetailsResponse, - pub seen_peers: Option>, -} - pub struct OnlyFiles(Vec); pub struct InitialPeers(pub Vec); @@ -411,208 +371,3 @@ impl TorrentAddQueryParams { } } } - -// Private HTTP API internals. Agnostic of web framework. -pub struct Api { - session: Arc, - rust_log_reload_tx: Option>, -} - -type ApiState = Arc; - -impl Api { - pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { - Self { - session, - rust_log_reload_tx, - } - } - - pub fn mgr_handle(&self, idx: TorrentId) -> Result { - self.session - .get(idx) - .ok_or(ApiError::torrent_not_found(idx)) - } - - pub fn api_torrent_list(&self) -> TorrentListResponse { - let items = self.session.with_torrents(|torrents| { - torrents - .map(|(id, mgr)| TorrentListResponseItem { - id, - info_hash: mgr.info().info_hash.as_string(), - }) - .collect() - }); - TorrentListResponse { torrents: items } - } - - pub fn api_torrent_details(&self, idx: TorrentId) -> Result { - let handle = self.mgr_handle(idx)?; - let info_hash = handle.info().info_hash; - let only_files = handle.only_files(); - make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) - } - - pub fn api_peer_stats( - &self, - idx: TorrentId, - filter: PeerStatsFilter, - ) -> Result { - let handle = self.mgr_handle(idx)?; - Ok(handle - .live() - .context("not live")? - .per_peer_stats_snapshot(filter)) - } - - pub fn api_torrent_action_pause(&self, idx: TorrentId) -> Result { - let handle = self.mgr_handle(idx)?; - handle - .pause() - .context("error pausing torrent") - .with_error_status_code(StatusCode::BAD_REQUEST)?; - Ok(Default::default()) - } - - pub fn api_torrent_action_start(&self, idx: TorrentId) -> Result { - let handle = self.mgr_handle(idx)?; - self.session - .unpause(&handle) - .context("error unpausing torrent") - .with_error_status_code(StatusCode::BAD_REQUEST)?; - Ok(Default::default()) - } - - pub fn api_torrent_action_forget(&self, idx: TorrentId) -> Result { - self.session - .delete(idx, false) - .context("error forgetting torrent")?; - Ok(Default::default()) - } - - pub fn api_torrent_action_delete(&self, idx: TorrentId) -> Result { - self.session - .delete(idx, true) - .context("error deleting torrent with files")?; - Ok(Default::default()) - } - - pub fn api_set_rust_log(&self, new_value: String) -> Result { - let tx = self - .rust_log_reload_tx - .as_ref() - .context("rust_log_reload_tx was not set")?; - tx.send(new_value) - .context("noone is listening to RUST_LOG changes")?; - Ok(Default::default()) - } - - pub async fn api_add_torrent( - &self, - add: AddTorrent<'_>, - opts: Option, - ) -> Result { - let response = match self - .session - .add_torrent(add, opts) - .await - .context("error adding torrent") - .with_error_status_code(StatusCode::BAD_REQUEST)? - { - AddTorrentResponse::AlreadyManaged(id, managed) => { - return Err(anyhow::anyhow!( - "{:?} is already managed, id={}, downloaded to {:?}", - managed.info_hash(), - id, - &managed.info().out_dir - )) - .with_error_status_code(StatusCode::CONFLICT); - } - AddTorrentResponse::ListOnly(ListOnlyResponse { - info_hash, - info, - only_files, - seen_peers, - }) => ApiAddTorrentResponse { - id: None, - seen_peers: Some(seen_peers), - details: make_torrent_details(&info_hash, &info, only_files.as_deref()) - .context("error making torrent details")?, - }, - AddTorrentResponse::Added(id, handle) => { - let details = make_torrent_details( - &handle.info_hash(), - &handle.info().info, - handle.only_files().as_deref(), - ) - .context("error making torrent details")?; - ApiAddTorrentResponse { - id: Some(id), - details, - seen_peers: None, - } - } - }; - Ok(response) - } - - pub fn api_dht_stats(&self) -> Result { - self.session - .get_dht() - .as_ref() - .map(|d| d.stats()) - .ok_or(ApiError::dht_disabled()) - } - - pub fn api_dht_table(&self) -> Result { - let dht = self.session.get_dht().ok_or(ApiError::dht_disabled())?; - Ok(dht.with_routing_table(|r| r.clone())) - } - - pub fn api_stats_v0(&self, idx: TorrentId) -> Result { - let mgr = self.mgr_handle(idx)?; - let live = mgr.live().context("torrent not live")?; - Ok(LiveStats::from(&*live)) - } - - pub fn api_stats_v1(&self, idx: TorrentId) -> Result { - let mgr = self.mgr_handle(idx)?; - Ok(mgr.stats()) - } - - pub fn api_dump_haves(&self, idx: usize) -> Result { - let mgr = self.mgr_handle(idx)?; - Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) - } -} - -fn make_torrent_details( - info_hash: &Id20, - info: &TorrentMetaV1Info, - only_files: Option<&[usize]>, -) -> Result { - let files = info - .iter_filenames_and_lengths() - .context("error iterating filenames and lengths")? - .enumerate() - .map(|(idx, (filename_it, length))| { - let name = match filename_it.to_string() { - Ok(s) => s, - Err(err) => { - warn!("error reading filename: {:?}", err); - "".to_string() - } - }; - let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); - TorrentDetailsResponseFile { - name, - length, - included, - } - }) - .collect(); - Ok(TorrentDetailsResponse { - info_hash: info_hash.as_string(), - files, - }) -} diff --git a/crates/librqbit/src/http_api_client.rs b/crates/librqbit/src/http_api_client.rs index f763af9..2b6bf77 100644 --- a/crates/librqbit/src/http_api_client.rs +++ b/crates/librqbit/src/http_api_client.rs @@ -2,6 +2,7 @@ use anyhow::Context; use serde::Deserialize; use crate::{ + api::ApiAddTorrentResponse, http_api::TorrentAddQueryParams, session::{AddTorrent, AddTorrentOptions}, }; @@ -82,7 +83,7 @@ impl HttpApiClient { &self, torrent: AddTorrent<'_>, opts: Option, - ) -> anyhow::Result { + ) -> anyhow::Result { let opts = opts.unwrap_or_default(); let params = TorrentAddQueryParams { overwrite: Some(opts.overwrite), diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 94cf4f0..cb7934e 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -1,9 +1,10 @@ +pub mod api; +pub mod api_error; pub mod chunk_tracker; pub mod dht_utils; pub mod file_ops; pub mod http_api; pub mod http_api_client; -pub mod http_api_error; pub mod peer_connection; pub mod peer_info_reader; pub mod session; diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 5e5b72c..e2fa5b7 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -3,7 +3,8 @@ use std::{io::LineWriter, net::SocketAddr, path::PathBuf, sync::Arc, time::Durat use anyhow::Context; use clap::{Parser, ValueEnum}; use librqbit::{ - http_api::{ApiAddTorrentResponse, HttpApi}, + api::ApiAddTorrentResponse, + http_api::HttpApi, http_api_client, peer_connection::PeerConnectionOptions, session::{