diff --git a/crates/upnp-serve/src/services/connection_manager.rs b/crates/upnp-serve/src/services/connection_manager.rs index 380883b..9b7817f 100644 --- a/crates/upnp-serve/src/services/connection_manager.rs +++ b/crates/upnp-serve/src/services/connection_manager.rs @@ -1,7 +1,7 @@ use axum::{body::Bytes, extract::State, response::IntoResponse}; use bstr::BStr; use http::{HeaderMap, StatusCode}; -use tracing::{debug, trace, warn}; +use tracing::{debug, trace}; use crate::{state::UnpnServerState, subscriptions::SubscribeRequest}; @@ -52,44 +52,11 @@ pub(crate) async fn subscribe_http_handler( State(state): State, request: axum::extract::Request, ) -> impl IntoResponse { - let SubscribeRequest { - callback, - subscription_id, - timeout, - } = match SubscribeRequest::parse(request) { + let req = match SubscribeRequest::parse(request) { Ok(sub) => sub, - Err(e) => return e, + Err(err) => return err, }; - if let Some(sid) = subscription_id { - match state.renew_connection_manager_subscription(&sid, timeout) { - Ok(()) => ( - StatusCode::OK, - [ - ("SID", sid.to_owned()), - ("TIMEOUT", format!("Second-{}", timeout.as_secs())), - ], - ) - .into_response(), - Err(e) => { - warn!(sid, error=?e, "error renewing subscription"); - StatusCode::NOT_FOUND.into_response() - } - } - } else { - match state.new_connection_manager_subscription(callback, timeout) { - Ok(sid) => ( - StatusCode::OK, - [ - ("SID", sid), - ("TIMEOUT", format!("Second-{}", timeout.as_secs())), - ], - ) - .into_response(), - Err(e) => { - warn!(error=?e, "error creating subscription"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } + let resp = state.handle_connection_manager_subscription_request(&req); + crate::subscriptions::subscription_into_response(&req, resp) } diff --git a/crates/upnp-serve/src/services/content_directory.rs b/crates/upnp-serve/src/services/content_directory.rs index 3c8db42..8fa9571 100644 --- a/crates/upnp-serve/src/services/content_directory.rs +++ b/crates/upnp-serve/src/services/content_directory.rs @@ -191,8 +191,7 @@ pub mod get_system_update_id { pub mod subscription { use axum::{extract::State, response::IntoResponse}; - use http::{Method, StatusCode}; - use tracing::warn; + use http::Method; use crate::{state::UnpnServerState, subscriptions::SubscribeRequest}; @@ -200,46 +199,13 @@ pub mod subscription { State(state): State, request: axum::extract::Request, ) -> impl IntoResponse { - let SubscribeRequest { - callback, - subscription_id, - timeout, - } = match SubscribeRequest::parse(request) { + let req = match SubscribeRequest::parse(request) { Ok(sub) => sub, Err(err) => return err, }; - if let Some(sid) = subscription_id { - match state.renew_content_directory_subscription(&sid, timeout) { - Ok(()) => ( - StatusCode::OK, - [ - ("SID", sid.to_owned()), - ("TIMEOUT", format!("Second-{}", timeout.as_secs())), - ], - ) - .into_response(), - Err(e) => { - warn!(sid, error=?e, "error renewing subscription"); - StatusCode::NOT_FOUND.into_response() - } - } - } else { - match state.new_content_directory_subscription(callback, timeout) { - Ok(sid) => ( - StatusCode::OK, - [ - ("SID", sid), - ("TIMEOUT", format!("Second-{}", timeout.as_secs())), - ], - ) - .into_response(), - Err(e) => { - warn!(error=?e, "error creating subscription"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } + let resp = state.handle_content_directory_subscription_request(&req); + crate::subscriptions::subscription_into_response(&req, resp) } pub async fn notify_system_id_update( diff --git a/crates/upnp-serve/src/subscriptions.rs b/crates/upnp-serve/src/subscriptions.rs index 383b220..573f791 100644 --- a/crates/upnp-serve/src/subscriptions.rs +++ b/crates/upnp-serve/src/subscriptions.rs @@ -70,26 +70,139 @@ impl Subscriptions { } } +#[derive(Debug)] +pub struct SubscribeRequest { + pub callback: url::Url, + pub subscription_id: Option, + pub timeout: Duration, +} + +impl SubscribeRequest { + pub fn parse( + request: axum::extract::Request, + ) -> Result { + if request.method().as_str() != "SUBSCRIBE" { + return Err(StatusCode::METHOD_NOT_ALLOWED.into_response()); + } + + let (parts, _body) = request.into_parts(); + let is_event = parts + .headers + .get(HeaderName::from_static("nt")) + .map(|v| v.as_bytes() == b"upnp:event") + .unwrap_or_default(); + if !is_event { + return Err((StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response()); + } + + let callback = parts + .headers + .get(HeaderName::from_static("callback")) + .and_then(|v| v.to_str().ok()) + .map(|s| s.trim_matches(|c| c == '>' || c == '<')) + .and_then(|u| url::Url::parse(u).ok()); + let callback = match callback { + Some(c) => c, + None => return Err((StatusCode::BAD_REQUEST, "callback not provided").into_response()), + }; + let subscription_id = parts + .headers + .get(HeaderName::from_static("sid")) + .and_then(|v| v.to_str().ok()); + + let timeout = parts + .headers + .get(HeaderName::from_static("timeout")) + .and_then(|v| v.to_str().ok()) + .and_then(|t| t.strip_prefix("Second-")) + .and_then(|t| t.parse::().ok()) + .map(|t| Duration::from_secs(t as u64)); + + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); + + let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); + + Ok(SubscribeRequest { + callback, + subscription_id: subscription_id.map(|s| s.to_owned()), + timeout, + }) + } +} + +pub(crate) enum SubscriptionResult { + Renewed, + Created { sid: String }, +} + +pub(crate) fn subscription_into_response( + request: &SubscribeRequest, + result: anyhow::Result, +) -> axum::response::Response { + let result = match result { + Ok(r) => r, + Err(e) => { + warn!(error=?e, sub=?request, "error handling subscription request"); + return StatusCode::BAD_REQUEST.into_response(); + } + }; + + let sid = match result { + SubscriptionResult::Renewed => match &request.subscription_id { + Some(sid) => sid.clone(), + None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), + }, + SubscriptionResult::Created { sid } => sid, + }; + + ( + StatusCode::OK, + [ + ("SID", sid), + ("TIMEOUT", format!("Second-{}", request.timeout.as_secs())), + ], + ) + .into_response() +} + impl UpnpServerStateInner { - pub fn renew_content_directory_subscription( - &self, - sid: &str, - new_timeout: Duration, - ) -> anyhow::Result<()> { - self.content_directory_subscriptions - .update_timeout(sid, new_timeout) + pub(crate) fn handle_content_directory_subscription_request( + self: &Arc, + req: &SubscribeRequest, + ) -> anyhow::Result { + match &req.subscription_id { + Some(existing) => { + self.content_directory_subscriptions + .update_timeout(existing, req.timeout)?; + Ok(SubscriptionResult::Renewed) + } + None => { + let sid = + self.new_content_directory_subscription(req.callback.clone(), req.timeout)?; + Ok(SubscriptionResult::Created { sid }) + } + } } - pub fn renew_connection_manager_subscription( - &self, - sid: &str, - new_timeout: Duration, - ) -> anyhow::Result<()> { - self.content_directory_subscriptions - .update_timeout(sid, new_timeout) + pub(crate) fn handle_connection_manager_subscription_request( + self: &Arc, + req: &SubscribeRequest, + ) -> anyhow::Result { + match &req.subscription_id { + Some(existing) => { + self.connection_manager_subscriptions + .update_timeout(existing, req.timeout)?; + Ok(SubscriptionResult::Renewed) + } + None => { + let sid = + self.new_connection_manager_subscription(req.callback.clone(), req.timeout)?; + Ok(SubscriptionResult::Created { sid }) + } + } } - pub fn new_content_directory_subscription( + fn new_content_directory_subscription( self: &Arc, url: url::Url, timeout: Duration, @@ -174,7 +287,7 @@ impl UpnpServerStateInner { Ok(sid) } - pub fn new_connection_manager_subscription( + fn new_connection_manager_subscription( self: &Arc, url: url::Url, timeout: Duration, @@ -222,62 +335,3 @@ impl UpnpServerStateInner { Ok(sid) } } - -pub struct SubscribeRequest { - pub callback: url::Url, - pub subscription_id: Option, - pub timeout: Duration, -} - -impl SubscribeRequest { - pub fn parse( - request: axum::extract::Request, - ) -> Result { - if request.method().as_str() != "SUBSCRIBE" { - return Err(StatusCode::METHOD_NOT_ALLOWED.into_response()); - } - - let (parts, _body) = request.into_parts(); - let is_event = parts - .headers - .get(HeaderName::from_static("nt")) - .map(|v| v.as_bytes() == b"upnp:event") - .unwrap_or_default(); - if !is_event { - return Err((StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response()); - } - - let callback = parts - .headers - .get(HeaderName::from_static("callback")) - .and_then(|v| v.to_str().ok()) - .map(|s| s.trim_matches(|c| c == '>' || c == '<')) - .and_then(|u| url::Url::parse(u).ok()); - let callback = match callback { - Some(c) => c, - None => return Err((StatusCode::BAD_REQUEST, "callback not provided").into_response()), - }; - let subscription_id = parts - .headers - .get(HeaderName::from_static("sid")) - .and_then(|v| v.to_str().ok()); - - let timeout = parts - .headers - .get(HeaderName::from_static("timeout")) - .and_then(|v| v.to_str().ok()) - .and_then(|t| t.strip_prefix("Second-")) - .and_then(|t| t.parse::().ok()) - .map(|t| Duration::from_secs(t as u64)); - - const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); - - let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); - - Ok(SubscribeRequest { - callback, - subscription_id: subscription_id.map(|s| s.to_owned()), - timeout, - }) - } -}