diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 7e0085b..edd4da7 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,7 +1,6 @@ use anyhow::Context; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; -use axum::response::IntoResponse; use buffers::ByteString; use dht::{Dht, DhtStats}; use librqbit_core::id20::Id20; @@ -13,120 +12,118 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; -use axum::{response, routing, Router}; +use axum::{routing, Router}; +use crate::http_api_error::{ApiError, WithErrorStatus}; use crate::session::{AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session}; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; +// Public API +#[derive(Clone)] +pub struct HttpApi { + inner: Arc, +} + +impl HttpApi { + pub fn new(session: Arc) -> Self { + Self { + inner: Arc::new(ApiInternal::new(session)), + } + } + pub fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize { + self.inner.add_torrent_handle(handle) + } + + pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { + let state = self.inner; + let api_description_body = serde_json::json!({ + "apis": { + "GET /": "list all available APIs", + "GET /dht/stats": "DHT stats", + "GET /dht/table": "DHT routing table", + "GET /torrents": "List torrents (default torrent is 0)", + "GET /torrents/{index}": "Torrent details", + "GET /torrents/{index}/haves": "The bitfield of have pieces", + "GET /torrents/{index}/stats": "Torrent stats", + // This is kind of not secure as it just reads any local file that it has access to, + // or any URL, but whatever, ok for our purposes / threat model. + "POST /torrents": "Add a torrent here. magnet: or http:// or a local file." + }, + "server": "rqbit", + }); + + let app = Router::new() + .route( + "/", + routing::get(move || async move { axum::Json(api_description_body) }), + ) + .route( + "/dht/stats", + routing::get(|State(state): State| async move { + state.api_dht_stats().map(axum::Json) + }), + ) + .route( + "/dht/table", + routing::get(|State(state): State| async move { + state.api_dht_table().map(axum::Json) + }), + ) + .route( + "/torrents", + routing::get(|State(state): State| async move { + axum::Json(state.api_torrent_list()) + }), + ) + .route( + "/torrents", + routing::post( + |State(state): State, + Query(params): Query, + url: String| async move { + let opts = params.into_add_torrent_options(); + state.api_add_torrent(url, Some(opts)).await.map(axum::Json) + }, + ), + ) + .route( + "/torrents/:id", + routing::get( + |State(state): State, Path(idx): Path| async move { + state.api_torrent_details(idx).map(axum::Json) + }, + ), + ) + .route( + "/torrents/:id/haves", + routing::get( + |State(state): State, Path(idx): Path| async move { + state.api_dump_haves(idx) + }, + ), + ) + .route( + "/torrents/:id/stats", + routing::get( + |State(state): State, Path(idx): Path| async move { + state.api_stats(idx).map(axum::Json) + }, + ), + ) + .with_state(state); + + log::info!("starting HTTP server on {}", addr); + axum::Server::try_bind(&addr) + .with_context(|| format!("error binding to {addr}"))? + .serve(app.into_make_service()) + .await?; + Ok(()) + } +} + type Result = std::result::Result; -// Private HTTP API internals. -pub struct ApiInternal { - dht: Option, - startup_time: Instant, - torrent_managers: RwLock>, - session: Arc, -} - -// Convenience error type. -#[derive(Debug)] -struct ApiError { - status: Option, - kind: ApiErrorKind, -} - -impl ApiError { - const fn torrent_not_found(torrent_id: usize) -> Self { - Self { - status: Some(StatusCode::NOT_FOUND), - kind: ApiErrorKind::TorrentNotFound(torrent_id), - } - } - const fn dht_disabled() -> Self { - Self { - status: Some(StatusCode::NOT_FOUND), - kind: ApiErrorKind::DhtDisabled, - } - } - fn with_status(self, status: StatusCode) -> Self { - Self { - status: Some(status), - kind: self.kind, - } - } -} - -#[derive(Debug)] -enum ApiErrorKind { - TorrentNotFound(usize), - DhtDisabled, - Other(anyhow::Error), -} - -impl From for ApiError { - fn from(value: anyhow::Error) -> Self { - Self { - status: None, - kind: ApiErrorKind::Other(value), - } - } -} - -impl From<(StatusCode, anyhow::Error)> for ApiError { - fn from((code, err): (StatusCode, anyhow::Error)) -> Self { - ApiError::from(err).with_status(code) - } -} - -impl std::error::Error for ApiError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match &self.kind { - ApiErrorKind::Other(err) => err.source(), - _ => None, - } - } -} - -impl std::fmt::Display for ApiError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind { - ApiErrorKind::TorrentNotFound(idx) => write!(f, "torrent {idx} not found"), - ApiErrorKind::Other(err) => write!(f, "{err:?}"), - ApiErrorKind::DhtDisabled => write!(f, "DHT is disabled"), - } - } -} - -impl IntoResponse for ApiError { - fn into_response(self) -> response::Response { - let mut response = format!("{self}").into_response(); - *response.status_mut() = match self.status { - Some(s) => s, - None => StatusCode::INTERNAL_SERVER_ERROR, - }; - response - } -} - -impl ApiInternal { - fn new(session: Arc) -> Self { - Self { - dht: session.get_dht(), - startup_time: Instant::now(), - torrent_managers: RwLock::new(Vec::new()), - session, - } - } - - fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize { - let mut g = self.torrent_managers.write(); - let idx = g.len(); - g.push(handle); - idx - } -} - #[derive(Serialize)] struct Speed { mbps: f64, @@ -187,38 +184,55 @@ pub struct ApiAddTorrentResponse { pub details: TorrentDetailsResponse, } -fn make_torrent_details( - info_hash: &Id20, - info: &TorrentMetaV1Info, - only_files: Option<&[usize]>, -) -> Result { - let files = info - .iter_filenames_and_lengths() - .context("error iterating filenames and lengths")? - .enumerate() - .map(|(idx, (filename_it, length))| { - let name = match filename_it.to_string() { - Ok(s) => s, - Err(err) => { - warn!("error reading filename: {:?}", err); - "".to_string() - } - }; - let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); - TorrentDetailsResponseFile { - name, - length, - included, - } - }) - .collect(); - Ok(TorrentDetailsResponse { - info_hash: info_hash.as_string(), - files, - }) +#[derive(Serialize, Deserialize)] +pub struct TorrentAddQueryParams { + pub overwrite: Option, + pub output_folder: Option, + pub sub_folder: Option, + pub only_files_regex: Option, + pub list_only: Option, } +impl TorrentAddQueryParams { + fn into_add_torrent_options(self) -> AddTorrentOptions { + AddTorrentOptions { + overwrite: self.overwrite.unwrap_or(false), + only_files_regex: self.only_files_regex, + output_folder: self.output_folder, + sub_folder: self.sub_folder, + list_only: self.list_only.unwrap_or(false), + ..Default::default() + } + } +} + +// Private HTTP API internals. Agnostic of web framework. +pub struct ApiInternal { + dht: Option, + startup_time: Instant, + torrent_managers: RwLock>, + session: Arc, +} + +type ApiState = Arc; + impl ApiInternal { + fn new(session: Arc) -> Self { + Self { + dht: session.get_dht(), + startup_time: Instant::now(), + torrent_managers: RwLock::new(Vec::new()), + session, + } + } + + fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize { + let mut g = self.torrent_managers.write(); + let idx = g.len(); + g.push(handle); + idx + } + fn mgr_handle(&self, idx: usize) -> Result { self.torrent_managers .read() @@ -258,17 +272,16 @@ impl ApiInternal { .session .add_torrent(&url, opts) .await - .context("error adding torrent")? + .context("error adding torrent") + .with_error_status_code(StatusCode::BAD_REQUEST)? { AddTorrentResponse::AlreadyManaged(managed) => { - return Err(ApiError::from(( - StatusCode::CONFLICT, - anyhow::anyhow!( - "{:?} is already managed, downloaded to {:?}", - managed.info_hash, - managed.output_folder - ), - ))); + return Err(anyhow::anyhow!( + "{:?} is already managed, downloaded to {:?}", + managed.info_hash, + managed.output_folder + )) + .with_error_status_code(StatusCode::CONFLICT); } AddTorrentResponse::ListOnly(ListOnlyResponse { info_hash, @@ -296,8 +309,16 @@ impl ApiInternal { Ok(response) } - fn api_dht_stats(&self) -> Option { - self.dht.as_ref().map(|d| d.stats()) + fn api_dht_stats(&self) -> Result { + self.dht + .as_ref() + .map(|d| d.stats()) + .ok_or(ApiError::dht_disabled()) + } + + fn api_dht_table(&self) -> Result { + let dht = self.dht.as_ref().ok_or(ApiError::dht_disabled())?; + Ok(dht.with_routing_table(|r| r.clone())) } fn api_stats(&self, idx: usize) -> Result { @@ -328,128 +349,33 @@ impl ApiInternal { } } -type ApiState = Arc; - -#[derive(Clone)] -pub struct HttpApi { - inner: Arc, -} - -#[derive(Serialize, Deserialize)] -pub struct TorrentAddQueryParams { - pub overwrite: Option, - pub output_folder: Option, - pub sub_folder: Option, - pub only_files_regex: Option, - pub list_only: Option, -} - -async fn axum_post_torrent( - State(state): State, - Query(params): Query, - url: String, -) -> Result> { - let opts = AddTorrentOptions { - overwrite: params.overwrite.unwrap_or(false), - only_files_regex: params.only_files_regex, - output_folder: params.output_folder, - sub_folder: params.sub_folder, - list_only: params.list_only.unwrap_or(false), - ..Default::default() - }; - state - .api_add_torrent(url, Some(opts)) - .await - .map(axum::Json) - .map_err(|e| e.with_status(StatusCode::BAD_REQUEST)) -} - -// Public API -impl HttpApi { - pub fn new(session: Arc) -> Self { - Self { - inner: Arc::new(ApiInternal::new(session)), - } - } - pub fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize { - self.inner.add_torrent_handle(handle) - } - - pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { - let state = self.inner; - let api_description_body = serde_json::json!({ - "apis": { - "GET /": "list all available APIs", - "GET /dht/stats": "DHT stats", - "GET /dht/table": "DHT routing table", - "GET /torrents": "List torrents (default torrent is 0)", - "GET /torrents/{index}": "Torrent details", - "GET /torrents/{index}/haves": "The bitfield of have pieces", - "GET /torrents/{index}/stats": "Torrent stats", - // This is kind of not secure as it just reads any local file that it has access to, - // or any URL, but whatever, ok for our purposes / threat model. - "POST /torrents": "Add a torrent here. magnet: or http:// or a local file." - }, - "server": "rqbit", - }); - - let app = Router::new() - .route( - "/", - routing::get(move || async move { axum::Json(api_description_body) }), - ) - .route( - "/dht/stats", - routing::get(|State(state): State| async move { - let dht_stats = state.api_dht_stats().ok_or(ApiError::dht_disabled())?; - Ok::<_, ApiError>(axum::Json(dht_stats)) - }), - ) - .route( - "/dht/table", - routing::get(|State(state): State| async move { - let dht = state.dht.as_ref().ok_or(ApiError::dht_disabled())?; - Ok::<_, ApiError>(dht.with_routing_table(|r| axum::Json(r.clone()))) - }), - ) - .route( - "/torrents", - routing::get(move |State(state): State| async move { - axum::Json(state.api_torrent_list()) - }), - ) - .route("/torrents", routing::post(axum_post_torrent)) - .route( - "/torrents/:id", - routing::get( - |State(state): State, Path(idx): Path| async move { - state.api_torrent_details(idx).map(axum::Json) - }, - ), - ) - .route( - "/torrents/:id/haves", - routing::get( - |State(state): State, Path(idx): Path| async move { - state.api_dump_haves(idx) - }, - ), - ) - .route( - "/torrents/:id/stats", - routing::get( - |State(state): State, Path(idx): Path| async move { - state.api_stats(idx).map(axum::Json) - }, - ), - ) - .with_state(state); - - log::info!("starting HTTP server on {}", addr); - axum::Server::try_bind(&addr) - .with_context(|| format!("error binding to {addr}"))? - .serve(app.into_make_service()) - .await?; - Ok(()) - } +fn make_torrent_details( + info_hash: &Id20, + info: &TorrentMetaV1Info, + only_files: Option<&[usize]>, +) -> Result { + let files = info + .iter_filenames_and_lengths() + .context("error iterating filenames and lengths")? + .enumerate() + .map(|(idx, (filename_it, length))| { + let name = match filename_it.to_string() { + Ok(s) => s, + Err(err) => { + warn!("error reading filename: {:?}", err); + "".to_string() + } + }; + let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); + TorrentDetailsResponseFile { + name, + length, + included, + } + }) + .collect(); + Ok(TorrentDetailsResponse { + info_hash: info_hash.as_string(), + files, + }) } diff --git a/crates/librqbit/src/http_api_error.rs b/crates/librqbit/src/http_api_error.rs new file mode 100644 index 0000000..044596e --- /dev/null +++ b/crates/librqbit/src/http_api_error.rs @@ -0,0 +1,91 @@ +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, +}; + +// Convenience error type. +#[derive(Debug)] +pub struct ApiError { + status: Option, + kind: ApiErrorKind, +} + +impl ApiError { + pub const fn torrent_not_found(torrent_id: usize) -> Self { + Self { + status: Some(StatusCode::NOT_FOUND), + kind: ApiErrorKind::TorrentNotFound(torrent_id), + } + } + pub const fn dht_disabled() -> Self { + Self { + status: Some(StatusCode::NOT_FOUND), + kind: ApiErrorKind::DhtDisabled, + } + } + pub fn with_status(self, status: StatusCode) -> Self { + Self { + status: Some(status), + kind: self.kind, + } + } +} + +#[derive(Debug)] +enum ApiErrorKind { + TorrentNotFound(usize), + DhtDisabled, + Other(anyhow::Error), +} + +impl From for ApiError { + fn from(value: anyhow::Error) -> Self { + Self { + status: None, + kind: ApiErrorKind::Other(value), + } + } +} + +impl std::error::Error for ApiError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match &self.kind { + ApiErrorKind::Other(err) => err.source(), + _ => None, + } + } +} + +impl std::fmt::Display for ApiError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.kind { + ApiErrorKind::TorrentNotFound(idx) => write!(f, "torrent {idx} not found"), + ApiErrorKind::Other(err) => write!(f, "{err:?}"), + ApiErrorKind::DhtDisabled => write!(f, "DHT is disabled"), + } + } +} + +impl IntoResponse for ApiError { + fn into_response(self) -> Response { + let mut response = format!("{self}").into_response(); + *response.status_mut() = match self.status { + Some(s) => s, + None => StatusCode::INTERNAL_SERVER_ERROR, + }; + response + } +} + +pub trait WithErrorStatus { + fn with_error_status_code(self, s: StatusCode) -> Result; +} + +impl WithErrorStatus for std::result::Result +where + E: Into, +{ + fn with_error_status_code(self, s: StatusCode) -> Result { + self.map_err(|e| e.into().with_status(s)) + } +} diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 8a320a0..ecf4ef1 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -3,6 +3,7 @@ pub mod dht_utils; pub mod file_ops; pub mod http_api; pub mod http_api_client; +mod http_api_error; pub mod peer_connection; pub mod peer_handler; pub mod peer_info_reader;