From 76e0e1aa8f9a89aff991d4573c0d2bf71495fb19 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 28 Aug 2024 16:21:59 +0100 Subject: [PATCH] send SSDP NOTIFY in parallel --- Cargo.lock | 1 + crates/upnp-serve/Cargo.toml | 1 + crates/upnp-serve/src/ssdp.rs | 48 +++++++++++++++++++++++------------ 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d70a1c..00f5ed5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,6 +1649,7 @@ dependencies = [ "anyhow", "axum", "bstr", + "futures", "gethostname", "http", "httparse", diff --git a/crates/upnp-serve/Cargo.toml b/crates/upnp-serve/Cargo.toml index 1b3c829..6a134f3 100644 --- a/crates/upnp-serve/Cargo.toml +++ b/crates/upnp-serve/Cargo.toml @@ -35,6 +35,7 @@ reqwest = { version = "0.12.7", default-features = false } socket2 = "0.5.7" quick-xml = { version = "0.36.1", features = ["serialize"] } network-interface = "2.0.0" +futures = "0.3.30" [dev-dependencies] tracing-subscriber = "0.3.18" diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index b9e4342..c6da767 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -133,7 +133,7 @@ impl SsdpRunner { Ok(Self { opts, socket }) } - fn generate_notify_message(&self, kind: &str, nts: &str, location: &str) -> String { + fn generate_notify_message(&self, kind: &str, nts: &str, location: &url::Url) -> String { let usn: &str = &self.opts.usn; let server: &str = &self.opts.server_string; let bcast_addr = UPNP_BROADCAST_ADDR; @@ -187,35 +187,51 @@ Content-Length: 0\r\n\r\n" } }; - for ni in interfaces { - for niaddr in ni.addr { - let ip = niaddr.ip(); - if ip.is_ipv6() || ip.is_loopback() { - continue; + let futs = interfaces + .into_iter() + .flat_map(|ni| ni.addr) + .filter_map(|addr| { + let ip = addr.ip(); + if ip.is_ipv4() && !ip.is_loopback() { + Some(ip) + } else { + None } + }) + .map(|ip| async move { let addr = SocketAddr::new(ip, 0); let sock = match tokio::net::UdpSocket::bind(addr).await { Ok(sock) => sock, Err(e) => { debug!(%addr, error=?e, "error binding UDP to send NOTIFY"); - continue; + return; } }; let mut location = self.opts.description_http_location.clone(); location.set_host(Some(&format!("{ip}"))).unwrap(); - for kind in [UPNP_KIND_ROOT_DEVICE, UPNP_KIND_MEDIASERVER] { - let msg = self.generate_notify_message(kind, nts, &format!("{location}")); - trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY"); - if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await { - debug!(sock_addr=%addr, error=%e, kind, nts, "error sending SSDP NOTIFY") - } else { - debug!(kind, nts, %location, "sent SSDP NOTIFY") + macro_rules! gen { + ($kind:expr) => { + async { + let msg = self.generate_notify_message($kind, nts, &location); + trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY"); + if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await { + debug!(sock_addr=%addr, error=%e, kind=$kind, nts, "error sending SSDP NOTIFY") + } else { + debug!(kind=$kind, nts, %location, "sent SSDP NOTIFY") + } + } } } - } - } + + let f1 = gen!(UPNP_KIND_ROOT_DEVICE); + let f2 = gen!(UPNP_KIND_MEDIASERVER) ; + + tokio::join!(f1, f2); + }); + + futures::future::join_all(futs).await; } async fn task_send_alive_notifies_periodically(&self) -> anyhow::Result<()> {