diff --git a/Makefile b/Makefile index 9277346..79501fb 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ webui-dev: webui-deps export RQBIT_UPNP_SERVER_ENABLE ?= true export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030 -CARGO_RUN_FLAGS ?= "" +CARGO_RUN_FLAGS ?= RQBIT_OUTPUT_FOLDER ?= /tmp/scratch RQBIT_POSTGRES_CONNECTION_STRING ?= postgres:///rqbit diff --git a/crates/librqbit/src/upnp_server_adapter.rs b/crates/librqbit/src/upnp_server_adapter.rs index 2000e47..e6878b7 100644 --- a/crates/librqbit/src/upnp_server_adapter.rs +++ b/crates/librqbit/src/upnp_server_adapter.rs @@ -20,8 +20,8 @@ use itertools::Itertools; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use tracing::{debug, trace, warn}; use upnp_serve::{ - upnp_types::content_directory::{ - response::{Container, Item, ItemOrContainer}, + services::content_directory::{ + browse::response::{Container, Item, ItemOrContainer}, ContentDirectoryBrowseProvider, }, UpnpServer, UpnpServerOptions, @@ -346,8 +346,8 @@ mod tests { TorrentMetaV1File, TorrentMetaV1Info, TorrentMetaV1Owned, }; use tempfile::TempDir; - use upnp_serve::upnp_types::content_directory::{ - response::{Container, Item, ItemOrContainer}, + use upnp_serve::services::content_directory::{ + browse::response::{Container, Item, ItemOrContainer}, ContentDirectoryBrowseProvider, }; diff --git a/crates/upnp-serve/examples/upnp-stub-server.rs b/crates/upnp-serve/examples/upnp-stub-server.rs index 4fb2f77..efb768b 100644 --- a/crates/upnp-serve/examples/upnp-stub-server.rs +++ b/crates/upnp-serve/examples/upnp-stub-server.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Context; use axum::routing::get; use librqbit_upnp_serve::{ - upnp_types::content_directory::response::{Item, ItemOrContainer}, + services::content_directory::browse::response::{Item, ItemOrContainer}, UpnpServer, UpnpServerOptions, }; use mime_guess::Mime; diff --git a/crates/upnp-serve/src/http_handlers.rs b/crates/upnp-serve/src/http_handlers.rs deleted file mode 100644 index 59c8851..0000000 --- a/crates/upnp-serve/src/http_handlers.rs +++ /dev/null @@ -1,239 +0,0 @@ -use std::{sync::atomic::Ordering, time::Duration}; - -use anyhow::Context; -use axum::{ - body::Bytes, - extract::State, - handler::HandlerWithoutStateExt, - response::IntoResponse, - routing::{get, post}, -}; -use bstr::BStr; -use http::{header::CONTENT_TYPE, HeaderMap, HeaderName, StatusCode}; -use tokio_util::sync::CancellationToken; -use tracing::{debug, trace, warn}; - -use crate::{ - constants::{ - CONTENT_TYPE_XML_UTF8, SOAP_ACTION_CONTENT_DIRECTORY_BROWSE, - SOAP_ACTION_GET_SYSTEM_UPDATE_ID, - }, - state::{UnpnServerState, UpnpServerStateInner}, - templates::{ - render_content_directory_browse, render_content_directory_control_get_system_update_id, - render_root_description_xml, RootDescriptionInputs, - }, - upnp_types::content_directory::{ - request::ContentDirectoryControlRequest, ContentDirectoryBrowseProvider, - }, -}; - -async fn description_xml(State(state): State) -> impl IntoResponse { - ( - [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], - state.rendered_root_description.clone(), - ) -} - -async fn generate_content_directory_control_response( - headers: HeaderMap, - State(state): State, - body: Bytes, -) -> impl IntoResponse { - let body = BStr::new(&body); - let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes())); - trace!(?body, ?action, "received control request"); - let action = match action { - Some(action) => action, - None => { - debug!("missing SOAPACTION header"); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - match action.as_ref() { - SOAP_ACTION_CONTENT_DIRECTORY_BROWSE => { - let http_hostname = headers - .get("host") - .and_then(|h| std::str::from_utf8(h.as_bytes()).ok()) - .and_then(|h| h.split(':').next()); - let http_hostname = match http_hostname { - Some(h) => h, - None => return StatusCode::BAD_REQUEST.into_response(), - }; - - let body = match std::str::from_utf8(body) { - Ok(body) => body, - Err(_) => return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(), - }; - - let request = match ContentDirectoryControlRequest::parse(body) { - Ok(req) => req, - Err(e) => { - debug!(error=?e, "error parsing XML"); - return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(); - } - }; - - use crate::upnp_types::content_directory::request::BrowseFlag; - - match request.browse_flag { - BrowseFlag::BrowseDirectChildren => ( - [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], - render_content_directory_browse( - state - .provider - .browse_direct_children(request.object_id, http_hostname), - ), - ) - .into_response(), - BrowseFlag::BrowseMetadata => StatusCode::NOT_IMPLEMENTED.into_response(), - } - } - SOAP_ACTION_GET_SYSTEM_UPDATE_ID => { - let update_id = state.system_update_id.load(Ordering::Relaxed); - ( - [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], - render_content_directory_control_get_system_update_id(update_id), - ) - .into_response() - } - _ => { - debug!(?action, "unsupported ContentDirectory action"); - (StatusCode::NOT_IMPLEMENTED, "").into_response() - } - } -} - -async fn subscription( - State(state): State, - request: axum::extract::Request, -) -> impl IntoResponse { - if request.method().as_str() != "SUBSCRIBE" { - return (StatusCode::METHOD_NOT_ALLOWED, "").into_response(); - } - - let (parts, _body) = request.into_parts(); - let is_event = parts - .headers - .get(HeaderName::from_static("nt")) - .map(|v| v.as_bytes() == b"upnp:event") - .unwrap_or_default(); - if !is_event { - return (StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response(); - } - - let callback = parts - .headers - .get(HeaderName::from_static("callback")) - .and_then(|v| v.to_str().ok()) - .map(|s| s.trim_matches(|c| c == '>' || c == '<')) - .and_then(|u| url::Url::parse(u).ok()); - let callback = match callback { - Some(c) => c, - None => return (StatusCode::BAD_REQUEST, "callback not provided").into_response(), - }; - let subscription_id = parts - .headers - .get(HeaderName::from_static("sid")) - .and_then(|v| v.to_str().ok()); - - let timeout = parts - .headers - .get(HeaderName::from_static("timeout")) - .and_then(|v| v.to_str().ok()) - .and_then(|t| t.strip_prefix("Second-")) - .and_then(|t| t.parse::().ok()) - .map(|t| Duration::from_secs(t as u64)); - - const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); - - let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); - - if let Some(sid) = subscription_id { - match state.renew_subscription(sid, timeout) { - Ok(()) => ( - StatusCode::OK, - [ - ("SID", sid.to_owned()), - ("TIMEOUT", format!("Second-{}", timeout.as_secs())), - ], - ) - .into_response(), - Err(e) => { - warn!(sid, error=?e, "error renewing subscription"); - StatusCode::NOT_FOUND.into_response() - } - } - } else { - match state.new_subscription(callback, timeout) { - Ok(sid) => ( - StatusCode::OK, - [ - ("SID", sid), - ("TIMEOUT", format!("Second-{}", timeout.as_secs())), - ], - ) - .into_response(), - Err(e) => { - warn!(error=?e, "error creating subscription"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } -} - -pub fn make_router( - friendly_name: String, - http_prefix: String, - upnp_usn: String, - browse_provider: Box, - cancellation_token: CancellationToken, -) -> anyhow::Result { - let root_desc = render_root_description_xml(&RootDescriptionInputs { - friendly_name: &friendly_name, - manufacturer: "rqbit developers", - model_name: "1.0.0", - unique_id: &upnp_usn, - http_prefix: &http_prefix, - }); - - let state = UpnpServerStateInner::new(root_desc.into(), browse_provider, cancellation_token) - .context("error creating UPNP server")?; - - let content_dir_sub_handler = { - let state = state.clone(); - move |request: axum::extract::Request| async move { - subscription(State(state.clone()), request).await - } - }; - - let app = axum::Router::new() - .route("/description.xml", get(description_xml)) - .route( - "/scpd/ContentDirectory.xml", - get(|| async { include_str!("resources/templates/content_directory/scpd.xml") }), - ) - .route( - "/scpd/ConnectionManager.xml", - get(|| async { include_str!("resources/templates/connection_manager/scpd.xml") }), - ) - .route( - "/control/ContentDirectory", - post(generate_content_directory_control_response), - ) - .route( - "/control/ConnectionManager", - post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }), - ) - .route_service( - "/subscribe/ContentDirectory", - content_dir_sub_handler.into_service(), - ) - .route( - "/subscribe/ConnectionManager", - post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }), - ) - .with_state(state); - - Ok(app) -} diff --git a/crates/upnp-serve/src/http_server.rs b/crates/upnp-serve/src/http_server.rs new file mode 100644 index 0000000..b2c3ed4 --- /dev/null +++ b/crates/upnp-serve/src/http_server.rs @@ -0,0 +1,112 @@ +use anyhow::Context; +use axum::{ + extract::State, + handler::HandlerWithoutStateExt, + response::IntoResponse, + routing::{get, post}, +}; +use http::header::CONTENT_TYPE; +use tokio_util::sync::CancellationToken; + +use crate::{ + constants::CONTENT_TYPE_XML_UTF8, + services::content_directory::ContentDirectoryBrowseProvider, + state::{UnpnServerState, UpnpServerStateInner}, +}; + +async fn description_xml(State(state): State) -> impl IntoResponse { + ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + state.rendered_root_description.clone(), + ) +} + +pub struct RootDescriptionInputs<'a> { + pub friendly_name: &'a str, + pub manufacturer: &'a str, + pub model_name: &'a str, + pub unique_id: &'a str, + pub http_prefix: &'a str, +} + +pub fn render_root_description_xml(input: &RootDescriptionInputs<'_>) -> String { + format!( + include_str!("resources/templates/root_desc.tmpl.xml"), + friendly_name = input.friendly_name, + manufacturer = input.manufacturer, + model_name = input.model_name, + unique_id = input.unique_id, + http_prefix = input.http_prefix + ) +} + +pub fn make_router( + friendly_name: String, + http_prefix: String, + upnp_usn: String, + browse_provider: Box, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let root_desc = render_root_description_xml(&RootDescriptionInputs { + friendly_name: &friendly_name, + manufacturer: "rqbit developers", + model_name: "1.0.0", + unique_id: &upnp_usn, + http_prefix: &http_prefix, + }); + + let state = UpnpServerStateInner::new(root_desc.into(), browse_provider, cancellation_token) + .context("error creating UPNP server")?; + + let content_dir_sub_handler = { + let state = state.clone(); + move |request: axum::extract::Request| async move { + crate::services::content_directory::subscription::subscribe_http_handler( + State(state.clone()), + request, + ) + .await + } + }; + + let connection_manager_sub_handler = { + let state = state.clone(); + move |request: axum::extract::Request| async move { + crate::services::connection_manager::subscribe_http_handler( + State(state.clone()), + request, + ) + .await + } + }; + + let app = axum::Router::new() + .route("/description.xml", get(description_xml)) + .route( + "/scpd/ContentDirectory.xml", + get(|| async { include_str!("resources/templates/content_directory/scpd.xml") }), + ) + .route( + "/scpd/ConnectionManager.xml", + get(|| async { include_str!("resources/templates/connection_manager/scpd.xml") }), + ) + .route( + "/control/ContentDirectory", + post(crate::services::content_directory::http_handler), + ) + .route( + "/control/ConnectionManager", + post(crate::services::connection_manager::http_handler), + ) + .route_service( + "/subscribe/ContentDirectory", + content_dir_sub_handler.into_service(), + ) + .route_service( + "/subscribe/ConnectionManager", + connection_manager_sub_handler.into_service(), + ) + .with_state(state); + + Ok(app) +} diff --git a/crates/upnp-serve/src/lib.rs b/crates/upnp-serve/src/lib.rs index 00aaa1a..69f68af 100644 --- a/crates/upnp-serve/src/lib.rs +++ b/crates/upnp-serve/src/lib.rs @@ -2,21 +2,20 @@ use std::{io::Write, time::Duration}; use anyhow::Context; use gethostname::gethostname; -use http_handlers::make_router; use librqbit_sha1_wrapper::ISha1; +use services::content_directory::ContentDirectoryBrowseProvider; use ssdp::SsdpRunner; use tokio_util::sync::CancellationToken; use tracing::{debug, info}; -use upnp_types::content_directory::ContentDirectoryBrowseProvider; mod constants; -mod http_handlers; +mod http_server; +pub mod services; mod ssdp; pub mod state; mod subscriptions; mod templates; -pub mod upnp_types; pub struct UpnpServerOptions { pub friendly_name: String, @@ -78,7 +77,7 @@ impl UpnpServer { .await .context("error initializing SsdpRunner")?; - let router = make_router( + let router = crate::http_server::make_router( opts.friendly_name, opts.http_prefix, usn, diff --git a/crates/upnp-serve/src/resources/templates/connection_manager/control/get_current_connection_ids.xml b/crates/upnp-serve/src/resources/templates/connection_manager/control/get_current_connection_ids.xml new file mode 100644 index 0000000..799061f --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/connection_manager/control/get_current_connection_ids.xml @@ -0,0 +1,10 @@ + + + + + + + + diff --git a/crates/upnp-serve/src/resources/templates/connection_manager/control/get_current_connection_info.xml b/crates/upnp-serve/src/resources/templates/connection_manager/control/get_current_connection_info.xml new file mode 100644 index 0000000..95feadb --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/connection_manager/control/get_current_connection_info.xml @@ -0,0 +1,16 @@ + + + + + 0 + 0 + http-get:*:*:DLNA.ORG_OP=01 + + -1 + Output + OK + + + diff --git a/crates/upnp-serve/src/resources/templates/connection_manager/control/get_protocol_info.xml b/crates/upnp-serve/src/resources/templates/connection_manager/control/get_protocol_info.xml new file mode 100644 index 0000000..3845b72 --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/connection_manager/control/get_protocol_info.xml @@ -0,0 +1,11 @@ + + + + + http-get:*:*:DLNA.ORG_OP=01 + + + + diff --git a/crates/upnp-serve/src/resources/templates/connection_manager/scpd.xml b/crates/upnp-serve/src/resources/templates/connection_manager/scpd.xml index 5304264..9e4cf95 100644 --- a/crates/upnp-serve/src/resources/templates/connection_manager/scpd.xml +++ b/crates/upnp-serve/src/resources/templates/connection_manager/scpd.xml @@ -20,7 +20,7 @@ - + + GetCurrentConnectionIDs diff --git a/crates/upnp-serve/src/services/connection_manager.rs b/crates/upnp-serve/src/services/connection_manager.rs new file mode 100644 index 0000000..4422156 --- /dev/null +++ b/crates/upnp-serve/src/services/connection_manager.rs @@ -0,0 +1,81 @@ +use axum::{body::Bytes, extract::State, response::IntoResponse}; +use bstr::BStr; +use http::{header::CONTENT_TYPE, HeaderMap, StatusCode}; +use tracing::{debug, trace}; + +use crate::{ + constants::CONTENT_TYPE_XML_UTF8, state::UnpnServerState, subscriptions::SubscribeRequest, +}; + +pub const SOAP_ACTION_GET_PROTOCOL_INFO: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetProtocolInfo\""; + +pub const SOAP_ACTION_CONNECTION_COMPLETE: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#ConnectionComplete\""; + +pub const SOAP_ACTION_GET_CURRENT_CONNECTION_IDS: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionIDs\""; + +pub const SOAP_ACTION_GET_CURRENT_CONNECTION_INFO: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionInfo\""; + +pub const SOAP_ACTION_PREPARE_FOR_CONNECTION: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#PrepareForConnection\""; + +pub(crate) async fn http_handler( + headers: HeaderMap, + State(_state): State, + body: Bytes, +) -> impl IntoResponse { + let body = BStr::new(&body); + let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes())); + trace!(?body, ?action, "received control request"); + let action = match action { + Some(action) => action, + None => { + debug!("missing SOAPACTION header"); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let not_implemented = || StatusCode::NOT_IMPLEMENTED.into_response(); + + match action.as_ref() { + SOAP_ACTION_GET_PROTOCOL_INFO => ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + include_str!("../resources/templates/connection_manager/control/get_protocol_info.xml"), + ) + .into_response(), + + SOAP_ACTION_GET_CURRENT_CONNECTION_INFO => ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + include_str!( + "../resources/templates/connection_manager/control/get_current_connection_info.xml" + ), + ) + .into_response(), + SOAP_ACTION_GET_CURRENT_CONNECTION_IDS => ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + include_str!( + "../resources/templates/connection_manager/control/get_current_connection_ids.xml" + ), + ) + .into_response(), + SOAP_ACTION_PREPARE_FOR_CONNECTION => not_implemented(), + SOAP_ACTION_CONNECTION_COMPLETE => not_implemented(), + _ => StatusCode::BAD_REQUEST.into_response(), + } +} + +pub(crate) async fn subscribe_http_handler( + State(state): State, + request: axum::extract::Request, +) -> impl IntoResponse { + let req = match SubscribeRequest::parse(request) { + Ok(sub) => sub, + Err(err) => return err, + }; + + let resp = state.handle_connection_manager_subscription_request(&req); + crate::subscriptions::subscription_into_response(&req, resp) +} diff --git a/crates/upnp-serve/src/services/content_directory.rs b/crates/upnp-serve/src/services/content_directory.rs new file mode 100644 index 0000000..8fa9571 --- /dev/null +++ b/crates/upnp-serve/src/services/content_directory.rs @@ -0,0 +1,336 @@ +use std::sync::atomic::Ordering; + +use axum::{body::Bytes, extract::State, response::IntoResponse}; +use browse::response::ItemOrContainer; +use bstr::BStr; +use http::{header::CONTENT_TYPE, HeaderMap, StatusCode}; +use tracing::{debug, trace}; + +use crate::{ + constants::{ + CONTENT_TYPE_XML_UTF8, SOAP_ACTION_CONTENT_DIRECTORY_BROWSE, + SOAP_ACTION_GET_SYSTEM_UPDATE_ID, + }, + state::UnpnServerState, +}; + +pub mod browse { + pub mod request { + use anyhow::Context; + use serde::Deserialize; + + #[derive(Deserialize)] + struct Envelope { + #[serde(rename = "Body")] + body: Body, + } + + #[derive(Deserialize)] + struct Body { + #[serde(rename = "Browse")] + browse: ContentDirectoryControlRequest, + } + + #[derive(Deserialize, PartialEq, Eq, Debug)] + pub enum BrowseFlag { + BrowseDirectChildren, + BrowseMetadata, + } + + #[derive(Deserialize, Debug)] + pub struct ContentDirectoryControlRequest { + #[serde(rename = "ObjectID")] + pub object_id: usize, + #[serde(rename = "BrowseFlag")] + pub browse_flag: BrowseFlag, + #[serde(rename = "StartingIndex", default)] + pub starting_index: usize, + #[serde(rename = "RequestedCount", default)] + pub requested_count: usize, + } + + impl ContentDirectoryControlRequest { + pub fn parse(s: &str) -> anyhow::Result { + let envelope: Envelope = + quick_xml::de::from_str(s).context("error deserializing")?; + Ok(envelope.body.browse) + } + } + } + + pub mod response { + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct Container { + pub id: usize, + pub parent_id: Option, + pub children_count: Option, + pub title: String, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct Item { + pub id: usize, + pub parent_id: Option, + pub title: String, + pub mime_type: Option, + pub url: String, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub enum ItemOrContainer { + Container(Container), + Item(Item), + } + + pub(crate) fn render(items: impl IntoIterator) -> String { + fn item_or_container(item_or_container: &ItemOrContainer) -> Option { + fn item(item: &Item) -> Option { + let mime = item.mime_type.as_ref()?; + let upnp_class = match mime.type_().as_str() { + "video" => "object.item.videoItem", + _ => return None, + }; + let mime = mime.to_string(); + + Some(format!( + include_str!( + "../resources/templates/content_directory/control/browse/item.tmpl.xml" + ), + id = item.id, + parent_id = item.parent_id.unwrap_or(0), + mime_type = mime, + url = item.url, + upnp_class = upnp_class, + title = item.title + )) + } + + fn container(item: &Container) -> String { + let child_count_tag = match item.children_count { + Some(cc) => format!("childCount=\"{}\"", cc), + None => String::new(), + }; + format!( + include_str!( + "../resources/templates/content_directory/control/browse/container.tmpl.xml" + ), + id = item.id, + parent_id = item.parent_id.unwrap_or(0), + title = item.title, + childCountTag = child_count_tag + ) + } + + match item_or_container { + ItemOrContainer::Container(c) => Some(container(c)), + ItemOrContainer::Item(i) => item(i), + } + } + + struct Envelope<'a> { + items: &'a str, + number_returned: usize, + total_matches: usize, + update_id: u64, + } + + fn render_response(envelope: &Envelope<'_>) -> String { + format!( + include_str!( + "../resources/templates/content_directory/control/browse/response.tmpl.xml" + ), + items = envelope.items, + number_returned = envelope.number_returned, + total_matches = envelope.total_matches, + update_id = envelope.update_id + ) + } + + let all_items = items + .into_iter() + .filter_map(|item| item_or_container(&item)) + .collect::>(); + let total = all_items.len(); + let all_items = all_items.join(""); + + use std::time::{SystemTime, UNIX_EPOCH}; + let update_id = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + + render_response(&Envelope { + items: &all_items, + number_returned: total, + total_matches: total, + update_id, + }) + } + } +} + +pub mod get_system_update_id { + pub(crate) fn render_notify(update_id: u64) -> String { + format!( + include_str!( + "../resources/templates/content_directory/subscriptions/system_update_id.tmpl.xml" + ), + system_update_id = update_id + ) + } + + pub(crate) fn render_response(update_id: u64) -> String { + format!( + include_str!( + "../resources/templates/content_directory/control/get_system_update_id/response.tmpl.xml" + ), + id = update_id + ) + } +} + +pub mod subscription { + use axum::{extract::State, response::IntoResponse}; + use http::Method; + + use crate::{state::UnpnServerState, subscriptions::SubscribeRequest}; + + pub(crate) async fn subscribe_http_handler( + State(state): State, + request: axum::extract::Request, + ) -> impl IntoResponse { + let req = match SubscribeRequest::parse(request) { + Ok(sub) => sub, + Err(err) => return err, + }; + + let resp = state.handle_content_directory_subscription_request(&req); + crate::subscriptions::subscription_into_response(&req, resp) + } + + pub async fn notify_system_id_update( + url: &url::Url, + sid: &str, + seq: u64, + system_update_id: u64, + ) -> anyhow::Result<()> { + // NOTIFY /callback_path HTTP/1.1 + // CONTENT-TYPE: text/xml; charset="utf-8" + // NT: upnp:event + // NTS: upnp:propchange + // SID: uuid: + // SEQ: + // + let body = super::get_system_update_id::render_notify(system_update_id); + + let resp = reqwest::Client::builder() + .build()? + .request(Method::from_bytes(b"NOTIFY")?, url.clone()) + .header("Content-Type", r#"text/xml; charset="utf-8""#) + .header("NT", "upnp:event") + .header("NTS", "upnp:propchange") + .header("SID", sid) + .header("SEQ", seq.to_string()) + .body(body) + .send() + .await?; + + if !resp.status().is_success() { + anyhow::bail!("{:?}", resp.status()) + } + Ok(()) + } +} + +pub(crate) async fn http_handler( + headers: HeaderMap, + State(state): State, + body: Bytes, +) -> impl IntoResponse { + let body = BStr::new(&body); + let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes())); + trace!(?body, ?action, "received control request"); + let action = match action { + Some(action) => action, + None => { + debug!("missing SOAPACTION header"); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + match action.as_ref() { + SOAP_ACTION_CONTENT_DIRECTORY_BROWSE => { + let http_hostname = headers + .get("host") + .and_then(|h| std::str::from_utf8(h.as_bytes()).ok()) + .and_then(|h| h.split(':').next()); + let http_hostname = match http_hostname { + Some(h) => h, + None => return StatusCode::BAD_REQUEST.into_response(), + }; + + let body = match std::str::from_utf8(body) { + Ok(body) => body, + Err(_) => return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(), + }; + + let request = match browse::request::ContentDirectoryControlRequest::parse(body) { + Ok(req) => req, + Err(e) => { + debug!(error=?e, "error parsing XML"); + return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(); + } + }; + + use browse::request::BrowseFlag; + + match request.browse_flag { + BrowseFlag::BrowseDirectChildren => ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + browse::response::render( + state + .provider + .browse_direct_children(request.object_id, http_hostname), + ), + ) + .into_response(), + BrowseFlag::BrowseMetadata => StatusCode::NOT_IMPLEMENTED.into_response(), + } + } + SOAP_ACTION_GET_SYSTEM_UPDATE_ID => { + let update_id = state.system_update_id.load(Ordering::Relaxed); + ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + get_system_update_id::render_response(update_id), + ) + .into_response() + } + _ => { + debug!(?action, "unsupported ContentDirectory action"); + (StatusCode::NOT_IMPLEMENTED, "").into_response() + } + } +} + +pub trait ContentDirectoryBrowseProvider: Send + Sync { + fn browse_direct_children(&self, parent_id: usize, http_hostname: &str) + -> Vec; +} + +impl ContentDirectoryBrowseProvider for Vec { + fn browse_direct_children(&self, _parent_id: usize, _http_host: &str) -> Vec { + self.clone() + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_parse_content_directory_request() { + use super::browse::request::{BrowseFlag, ContentDirectoryControlRequest}; + + let s = include_str!("../resources/test/ContentDirectoryControlExampleRequest.xml"); + let req = ContentDirectoryControlRequest::parse(s).unwrap(); + assert_eq!(req.object_id, 5); + assert_eq!(req.browse_flag, BrowseFlag::BrowseDirectChildren) + } +} diff --git a/crates/upnp-serve/src/services/mod.rs b/crates/upnp-serve/src/services/mod.rs new file mode 100644 index 0000000..c5bf046 --- /dev/null +++ b/crates/upnp-serve/src/services/mod.rs @@ -0,0 +1,2 @@ +pub mod connection_manager; +pub mod content_directory; diff --git a/crates/upnp-serve/src/state.rs b/crates/upnp-serve/src/state.rs index 2a392f8..024309f 100644 --- a/crates/upnp-serve/src/state.rs +++ b/crates/upnp-serve/src/state.rs @@ -12,19 +12,18 @@ use librqbit_core::spawn_utils::spawn_with_cancel; use tokio_util::sync::CancellationToken; use tracing::{error_span, Span}; -use crate::{ - subscriptions::Subscriptions, upnp_types::content_directory::ContentDirectoryBrowseProvider, -}; +use crate::{subscriptions::Subscriptions, ContentDirectoryBrowseProvider}; pub struct UpnpServerStateInner { - pub rendered_root_description: Bytes, - pub provider: Box, - pub system_update_id: AtomicU64, - pub subscriptions: Subscriptions, + pub(crate) rendered_root_description: Bytes, + pub(crate) provider: Box, + pub(crate) system_update_id: AtomicU64, + pub(crate) content_directory_subscriptions: Subscriptions, + pub(crate) connection_manager_subscriptions: Subscriptions, - pub span: Span, - pub system_update_bcast_tx: tokio::sync::broadcast::Sender, - pub cancel_token: tokio_util::sync::CancellationToken, + pub(crate) span: Span, + pub(crate) system_update_bcast_tx: tokio::sync::broadcast::Sender, + pub(crate) cancel_token: tokio_util::sync::CancellationToken, _drop_guard: tokio_util::sync::DropGuard, } @@ -46,7 +45,8 @@ impl UpnpServerStateInner { rendered_root_description, provider, system_update_id: AtomicU64::new(new_system_update_id()?), - subscriptions: Default::default(), + content_directory_subscriptions: Default::default(), + connection_manager_subscriptions: Default::default(), system_update_bcast_tx: btx, _drop_guard: drop_guard, span: span.clone(), diff --git a/crates/upnp-serve/src/subscriptions.rs b/crates/upnp-serve/src/subscriptions.rs index 5988aa5..f3fdd80 100644 --- a/crates/upnp-serve/src/subscriptions.rs +++ b/crates/upnp-serve/src/subscriptions.rs @@ -1,7 +1,7 @@ use crate::state::UpnpServerStateInner; -use crate::templates::render_notify_subscription_system_update_id; use anyhow::Context; -use http::Method; +use axum::response::IntoResponse; +use http::{HeaderName, StatusCode}; use librqbit_core::spawn_utils::spawn_with_cancel; use parking_lot::RwLock; use std::{ @@ -13,6 +13,7 @@ use tokio::sync::{broadcast::error::RecvError, Notify}; use tracing::{debug, error_span, trace, warn, Instrument}; pub struct Subscription { + #[allow(dead_code)] pub url: url::Url, pub seq: u64, pub timeout: Duration, @@ -69,50 +70,171 @@ impl Subscriptions { } } -pub async fn notify_subscription_system_update( - url: &url::Url, - sid: &str, - seq: u64, - system_update_id: u64, -) -> anyhow::Result<()> { - // NOTIFY /callback_path HTTP/1.1 - // CONTENT-TYPE: text/xml; charset="utf-8" - // NT: upnp:event - // NTS: upnp:propchange - // SID: uuid: - // SEQ: - // - let body = render_notify_subscription_system_update_id(system_update_id); +#[derive(Debug)] +pub enum SubscribeRequest { + Create { + callback: url::Url, + timeout: Duration, + }, + Renew { + sid: String, + timeout: Duration, + }, +} - let resp = reqwest::Client::builder() - .build()? - .request(Method::from_bytes(b"NOTIFY")?, url.clone()) - .header("Content-Type", r#"text/xml; charset="utf-8""#) - .header("NT", "upnp:event") - .header("NTS", "upnp:propchange") - .header("SID", sid) - .header("SEQ", seq.to_string()) - .body(body) - .send() - .await?; - - if !resp.status().is_success() { - anyhow::bail!("{:?}", resp.status()) +impl core::fmt::Display for SubscribeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SubscribeRequest::Create { callback, timeout } => { + write!(f, "create;callback={callback};timeout={timeout:?}") + } + SubscribeRequest::Renew { sid, timeout } => { + write!(f, "renew;sid={sid};timeout={timeout:?}") + } + } } - Ok(()) +} + +impl SubscribeRequest { + fn timeout(&self) -> Duration { + match self { + SubscribeRequest::Create { timeout, .. } => *timeout, + SubscribeRequest::Renew { timeout, .. } => *timeout, + } + } +} + +impl SubscribeRequest { + pub fn parse( + request: axum::extract::Request, + ) -> Result { + if request.method().as_str() != "SUBSCRIBE" { + return Err(StatusCode::METHOD_NOT_ALLOWED.into_response()); + } + + let (parts, _body) = request.into_parts(); + let is_event = parts + .headers + .get(HeaderName::from_static("nt")) + .map(|v| v.as_bytes() == b"upnp:event") + .unwrap_or_default(); + + let callback = parts + .headers + .get(HeaderName::from_static("callback")) + .and_then(|v| v.to_str().ok()) + .map(|s| s.trim_matches(|c| c == '>' || c == '<')) + .and_then(|u| url::Url::parse(u).ok()); + let subscription_id = parts + .headers + .get(HeaderName::from_static("sid")) + .and_then(|v| v.to_str().ok()); + + let timeout = parts + .headers + .get(HeaderName::from_static("timeout")) + .and_then(|v| v.to_str().ok()) + .and_then(|t| t.strip_prefix("Second-")) + .and_then(|t| t.parse::().ok()) + .map(|t| Duration::from_secs(t as u64)); + + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); + + let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT).min(DEFAULT_TIMEOUT); + + match (is_event, callback, subscription_id) { + (true, Some(callback), None) => Ok(SubscribeRequest::Create { callback, timeout }), + (_, _, Some(sid)) => Ok(SubscribeRequest::Renew { + sid: sid.to_owned(), + timeout, + }), + _ => Err(StatusCode::BAD_REQUEST.into_response()), + } + } +} + +#[derive(Debug)] +pub(crate) enum SubscriptionResult { + Renewed { sid: String }, + Created { sid: String }, +} + +impl SubscriptionResult { + fn sid(&self) -> &str { + match self { + SubscriptionResult::Renewed { sid } => sid, + SubscriptionResult::Created { sid } => sid, + } + } +} + +pub(crate) fn subscription_into_response( + request: &SubscribeRequest, + result: anyhow::Result, +) -> axum::response::Response { + trace!(%request, ?result, "request->response"); + + let result = match result { + Ok(r) => r, + Err(e) => { + warn!(error=?e, sub=?request, "error handling subscription request"); + return StatusCode::BAD_REQUEST.into_response(); + } + }; + + ( + StatusCode::OK, + [ + ("SID", result.sid().to_owned()), + ("TIMEOUT", format!("Second-{}", request.timeout().as_secs())), + ], + ) + .into_response() } impl UpnpServerStateInner { - pub fn renew_subscription(&self, sid: &str, new_timeout: Duration) -> anyhow::Result<()> { - self.subscriptions.update_timeout(sid, new_timeout) + pub(crate) fn handle_content_directory_subscription_request( + self: &Arc, + req: &SubscribeRequest, + ) -> anyhow::Result { + match req { + SubscribeRequest::Create { callback, timeout } => { + let sid = self.new_content_directory_subscription(callback.clone(), *timeout)?; + Ok(SubscriptionResult::Created { sid }) + } + SubscribeRequest::Renew { sid, timeout } => { + self.content_directory_subscriptions + .update_timeout(sid, *timeout)?; + Ok(SubscriptionResult::Renewed { sid: sid.clone() }) + } + } } - pub fn new_subscription( + pub(crate) fn handle_connection_manager_subscription_request( + self: &Arc, + req: &SubscribeRequest, + ) -> anyhow::Result { + match req { + SubscribeRequest::Create { callback, timeout } => { + let sid = self.new_connection_manager_subscription(callback.clone(), *timeout)?; + Ok(SubscriptionResult::Created { sid }) + } + SubscribeRequest::Renew { sid, timeout } => { + self.connection_manager_subscriptions + .update_timeout(sid, *timeout)?; + Ok(SubscriptionResult::Renewed { sid: sid.clone() }) + } + } + } + + fn new_content_directory_subscription( self: &Arc, url: url::Url, timeout: Duration, ) -> anyhow::Result { - let (sid, refresh_notify) = self.subscriptions.add(url.clone(), timeout); + let (sid, refresh_notify) = self + .content_directory_subscriptions + .add(url.clone(), timeout); let token = self.cancel_token.child_token(); // Spawn a task that will notify it of system id changes. @@ -127,37 +249,28 @@ impl UpnpServerStateInner { let url = url.clone(); async move { + use crate::services::content_directory::subscription::notify_system_id_update; let system_update_id_notifier = async { loop { let res = brx.recv().await; let state = state.upgrade().context("upnp server dead")?; - let seq = state.subscriptions.next_seq(&sid)?; + let seq = state.content_directory_subscriptions.next_seq(&sid)?; match res { Ok(system_update_id) => { trace!(system_update_id, "notifying SystemUpdateId update"); - if let Err(e) = notify_subscription_system_update( - &url, - &sid, - seq, - system_update_id, - ) - .await + if let Err(e) = + notify_system_id_update(&url, &sid, seq, system_update_id).await { debug!(error=?e, "error updating UPNP subscription"); } } Err(RecvError::Lagged(by)) => { warn!(by, "UPNP subscription lagged"); - let seq = state.subscriptions.next_seq(&sid)?; + let seq = state.content_directory_subscriptions.next_seq(&sid)?; let system_update_id = state.system_update_id.load(Ordering::Relaxed); - if let Err(e) = notify_subscription_system_update( - &url, - &sid, - seq, - system_update_id, - ) - .await + if let Err(e) = + notify_system_id_update(&url, &sid, seq, system_update_id).await { debug!(error=?e, "error updating UPNP subscription"); } @@ -173,10 +286,12 @@ impl UpnpServerStateInner { loop { tokio::select! { _ = refresh_notify.notified() => { - timeout = state.upgrade().context("upnp server dead")?.subscriptions.get_timeout(&sid)?; + timeout = state.upgrade().context("upnp server dead")?.content_directory_subscriptions.get_timeout(&sid)?; + trace!(?timeout, "refreshed subscription"); }, _ = tokio::time::sleep(timeout) => { - state.upgrade().context("upnp server dead")?.subscriptions.remove(&sid)?; + state.upgrade().context("upnp server dead")?.content_directory_subscriptions.remove(&sid)?; + trace!(?timeout, "subscription timed out, removing"); return Ok(()) } } @@ -191,7 +306,55 @@ impl UpnpServerStateInner { }; spawn_with_cancel( - error_span!(parent: pspan, "subscription-manager", sid, %url), + error_span!(parent: pspan, "subscription-manager", sid, %url, service="ContentDirectory"), + token, + subscription_manager, + ); + + Ok(sid) + } + + fn new_connection_manager_subscription( + self: &Arc, + url: url::Url, + timeout: Duration, + ) -> anyhow::Result { + let (sid, refresh_notify) = self + .connection_manager_subscriptions + .add(url.clone(), timeout); + let token = self.cancel_token.child_token(); + + // Spawn a task that will notify it of system id changes. + // Spawn a task that will wait for timeout or subscription refreshes. + // When it times out, kill all of them. + + let pspan = self.span.clone(); + let subscription_manager = { + let state = Arc::downgrade(self); + let sid = sid.clone(); + + async move { + let timeout_notifier = async { + let mut timeout = timeout; + loop { + tokio::select! { + _ = refresh_notify.notified() => { + timeout = state.upgrade().context("upnp server dead")?.connection_manager_subscriptions.get_timeout(&sid)?; + }, + _ = tokio::time::sleep(timeout) => { + state.upgrade().context("upnp server dead")?.connection_manager_subscriptions.remove(&sid)?; + return Ok(()) + } + } + } + }.instrument(error_span!("timeout-killer")); + + timeout_notifier.await + } + }; + + spawn_with_cancel( + error_span!(parent: pspan, "subscription-manager", sid, %url, service="ConnectionManager"), token, subscription_manager, ); diff --git a/crates/upnp-serve/src/templates.rs b/crates/upnp-serve/src/templates.rs index 7729f71..8b13789 100644 --- a/crates/upnp-serve/src/templates.rs +++ b/crates/upnp-serve/src/templates.rs @@ -1,119 +1 @@ -use crate::upnp_types::content_directory::response::{Container, Item, ItemOrContainer}; -pub struct RootDescriptionInputs<'a> { - pub friendly_name: &'a str, - pub manufacturer: &'a str, - pub model_name: &'a str, - pub unique_id: &'a str, - pub http_prefix: &'a str, -} - -pub fn render_root_description_xml(input: &RootDescriptionInputs<'_>) -> String { - format!( - include_str!("resources/templates/root_desc.tmpl.xml"), - friendly_name = input.friendly_name, - manufacturer = input.manufacturer, - model_name = input.model_name, - unique_id = input.unique_id, - http_prefix = input.http_prefix - ) -} - -pub fn render_content_directory_browse(items: impl IntoIterator) -> String { - fn item_or_container(item_or_container: &ItemOrContainer) -> Option { - fn item(item: &Item) -> Option { - let mime = item.mime_type.as_ref()?; - let upnp_class = match mime.type_().as_str() { - "video" => "object.item.videoItem", - _ => return None, - }; - let mime = mime.to_string(); - - Some(format!( - include_str!("resources/templates/content_directory/control/browse/item.tmpl.xml"), - id = item.id, - parent_id = item.parent_id.unwrap_or(0), - mime_type = mime, - url = item.url, - upnp_class = upnp_class, - title = item.title - )) - } - - fn container(item: &Container) -> String { - let child_count_tag = match item.children_count { - Some(cc) => format!("childCount=\"{}\"", cc), - None => String::new(), - }; - format!( - include_str!( - "resources/templates/content_directory/control/browse/container.tmpl.xml" - ), - id = item.id, - parent_id = item.parent_id.unwrap_or(0), - title = item.title, - childCountTag = child_count_tag - ) - } - - match item_or_container { - ItemOrContainer::Container(c) => Some(container(c)), - ItemOrContainer::Item(i) => item(i), - } - } - - struct Envelope<'a> { - items: &'a str, - number_returned: usize, - total_matches: usize, - update_id: u64, - } - - fn render_response(envelope: &Envelope<'_>) -> String { - format!( - include_str!("resources/templates/content_directory/control/browse/response.tmpl.xml"), - items = envelope.items, - number_returned = envelope.number_returned, - total_matches = envelope.total_matches, - update_id = envelope.update_id - ) - } - - let all_items = items - .into_iter() - .filter_map(|item| item_or_container(&item)) - .collect::>(); - let total = all_items.len(); - let all_items = all_items.join(""); - - use std::time::{SystemTime, UNIX_EPOCH}; - let update_id = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); - - render_response(&Envelope { - items: &all_items, - number_returned: total, - total_matches: total, - update_id, - }) -} - -pub fn render_notify_subscription_system_update_id(update_id: u64) -> String { - format!( - include_str!( - "resources/templates/content_directory/subscriptions/system_update_id.tmpl.xml" - ), - system_update_id = update_id - ) -} - -pub fn render_content_directory_control_get_system_update_id(update_id: u64) -> String { - format!( - include_str!( - "resources/templates/content_directory/control/get_system_update_id/response.tmpl.xml" - ), - id = update_id - ) -} diff --git a/crates/upnp-serve/src/upnp_types.rs b/crates/upnp-serve/src/upnp_types.rs deleted file mode 100644 index 0cf2afe..0000000 --- a/crates/upnp-serve/src/upnp_types.rs +++ /dev/null @@ -1,104 +0,0 @@ -pub mod content_directory { - use response::ItemOrContainer; - - pub mod request { - use anyhow::Context; - use serde::Deserialize; - - #[derive(Deserialize)] - struct Envelope { - #[serde(rename = "Body")] - body: Body, - } - - #[derive(Deserialize)] - struct Body { - #[serde(rename = "Browse")] - browse: ContentDirectoryControlRequest, - } - - #[derive(Deserialize, PartialEq, Eq, Debug)] - pub enum BrowseFlag { - BrowseDirectChildren, - BrowseMetadata, - } - - #[derive(Deserialize, Debug)] - pub struct ContentDirectoryControlRequest { - #[serde(rename = "ObjectID")] - pub object_id: usize, - #[serde(rename = "BrowseFlag")] - pub browse_flag: BrowseFlag, - #[serde(rename = "StartingIndex", default)] - pub starting_index: usize, - #[serde(rename = "RequestedCount", default)] - pub requested_count: usize, - } - - impl ContentDirectoryControlRequest { - pub fn parse(s: &str) -> anyhow::Result { - let envelope: Envelope = - quick_xml::de::from_str(s).context("error deserializing")?; - Ok(envelope.body.browse) - } - } - } - - pub mod response { - #[derive(Debug, Clone, PartialEq, Eq)] - pub struct Container { - pub id: usize, - pub parent_id: Option, - pub children_count: Option, - pub title: String, - } - - #[derive(Debug, Clone, PartialEq, Eq)] - pub struct Item { - pub id: usize, - pub parent_id: Option, - pub title: String, - pub mime_type: Option, - pub url: String, - } - - #[derive(Debug, Clone, PartialEq, Eq)] - pub enum ItemOrContainer { - Container(Container), - Item(Item), - } - } - - pub trait ContentDirectoryBrowseProvider: Send + Sync { - fn browse_direct_children( - &self, - parent_id: usize, - http_hostname: &str, - ) -> Vec; - } - - impl ContentDirectoryBrowseProvider for Vec { - fn browse_direct_children( - &self, - _parent_id: usize, - _http_host: &str, - ) -> Vec { - self.clone() - } - } -} - -#[cfg(test)] -mod tests { - use crate::upnp_types::content_directory::request::{ - BrowseFlag, ContentDirectoryControlRequest, - }; - - #[test] - fn test_parse_content_directory_request() { - let s = include_str!("resources/test/ContentDirectoryControlExampleRequest.xml"); - let req = ContentDirectoryControlRequest::parse(s).unwrap(); - assert_eq!(req.object_id, 5); - assert_eq!(req.browse_flag, BrowseFlag::BrowseDirectChildren) - } -}