From c8d4c8d7136493820c08eb85d870f23550537062 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 14 Jan 2025 09:50:18 +0000 Subject: [PATCH] HTTP API: move functions out to global scope --- crates/librqbit/src/http_api.rs | 854 ++++++++++++++++---------------- 1 file changed, 426 insertions(+), 428 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index b7fa8c0..3ee393a 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -144,6 +144,432 @@ mod timeout { use timeout::Timeout; +async fn dht_stats(State(state): State) -> Result { + state.api_dht_stats().map(axum::Json) +} + +async fn dht_table(State(state): State) -> Result { + state.api_dht_table().map(axum::Json) +} + +async fn session_stats(State(state): State) -> impl IntoResponse { + axum::Json(state.api_session_stats()) +} + +async fn torrents_list( + State(state): State, + Query(opts): Query, +) -> impl IntoResponse { + axum::Json(state.api_torrent_list_ext(opts)) +} + +async fn torrents_post( + State(state): State, + Query(params): Query, + Timeout(timeout): Timeout<600_000, 3_600_000>, + data: Bytes, +) -> Result { + let is_url = params.is_url; + let opts = params.into_add_torrent_options(); + let data = data.to_vec(); + let maybe_magnet = |data: &[u8]| -> bool { + std::str::from_utf8(data) + .ok() + .and_then(|s| Magnet::parse(s).ok()) + .is_some() + }; + let add = match is_url { + Some(true) => AddTorrent::Url( + String::from_utf8(data) + .context("invalid utf-8 for passed URL")? + .into(), + ), + Some(false) => AddTorrent::TorrentFileBytes(data.into()), + + // Guess the format. + None if SUPPORTED_SCHEMES + .iter() + .any(|s| data.starts_with(s.as_bytes())) + || maybe_magnet(&data) => + { + AddTorrent::Url( + String::from_utf8(data) + .context("invalid utf-8 for passed URL")? + .into(), + ) + } + _ => AddTorrent::TorrentFileBytes(data.into()), + }; + tokio::time::timeout(timeout, state.api_add_torrent(add, Some(opts))) + .await + .context("timeout")? + .map(axum::Json) +} + +async fn torrent_details( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_torrent_details(idx).map(axum::Json) +} + +fn torrent_playlist_items(handle: &ManagedTorrent) -> Result> { + let mut playlist_items = handle + .metadata + .load() + .as_ref() + .context("torrent metadata not resolved")? + .info + .iter_file_details()? + .enumerate() + .filter_map(|(file_idx, file_details)| { + let filename = file_details.filename.to_vec().ok()?.join("/"); + let is_playable = mime_guess::from_path(&filename) + .first() + .map(|mime| { + mime.type_() == mime_guess::mime::VIDEO + || mime.type_() == mime_guess::mime::AUDIO + }) + .unwrap_or(false); + if is_playable { + let filename = urlencoding::encode(&filename); + Some((file_idx, filename.into_owned())) + } else { + None + } + }) + .collect::>(); + playlist_items.sort_by(|left, right| left.1.cmp(&right.1)); + Ok(playlist_items) +} + +fn get_host(headers: &HeaderMap) -> Result<&str> { + Ok(headers + .get("host") + .ok_or_else(|| ApiError::new_from_text(StatusCode::BAD_REQUEST, "Missing host header"))? + .to_str() + .context("hostname is not string")?) +} + +fn build_playlist_content( + host: &str, + it: impl IntoIterator, +) -> impl IntoResponse { + let body = it + .into_iter() + .map(|(torrent_idx, file_idx, filename)| { + // TODO: add #EXTINF:{duration} and maybe codecs ? + format!("http://{host}/torrents/{torrent_idx}/stream/{file_idx}/{filename}") + }) + .join("\r\n"); + ( + [ + ("Content-Type", "application/mpegurl; charset=utf-8"), + ( + "Content-Disposition", + "attachment; filename=\"rqbit-playlist.m3u8\"", + ), + ], + format!("#EXTM3U\r\n{body}"), // https://en.wikipedia.org/wiki/M3U + ) +} + +async fn resolve_magnet( + State(state): State, + Timeout(timeout): Timeout<600_000, 3_600_000>, + inp_headers: HeaderMap, + url: String, +) -> Result { + let added = tokio::time::timeout( + timeout, + state.session().add_torrent( + AddTorrent::from_url(&url), + Some(AddTorrentOptions { + list_only: true, + ..Default::default() + }), + ), + ) + .await + .context("timeout")??; + + let (info, content) = match added { + crate::AddTorrentResponse::AlreadyManaged(_, handle) => { + handle.with_metadata(|r| (r.info.clone(), r.torrent_bytes.clone()))? + } + crate::AddTorrentResponse::ListOnly(ListOnlyResponse { + info, + torrent_bytes, + .. + }) => (info, torrent_bytes), + crate::AddTorrentResponse::Added(_, _) => { + return Err(ApiError::new_from_text( + StatusCode::INTERNAL_SERVER_ERROR, + "bug: torrent was added to session, but shouldn't have been", + )) + } + }; + + let mut headers = HeaderMap::new(); + + if inp_headers + .get("Accept") + .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) + == Some("application/json") + { + let data = bencode::dyn_from_bytes::>(&content) + .context("error decoding .torrent file content")?; + let data = serde_json::to_string(&data).context("error serializing")?; + headers.insert("Content-Type", HeaderValue::from_static("application/json")); + return Ok((headers, data).into_response()); + } + + headers.insert( + "Content-Type", + HeaderValue::from_static("application/x-bittorrent"), + ); + + if let Some(name) = info.name.as_ref() { + if let Ok(name) = std::str::from_utf8(name) { + if let Ok(h) = + HeaderValue::from_str(&format!("attachment; filename=\"{}.torrent\"", name)) + { + headers.insert("Content-Disposition", h); + } + } + } + Ok((headers, content).into_response()) +} + +async fn torrent_playlist( + State(state): State, + headers: HeaderMap, + Path(idx): Path, +) -> Result { + let host = get_host(&headers)?; + let playlist_items = torrent_playlist_items(&*state.mgr_handle(idx)?)?; + Ok(build_playlist_content( + host, + playlist_items + .into_iter() + .map(move |(file_idx, filename)| (idx, file_idx, filename)), + )) +} + +async fn global_playlist( + State(state): State, + headers: HeaderMap, +) -> Result { + let host = get_host(&headers)?; + let all_items = state.session().with_torrents(|torrents| { + torrents + .filter_map(|(torrent_idx, handle)| { + torrent_playlist_items(handle) + .map(move |items| { + items.into_iter().map(move |(file_idx, filename)| { + (torrent_idx.into(), file_idx, filename) + }) + }) + .ok() + }) + .flatten() + .collect::>() + }); + Ok(build_playlist_content(host, all_items)) +} + +async fn torrent_haves( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_dump_haves(idx) +} + +async fn torrent_stats_v0( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_stats_v0(idx).map(axum::Json) +} + +async fn torrent_stats_v1( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_stats_v1(idx).map(axum::Json) +} + +async fn peer_stats( + State(state): State, + Path(idx): Path, + Query(filter): Query, +) -> Result { + state.api_peer_stats(idx, filter).map(axum::Json) +} + +#[derive(Deserialize)] +struct StreamPathParams { + id: TorrentIdOrHash, + file_id: usize, + #[serde(rename = "filename")] + _filename: Option>, +} + +async fn torrent_stream_file( + State(state): State, + Path(StreamPathParams { id, file_id, .. }): Path, + headers: http::HeaderMap, +) -> Result { + let mut stream = state.api_stream(id, file_id)?; + let mut status = StatusCode::OK; + let mut output_headers = HeaderMap::new(); + output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes")); + + const DLNA_TRANSFER_MODE: &str = "transferMode.dlna.org"; + const DLNA_GET_CONTENT_FEATURES: &str = "getcontentFeatures.dlna.org"; + const DLNA_CONTENT_FEATURES: &str = "contentFeatures.dlna.org"; + + if headers + .get(DLNA_TRANSFER_MODE) + .map(|v| matches!(v.as_bytes(), b"Streaming" | b"streaming")) + .unwrap_or(false) + { + output_headers.insert(DLNA_TRANSFER_MODE, HeaderValue::from_static("Streaming")); + } + + if headers + .get(DLNA_GET_CONTENT_FEATURES) + .map(|v| v.as_bytes() == b"1") + .unwrap_or(false) + { + output_headers.insert( + DLNA_CONTENT_FEATURES, + HeaderValue::from_static("DLNA.ORG_OP=01"), + ); + } + + if let Ok(mime) = state.torrent_file_mime_type(id, file_id) { + output_headers.insert( + http::header::CONTENT_TYPE, + HeaderValue::from_str(mime).context("bug - invalid MIME")?, + ); + } + + let range_header = headers.get(http::header::RANGE); + trace!(torrent_id=%id, file_id=file_id, range=?range_header, "request for HTTP stream"); + + if let Some(range) = range_header { + let offset: Option = range + .to_str() + .ok() + .and_then(|s| s.strip_prefix("bytes=")) + .and_then(|s| s.strip_suffix('-')) + .and_then(|s| s.parse().ok()); + if let Some(offset) = offset { + status = StatusCode::PARTIAL_CONTENT; + stream + .seek(SeekFrom::Start(offset)) + .await + .context("error seeking")?; + + output_headers.insert( + http::header::CONTENT_LENGTH, + HeaderValue::from_str(&format!("{}", stream.len() - stream.position())) + .context("bug")?, + ); + output_headers.insert( + http::header::CONTENT_RANGE, + HeaderValue::from_str(&format!( + "bytes {}-{}/{}", + stream.position(), + stream.len().saturating_sub(1), + stream.len() + )) + .context("bug")?, + ); + } + } else { + output_headers.insert( + http::header::CONTENT_LENGTH, + HeaderValue::from_str(&format!("{}", stream.len())).context("bug")?, + ); + } + + let s = tokio_util::io::ReaderStream::with_capacity(stream, 65536); + Ok((status, (output_headers, axum::body::Body::from_stream(s)))) +} + +async fn torrent_action_pause( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_torrent_action_pause(idx).await.map(axum::Json) +} + +async fn torrent_action_start( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_torrent_action_start(idx).await.map(axum::Json) +} + +async fn torrent_action_forget( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_torrent_action_forget(idx).await.map(axum::Json) +} + +async fn torrent_action_delete( + State(state): State, + Path(idx): Path, +) -> Result { + state.api_torrent_action_delete(idx).await.map(axum::Json) +} + +#[derive(Deserialize)] +struct UpdateOnlyFilesRequest { + only_files: Vec, +} + +async fn torrent_action_update_only_files( + State(state): State, + Path(idx): Path, + axum::Json(req): axum::Json, +) -> Result { + state + .api_torrent_action_update_only_files(idx, &req.only_files.into_iter().collect()) + .await + .map(axum::Json) +} + +async fn set_rust_log( + State(state): State, + new_value: String, +) -> Result { + state.api_set_rust_log(new_value).map(axum::Json) +} + +async fn stream_logs(State(state): State) -> Result { + let s = state.api_log_lines_stream()?.map_err(|e| { + debug!(error=%e, "stream_logs"); + e + }); + Ok(axum::body::Body::from_stream(s)) +} + +async fn update_session_ratelimits( + State(state): State, + Json(limits): Json, +) -> Result { + state.session().ratelimits.set_upload_bps(limits.upload_bps); + state + .session() + .ratelimits + .set_download_bps(limits.download_bps); + Ok(Json(EmptyJsonResponse {})) +} + impl HttpApi { pub fn new(api: Api, opts: Option) -> Self { Self { @@ -205,434 +631,6 @@ impl HttpApi { ).into_response() }; - async fn dht_stats(State(state): State) -> Result { - state.api_dht_stats().map(axum::Json) - } - - async fn dht_table(State(state): State) -> Result { - state.api_dht_table().map(axum::Json) - } - - async fn session_stats(State(state): State) -> impl IntoResponse { - axum::Json(state.api_session_stats()) - } - - async fn torrents_list( - State(state): State, - Query(opts): Query, - ) -> impl IntoResponse { - axum::Json(state.api_torrent_list_ext(opts)) - } - - async fn torrents_post( - State(state): State, - Query(params): Query, - Timeout(timeout): Timeout<600_000, 3_600_000>, - data: Bytes, - ) -> Result { - let is_url = params.is_url; - let opts = params.into_add_torrent_options(); - let data = data.to_vec(); - let maybe_magnet = |data: &[u8]| -> bool { - std::str::from_utf8(data) - .ok() - .and_then(|s| Magnet::parse(s).ok()) - .is_some() - }; - let add = match is_url { - Some(true) => AddTorrent::Url( - String::from_utf8(data) - .context("invalid utf-8 for passed URL")? - .into(), - ), - Some(false) => AddTorrent::TorrentFileBytes(data.into()), - - // Guess the format. - None if SUPPORTED_SCHEMES - .iter() - .any(|s| data.starts_with(s.as_bytes())) - || maybe_magnet(&data) => - { - AddTorrent::Url( - String::from_utf8(data) - .context("invalid utf-8 for passed URL")? - .into(), - ) - } - _ => AddTorrent::TorrentFileBytes(data.into()), - }; - tokio::time::timeout(timeout, state.api_add_torrent(add, Some(opts))) - .await - .context("timeout")? - .map(axum::Json) - } - - async fn torrent_details( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_torrent_details(idx).map(axum::Json) - } - - fn torrent_playlist_items(handle: &ManagedTorrent) -> Result> { - let mut playlist_items = handle - .metadata - .load() - .as_ref() - .context("torrent metadata not resolved")? - .info - .iter_file_details()? - .enumerate() - .filter_map(|(file_idx, file_details)| { - let filename = file_details.filename.to_vec().ok()?.join("/"); - let is_playable = mime_guess::from_path(&filename) - .first() - .map(|mime| { - mime.type_() == mime_guess::mime::VIDEO - || mime.type_() == mime_guess::mime::AUDIO - }) - .unwrap_or(false); - if is_playable { - let filename = urlencoding::encode(&filename); - Some((file_idx, filename.into_owned())) - } else { - None - } - }) - .collect::>(); - playlist_items.sort_by(|left, right| left.1.cmp(&right.1)); - Ok(playlist_items) - } - - fn get_host(headers: &HeaderMap) -> Result<&str> { - Ok(headers - .get("host") - .ok_or_else(|| { - ApiError::new_from_text(StatusCode::BAD_REQUEST, "Missing host header") - })? - .to_str() - .context("hostname is not string")?) - } - - fn build_playlist_content( - host: &str, - it: impl IntoIterator, - ) -> impl IntoResponse { - let body = it - .into_iter() - .map(|(torrent_idx, file_idx, filename)| { - // TODO: add #EXTINF:{duration} and maybe codecs ? - format!("http://{host}/torrents/{torrent_idx}/stream/{file_idx}/{filename}") - }) - .join("\r\n"); - ( - [ - ("Content-Type", "application/mpegurl; charset=utf-8"), - ( - "Content-Disposition", - "attachment; filename=\"rqbit-playlist.m3u8\"", - ), - ], - format!("#EXTM3U\r\n{body}"), // https://en.wikipedia.org/wiki/M3U - ) - } - - async fn resolve_magnet( - State(state): State, - Timeout(timeout): Timeout<600_000, 3_600_000>, - inp_headers: HeaderMap, - url: String, - ) -> Result { - let added = tokio::time::timeout( - timeout, - state.session().add_torrent( - AddTorrent::from_url(&url), - Some(AddTorrentOptions { - list_only: true, - ..Default::default() - }), - ), - ) - .await - .context("timeout")??; - - let (info, content) = match added { - crate::AddTorrentResponse::AlreadyManaged(_, handle) => { - handle.with_metadata(|r| (r.info.clone(), r.torrent_bytes.clone()))? - } - crate::AddTorrentResponse::ListOnly(ListOnlyResponse { - info, - torrent_bytes, - .. - }) => (info, torrent_bytes), - crate::AddTorrentResponse::Added(_, _) => { - return Err(ApiError::new_from_text( - StatusCode::INTERNAL_SERVER_ERROR, - "bug: torrent was added to session, but shouldn't have been", - )) - } - }; - - let mut headers = HeaderMap::new(); - - if inp_headers - .get("Accept") - .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) - == Some("application/json") - { - let data = bencode::dyn_from_bytes::>(&content) - .context("error decoding .torrent file content")?; - let data = serde_json::to_string(&data).context("error serializing")?; - headers.insert("Content-Type", HeaderValue::from_static("application/json")); - return Ok((headers, data).into_response()); - } - - headers.insert( - "Content-Type", - HeaderValue::from_static("application/x-bittorrent"), - ); - - if let Some(name) = info.name.as_ref() { - if let Ok(name) = std::str::from_utf8(name) { - if let Ok(h) = - HeaderValue::from_str(&format!("attachment; filename=\"{}.torrent\"", name)) - { - headers.insert("Content-Disposition", h); - } - } - } - Ok((headers, content).into_response()) - } - - async fn torrent_playlist( - State(state): State, - headers: HeaderMap, - Path(idx): Path, - ) -> Result { - let host = get_host(&headers)?; - let playlist_items = torrent_playlist_items(&*state.mgr_handle(idx)?)?; - Ok(build_playlist_content( - host, - playlist_items - .into_iter() - .map(move |(file_idx, filename)| (idx, file_idx, filename)), - )) - } - - async fn global_playlist( - State(state): State, - headers: HeaderMap, - ) -> Result { - let host = get_host(&headers)?; - let all_items = state.session().with_torrents(|torrents| { - torrents - .filter_map(|(torrent_idx, handle)| { - torrent_playlist_items(handle) - .map(move |items| { - items.into_iter().map(move |(file_idx, filename)| { - (torrent_idx.into(), file_idx, filename) - }) - }) - .ok() - }) - .flatten() - .collect::>() - }); - Ok(build_playlist_content(host, all_items)) - } - - async fn torrent_haves( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_dump_haves(idx) - } - - async fn torrent_stats_v0( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_stats_v0(idx).map(axum::Json) - } - - async fn torrent_stats_v1( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_stats_v1(idx).map(axum::Json) - } - - async fn peer_stats( - State(state): State, - Path(idx): Path, - Query(filter): Query, - ) -> Result { - state.api_peer_stats(idx, filter).map(axum::Json) - } - - #[derive(Deserialize)] - struct StreamPathParams { - id: TorrentIdOrHash, - file_id: usize, - #[serde(rename = "filename")] - _filename: Option>, - } - - async fn torrent_stream_file( - State(state): State, - Path(StreamPathParams { id, file_id, .. }): Path, - headers: http::HeaderMap, - ) -> Result { - let mut stream = state.api_stream(id, file_id)?; - let mut status = StatusCode::OK; - let mut output_headers = HeaderMap::new(); - output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes")); - - const DLNA_TRANSFER_MODE: &str = "transferMode.dlna.org"; - const DLNA_GET_CONTENT_FEATURES: &str = "getcontentFeatures.dlna.org"; - const DLNA_CONTENT_FEATURES: &str = "contentFeatures.dlna.org"; - - if headers - .get(DLNA_TRANSFER_MODE) - .map(|v| matches!(v.as_bytes(), b"Streaming" | b"streaming")) - .unwrap_or(false) - { - output_headers.insert(DLNA_TRANSFER_MODE, HeaderValue::from_static("Streaming")); - } - - if headers - .get(DLNA_GET_CONTENT_FEATURES) - .map(|v| v.as_bytes() == b"1") - .unwrap_or(false) - { - output_headers.insert( - DLNA_CONTENT_FEATURES, - HeaderValue::from_static("DLNA.ORG_OP=01"), - ); - } - - if let Ok(mime) = state.torrent_file_mime_type(id, file_id) { - output_headers.insert( - http::header::CONTENT_TYPE, - HeaderValue::from_str(mime).context("bug - invalid MIME")?, - ); - } - - let range_header = headers.get(http::header::RANGE); - trace!(torrent_id=%id, file_id=file_id, range=?range_header, "request for HTTP stream"); - - if let Some(range) = range_header { - let offset: Option = range - .to_str() - .ok() - .and_then(|s| s.strip_prefix("bytes=")) - .and_then(|s| s.strip_suffix('-')) - .and_then(|s| s.parse().ok()); - if let Some(offset) = offset { - status = StatusCode::PARTIAL_CONTENT; - stream - .seek(SeekFrom::Start(offset)) - .await - .context("error seeking")?; - - output_headers.insert( - http::header::CONTENT_LENGTH, - HeaderValue::from_str(&format!("{}", stream.len() - stream.position())) - .context("bug")?, - ); - output_headers.insert( - http::header::CONTENT_RANGE, - HeaderValue::from_str(&format!( - "bytes {}-{}/{}", - stream.position(), - stream.len().saturating_sub(1), - stream.len() - )) - .context("bug")?, - ); - } - } else { - output_headers.insert( - http::header::CONTENT_LENGTH, - HeaderValue::from_str(&format!("{}", stream.len())).context("bug")?, - ); - } - - let s = tokio_util::io::ReaderStream::with_capacity(stream, 65536); - Ok((status, (output_headers, axum::body::Body::from_stream(s)))) - } - - async fn torrent_action_pause( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_torrent_action_pause(idx).await.map(axum::Json) - } - - async fn torrent_action_start( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_torrent_action_start(idx).await.map(axum::Json) - } - - async fn torrent_action_forget( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_torrent_action_forget(idx).await.map(axum::Json) - } - - async fn torrent_action_delete( - State(state): State, - Path(idx): Path, - ) -> Result { - state.api_torrent_action_delete(idx).await.map(axum::Json) - } - - #[derive(Deserialize)] - struct UpdateOnlyFilesRequest { - only_files: Vec, - } - - async fn torrent_action_update_only_files( - State(state): State, - Path(idx): Path, - axum::Json(req): axum::Json, - ) -> Result { - state - .api_torrent_action_update_only_files(idx, &req.only_files.into_iter().collect()) - .await - .map(axum::Json) - } - - async fn set_rust_log( - State(state): State, - new_value: String, - ) -> Result { - state.api_set_rust_log(new_value).map(axum::Json) - } - - async fn stream_logs(State(state): State) -> Result { - let s = state.api_log_lines_stream()?.map_err(|e| { - debug!(error=%e, "stream_logs"); - e - }); - Ok(axum::body::Body::from_stream(s)) - } - - async fn update_session_ratelimits( - State(state): State, - Json(limits): Json, - ) -> Result { - state.session().ratelimits.set_upload_bps(limits.upload_bps); - state - .session() - .ratelimits - .set_download_bps(limits.download_bps); - Ok(Json(EmptyJsonResponse {})) - } - let mut app = Router::new() .route("/", get(api_root)) .route("/stream_logs", get(stream_logs))