diff --git a/crates/upnp/examples/discover.rs b/crates/upnp/examples/discover.rs new file mode 100644 index 0000000..a445476 --- /dev/null +++ b/crates/upnp/examples/discover.rs @@ -0,0 +1,35 @@ +use std::time::Duration; + +use librqbit_upnp::{discover_once, discover_services, SSDP_SEARCH_ROOT_ST}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt().init(); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (stx, mut srx) = tokio::sync::mpsc::unbounded_channel::<()>(); + + let f1 = async move { discover_once(&tx, SSDP_SEARCH_ROOT_ST, Duration::from_secs(10)).await }; + + let f2 = async move { + while let Some(r) = rx.recv().await { + let stx = stx.clone(); + tokio::spawn(async move { + match discover_services(r.location.clone()).await { + Ok(s) => { + println!("{}: {s:#?}", r.location); + } + Err(e) => { + tracing::error!(location=%r.location, "error discovering") + } + } + drop(stx); + }); + } + }; + + let f3 = async move { while (srx.recv().await).is_some() {} }; + + tokio::join!(f1, f2, f3); + Ok(()) +} diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs index 11aad56..0feb3e6 100644 --- a/crates/upnp/src/lib.rs +++ b/crates/upnp/src/lib.rs @@ -16,12 +16,19 @@ use url::Url; const SERVICE_TYPE_WAN_IP_CONNECTION: &str = "urn:schemas-upnp-org:service:WANIPConnection:1"; const SSDP_MULTICAST_IP: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(239, 255, 255, 250), 1900)); -const SSDP_SEARCH_REQUEST: &str = "M-SEARCH * HTTP/1.1\r\n\ - Host: 239.255.255.250:1900\r\n\ - Man: \"ssdp:discover\"\r\n\ - MX: 3\r\n\ - ST: urn:schemas-upnp-org:service:WANIPConnection:1\r\n\ - \r\n"; +pub const SSDP_SEARCH_WAN_IPCONNECTION_ST: &str = "urn:schemas-upnp-org:service:WANIPConnection:1"; +pub const SSDP_SEARCH_ROOT_ST: &str = "upnp:rootdevice"; + +pub fn make_ssdp_search_request(kind: &str) -> String { + format!( + "M-SEARCH * HTTP/1.1\r\n\ + Host: 239.255.255.250:1900\r\n\ + Man: \"ssdp:discover\"\r\n\ + MX: 3\r\n\ + ST: {kind}\r\n\ + \r\n" + ) +} pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { let local_dest = match local_dest { @@ -122,15 +129,15 @@ async fn forward_port( } #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] -struct RootDesc { +pub struct RootDesc { #[serde(rename = "device")] - devices: Vec, + pub devices: Vec, } #[derive(Default, Clone, Debug, Deserialize, PartialEq, Eq)] pub struct DeviceList { #[serde(rename = "device")] - devices: Vec, + pub devices: Vec, } #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] @@ -189,6 +196,8 @@ pub struct Service { pub control_url: String, #[serde(rename = "SCPDURL")] pub scpd_url: String, + #[serde(rename = "eventSubURL", default)] + pub event_sub_url: Option, } impl Service { @@ -242,12 +251,12 @@ impl UpnpEndpoint { } #[derive(Debug)] -struct UpnpDiscoverResponse { +pub struct UpnpDiscoverResponse { pub received_from: SocketAddr, pub location: Url, } -async fn discover_services(location: Url) -> anyhow::Result { +pub async fn discover_services(location: Url) -> anyhow::Result { let response = Client::new() .get(location.clone()) .send() @@ -266,7 +275,7 @@ async fn discover_services(location: Url) -> anyhow::Result { Ok(root_desc) } -fn parse_upnp_discover_response( +pub fn parse_upnp_discover_response( buf: &[u8], received_from: SocketAddr, ) -> anyhow::Result { @@ -299,6 +308,50 @@ fn parse_upnp_discover_response( }) } +pub async fn discover_once( + tx: &UnboundedSender, + kind: &str, + timeout: Duration, +) -> anyhow::Result<()> { + let socket = tokio::net::UdpSocket::bind("0.0.0.0:0") + .await + .context("failed to bind UDP socket")?; + let message = make_ssdp_search_request(kind); + socket + .send_to(message.as_bytes(), SSDP_MULTICAST_IP) + .await + .context("failed to send SSDP search request")?; + + let mut buffer = [0; 2048]; + + let timeout = tokio::time::sleep(timeout); + let mut timed_out = false; + tokio::pin!(timeout); + + let mut discovered = 0; + + while !timed_out { + tokio::select! { + _ = &mut timeout, if !timed_out => { + timed_out = true; + } + Ok((len, addr)) = socket.recv_from(&mut buffer), if !timed_out => { + let response = &buffer[..len]; + match parse_upnp_discover_response(response, addr) { + Ok(r) => { + tx.send(r)?; + discovered += 1; + }, + Err(e) => warn!(error=?e, response=?BStr::new(response), "failed to parse SSDP response"), + }; + }, + } + } + + debug!("discovered {discovered} endpoints"); + Ok(()) +} + pub struct UpnpPortForwarderOptions { pub lease_duration: Duration, pub discover_interval: Duration, @@ -346,42 +399,12 @@ impl UpnpPortForwarder { &self, tx: &UnboundedSender, ) -> anyhow::Result<()> { - let socket = tokio::net::UdpSocket::bind("0.0.0.0:0") - .await - .context("failed to bind UDP socket")?; - socket - .send_to(SSDP_SEARCH_REQUEST.as_bytes(), SSDP_MULTICAST_IP) - .await - .context("failed to send SSDP search request")?; - - let mut buffer = [0; 2048]; - - let timeout = tokio::time::sleep(self.opts.discover_timeout); - let mut timed_out = false; - tokio::pin!(timeout); - - let mut discovered = 0; - - while !timed_out { - tokio::select! { - _ = &mut timeout, if !timed_out => { - timed_out = true; - } - Ok((len, addr)) = socket.recv_from(&mut buffer), if !timed_out => { - let response = &buffer[..len]; - match parse_upnp_discover_response(response, addr) { - Ok(r) => { - tx.send(r)?; - discovered += 1; - }, - Err(e) => warn!(error=?e, response=?BStr::new(response), "failed to parse SSDP response"), - }; - }, - } - } - - debug!("discovered {discovered} endpoints"); - Ok(()) + discover_once( + tx, + SSDP_SEARCH_WAN_IPCONNECTION_ST, + self.opts.discover_timeout, + ) + .await } async fn discovery(&self, tx: UnboundedSender) -> anyhow::Result<()> { @@ -482,7 +505,7 @@ mod tests { use crate::{Device, DeviceList, RootDesc, Service, ServiceList}; #[test] - fn test_parse() { + fn test_parse_root_desc() { let actual = from_str::(include_str!("resources/test/devices-0.xml")).unwrap(); let expected = RootDesc { devices: vec![Device { @@ -493,6 +516,7 @@ mod tests { service_type: "urn:schemas-upnp-org:service:Layer3Forwarding:1".into(), control_url: "/upnp/control/Layer3Forwarding".into(), scpd_url: "/Layer3ForwardingSCPD.xml".into(), + event_sub_url: Some("/upnp/event/Layer3Forwarding".into()), }], }, device_list: DeviceList { @@ -505,6 +529,7 @@ mod tests { "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1".into(), control_url: "/upnp/control/WANCommonInterfaceConfig0".into(), scpd_url: "/WANCommonInterfaceConfigSCPD.xml".into(), + event_sub_url: Some("/upnp/event/WANCommonInterfaceConfig0".into()), }], }, device_list: DeviceList { @@ -518,6 +543,7 @@ mod tests { "urn:schemas-upnp-org:service:WANIPConnection:1".into(), control_url: "/upnp/control/WANIPConnection0".into(), scpd_url: "/WANIPConnectionServiceSCPD.xml".into(), + event_sub_url: Some("/upnp/event/WANIPConnection0".into()), }], }, device_list: DeviceList { devices: vec![] },