use anyhow::Context; use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; use buffers::ByteString; use dht::DhtStats; use http::StatusCode; use itertools::Itertools; use librqbit_core::id20::Id20; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tracing::{info, warn}; use axum::Router; use crate::http_api_error::{ApiError, ApiErrorExt}; use crate::peer_connection::PeerConnectionOptions; use crate::session::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId, SUPPORTED_SCHEMES, }; use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; use crate::torrent_state::stats::{LiveStats, TorrentStats}; use crate::torrent_state::ManagedTorrentHandle; // Public API #[derive(Clone)] pub struct HttpApi { inner: Arc, } impl HttpApi { pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { Self { inner: Arc::new(Api::new(session, rust_log_reload_tx)), } } pub async fn make_http_api_and_run( self, addr: SocketAddr, read_only: bool, ) -> anyhow::Result<()> { let state = self.inner; async fn api_root() -> impl IntoResponse { axum::Json(serde_json::json!({ "apis": { "GET /": "list all available APIs", "GET /dht/stats": "DHT stats", "GET /dht/table": "DHT routing table", "GET /torrents": "List torrents (default torrent is 0)", "GET /torrents/{index}": "Torrent details", "GET /torrents/{index}/haves": "The bitfield of have pieces", "GET /torrents/{index}/stats/v1": "Torrent stats", "GET /torrents/{index}/peer_stats": "Per peer stats", "POST /torrents/{index}/pause": "Pause torrent", "POST /torrents/{index}/start": "Resume torrent", "POST /torrents/{index}/forget": "Forget about the torrent, keep the files", "POST /torrents/{index}/delete": "Forget about the torrent, remove the files", "POST /torrents": "Add a torrent here. magnet: or http:// or a local file.", "POST /rust_log": "Set RUST_LOG to this post launch (for debugging)", "GET /web/": "Web UI", }, "server": "rqbit", })) } async fn dht_stats(State(state): State) -> Result { state.api_dht_stats().map(axum::Json) } async fn dht_table(State(state): State) -> Result { state.api_dht_table().map(axum::Json) } async fn torrents_list(State(state): State) -> impl IntoResponse { axum::Json(state.api_torrent_list()) } async fn torrents_post( State(state): State, Query(params): Query, data: Bytes, ) -> Result { let is_url = params.is_url; let opts = params.into_add_torrent_options(); let data = data.to_vec(); let add = match is_url { Some(true) => AddTorrent::Url( String::from_utf8(data) .context("invalid utf-8 for passed URL")? .into(), ), Some(false) => AddTorrent::TorrentFileBytes(data.into()), // Guess the format. None if SUPPORTED_SCHEMES .iter() .any(|s| data.starts_with(s.as_bytes())) => { AddTorrent::Url( String::from_utf8(data) .context("invalid utf-8 for passed URL")? .into(), ) } _ => AddTorrent::TorrentFileBytes(data.into()), }; state.api_add_torrent(add, Some(opts)).await.map(axum::Json) } async fn torrent_details( State(state): State, Path(idx): Path, ) -> Result { state.api_torrent_details(idx).map(axum::Json) } async fn torrent_haves( State(state): State, Path(idx): Path, ) -> Result { state.api_dump_haves(idx) } async fn torrent_stats_v0( State(state): State, Path(idx): Path, ) -> Result { state.api_stats_v0(idx).map(axum::Json) } async fn torrent_stats_v1( State(state): State, Path(idx): Path, ) -> Result { state.api_stats_v1(idx).map(axum::Json) } async fn peer_stats( State(state): State, Path(idx): Path, Query(filter): Query, ) -> Result { state.api_peer_stats(idx, filter).map(axum::Json) } async fn torrent_action_pause( State(state): State, Path(idx): Path, ) -> Result { state.api_torrent_action_pause(idx).map(axum::Json) } async fn torrent_action_start( State(state): State, Path(idx): Path, ) -> Result { state.api_torrent_action_start(idx).map(axum::Json) } async fn torrent_action_forget( State(state): State, Path(idx): Path, ) -> Result { state.api_torrent_action_forget(idx).map(axum::Json) } async fn torrent_action_delete( State(state): State, Path(idx): Path, ) -> Result { state.api_torrent_action_delete(idx).map(axum::Json) } async fn set_rust_log( State(state): State, new_value: String, ) -> Result { state.api_set_rust_log(new_value).map(axum::Json) } let mut app = Router::new() .route("/", get(api_root)) .route("/rust_log", post(set_rust_log)) .route("/dht/stats", get(dht_stats)) .route("/dht/table", get(dht_table)) .route("/torrents", get(torrents_list)) .route("/torrents/:id", get(torrent_details)) .route("/torrents/:id/haves", get(torrent_haves)) .route("/torrents/:id/stats", get(torrent_stats_v0)) .route("/torrents/:id/stats/v1", get(torrent_stats_v1)) .route("/torrents/:id/peer_stats", get(peer_stats)); if !read_only { app = app .route("/torrents", post(torrents_post)) .route("/torrents/:id/pause", post(torrent_action_pause)) .route("/torrents/:id/start", post(torrent_action_start)) .route("/torrents/:id/forget", post(torrent_action_forget)) .route("/torrents/:id/delete", post(torrent_action_delete)); } #[cfg(feature = "webui")] { let webui_router = Router::new() .route( "/", get(|| async { ( [("Content-Type", "text/html")], include_str!("../webui/dist/index.html"), ) }), ) .route( "/assets/index.js", get(|| async { ( [("Content-Type", "application/javascript")], include_str!("../webui/dist/assets/index.js"), ) }), ) .route( "/assets/logo.svg", get(|| async { ( [("Content-Type", "image/svg+xml")], include_str!("../webui/dist/assets/logo.svg"), ) }), ); // This is to develop webui by just doing "open index.html && tsc --watch" let cors_layer = std::env::var("CORS_DEBUG") .ok() .map(|_| { use tower_http::cors::{AllowHeaders, AllowOrigin}; warn!("CorsLayer: allowing everything because CORS_DEBUG is set"); tower_http::cors::CorsLayer::default() .allow_origin(AllowOrigin::predicate(|_, _| true)) .allow_headers(AllowHeaders::any()) }) .unwrap_or_default(); app = app.nest("/web/", webui_router).layer(cors_layer); } let app = app .layer(tower_http::trace::TraceLayer::new_for_http()) .with_state(state) .into_make_service(); info!("starting HTTP server on {}", addr); use tokio::net::TcpListener; let listener = TcpListener::bind(&addr) .await .with_context(|| format!("error binding to {addr}"))?; axum::serve(listener, app).await?; Ok(()) } } type Result = std::result::Result; #[derive(Serialize)] pub struct TorrentListResponseItem { pub id: usize, pub info_hash: String, } #[derive(Serialize)] pub struct TorrentListResponse { pub torrents: Vec, } #[derive(Serialize, Deserialize)] pub struct TorrentDetailsResponseFile { pub name: String, pub length: u64, pub included: bool, } #[derive(Default, Serialize)] pub struct EmptyJsonResponse {} #[derive(Serialize, Deserialize)] pub struct TorrentDetailsResponse { pub info_hash: String, pub files: Vec, } #[derive(Serialize, Deserialize)] pub struct ApiAddTorrentResponse { pub id: Option, pub details: TorrentDetailsResponse, pub seen_peers: Option>, } pub struct OnlyFiles(Vec); pub struct InitialPeers(pub Vec); #[derive(Serialize, Deserialize, Default)] pub struct TorrentAddQueryParams { pub overwrite: Option, pub output_folder: Option, pub sub_folder: Option, pub only_files_regex: Option, pub only_files: Option, pub peer_connect_timeout: Option, pub peer_read_write_timeout: Option, pub initial_peers: Option, // Will force interpreting the content as a URL. pub is_url: Option, pub list_only: Option, } impl Serialize for OnlyFiles { fn serialize(&self, serializer: S) -> core::result::Result where S: serde::Serializer, { let s = self.0.iter().map(|id| id.to_string()).join(","); s.serialize(serializer) } } impl<'de> Deserialize<'de> for OnlyFiles { fn deserialize(deserializer: D) -> core::result::Result where D: serde::Deserializer<'de>, { use serde::de::Error; let s = String::deserialize(deserializer)?; let list = s .split(',') .try_fold(Vec::::new(), |mut acc, c| match c.parse() { Ok(i) => { acc.push(i); Ok(acc) } Err(_) => Err(D::Error::custom(format!( "only_files: failed to parse {:?} as integer", c ))), })?; if list.is_empty() { return Err(D::Error::custom( "only_files: should contain at least one file id", )); } Ok(OnlyFiles(list)) } } impl<'de> Deserialize<'de> for InitialPeers { fn deserialize(deserializer: D) -> std::prelude::v1::Result where D: serde::Deserializer<'de>, { use serde::de::Error; let string = String::deserialize(deserializer)?; let mut addrs = Vec::new(); for addr_str in string.split(',').filter(|s| !s.is_empty()) { addrs.push(SocketAddr::from_str(addr_str).map_err(D::Error::custom)?); } Ok(InitialPeers(addrs)) } } impl Serialize for InitialPeers { fn serialize(&self, serializer: S) -> std::prelude::v1::Result where S: serde::Serializer, { self.0 .iter() .map(|s| s.to_string()) .join(",") .serialize(serializer) } } impl TorrentAddQueryParams { pub fn into_add_torrent_options(self) -> AddTorrentOptions { AddTorrentOptions { overwrite: self.overwrite.unwrap_or(false), only_files_regex: self.only_files_regex, only_files: self.only_files.map(|o| o.0), output_folder: self.output_folder, sub_folder: self.sub_folder, list_only: self.list_only.unwrap_or(false), initial_peers: self.initial_peers.map(|i| i.0), peer_opts: Some(PeerConnectionOptions { connect_timeout: self.peer_connect_timeout.map(Duration::from_secs), read_write_timeout: self.peer_read_write_timeout.map(Duration::from_secs), ..Default::default() }), ..Default::default() } } } // Private HTTP API internals. Agnostic of web framework. pub struct Api { session: Arc, rust_log_reload_tx: Option>, } type ApiState = Arc; impl Api { pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { Self { session, rust_log_reload_tx, } } pub fn mgr_handle(&self, idx: TorrentId) -> Result { self.session .get(idx) .ok_or(ApiError::torrent_not_found(idx)) } pub fn api_torrent_list(&self) -> TorrentListResponse { let items = self.session.with_torrents(|torrents| { torrents .map(|(id, mgr)| TorrentListResponseItem { id, info_hash: mgr.info().info_hash.as_string(), }) .collect() }); TorrentListResponse { torrents: items } } pub fn api_torrent_details(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; let info_hash = handle.info().info_hash; let only_files = handle.only_files(); make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) } pub fn api_peer_stats( &self, idx: TorrentId, filter: PeerStatsFilter, ) -> Result { let handle = self.mgr_handle(idx)?; Ok(handle .live() .context("not live")? .per_peer_stats_snapshot(filter)) } pub fn api_torrent_action_pause(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; handle .pause() .context("error pausing torrent") .with_error_status_code(StatusCode::BAD_REQUEST)?; Ok(Default::default()) } pub fn api_torrent_action_start(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; self.session .unpause(&handle) .context("error unpausing torrent") .with_error_status_code(StatusCode::BAD_REQUEST)?; Ok(Default::default()) } pub fn api_torrent_action_forget(&self, idx: TorrentId) -> Result { self.session .delete(idx, false) .context("error forgetting torrent")?; Ok(Default::default()) } pub fn api_torrent_action_delete(&self, idx: TorrentId) -> Result { self.session .delete(idx, true) .context("error deleting torrent with files")?; Ok(Default::default()) } pub fn api_set_rust_log(&self, new_value: String) -> Result { let tx = self .rust_log_reload_tx .as_ref() .context("rust_log_reload_tx was not set")?; tx.send(new_value) .context("noone is listening to RUST_LOG changes")?; Ok(Default::default()) } pub async fn api_add_torrent( &self, add: AddTorrent<'_>, opts: Option, ) -> Result { let response = match self .session .add_torrent(add, opts) .await .context("error adding torrent") .with_error_status_code(StatusCode::BAD_REQUEST)? { AddTorrentResponse::AlreadyManaged(id, managed) => { return Err(anyhow::anyhow!( "{:?} is already managed, id={}, downloaded to {:?}", managed.info_hash(), id, &managed.info().out_dir )) .with_error_status_code(StatusCode::CONFLICT); } AddTorrentResponse::ListOnly(ListOnlyResponse { info_hash, info, only_files, seen_peers, }) => ApiAddTorrentResponse { id: None, seen_peers: Some(seen_peers), details: make_torrent_details(&info_hash, &info, only_files.as_deref()) .context("error making torrent details")?, }, AddTorrentResponse::Added(id, handle) => { let details = make_torrent_details( &handle.info_hash(), &handle.info().info, handle.only_files().as_deref(), ) .context("error making torrent details")?; ApiAddTorrentResponse { id: Some(id), details, seen_peers: None, } } }; Ok(response) } pub fn api_dht_stats(&self) -> Result { self.session .get_dht() .as_ref() .map(|d| d.stats()) .ok_or(ApiError::dht_disabled()) } pub fn api_dht_table(&self) -> Result { let dht = self.session.get_dht().ok_or(ApiError::dht_disabled())?; Ok(dht.with_routing_table(|r| r.clone())) } pub fn api_stats_v0(&self, idx: TorrentId) -> Result { let mgr = self.mgr_handle(idx)?; let live = mgr.live().context("torrent not live")?; Ok(LiveStats::from(&*live)) } pub fn api_stats_v1(&self, idx: TorrentId) -> Result { let mgr = self.mgr_handle(idx)?; Ok(mgr.stats()) } pub fn api_dump_haves(&self, idx: usize) -> Result { let mgr = self.mgr_handle(idx)?; Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) } } fn make_torrent_details( info_hash: &Id20, info: &TorrentMetaV1Info, only_files: Option<&[usize]>, ) -> Result { let files = info .iter_filenames_and_lengths() .context("error iterating filenames and lengths")? .enumerate() .map(|(idx, (filename_it, length))| { let name = match filename_it.to_string() { Ok(s) => s, Err(err) => { warn!("error reading filename: {:?}", err); "".to_string() } }; let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); TorrentDetailsResponseFile { name, length, included, } }) .collect(); Ok(TorrentDetailsResponse { info_hash: info_hash.as_string(), files, }) }