Fix a bug where upnp subscriptions were not renewed if the headers missed callback and event

This commit is contained in:
Igor Katson 2024-08-29 00:18:29 +01:00
parent a44acd947b
commit 0303cfa084
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -71,10 +71,24 @@ impl Subscriptions {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct SubscribeRequest { pub enum SubscribeRequest {
pub callback: url::Url, Create {
pub subscription_id: Option<String>, callback: url::Url,
pub timeout: Duration, timeout: Duration,
},
Renew {
sid: String,
timeout: Duration,
},
}
impl SubscribeRequest {
fn timeout(&self) -> Duration {
match self {
SubscribeRequest::Create { timeout, .. } => *timeout,
SubscribeRequest::Renew { timeout, .. } => *timeout,
}
}
} }
impl SubscribeRequest { impl SubscribeRequest {
@ -91,9 +105,6 @@ impl SubscribeRequest {
.get(HeaderName::from_static("nt")) .get(HeaderName::from_static("nt"))
.map(|v| v.as_bytes() == b"upnp:event") .map(|v| v.as_bytes() == b"upnp:event")
.unwrap_or_default(); .unwrap_or_default();
if !is_event {
return Err((StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response());
}
let callback = parts let callback = parts
.headers .headers
@ -101,10 +112,6 @@ impl SubscribeRequest {
.and_then(|v| v.to_str().ok()) .and_then(|v| v.to_str().ok())
.map(|s| s.trim_matches(|c| c == '>' || c == '<')) .map(|s| s.trim_matches(|c| c == '>' || c == '<'))
.and_then(|u| url::Url::parse(u).ok()); .and_then(|u| url::Url::parse(u).ok());
let callback = match callback {
Some(c) => c,
None => return Err((StatusCode::BAD_REQUEST, "callback not provided").into_response()),
};
let subscription_id = parts let subscription_id = parts
.headers .headers
.get(HeaderName::from_static("sid")) .get(HeaderName::from_static("sid"))
@ -120,25 +127,40 @@ impl SubscribeRequest {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800);
let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT).min(DEFAULT_TIMEOUT);
Ok(SubscribeRequest { match (is_event, callback, subscription_id) {
callback, (true, Some(callback), None) => Ok(SubscribeRequest::Create { callback, timeout }),
subscription_id: subscription_id.map(|s| s.to_owned()), (_, _, Some(sid)) => Ok(SubscribeRequest::Renew {
timeout, sid: sid.to_owned(),
}) timeout,
}),
_ => Err(StatusCode::BAD_REQUEST.into_response()),
}
} }
} }
#[derive(Debug)]
pub(crate) enum SubscriptionResult { pub(crate) enum SubscriptionResult {
Renewed, Renewed { sid: String },
Created { sid: String }, Created { sid: String },
} }
impl SubscriptionResult {
fn sid(&self) -> &str {
match self {
SubscriptionResult::Renewed { sid } => sid,
SubscriptionResult::Created { sid } => sid,
}
}
}
pub(crate) fn subscription_into_response( pub(crate) fn subscription_into_response(
request: &SubscribeRequest, request: &SubscribeRequest,
result: anyhow::Result<SubscriptionResult>, result: anyhow::Result<SubscriptionResult>,
) -> axum::response::Response { ) -> axum::response::Response {
trace!(?request, ?result, "subscription request->response");
let result = match result { let result = match result {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
@ -147,19 +169,11 @@ pub(crate) fn subscription_into_response(
} }
}; };
let sid = match result {
SubscriptionResult::Renewed => match &request.subscription_id {
Some(sid) => sid.clone(),
None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
},
SubscriptionResult::Created { sid } => sid,
};
( (
StatusCode::OK, StatusCode::OK,
[ [
("SID", sid), ("SID", result.sid().to_owned()),
("TIMEOUT", format!("Second-{}", request.timeout.as_secs())), ("TIMEOUT", format!("Second-{}", request.timeout().as_secs())),
], ],
) )
.into_response() .into_response()
@ -170,17 +184,16 @@ impl UpnpServerStateInner {
self: &Arc<Self>, self: &Arc<Self>,
req: &SubscribeRequest, req: &SubscribeRequest,
) -> anyhow::Result<SubscriptionResult> { ) -> anyhow::Result<SubscriptionResult> {
match &req.subscription_id { match req {
Some(existing) => { SubscribeRequest::Create { callback, timeout } => {
self.content_directory_subscriptions let sid = self.new_content_directory_subscription(callback.clone(), *timeout)?;
.update_timeout(existing, req.timeout)?;
Ok(SubscriptionResult::Renewed)
}
None => {
let sid =
self.new_content_directory_subscription(req.callback.clone(), req.timeout)?;
Ok(SubscriptionResult::Created { sid }) Ok(SubscriptionResult::Created { sid })
} }
SubscribeRequest::Renew { sid, timeout } => {
self.content_directory_subscriptions
.update_timeout(sid, *timeout)?;
Ok(SubscriptionResult::Renewed { sid: sid.clone() })
}
} }
} }
@ -188,17 +201,16 @@ impl UpnpServerStateInner {
self: &Arc<Self>, self: &Arc<Self>,
req: &SubscribeRequest, req: &SubscribeRequest,
) -> anyhow::Result<SubscriptionResult> { ) -> anyhow::Result<SubscriptionResult> {
match &req.subscription_id { match req {
Some(existing) => { SubscribeRequest::Create { callback, timeout } => {
self.connection_manager_subscriptions let sid = self.new_connection_manager_subscription(callback.clone(), *timeout)?;
.update_timeout(existing, req.timeout)?;
Ok(SubscriptionResult::Renewed)
}
None => {
let sid =
self.new_connection_manager_subscription(req.callback.clone(), req.timeout)?;
Ok(SubscriptionResult::Created { sid }) Ok(SubscriptionResult::Created { sid })
} }
SubscribeRequest::Renew { sid, timeout } => {
self.connection_manager_subscriptions
.update_timeout(sid, *timeout)?;
Ok(SubscriptionResult::Renewed { sid: sid.clone() })
}
} }
} }
@ -262,9 +274,11 @@ impl UpnpServerStateInner {
tokio::select! { tokio::select! {
_ = refresh_notify.notified() => { _ = refresh_notify.notified() => {
timeout = state.upgrade().context("upnp server dead")?.content_directory_subscriptions.get_timeout(&sid)?; timeout = state.upgrade().context("upnp server dead")?.content_directory_subscriptions.get_timeout(&sid)?;
trace!(?timeout, "refreshed subscription");
}, },
_ = tokio::time::sleep(timeout) => { _ = tokio::time::sleep(timeout) => {
state.upgrade().context("upnp server dead")?.content_directory_subscriptions.remove(&sid)?; state.upgrade().context("upnp server dead")?.content_directory_subscriptions.remove(&sid)?;
trace!(?timeout, "subscription timed out, removing");
return Ok(()) return Ok(())
} }
} }
@ -293,7 +307,7 @@ impl UpnpServerStateInner {
timeout: Duration, timeout: Duration,
) -> anyhow::Result<String> { ) -> anyhow::Result<String> {
let (sid, refresh_notify) = self let (sid, refresh_notify) = self
.content_directory_subscriptions .connection_manager_subscriptions
.add(url.clone(), timeout); .add(url.clone(), timeout);
let token = self.cancel_token.child_token(); let token = self.cancel_token.child_token();