From bc243143e52c32eb9f852b57b09b205094c37c12 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 5 Dec 2023 23:31:04 +0000 Subject: [PATCH] Do not announce when listing torrents --- crates/dht/examples/dht.rs | 2 +- crates/dht/src/dht.rs | 23 ++++++++++++++--------- crates/dht/src/persistence.rs | 2 -- crates/librqbit/src/dht_utils.rs | 2 +- crates/librqbit/src/session.rs | 24 ++++++++++++++---------- 5 files changed, 30 insertions(+), 23 deletions(-) diff --git a/crates/dht/examples/dht.rs b/crates/dht/examples/dht.rs index 883ef79..11c289c 100644 --- a/crates/dht/examples/dht.rs +++ b/crates/dht/examples/dht.rs @@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let dht = DhtBuilder::new().await.context("error initializing DHT")?; - let mut stream = dht.get_peers(info_hash)?; + let mut stream = dht.get_peers(info_hash, None)?; let stats_printer = async { loop { diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 6268b8a..b55a54c 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -97,6 +97,7 @@ trait RecursiveRequestCallbacks: Sized + Send + Sync + 'static { struct RecursiveRequestCallbacksGetPeers { // Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap() min_distance_to_announce: Id20, + announce_port: Option, } impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers { @@ -109,7 +110,7 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers { addr: SocketAddr, resp: &anyhow::Result, ) { - let announce_port = match req.dht.announce_port { + let announce_port = match self.announce_port { Some(a) => a, None => return, }; @@ -189,7 +190,7 @@ pub struct RequestPeersStream { } impl RequestPeersStream { - fn new(dht: Arc, info_hash: Id20) -> Self { + fn new(dht: Arc, info_hash: Id20, announce_port: Option) -> Self { let (peer_tx, peer_rx) = unbounded_channel(); let (node_tx, node_rx) = unbounded_channel(); let rp = Arc::new(RecursiveRequest { @@ -206,6 +207,7 @@ impl RequestPeersStream { "0000ffffffffffffffffffffffffffffffffffff", ) .unwrap(), + announce_port, }, }); let join_handle = rp.request_peers_forever(node_rx); @@ -534,7 +536,6 @@ pub struct DhtState { worker_sender: UnboundedSender, pub(crate) peer_store: PeerStore, - announce_port: Option, } impl DhtState { @@ -544,7 +545,6 @@ impl DhtState { routing_table: Option, listen_addr: SocketAddr, peer_store: PeerStore, - announce_port: Option, ) -> Self { let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None)); Self { @@ -556,7 +556,6 @@ impl DhtState { listen_addr, rate_limiter: make_rate_limiter(), peer_store, - announce_port, } } @@ -1124,7 +1123,6 @@ pub struct DhtConfig { pub bootstrap_addrs: Option>, pub routing_table: Option, pub listen_addr: Option, - pub announce_port: Option, pub peer_store: Option, } @@ -1160,7 +1158,6 @@ impl DhtState { config.routing_table, listen_addr, config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), - config.announce_port, )); spawn(error_span!("dht"), { @@ -1174,8 +1171,16 @@ impl DhtState { Ok(state) } - pub fn get_peers(self: &Arc, info_hash: Id20) -> anyhow::Result { - Ok(RequestPeersStream::new(self.clone(), info_hash)) + pub fn get_peers( + self: &Arc, + info_hash: Id20, + announce_port: Option, + ) -> anyhow::Result { + Ok(RequestPeersStream::new( + self.clone(), + info_hash, + announce_port, + )) } pub fn listen_addr(&self) -> SocketAddr { diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index 2cb61b3..2c002bb 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -20,7 +20,6 @@ use crate::{Dht, DhtConfig, DhtState}; pub struct PersistentDhtConfig { pub dump_interval: Option, pub config_filename: Option, - pub announce_port: Option, } #[derive(Serialize, Deserialize)] @@ -118,7 +117,6 @@ impl PersistentDht { routing_table, listen_addr, peer_store, - announce_port: config.announce_port, ..Default::default() }; let dht = DhtState::with_config(dht_config).await?; diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index 455407d..ea1225b 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -108,7 +108,7 @@ mod tests { let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap(); let dht = DhtBuilder::new().await.unwrap(); - let peer_rx = dht.get_peers(info_hash).unwrap(); + let peer_rx = dht.get_peers(info_hash, None).unwrap(); let peer_id = generate_peer_id(); match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await { diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index fb3f2dc..dcc9bd6 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -159,6 +159,8 @@ pub struct Session { db: RwLock, output_folder: PathBuf, + tcp_listen_port: Option, + cancel_tx: tokio::sync::watch::Sender<()>, cancel_rx: tokio::sync::watch::Receiver<()>, } @@ -392,14 +394,9 @@ impl Session { None } else { let dht = if opts.disable_dht_persistence { - DhtBuilder::with_config(DhtConfig { - announce_port: tcp_listen_port, - ..Default::default() - }) - .await + DhtBuilder::with_config(DhtConfig::default()).await } else { - let mut pdht_config = opts.dht_config.take().unwrap_or_default(); - pdht_config.announce_port = tcp_listen_port; + let pdht_config = opts.dht_config.take().unwrap_or_default(); PersistentDht::create(Some(pdht_config)).await } .context("error initializing DHT")?; @@ -426,6 +423,7 @@ impl Session { db: RwLock::new(Default::default()), cancel_rx, cancel_tx, + tcp_listen_port, }); if let Some(tcp_listener) = tcp_listener { @@ -740,6 +738,12 @@ impl Session { let opts = opts.unwrap_or_default(); + let announce_port = if opts.list_only { + None + } else { + self.tcp_listen_port + }; + let (info_hash, info, dht_rx, trackers, initial_peers) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { let Magnet { @@ -751,7 +755,7 @@ impl Session { .dht .as_ref() .context("magnet links without DHT are not supported")? - .get_peers(info_hash)?; + .get_peers(info_hash, announce_port)?; let trackers = trackers .into_iter() @@ -814,7 +818,7 @@ impl Session { let dht_rx = match self.dht.as_ref() { Some(dht) if !opts.paused && !opts.list_only => { debug!("reading peers for {:?} from DHT", torrent.info_hash); - Some(dht.get_peers(torrent.info_hash)?) + Some(dht.get_peers(torrent.info_hash, announce_port)?) } _ => None, }; @@ -1047,7 +1051,7 @@ impl Session { let peer_rx = self .dht .as_ref() - .map(|dht| dht.get_peers(handle.info_hash())) + .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) .transpose()?; handle.start(Default::default(), peer_rx, false)?; Ok(())