Add discover example to upnp

This commit is contained in:
Igor Katson 2024-08-29 13:51:45 +01:00
parent af00713e4d
commit babe470f9a
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 110 additions and 49 deletions

View file

@ -0,0 +1,35 @@
use std::time::Duration;
use librqbit_upnp::{discover_once, discover_services, SSDP_SEARCH_ROOT_ST};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (stx, mut srx) = tokio::sync::mpsc::unbounded_channel::<()>();
let f1 = async move { discover_once(&tx, SSDP_SEARCH_ROOT_ST, Duration::from_secs(10)).await };
let f2 = async move {
while let Some(r) = rx.recv().await {
let stx = stx.clone();
tokio::spawn(async move {
match discover_services(r.location.clone()).await {
Ok(s) => {
println!("{}: {s:#?}", r.location);
}
Err(e) => {
tracing::error!(location=%r.location, "error discovering")
}
}
drop(stx);
});
}
};
let f3 = async move { while (srx.recv().await).is_some() {} };
tokio::join!(f1, f2, f3);
Ok(())
}

View file

@ -16,12 +16,19 @@ use url::Url;
const SERVICE_TYPE_WAN_IP_CONNECTION: &str = "urn:schemas-upnp-org:service:WANIPConnection:1";
const SSDP_MULTICAST_IP: SocketAddr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(239, 255, 255, 250), 1900));
const SSDP_SEARCH_REQUEST: &str = "M-SEARCH * HTTP/1.1\r\n\
Host: 239.255.255.250:1900\r\n\
Man: \"ssdp:discover\"\r\n\
MX: 3\r\n\
ST: urn:schemas-upnp-org:service:WANIPConnection:1\r\n\
\r\n";
pub const SSDP_SEARCH_WAN_IPCONNECTION_ST: &str = "urn:schemas-upnp-org:service:WANIPConnection:1";
pub const SSDP_SEARCH_ROOT_ST: &str = "upnp:rootdevice";
pub fn make_ssdp_search_request(kind: &str) -> String {
format!(
"M-SEARCH * HTTP/1.1\r\n\
Host: 239.255.255.250:1900\r\n\
Man: \"ssdp:discover\"\r\n\
MX: 3\r\n\
ST: {kind}\r\n\
\r\n"
)
}
pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<Ipv4Addr> {
let local_dest = match local_dest {
@ -122,15 +129,15 @@ async fn forward_port(
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
struct RootDesc {
pub struct RootDesc {
#[serde(rename = "device")]
devices: Vec<Device>,
pub devices: Vec<Device>,
}
#[derive(Default, Clone, Debug, Deserialize, PartialEq, Eq)]
pub struct DeviceList {
#[serde(rename = "device")]
devices: Vec<Device>,
pub devices: Vec<Device>,
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
@ -189,6 +196,8 @@ pub struct Service {
pub control_url: String,
#[serde(rename = "SCPDURL")]
pub scpd_url: String,
#[serde(rename = "eventSubURL", default)]
pub event_sub_url: Option<String>,
}
impl Service {
@ -242,12 +251,12 @@ impl UpnpEndpoint {
}
#[derive(Debug)]
struct UpnpDiscoverResponse {
pub struct UpnpDiscoverResponse {
pub received_from: SocketAddr,
pub location: Url,
}
async fn discover_services(location: Url) -> anyhow::Result<RootDesc> {
pub async fn discover_services(location: Url) -> anyhow::Result<RootDesc> {
let response = Client::new()
.get(location.clone())
.send()
@ -266,7 +275,7 @@ async fn discover_services(location: Url) -> anyhow::Result<RootDesc> {
Ok(root_desc)
}
fn parse_upnp_discover_response(
pub fn parse_upnp_discover_response(
buf: &[u8],
received_from: SocketAddr,
) -> anyhow::Result<UpnpDiscoverResponse> {
@ -299,6 +308,50 @@ fn parse_upnp_discover_response(
})
}
pub async fn discover_once(
tx: &UnboundedSender<UpnpDiscoverResponse>,
kind: &str,
timeout: Duration,
) -> anyhow::Result<()> {
let socket = tokio::net::UdpSocket::bind("0.0.0.0:0")
.await
.context("failed to bind UDP socket")?;
let message = make_ssdp_search_request(kind);
socket
.send_to(message.as_bytes(), SSDP_MULTICAST_IP)
.await
.context("failed to send SSDP search request")?;
let mut buffer = [0; 2048];
let timeout = tokio::time::sleep(timeout);
let mut timed_out = false;
tokio::pin!(timeout);
let mut discovered = 0;
while !timed_out {
tokio::select! {
_ = &mut timeout, if !timed_out => {
timed_out = true;
}
Ok((len, addr)) = socket.recv_from(&mut buffer), if !timed_out => {
let response = &buffer[..len];
match parse_upnp_discover_response(response, addr) {
Ok(r) => {
tx.send(r)?;
discovered += 1;
},
Err(e) => warn!(error=?e, response=?BStr::new(response), "failed to parse SSDP response"),
};
},
}
}
debug!("discovered {discovered} endpoints");
Ok(())
}
pub struct UpnpPortForwarderOptions {
pub lease_duration: Duration,
pub discover_interval: Duration,
@ -346,42 +399,12 @@ impl UpnpPortForwarder {
&self,
tx: &UnboundedSender<UpnpDiscoverResponse>,
) -> anyhow::Result<()> {
let socket = tokio::net::UdpSocket::bind("0.0.0.0:0")
.await
.context("failed to bind UDP socket")?;
socket
.send_to(SSDP_SEARCH_REQUEST.as_bytes(), SSDP_MULTICAST_IP)
.await
.context("failed to send SSDP search request")?;
let mut buffer = [0; 2048];
let timeout = tokio::time::sleep(self.opts.discover_timeout);
let mut timed_out = false;
tokio::pin!(timeout);
let mut discovered = 0;
while !timed_out {
tokio::select! {
_ = &mut timeout, if !timed_out => {
timed_out = true;
}
Ok((len, addr)) = socket.recv_from(&mut buffer), if !timed_out => {
let response = &buffer[..len];
match parse_upnp_discover_response(response, addr) {
Ok(r) => {
tx.send(r)?;
discovered += 1;
},
Err(e) => warn!(error=?e, response=?BStr::new(response), "failed to parse SSDP response"),
};
},
}
}
debug!("discovered {discovered} endpoints");
Ok(())
discover_once(
tx,
SSDP_SEARCH_WAN_IPCONNECTION_ST,
self.opts.discover_timeout,
)
.await
}
async fn discovery(&self, tx: UnboundedSender<UpnpDiscoverResponse>) -> anyhow::Result<()> {
@ -482,7 +505,7 @@ mod tests {
use crate::{Device, DeviceList, RootDesc, Service, ServiceList};
#[test]
fn test_parse() {
fn test_parse_root_desc() {
let actual = from_str::<RootDesc>(include_str!("resources/test/devices-0.xml")).unwrap();
let expected = RootDesc {
devices: vec![Device {
@ -493,6 +516,7 @@ mod tests {
service_type: "urn:schemas-upnp-org:service:Layer3Forwarding:1".into(),
control_url: "/upnp/control/Layer3Forwarding".into(),
scpd_url: "/Layer3ForwardingSCPD.xml".into(),
event_sub_url: Some("/upnp/event/Layer3Forwarding".into()),
}],
},
device_list: DeviceList {
@ -505,6 +529,7 @@ mod tests {
"urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1".into(),
control_url: "/upnp/control/WANCommonInterfaceConfig0".into(),
scpd_url: "/WANCommonInterfaceConfigSCPD.xml".into(),
event_sub_url: Some("/upnp/event/WANCommonInterfaceConfig0".into()),
}],
},
device_list: DeviceList {
@ -518,6 +543,7 @@ mod tests {
"urn:schemas-upnp-org:service:WANIPConnection:1".into(),
control_url: "/upnp/control/WANIPConnection0".into(),
scpd_url: "/WANIPConnectionServiceSCPD.xml".into(),
event_sub_url: Some("/upnp/event/WANIPConnection0".into()),
}],
},
device_list: DeviceList { devices: vec![] },