Remove custom libc/winapi code in favor of duplicating the socket and using both socket2 and tokio
This commit is contained in:
parent
403a4ce480
commit
f7e0835452
3 changed files with 24 additions and 101 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -2692,7 +2692,6 @@ dependencies = [
|
||||||
"gethostname",
|
"gethostname",
|
||||||
"http 1.1.0",
|
"http 1.1.0",
|
||||||
"httparse",
|
"httparse",
|
||||||
"libc",
|
|
||||||
"librqbit-core",
|
"librqbit-core",
|
||||||
"librqbit-sha1-wrapper",
|
"librqbit-sha1-wrapper",
|
||||||
"librqbit-upnp",
|
"librqbit-upnp",
|
||||||
|
|
@ -2710,7 +2709,6 @@ dependencies = [
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"url",
|
"url",
|
||||||
"uuid",
|
"uuid",
|
||||||
"winapi",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
||||||
|
|
@ -39,10 +39,6 @@ socket2 = "0.5.7"
|
||||||
quick-xml = { version = "0.36.1", features = ["serialize"] }
|
quick-xml = { version = "0.36.1", features = ["serialize"] }
|
||||||
network-interface = "2.0.0"
|
network-interface = "2.0.0"
|
||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
libc = "0.2.158"
|
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies]
|
|
||||||
winapi = { version = "0.3.9", features = ["winsock2"] }
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tracing-subscriber = "0.3.18"
|
tracing-subscriber = "0.3.18"
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,6 @@ use anyhow::{bail, Context};
|
||||||
use bstr::BStr;
|
use bstr::BStr;
|
||||||
use network_interface::NetworkInterfaceConfig;
|
use network_interface::NetworkInterfaceConfig;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use tokio::net::UdpSocket;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
|
|
||||||
|
|
@ -109,13 +108,18 @@ pub struct SsdpRunnerOptions {
|
||||||
pub shutdown: CancellationToken,
|
pub shutdown: CancellationToken,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct UdpSocket {
|
||||||
|
sock2: socket2::Socket,
|
||||||
|
tokio: tokio::net::UdpSocket,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct SsdpRunner {
|
pub struct SsdpRunner {
|
||||||
opts: SsdpRunnerOptions,
|
opts: SsdpRunnerOptions,
|
||||||
socket_v4: Option<UdpSocket>,
|
socket_v4: Option<UdpSocket>,
|
||||||
socket_v6: Option<UdpSocket>,
|
socket_v6: Option<UdpSocket>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result<tokio::net::UdpSocket> {
|
fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result<UdpSocket> {
|
||||||
let domain = if bind_addr.is_ipv4() {
|
let domain = if bind_addr.is_ipv4() {
|
||||||
socket2::Domain::IPV4
|
socket2::Domain::IPV4
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -136,10 +140,16 @@ fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result<tokio::net::UdpSocke
|
||||||
.context("error binding")?;
|
.context("error binding")?;
|
||||||
|
|
||||||
sock.set_nonblocking(true)?;
|
sock.set_nonblocking(true)?;
|
||||||
let socket = tokio::net::UdpSocket::from_std(sock.into())
|
|
||||||
|
let sock_clone = sock.try_clone().context("can't clone socket")?;
|
||||||
|
|
||||||
|
let tokio_socket = tokio::net::UdpSocket::from_std(sock_clone.into())
|
||||||
.context("error converting socket2 socket to tokio")?;
|
.context("error converting socket2 socket to tokio")?;
|
||||||
|
|
||||||
Ok(socket)
|
Ok(UdpSocket {
|
||||||
|
sock2: sock,
|
||||||
|
tokio: tokio_socket,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn bind_v4_socket() -> anyhow::Result<UdpSocket> {
|
async fn bind_v4_socket() -> anyhow::Result<UdpSocket> {
|
||||||
|
|
@ -163,7 +173,7 @@ async fn bind_v4_socket() -> anyhow::Result<UdpSocket> {
|
||||||
|
|
||||||
for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) {
|
for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) {
|
||||||
trace!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "joining multicast v4 group");
|
trace!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "joining multicast v4 group");
|
||||||
if let Err(e) = socket.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) {
|
if let Err(e) = socket.tokio.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) {
|
||||||
debug!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "error joining multicast v4 group: {e:#}");
|
debug!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "error joining multicast v4 group: {e:#}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -203,7 +213,7 @@ async fn bind_v6_socket() -> anyhow::Result<UdpSocket> {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
trace!(multiaddr=?multiaddr, interface=?nic.index, "joining multicast v6 group");
|
trace!(multiaddr=?multiaddr, interface=?nic.index, "joining multicast v6 group");
|
||||||
if let Err(e) = socket.join_multicast_v6(&multiaddr, nic.index) {
|
if let Err(e) = socket.tokio.join_multicast_v6(&multiaddr, nic.index) {
|
||||||
debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}");
|
debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -219,88 +229,6 @@ struct MulticastOpts {
|
||||||
mcast_addr: SocketAddr,
|
mcast_addr: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_mcast_if_v4(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> {
|
|
||||||
// in_addr is the same on unix and windows and contains just the 4 bytes of IPv4 in network
|
|
||||||
// byte order.
|
|
||||||
let addr = u32::from_ne_bytes(local_ip.octets());
|
|
||||||
let sz: usize = std::mem::size_of_val(&addr);
|
|
||||||
|
|
||||||
trace!(addr = %local_ip, "setting IP_MULTICAST_IF");
|
|
||||||
|
|
||||||
let ret: i32;
|
|
||||||
#[cfg(target_os = "windows")]
|
|
||||||
{
|
|
||||||
use std::os::windows::io::AsRawSocket;
|
|
||||||
ret = unsafe {
|
|
||||||
winapi::um::winsock2::setsockopt(
|
|
||||||
sock.as_raw_socket().try_into()?,
|
|
||||||
winapi::shared::ws2def::IPPROTO_IP,
|
|
||||||
winapi::shared::ws2ipdef::IP_MULTICAST_IF,
|
|
||||||
&addr as *const _ as _,
|
|
||||||
sz.try_into()?,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
#[cfg(not(target_os = "windows"))]
|
|
||||||
{
|
|
||||||
use std::os::fd::{AsFd, AsRawFd};
|
|
||||||
ret = unsafe {
|
|
||||||
libc::setsockopt(
|
|
||||||
sock.as_fd().as_raw_fd(),
|
|
||||||
libc::IPPROTO_IP,
|
|
||||||
libc::IP_MULTICAST_IF,
|
|
||||||
&addr as *const _ as _,
|
|
||||||
sz.try_into()?,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if ret < 0 {
|
|
||||||
return Err(std::io::Error::last_os_error().into());
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_mcast_if_v6(sock: &UdpSocket, dev_idx: u32) -> anyhow::Result<()> {
|
|
||||||
// in_addr is the same on unix and windows and contains just the 4 bytes of IPv4 in network
|
|
||||||
// byte order.
|
|
||||||
trace!(dev_idx, "setting IP_MULTICAST_IF");
|
|
||||||
|
|
||||||
let ret: i32;
|
|
||||||
#[cfg(target_os = "windows")]
|
|
||||||
{
|
|
||||||
use std::os::windows::io::AsRawSocket;
|
|
||||||
let sz: usize = std::mem::size_of_val(&dev_idx);
|
|
||||||
ret = unsafe {
|
|
||||||
winapi::um::winsock2::setsockopt(
|
|
||||||
sock.as_raw_socket().try_into()?,
|
|
||||||
winapi::shared::ws2def::IPPROTO_IPV6,
|
|
||||||
winapi::shared::ws2ipdef::IPV6_MULTICAST_IF,
|
|
||||||
&dev_idx as *const _ as _,
|
|
||||||
sz.try_into()?,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
#[cfg(not(target_os = "windows"))]
|
|
||||||
{
|
|
||||||
use std::os::fd::{AsFd, AsRawFd};
|
|
||||||
let dev_idx = dev_idx as i32;
|
|
||||||
let sz: usize = std::mem::size_of_val(&dev_idx);
|
|
||||||
ret = unsafe {
|
|
||||||
libc::setsockopt(
|
|
||||||
sock.as_fd().as_raw_fd(),
|
|
||||||
libc::IPPROTO_IPV6,
|
|
||||||
libc::IPV6_MULTICAST_IF,
|
|
||||||
&dev_idx as *const _ as _,
|
|
||||||
sz.try_into()?,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if ret < 0 {
|
|
||||||
return Err(std::io::Error::last_os_error().into());
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MulticastOpts {
|
impl MulticastOpts {
|
||||||
fn addr_no_scope(&self) -> SocketAddr {
|
fn addr_no_scope(&self) -> SocketAddr {
|
||||||
let mut addr = self.mcast_addr;
|
let mut addr = self.mcast_addr;
|
||||||
|
|
@ -443,14 +371,14 @@ Content-Length: 0\r\n\r\n"
|
||||||
// gets sent out of the interface we want (otherwise it'll get sent through
|
// gets sent out of the interface we want (otherwise it'll get sent through
|
||||||
// default one).
|
// default one).
|
||||||
(IpAddr::V4(ip), Some(sock_v4), _) => {
|
(IpAddr::V4(ip), Some(sock_v4), _) => {
|
||||||
if let Err(e) = set_mcast_if_v4(sock_v4, ip) {
|
if let Err(e) = sock_v4.sock2.set_multicast_if_v4(&ip) {
|
||||||
debug!(addr=%ip, "error calling set_mcast_if_v4: {e:#}");
|
debug!(addr=%ip, "error calling set_multicast_if_v4: {e:#}");
|
||||||
}
|
}
|
||||||
sock_v4
|
sock_v4
|
||||||
}
|
}
|
||||||
(IpAddr::V6(_), _, Some(sock_v6)) => {
|
(IpAddr::V6(_), _, Some(sock_v6)) => {
|
||||||
if let Err(e) = set_mcast_if_v6(sock_v6, opts.interface_id) {
|
if let Err(e) = sock_v6.sock2.set_multicast_if_v6(opts.interface_id) {
|
||||||
debug!(oif_id=opts.interface_id, "error calling set_mcast_if_v6: {e:#}");
|
debug!(oif_id=opts.interface_id, "error calling set_multicast_if_v6: {e:#}");
|
||||||
}
|
}
|
||||||
sock_v6
|
sock_v6
|
||||||
},
|
},
|
||||||
|
|
@ -460,7 +388,7 @@ Content-Length: 0\r\n\r\n"
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
match sock.send_to(payload.as_slice(), opts.mcast_addr).await {
|
match sock.tokio.send_to(payload.as_slice(), opts.mcast_addr).await {
|
||||||
Ok(sz) => trace!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, size=sz, payload=?payload, "sent"),
|
Ok(sz) => trace!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, size=sz, payload=?payload, "sent"),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, payload=?payload, "error sending: {e:#}")
|
debug!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, payload=?payload, "error sending: {e:#}")
|
||||||
|
|
@ -515,7 +443,8 @@ Content-Length: 0\r\n\r\n"
|
||||||
if let Ok(st) = std::str::from_utf8(msg.st) {
|
if let Ok(st) = std::str::from_utf8(msg.st) {
|
||||||
let response = self.generate_ssdp_discover_response(st, addr)?;
|
let response = self.generate_ssdp_discover_response(st, addr)?;
|
||||||
trace!(content = response, ?addr, "sending SSDP discover response");
|
trace!(content = response, ?addr, "sending SSDP discover response");
|
||||||
sock.send_to(response.as_bytes(), addr)
|
sock.tokio
|
||||||
|
.send_to(response.as_bytes(), addr)
|
||||||
.await
|
.await
|
||||||
.context("error sending")?;
|
.context("error sending")?;
|
||||||
}
|
}
|
||||||
|
|
@ -531,7 +460,7 @@ Content-Length: 0\r\n\r\n"
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (sz, addr) = match sock.recv_from(&mut buf).await {
|
let (sz, addr) = match sock.tokio.recv_from(&mut buf).await {
|
||||||
Ok((sz, addr)) => (sz, addr),
|
Ok((sz, addr)) => (sz, addr),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(error=?e, "error receving");
|
warn!(error=?e, "error receving");
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue