From e22132bba06a52649f5c73ce547505184eff0622 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 21:40:40 +0000 Subject: [PATCH] "make_peer_rx" - include initial peers --- crates/librqbit/src/session.rs | 59 +++++++++++++++++----------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index ca9b923..a1104d5 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -480,7 +480,7 @@ struct InternalAddResult { info_bytes: Bytes, trackers: Vec, peer_rx: Option, - initial_peers: Vec, + seen_peers: Vec, } impl Session { @@ -889,8 +889,6 @@ impl Session { let paused = opts.list_only || opts.paused; - let announce_port = if paused { None } else { self.tcp_listen_port }; - // The main difference between magnet link and torrent file, is that we need to resolve the magnet link // into a torrent file by connecting to peers that support extended handshakes. // So we must discover at least one peer and connect to it to be able to proceed further. @@ -920,19 +918,10 @@ impl Session { } trackers }, - announce_port, + !paused, opts.force_tracker_interval, - )?; - let initial_peers_stream = opts - .initial_peers - .clone() - .and_then(|v| if v.is_empty() { None } else { Some(v) }) - .map(futures::stream::iter); - let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_stream); - let peer_rx = match peer_rx { - Some(peer_rx) => peer_rx, - None => bail!("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided"), - }; + opts.initial_peers.clone().unwrap_or_default() + )?.context("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided")?; debug!(?info_hash, "querying DHT"); match read_metainfo_from_peer_receiver( @@ -966,7 +955,7 @@ impl Session { info, trackers, peer_rx: Some(rx), - initial_peers: { + seen_peers: { let seen = seen.into_iter().collect_vec(); for peer in &seen { trace!(?peer, "seen") @@ -1023,8 +1012,9 @@ impl Session { } else { trackers.clone() }, - announce_port, + !paused, opts.force_tracker_interval, + opts.initial_peers.clone().unwrap_or_default() )? }; @@ -1035,7 +1025,7 @@ impl Session { info_bytes: torrent.info_bytes, trackers, peer_rx, - initial_peers: opts + seen_peers: opts .initial_peers .clone() .unwrap_or_default() @@ -1088,7 +1078,7 @@ impl Session { info_hash, trackers, peer_rx, - initial_peers, + seen_peers, torrent_bytes, info_bytes, } = add_res; @@ -1126,7 +1116,7 @@ impl Session { info, only_files, output_folder, - seen_peers: initial_peers, + seen_peers, torrent_bytes, })); } @@ -1225,12 +1215,12 @@ impl Session { // Merge "initial_peers" and "peer_rx" into one stream. let peer_rx = merge_two_optional_streams( - if !initial_peers.is_empty() { + if !seen_peers.is_empty() { debug!( - count = initial_peers.len(), + count = seen_peers.len(), "merging initial peers into peer_rx" ); - Some(futures::stream::iter(initial_peers.into_iter())) + Some(futures::stream::iter(seen_peers.into_iter())) } else { None }, @@ -1344,31 +1334,39 @@ impl Session { self: &Arc, info_hash: Id20, trackers: Vec, - announce_port: Option, + announce: bool, force_tracker_interval: Option, + initial_peers: Vec, ) -> anyhow::Result> { - let announce_port = announce_port.or(self.tcp_listen_port); + let announce_port = if announce { self.tcp_listen_port } else { None }; let dht_rx = self .dht .as_ref() .map(|dht| dht.get_peers(info_hash, announce_port)) .transpose()?; - let peer_rx_stats = PeerRxTorrentInfo { + let tracker_rx_stats = PeerRxTorrentInfo { info_hash, session: self.clone(), }; - let peer_rx = TrackerComms::start( + let tracker_rx = TrackerComms::start( info_hash, self.peer_id, trackers, - Box::new(peer_rx_stats), + Box::new(tracker_rx_stats), force_tracker_interval, announce_port, self.reqwest_client.clone(), ); - Ok(merge_two_optional_streams(dht_rx, peer_rx)) + let initial_peers_rx = if initial_peers.is_empty() { + None + } else { + Some(futures::stream::iter(initial_peers)) + }; + let peer_rx = merge_two_optional_streams(dht_rx, tracker_rx); + let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_rx); + Ok(peer_rx) } async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) { @@ -1391,8 +1389,9 @@ impl Session { let peer_rx = self.make_peer_rx( handle.info_hash(), handle.shared().trackers.clone().into_iter().collect(), - self.tcp_listen_port, + true, handle.shared().options.force_tracker_interval, + Default::default(), )?; handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await;