diff --git a/crates/upnp-serve/src/subscriptions.rs b/crates/upnp-serve/src/subscriptions.rs index 573f791..16aac67 100644 --- a/crates/upnp-serve/src/subscriptions.rs +++ b/crates/upnp-serve/src/subscriptions.rs @@ -71,10 +71,24 @@ impl Subscriptions { } #[derive(Debug)] -pub struct SubscribeRequest { - pub callback: url::Url, - pub subscription_id: Option, - pub timeout: Duration, +pub enum SubscribeRequest { + Create { + callback: url::Url, + timeout: Duration, + }, + Renew { + sid: String, + timeout: Duration, + }, +} + +impl SubscribeRequest { + fn timeout(&self) -> Duration { + match self { + SubscribeRequest::Create { timeout, .. } => *timeout, + SubscribeRequest::Renew { timeout, .. } => *timeout, + } + } } impl SubscribeRequest { @@ -91,9 +105,6 @@ impl SubscribeRequest { .get(HeaderName::from_static("nt")) .map(|v| v.as_bytes() == b"upnp:event") .unwrap_or_default(); - if !is_event { - return Err((StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response()); - } let callback = parts .headers @@ -101,10 +112,6 @@ impl SubscribeRequest { .and_then(|v| v.to_str().ok()) .map(|s| s.trim_matches(|c| c == '>' || c == '<')) .and_then(|u| url::Url::parse(u).ok()); - let callback = match callback { - Some(c) => c, - None => return Err((StatusCode::BAD_REQUEST, "callback not provided").into_response()), - }; let subscription_id = parts .headers .get(HeaderName::from_static("sid")) @@ -120,25 +127,40 @@ impl SubscribeRequest { const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); - let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); + let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT).min(DEFAULT_TIMEOUT); - Ok(SubscribeRequest { - callback, - subscription_id: subscription_id.map(|s| s.to_owned()), - timeout, - }) + match (is_event, callback, subscription_id) { + (true, Some(callback), None) => Ok(SubscribeRequest::Create { callback, timeout }), + (_, _, Some(sid)) => Ok(SubscribeRequest::Renew { + sid: sid.to_owned(), + timeout, + }), + _ => Err(StatusCode::BAD_REQUEST.into_response()), + } } } +#[derive(Debug)] pub(crate) enum SubscriptionResult { - Renewed, + Renewed { sid: String }, Created { sid: String }, } +impl SubscriptionResult { + fn sid(&self) -> &str { + match self { + SubscriptionResult::Renewed { sid } => sid, + SubscriptionResult::Created { sid } => sid, + } + } +} + pub(crate) fn subscription_into_response( request: &SubscribeRequest, result: anyhow::Result, ) -> axum::response::Response { + trace!(?request, ?result, "subscription request->response"); + let result = match result { Ok(r) => r, Err(e) => { @@ -147,19 +169,11 @@ pub(crate) fn subscription_into_response( } }; - let sid = match result { - SubscriptionResult::Renewed => match &request.subscription_id { - Some(sid) => sid.clone(), - None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), - }, - SubscriptionResult::Created { sid } => sid, - }; - ( StatusCode::OK, [ - ("SID", sid), - ("TIMEOUT", format!("Second-{}", request.timeout.as_secs())), + ("SID", result.sid().to_owned()), + ("TIMEOUT", format!("Second-{}", request.timeout().as_secs())), ], ) .into_response() @@ -170,17 +184,16 @@ impl UpnpServerStateInner { self: &Arc, req: &SubscribeRequest, ) -> anyhow::Result { - match &req.subscription_id { - Some(existing) => { - self.content_directory_subscriptions - .update_timeout(existing, req.timeout)?; - Ok(SubscriptionResult::Renewed) - } - None => { - let sid = - self.new_content_directory_subscription(req.callback.clone(), req.timeout)?; + match req { + SubscribeRequest::Create { callback, timeout } => { + let sid = self.new_content_directory_subscription(callback.clone(), *timeout)?; Ok(SubscriptionResult::Created { sid }) } + SubscribeRequest::Renew { sid, timeout } => { + self.content_directory_subscriptions + .update_timeout(sid, *timeout)?; + Ok(SubscriptionResult::Renewed { sid: sid.clone() }) + } } } @@ -188,17 +201,16 @@ impl UpnpServerStateInner { self: &Arc, req: &SubscribeRequest, ) -> anyhow::Result { - match &req.subscription_id { - Some(existing) => { - self.connection_manager_subscriptions - .update_timeout(existing, req.timeout)?; - Ok(SubscriptionResult::Renewed) - } - None => { - let sid = - self.new_connection_manager_subscription(req.callback.clone(), req.timeout)?; + match req { + SubscribeRequest::Create { callback, timeout } => { + let sid = self.new_connection_manager_subscription(callback.clone(), *timeout)?; Ok(SubscriptionResult::Created { sid }) } + SubscribeRequest::Renew { sid, timeout } => { + self.connection_manager_subscriptions + .update_timeout(sid, *timeout)?; + Ok(SubscriptionResult::Renewed { sid: sid.clone() }) + } } } @@ -262,9 +274,11 @@ impl UpnpServerStateInner { tokio::select! { _ = refresh_notify.notified() => { timeout = state.upgrade().context("upnp server dead")?.content_directory_subscriptions.get_timeout(&sid)?; + trace!(?timeout, "refreshed subscription"); }, _ = tokio::time::sleep(timeout) => { state.upgrade().context("upnp server dead")?.content_directory_subscriptions.remove(&sid)?; + trace!(?timeout, "subscription timed out, removing"); return Ok(()) } } @@ -293,7 +307,7 @@ impl UpnpServerStateInner { timeout: Duration, ) -> anyhow::Result { let (sid, refresh_notify) = self - .content_directory_subscriptions + .connection_manager_subscriptions .add(url.clone(), timeout); let token = self.cancel_token.child_token();