send SSDP NOTIFY in parallel

This commit is contained in:
Igor Katson 2024-08-28 16:21:59 +01:00
parent 5f7bf174bf
commit 76e0e1aa8f
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 34 additions and 16 deletions

1
Cargo.lock generated
View file

@ -1649,6 +1649,7 @@ dependencies = [
"anyhow",
"axum",
"bstr",
"futures",
"gethostname",
"http",
"httparse",

View file

@ -35,6 +35,7 @@ reqwest = { version = "0.12.7", default-features = false }
socket2 = "0.5.7"
quick-xml = { version = "0.36.1", features = ["serialize"] }
network-interface = "2.0.0"
futures = "0.3.30"
[dev-dependencies]
tracing-subscriber = "0.3.18"

View file

@ -133,7 +133,7 @@ impl SsdpRunner {
Ok(Self { opts, socket })
}
fn generate_notify_message(&self, kind: &str, nts: &str, location: &str) -> String {
fn generate_notify_message(&self, kind: &str, nts: &str, location: &url::Url) -> String {
let usn: &str = &self.opts.usn;
let server: &str = &self.opts.server_string;
let bcast_addr = UPNP_BROADCAST_ADDR;
@ -187,35 +187,51 @@ Content-Length: 0\r\n\r\n"
}
};
for ni in interfaces {
for niaddr in ni.addr {
let ip = niaddr.ip();
if ip.is_ipv6() || ip.is_loopback() {
continue;
let futs = interfaces
.into_iter()
.flat_map(|ni| ni.addr)
.filter_map(|addr| {
let ip = addr.ip();
if ip.is_ipv4() && !ip.is_loopback() {
Some(ip)
} else {
None
}
})
.map(|ip| async move {
let addr = SocketAddr::new(ip, 0);
let sock = match tokio::net::UdpSocket::bind(addr).await {
Ok(sock) => sock,
Err(e) => {
debug!(%addr, error=?e, "error binding UDP to send NOTIFY");
continue;
return;
}
};
let mut location = self.opts.description_http_location.clone();
location.set_host(Some(&format!("{ip}"))).unwrap();
for kind in [UPNP_KIND_ROOT_DEVICE, UPNP_KIND_MEDIASERVER] {
let msg = self.generate_notify_message(kind, nts, &format!("{location}"));
trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY");
if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await {
debug!(sock_addr=%addr, error=%e, kind, nts, "error sending SSDP NOTIFY")
} else {
debug!(kind, nts, %location, "sent SSDP NOTIFY")
macro_rules! gen {
($kind:expr) => {
async {
let msg = self.generate_notify_message($kind, nts, &location);
trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY");
if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await {
debug!(sock_addr=%addr, error=%e, kind=$kind, nts, "error sending SSDP NOTIFY")
} else {
debug!(kind=$kind, nts, %location, "sent SSDP NOTIFY")
}
}
}
}
}
}
let f1 = gen!(UPNP_KIND_ROOT_DEVICE);
let f2 = gen!(UPNP_KIND_MEDIASERVER) ;
tokio::join!(f1, f2);
});
futures::future::join_all(futs).await;
}
async fn task_send_alive_notifies_periodically(&self) -> anyhow::Result<()> {