From 626c5290002f535cb93f10f6013e5456c0eea7b0 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 5 Dec 2023 14:24:16 +0000 Subject: [PATCH] Created upnp crate to port forward local ports --- Cargo.lock | 58 +++ Cargo.toml | 3 +- crates/upnp/Cargo.toml | 22 + crates/upnp/examples/upnp-forward.rs | 24 + crates/upnp/src/lib.rs | 480 +++++++++++++++++++ crates/upnp/src/resources/test/devices-0.xml | 77 +++ 6 files changed, 663 insertions(+), 1 deletion(-) create mode 100644 crates/upnp/Cargo.toml create mode 100644 crates/upnp/examples/upnp-forward.rs create mode 100644 crates/upnp/src/lib.rs create mode 100644 crates/upnp/src/resources/test/devices-0.xml diff --git a/Cargo.lock b/Cargo.lock index 2df7daa..1ec8066 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,17 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "async-recursion" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -1378,6 +1389,23 @@ dependencies = [ "sha1", ] +[[package]] +name = "librqbit-upnp" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-recursion", + "futures", + "network-interface", + "reqwest", + "serde", + "serde-xml-rs", + "tokio", + "tracing", + "tracing-subscriber", + "url", +] + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -1471,6 +1499,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "network-interface" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d68759ef97fe9c9e46f79ea8736c19f1d28992e24c8dc8ce86752918bfeaae7" +dependencies = [ + "cc", + "libc", + "thiserror", + "winapi", +] + [[package]] name = "nom" version = "7.1.3" @@ -2099,6 +2139,18 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-xml-rs" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb3aa78ecda1ebc9ec9847d5d3aba7d618823446a049ba2491940506da6e2782" +dependencies = [ + "log", + "serde", + "thiserror", + "xml-rs", +] + [[package]] name = "serde_derive" version = "1.0.193" @@ -2994,3 +3046,9 @@ checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" dependencies = [ "tap", ] + +[[package]] +name = "xml-rs" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fcb9cbac069e033553e8bb871be2fbdffcab578eb25bd0f7c508cedc6dcd75a" diff --git a/Cargo.toml b/Cargo.toml index 4ac75b7..166e697 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,8 @@ members = [ "crates/sha1w", "crates/librqbit_core", "crates/peer_binary_protocol", - "crates/dht" + "crates/dht", + "crates/upnp" ] [profile.dev] diff --git a/crates/upnp/Cargo.toml b/crates/upnp/Cargo.toml new file mode 100644 index 0000000..07b5cc1 --- /dev/null +++ b/crates/upnp/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "librqbit-upnp" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tracing = "0.1" +anyhow = "1" +reqwest = {version = "0.11"} +serde = {version = "1", features = ["derive"]} +serde-xml-rs = "0.6.0" +tokio = {version = "1"} +futures = "0.3" +url = "2" +async-recursion = "1" +network-interface = "1" + +[dev-dependencies] +tokio = {version = "1", features = ["macros", "rt-multi-thread"]} +tracing-subscriber = "0.3" \ No newline at end of file diff --git a/crates/upnp/examples/upnp-forward.rs b/crates/upnp/examples/upnp-forward.rs new file mode 100644 index 0000000..d426ad1 --- /dev/null +++ b/crates/upnp/examples/upnp-forward.rs @@ -0,0 +1,24 @@ +use librqbit_upnp::UpnpPortForwarder; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let args: Vec = std::env::args().collect(); + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + return Ok(()); + } + + let port: u16 = match args[1].parse() { + Ok(p) => p, + Err(_) => { + eprintln!("Invalid port number: {}", args[1]); + return Ok(()); + } + }; + + let port_forwarder = UpnpPortForwarder::new(vec![port], None)?; + + port_forwarder.run_forever().await; + Ok(()) +} diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs new file mode 100644 index 0000000..614e1d5 --- /dev/null +++ b/crates/upnp/src/lib.rs @@ -0,0 +1,480 @@ +use anyhow::{bail, Context}; +use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; +use network_interface::NetworkInterfaceConfig; +use reqwest::Client; +use serde::Deserialize; +use serde_xml_rs::from_str; +use std::{ + collections::{HashMap, HashSet}, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + time::Duration, +}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tracing::{debug, error, error_span, trace, warn, Instrument, Span}; +use url::Url; + +const SERVICE_TYPE_WAN_IP_CONNECTION: &str = "urn:schemas-upnp-org:service:WANIPConnection:1"; +const SSDP_MULTICAST_IP: SocketAddr = + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(239, 255, 255, 250), 1900)); +const SSDP_SEARCH_REQUEST: &str = "M-SEARCH * HTTP/1.1\r\n\ + Host: 239.255.255.250:1900\r\n\ + Man: \"ssdp:discover\"\r\n\ + MX: 3\r\n\ + ST: upnp:rootdevice\r\n\ + \r\n"; + +fn get_local_ip_relative_to(local_dest: Ipv4Addr) -> anyhow::Result { + // Ipv4Addr.to_bits() is only there in nightly rust, so copying here for now. + fn ip_bits(addr: Ipv4Addr) -> u32 { + u32::from_be_bytes(addr.octets()) + } + + fn masked(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 { + ip_bits(ip) & ip_bits(mask) + } + + let interfaces = + network_interface::NetworkInterface::show().context("error listing network interfaces")?; + + for i in interfaces { + for addr in i.addr { + if let network_interface::Addr::V4(v4) = addr { + let ip = v4.ip; + let mask = match v4.netmask { + Some(mask) => mask, + None => continue, + }; + trace!("found local addr {ip}/{mask}"); + // If the masked address is the same, means we are on the same network, return + // the ip address + if masked(ip, mask) == masked(local_dest, mask) { + return Ok(ip); + } + } + } + } + bail!("couldn't find a local ip address") +} + +async fn forward_port( + control_url: Url, + local_ip: Ipv4Addr, + port: u16, + lease_duration: Duration, +) -> anyhow::Result<()> { + let request_body = format!( + r#" + + + + + {} + TCP + {} + {} + 1 + rust UPnP + {} + + + + "#, + SERVICE_TYPE_WAN_IP_CONNECTION, + port, + port, + local_ip, + lease_duration.as_secs() + ); + + let url = control_url; + + let client = reqwest::Client::new(); + let response = client + .post(url.clone()) + .header("Content-Type", "text/xml") + .header( + "SOAPAction", + format!("\"{}#AddPortMapping\"", SERVICE_TYPE_WAN_IP_CONNECTION), + ) + .body(request_body) + .send() + .await + .context("error sending")?; + + let status = response.status(); + + let response_text = response + .text() + .await + .context("error reading response text")?; + + trace!("AddPortMapping response: {} {}", status, response_text); + if !status.is_success() { + bail!("failed port forwarding: {}", status); + } else { + debug!("successfully port forwarded {}:{}", local_ip, port); + } + Ok(()) +} + +#[derive(Clone, Debug, Deserialize)] +struct RootDesc { + #[serde(rename = "device")] + devices: Vec, +} + +#[derive(Default, Clone, Debug, Deserialize)] +pub struct DeviceList { + #[serde(rename = "device")] + devices: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Device { + #[serde(rename = "deviceType")] + pub device_type: String, + #[serde(rename = "friendlyName", default)] + pub friendly_name: String, + #[serde(rename = "serviceList", default)] + pub service_list: ServiceList, + #[serde(rename = "deviceList", default)] + pub device_list: DeviceList, +} + +impl Device { + pub fn iter_services( + &self, + parent: Span, + ) -> Box + '_> { + let self_span = self.span(parent); + let services = self.service_list.services.iter().map({ + let self_span = self_span.clone(); + move |s| (s.span(self_span.clone()), s) + }); + Box::new(services.chain(self.device_list.devices.iter().flat_map({ + let self_span = self_span.clone(); + move |d| d.iter_services(self_span.clone()) + }))) + } + + pub fn span(&self, parent: tracing::Span) -> tracing::Span { + error_span!(parent: parent, "device", name = self.name()) + } +} + +impl Device { + pub fn name(&self) -> &str { + if self.friendly_name.is_empty() { + return &self.device_type; + } + &self.friendly_name + } +} + +#[derive(Clone, Debug, Deserialize, Default)] +pub struct ServiceList { + #[serde(rename = "service", default)] + pub services: Vec, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Service { + #[serde(rename = "serviceType")] + pub service_type: String, + #[serde(rename = "controlURL")] + pub control_url: String, + #[serde(rename = "SCPDURL")] + pub scpd_url: String, +} + +impl Service { + pub fn span(&self, parent: tracing::Span) -> tracing::Span { + error_span!(parent: parent, "service", url = self.control_url) + } +} + +#[derive(Debug)] +struct UpnpEndpoint { + discover_response: UpnpDiscoverResponse, + data: RootDesc, +} + +impl UpnpEndpoint { + fn location(&self) -> &Url { + &self.discover_response.location + } + + fn span(&self) -> tracing::Span { + error_span!("upnp_endpoint", location = %self.location()) + } + + fn iter_services(&self) -> impl Iterator + '_ { + let self_span = self.span(); + self.data + .devices + .iter() + .flat_map(move |d| d.iter_services(self_span.clone())) + } + + fn my_local_ip(&self) -> anyhow::Result { + let dest_ipv4 = match self.discover_response.received_from { + SocketAddr::V4(v4) => *v4.ip(), + SocketAddr::V6(v6) => { + bail!("don't support IPv6, but remote ip is {}", v6.ip()) + } + }; + let local_ip = get_local_ip_relative_to(dest_ipv4) + .with_context(|| format!("can't determine local IP relative to {dest_ipv4}"))?; + Ok(local_ip) + } + + fn get_wan_ip_control_urls(&self) -> impl Iterator + '_ { + self.iter_services() + .filter(|(_, s)| s.service_type == SERVICE_TYPE_WAN_IP_CONNECTION) + .map(|(span, s)| (span, self.discover_response.location.join(&s.control_url))) + .filter_map(|(span, url)| match url { + Ok(url) => Some((span, url)), + Err(e) => { + error!("bad control url: {e:#}"); + None + } + }) + } +} + +#[derive(Debug)] +struct UpnpDiscoverResponse { + pub received_from: SocketAddr, + pub location: Url, +} + +async fn discover_services(location: Url) -> anyhow::Result { + let response = Client::new() + .get(location.clone()) + .send() + .await + .context("failed to send GET request")? + .text() + .await + .context("failed to read response body")?; + trace!("received from {location}: {response}"); + let root_desc: RootDesc = from_str(&response) + .context("failed to parse response body as xml") + .map_err(|e| { + error!("failed to parse this XML: {response}"); + e + })?; + Ok(root_desc) +} + +fn parse_upnp_discover_response( + response: &str, + received_from: SocketAddr, +) -> anyhow::Result { + let mut headers = HashMap::new(); + for line in response.lines() { + if let Some((key, value)) = line.split_once(": ") { + headers.insert(key.to_lowercase(), value.trim_end().to_string()); + } + } + let location = headers.get("location").context("missing location header")?; + let location = + Url::parse(location).with_context(|| format!("failed parsing location {location}"))?; + Ok(UpnpDiscoverResponse { + location, + received_from, + }) +} + +pub struct UpnpPortForwarderOptions { + pub lease_duration: Duration, + pub discover_interval: Duration, + pub discover_timeout: Duration, +} + +impl Default for UpnpPortForwarderOptions { + fn default() -> Self { + Self { + discover_interval: Duration::from_secs(60), + discover_timeout: Duration::from_secs(10), + lease_duration: Duration::from_secs(60), + } + } +} + +pub struct UpnpPortForwarder { + ports: Vec, + opts: UpnpPortForwarderOptions, +} + +impl UpnpPortForwarder { + pub fn new(ports: Vec, opts: Option) -> anyhow::Result { + if ports.is_empty() { + bail!("empty ports") + } + Ok(Self { + ports, + opts: opts.unwrap_or_default(), + }) + } + + async fn parse_endpoint( + &self, + discover_response: UpnpDiscoverResponse, + ) -> anyhow::Result { + let services = discover_services(discover_response.location.clone()).await?; + Ok(UpnpEndpoint { + discover_response, + data: services, + }) + } + + async fn discover_once( + &self, + tx: &UnboundedSender, + ) -> anyhow::Result<()> { + let socket = tokio::net::UdpSocket::bind("0.0.0.0:0") + .await + .context("failed to bind UDP socket")?; + socket + .send_to(SSDP_SEARCH_REQUEST.as_bytes(), SSDP_MULTICAST_IP) + .await + .context("failed to send SSDP search request")?; + + let mut buffer = [0; 2048]; + + let timeout = tokio::time::sleep(self.opts.discover_timeout); + let mut timed_out = false; + tokio::pin!(timeout); + + let mut discovered = 0; + + while !timed_out { + tokio::select! { + _ = &mut timeout, if !timed_out => { + timed_out = true; + } + Ok((len, addr)) = socket.recv_from(&mut buffer), if !timed_out => { + let response = match std::str::from_utf8(&buffer[..len]) { + Ok(response) => response, + Err(_) => { + warn!("received invalid utf-8 from {addr}"); + continue; + }, + }; + trace!("received response from {addr}: {response}"); + match parse_upnp_discover_response(response, addr) { + Ok(r) => { + tx.send(r)?; + discovered += 1; + }, + Err(e) => warn!("failed to parse response: {e:#}"), + }; + }, + } + } + + debug!("discovered {discovered} endpoints"); + Ok(()) + } + + async fn discovery(&self, tx: UnboundedSender) -> anyhow::Result<()> { + let mut discover_interval = tokio::time::interval(self.opts.discover_interval); + + loop { + discover_interval.tick().await; + if let Err(e) = self.discover_once(&tx).await { + warn!("failed to run discovery: {e:#}"); + } + } + } + + async fn manage_port(&self, control_url: Url, local_ip: Ipv4Addr, port: u16) -> ! { + let lease_duration = self.opts.lease_duration; + let mut interval = tokio::time::interval(lease_duration / 2); + loop { + interval.tick().await; + if let Err(e) = forward_port(control_url.clone(), local_ip, port, lease_duration).await + { + warn!("failed to forward port: {e:#}"); + } + } + } + + async fn manage_service(&self, control_url: Url, local_ip: Ipv4Addr) -> anyhow::Result<()> { + futures::future::join_all(self.ports.iter().cloned().map(|port| { + self.manage_port(control_url.clone(), local_ip, port) + .instrument(error_span!("manage_port", port = port)) + })) + .await; + Ok(()) + } + + pub async fn run_forever(self) -> ! { + let (discover_tx, mut discover_rx) = unbounded_channel(); + let discovery = self.discovery(discover_tx); + + let mut spawned_tasks = HashSet::::new(); + + let mut endpoints = FuturesUnordered::new(); + let mut service_managers = FuturesUnordered::new(); + + tokio::pin!(discovery); + + loop { + tokio::select! { + _ = &mut discovery => {}, + r = discover_rx.recv() => { + let r = r.unwrap(); + let location = r.location.clone(); + endpoints.push(self.parse_endpoint(r).map_err(|e| { + error!("failed to parse endpoint: {e:#}"); + e + }).instrument(error_span!("parse endpoint", location=location.to_string()))); + }, + Some(Ok(endpoint)) = endpoints.next(), if !endpoints.is_empty() => { + let mut local_ip = None; + for (span, control_url) in endpoint.get_wan_ip_control_urls() { + if spawned_tasks.contains(&control_url) { + debug!("already spawned for {}", control_url); + continue; + } + let ip = match local_ip { + Some(ip) => ip, + None => { + match endpoint.my_local_ip() { + Ok(ip) => { + local_ip = Some(ip); + ip + }, + Err(e) => { + warn!("failed to determine local IP for endpoint at {}: {:#}", endpoint.location(), e); + break; + } + } + } + }; + spawned_tasks.insert(control_url.clone()); + service_managers.push(self.manage_service(control_url, ip).instrument(span)) + } + }, + _ = service_managers.next(), if !service_managers.is_empty() => { + + }, + } + } + } +} + +#[cfg(test)] +mod tests { + use serde_xml_rs::from_str; + + use crate::RootDesc; + + #[test] + fn test_parse() { + dbg!(from_str::(include_str!("resources/test/devices-0.xml")).unwrap()); + } +} diff --git a/crates/upnp/src/resources/test/devices-0.xml b/crates/upnp/src/resources/test/devices-0.xml new file mode 100644 index 0000000..7e30770 --- /dev/null +++ b/crates/upnp/src/resources/test/devices-0.xml @@ -0,0 +1,77 @@ + + + 1 + 0 + + + urn:schemas-upnp-org:device:InternetGatewayDevice:1 + ARRIS TG3492LG + Arris Group, Inc + http://www.arris.com/ + DOCSIS 3.1 Cable Modem Gateway Device + TG3492LG + TG3492LG + http://www.arris.com + ABAP02974423 + uuid:ebf5a0a0-1dd1-11b2-a90f-acf8cc3de6b6 + TG3492LG + + + urn:schemas-upnp-org:service:Layer3Forwarding:1 + urn:upnp-org:serviceId:L3Forwarding1 + /Layer3ForwardingSCPD.xml + /upnp/control/Layer3Forwarding + /upnp/event/Layer3Forwarding + + + + + urn:schemas-upnp-org:device:WANDevice:1 + WANDevice:1 + Arris Group, Inc + http://www.arris.com/ + DOCSIS 3.1 Cable Modem Gateway Device + TG3492LG + TG3492LG + http://www.arris.com + ABAP02974423 + uuid:ebf5a0a0-1dd1-11b2-a92f-acf8cc3de6b6 + TG3492LG + + + urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1 + urn:upnp-org:serviceId:WANCommonIFC1 + /WANCommonInterfaceConfigSCPD.xml + /upnp/control/WANCommonInterfaceConfig0 + /upnp/event/WANCommonInterfaceConfig0 + + + + + urn:schemas-upnp-org:device:WANConnectionDevice:1 + WANConnectionDevice:1 + Arris Group, Inc + http://www.arris.com/ + DOCSIS 3.1 Cable Modem Gateway Device + TG3492LG + TG3492LG + http://www.arris.com + ABAP02974423 + uuid:ebf5a0a0-1dd1-11b2-a93f-acf8cc3de6b6 + TG3492LG + + + urn:schemas-upnp-org:service:WANIPConnection:1 + urn:upnp-org:serviceId:WANIPConn1 + /WANIPConnectionServiceSCPD.xml + /upnp/control/WANIPConnection0 + /upnp/event/WANIPConnection0 + + + + + + + http://192.168.0.1/ + + \ No newline at end of file