Merge pull request #226 from ikatson/connection-manager-stub

[Feature] [UPnP] connection manager stub
This commit is contained in:
Igor Katson 2024-08-29 12:29:46 +01:00 committed by GitHub
commit d7f3d883f2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 810 additions and 541 deletions

View file

@ -14,7 +14,7 @@ webui-dev: webui-deps
export RQBIT_UPNP_SERVER_ENABLE ?= true
export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev
export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030
CARGO_RUN_FLAGS ?= ""
CARGO_RUN_FLAGS ?=
RQBIT_OUTPUT_FOLDER ?= /tmp/scratch
RQBIT_POSTGRES_CONNECTION_STRING ?= postgres:///rqbit

View file

@ -20,8 +20,8 @@ use itertools::Itertools;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use tracing::{debug, trace, warn};
use upnp_serve::{
upnp_types::content_directory::{
response::{Container, Item, ItemOrContainer},
services::content_directory::{
browse::response::{Container, Item, ItemOrContainer},
ContentDirectoryBrowseProvider,
},
UpnpServer, UpnpServerOptions,
@ -346,8 +346,8 @@ mod tests {
TorrentMetaV1File, TorrentMetaV1Info, TorrentMetaV1Owned,
};
use tempfile::TempDir;
use upnp_serve::upnp_types::content_directory::{
response::{Container, Item, ItemOrContainer},
use upnp_serve::services::content_directory::{
browse::response::{Container, Item, ItemOrContainer},
ContentDirectoryBrowseProvider,
};

View file

@ -6,7 +6,7 @@ use std::{
use anyhow::Context;
use axum::routing::get;
use librqbit_upnp_serve::{
upnp_types::content_directory::response::{Item, ItemOrContainer},
services::content_directory::browse::response::{Item, ItemOrContainer},
UpnpServer, UpnpServerOptions,
};
use mime_guess::Mime;

View file

@ -1,239 +0,0 @@
use std::{sync::atomic::Ordering, time::Duration};
use anyhow::Context;
use axum::{
body::Bytes,
extract::State,
handler::HandlerWithoutStateExt,
response::IntoResponse,
routing::{get, post},
};
use bstr::BStr;
use http::{header::CONTENT_TYPE, HeaderMap, HeaderName, StatusCode};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use crate::{
constants::{
CONTENT_TYPE_XML_UTF8, SOAP_ACTION_CONTENT_DIRECTORY_BROWSE,
SOAP_ACTION_GET_SYSTEM_UPDATE_ID,
},
state::{UnpnServerState, UpnpServerStateInner},
templates::{
render_content_directory_browse, render_content_directory_control_get_system_update_id,
render_root_description_xml, RootDescriptionInputs,
},
upnp_types::content_directory::{
request::ContentDirectoryControlRequest, ContentDirectoryBrowseProvider,
},
};
async fn description_xml(State(state): State<UnpnServerState>) -> impl IntoResponse {
(
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
state.rendered_root_description.clone(),
)
}
async fn generate_content_directory_control_response(
headers: HeaderMap,
State(state): State<UnpnServerState>,
body: Bytes,
) -> impl IntoResponse {
let body = BStr::new(&body);
let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes()));
trace!(?body, ?action, "received control request");
let action = match action {
Some(action) => action,
None => {
debug!("missing SOAPACTION header");
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
match action.as_ref() {
SOAP_ACTION_CONTENT_DIRECTORY_BROWSE => {
let http_hostname = headers
.get("host")
.and_then(|h| std::str::from_utf8(h.as_bytes()).ok())
.and_then(|h| h.split(':').next());
let http_hostname = match http_hostname {
Some(h) => h,
None => return StatusCode::BAD_REQUEST.into_response(),
};
let body = match std::str::from_utf8(body) {
Ok(body) => body,
Err(_) => return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(),
};
let request = match ContentDirectoryControlRequest::parse(body) {
Ok(req) => req,
Err(e) => {
debug!(error=?e, "error parsing XML");
return (StatusCode::BAD_REQUEST, "cannot parse request").into_response();
}
};
use crate::upnp_types::content_directory::request::BrowseFlag;
match request.browse_flag {
BrowseFlag::BrowseDirectChildren => (
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
render_content_directory_browse(
state
.provider
.browse_direct_children(request.object_id, http_hostname),
),
)
.into_response(),
BrowseFlag::BrowseMetadata => StatusCode::NOT_IMPLEMENTED.into_response(),
}
}
SOAP_ACTION_GET_SYSTEM_UPDATE_ID => {
let update_id = state.system_update_id.load(Ordering::Relaxed);
(
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
render_content_directory_control_get_system_update_id(update_id),
)
.into_response()
}
_ => {
debug!(?action, "unsupported ContentDirectory action");
(StatusCode::NOT_IMPLEMENTED, "").into_response()
}
}
}
async fn subscription(
State(state): State<UnpnServerState>,
request: axum::extract::Request,
) -> impl IntoResponse {
if request.method().as_str() != "SUBSCRIBE" {
return (StatusCode::METHOD_NOT_ALLOWED, "").into_response();
}
let (parts, _body) = request.into_parts();
let is_event = parts
.headers
.get(HeaderName::from_static("nt"))
.map(|v| v.as_bytes() == b"upnp:event")
.unwrap_or_default();
if !is_event {
return (StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response();
}
let callback = parts
.headers
.get(HeaderName::from_static("callback"))
.and_then(|v| v.to_str().ok())
.map(|s| s.trim_matches(|c| c == '>' || c == '<'))
.and_then(|u| url::Url::parse(u).ok());
let callback = match callback {
Some(c) => c,
None => return (StatusCode::BAD_REQUEST, "callback not provided").into_response(),
};
let subscription_id = parts
.headers
.get(HeaderName::from_static("sid"))
.and_then(|v| v.to_str().ok());
let timeout = parts
.headers
.get(HeaderName::from_static("timeout"))
.and_then(|v| v.to_str().ok())
.and_then(|t| t.strip_prefix("Second-"))
.and_then(|t| t.parse::<u16>().ok())
.map(|t| Duration::from_secs(t as u64));
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800);
let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT);
if let Some(sid) = subscription_id {
match state.renew_subscription(sid, timeout) {
Ok(()) => (
StatusCode::OK,
[
("SID", sid.to_owned()),
("TIMEOUT", format!("Second-{}", timeout.as_secs())),
],
)
.into_response(),
Err(e) => {
warn!(sid, error=?e, "error renewing subscription");
StatusCode::NOT_FOUND.into_response()
}
}
} else {
match state.new_subscription(callback, timeout) {
Ok(sid) => (
StatusCode::OK,
[
("SID", sid),
("TIMEOUT", format!("Second-{}", timeout.as_secs())),
],
)
.into_response(),
Err(e) => {
warn!(error=?e, "error creating subscription");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
}
pub fn make_router(
friendly_name: String,
http_prefix: String,
upnp_usn: String,
browse_provider: Box<dyn ContentDirectoryBrowseProvider>,
cancellation_token: CancellationToken,
) -> anyhow::Result<axum::Router> {
let root_desc = render_root_description_xml(&RootDescriptionInputs {
friendly_name: &friendly_name,
manufacturer: "rqbit developers",
model_name: "1.0.0",
unique_id: &upnp_usn,
http_prefix: &http_prefix,
});
let state = UpnpServerStateInner::new(root_desc.into(), browse_provider, cancellation_token)
.context("error creating UPNP server")?;
let content_dir_sub_handler = {
let state = state.clone();
move |request: axum::extract::Request| async move {
subscription(State(state.clone()), request).await
}
};
let app = axum::Router::new()
.route("/description.xml", get(description_xml))
.route(
"/scpd/ContentDirectory.xml",
get(|| async { include_str!("resources/templates/content_directory/scpd.xml") }),
)
.route(
"/scpd/ConnectionManager.xml",
get(|| async { include_str!("resources/templates/connection_manager/scpd.xml") }),
)
.route(
"/control/ContentDirectory",
post(generate_content_directory_control_response),
)
.route(
"/control/ConnectionManager",
post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }),
)
.route_service(
"/subscribe/ContentDirectory",
content_dir_sub_handler.into_service(),
)
.route(
"/subscribe/ConnectionManager",
post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }),
)
.with_state(state);
Ok(app)
}

View file

@ -0,0 +1,112 @@
use anyhow::Context;
use axum::{
extract::State,
handler::HandlerWithoutStateExt,
response::IntoResponse,
routing::{get, post},
};
use http::header::CONTENT_TYPE;
use tokio_util::sync::CancellationToken;
use crate::{
constants::CONTENT_TYPE_XML_UTF8,
services::content_directory::ContentDirectoryBrowseProvider,
state::{UnpnServerState, UpnpServerStateInner},
};
async fn description_xml(State(state): State<UnpnServerState>) -> impl IntoResponse {
(
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
state.rendered_root_description.clone(),
)
}
pub struct RootDescriptionInputs<'a> {
pub friendly_name: &'a str,
pub manufacturer: &'a str,
pub model_name: &'a str,
pub unique_id: &'a str,
pub http_prefix: &'a str,
}
pub fn render_root_description_xml(input: &RootDescriptionInputs<'_>) -> String {
format!(
include_str!("resources/templates/root_desc.tmpl.xml"),
friendly_name = input.friendly_name,
manufacturer = input.manufacturer,
model_name = input.model_name,
unique_id = input.unique_id,
http_prefix = input.http_prefix
)
}
pub fn make_router(
friendly_name: String,
http_prefix: String,
upnp_usn: String,
browse_provider: Box<dyn ContentDirectoryBrowseProvider>,
cancellation_token: CancellationToken,
) -> anyhow::Result<axum::Router> {
let root_desc = render_root_description_xml(&RootDescriptionInputs {
friendly_name: &friendly_name,
manufacturer: "rqbit developers",
model_name: "1.0.0",
unique_id: &upnp_usn,
http_prefix: &http_prefix,
});
let state = UpnpServerStateInner::new(root_desc.into(), browse_provider, cancellation_token)
.context("error creating UPNP server")?;
let content_dir_sub_handler = {
let state = state.clone();
move |request: axum::extract::Request| async move {
crate::services::content_directory::subscription::subscribe_http_handler(
State(state.clone()),
request,
)
.await
}
};
let connection_manager_sub_handler = {
let state = state.clone();
move |request: axum::extract::Request| async move {
crate::services::connection_manager::subscribe_http_handler(
State(state.clone()),
request,
)
.await
}
};
let app = axum::Router::new()
.route("/description.xml", get(description_xml))
.route(
"/scpd/ContentDirectory.xml",
get(|| async { include_str!("resources/templates/content_directory/scpd.xml") }),
)
.route(
"/scpd/ConnectionManager.xml",
get(|| async { include_str!("resources/templates/connection_manager/scpd.xml") }),
)
.route(
"/control/ContentDirectory",
post(crate::services::content_directory::http_handler),
)
.route(
"/control/ConnectionManager",
post(crate::services::connection_manager::http_handler),
)
.route_service(
"/subscribe/ContentDirectory",
content_dir_sub_handler.into_service(),
)
.route_service(
"/subscribe/ConnectionManager",
connection_manager_sub_handler.into_service(),
)
.with_state(state);
Ok(app)
}

View file

@ -2,21 +2,20 @@ use std::{io::Write, time::Duration};
use anyhow::Context;
use gethostname::gethostname;
use http_handlers::make_router;
use librqbit_sha1_wrapper::ISha1;
use services::content_directory::ContentDirectoryBrowseProvider;
use ssdp::SsdpRunner;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use upnp_types::content_directory::ContentDirectoryBrowseProvider;
mod constants;
mod http_handlers;
mod http_server;
pub mod services;
mod ssdp;
pub mod state;
mod subscriptions;
mod templates;
pub mod upnp_types;
pub struct UpnpServerOptions {
pub friendly_name: String,
@ -78,7 +77,7 @@ impl UpnpServer {
.await
.context("error initializing SsdpRunner")?;
let router = make_router(
let router = crate::http_server::make_router(
opts.friendly_name,
opts.http_prefix,
usn,

View file

@ -0,0 +1,10 @@
<?xml version="1.0"?>
<s:Envelope
xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:GetCurrentConnectionIDsResponse xmlns:u="urn:schemas-upnp-org:service:ConnectionManager:1">
<ConnectionIDs></ConnectionIDs>
</u:GetCurrentConnectionIDsResponse>
</s:Body>
</s:Envelope>

View file

@ -0,0 +1,16 @@
<?xml version="1.0"?>
<s:Envelope
xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:GetCurrentConnectionInfoResponse xmlns:u="urn:schemas-upnp-org:service:ConnectionManager:1">
<RcsID>0</RcsID>
<AVTransportID>0</AVTransportID>
<ProtocolInfo>http-get:*:*:DLNA.ORG_OP=01</ProtocolInfo>
<PeerConnectionManager></PeerConnectionManager>
<PeerConnectionID>-1</PeerConnectionID>
<Direction>Output</Direction>
<Status>OK</Status>
</u:GetCurrentConnectionInfoResponse>
</s:Body>
</s:Envelope>

View file

@ -0,0 +1,11 @@
<?xml version="1.0"?>
<s:Envelope
xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:GetProtocolInfoResponse xmlns:u="urn:schemas-upnp-org:service:ConnectionManager:1">
<Source>http-get:*:*:DLNA.ORG_OP=01</Source>
<Sink></Sink>
</u:GetProtocolInfoResponse>
</s:Body>
</s:Envelope>

View file

@ -20,7 +20,7 @@
</argument>
</argumentList>
</action>
<action>
<!--action>
<name>PrepareForConnection</name>
<argumentList>
<argument>
@ -59,8 +59,8 @@
<relatedStateVariable>A_ARG_TYPE_RcsID</relatedStateVariable>
</argument>
</argumentList>
</action>
<action>
</action-->
<!--action>
<name>ConnectionComplete</name>
<argumentList>
<argument>
@ -69,7 +69,7 @@
<relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable>
</argument>
</argumentList>
</action>
</action-->
<action>
<name>GetCurrentConnectionIDs</name>
<argumentList>

View file

@ -0,0 +1,81 @@
use axum::{body::Bytes, extract::State, response::IntoResponse};
use bstr::BStr;
use http::{header::CONTENT_TYPE, HeaderMap, StatusCode};
use tracing::{debug, trace};
use crate::{
constants::CONTENT_TYPE_XML_UTF8, state::UnpnServerState, subscriptions::SubscribeRequest,
};
pub const SOAP_ACTION_GET_PROTOCOL_INFO: &[u8] =
b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetProtocolInfo\"";
pub const SOAP_ACTION_CONNECTION_COMPLETE: &[u8] =
b"\"urn:schemas-upnp-org:service:ConnectionManager:1#ConnectionComplete\"";
pub const SOAP_ACTION_GET_CURRENT_CONNECTION_IDS: &[u8] =
b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionIDs\"";
pub const SOAP_ACTION_GET_CURRENT_CONNECTION_INFO: &[u8] =
b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionInfo\"";
pub const SOAP_ACTION_PREPARE_FOR_CONNECTION: &[u8] =
b"\"urn:schemas-upnp-org:service:ConnectionManager:1#PrepareForConnection\"";
pub(crate) async fn http_handler(
headers: HeaderMap,
State(_state): State<UnpnServerState>,
body: Bytes,
) -> impl IntoResponse {
let body = BStr::new(&body);
let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes()));
trace!(?body, ?action, "received control request");
let action = match action {
Some(action) => action,
None => {
debug!("missing SOAPACTION header");
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
let not_implemented = || StatusCode::NOT_IMPLEMENTED.into_response();
match action.as_ref() {
SOAP_ACTION_GET_PROTOCOL_INFO => (
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
include_str!("../resources/templates/connection_manager/control/get_protocol_info.xml"),
)
.into_response(),
SOAP_ACTION_GET_CURRENT_CONNECTION_INFO => (
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
include_str!(
"../resources/templates/connection_manager/control/get_current_connection_info.xml"
),
)
.into_response(),
SOAP_ACTION_GET_CURRENT_CONNECTION_IDS => (
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
include_str!(
"../resources/templates/connection_manager/control/get_current_connection_ids.xml"
),
)
.into_response(),
SOAP_ACTION_PREPARE_FOR_CONNECTION => not_implemented(),
SOAP_ACTION_CONNECTION_COMPLETE => not_implemented(),
_ => StatusCode::BAD_REQUEST.into_response(),
}
}
pub(crate) async fn subscribe_http_handler(
State(state): State<UnpnServerState>,
request: axum::extract::Request,
) -> impl IntoResponse {
let req = match SubscribeRequest::parse(request) {
Ok(sub) => sub,
Err(err) => return err,
};
let resp = state.handle_connection_manager_subscription_request(&req);
crate::subscriptions::subscription_into_response(&req, resp)
}

View file

@ -0,0 +1,336 @@
use std::sync::atomic::Ordering;
use axum::{body::Bytes, extract::State, response::IntoResponse};
use browse::response::ItemOrContainer;
use bstr::BStr;
use http::{header::CONTENT_TYPE, HeaderMap, StatusCode};
use tracing::{debug, trace};
use crate::{
constants::{
CONTENT_TYPE_XML_UTF8, SOAP_ACTION_CONTENT_DIRECTORY_BROWSE,
SOAP_ACTION_GET_SYSTEM_UPDATE_ID,
},
state::UnpnServerState,
};
pub mod browse {
pub mod request {
use anyhow::Context;
use serde::Deserialize;
#[derive(Deserialize)]
struct Envelope {
#[serde(rename = "Body")]
body: Body,
}
#[derive(Deserialize)]
struct Body {
#[serde(rename = "Browse")]
browse: ContentDirectoryControlRequest,
}
#[derive(Deserialize, PartialEq, Eq, Debug)]
pub enum BrowseFlag {
BrowseDirectChildren,
BrowseMetadata,
}
#[derive(Deserialize, Debug)]
pub struct ContentDirectoryControlRequest {
#[serde(rename = "ObjectID")]
pub object_id: usize,
#[serde(rename = "BrowseFlag")]
pub browse_flag: BrowseFlag,
#[serde(rename = "StartingIndex", default)]
pub starting_index: usize,
#[serde(rename = "RequestedCount", default)]
pub requested_count: usize,
}
impl ContentDirectoryControlRequest {
pub fn parse(s: &str) -> anyhow::Result<Self> {
let envelope: Envelope =
quick_xml::de::from_str(s).context("error deserializing")?;
Ok(envelope.body.browse)
}
}
}
pub mod response {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Container {
pub id: usize,
pub parent_id: Option<usize>,
pub children_count: Option<usize>,
pub title: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Item {
pub id: usize,
pub parent_id: Option<usize>,
pub title: String,
pub mime_type: Option<mime_guess::Mime>,
pub url: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ItemOrContainer {
Container(Container),
Item(Item),
}
pub(crate) fn render(items: impl IntoIterator<Item = ItemOrContainer>) -> String {
fn item_or_container(item_or_container: &ItemOrContainer) -> Option<String> {
fn item(item: &Item) -> Option<String> {
let mime = item.mime_type.as_ref()?;
let upnp_class = match mime.type_().as_str() {
"video" => "object.item.videoItem",
_ => return None,
};
let mime = mime.to_string();
Some(format!(
include_str!(
"../resources/templates/content_directory/control/browse/item.tmpl.xml"
),
id = item.id,
parent_id = item.parent_id.unwrap_or(0),
mime_type = mime,
url = item.url,
upnp_class = upnp_class,
title = item.title
))
}
fn container(item: &Container) -> String {
let child_count_tag = match item.children_count {
Some(cc) => format!("childCount=\"{}\"", cc),
None => String::new(),
};
format!(
include_str!(
"../resources/templates/content_directory/control/browse/container.tmpl.xml"
),
id = item.id,
parent_id = item.parent_id.unwrap_or(0),
title = item.title,
childCountTag = child_count_tag
)
}
match item_or_container {
ItemOrContainer::Container(c) => Some(container(c)),
ItemOrContainer::Item(i) => item(i),
}
}
struct Envelope<'a> {
items: &'a str,
number_returned: usize,
total_matches: usize,
update_id: u64,
}
fn render_response(envelope: &Envelope<'_>) -> String {
format!(
include_str!(
"../resources/templates/content_directory/control/browse/response.tmpl.xml"
),
items = envelope.items,
number_returned = envelope.number_returned,
total_matches = envelope.total_matches,
update_id = envelope.update_id
)
}
let all_items = items
.into_iter()
.filter_map(|item| item_or_container(&item))
.collect::<Vec<_>>();
let total = all_items.len();
let all_items = all_items.join("");
use std::time::{SystemTime, UNIX_EPOCH};
let update_id = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
render_response(&Envelope {
items: &all_items,
number_returned: total,
total_matches: total,
update_id,
})
}
}
}
pub mod get_system_update_id {
pub(crate) fn render_notify(update_id: u64) -> String {
format!(
include_str!(
"../resources/templates/content_directory/subscriptions/system_update_id.tmpl.xml"
),
system_update_id = update_id
)
}
pub(crate) fn render_response(update_id: u64) -> String {
format!(
include_str!(
"../resources/templates/content_directory/control/get_system_update_id/response.tmpl.xml"
),
id = update_id
)
}
}
pub mod subscription {
use axum::{extract::State, response::IntoResponse};
use http::Method;
use crate::{state::UnpnServerState, subscriptions::SubscribeRequest};
pub(crate) async fn subscribe_http_handler(
State(state): State<UnpnServerState>,
request: axum::extract::Request,
) -> impl IntoResponse {
let req = match SubscribeRequest::parse(request) {
Ok(sub) => sub,
Err(err) => return err,
};
let resp = state.handle_content_directory_subscription_request(&req);
crate::subscriptions::subscription_into_response(&req, resp)
}
pub async fn notify_system_id_update(
url: &url::Url,
sid: &str,
seq: u64,
system_update_id: u64,
) -> anyhow::Result<()> {
// NOTIFY /callback_path HTTP/1.1
// CONTENT-TYPE: text/xml; charset="utf-8"
// NT: upnp:event
// NTS: upnp:propchange
// SID: uuid:<Subscription ID>
// SEQ: <sequence number>
//
let body = super::get_system_update_id::render_notify(system_update_id);
let resp = reqwest::Client::builder()
.build()?
.request(Method::from_bytes(b"NOTIFY")?, url.clone())
.header("Content-Type", r#"text/xml; charset="utf-8""#)
.header("NT", "upnp:event")
.header("NTS", "upnp:propchange")
.header("SID", sid)
.header("SEQ", seq.to_string())
.body(body)
.send()
.await?;
if !resp.status().is_success() {
anyhow::bail!("{:?}", resp.status())
}
Ok(())
}
}
pub(crate) async fn http_handler(
headers: HeaderMap,
State(state): State<UnpnServerState>,
body: Bytes,
) -> impl IntoResponse {
let body = BStr::new(&body);
let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes()));
trace!(?body, ?action, "received control request");
let action = match action {
Some(action) => action,
None => {
debug!("missing SOAPACTION header");
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
match action.as_ref() {
SOAP_ACTION_CONTENT_DIRECTORY_BROWSE => {
let http_hostname = headers
.get("host")
.and_then(|h| std::str::from_utf8(h.as_bytes()).ok())
.and_then(|h| h.split(':').next());
let http_hostname = match http_hostname {
Some(h) => h,
None => return StatusCode::BAD_REQUEST.into_response(),
};
let body = match std::str::from_utf8(body) {
Ok(body) => body,
Err(_) => return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(),
};
let request = match browse::request::ContentDirectoryControlRequest::parse(body) {
Ok(req) => req,
Err(e) => {
debug!(error=?e, "error parsing XML");
return (StatusCode::BAD_REQUEST, "cannot parse request").into_response();
}
};
use browse::request::BrowseFlag;
match request.browse_flag {
BrowseFlag::BrowseDirectChildren => (
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
browse::response::render(
state
.provider
.browse_direct_children(request.object_id, http_hostname),
),
)
.into_response(),
BrowseFlag::BrowseMetadata => StatusCode::NOT_IMPLEMENTED.into_response(),
}
}
SOAP_ACTION_GET_SYSTEM_UPDATE_ID => {
let update_id = state.system_update_id.load(Ordering::Relaxed);
(
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
get_system_update_id::render_response(update_id),
)
.into_response()
}
_ => {
debug!(?action, "unsupported ContentDirectory action");
(StatusCode::NOT_IMPLEMENTED, "").into_response()
}
}
}
pub trait ContentDirectoryBrowseProvider: Send + Sync {
fn browse_direct_children(&self, parent_id: usize, http_hostname: &str)
-> Vec<ItemOrContainer>;
}
impl ContentDirectoryBrowseProvider for Vec<ItemOrContainer> {
fn browse_direct_children(&self, _parent_id: usize, _http_host: &str) -> Vec<ItemOrContainer> {
self.clone()
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_parse_content_directory_request() {
use super::browse::request::{BrowseFlag, ContentDirectoryControlRequest};
let s = include_str!("../resources/test/ContentDirectoryControlExampleRequest.xml");
let req = ContentDirectoryControlRequest::parse(s).unwrap();
assert_eq!(req.object_id, 5);
assert_eq!(req.browse_flag, BrowseFlag::BrowseDirectChildren)
}
}

View file

@ -0,0 +1,2 @@
pub mod connection_manager;
pub mod content_directory;

View file

@ -12,19 +12,18 @@ use librqbit_core::spawn_utils::spawn_with_cancel;
use tokio_util::sync::CancellationToken;
use tracing::{error_span, Span};
use crate::{
subscriptions::Subscriptions, upnp_types::content_directory::ContentDirectoryBrowseProvider,
};
use crate::{subscriptions::Subscriptions, ContentDirectoryBrowseProvider};
pub struct UpnpServerStateInner {
pub rendered_root_description: Bytes,
pub provider: Box<dyn ContentDirectoryBrowseProvider>,
pub system_update_id: AtomicU64,
pub subscriptions: Subscriptions,
pub(crate) rendered_root_description: Bytes,
pub(crate) provider: Box<dyn ContentDirectoryBrowseProvider>,
pub(crate) system_update_id: AtomicU64,
pub(crate) content_directory_subscriptions: Subscriptions,
pub(crate) connection_manager_subscriptions: Subscriptions,
pub span: Span,
pub system_update_bcast_tx: tokio::sync::broadcast::Sender<u64>,
pub cancel_token: tokio_util::sync::CancellationToken,
pub(crate) span: Span,
pub(crate) system_update_bcast_tx: tokio::sync::broadcast::Sender<u64>,
pub(crate) cancel_token: tokio_util::sync::CancellationToken,
_drop_guard: tokio_util::sync::DropGuard,
}
@ -46,7 +45,8 @@ impl UpnpServerStateInner {
rendered_root_description,
provider,
system_update_id: AtomicU64::new(new_system_update_id()?),
subscriptions: Default::default(),
content_directory_subscriptions: Default::default(),
connection_manager_subscriptions: Default::default(),
system_update_bcast_tx: btx,
_drop_guard: drop_guard,
span: span.clone(),

View file

@ -1,7 +1,7 @@
use crate::state::UpnpServerStateInner;
use crate::templates::render_notify_subscription_system_update_id;
use anyhow::Context;
use http::Method;
use axum::response::IntoResponse;
use http::{HeaderName, StatusCode};
use librqbit_core::spawn_utils::spawn_with_cancel;
use parking_lot::RwLock;
use std::{
@ -13,6 +13,7 @@ use tokio::sync::{broadcast::error::RecvError, Notify};
use tracing::{debug, error_span, trace, warn, Instrument};
pub struct Subscription {
#[allow(dead_code)]
pub url: url::Url,
pub seq: u64,
pub timeout: Duration,
@ -69,50 +70,171 @@ impl Subscriptions {
}
}
pub async fn notify_subscription_system_update(
url: &url::Url,
sid: &str,
seq: u64,
system_update_id: u64,
) -> anyhow::Result<()> {
// NOTIFY /callback_path HTTP/1.1
// CONTENT-TYPE: text/xml; charset="utf-8"
// NT: upnp:event
// NTS: upnp:propchange
// SID: uuid:<Subscription ID>
// SEQ: <sequence number>
//
let body = render_notify_subscription_system_update_id(system_update_id);
#[derive(Debug)]
pub enum SubscribeRequest {
Create {
callback: url::Url,
timeout: Duration,
},
Renew {
sid: String,
timeout: Duration,
},
}
let resp = reqwest::Client::builder()
.build()?
.request(Method::from_bytes(b"NOTIFY")?, url.clone())
.header("Content-Type", r#"text/xml; charset="utf-8""#)
.header("NT", "upnp:event")
.header("NTS", "upnp:propchange")
.header("SID", sid)
.header("SEQ", seq.to_string())
.body(body)
.send()
.await?;
if !resp.status().is_success() {
anyhow::bail!("{:?}", resp.status())
impl core::fmt::Display for SubscribeRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SubscribeRequest::Create { callback, timeout } => {
write!(f, "create;callback={callback};timeout={timeout:?}")
}
SubscribeRequest::Renew { sid, timeout } => {
write!(f, "renew;sid={sid};timeout={timeout:?}")
}
}
}
Ok(())
}
impl SubscribeRequest {
fn timeout(&self) -> Duration {
match self {
SubscribeRequest::Create { timeout, .. } => *timeout,
SubscribeRequest::Renew { timeout, .. } => *timeout,
}
}
}
impl SubscribeRequest {
pub fn parse(
request: axum::extract::Request,
) -> Result<SubscribeRequest, axum::response::Response> {
if request.method().as_str() != "SUBSCRIBE" {
return Err(StatusCode::METHOD_NOT_ALLOWED.into_response());
}
let (parts, _body) = request.into_parts();
let is_event = parts
.headers
.get(HeaderName::from_static("nt"))
.map(|v| v.as_bytes() == b"upnp:event")
.unwrap_or_default();
let callback = parts
.headers
.get(HeaderName::from_static("callback"))
.and_then(|v| v.to_str().ok())
.map(|s| s.trim_matches(|c| c == '>' || c == '<'))
.and_then(|u| url::Url::parse(u).ok());
let subscription_id = parts
.headers
.get(HeaderName::from_static("sid"))
.and_then(|v| v.to_str().ok());
let timeout = parts
.headers
.get(HeaderName::from_static("timeout"))
.and_then(|v| v.to_str().ok())
.and_then(|t| t.strip_prefix("Second-"))
.and_then(|t| t.parse::<u16>().ok())
.map(|t| Duration::from_secs(t as u64));
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800);
let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT).min(DEFAULT_TIMEOUT);
match (is_event, callback, subscription_id) {
(true, Some(callback), None) => Ok(SubscribeRequest::Create { callback, timeout }),
(_, _, Some(sid)) => Ok(SubscribeRequest::Renew {
sid: sid.to_owned(),
timeout,
}),
_ => Err(StatusCode::BAD_REQUEST.into_response()),
}
}
}
#[derive(Debug)]
pub(crate) enum SubscriptionResult {
Renewed { sid: String },
Created { sid: String },
}
impl SubscriptionResult {
fn sid(&self) -> &str {
match self {
SubscriptionResult::Renewed { sid } => sid,
SubscriptionResult::Created { sid } => sid,
}
}
}
pub(crate) fn subscription_into_response(
request: &SubscribeRequest,
result: anyhow::Result<SubscriptionResult>,
) -> axum::response::Response {
trace!(%request, ?result, "request->response");
let result = match result {
Ok(r) => r,
Err(e) => {
warn!(error=?e, sub=?request, "error handling subscription request");
return StatusCode::BAD_REQUEST.into_response();
}
};
(
StatusCode::OK,
[
("SID", result.sid().to_owned()),
("TIMEOUT", format!("Second-{}", request.timeout().as_secs())),
],
)
.into_response()
}
impl UpnpServerStateInner {
pub fn renew_subscription(&self, sid: &str, new_timeout: Duration) -> anyhow::Result<()> {
self.subscriptions.update_timeout(sid, new_timeout)
pub(crate) fn handle_content_directory_subscription_request(
self: &Arc<Self>,
req: &SubscribeRequest,
) -> anyhow::Result<SubscriptionResult> {
match req {
SubscribeRequest::Create { callback, timeout } => {
let sid = self.new_content_directory_subscription(callback.clone(), *timeout)?;
Ok(SubscriptionResult::Created { sid })
}
SubscribeRequest::Renew { sid, timeout } => {
self.content_directory_subscriptions
.update_timeout(sid, *timeout)?;
Ok(SubscriptionResult::Renewed { sid: sid.clone() })
}
}
}
pub fn new_subscription(
pub(crate) fn handle_connection_manager_subscription_request(
self: &Arc<Self>,
req: &SubscribeRequest,
) -> anyhow::Result<SubscriptionResult> {
match req {
SubscribeRequest::Create { callback, timeout } => {
let sid = self.new_connection_manager_subscription(callback.clone(), *timeout)?;
Ok(SubscriptionResult::Created { sid })
}
SubscribeRequest::Renew { sid, timeout } => {
self.connection_manager_subscriptions
.update_timeout(sid, *timeout)?;
Ok(SubscriptionResult::Renewed { sid: sid.clone() })
}
}
}
fn new_content_directory_subscription(
self: &Arc<Self>,
url: url::Url,
timeout: Duration,
) -> anyhow::Result<String> {
let (sid, refresh_notify) = self.subscriptions.add(url.clone(), timeout);
let (sid, refresh_notify) = self
.content_directory_subscriptions
.add(url.clone(), timeout);
let token = self.cancel_token.child_token();
// Spawn a task that will notify it of system id changes.
@ -127,37 +249,28 @@ impl UpnpServerStateInner {
let url = url.clone();
async move {
use crate::services::content_directory::subscription::notify_system_id_update;
let system_update_id_notifier = async {
loop {
let res = brx.recv().await;
let state = state.upgrade().context("upnp server dead")?;
let seq = state.subscriptions.next_seq(&sid)?;
let seq = state.content_directory_subscriptions.next_seq(&sid)?;
match res {
Ok(system_update_id) => {
trace!(system_update_id, "notifying SystemUpdateId update");
if let Err(e) = notify_subscription_system_update(
&url,
&sid,
seq,
system_update_id,
)
.await
if let Err(e) =
notify_system_id_update(&url, &sid, seq, system_update_id).await
{
debug!(error=?e, "error updating UPNP subscription");
}
}
Err(RecvError::Lagged(by)) => {
warn!(by, "UPNP subscription lagged");
let seq = state.subscriptions.next_seq(&sid)?;
let seq = state.content_directory_subscriptions.next_seq(&sid)?;
let system_update_id =
state.system_update_id.load(Ordering::Relaxed);
if let Err(e) = notify_subscription_system_update(
&url,
&sid,
seq,
system_update_id,
)
.await
if let Err(e) =
notify_system_id_update(&url, &sid, seq, system_update_id).await
{
debug!(error=?e, "error updating UPNP subscription");
}
@ -173,10 +286,12 @@ impl UpnpServerStateInner {
loop {
tokio::select! {
_ = refresh_notify.notified() => {
timeout = state.upgrade().context("upnp server dead")?.subscriptions.get_timeout(&sid)?;
timeout = state.upgrade().context("upnp server dead")?.content_directory_subscriptions.get_timeout(&sid)?;
trace!(?timeout, "refreshed subscription");
},
_ = tokio::time::sleep(timeout) => {
state.upgrade().context("upnp server dead")?.subscriptions.remove(&sid)?;
state.upgrade().context("upnp server dead")?.content_directory_subscriptions.remove(&sid)?;
trace!(?timeout, "subscription timed out, removing");
return Ok(())
}
}
@ -191,7 +306,55 @@ impl UpnpServerStateInner {
};
spawn_with_cancel(
error_span!(parent: pspan, "subscription-manager", sid, %url),
error_span!(parent: pspan, "subscription-manager", sid, %url, service="ContentDirectory"),
token,
subscription_manager,
);
Ok(sid)
}
fn new_connection_manager_subscription(
self: &Arc<Self>,
url: url::Url,
timeout: Duration,
) -> anyhow::Result<String> {
let (sid, refresh_notify) = self
.connection_manager_subscriptions
.add(url.clone(), timeout);
let token = self.cancel_token.child_token();
// Spawn a task that will notify it of system id changes.
// Spawn a task that will wait for timeout or subscription refreshes.
// When it times out, kill all of them.
let pspan = self.span.clone();
let subscription_manager = {
let state = Arc::downgrade(self);
let sid = sid.clone();
async move {
let timeout_notifier = async {
let mut timeout = timeout;
loop {
tokio::select! {
_ = refresh_notify.notified() => {
timeout = state.upgrade().context("upnp server dead")?.connection_manager_subscriptions.get_timeout(&sid)?;
},
_ = tokio::time::sleep(timeout) => {
state.upgrade().context("upnp server dead")?.connection_manager_subscriptions.remove(&sid)?;
return Ok(())
}
}
}
}.instrument(error_span!("timeout-killer"));
timeout_notifier.await
}
};
spawn_with_cancel(
error_span!(parent: pspan, "subscription-manager", sid, %url, service="ConnectionManager"),
token,
subscription_manager,
);

View file

@ -1,119 +1 @@
use crate::upnp_types::content_directory::response::{Container, Item, ItemOrContainer};
pub struct RootDescriptionInputs<'a> {
pub friendly_name: &'a str,
pub manufacturer: &'a str,
pub model_name: &'a str,
pub unique_id: &'a str,
pub http_prefix: &'a str,
}
pub fn render_root_description_xml(input: &RootDescriptionInputs<'_>) -> String {
format!(
include_str!("resources/templates/root_desc.tmpl.xml"),
friendly_name = input.friendly_name,
manufacturer = input.manufacturer,
model_name = input.model_name,
unique_id = input.unique_id,
http_prefix = input.http_prefix
)
}
pub fn render_content_directory_browse(items: impl IntoIterator<Item = ItemOrContainer>) -> String {
fn item_or_container(item_or_container: &ItemOrContainer) -> Option<String> {
fn item(item: &Item) -> Option<String> {
let mime = item.mime_type.as_ref()?;
let upnp_class = match mime.type_().as_str() {
"video" => "object.item.videoItem",
_ => return None,
};
let mime = mime.to_string();
Some(format!(
include_str!("resources/templates/content_directory/control/browse/item.tmpl.xml"),
id = item.id,
parent_id = item.parent_id.unwrap_or(0),
mime_type = mime,
url = item.url,
upnp_class = upnp_class,
title = item.title
))
}
fn container(item: &Container) -> String {
let child_count_tag = match item.children_count {
Some(cc) => format!("childCount=\"{}\"", cc),
None => String::new(),
};
format!(
include_str!(
"resources/templates/content_directory/control/browse/container.tmpl.xml"
),
id = item.id,
parent_id = item.parent_id.unwrap_or(0),
title = item.title,
childCountTag = child_count_tag
)
}
match item_or_container {
ItemOrContainer::Container(c) => Some(container(c)),
ItemOrContainer::Item(i) => item(i),
}
}
struct Envelope<'a> {
items: &'a str,
number_returned: usize,
total_matches: usize,
update_id: u64,
}
fn render_response(envelope: &Envelope<'_>) -> String {
format!(
include_str!("resources/templates/content_directory/control/browse/response.tmpl.xml"),
items = envelope.items,
number_returned = envelope.number_returned,
total_matches = envelope.total_matches,
update_id = envelope.update_id
)
}
let all_items = items
.into_iter()
.filter_map(|item| item_or_container(&item))
.collect::<Vec<_>>();
let total = all_items.len();
let all_items = all_items.join("");
use std::time::{SystemTime, UNIX_EPOCH};
let update_id = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
render_response(&Envelope {
items: &all_items,
number_returned: total,
total_matches: total,
update_id,
})
}
pub fn render_notify_subscription_system_update_id(update_id: u64) -> String {
format!(
include_str!(
"resources/templates/content_directory/subscriptions/system_update_id.tmpl.xml"
),
system_update_id = update_id
)
}
pub fn render_content_directory_control_get_system_update_id(update_id: u64) -> String {
format!(
include_str!(
"resources/templates/content_directory/control/get_system_update_id/response.tmpl.xml"
),
id = update_id
)
}

View file

@ -1,104 +0,0 @@
pub mod content_directory {
use response::ItemOrContainer;
pub mod request {
use anyhow::Context;
use serde::Deserialize;
#[derive(Deserialize)]
struct Envelope {
#[serde(rename = "Body")]
body: Body,
}
#[derive(Deserialize)]
struct Body {
#[serde(rename = "Browse")]
browse: ContentDirectoryControlRequest,
}
#[derive(Deserialize, PartialEq, Eq, Debug)]
pub enum BrowseFlag {
BrowseDirectChildren,
BrowseMetadata,
}
#[derive(Deserialize, Debug)]
pub struct ContentDirectoryControlRequest {
#[serde(rename = "ObjectID")]
pub object_id: usize,
#[serde(rename = "BrowseFlag")]
pub browse_flag: BrowseFlag,
#[serde(rename = "StartingIndex", default)]
pub starting_index: usize,
#[serde(rename = "RequestedCount", default)]
pub requested_count: usize,
}
impl ContentDirectoryControlRequest {
pub fn parse(s: &str) -> anyhow::Result<Self> {
let envelope: Envelope =
quick_xml::de::from_str(s).context("error deserializing")?;
Ok(envelope.body.browse)
}
}
}
pub mod response {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Container {
pub id: usize,
pub parent_id: Option<usize>,
pub children_count: Option<usize>,
pub title: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Item {
pub id: usize,
pub parent_id: Option<usize>,
pub title: String,
pub mime_type: Option<mime_guess::Mime>,
pub url: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ItemOrContainer {
Container(Container),
Item(Item),
}
}
pub trait ContentDirectoryBrowseProvider: Send + Sync {
fn browse_direct_children(
&self,
parent_id: usize,
http_hostname: &str,
) -> Vec<ItemOrContainer>;
}
impl ContentDirectoryBrowseProvider for Vec<ItemOrContainer> {
fn browse_direct_children(
&self,
_parent_id: usize,
_http_host: &str,
) -> Vec<ItemOrContainer> {
self.clone()
}
}
}
#[cfg(test)]
mod tests {
use crate::upnp_types::content_directory::request::{
BrowseFlag, ContentDirectoryControlRequest,
};
#[test]
fn test_parse_content_directory_request() {
let s = include_str!("resources/test/ContentDirectoryControlExampleRequest.xml");
let req = ContentDirectoryControlRequest::parse(s).unwrap();
assert_eq!(req.object_id, 5);
assert_eq!(req.browse_flag, BrowseFlag::BrowseDirectChildren)
}
}