Merge pull request #238 from ikatson/ssdp-ipv6

[Feature] SSDP IPv6 support
This commit is contained in:
Igor Katson 2024-09-20 09:38:29 +01:00 committed by GitHub
commit d9a56acacb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 346 additions and 165 deletions

View file

@ -13,7 +13,7 @@ webui-dev: webui-deps
# NOTE: on LG TV using hostname is unstable for some reason, use IP address.
export RQBIT_UPNP_SERVER_ENABLE ?= true
export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev
export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030
export RQBIT_HTTP_API_LISTEN_ADDR ?= [::]:3030
export RQBIT_FASTRESUME = true
CARGO_RUN_FLAGS ?=
RQBIT_OUTPUT_FOLDER ?= /tmp/scratch

View file

@ -1198,17 +1198,13 @@ impl Session {
peer_rx,
);
{
let span = managed_torrent.shared.span.clone();
let _ = span.enter();
managed_torrent
.start(peer_rx, opts.paused)
.context("error starting torrent")?;
}
let _e = managed_torrent.shared.span.clone().entered();
managed_torrent
.start(peer_rx, opts.paused)
.context("error starting torrent")?;
if let Some(name) = managed_torrent.shared().info.name.as_ref() {
info!(?name, id, "added torrent");
info!(?name, "added torrent");
}
Ok(AddTorrentResponse::Added(id, managed_torrent))

View file

@ -1,5 +1,5 @@
pub const UPNP_KIND_ROOT_DEVICE: &str = "upnp:rootdevice";
pub const UPNP_KIND_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1";
pub const UPNP_DEVICE_ROOT: &str = "upnp:rootdevice";
pub const UPNP_DEVICE_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1";
pub const SOAP_ACTION_CONTENT_DIRECTORY_BROWSE: &[u8] =
b"\"urn:schemas-upnp-org:service:ContentDirectory:1#Browse\"";

View file

@ -1,20 +1,23 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
collections::HashSet,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
time::Duration,
};
use anyhow::{bail, Context};
use bstr::BStr;
use librqbit_upnp::ipv6_is_link_local;
use network_interface::NetworkInterfaceConfig;
use tokio::net::UdpSocket;
use parking_lot::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE};
use crate::constants::{UPNP_DEVICE_MEDIASERVER, UPNP_DEVICE_ROOT};
const UPNP_PORT: u16 = 1900;
const UPNP_BROADCAST_IP: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250);
const UPNP_BROADCAST_ADDR: SocketAddrV4 = SocketAddrV4::new(UPNP_BROADCAST_IP, UPNP_PORT);
const SSDP_PORT: u16 = 1900;
const SSDM_MCAST_IPV4: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250);
const SSDP_MCAST_IPV6_LINK_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xc);
const SSDP_MCAST_IPV6_SITE_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff05, 0, 0, 0, 0, 0, 0, 0xc);
const NTS_ALIVE: &str = "ssdp:alive";
const NTS_BYEBYE: &str = "ssdp:byebye";
@ -30,6 +33,7 @@ pub enum SsdpMessage<'a, 'h> {
#[derive(Debug)]
pub struct SsdpMSearchRequest<'a> {
#[allow(dead_code)]
pub host: &'a BStr,
pub man: &'a BStr,
pub st: &'a BStr,
@ -37,13 +41,10 @@ pub struct SsdpMSearchRequest<'a> {
impl<'a> SsdpMSearchRequest<'a> {
fn matches_media_server(&self) -> bool {
if self.host != "239.255.255.250:1900" {
return false;
}
if self.man != "\"ssdp:discover\"" {
return false;
}
if self.st == UPNP_KIND_ROOT_DEVICE || self.st == UPNP_KIND_MEDIASERVER {
if self.st == UPNP_DEVICE_ROOT || self.st == UPNP_DEVICE_MEDIASERVER {
return true;
}
false
@ -101,69 +102,174 @@ pub struct SsdpRunnerOptions {
pub shutdown: CancellationToken,
}
struct UdpSocket {
sock2: socket2::Socket,
tokio: tokio::net::UdpSocket,
}
pub struct SsdpRunner {
opts: SsdpRunnerOptions,
socket: UdpSocket,
socket_v4: Option<UdpSocket>,
socket_v6: Option<UdpSocket>,
}
fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result<UdpSocket> {
let domain = if bind_addr.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, None)
.context(bind_addr)
.context("error creating socket")?;
#[cfg(not(target_os = "windows"))]
sock.set_reuse_port(true)
.context("error setting SO_REUSEPORT")?;
sock.set_reuse_address(true)
.context("error setting SO_REUSEADDR")?;
trace!(addr=?bind_addr, "binding UDP");
sock.bind(&bind_addr.into())
.context(bind_addr)
.context("error binding")?;
sock.set_nonblocking(true)?;
let sock_clone = sock.try_clone().context("can't clone socket")?;
let tokio_socket = tokio::net::UdpSocket::from_std(sock_clone.into())
.context("error converting socket2 socket to tokio")?;
Ok(UdpSocket {
sock2: sock,
tokio: tokio_socket,
})
}
async fn bind_v4_socket() -> anyhow::Result<UdpSocket> {
let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, SSDP_PORT);
let socket = socket_presetup(bind_addr.into())?;
let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED);
let all_multicast_membership_ips = network_interface::NetworkInterface::show()
.into_iter()
.flatten()
.flat_map(|nic| nic.addr.into_iter())
.filter_map(|addr| {
let ip = addr.ip();
match ip {
std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => {
Some(addr)
}
_ => None,
}
});
for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) {
trace!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "joining multicast v4 group");
if let Err(e) = socket.tokio.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) {
debug!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "error joining multicast v4 group: {e:#}");
}
}
Ok(socket)
}
async fn bind_v6_socket() -> anyhow::Result<UdpSocket> {
let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, SSDP_PORT, 0, 0);
let socket = socket_presetup(bind_addr.into())?;
for nic in network_interface::NetworkInterface::show()
.into_iter()
.flatten()
{
let mut has_link_local = false;
let mut has_site_local = false;
for addr in nic.addr.iter() {
let addr = match addr.ip() {
IpAddr::V4(_) => continue,
IpAddr::V6(v6) => v6,
};
if addr.is_loopback() {
continue;
}
if ipv6_is_link_local(addr) {
has_link_local = true;
} else {
has_site_local = true;
}
}
for (present, multiaddr) in [
(has_site_local, SSDP_MCAST_IPV6_SITE_LOCAL),
(has_link_local, SSDP_MCAST_IPV6_LINK_LOCAL),
] {
if !present {
continue;
}
trace!(multiaddr=?multiaddr, interface=?nic.index, "joining multicast v6 group");
if let Err(e) = socket.tokio.join_multicast_v6(&multiaddr, nic.index) {
debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}");
}
}
}
Ok(socket)
}
struct MulticastOpts {
interface_addr: IpAddr,
#[allow(dead_code)]
interface_id: u32,
mcast_addr: SocketAddr,
}
impl MulticastOpts {
fn addr_no_scope(&self) -> SocketAddr {
let mut addr = self.mcast_addr;
if let SocketAddr::V6(v6) = &mut addr {
v6.set_scope_id(0);
}
addr
}
}
impl SsdpRunner {
pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result<Self> {
let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT);
let sock = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None)
.context("error creating socket")?;
#[cfg(not(target_os = "windows"))]
sock.set_reuse_port(true)
.context("error setting SO_REUSEPORT")?;
sock.set_reuse_address(true)
.context("error setting SO_REUSEADDR")?;
trace!(addr=?bind_addr, "binding UDP");
sock.bind(&bind_addr.into())
.context(bind_addr)
.context("error binding")?;
sock.set_nonblocking(true)?;
let socket = tokio::net::UdpSocket::from_std(sock.into())
.context("error converting socket2 socket to tokio")?;
let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED);
let all_multicast_membership_ips = network_interface::NetworkInterface::show()
.into_iter()
.flatten()
.flat_map(|nic| nic.addr.into_iter())
.filter_map(|addr| {
let ip = addr.ip();
match ip {
std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => {
Some(addr)
}
_ => None,
}
});
for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) {
trace!(multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "joining multicast v4 group");
if let Err(e) = socket.join_multicast_v4(UPNP_BROADCAST_IP, ifaddr) {
debug!(error=?e, multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "error joining multicast v4 group");
}
}
Ok(Self { opts, socket })
let socket_v4 = bind_v4_socket()
.await
.map_err(|e| warn!("error creating IPv4 SSDP socket: {e:#}"))
.ok();
let socket_v6 = bind_v6_socket()
.await
.map_err(|e| warn!("error creating IPv6 SSDP socket: {e:#}"))
.ok();
Ok(Self {
opts,
socket_v4,
socket_v6,
})
}
fn generate_notify_message(&self, kind: &str, nts: &str, location: &url::Url) -> String {
fn generate_notify_message(
&self,
device_kind: &str,
nts: &str,
opts: &MulticastOpts,
) -> String {
let usn: &str = &self.opts.usn;
let server: &str = &self.opts.server_string;
let bcast_addr = UPNP_BROADCAST_ADDR;
let host = opts.addr_no_scope();
let mut location = self.opts.description_http_location.clone();
let _ = location.set_ip_host(opts.interface_addr);
format!(
"NOTIFY * HTTP/1.1\r
Host: {bcast_addr}\r
Host: {host}\r
Cache-Control: max-age=75\r
Location: {location}\r
NT: {kind}\r
NT: {device_kind}\r
NTS: {nts}\r
Server: {server}\r
USN: {usn}::{kind}\r
USN: {usn}::{device_kind}\r
\r
"
)
@ -174,10 +280,10 @@ USN: {usn}::{kind}\r
st: &str,
addr: SocketAddr,
) -> anyhow::Result<String> {
let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr.ip())?;
let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr)?;
let location = {
let mut loc = self.opts.description_http_location.clone();
loc.set_host(Some(&format!("{local_ip}")))?;
let _ = loc.set_ip_host(local_ip);
loc
};
let usn = &self.opts.usn;
@ -194,7 +300,10 @@ Content-Length: 0\r\n\r\n"
))
}
async fn try_send_notifies(&self, nts: &str) {
async fn try_send_mcast_everywhere(
&self,
get_payload: &impl Fn(&MulticastOpts) -> bstr::BString,
) {
use network_interface::NetworkInterfaceConfig;
let interfaces = network_interface::NetworkInterface::show();
let interfaces = match interfaces {
@ -205,52 +314,94 @@ Content-Length: 0\r\n\r\n"
}
};
let sent = Mutex::new(HashSet::new());
let sent = &sent;
let futs = interfaces
.into_iter()
.flat_map(|ni| ni.addr)
.filter_map(|addr| {
match addr.ip() {
std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => Some(a),
_ => None
.flat_map(|ni|
ni.addr.into_iter().map(move |a| (ni.index, a))
)
.filter_map(|(ifidx, addr)| match addr.ip() {
std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => {
Some(MulticastOpts {
interface_addr: addr.ip(),
interface_id: ifidx,
mcast_addr: SocketAddr::V4(SocketAddrV4::new(SSDM_MCAST_IPV4, SSDP_PORT)),
})
}
std::net::IpAddr::V6(a) if !a.is_loopback() => Some(MulticastOpts {
interface_addr: addr.ip(),
interface_id: ifidx,
mcast_addr: {
if ipv6_is_link_local(a) {
SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_LINK_LOCAL, SSDP_PORT, 0, ifidx))
} else {
SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_SITE_LOCAL, SSDP_PORT, 0, 0))
}
},
}),
_ => {
trace!(oif_id=ifidx, addr=%addr.ip(), "ignoring address");
None
},
})
.map(|ip| async move {
let addr = SocketAddrV4::new(ip, 0);
let sock = match tokio::net::UdpSocket::bind(addr).await {
Ok(sock) => sock,
Err(e) => {
debug!(%addr, error=?e, "error binding UDP to send NOTIFY");
return;
.map(|opts| async move {
let payload = get_payload(&opts);
if !sent
.lock()
.insert((payload.clone(), opts.interface_id, opts.mcast_addr))
{
trace!(oif_id=opts.interface_id, addr=%opts.mcast_addr, "not sending duplicate payload");
return;
}
let sock = match (
opts.interface_addr,
self.socket_v4.as_ref(),
self.socket_v6.as_ref(),
) {
// Call setsockopt(IP_MULTICAST_IF), so that the message
// gets sent out of the interface we want (otherwise it'll get sent through
// default one).
(IpAddr::V4(ip), Some(sock_v4), _) => {
if let Err(e) = sock_v4.sock2.set_multicast_if_v4(&ip) {
debug!(addr=%ip, "error calling set_multicast_if_v4: {e:#}");
}
sock_v4
}
(IpAddr::V6(_), _, Some(sock_v6)) => {
if let Err(e) = sock_v6.sock2.set_multicast_if_v6(opts.interface_id) {
debug!(oif_id=opts.interface_id, "error calling set_multicast_if_v6: {e:#}");
}
sock_v6
},
_ => {
trace!(addr=%opts.interface_addr, "ignoring address, no socket to send to");
return;
},
};
let mut location = self.opts.description_http_location.clone();
location.set_host(Some(&format!("{ip}"))).unwrap();
macro_rules! gen {
($kind:expr) => {
async {
let msg = self.generate_notify_message($kind, nts, &location);
trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY");
if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await {
debug!(sock_addr=%addr, error=%e, kind=$kind, nts, "error sending SSDP NOTIFY")
} else {
debug!(kind=$kind, nts, %location, "sent SSDP NOTIFY")
}
}
match sock.tokio.send_to(payload.as_slice(), opts.mcast_addr).await {
Ok(sz) => trace!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, size=sz, payload=?payload, "sent"),
Err(e) => {
debug!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, payload=?payload, "error sending: {e:#}")
}
}
let f1 = gen!(UPNP_KIND_ROOT_DEVICE);
let f2 = gen!(UPNP_KIND_MEDIASERVER) ;
tokio::join!(f1, f2);
};
});
futures::future::join_all(futs).await;
}
async fn task_send_alive_notifies_periodically(&self) -> anyhow::Result<()> {
async fn try_send_notifies(&self, nts: &str) {
self.try_send_mcast_everywhere(&|opts| {
self.generate_notify_message(UPNP_DEVICE_MEDIASERVER, nts, opts)
.into()
})
.await
}
async fn task_send_alive_notifies_periodically(&self) {
let mut interval = tokio::time::interval(self.opts.notify_interval);
loop {
interval.tick().await;
@ -258,7 +409,12 @@ Content-Length: 0\r\n\r\n"
}
}
async fn process_incoming_message(&self, msg: &[u8], addr: SocketAddr) -> anyhow::Result<()> {
async fn process_incoming_message(
&self,
msg: &[u8],
sock: &UdpSocket,
addr: SocketAddr,
) -> anyhow::Result<()> {
let mut headers = [httparse::EMPTY_HEADER; 16];
trace!(content = ?BStr::new(msg), ?addr, "received message");
let parsed = try_parse_ssdp(msg, &mut headers);
@ -281,7 +437,7 @@ Content-Length: 0\r\n\r\n"
if let Ok(st) = std::str::from_utf8(msg.st) {
let response = self.generate_ssdp_discover_response(st, addr)?;
trace!(content = response, ?addr, "sending SSDP discover response");
self.socket
sock.tokio
.send_to(response.as_bytes(), addr)
.await
.context("error sending")?;
@ -290,54 +446,60 @@ Content-Length: 0\r\n\r\n"
Ok(())
}
async fn task_respond_on_msearches(&self) -> anyhow::Result<()> {
async fn task_respond_on_msearches(&self, sock: Option<&UdpSocket>) {
let mut buf = vec![0u8; 16184];
let sock = match sock {
Some(sock) => sock,
None => return,
};
loop {
let (sz, addr) = self
.socket
.recv_from(&mut buf)
.await
.context("error receiving")?;
let (sz, addr) = match sock.tokio.recv_from(&mut buf).await {
Ok((sz, addr)) => (sz, addr),
Err(e) => {
warn!(error=?e, "error receving");
return;
}
};
let msg = &buf[..sz];
if let Err(e) = self.process_incoming_message(msg, addr).await {
if let Err(e) = self.process_incoming_message(msg, sock, addr).await {
warn!(error=?e, ?addr, "error processing incoming SSDP message")
}
}
}
async fn send_msearch(&self) -> anyhow::Result<()> {
let msearch_msg = "M-SEARCH * HTTP/1.1\r
HOST: 239.255.255.250:1900\r
async fn try_send_example_msearch(&self) {
self.try_send_mcast_everywhere(&|opts| {
let dest = opts.addr_no_scope();
format!(
"M-SEARCH * HTTP/1.1\r
HOST: {dest}\r
ST: urn:schemas-upnp-org:device:MediaServer:1\r
MAN: \"ssdp:discover\"\r
MX: 2\r\n\r\n";
trace!(content = msearch_msg, "multicasting M-SEARCH");
self.socket
.send_to(msearch_msg.as_bytes(), UPNP_BROADCAST_ADDR)
.await
.context("error sending msearch")?;
Ok(())
MX: 2\r\n\r\n"
)
.into()
})
.await
}
pub async fn run_forever(&self) -> anyhow::Result<()> {
// This isn't necessary, but would show that it works.
self.send_msearch().await?;
let t0 = self.try_send_example_msearch();
let t1 = self.task_respond_on_msearches(self.socket_v4.as_ref());
let t2 = self.task_respond_on_msearches(self.socket_v6.as_ref());
let t3 = self.task_send_alive_notifies_periodically();
let t1 = self.task_respond_on_msearches();
let t2 = self.task_send_alive_notifies_periodically();
tokio::pin!(t1);
tokio::pin!(t2);
let wait = async move {
tokio::join!(t0, t1, t2, t3);
Ok(())
};
tokio::select! {
r = &mut t1 => r,
r = &mut t2 => r,
r = wait => r,
_ = self.opts.shutdown.cancelled() => {
self.try_send_notifies(NTS_BYEBYE).await;
bail!("canceled");
Ok(())
}
}
}

View file

@ -6,7 +6,7 @@ use reqwest::Client;
use serde::Deserialize;
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4},
time::Duration,
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
@ -30,21 +30,29 @@ pub fn make_ssdp_search_request(kind: &str) -> String {
)
}
pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<Ipv4Addr> {
let local_dest = match local_dest {
IpAddr::V4(v4) => v4,
IpAddr::V6(v6) => {
anyhow::bail!("get_local_ip_relative_to not implemented for IPv6; addr={v6}")
}
};
// .to_bits() isn't yet available on min rust version we support (1.75 at the time of writing this)
const fn ip_bits_v6(addr: Ipv6Addr) -> u128 {
u128::from_be_bytes(addr.octets())
}
// Ipv4Addr.to_bits() is only there in nightly rust, so copying here for now.
fn ip_bits(addr: Ipv4Addr) -> u32 {
pub fn ipv6_is_link_local(ip: Ipv6Addr) -> bool {
const LL: Ipv6Addr = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 0);
const MASK: Ipv6Addr = Ipv6Addr::new(0xffff, 0xffff, 0xffff, 0xffff, 0, 0, 0, 0);
ip_bits_v6(ip) & ip_bits_v6(MASK) == ip_bits_v6(LL) & ip_bits_v6(MASK)
}
pub fn get_local_ip_relative_to(local_dest: SocketAddr) -> anyhow::Result<IpAddr> {
fn ip_bits_v4(addr: Ipv4Addr) -> u32 {
u32::from_be_bytes(addr.octets())
}
fn masked(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 {
ip_bits(ip) & ip_bits(mask)
fn masked_v4(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 {
ip_bits_v4(ip) & ip_bits_v4(mask)
}
fn masked_v6(ip: Ipv6Addr, mask: Ipv6Addr) -> u128 {
ip_bits_v6(ip) & ip_bits_v6(mask)
}
let interfaces =
@ -52,18 +60,33 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<Ipv4Addr>
for i in interfaces {
for addr in i.addr {
if let network_interface::Addr::V4(v4) = addr {
let ip = v4.ip;
let mask = match v4.netmask {
Some(mask) => mask,
None => continue,
};
trace!("found local addr {ip}/{mask}");
// If the masked address is the same, means we are on the same network, return
// the ip address
if masked(ip, mask) == masked(local_dest, mask) {
return Ok(ip);
trace!(%local_dest, nic=i.index, ip=?addr.ip(), nm=?addr.netmask(), "dbg");
match (local_dest, addr.ip(), addr.netmask()) {
// We are connecting to ourselves, return itself.
(l, a, _) if l.ip() == a => return Ok(addr.ip()),
// IPv4 masks match.
(SocketAddr::V4(l), IpAddr::V4(a), Some(IpAddr::V4(m)))
if masked_v4(*l.ip(), m) == masked_v4(a, m) =>
{
return Ok(addr.ip())
}
// Return IPv6 link-local addresses when source is link-local address and there's a scope_id set.
(SocketAddr::V6(l), IpAddr::V6(a), _)
if ipv6_is_link_local(*l.ip()) && l.scope_id() > 0 =>
{
if ipv6_is_link_local(a) && l.scope_id() == i.index {
return Ok(addr.ip());
}
}
// If V6 masks match, return.
(SocketAddr::V6(l), IpAddr::V6(a), Some(IpAddr::V6(m)))
if masked_v6(*l.ip(), m) == masked_v6(a, m) =>
{
return Ok(addr.ip())
}
// For IPv6 fallback to returning a random (first encountered) IPv6 address.
(SocketAddr::V6(_), IpAddr::V6(_), None) => return Ok(addr.ip()),
_ => continue,
}
}
}
@ -72,7 +95,7 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result<Ipv4Addr>
async fn forward_port(
control_url: Url,
local_ip: Ipv4Addr,
local_ip: IpAddr,
port: u16,
lease_duration: Duration,
) -> anyhow::Result<()> {
@ -229,10 +252,10 @@ impl UpnpEndpoint {
.flat_map(move |d| d.iter_services(self_span.clone()))
}
fn my_local_ip(&self) -> anyhow::Result<Ipv4Addr> {
let dest_ip = self.discover_response.received_from.ip();
let local_ip = get_local_ip_relative_to(dest_ip)
.with_context(|| format!("can't determine local IP relative to {dest_ip}"))?;
fn my_local_ip(&self) -> anyhow::Result<IpAddr> {
let received_from = self.discover_response.received_from;
let local_ip = get_local_ip_relative_to(received_from)
.with_context(|| format!("can't determine local IP relative to {received_from}"))?;
Ok(local_ip)
}
@ -419,7 +442,7 @@ impl UpnpPortForwarder {
}
}
async fn manage_port(&self, control_url: Url, local_ip: Ipv4Addr, port: u16) -> ! {
async fn manage_port(&self, control_url: Url, local_ip: IpAddr, port: u16) -> ! {
let lease_duration = self.opts.lease_duration;
let mut interval = tokio::time::interval(lease_duration / 2);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
@ -433,7 +456,7 @@ impl UpnpPortForwarder {
}
}
async fn manage_service(&self, control_url: Url, local_ip: Ipv4Addr) -> anyhow::Result<()> {
async fn manage_service(&self, control_url: Url, local_ip: IpAddr) -> anyhow::Result<()> {
futures::future::join_all(self.ports.iter().cloned().map(|port| {
self.manage_port(control_url.clone(), local_ip, port)
.instrument(error_span!("manage_port", port = port))