Split up http_api into files

This commit is contained in:
Igor Katson 2025-01-14 11:13:42 +00:00
parent bde8d7cf40
commit c60f36540e
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
17 changed files with 1046 additions and 954 deletions

View file

@ -12,9 +12,10 @@ readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["default-tls"]
default = ["default-tls", "http-api-client"]
tokio-console = ["console-subscriber", "tokio/tracing"]
http-api = ["axum", "tower-http"]
http-api-client = []
upnp-serve-adapter = ["upnp-serve"]
webui = []
timed_existence = []

View file

@ -1,951 +0,0 @@
use anyhow::Context;
use axum::body::Bytes;
use axum::extract::{ConnectInfo, Path, Query, Request, State};
use axum::middleware::Next;
use axum::response::{IntoResponse, Redirect};
use axum::routing::{get, post};
use base64::Engine;
use bencode::AsDisplay;
use buffers::ByteBuf;
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use http::request::Parts;
use http::{HeaderMap, HeaderValue, StatusCode};
use itertools::Itertools;
use librqbit_core::magnet::Magnet;
use serde::{Deserialize, Serialize};
use std::io::SeekFrom;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use tokio::io::AsyncSeekExt;
use tokio::net::TcpListener;
use tower_http::trace::{DefaultOnFailure, DefaultOnResponse, OnFailure};
use tracing::{debug, error_span, info, trace, Span};
use axum::{Json, Router};
use crate::api::{Api, ApiTorrentListOpts, EmptyJsonResponse, TorrentIdOrHash};
use crate::limits::LimitsConfig;
use crate::peer_connection::PeerConnectionOptions;
use crate::session::{AddTorrent, AddTorrentOptions, SUPPORTED_SCHEMES};
use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter;
type ApiState = Arc<HttpApi>;
use crate::api::Result;
use crate::{ApiError, ListOnlyResponse, ManagedTorrent};
/// An HTTP server for the API.
pub struct HttpApi {
api: Api,
opts: HttpApiOptions,
}
#[derive(Debug, Default)]
pub struct HttpApiOptions {
pub read_only: bool,
pub basic_auth: Option<(String, String)>,
}
async fn simple_basic_auth(
expected_username: Option<&str>,
expected_password: Option<&str>,
headers: HeaderMap,
request: axum::extract::Request,
next: Next,
) -> Result<axum::response::Response> {
let (expected_user, expected_pass) = match (expected_username, expected_password) {
(Some(u), Some(p)) => (u, p),
_ => return Ok(next.run(request).await),
};
let user_pass = headers
.get("Authorization")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.strip_prefix("Basic "))
.and_then(|v| base64::engine::general_purpose::STANDARD.decode(v).ok())
.and_then(|v| String::from_utf8(v).ok());
let user_pass = match user_pass {
Some(user_pass) => user_pass,
None => {
return Ok((
StatusCode::UNAUTHORIZED,
[("WWW-Authenticate", "Basic realm=\"API\"")],
)
.into_response())
}
};
// TODO: constant time compare
match user_pass.split_once(':') {
Some((u, p)) if u == expected_user && p == expected_pass => Ok(next.run(request).await),
_ => Err(ApiError::unathorized()),
}
}
mod timeout {
use std::time::Duration;
use anyhow::Context;
use axum::{extract::Query, RequestPartsExt};
use http::request::Parts;
use serde::Deserialize;
use crate::ApiError;
pub struct Timeout<const DEFAULT_MS: usize, const MAX_MS: usize>(pub Duration);
impl<S, const DEFAULT_MS: usize, const MAX_MS: usize> axum::extract::FromRequestParts<S>
for Timeout<DEFAULT_MS, MAX_MS>
where
S: Send + Sync,
{
type Rejection = ApiError;
/// Perform the extraction.
async fn from_request_parts(
parts: &mut Parts,
_state: &S,
) -> Result<Self, Self::Rejection> {
#[derive(Deserialize)]
struct QueryT {
timeout_ms: Option<usize>,
}
let q = parts
.extract::<Query<QueryT>>()
.await
.context("error running Timeout extractor")?;
let timeout_ms = q
.timeout_ms
.map(Ok)
.or_else(|| {
parts
.headers
.get("x-req-timeout-ms")
.map(|v| {
std::str::from_utf8(v.as_bytes())
.context("invalid utf-8 in timeout value")
})
.map(|v| {
v.and_then(|v| v.parse::<usize>().context("invalid timeout integer"))
})
})
.transpose()
.context("error parsing timeout")?
.unwrap_or(DEFAULT_MS);
let timeout_ms = timeout_ms.min(MAX_MS);
Ok(Timeout(Duration::from_millis(timeout_ms as u64)))
}
}
}
use timeout::Timeout;
async fn h_dht_stats(State(state): State<ApiState>) -> Result<impl IntoResponse> {
state.api.api_dht_stats().map(axum::Json)
}
async fn h_dht_table(State(state): State<ApiState>) -> Result<impl IntoResponse> {
state.api.api_dht_table().map(axum::Json)
}
async fn h_session_stats(State(state): State<ApiState>) -> impl IntoResponse {
axum::Json(state.api.api_session_stats())
}
async fn h_torrents_list(
State(state): State<ApiState>,
Query(opts): Query<ApiTorrentListOpts>,
) -> impl IntoResponse {
axum::Json(state.api.api_torrent_list_ext(opts))
}
async fn h_torrents_post(
State(state): State<ApiState>,
Query(params): Query<TorrentAddQueryParams>,
Timeout(timeout): Timeout<600_000, 3_600_000>,
data: Bytes,
) -> Result<impl IntoResponse> {
let is_url = params.is_url;
let opts = params.into_add_torrent_options();
let data = data.to_vec();
let maybe_magnet = |data: &[u8]| -> bool {
std::str::from_utf8(data)
.ok()
.and_then(|s| Magnet::parse(s).ok())
.is_some()
};
let add = match is_url {
Some(true) => AddTorrent::Url(
String::from_utf8(data)
.context("invalid utf-8 for passed URL")?
.into(),
),
Some(false) => AddTorrent::TorrentFileBytes(data.into()),
// Guess the format.
None if SUPPORTED_SCHEMES
.iter()
.any(|s| data.starts_with(s.as_bytes()))
|| maybe_magnet(&data) =>
{
AddTorrent::Url(
String::from_utf8(data)
.context("invalid utf-8 for passed URL")?
.into(),
)
}
_ => AddTorrent::TorrentFileBytes(data.into()),
};
tokio::time::timeout(timeout, state.api.api_add_torrent(add, Some(opts)))
.await
.context("timeout")?
.map(axum::Json)
}
async fn h_torrent_details(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_torrent_details(idx).map(axum::Json)
}
fn torrent_playlist_items(handle: &ManagedTorrent) -> Result<Vec<(usize, String)>> {
let mut playlist_items = handle
.metadata
.load()
.as_ref()
.context("torrent metadata not resolved")?
.info
.iter_file_details()?
.enumerate()
.filter_map(|(file_idx, file_details)| {
let filename = file_details.filename.to_vec().ok()?.join("/");
let is_playable = mime_guess::from_path(&filename)
.first()
.map(|mime| {
mime.type_() == mime_guess::mime::VIDEO
|| mime.type_() == mime_guess::mime::AUDIO
})
.unwrap_or(false);
if is_playable {
let filename = urlencoding::encode(&filename);
Some((file_idx, filename.into_owned()))
} else {
None
}
})
.collect::<Vec<_>>();
playlist_items.sort_by(|left, right| left.1.cmp(&right.1));
Ok(playlist_items)
}
fn get_host(headers: &HeaderMap) -> Result<&str> {
Ok(headers
.get("host")
.ok_or_else(|| ApiError::new_from_text(StatusCode::BAD_REQUEST, "Missing host header"))?
.to_str()
.context("hostname is not string")?)
}
fn build_playlist_content(
host: &str,
it: impl IntoIterator<Item = (TorrentIdOrHash, usize, String)>,
) -> impl IntoResponse {
let body = it
.into_iter()
.map(|(torrent_idx, file_idx, filename)| {
// TODO: add #EXTINF:{duration} and maybe codecs ?
format!("http://{host}/torrents/{torrent_idx}/stream/{file_idx}/{filename}")
})
.join("\r\n");
(
[
("Content-Type", "application/mpegurl; charset=utf-8"),
(
"Content-Disposition",
"attachment; filename=\"rqbit-playlist.m3u8\"",
),
],
format!("#EXTM3U\r\n{body}"), // https://en.wikipedia.org/wiki/M3U
)
}
async fn h_resolve_magnet(
State(state): State<ApiState>,
Timeout(timeout): Timeout<600_000, 3_600_000>,
inp_headers: HeaderMap,
url: String,
) -> Result<impl IntoResponse> {
let added = tokio::time::timeout(
timeout,
state.api.session().add_torrent(
AddTorrent::from_url(&url),
Some(AddTorrentOptions {
list_only: true,
..Default::default()
}),
),
)
.await
.context("timeout")??;
let (info, content) = match added {
crate::AddTorrentResponse::AlreadyManaged(_, handle) => {
handle.with_metadata(|r| (r.info.clone(), r.torrent_bytes.clone()))?
}
crate::AddTorrentResponse::ListOnly(ListOnlyResponse {
info,
torrent_bytes,
..
}) => (info, torrent_bytes),
crate::AddTorrentResponse::Added(_, _) => {
return Err(ApiError::new_from_text(
StatusCode::INTERNAL_SERVER_ERROR,
"bug: torrent was added to session, but shouldn't have been",
))
}
};
let mut headers = HeaderMap::new();
if inp_headers
.get("Accept")
.and_then(|v| std::str::from_utf8(v.as_bytes()).ok())
== Some("application/json")
{
let data = bencode::dyn_from_bytes::<AsDisplay<ByteBuf>>(&content)
.context("error decoding .torrent file content")?;
let data = serde_json::to_string(&data).context("error serializing")?;
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
return Ok((headers, data).into_response());
}
headers.insert(
"Content-Type",
HeaderValue::from_static("application/x-bittorrent"),
);
if let Some(name) = info.name.as_ref() {
if let Ok(name) = std::str::from_utf8(name) {
if let Ok(h) =
HeaderValue::from_str(&format!("attachment; filename=\"{}.torrent\"", name))
{
headers.insert("Content-Disposition", h);
}
}
}
Ok((headers, content).into_response())
}
async fn h_torrent_playlist(
State(state): State<ApiState>,
headers: HeaderMap,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
let host = get_host(&headers)?;
let playlist_items = torrent_playlist_items(&*state.api.mgr_handle(idx)?)?;
Ok(build_playlist_content(
host,
playlist_items
.into_iter()
.map(move |(file_idx, filename)| (idx, file_idx, filename)),
))
}
async fn h_global_playlist(
State(state): State<ApiState>,
headers: HeaderMap,
) -> Result<impl IntoResponse> {
let host = get_host(&headers)?;
let all_items = state.api.session().with_torrents(|torrents| {
torrents
.filter_map(|(torrent_idx, handle)| {
torrent_playlist_items(handle)
.map(move |items| {
items.into_iter().map(move |(file_idx, filename)| {
(torrent_idx.into(), file_idx, filename)
})
})
.ok()
})
.flatten()
.collect::<Vec<_>>()
});
Ok(build_playlist_content(host, all_items))
}
async fn h_torrent_haves(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_dump_haves(idx)
}
async fn h_torrent_stats_v0(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_stats_v0(idx).map(axum::Json)
}
async fn h_torrent_stats_v1(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_stats_v1(idx).map(axum::Json)
}
async fn h_peer_stats(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
Query(filter): Query<PeerStatsFilter>,
) -> Result<impl IntoResponse> {
state.api.api_peer_stats(idx, filter).map(axum::Json)
}
#[derive(Deserialize)]
struct StreamPathParams {
id: TorrentIdOrHash,
file_id: usize,
#[serde(rename = "filename")]
_filename: Option<Arc<str>>,
}
async fn h_torrent_stream_file(
State(state): State<ApiState>,
Path(StreamPathParams { id, file_id, .. }): Path<StreamPathParams>,
headers: http::HeaderMap,
) -> Result<impl IntoResponse> {
let mut stream = state.api.api_stream(id, file_id)?;
let mut status = StatusCode::OK;
let mut output_headers = HeaderMap::new();
output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes"));
const DLNA_TRANSFER_MODE: &str = "transferMode.dlna.org";
const DLNA_GET_CONTENT_FEATURES: &str = "getcontentFeatures.dlna.org";
const DLNA_CONTENT_FEATURES: &str = "contentFeatures.dlna.org";
if headers
.get(DLNA_TRANSFER_MODE)
.map(|v| matches!(v.as_bytes(), b"Streaming" | b"streaming"))
.unwrap_or(false)
{
output_headers.insert(DLNA_TRANSFER_MODE, HeaderValue::from_static("Streaming"));
}
if headers
.get(DLNA_GET_CONTENT_FEATURES)
.map(|v| v.as_bytes() == b"1")
.unwrap_or(false)
{
output_headers.insert(
DLNA_CONTENT_FEATURES,
HeaderValue::from_static("DLNA.ORG_OP=01"),
);
}
if let Ok(mime) = state.api.torrent_file_mime_type(id, file_id) {
output_headers.insert(
http::header::CONTENT_TYPE,
HeaderValue::from_str(mime).context("bug - invalid MIME")?,
);
}
let range_header = headers.get(http::header::RANGE);
trace!(torrent_id=%id, file_id=file_id, range=?range_header, "request for HTTP stream");
if let Some(range) = range_header {
let offset: Option<u64> = range
.to_str()
.ok()
.and_then(|s| s.strip_prefix("bytes="))
.and_then(|s| s.strip_suffix('-'))
.and_then(|s| s.parse().ok());
if let Some(offset) = offset {
status = StatusCode::PARTIAL_CONTENT;
stream
.seek(SeekFrom::Start(offset))
.await
.context("error seeking")?;
output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len() - stream.position()))
.context("bug")?,
);
output_headers.insert(
http::header::CONTENT_RANGE,
HeaderValue::from_str(&format!(
"bytes {}-{}/{}",
stream.position(),
stream.len().saturating_sub(1),
stream.len()
))
.context("bug")?,
);
}
} else {
output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len())).context("bug")?,
);
}
let s = tokio_util::io::ReaderStream::with_capacity(stream, 65536);
Ok((status, (output_headers, axum::body::Body::from_stream(s))))
}
async fn h_torrent_action_pause(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_pause(idx)
.await
.map(axum::Json)
}
async fn h_torrent_action_start(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_start(idx)
.await
.map(axum::Json)
}
async fn h_torrent_action_forget(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_forget(idx)
.await
.map(axum::Json)
}
async fn h_torrent_action_delete(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_delete(idx)
.await
.map(axum::Json)
}
#[derive(Deserialize)]
struct UpdateOnlyFilesRequest {
only_files: Vec<usize>,
}
async fn h_torrent_action_update_only_files(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
axum::Json(req): axum::Json<UpdateOnlyFilesRequest>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_update_only_files(idx, &req.only_files.into_iter().collect())
.await
.map(axum::Json)
}
async fn h_set_rust_log(
State(state): State<ApiState>,
new_value: String,
) -> Result<impl IntoResponse> {
state.api.api_set_rust_log(new_value).map(axum::Json)
}
async fn h_stream_logs(State(state): State<ApiState>) -> Result<impl IntoResponse> {
let s = state.api.api_log_lines_stream()?.map_err(|e| {
debug!(error=%e, "stream_logs");
e
});
Ok(axum::body::Body::from_stream(s))
}
async fn h_update_session_ratelimits(
State(state): State<ApiState>,
Json(limits): Json<LimitsConfig>,
) -> Result<impl IntoResponse> {
state
.api
.session()
.ratelimits
.set_upload_bps(limits.upload_bps);
state
.api
.session()
.ratelimits
.set_download_bps(limits.download_bps);
Ok(Json(EmptyJsonResponse {}))
}
async fn h_api_root(parts: Parts) -> impl IntoResponse {
// If browser, and webui enabled, redirect to web
#[cfg(feature = "webui")]
{
if parts
.headers
.get("Accept")
.and_then(|h| h.to_str().ok())
.map_or(false, |h| h.contains("text/html"))
{
return Redirect::temporary("./web/").into_response();
}
}
static API_ROOT_JSON: LazyLock<Arc<serde_json::Value>> = LazyLock::new(|| {
Arc::new(serde_json::json!({
"apis": {
"GET /": "list all available APIs",
"GET /dht/stats": "DHT stats",
"GET /dht/table": "DHT routing table",
"GET /torrents": "List torrents",
"GET /torrents/playlist": "Generate M3U8 playlist for all files in all torrents",
"GET /stats": "Global session stats",
"POST /torrents/resolve_magnet": "Resolve a magnet to torrent file bytes",
"GET /torrents/{id_or_infohash}": "Torrent details",
"GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces",
"GET /torrents/{id_or_infohash}/playlist": "Generate M3U8 playlist for this torrent",
"GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats",
"GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats",
"GET /torrents/{id_or_infohash}/stream/{file_idx}": "Stream a file. Accepts Range header to seek.",
"POST /torrents/{id_or_infohash}/pause": "Pause torrent",
"POST /torrents/{id_or_infohash}/start": "Resume torrent",
"POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files",
"POST /torrents/{id_or_infohash}/delete": "Forget about the torrent, remove the files",
"POST /torrents/{id_or_infohash}/update_only_files": "Change the selection of files to download. You need to POST json of the following form {\"only_files\": [0, 1, 2]}",
"POST /torrents": "Add a torrent here. magnet: or http:// or a local file.",
"POST /rust_log": "Set RUST_LOG to this post launch (for debugging)",
"GET /web/": "Web UI",
},
"server": "rqbit",
"version": env!("CARGO_PKG_VERSION"),
}))
});
(
[("Content-Type", "application/json")],
axum::Json(API_ROOT_JSON.clone()),
)
.into_response()
}
fn make_api_router(state: ApiState) -> Router {
let mut api_router = Router::new()
.route("/", get(h_api_root))
.route("/stream_logs", get(h_stream_logs))
.route("/rust_log", post(h_set_rust_log))
.route("/dht/stats", get(h_dht_stats))
.route("/dht/table", get(h_dht_table))
.route("/stats", get(h_session_stats))
.route("/torrents", get(h_torrents_list))
.route("/torrents/{id}", get(h_torrent_details))
.route("/torrents/{id}/haves", get(h_torrent_haves))
.route("/torrents/{id}/stats", get(h_torrent_stats_v0))
.route("/torrents/{id}/stats/v1", get(h_torrent_stats_v1))
.route("/torrents/{id}/peer_stats", get(h_peer_stats))
.route("/torrents/{id}/playlist", get(h_torrent_playlist))
.route("/torrents/playlist", get(h_global_playlist))
.route("/torrents/resolve_magnet", post(h_resolve_magnet))
.route(
"/torrents/{id}/stream/{file_id}",
get(h_torrent_stream_file),
)
.route(
"/torrents/{id}/stream/{file_id}/{*filename}",
get(h_torrent_stream_file),
);
if !state.opts.read_only {
api_router = api_router
.route("/torrents", post(h_torrents_post))
.route("/torrents/limits", post(h_update_session_ratelimits))
.route("/torrents/{id}/pause", post(h_torrent_action_pause))
.route("/torrents/{id}/start", post(h_torrent_action_start))
.route("/torrents/{id}/forget", post(h_torrent_action_forget))
.route("/torrents/{id}/delete", post(h_torrent_action_delete))
.route(
"/torrents/{id}/update_only_files",
post(h_torrent_action_update_only_files),
);
}
api_router.with_state(state)
}
#[cfg(feature = "webui")]
fn make_webui_router() -> Router {
Router::new()
.route(
"/",
get(|| async {
(
[("Content-Type", "text/html")],
include_str!("../webui/dist/index.html"),
)
}),
)
.route(
"/assets/index.js",
get(|| async {
(
[("Content-Type", "application/javascript")],
include_str!("../webui/dist/assets/index.js"),
)
}),
)
.route(
"/assets/index.css",
get(|| async {
(
[("Content-Type", "text/css")],
include_str!("../webui/dist/assets/index.css"),
)
}),
)
.route(
"/assets/logo.svg",
get(|| async {
(
[("Content-Type", "image/svg+xml")],
include_str!("../webui/dist/assets/logo.svg"),
)
}),
)
}
impl HttpApi {
pub fn new(api: Api, opts: Option<HttpApiOptions>) -> Self {
Self {
api,
opts: opts.unwrap_or_default(),
}
}
/// Run the HTTP server forever on the given address.
/// If read_only is passed, no state-modifying methods will be exposed.
#[inline(never)]
pub fn make_http_api_and_run(
self,
listener: TcpListener,
upnp_router: Option<Router>,
) -> BoxFuture<'static, anyhow::Result<()>> {
let state = Arc::new(self);
let mut main_router = make_api_router(state.clone());
#[cfg(feature = "webui")]
{
use axum::response::Redirect;
let webui_router = make_webui_router();
main_router = main_router.nest("/web/", webui_router);
main_router = main_router.route("/web", get(|| async { Redirect::permanent("./web/") }))
}
let cors_layer = {
use tower_http::cors::{AllowHeaders, AllowOrigin};
const ALLOWED_ORIGINS: [&[u8]; 4] = [
// Webui-dev
b"http://localhost:3031",
b"http://127.0.0.1:3031",
// Tauri dev
b"http://localhost:1420",
// Tauri prod
b"tauri://localhost",
];
let allow_regex = std::env::var("CORS_ALLOW_REGEXP")
.ok()
.and_then(|value| regex::bytes::Regex::new(&value).ok());
tower_http::cors::CorsLayer::default()
.allow_origin(AllowOrigin::predicate(move |v, _| {
ALLOWED_ORIGINS.contains(&v.as_bytes())
|| allow_regex
.as_ref()
.map(move |r| r.is_match(v.as_bytes()))
.unwrap_or(false)
}))
.allow_headers(AllowHeaders::any())
};
// Simple one-user basic auth
if let Some((user, pass)) = state.opts.basic_auth.clone() {
info!("Enabling simple basic authentication in HTTP API");
main_router = main_router.route_layer(axum::middleware::from_fn(
move |headers, request, next| {
let user = user.clone();
let pass = pass.clone();
async move {
simple_basic_auth(Some(&user), Some(&pass), headers, request, next).await
}
},
));
}
if let Some(upnp_router) = upnp_router {
main_router = main_router.nest("/upnp", upnp_router);
}
let app = main_router
.layer(cors_layer)
.layer(
tower_http::trace::TraceLayer::new_for_http()
.make_span_with(|req: &Request| {
let method = req.method();
let uri = req.uri();
if let Some(ConnectInfo(addr)) =
req.extensions().get::<ConnectInfo<SocketAddr>>()
{
let addr = SocketAddr::new(addr.ip().to_canonical(), addr.port());
error_span!("request", %method, %uri, %addr)
} else {
error_span!("request", %method, %uri)
}
})
.on_request(|req: &Request, _: &Span| {
if req.uri().path().starts_with("/upnp") {
debug!(headers=?req.headers())
}
})
.on_response(DefaultOnResponse::new().include_headers(true))
.on_failure({
let mut default = DefaultOnFailure::new();
move |failure_class, latency, span: &Span| match failure_class {
tower_http::classify::ServerErrorsFailureClass::StatusCode(
StatusCode::NOT_IMPLEMENTED,
) => {}
_ => default.on_failure(failure_class, latency, span),
}
}),
)
.into_make_service_with_connect_info::<SocketAddr>();
async move {
axum::serve(listener, app)
.await
.context("error running HTTP API")
}
.boxed()
}
}
pub(crate) struct OnlyFiles(Vec<usize>);
pub(crate) struct InitialPeers(pub Vec<SocketAddr>);
#[derive(Serialize, Deserialize, Default)]
pub(crate) struct TorrentAddQueryParams {
pub overwrite: Option<bool>,
pub output_folder: Option<String>,
pub sub_folder: Option<String>,
pub only_files_regex: Option<String>,
pub only_files: Option<OnlyFiles>,
pub peer_connect_timeout: Option<u64>,
pub peer_read_write_timeout: Option<u64>,
pub initial_peers: Option<InitialPeers>,
// Will force interpreting the content as a URL.
pub is_url: Option<bool>,
pub list_only: Option<bool>,
}
impl Serialize for OnlyFiles {
fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = self.0.iter().map(|id| id.to_string()).join(",");
s.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for OnlyFiles {
fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let s = String::deserialize(deserializer)?;
let list = s
.split(',')
.try_fold(Vec::<usize>::new(), |mut acc, c| match c.parse() {
Ok(i) => {
acc.push(i);
Ok(acc)
}
Err(_) => Err(D::Error::custom(format!(
"only_files: failed to parse {:?} as integer",
c
))),
})?;
if list.is_empty() {
return Err(D::Error::custom(
"only_files: should contain at least one file id",
));
}
Ok(OnlyFiles(list))
}
}
impl<'de> Deserialize<'de> for InitialPeers {
fn deserialize<D>(deserializer: D) -> std::prelude::v1::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let string = String::deserialize(deserializer)?;
let mut addrs = Vec::new();
for addr_str in string.split(',').filter(|s| !s.is_empty()) {
addrs.push(SocketAddr::from_str(addr_str).map_err(D::Error::custom)?);
}
Ok(InitialPeers(addrs))
}
}
impl Serialize for InitialPeers {
fn serialize<S>(&self, serializer: S) -> std::prelude::v1::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0
.iter()
.map(|s| s.to_string())
.join(",")
.serialize(serializer)
}
}
impl TorrentAddQueryParams {
pub fn into_add_torrent_options(self) -> AddTorrentOptions {
AddTorrentOptions {
overwrite: self.overwrite.unwrap_or(false),
only_files_regex: self.only_files_regex,
only_files: self.only_files.map(|o| o.0),
output_folder: self.output_folder,
sub_folder: self.sub_folder,
list_only: self.list_only.unwrap_or(false),
initial_peers: self.initial_peers.map(|i| i.0),
peer_opts: Some(PeerConnectionOptions {
connect_timeout: self.peer_connect_timeout.map(Duration::from_secs),
read_write_timeout: self.peer_read_write_timeout.map(Duration::from_secs),
..Default::default()
}),
..Default::default()
}
}
}

View file

@ -0,0 +1,24 @@
use axum::{extract::State, response::IntoResponse, Json};
use super::ApiState;
use crate::{
api::{EmptyJsonResponse, Result},
limits::LimitsConfig,
};
pub async fn h_update_session_ratelimits(
State(state): State<ApiState>,
Json(limits): Json<LimitsConfig>,
) -> Result<impl IntoResponse> {
state
.api
.session()
.ratelimits
.set_upload_bps(limits.upload_bps);
state
.api
.session()
.ratelimits
.set_download_bps(limits.download_bps);
Ok(Json(EmptyJsonResponse {}))
}

View file

@ -0,0 +1,12 @@
use axum::{extract::State, response::IntoResponse};
use super::ApiState;
use crate::api::Result;
pub async fn h_dht_stats(State(state): State<ApiState>) -> Result<impl IntoResponse> {
state.api.api_dht_stats().map(axum::Json)
}
pub async fn h_dht_table(State(state): State<ApiState>) -> Result<impl IntoResponse> {
state.api.api_dht_table().map(axum::Json)
}

View file

@ -0,0 +1,21 @@
use axum::{extract::State, response::IntoResponse};
use futures::TryStreamExt;
use tracing::debug;
use super::ApiState;
use crate::api::Result;
pub async fn h_set_rust_log(
State(state): State<ApiState>,
new_value: String,
) -> Result<impl IntoResponse> {
state.api.api_set_rust_log(new_value).map(axum::Json)
}
pub async fn h_stream_logs(State(state): State<ApiState>) -> Result<impl IntoResponse> {
let s = state.api.api_log_lines_stream()?.map_err(|e| {
debug!(error=%e, "stream_logs");
e
});
Ok(axum::body::Body::from_stream(s))
}

View file

@ -0,0 +1,128 @@
mod configure;
mod dht;
mod logging;
mod other;
mod playlist;
mod streaming;
mod torrents;
use std::sync::{Arc, LazyLock};
use axum::{
response::{IntoResponse, Redirect},
routing::{get, post},
Router,
};
use http::request::Parts;
use super::HttpApi;
type ApiState = Arc<HttpApi>;
async fn h_api_root(parts: Parts) -> impl IntoResponse {
// If browser, and webui enabled, redirect to web
#[cfg(feature = "webui")]
{
if parts
.headers
.get("Accept")
.and_then(|h| h.to_str().ok())
.map_or(false, |h| h.contains("text/html"))
{
return Redirect::temporary("./web/").into_response();
}
}
static API_ROOT_JSON: LazyLock<Arc<serde_json::Value>> = LazyLock::new(|| {
Arc::new(serde_json::json!({
"apis": {
"GET /": "list all available APIs",
"GET /dht/stats": "DHT stats",
"GET /dht/table": "DHT routing table",
"GET /torrents": "List torrents",
"GET /torrents/playlist": "Generate M3U8 playlist for all files in all torrents",
"GET /stats": "Global session stats",
"POST /torrents/resolve_magnet": "Resolve a magnet to torrent file bytes",
"GET /torrents/{id_or_infohash}": "Torrent details",
"GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces",
"GET /torrents/{id_or_infohash}/playlist": "Generate M3U8 playlist for this torrent",
"GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats",
"GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats",
"GET /torrents/{id_or_infohash}/stream/{file_idx}": "Stream a file. Accepts Range header to seek.",
"POST /torrents/{id_or_infohash}/pause": "Pause torrent",
"POST /torrents/{id_or_infohash}/start": "Resume torrent",
"POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files",
"POST /torrents/{id_or_infohash}/delete": "Forget about the torrent, remove the files",
"POST /torrents/{id_or_infohash}/update_only_files": "Change the selection of files to download. You need to POST json of the following form {\"only_files\": [0, 1, 2]}",
"POST /torrents": "Add a torrent here. magnet: or http:// or a local file.",
"POST /rust_log": "Set RUST_LOG to this post launch (for debugging)",
"GET /web/": "Web UI",
},
"server": "rqbit",
"version": env!("CARGO_PKG_VERSION"),
}))
});
(
[("Content-Type", "application/json")],
axum::Json(API_ROOT_JSON.clone()),
)
.into_response()
}
pub fn make_api_router(state: ApiState) -> Router {
let mut api_router = Router::new()
.route("/", get(h_api_root))
.route("/stream_logs", get(logging::h_stream_logs))
.route("/rust_log", post(logging::h_set_rust_log))
.route("/dht/stats", get(dht::h_dht_stats))
.route("/dht/table", get(dht::h_dht_table))
.route("/stats", get(torrents::h_session_stats))
.route("/torrents", get(torrents::h_torrents_list))
.route("/torrents/{id}", get(torrents::h_torrent_details))
.route("/torrents/{id}/haves", get(torrents::h_torrent_haves))
.route("/torrents/{id}/stats", get(torrents::h_torrent_stats_v0))
.route("/torrents/{id}/stats/v1", get(torrents::h_torrent_stats_v1))
.route("/torrents/{id}/peer_stats", get(torrents::h_peer_stats))
.route("/torrents/{id}/playlist", get(playlist::h_torrent_playlist))
.route("/torrents/playlist", get(playlist::h_global_playlist))
.route("/torrents/resolve_magnet", post(other::h_resolve_magnet))
.route(
"/torrents/{id}/stream/{file_id}",
get(streaming::h_torrent_stream_file),
)
.route(
"/torrents/{id}/stream/{file_id}/{*filename}",
get(streaming::h_torrent_stream_file),
);
if !state.opts.read_only {
api_router = api_router
.route("/torrents", post(torrents::h_torrents_post))
.route(
"/torrents/limits",
post(configure::h_update_session_ratelimits),
)
.route(
"/torrents/{id}/pause",
post(torrents::h_torrent_action_pause),
)
.route(
"/torrents/{id}/start",
post(torrents::h_torrent_action_start),
)
.route(
"/torrents/{id}/forget",
post(torrents::h_torrent_action_forget),
)
.route(
"/torrents/{id}/delete",
post(torrents::h_torrent_action_delete),
)
.route(
"/torrents/{id}/update_only_files",
post(torrents::h_torrent_action_update_only_files),
);
}
api_router.with_state(state)
}

View file

@ -0,0 +1,78 @@
use anyhow::Context;
use axum::{extract::State, response::IntoResponse};
use bencode::AsDisplay;
use buffers::ByteBuf;
use http::{HeaderMap, HeaderValue, StatusCode};
use super::ApiState;
use crate::{
api::Result, http_api::timeout::Timeout, AddTorrent, AddTorrentOptions, ApiError,
ListOnlyResponse,
};
pub async fn h_resolve_magnet(
State(state): State<ApiState>,
Timeout(timeout): Timeout<600_000, 3_600_000>,
inp_headers: HeaderMap,
url: String,
) -> Result<impl IntoResponse> {
let added = tokio::time::timeout(
timeout,
state.api.session().add_torrent(
AddTorrent::from_url(&url),
Some(AddTorrentOptions {
list_only: true,
..Default::default()
}),
),
)
.await
.context("timeout")??;
let (info, content) = match added {
crate::AddTorrentResponse::AlreadyManaged(_, handle) => {
handle.with_metadata(|r| (r.info.clone(), r.torrent_bytes.clone()))?
}
crate::AddTorrentResponse::ListOnly(ListOnlyResponse {
info,
torrent_bytes,
..
}) => (info, torrent_bytes),
crate::AddTorrentResponse::Added(_, _) => {
return Err(ApiError::new_from_text(
StatusCode::INTERNAL_SERVER_ERROR,
"bug: torrent was added to session, but shouldn't have been",
))
}
};
let mut headers = HeaderMap::new();
if inp_headers
.get("Accept")
.and_then(|v| std::str::from_utf8(v.as_bytes()).ok())
== Some("application/json")
{
let data = bencode::dyn_from_bytes::<AsDisplay<ByteBuf>>(&content)
.context("error decoding .torrent file content")?;
let data = serde_json::to_string(&data).context("error serializing")?;
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
return Ok((headers, data).into_response());
}
headers.insert(
"Content-Type",
HeaderValue::from_static("application/x-bittorrent"),
);
if let Some(name) = info.name.as_ref() {
if let Ok(name) = std::str::from_utf8(name) {
if let Ok(h) =
HeaderValue::from_str(&format!("attachment; filename=\"{}.torrent\"", name))
{
headers.insert("Content-Disposition", h);
}
}
}
Ok((headers, content).into_response())
}

View file

@ -0,0 +1,111 @@
use anyhow::Context;
use axum::{
extract::{Path, State},
response::IntoResponse,
};
use http::{HeaderMap, StatusCode};
use itertools::Itertools;
use super::ApiState;
use crate::{
api::{Result, TorrentIdOrHash},
ApiError, ManagedTorrent,
};
fn torrent_playlist_items(handle: &ManagedTorrent) -> Result<Vec<(usize, String)>> {
let mut playlist_items = handle
.metadata
.load()
.as_ref()
.context("torrent metadata not resolved")?
.info
.iter_file_details()?
.enumerate()
.filter_map(|(file_idx, file_details)| {
let filename = file_details.filename.to_vec().ok()?.join("/");
let is_playable = mime_guess::from_path(&filename)
.first()
.map(|mime| {
mime.type_() == mime_guess::mime::VIDEO
|| mime.type_() == mime_guess::mime::AUDIO
})
.unwrap_or(false);
if is_playable {
let filename = urlencoding::encode(&filename);
Some((file_idx, filename.into_owned()))
} else {
None
}
})
.collect::<Vec<_>>();
playlist_items.sort_by(|left, right| left.1.cmp(&right.1));
Ok(playlist_items)
}
fn get_host(headers: &HeaderMap) -> Result<&str> {
Ok(headers
.get("host")
.ok_or_else(|| ApiError::new_from_text(StatusCode::BAD_REQUEST, "Missing host header"))?
.to_str()
.context("hostname is not string")?)
}
fn build_playlist_content(
host: &str,
it: impl IntoIterator<Item = (TorrentIdOrHash, usize, String)>,
) -> impl IntoResponse {
let body = it
.into_iter()
.map(|(torrent_idx, file_idx, filename)| {
// TODO: add #EXTINF:{duration} and maybe codecs ?
format!("http://{host}/torrents/{torrent_idx}/stream/{file_idx}/{filename}")
})
.join("\r\n");
(
[
("Content-Type", "application/mpegurl; charset=utf-8"),
(
"Content-Disposition",
"attachment; filename=\"rqbit-playlist.m3u8\"",
),
],
format!("#EXTM3U\r\n{body}"), // https://en.wikipedia.org/wiki/M3U
)
}
pub async fn h_torrent_playlist(
State(state): State<ApiState>,
headers: HeaderMap,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
let host = get_host(&headers)?;
let playlist_items = torrent_playlist_items(&*state.api.mgr_handle(idx)?)?;
Ok(build_playlist_content(
host,
playlist_items
.into_iter()
.map(move |(file_idx, filename)| (idx, file_idx, filename)),
))
}
pub async fn h_global_playlist(
State(state): State<ApiState>,
headers: HeaderMap,
) -> Result<impl IntoResponse> {
let host = get_host(&headers)?;
let all_items = state.api.session().with_torrents(|torrents| {
torrents
.filter_map(|(torrent_idx, handle)| {
torrent_playlist_items(handle)
.map(move |items| {
items.into_iter().map(move |(file_idx, filename)| {
(torrent_idx.into(), file_idx, filename)
})
})
.ok()
})
.flatten()
.collect::<Vec<_>>()
});
Ok(build_playlist_content(host, all_items))
}

View file

@ -0,0 +1,106 @@
use std::{io::SeekFrom, sync::Arc};
use anyhow::Context;
use axum::{
extract::{Path, State},
response::IntoResponse,
};
use http::{HeaderMap, HeaderValue, StatusCode};
use serde::Deserialize;
use tokio::io::AsyncSeekExt;
use tracing::trace;
use super::ApiState;
use crate::api::{Result, TorrentIdOrHash};
#[derive(Deserialize)]
pub struct StreamPathParams {
id: TorrentIdOrHash,
file_id: usize,
#[serde(rename = "filename")]
_filename: Option<Arc<str>>,
}
pub async fn h_torrent_stream_file(
State(state): State<ApiState>,
Path(StreamPathParams { id, file_id, .. }): Path<StreamPathParams>,
headers: http::HeaderMap,
) -> Result<impl IntoResponse> {
let mut stream = state.api.api_stream(id, file_id)?;
let mut status = StatusCode::OK;
let mut output_headers = HeaderMap::new();
output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes"));
const DLNA_TRANSFER_MODE: &str = "transferMode.dlna.org";
const DLNA_GET_CONTENT_FEATURES: &str = "getcontentFeatures.dlna.org";
const DLNA_CONTENT_FEATURES: &str = "contentFeatures.dlna.org";
if headers
.get(DLNA_TRANSFER_MODE)
.map(|v| matches!(v.as_bytes(), b"Streaming" | b"streaming"))
.unwrap_or(false)
{
output_headers.insert(DLNA_TRANSFER_MODE, HeaderValue::from_static("Streaming"));
}
if headers
.get(DLNA_GET_CONTENT_FEATURES)
.map(|v| v.as_bytes() == b"1")
.unwrap_or(false)
{
output_headers.insert(
DLNA_CONTENT_FEATURES,
HeaderValue::from_static("DLNA.ORG_OP=01"),
);
}
if let Ok(mime) = state.api.torrent_file_mime_type(id, file_id) {
output_headers.insert(
http::header::CONTENT_TYPE,
HeaderValue::from_str(mime).context("bug - invalid MIME")?,
);
}
let range_header = headers.get(http::header::RANGE);
trace!(torrent_id=%id, file_id=file_id, range=?range_header, "request for HTTP stream");
if let Some(range) = range_header {
let offset: Option<u64> = range
.to_str()
.ok()
.and_then(|s| s.strip_prefix("bytes="))
.and_then(|s| s.strip_suffix('-'))
.and_then(|s| s.parse().ok());
if let Some(offset) = offset {
status = StatusCode::PARTIAL_CONTENT;
stream
.seek(SeekFrom::Start(offset))
.await
.context("error seeking")?;
output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len() - stream.position()))
.context("bug")?,
);
output_headers.insert(
http::header::CONTENT_RANGE,
HeaderValue::from_str(&format!(
"bytes {}-{}/{}",
stream.position(),
stream.len().saturating_sub(1),
stream.len()
))
.context("bug")?,
);
}
} else {
output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len())).context("bug")?,
);
}
let s = tokio_util::io::ReaderStream::with_capacity(stream, 65536);
Ok((status, (output_headers, axum::body::Body::from_stream(s))))
}

View file

@ -0,0 +1,168 @@
use anyhow::Context;
use axum::{
extract::{Path, Query, State},
response::IntoResponse,
};
use bytes::Bytes;
use librqbit_core::magnet::Magnet;
use serde::Deserialize;
use super::ApiState;
use crate::{
api::{ApiTorrentListOpts, Result, TorrentIdOrHash},
http_api::timeout::Timeout,
http_api_types::TorrentAddQueryParams,
torrent_state::peer::stats::snapshot::PeerStatsFilter,
AddTorrent, SUPPORTED_SCHEMES,
};
pub async fn h_torrents_list(
State(state): State<ApiState>,
Query(opts): Query<ApiTorrentListOpts>,
) -> impl IntoResponse {
axum::Json(state.api.api_torrent_list_ext(opts))
}
pub async fn h_torrents_post(
State(state): State<ApiState>,
Query(params): Query<TorrentAddQueryParams>,
Timeout(timeout): Timeout<600_000, 3_600_000>,
data: Bytes,
) -> Result<impl IntoResponse> {
let is_url = params.is_url;
let opts = params.into_add_torrent_options();
let data = data.to_vec();
let maybe_magnet = |data: &[u8]| -> bool {
std::str::from_utf8(data)
.ok()
.and_then(|s| Magnet::parse(s).ok())
.is_some()
};
let add = match is_url {
Some(true) => AddTorrent::Url(
String::from_utf8(data)
.context("invalid utf-8 for passed URL")?
.into(),
),
Some(false) => AddTorrent::TorrentFileBytes(data.into()),
// Guess the format.
None if SUPPORTED_SCHEMES
.iter()
.any(|s| data.starts_with(s.as_bytes()))
|| maybe_magnet(&data) =>
{
AddTorrent::Url(
String::from_utf8(data)
.context("invalid utf-8 for passed URL")?
.into(),
)
}
_ => AddTorrent::TorrentFileBytes(data.into()),
};
tokio::time::timeout(timeout, state.api.api_add_torrent(add, Some(opts)))
.await
.context("timeout")?
.map(axum::Json)
}
pub async fn h_torrent_details(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_torrent_details(idx).map(axum::Json)
}
pub async fn h_torrent_haves(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_dump_haves(idx)
}
pub async fn h_torrent_stats_v0(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_stats_v0(idx).map(axum::Json)
}
pub async fn h_torrent_stats_v1(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api.api_stats_v1(idx).map(axum::Json)
}
pub async fn h_peer_stats(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
Query(filter): Query<PeerStatsFilter>,
) -> Result<impl IntoResponse> {
state.api.api_peer_stats(idx, filter).map(axum::Json)
}
pub async fn h_torrent_action_pause(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_pause(idx)
.await
.map(axum::Json)
}
pub async fn h_torrent_action_start(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_start(idx)
.await
.map(axum::Json)
}
pub async fn h_torrent_action_forget(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_forget(idx)
.await
.map(axum::Json)
}
pub async fn h_torrent_action_delete(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_delete(idx)
.await
.map(axum::Json)
}
#[derive(Deserialize)]
pub struct UpdateOnlyFilesRequest {
only_files: Vec<usize>,
}
pub async fn h_torrent_action_update_only_files(
State(state): State<ApiState>,
Path(idx): Path<TorrentIdOrHash>,
axum::Json(req): axum::Json<UpdateOnlyFilesRequest>,
) -> Result<impl IntoResponse> {
state
.api
.api_torrent_action_update_only_files(idx, &req.only_files.into_iter().collect())
.await
.map(axum::Json)
}
pub async fn h_session_stats(State(state): State<ApiState>) -> impl IntoResponse {
axum::Json(state.api.api_session_stats())
}

View file

@ -0,0 +1,190 @@
use anyhow::Context;
use axum::extract::{ConnectInfo, Request};
use axum::middleware::Next;
use axum::response::IntoResponse;
use axum::routing::get;
use base64::Engine;
use futures::future::BoxFuture;
use futures::FutureExt;
use http::{HeaderMap, StatusCode};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tower_http::trace::{DefaultOnFailure, DefaultOnResponse, OnFailure};
use tracing::{debug, error_span, info, Span};
use axum::Router;
use crate::api::Api;
use crate::api::Result;
use crate::ApiError;
mod handlers;
mod timeout;
#[cfg(feature = "webui")]
mod webui;
/// An HTTP server for the API.
pub struct HttpApi {
api: Api,
opts: HttpApiOptions,
}
#[derive(Debug, Default)]
pub struct HttpApiOptions {
pub read_only: bool,
pub basic_auth: Option<(String, String)>,
}
async fn simple_basic_auth(
expected_username: Option<&str>,
expected_password: Option<&str>,
headers: HeaderMap,
request: axum::extract::Request,
next: Next,
) -> Result<axum::response::Response> {
let (expected_user, expected_pass) = match (expected_username, expected_password) {
(Some(u), Some(p)) => (u, p),
_ => return Ok(next.run(request).await),
};
let user_pass = headers
.get("Authorization")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.strip_prefix("Basic "))
.and_then(|v| base64::engine::general_purpose::STANDARD.decode(v).ok())
.and_then(|v| String::from_utf8(v).ok());
let user_pass = match user_pass {
Some(user_pass) => user_pass,
None => {
return Ok((
StatusCode::UNAUTHORIZED,
[("WWW-Authenticate", "Basic realm=\"API\"")],
)
.into_response())
}
};
// TODO: constant time compare
match user_pass.split_once(':') {
Some((u, p)) if u == expected_user && p == expected_pass => Ok(next.run(request).await),
_ => Err(ApiError::unathorized()),
}
}
impl HttpApi {
pub fn new(api: Api, opts: Option<HttpApiOptions>) -> Self {
Self {
api,
opts: opts.unwrap_or_default(),
}
}
/// Run the HTTP server forever on the given address.
/// If read_only is passed, no state-modifying methods will be exposed.
#[inline(never)]
pub fn make_http_api_and_run(
self,
listener: TcpListener,
upnp_router: Option<Router>,
) -> BoxFuture<'static, anyhow::Result<()>> {
let state = Arc::new(self);
let mut main_router = handlers::make_api_router(state.clone());
#[cfg(feature = "webui")]
{
use axum::response::Redirect;
let webui_router = webui::make_webui_router();
main_router = main_router.nest("/web/", webui_router);
main_router = main_router.route("/web", get(|| async { Redirect::permanent("./web/") }))
}
let cors_layer = {
use tower_http::cors::{AllowHeaders, AllowOrigin};
const ALLOWED_ORIGINS: [&[u8]; 4] = [
// Webui-dev
b"http://localhost:3031",
b"http://127.0.0.1:3031",
// Tauri dev
b"http://localhost:1420",
// Tauri prod
b"tauri://localhost",
];
let allow_regex = std::env::var("CORS_ALLOW_REGEXP")
.ok()
.and_then(|value| regex::bytes::Regex::new(&value).ok());
tower_http::cors::CorsLayer::default()
.allow_origin(AllowOrigin::predicate(move |v, _| {
ALLOWED_ORIGINS.contains(&v.as_bytes())
|| allow_regex
.as_ref()
.map(move |r| r.is_match(v.as_bytes()))
.unwrap_or(false)
}))
.allow_headers(AllowHeaders::any())
};
// Simple one-user basic auth
if let Some((user, pass)) = state.opts.basic_auth.clone() {
info!("Enabling simple basic authentication in HTTP API");
main_router = main_router.route_layer(axum::middleware::from_fn(
move |headers, request, next| {
let user = user.clone();
let pass = pass.clone();
async move {
simple_basic_auth(Some(&user), Some(&pass), headers, request, next).await
}
},
));
}
if let Some(upnp_router) = upnp_router {
main_router = main_router.nest("/upnp", upnp_router);
}
let app = main_router
.layer(cors_layer)
.layer(
tower_http::trace::TraceLayer::new_for_http()
.make_span_with(|req: &Request| {
let method = req.method();
let uri = req.uri();
if let Some(ConnectInfo(addr)) =
req.extensions().get::<ConnectInfo<SocketAddr>>()
{
let addr = SocketAddr::new(addr.ip().to_canonical(), addr.port());
error_span!("request", %method, %uri, %addr)
} else {
error_span!("request", %method, %uri)
}
})
.on_request(|req: &Request, _: &Span| {
if req.uri().path().starts_with("/upnp") {
debug!(headers=?req.headers())
}
})
.on_response(DefaultOnResponse::new().include_headers(true))
.on_failure({
let mut default = DefaultOnFailure::new();
move |failure_class, latency, span: &Span| match failure_class {
tower_http::classify::ServerErrorsFailureClass::StatusCode(
StatusCode::NOT_IMPLEMENTED,
) => {}
_ => default.on_failure(failure_class, latency, span),
}
}),
)
.into_make_service_with_connect_info::<SocketAddr>();
async move {
axum::serve(listener, app)
.await
.context("error running HTTP API")
}
.boxed()
}
}

View file

@ -0,0 +1,49 @@
use std::time::Duration;
use anyhow::Context;
use axum::{extract::Query, RequestPartsExt};
use http::request::Parts;
use serde::Deserialize;
use crate::ApiError;
pub struct Timeout<const DEFAULT_MS: usize, const MAX_MS: usize>(pub Duration);
impl<S, const DEFAULT_MS: usize, const MAX_MS: usize> axum::extract::FromRequestParts<S>
for Timeout<DEFAULT_MS, MAX_MS>
where
S: Send + Sync,
{
type Rejection = ApiError;
/// Perform the extraction.
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
#[derive(Deserialize)]
struct QueryT {
timeout_ms: Option<usize>,
}
let q = parts
.extract::<Query<QueryT>>()
.await
.context("error running Timeout extractor")?;
let timeout_ms = q
.timeout_ms
.map(Ok)
.or_else(|| {
parts
.headers
.get("x-req-timeout-ms")
.map(|v| {
std::str::from_utf8(v.as_bytes()).context("invalid utf-8 in timeout value")
})
.map(|v| v.and_then(|v| v.parse::<usize>().context("invalid timeout integer")))
})
.transpose()
.context("error parsing timeout")?
.unwrap_or(DEFAULT_MS);
let timeout_ms = timeout_ms.min(MAX_MS);
Ok(Timeout(Duration::from_millis(timeout_ms as u64)))
}
}

View file

@ -0,0 +1,41 @@
use axum::{routing::get, Router};
pub fn make_webui_router() -> Router {
Router::new()
.route(
"/",
get(|| async {
(
[("Content-Type", "text/html")],
include_str!("../../webui/dist/index.html"),
)
}),
)
.route(
"/assets/index.js",
get(|| async {
(
[("Content-Type", "application/javascript")],
include_str!("../../webui/dist/assets/index.js"),
)
}),
)
.route(
"/assets/index.css",
get(|| async {
(
[("Content-Type", "text/css")],
include_str!("../../webui/dist/assets/index.css"),
)
}),
)
.route(
"/assets/logo.svg",
get(|| async {
(
[("Content-Type", "image/svg+xml")],
include_str!("../../webui/dist/assets/logo.svg"),
)
}),
)
}

View file

@ -4,7 +4,7 @@ use serde::Deserialize;
use crate::{
api::ApiAddTorrentResponse,
http_api::{InitialPeers, TorrentAddQueryParams},
http_api_types::{InitialPeers, TorrentAddQueryParams},
session::{AddTorrent, AddTorrentOptions},
};

View file

@ -0,0 +1,111 @@
use std::{net::SocketAddr, str::FromStr, time::Duration};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use crate::{AddTorrentOptions, PeerConnectionOptions};
pub struct OnlyFiles(Vec<usize>);
pub struct InitialPeers(pub Vec<SocketAddr>);
#[derive(Serialize, Deserialize, Default)]
pub struct TorrentAddQueryParams {
pub overwrite: Option<bool>,
pub output_folder: Option<String>,
pub sub_folder: Option<String>,
pub only_files_regex: Option<String>,
pub only_files: Option<OnlyFiles>,
pub peer_connect_timeout: Option<u64>,
pub peer_read_write_timeout: Option<u64>,
pub initial_peers: Option<InitialPeers>,
// Will force interpreting the content as a URL.
pub is_url: Option<bool>,
pub list_only: Option<bool>,
}
impl Serialize for OnlyFiles {
fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = self.0.iter().map(|id| id.to_string()).join(",");
s.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for OnlyFiles {
fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let s = String::deserialize(deserializer)?;
let list = s
.split(',')
.try_fold(Vec::<usize>::new(), |mut acc, c| match c.parse() {
Ok(i) => {
acc.push(i);
Ok(acc)
}
Err(_) => Err(D::Error::custom(format!(
"only_files: failed to parse {:?} as integer",
c
))),
})?;
if list.is_empty() {
return Err(D::Error::custom(
"only_files: should contain at least one file id",
));
}
Ok(OnlyFiles(list))
}
}
impl<'de> Deserialize<'de> for InitialPeers {
fn deserialize<D>(deserializer: D) -> std::prelude::v1::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let string = String::deserialize(deserializer)?;
let mut addrs = Vec::new();
for addr_str in string.split(',').filter(|s| !s.is_empty()) {
addrs.push(SocketAddr::from_str(addr_str).map_err(D::Error::custom)?);
}
Ok(InitialPeers(addrs))
}
}
impl Serialize for InitialPeers {
fn serialize<S>(&self, serializer: S) -> std::prelude::v1::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0
.iter()
.map(|s| s.to_string())
.join(",")
.serialize(serializer)
}
}
impl TorrentAddQueryParams {
pub fn into_add_torrent_options(self) -> AddTorrentOptions {
AddTorrentOptions {
overwrite: self.overwrite.unwrap_or(false),
only_files_regex: self.only_files_regex,
only_files: self.only_files.map(|o| o.0),
output_folder: self.output_folder,
sub_folder: self.sub_folder,
list_only: self.list_only.unwrap_or(false),
initial_peers: self.initial_peers.map(|i| i.0),
peer_opts: Some(PeerConnectionOptions {
connect_timeout: self.peer_connect_timeout.map(Duration::from_secs),
read_write_timeout: self.peer_read_write_timeout.map(Duration::from_secs),
..Default::default()
}),
..Default::default()
}
}
}

View file

@ -49,8 +49,10 @@ pub mod file_info;
mod file_ops;
#[cfg(feature = "http-api")]
pub mod http_api;
#[cfg(feature = "http-api")]
#[cfg(feature = "http-api-client")]
pub mod http_api_client;
#[cfg(any(feature = "http-api", feature = "http-api-client"))]
pub mod http_api_types;
pub mod limits;
mod merge_streams;
mod peer_connection;

View file

@ -26,6 +26,7 @@ disable-upload = ["librqbit/disable-upload"]
[dependencies]
librqbit = { version = "8.0.0", path = "../librqbit", default-features = false, features = [
"http-api",
"http-api-client",
"tracing-subscriber-utils",
"upnp-serve-adapter",
"watch",