"make_peer_rx" - include initial peers

This commit is contained in:
Igor Katson 2024-12-05 21:40:40 +00:00
parent fce467e005
commit e22132bba0
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -480,7 +480,7 @@ struct InternalAddResult {
info_bytes: Bytes,
trackers: Vec<String>,
peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>,
seen_peers: Vec<SocketAddr>,
}
impl Session {
@ -889,8 +889,6 @@ impl Session {
let paused = opts.list_only || opts.paused;
let announce_port = if paused { None } else { self.tcp_listen_port };
// The main difference between magnet link and torrent file, is that we need to resolve the magnet link
// into a torrent file by connecting to peers that support extended handshakes.
// So we must discover at least one peer and connect to it to be able to proceed further.
@ -920,19 +918,10 @@ impl Session {
}
trackers
},
announce_port,
!paused,
opts.force_tracker_interval,
)?;
let initial_peers_stream = opts
.initial_peers
.clone()
.and_then(|v| if v.is_empty() { None } else { Some(v) })
.map(futures::stream::iter);
let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_stream);
let peer_rx = match peer_rx {
Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided"),
};
opts.initial_peers.clone().unwrap_or_default()
)?.context("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided")?;
debug!(?info_hash, "querying DHT");
match read_metainfo_from_peer_receiver(
@ -966,7 +955,7 @@ impl Session {
info,
trackers,
peer_rx: Some(rx),
initial_peers: {
seen_peers: {
let seen = seen.into_iter().collect_vec();
for peer in &seen {
trace!(?peer, "seen")
@ -1023,8 +1012,9 @@ impl Session {
} else {
trackers.clone()
},
announce_port,
!paused,
opts.force_tracker_interval,
opts.initial_peers.clone().unwrap_or_default()
)?
};
@ -1035,7 +1025,7 @@ impl Session {
info_bytes: torrent.info_bytes,
trackers,
peer_rx,
initial_peers: opts
seen_peers: opts
.initial_peers
.clone()
.unwrap_or_default()
@ -1088,7 +1078,7 @@ impl Session {
info_hash,
trackers,
peer_rx,
initial_peers,
seen_peers,
torrent_bytes,
info_bytes,
} = add_res;
@ -1126,7 +1116,7 @@ impl Session {
info,
only_files,
output_folder,
seen_peers: initial_peers,
seen_peers,
torrent_bytes,
}));
}
@ -1225,12 +1215,12 @@ impl Session {
// Merge "initial_peers" and "peer_rx" into one stream.
let peer_rx = merge_two_optional_streams(
if !initial_peers.is_empty() {
if !seen_peers.is_empty() {
debug!(
count = initial_peers.len(),
count = seen_peers.len(),
"merging initial peers into peer_rx"
);
Some(futures::stream::iter(initial_peers.into_iter()))
Some(futures::stream::iter(seen_peers.into_iter()))
} else {
None
},
@ -1344,31 +1334,39 @@ impl Session {
self: &Arc<Self>,
info_hash: Id20,
trackers: Vec<String>,
announce_port: Option<u16>,
announce: bool,
force_tracker_interval: Option<Duration>,
initial_peers: Vec<SocketAddr>,
) -> anyhow::Result<Option<PeerStream>> {
let announce_port = announce_port.or(self.tcp_listen_port);
let announce_port = if announce { self.tcp_listen_port } else { None };
let dht_rx = self
.dht
.as_ref()
.map(|dht| dht.get_peers(info_hash, announce_port))
.transpose()?;
let peer_rx_stats = PeerRxTorrentInfo {
let tracker_rx_stats = PeerRxTorrentInfo {
info_hash,
session: self.clone(),
};
let peer_rx = TrackerComms::start(
let tracker_rx = TrackerComms::start(
info_hash,
self.peer_id,
trackers,
Box::new(peer_rx_stats),
Box::new(tracker_rx_stats),
force_tracker_interval,
announce_port,
self.reqwest_client.clone(),
);
Ok(merge_two_optional_streams(dht_rx, peer_rx))
let initial_peers_rx = if initial_peers.is_empty() {
None
} else {
Some(futures::stream::iter(initial_peers))
};
let peer_rx = merge_two_optional_streams(dht_rx, tracker_rx);
let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_rx);
Ok(peer_rx)
}
async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) {
@ -1391,8 +1389,9 @@ impl Session {
let peer_rx = self.make_peer_rx(
handle.info_hash(),
handle.shared().trackers.clone().into_iter().collect(),
self.tcp_listen_port,
true,
handle.shared().options.force_tracker_interval,
Default::default(),
)?;
handle.start(peer_rx, false)?;
self.try_update_persistence_metadata(handle).await;