From 96eb5fec73a890972fc8369d5be69718913954ec Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 28 Aug 2024 23:04:26 +0100 Subject: [PATCH] ConnectionManager service stub --- crates/upnp-serve/src/http_server.rs | 19 +++- .../src/services/connection_manager.rs | 95 +++++++++++++++++++ crates/upnp-serve/src/state.rs | 2 + crates/upnp-serve/src/subscriptions.rs | 59 +++++++++++- 4 files changed, 170 insertions(+), 5 deletions(-) diff --git a/crates/upnp-serve/src/http_server.rs b/crates/upnp-serve/src/http_server.rs index 74a7df2..b2c3ed4 100644 --- a/crates/upnp-serve/src/http_server.rs +++ b/crates/upnp-serve/src/http_server.rs @@ -5,7 +5,7 @@ use axum::{ response::IntoResponse, routing::{get, post}, }; -use http::{header::CONTENT_TYPE, StatusCode}; +use http::header::CONTENT_TYPE; use tokio_util::sync::CancellationToken; use crate::{ @@ -69,6 +69,17 @@ pub fn make_router( } }; + let connection_manager_sub_handler = { + let state = state.clone(); + move |request: axum::extract::Request| async move { + crate::services::connection_manager::subscribe_http_handler( + State(state.clone()), + request, + ) + .await + } + }; + let app = axum::Router::new() .route("/description.xml", get(description_xml)) .route( @@ -85,15 +96,15 @@ pub fn make_router( ) .route( "/control/ConnectionManager", - post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }), + post(crate::services::connection_manager::http_handler), ) .route_service( "/subscribe/ContentDirectory", content_dir_sub_handler.into_service(), ) - .route( + .route_service( "/subscribe/ConnectionManager", - post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }), + connection_manager_sub_handler.into_service(), ) .with_state(state); diff --git a/crates/upnp-serve/src/services/connection_manager.rs b/crates/upnp-serve/src/services/connection_manager.rs index e69de29..380883b 100644 --- a/crates/upnp-serve/src/services/connection_manager.rs +++ b/crates/upnp-serve/src/services/connection_manager.rs @@ -0,0 +1,95 @@ +use axum::{body::Bytes, extract::State, response::IntoResponse}; +use bstr::BStr; +use http::{HeaderMap, StatusCode}; +use tracing::{debug, trace, warn}; + +use crate::{state::UnpnServerState, subscriptions::SubscribeRequest}; + +pub const SOAP_ACTION_GET_PROTOCOL_INFO: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetProtocolInfo\""; + +pub const SOAP_ACTION_CONNECTION_COMPLETE: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#ConnectionComplete\""; + +pub const SOAP_ACTION_GET_CURRENT_CONNECTION_IDS: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionIDs\""; + +pub const SOAP_ACTION_GET_CURRENT_CONNECTION_INFO: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionInfo\""; + +pub const SOAP_ACTION_PREPARE_FOR_CONNECTION: &[u8] = + b"\"urn:schemas-upnp-org:service:ConnectionManager:1#PrepareForConnection\""; + +pub(crate) async fn http_handler( + headers: HeaderMap, + State(_state): State, + body: Bytes, +) -> impl IntoResponse { + let body = BStr::new(&body); + let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes())); + trace!(?body, ?action, "received control request"); + let action = match action { + Some(action) => action, + None => { + debug!("missing SOAPACTION header"); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let not_implemented = StatusCode::NOT_IMPLEMENTED.into_response(); + + match action.as_ref() { + SOAP_ACTION_GET_PROTOCOL_INFO => not_implemented, + SOAP_ACTION_CONNECTION_COMPLETE => not_implemented, + SOAP_ACTION_GET_CURRENT_CONNECTION_INFO => not_implemented, + SOAP_ACTION_GET_CURRENT_CONNECTION_IDS => not_implemented, + SOAP_ACTION_PREPARE_FOR_CONNECTION => not_implemented, + _ => StatusCode::BAD_REQUEST.into_response(), + } +} + +pub(crate) async fn subscribe_http_handler( + State(state): State, + request: axum::extract::Request, +) -> impl IntoResponse { + let SubscribeRequest { + callback, + subscription_id, + timeout, + } = match SubscribeRequest::parse(request) { + Ok(sub) => sub, + Err(e) => return e, + }; + + if let Some(sid) = subscription_id { + match state.renew_connection_manager_subscription(&sid, timeout) { + Ok(()) => ( + StatusCode::OK, + [ + ("SID", sid.to_owned()), + ("TIMEOUT", format!("Second-{}", timeout.as_secs())), + ], + ) + .into_response(), + Err(e) => { + warn!(sid, error=?e, "error renewing subscription"); + StatusCode::NOT_FOUND.into_response() + } + } + } else { + match state.new_connection_manager_subscription(callback, timeout) { + Ok(sid) => ( + StatusCode::OK, + [ + ("SID", sid), + ("TIMEOUT", format!("Second-{}", timeout.as_secs())), + ], + ) + .into_response(), + Err(e) => { + warn!(error=?e, "error creating subscription"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } + } +} diff --git a/crates/upnp-serve/src/state.rs b/crates/upnp-serve/src/state.rs index 8623e0b..024309f 100644 --- a/crates/upnp-serve/src/state.rs +++ b/crates/upnp-serve/src/state.rs @@ -19,6 +19,7 @@ pub struct UpnpServerStateInner { pub(crate) provider: Box, pub(crate) system_update_id: AtomicU64, pub(crate) content_directory_subscriptions: Subscriptions, + pub(crate) connection_manager_subscriptions: Subscriptions, pub(crate) span: Span, pub(crate) system_update_bcast_tx: tokio::sync::broadcast::Sender, @@ -45,6 +46,7 @@ impl UpnpServerStateInner { provider, system_update_id: AtomicU64::new(new_system_update_id()?), content_directory_subscriptions: Default::default(), + connection_manager_subscriptions: Default::default(), system_update_bcast_tx: btx, _drop_guard: drop_guard, span: span.clone(), diff --git a/crates/upnp-serve/src/subscriptions.rs b/crates/upnp-serve/src/subscriptions.rs index fd53490..383b220 100644 --- a/crates/upnp-serve/src/subscriptions.rs +++ b/crates/upnp-serve/src/subscriptions.rs @@ -80,6 +80,15 @@ impl UpnpServerStateInner { .update_timeout(sid, new_timeout) } + pub fn renew_connection_manager_subscription( + &self, + sid: &str, + new_timeout: Duration, + ) -> anyhow::Result<()> { + self.content_directory_subscriptions + .update_timeout(sid, new_timeout) + } + pub fn new_content_directory_subscription( self: &Arc, url: url::Url, @@ -157,7 +166,55 @@ impl UpnpServerStateInner { }; spawn_with_cancel( - error_span!(parent: pspan, "subscription-manager", sid, %url), + error_span!(parent: pspan, "subscription-manager", sid, %url, service="ContentDirectory"), + token, + subscription_manager, + ); + + Ok(sid) + } + + pub fn new_connection_manager_subscription( + self: &Arc, + url: url::Url, + timeout: Duration, + ) -> anyhow::Result { + let (sid, refresh_notify) = self + .content_directory_subscriptions + .add(url.clone(), timeout); + let token = self.cancel_token.child_token(); + + // Spawn a task that will notify it of system id changes. + // Spawn a task that will wait for timeout or subscription refreshes. + // When it times out, kill all of them. + + let pspan = self.span.clone(); + let subscription_manager = { + let state = Arc::downgrade(self); + let sid = sid.clone(); + + async move { + let timeout_notifier = async { + let mut timeout = timeout; + loop { + tokio::select! { + _ = refresh_notify.notified() => { + timeout = state.upgrade().context("upnp server dead")?.connection_manager_subscriptions.get_timeout(&sid)?; + }, + _ = tokio::time::sleep(timeout) => { + state.upgrade().context("upnp server dead")?.connection_manager_subscriptions.remove(&sid)?; + return Ok(()) + } + } + } + }.instrument(error_span!("timeout-killer")); + + timeout_notifier.await + } + }; + + spawn_with_cancel( + error_span!(parent: pspan, "subscription-manager", sid, %url, service="ConnectionManager"), token, subscription_manager, );