Announcing port on DHT

This commit is contained in:
Igor Katson 2023-12-05 21:17:37 +00:00
parent 162afe3056
commit 6bb5d01c0f
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 19 additions and 62 deletions

View file

@ -1,6 +1,6 @@
use std::{ use std::{
cmp::Reverse, cmp::Reverse,
net::{SocketAddr, SocketAddrV4}, net::SocketAddr,
str::FromStr, str::FromStr,
sync::{ sync::{
atomic::{AtomicU16, Ordering}, atomic::{AtomicU16, Ordering},
@ -109,7 +109,7 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers {
addr: SocketAddr, addr: SocketAddr,
resp: &anyhow::Result<ResponseOrError>, resp: &anyhow::Result<ResponseOrError>,
) { ) {
let announce_addr = match req.dht.announce_addr { let announce_port = match req.dht.announce_port {
Some(a) => a, Some(a) => a,
None => return, None => return,
}; };
@ -132,7 +132,7 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers {
let (tid, message) = req.dht.create_request(Request::Announce { let (tid, message) = req.dht.create_request(Request::Announce {
info_hash: req.info_hash, info_hash: req.info_hash,
token: token.clone(), token: token.clone(),
addr: announce_addr, port: announce_port,
}); });
let _ = req.dht.worker_sender.send(WorkerSendRequest { let _ = req.dht.worker_sender.send(WorkerSendRequest {
@ -534,7 +534,7 @@ pub struct DhtState {
worker_sender: UnboundedSender<WorkerSendRequest>, worker_sender: UnboundedSender<WorkerSendRequest>,
pub(crate) peer_store: PeerStore, pub(crate) peer_store: PeerStore,
announce_addr: Option<SocketAddrV4>, announce_port: Option<u16>,
} }
impl DhtState { impl DhtState {
@ -544,7 +544,7 @@ impl DhtState {
routing_table: Option<RoutingTable>, routing_table: Option<RoutingTable>,
listen_addr: SocketAddr, listen_addr: SocketAddr,
peer_store: PeerStore, peer_store: PeerStore,
announce_addr: Option<SocketAddrV4>, announce_port: Option<u16>,
) -> Self { ) -> Self {
let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None)); let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None));
Self { Self {
@ -556,7 +556,7 @@ impl DhtState {
listen_addr, listen_addr,
rate_limiter: make_rate_limiter(), rate_limiter: make_rate_limiter(),
peer_store, peer_store,
announce_addr, announce_port,
} }
} }
@ -628,13 +628,13 @@ impl DhtState {
Request::Announce { Request::Announce {
info_hash, info_hash,
token, token,
addr, port,
} => Message { } => Message {
kind: MessageKind::AnnouncePeer(AnnouncePeer { kind: MessageKind::AnnouncePeer(AnnouncePeer {
id: self.id, id: self.id,
implied_port: 0, implied_port: 0,
info_hash, info_hash,
port: addr.port(), port,
token, token,
}), }),
transaction_id: ByteString::from(transaction_id_buf.as_ref()), transaction_id: ByteString::from(transaction_id_buf.as_ref()),
@ -811,7 +811,7 @@ enum Request {
Announce { Announce {
info_hash: Id20, info_hash: Id20,
token: ByteString, token: ByteString,
addr: SocketAddrV4, port: u16,
}, },
Ping, Ping,
} }
@ -1124,7 +1124,7 @@ pub struct DhtConfig {
pub bootstrap_addrs: Option<Vec<String>>, pub bootstrap_addrs: Option<Vec<String>>,
pub routing_table: Option<RoutingTable>, pub routing_table: Option<RoutingTable>,
pub listen_addr: Option<SocketAddr>, pub listen_addr: Option<SocketAddr>,
pub announce_addr: Option<SocketAddr>, pub announce_port: Option<u16>,
pub peer_store: Option<PeerStore>, pub peer_store: Option<PeerStore>,
} }
@ -1160,13 +1160,7 @@ impl DhtState {
config.routing_table, config.routing_table,
listen_addr, listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
config.announce_addr.and_then(|a| match a { config.announce_port,
SocketAddr::V4(v4) => Some(v4),
SocketAddr::V6(_) => {
warn!("libqrqbit-dht doesn't support announcing IPv6 addresses");
None
}
}),
)); ));
spawn(error_span!("dht"), { spawn(error_span!("dht"), {

View file

@ -20,7 +20,7 @@ use crate::{Dht, DhtConfig, DhtState};
pub struct PersistentDhtConfig { pub struct PersistentDhtConfig {
pub dump_interval: Option<Duration>, pub dump_interval: Option<Duration>,
pub config_filename: Option<PathBuf>, pub config_filename: Option<PathBuf>,
pub announce_addr: Option<SocketAddr>, pub announce_port: Option<u16>,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -118,7 +118,7 @@ impl PersistentDht {
routing_table, routing_table,
listen_addr, listen_addr,
peer_store, peer_store,
announce_addr: config.announce_addr, announce_port: config.announce_port,
..Default::default() ..Default::default()
}; };
let dht = DhtState::with_config(dht_config).await?; let dht = DhtState::with_config(dht_config).await?;

View file

@ -2,7 +2,7 @@ use std::{
borrow::Cow, borrow::Cow,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
io::{BufReader, BufWriter, Read}, io::{BufReader, BufWriter, Read},
net::{Ipv4Addr, SocketAddr, SocketAddrV4}, net::SocketAddr,
path::PathBuf, path::PathBuf,
str::FromStr, str::FromStr,
sync::Arc, sync::Arc,
@ -357,34 +357,6 @@ async fn create_tcp_listener(
bail!("no free TCP ports in range {port_range:?}"); bail!("no free TCP ports in range {port_range:?}");
} }
async fn get_public_announce_addr(port: u16) -> anyhow::Result<SocketAddr> {
async fn get_ipify() -> anyhow::Result<Ipv4Addr> {
#[derive(Deserialize)]
struct Data {
ip: Ipv4Addr,
}
let resp: Data = reqwest::get("https://api.ipify.org?format=json")
.await
.context("error getting public IP address")?
.error_for_status()?
.json()
.await?;
Ok(resp.ip)
}
async fn get_public_ip() -> anyhow::Result<Ipv4Addr> {
get_ipify().await
}
let ip = get_public_ip()
.await
.context("error getting public IP address")?;
let addr = SocketAddr::V4(SocketAddrV4::new(ip, port));
info!("using public IP address {addr} to publish on DHT");
Ok(addr)
}
pub(crate) struct CheckedIncomingConnection { pub(crate) struct CheckedIncomingConnection {
pub addr: SocketAddr, pub addr: SocketAddr,
pub stream: tokio::net::TcpStream, pub stream: tokio::net::TcpStream,
@ -406,7 +378,7 @@ impl Session {
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let (tcp_listener, port) = if let Some(port_range) = opts.listen_port_range { let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range) let (l, p) = create_tcp_listener(port_range)
.await .await
.context("error listening on TCP")?; .context("error listening on TCP")?;
@ -419,24 +391,15 @@ impl Session {
let dht = if opts.disable_dht { let dht = if opts.disable_dht {
None None
} else { } else {
let announce_addr = if let Some(port) = port {
Some(
get_public_announce_addr(port)
.await
.context("error getting public announce address")?,
)
} else {
None
};
let dht = if opts.disable_dht_persistence { let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig { DhtBuilder::with_config(DhtConfig {
announce_addr, announce_port: tcp_listen_port,
..Default::default() ..Default::default()
}) })
.await .await
} else { } else {
let mut pdht_config = opts.dht_config.take().unwrap_or_default(); let mut pdht_config = opts.dht_config.take().unwrap_or_default();
pdht_config.announce_addr = announce_addr; pdht_config.announce_port = tcp_listen_port;
PersistentDht::create(Some(pdht_config)).await PersistentDht::create(Some(pdht_config)).await
} }
.context("error initializing DHT")?; .context("error initializing DHT")?;
@ -468,12 +431,12 @@ impl Session {
if let Some(tcp_listener) = tcp_listener { if let Some(tcp_listener) = tcp_listener {
session.spawn( session.spawn(
"tcp listener", "tcp listener",
error_span!("tcp_listen", port = port), error_span!("tcp_listen", port = tcp_listen_port),
session.clone().task_tcp_listener(tcp_listener), session.clone().task_tcp_listener(tcp_listener),
); );
} }
if let Some(listen_port) = port { if let Some(listen_port) = tcp_listen_port {
if opts.enable_upnp_port_forwarding { if opts.enable_upnp_port_forwarding {
session.spawn( session.spawn(
"upnp_forward", "upnp_forward",