IPv6 support for SSDP (for UPNP)
This commit is contained in:
parent
932131b18d
commit
3acc53a11f
5 changed files with 305 additions and 146 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -2692,6 +2692,7 @@ dependencies = [
|
||||||
"gethostname",
|
"gethostname",
|
||||||
"http 1.1.0",
|
"http 1.1.0",
|
||||||
"httparse",
|
"httparse",
|
||||||
|
"libc",
|
||||||
"librqbit-core",
|
"librqbit-core",
|
||||||
"librqbit-sha1-wrapper",
|
"librqbit-sha1-wrapper",
|
||||||
"librqbit-upnp",
|
"librqbit-upnp",
|
||||||
|
|
|
||||||
2
Makefile
2
Makefile
|
|
@ -13,7 +13,7 @@ webui-dev: webui-deps
|
||||||
# NOTE: on LG TV using hostname is unstable for some reason, use IP address.
|
# NOTE: on LG TV using hostname is unstable for some reason, use IP address.
|
||||||
export RQBIT_UPNP_SERVER_ENABLE ?= true
|
export RQBIT_UPNP_SERVER_ENABLE ?= true
|
||||||
export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev
|
export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev
|
||||||
export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030
|
export RQBIT_HTTP_API_LISTEN_ADDR ?= [::]:3030
|
||||||
export RQBIT_FASTRESUME = true
|
export RQBIT_FASTRESUME = true
|
||||||
CARGO_RUN_FLAGS ?=
|
CARGO_RUN_FLAGS ?=
|
||||||
RQBIT_OUTPUT_FOLDER ?= /tmp/scratch
|
RQBIT_OUTPUT_FOLDER ?= /tmp/scratch
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ socket2 = "0.5.7"
|
||||||
quick-xml = { version = "0.36.1", features = ["serialize"] }
|
quick-xml = { version = "0.36.1", features = ["serialize"] }
|
||||||
network-interface = "2.0.0"
|
network-interface = "2.0.0"
|
||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
|
libc = "0.2.158"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tracing-subscriber = "0.3.18"
|
tracing-subscriber = "0.3.18"
|
||||||
|
|
|
||||||
|
|
@ -1,24 +1,33 @@
|
||||||
use std::{
|
use std::{
|
||||||
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
|
collections::HashSet,
|
||||||
|
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
|
||||||
|
os::fd::{AsFd, AsRawFd},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use bstr::BStr;
|
use bstr::BStr;
|
||||||
use network_interface::NetworkInterfaceConfig;
|
use network_interface::NetworkInterfaceConfig;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
|
|
||||||
use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE};
|
use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE};
|
||||||
|
|
||||||
const UPNP_PORT: u16 = 1900;
|
const SSDP_PORT: u16 = 1900;
|
||||||
const UPNP_BROADCAST_IP: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250);
|
const SSDM_MCAST_IPV4: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250);
|
||||||
const UPNP_BROADCAST_ADDR: SocketAddrV4 = SocketAddrV4::new(UPNP_BROADCAST_IP, UPNP_PORT);
|
const SSDP_MCAST_IPV6_LINK_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xc);
|
||||||
|
const SSDP_MCAST_IPV6_SITE_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff05, 0, 0, 0, 0, 0, 0, 0xc);
|
||||||
|
|
||||||
const NTS_ALIVE: &str = "ssdp:alive";
|
const NTS_ALIVE: &str = "ssdp:alive";
|
||||||
const NTS_BYEBYE: &str = "ssdp:byebye";
|
const NTS_BYEBYE: &str = "ssdp:byebye";
|
||||||
|
|
||||||
|
fn ipv6_is_link_local(ip: Ipv6Addr) -> bool {
|
||||||
|
let s = ip.segments();
|
||||||
|
[s[0], s[1], s[2], s[3]] == [0xfe80, 0, 0, 0]
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum SsdpMessage<'a, 'h> {
|
pub enum SsdpMessage<'a, 'h> {
|
||||||
MSearch(SsdpMSearchRequest<'a>),
|
MSearch(SsdpMSearchRequest<'a>),
|
||||||
|
|
@ -30,6 +39,7 @@ pub enum SsdpMessage<'a, 'h> {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SsdpMSearchRequest<'a> {
|
pub struct SsdpMSearchRequest<'a> {
|
||||||
|
#[allow(dead_code)]
|
||||||
pub host: &'a BStr,
|
pub host: &'a BStr,
|
||||||
pub man: &'a BStr,
|
pub man: &'a BStr,
|
||||||
pub st: &'a BStr,
|
pub st: &'a BStr,
|
||||||
|
|
@ -37,9 +47,6 @@ pub struct SsdpMSearchRequest<'a> {
|
||||||
|
|
||||||
impl<'a> SsdpMSearchRequest<'a> {
|
impl<'a> SsdpMSearchRequest<'a> {
|
||||||
fn matches_media_server(&self) -> bool {
|
fn matches_media_server(&self) -> bool {
|
||||||
if self.host != "239.255.255.250:1900" {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if self.man != "\"ssdp:discover\"" {
|
if self.man != "\"ssdp:discover\"" {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
@ -103,61 +110,171 @@ pub struct SsdpRunnerOptions {
|
||||||
|
|
||||||
pub struct SsdpRunner {
|
pub struct SsdpRunner {
|
||||||
opts: SsdpRunnerOptions,
|
opts: SsdpRunnerOptions,
|
||||||
socket: UdpSocket,
|
socket_v4: Option<UdpSocket>,
|
||||||
|
socket_v6: Option<UdpSocket>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result<tokio::net::UdpSocket> {
|
||||||
|
let domain = if bind_addr.is_ipv4() {
|
||||||
|
socket2::Domain::IPV4
|
||||||
|
} else {
|
||||||
|
socket2::Domain::IPV6
|
||||||
|
};
|
||||||
|
let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, None)
|
||||||
|
.context(bind_addr)
|
||||||
|
.context("error creating socket")?;
|
||||||
|
#[cfg(not(target_os = "windows"))]
|
||||||
|
sock.set_reuse_port(true)
|
||||||
|
.context("error setting SO_REUSEPORT")?;
|
||||||
|
sock.set_reuse_address(true)
|
||||||
|
.context("error setting SO_REUSEADDR")?;
|
||||||
|
|
||||||
|
trace!(addr=?bind_addr, "binding UDP");
|
||||||
|
sock.bind(&bind_addr.into())
|
||||||
|
.context(bind_addr)
|
||||||
|
.context("error binding")?;
|
||||||
|
|
||||||
|
sock.set_nonblocking(true)?;
|
||||||
|
let socket = tokio::net::UdpSocket::from_std(sock.into())
|
||||||
|
.context("error converting socket2 socket to tokio")?;
|
||||||
|
|
||||||
|
Ok(socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn bind_v4_socket() -> anyhow::Result<UdpSocket> {
|
||||||
|
let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, SSDP_PORT);
|
||||||
|
let socket = socket_presetup(bind_addr.into())?;
|
||||||
|
|
||||||
|
let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED);
|
||||||
|
let all_multicast_membership_ips = network_interface::NetworkInterface::show()
|
||||||
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
.flat_map(|nic| nic.addr.into_iter())
|
||||||
|
.filter_map(|addr| {
|
||||||
|
let ip = addr.ip();
|
||||||
|
match ip {
|
||||||
|
std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => {
|
||||||
|
Some(addr)
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) {
|
||||||
|
trace!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "joining multicast v4 group");
|
||||||
|
if let Err(e) = socket.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) {
|
||||||
|
debug!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "error joining multicast v4 group: {e:#}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn bind_v6_socket() -> anyhow::Result<UdpSocket> {
|
||||||
|
let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, SSDP_PORT, 0, 0);
|
||||||
|
let socket = socket_presetup(bind_addr.into())?;
|
||||||
|
|
||||||
|
for nic in network_interface::NetworkInterface::show()
|
||||||
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
{
|
||||||
|
let mut has_link_local = false;
|
||||||
|
let mut has_site_local = false;
|
||||||
|
for addr in nic.addr.iter() {
|
||||||
|
let addr = match addr.ip() {
|
||||||
|
IpAddr::V4(_) => continue,
|
||||||
|
IpAddr::V6(v6) => v6,
|
||||||
|
};
|
||||||
|
if addr.is_loopback() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if ipv6_is_link_local(addr) {
|
||||||
|
has_link_local = true;
|
||||||
|
} else {
|
||||||
|
has_site_local = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (present, multiaddr) in [
|
||||||
|
(has_link_local, SSDP_MCAST_IPV6_LINK_LOCAL),
|
||||||
|
(has_site_local, SSDP_MCAST_IPV6_SITE_LOCAL),
|
||||||
|
] {
|
||||||
|
if !present {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Err(e) = socket.join_multicast_v6(&multiaddr, nic.index) {
|
||||||
|
debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MulticastOpts {
|
||||||
|
local_interface_ip: IpAddr,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
local_interface_id: u32,
|
||||||
|
addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> std::io::Result<()> {
|
||||||
|
let addr = libc::in_addr {
|
||||||
|
s_addr: u32::from_ne_bytes(local_ip.octets()),
|
||||||
|
};
|
||||||
|
const SZ: usize = std::mem::size_of::<libc::in_addr>();
|
||||||
|
|
||||||
|
trace!(addr = %local_ip, "setting IP_MULTICAST_IF");
|
||||||
|
let ret = unsafe {
|
||||||
|
libc::setsockopt(
|
||||||
|
sock.as_fd().as_raw_fd(),
|
||||||
|
libc::IPPROTO_IP,
|
||||||
|
libc::IP_MULTICAST_IF,
|
||||||
|
&addr as *const _ as _,
|
||||||
|
SZ as u32,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
if ret < 0 {
|
||||||
|
return Err(std::io::Error::last_os_error());
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MulticastOpts {
|
||||||
|
fn addr_no_scope(&self) -> SocketAddr {
|
||||||
|
let mut addr = self.addr;
|
||||||
|
if let SocketAddr::V6(v6) = &mut addr {
|
||||||
|
v6.set_scope_id(0);
|
||||||
|
}
|
||||||
|
addr
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SsdpRunner {
|
impl SsdpRunner {
|
||||||
pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result<Self> {
|
pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result<Self> {
|
||||||
let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT);
|
let socket_v4 = bind_v4_socket()
|
||||||
let sock = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None)
|
.await
|
||||||
.context("error creating socket")?;
|
.map_err(|e| warn!("error creating IPv4 SSDP socket: {e:#}"))
|
||||||
#[cfg(not(target_os = "windows"))]
|
.ok();
|
||||||
sock.set_reuse_port(true)
|
let socket_v6 = bind_v6_socket()
|
||||||
.context("error setting SO_REUSEPORT")?;
|
.await
|
||||||
sock.set_reuse_address(true)
|
.map_err(|e| warn!("error creating IPv6 SSDP socket: {e:#}"))
|
||||||
.context("error setting SO_REUSEADDR")?;
|
.ok();
|
||||||
|
Ok(Self {
|
||||||
trace!(addr=?bind_addr, "binding UDP");
|
opts,
|
||||||
sock.bind(&bind_addr.into())
|
socket_v4,
|
||||||
.context(bind_addr)
|
socket_v6,
|
||||||
.context("error binding")?;
|
})
|
||||||
|
|
||||||
sock.set_nonblocking(true)?;
|
|
||||||
let socket = tokio::net::UdpSocket::from_std(sock.into())
|
|
||||||
.context("error converting socket2 socket to tokio")?;
|
|
||||||
|
|
||||||
let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED);
|
|
||||||
let all_multicast_membership_ips = network_interface::NetworkInterface::show()
|
|
||||||
.into_iter()
|
|
||||||
.flatten()
|
|
||||||
.flat_map(|nic| nic.addr.into_iter())
|
|
||||||
.filter_map(|addr| {
|
|
||||||
let ip = addr.ip();
|
|
||||||
match ip {
|
|
||||||
std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => {
|
|
||||||
Some(addr)
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) {
|
|
||||||
trace!(multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "joining multicast v4 group");
|
|
||||||
if let Err(e) = socket.join_multicast_v4(UPNP_BROADCAST_IP, ifaddr) {
|
|
||||||
debug!(error=?e, multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "error joining multicast v4 group");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Self { opts, socket })
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_notify_message(&self, kind: &str, nts: &str, location: &url::Url) -> String {
|
fn generate_notify_message(&self, kind: &str, nts: &str, opts: &MulticastOpts) -> String {
|
||||||
let usn: &str = &self.opts.usn;
|
let usn: &str = &self.opts.usn;
|
||||||
let server: &str = &self.opts.server_string;
|
let server: &str = &self.opts.server_string;
|
||||||
let bcast_addr = UPNP_BROADCAST_ADDR;
|
let host = opts.addr_no_scope();
|
||||||
|
let mut location = self.opts.description_http_location.clone();
|
||||||
|
let _ = location.set_ip_host(opts.local_interface_ip);
|
||||||
format!(
|
format!(
|
||||||
"NOTIFY * HTTP/1.1\r
|
"NOTIFY * HTTP/1.1\r
|
||||||
Host: {bcast_addr}\r
|
Host: {host}\r
|
||||||
Cache-Control: max-age=75\r
|
Cache-Control: max-age=75\r
|
||||||
Location: {location}\r
|
Location: {location}\r
|
||||||
NT: {kind}\r
|
NT: {kind}\r
|
||||||
|
|
@ -177,7 +294,7 @@ USN: {usn}::{kind}\r
|
||||||
let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr.ip())?;
|
let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr.ip())?;
|
||||||
let location = {
|
let location = {
|
||||||
let mut loc = self.opts.description_http_location.clone();
|
let mut loc = self.opts.description_http_location.clone();
|
||||||
loc.set_host(Some(&format!("{local_ip}")))?;
|
let _ = loc.set_ip_host(local_ip);
|
||||||
loc
|
loc
|
||||||
};
|
};
|
||||||
let usn = &self.opts.usn;
|
let usn = &self.opts.usn;
|
||||||
|
|
@ -194,7 +311,10 @@ Content-Length: 0\r\n\r\n"
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_send_notifies(&self, nts: &str) {
|
async fn try_send_mcast_everywhere(
|
||||||
|
&self,
|
||||||
|
get_payload: &impl Fn(&MulticastOpts) -> bstr::BString,
|
||||||
|
) {
|
||||||
use network_interface::NetworkInterfaceConfig;
|
use network_interface::NetworkInterfaceConfig;
|
||||||
let interfaces = network_interface::NetworkInterface::show();
|
let interfaces = network_interface::NetworkInterface::show();
|
||||||
let interfaces = match interfaces {
|
let interfaces = match interfaces {
|
||||||
|
|
@ -205,52 +325,79 @@ Content-Length: 0\r\n\r\n"
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let sent = Mutex::new(HashSet::new());
|
||||||
|
let sent = &sent;
|
||||||
|
|
||||||
let futs = interfaces
|
let futs = interfaces
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flat_map(|ni| ni.addr)
|
.flat_map(|ni| ni.addr.into_iter().map(move |a| (ni.index, a)))
|
||||||
.filter_map(|addr| {
|
.filter_map(|(ifidx, addr)| match addr.ip() {
|
||||||
match addr.ip() {
|
std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => {
|
||||||
std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => Some(a),
|
Some(MulticastOpts {
|
||||||
_ => None
|
local_interface_ip: addr.ip(),
|
||||||
|
local_interface_id: ifidx,
|
||||||
|
addr: SocketAddr::V4(SocketAddrV4::new(SSDM_MCAST_IPV4, SSDP_PORT)),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
std::net::IpAddr::V6(a) if !a.is_loopback() => Some(MulticastOpts {
|
||||||
|
local_interface_ip: addr.ip(),
|
||||||
|
local_interface_id: ifidx,
|
||||||
|
addr: {
|
||||||
|
let bip = if ipv6_is_link_local(a) {
|
||||||
|
SSDP_MCAST_IPV6_LINK_LOCAL
|
||||||
|
} else {
|
||||||
|
SSDP_MCAST_IPV6_SITE_LOCAL
|
||||||
|
};
|
||||||
|
SocketAddr::V6(SocketAddrV6::new(bip, SSDP_PORT, 0, ifidx))
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
_ => None,
|
||||||
})
|
})
|
||||||
.map(|ip| async move {
|
.map(|opts| async move {
|
||||||
let addr = SocketAddrV4::new(ip, 0);
|
let payload = get_payload(&opts);
|
||||||
let sock = match tokio::net::UdpSocket::bind(addr).await {
|
if !sent
|
||||||
Ok(sock) => sock,
|
.lock()
|
||||||
Err(e) => {
|
.insert((payload.clone(), opts.local_interface_id, opts.addr))
|
||||||
debug!(%addr, error=?e, "error binding UDP to send NOTIFY");
|
{
|
||||||
return;
|
// don't send duplicates
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let sock = match (
|
||||||
|
opts.local_interface_ip,
|
||||||
|
self.socket_v4.as_ref(),
|
||||||
|
self.socket_v6.as_ref(),
|
||||||
|
) {
|
||||||
|
(IpAddr::V4(ip), Some(sock_v4), _) => {
|
||||||
|
if let Err(e) = set_mcast_if(sock_v4, ip) {
|
||||||
|
debug!(addr=%ip, "error calling set_mcast_if: {e:#}");
|
||||||
|
}
|
||||||
|
sock_v4
|
||||||
}
|
}
|
||||||
|
(IpAddr::V6(_), _, Some(sock_v6)) => sock_v6,
|
||||||
|
_ => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut location = self.opts.description_http_location.clone();
|
match sock.send_to(payload.as_slice(), opts.addr).await {
|
||||||
location.set_host(Some(&format!("{ip}"))).unwrap();
|
Ok(sz) => trace!(payload=?payload, addr=%opts.addr, size=sz, "sent"),
|
||||||
|
Err(e) => {
|
||||||
macro_rules! gen {
|
debug!(payload=?payload, addr=%opts.addr, "error sending: {e:#}")
|
||||||
($kind:expr) => {
|
|
||||||
async {
|
|
||||||
let msg = self.generate_notify_message($kind, nts, &location);
|
|
||||||
trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY");
|
|
||||||
if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await {
|
|
||||||
debug!(sock_addr=%addr, error=%e, kind=$kind, nts, "error sending SSDP NOTIFY")
|
|
||||||
} else {
|
|
||||||
debug!(kind=$kind, nts, %location, "sent SSDP NOTIFY")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
let f1 = gen!(UPNP_KIND_ROOT_DEVICE);
|
|
||||||
let f2 = gen!(UPNP_KIND_MEDIASERVER) ;
|
|
||||||
|
|
||||||
tokio::join!(f1, f2);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
futures::future::join_all(futs).await;
|
futures::future::join_all(futs).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn task_send_alive_notifies_periodically(&self) -> anyhow::Result<()> {
|
async fn try_send_notifies(&self, nts: &str) {
|
||||||
|
self.try_send_mcast_everywhere(&|opts| {
|
||||||
|
self.generate_notify_message(UPNP_KIND_MEDIASERVER, nts, opts)
|
||||||
|
.into()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn task_send_alive_notifies_periodically(&self) {
|
||||||
let mut interval = tokio::time::interval(self.opts.notify_interval);
|
let mut interval = tokio::time::interval(self.opts.notify_interval);
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
@ -258,7 +405,12 @@ Content-Length: 0\r\n\r\n"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process_incoming_message(&self, msg: &[u8], addr: SocketAddr) -> anyhow::Result<()> {
|
async fn process_incoming_message(
|
||||||
|
&self,
|
||||||
|
msg: &[u8],
|
||||||
|
sock: &UdpSocket,
|
||||||
|
addr: SocketAddr,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
let mut headers = [httparse::EMPTY_HEADER; 16];
|
let mut headers = [httparse::EMPTY_HEADER; 16];
|
||||||
trace!(content = ?BStr::new(msg), ?addr, "received message");
|
trace!(content = ?BStr::new(msg), ?addr, "received message");
|
||||||
let parsed = try_parse_ssdp(msg, &mut headers);
|
let parsed = try_parse_ssdp(msg, &mut headers);
|
||||||
|
|
@ -281,8 +433,7 @@ Content-Length: 0\r\n\r\n"
|
||||||
if let Ok(st) = std::str::from_utf8(msg.st) {
|
if let Ok(st) = std::str::from_utf8(msg.st) {
|
||||||
let response = self.generate_ssdp_discover_response(st, addr)?;
|
let response = self.generate_ssdp_discover_response(st, addr)?;
|
||||||
trace!(content = response, ?addr, "sending SSDP discover response");
|
trace!(content = response, ?addr, "sending SSDP discover response");
|
||||||
self.socket
|
sock.send_to(response.as_bytes(), addr)
|
||||||
.send_to(response.as_bytes(), addr)
|
|
||||||
.await
|
.await
|
||||||
.context("error sending")?;
|
.context("error sending")?;
|
||||||
}
|
}
|
||||||
|
|
@ -290,51 +441,57 @@ Content-Length: 0\r\n\r\n"
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn task_respond_on_msearches(&self) -> anyhow::Result<()> {
|
async fn task_respond_on_msearches(&self, sock: Option<&UdpSocket>) {
|
||||||
let mut buf = vec![0u8; 16184];
|
let mut buf = vec![0u8; 16184];
|
||||||
|
let sock = match sock {
|
||||||
|
Some(sock) => sock,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (sz, addr) = self
|
let (sz, addr) = match sock.recv_from(&mut buf).await {
|
||||||
.socket
|
Ok((sz, addr)) => (sz, addr),
|
||||||
.recv_from(&mut buf)
|
Err(e) => {
|
||||||
.await
|
warn!(error=?e, "error receving");
|
||||||
.context("error receiving")?;
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
let msg = &buf[..sz];
|
let msg = &buf[..sz];
|
||||||
if let Err(e) = self.process_incoming_message(msg, addr).await {
|
if let Err(e) = self.process_incoming_message(msg, sock, addr).await {
|
||||||
warn!(error=?e, ?addr, "error processing incoming SSDP message")
|
warn!(error=?e, ?addr, "error processing incoming SSDP message")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_msearch(&self) -> anyhow::Result<()> {
|
async fn try_send_example_msearch(&self) {
|
||||||
let msearch_msg = "M-SEARCH * HTTP/1.1\r
|
self.try_send_mcast_everywhere(&|opts| {
|
||||||
HOST: 239.255.255.250:1900\r
|
let dest = opts.addr_no_scope();
|
||||||
|
format!(
|
||||||
|
"M-SEARCH * HTTP/1.1\r
|
||||||
|
HOST: {dest}\r
|
||||||
ST: urn:schemas-upnp-org:device:MediaServer:1\r
|
ST: urn:schemas-upnp-org:device:MediaServer:1\r
|
||||||
MAN: \"ssdp:discover\"\r
|
MAN: \"ssdp:discover\"\r
|
||||||
MX: 2\r\n\r\n";
|
MX: 2\r\n\r\n"
|
||||||
|
)
|
||||||
trace!(content = msearch_msg, "multicasting M-SEARCH");
|
.into()
|
||||||
|
})
|
||||||
self.socket
|
.await
|
||||||
.send_to(msearch_msg.as_bytes(), UPNP_BROADCAST_ADDR)
|
|
||||||
.await
|
|
||||||
.context("error sending msearch")?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_forever(&self) -> anyhow::Result<()> {
|
pub async fn run_forever(&self) -> anyhow::Result<()> {
|
||||||
// This isn't necessary, but would show that it works.
|
// This isn't necessary, but would show that it works.
|
||||||
self.send_msearch().await?;
|
let t0 = self.try_send_example_msearch();
|
||||||
|
let t1 = self.task_respond_on_msearches(self.socket_v4.as_ref());
|
||||||
|
let t2 = self.task_respond_on_msearches(self.socket_v6.as_ref());
|
||||||
|
let t3 = self.task_send_alive_notifies_periodically();
|
||||||
|
|
||||||
let t1 = self.task_respond_on_msearches();
|
let wait = async move {
|
||||||
let t2 = self.task_send_alive_notifies_periodically();
|
tokio::join!(t0, t1, t2, t3);
|
||||||
|
Ok(())
|
||||||
tokio::pin!(t1);
|
};
|
||||||
tokio::pin!(t2);
|
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
r = &mut t1 => r,
|
r = wait => r,
|
||||||
r = &mut t2 => r,
|
|
||||||
_ = self.opts.shutdown.cancelled() => {
|
_ = self.opts.shutdown.cancelled() => {
|
||||||
self.try_send_notifies(NTS_BYEBYE).await;
|
self.try_send_notifies(NTS_BYEBYE).await;
|
||||||
bail!("canceled");
|
bail!("canceled");
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ use reqwest::Client;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
|
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||||
|
|
@ -30,21 +30,21 @@ pub fn make_ssdp_search_request(kind: &str) -> String {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<Ipv4Addr> {
|
pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<IpAddr> {
|
||||||
let local_dest = match local_dest {
|
fn ip_bits_v4(addr: Ipv4Addr) -> u32 {
|
||||||
IpAddr::V4(v4) => v4,
|
|
||||||
IpAddr::V6(v6) => {
|
|
||||||
anyhow::bail!("get_local_ip_relative_to not implemented for IPv6; addr={v6}")
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Ipv4Addr.to_bits() is only there in nightly rust, so copying here for now.
|
|
||||||
fn ip_bits(addr: Ipv4Addr) -> u32 {
|
|
||||||
u32::from_be_bytes(addr.octets())
|
u32::from_be_bytes(addr.octets())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn masked(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 {
|
fn masked_v4(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 {
|
||||||
ip_bits(ip) & ip_bits(mask)
|
ip_bits_v4(ip) & ip_bits_v4(mask)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ip_bits_v6(addr: Ipv6Addr) -> u128 {
|
||||||
|
u128::from_be_bytes(addr.octets())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn masked_v6(ip: Ipv6Addr, mask: Ipv6Addr) -> u128 {
|
||||||
|
ip_bits_v6(ip) & ip_bits_v6(mask)
|
||||||
}
|
}
|
||||||
|
|
||||||
let interfaces =
|
let interfaces =
|
||||||
|
|
@ -52,18 +52,18 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<Ipv4Addr>
|
||||||
|
|
||||||
for i in interfaces {
|
for i in interfaces {
|
||||||
for addr in i.addr {
|
for addr in i.addr {
|
||||||
if let network_interface::Addr::V4(v4) = addr {
|
match (local_dest, addr.ip(), addr.netmask()) {
|
||||||
let ip = v4.ip;
|
(IpAddr::V4(l), IpAddr::V4(a), Some(IpAddr::V4(m)))
|
||||||
let mask = match v4.netmask {
|
if masked_v4(l, m) == masked_v4(a, m) =>
|
||||||
Some(mask) => mask,
|
{
|
||||||
None => continue,
|
return Ok(addr.ip())
|
||||||
};
|
|
||||||
trace!("found local addr {ip}/{mask}");
|
|
||||||
// If the masked address is the same, means we are on the same network, return
|
|
||||||
// the ip address
|
|
||||||
if masked(ip, mask) == masked(local_dest, mask) {
|
|
||||||
return Ok(ip);
|
|
||||||
}
|
}
|
||||||
|
(IpAddr::V6(l), IpAddr::V6(a), Some(IpAddr::V6(m)))
|
||||||
|
if masked_v6(l, m) == masked_v6(a, m) =>
|
||||||
|
{
|
||||||
|
return Ok(addr.ip())
|
||||||
|
}
|
||||||
|
_ => continue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -72,7 +72,7 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<Ipv4Addr>
|
||||||
|
|
||||||
async fn forward_port(
|
async fn forward_port(
|
||||||
control_url: Url,
|
control_url: Url,
|
||||||
local_ip: Ipv4Addr,
|
local_ip: IpAddr,
|
||||||
port: u16,
|
port: u16,
|
||||||
lease_duration: Duration,
|
lease_duration: Duration,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
|
@ -229,7 +229,7 @@ impl UpnpEndpoint {
|
||||||
.flat_map(move |d| d.iter_services(self_span.clone()))
|
.flat_map(move |d| d.iter_services(self_span.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn my_local_ip(&self) -> anyhow::Result<Ipv4Addr> {
|
fn my_local_ip(&self) -> anyhow::Result<IpAddr> {
|
||||||
let dest_ip = self.discover_response.received_from.ip();
|
let dest_ip = self.discover_response.received_from.ip();
|
||||||
let local_ip = get_local_ip_relative_to(dest_ip)
|
let local_ip = get_local_ip_relative_to(dest_ip)
|
||||||
.with_context(|| format!("can't determine local IP relative to {dest_ip}"))?;
|
.with_context(|| format!("can't determine local IP relative to {dest_ip}"))?;
|
||||||
|
|
@ -419,7 +419,7 @@ impl UpnpPortForwarder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn manage_port(&self, control_url: Url, local_ip: Ipv4Addr, port: u16) -> ! {
|
async fn manage_port(&self, control_url: Url, local_ip: IpAddr, port: u16) -> ! {
|
||||||
let lease_duration = self.opts.lease_duration;
|
let lease_duration = self.opts.lease_duration;
|
||||||
let mut interval = tokio::time::interval(lease_duration / 2);
|
let mut interval = tokio::time::interval(lease_duration / 2);
|
||||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
||||||
|
|
@ -433,7 +433,7 @@ impl UpnpPortForwarder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn manage_service(&self, control_url: Url, local_ip: Ipv4Addr) -> anyhow::Result<()> {
|
async fn manage_service(&self, control_url: Url, local_ip: IpAddr) -> anyhow::Result<()> {
|
||||||
futures::future::join_all(self.ports.iter().cloned().map(|port| {
|
futures::future::join_all(self.ports.iter().cloned().map(|port| {
|
||||||
self.manage_port(control_url.clone(), local_ip, port)
|
self.manage_port(control_url.clone(), local_ip, port)
|
||||||
.instrument(error_span!("manage_port", port = port))
|
.instrument(error_span!("manage_port", port = port))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue