diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 98ef416..bcfb120 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -941,7 +941,6 @@ impl Session { Ok::<_, anyhow::Error>(Some(PathBuf::from(longest))) } - #[allow(clippy::too_many_arguments)] async fn main_torrent_info( &self, info_hash: Id20, @@ -1017,17 +1016,26 @@ impl Session { (managed_torrent, id) }; + // Merge "initial_peers" and "peer_rx" into one stream. + let peer_rx: Option = if !initial_peers.is_empty() || peer_rx.is_some() { + use futures::future::Either; + Some(Box::new( + futures::stream::iter(initial_peers).chain( + peer_rx + .map(Either::Left) + .unwrap_or(Either::Right(futures::stream::empty())), + ), + )) + } else { + peer_rx + }; + { let span = managed_torrent.info.span.clone(); let _ = span.enter(); managed_torrent - .start( - initial_peers, - peer_rx, - opts.paused, - self.cancellation_token.child_token(), - ) + .start(peer_rx, opts.paused, self.cancellation_token.child_token()) .context("error starting torrent")?; } @@ -1119,12 +1127,7 @@ impl Session { self.tcp_listen_port, handle.info().options.force_tracker_interval, )?; - handle.start( - Default::default(), - peer_rx, - false, - self.cancellation_token.child_token(), - )?; + handle.start(peer_rx, false, self.cancellation_token.child_token())?; Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 8678abd..4a51778 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -5,7 +5,6 @@ pub mod stats; pub mod utils; use std::collections::HashSet; -use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; use std::sync::atomic::Ordering; @@ -31,7 +30,6 @@ use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::error_span; -use tracing::trace; use tracing::warn; use crate::chunk_tracker::ChunkTracker; @@ -172,7 +170,6 @@ impl ManagedTorrent { pub(crate) fn start( self: &Arc, - initial_peers: Vec, peer_rx: Option, start_paused: bool, live_cancellation_token: CancellationToken, @@ -203,21 +200,12 @@ impl ManagedTorrent { ); }; - fn spawn_peer_adder( - live: &Arc, - initial_peers: Vec, - peer_rx: Option, - ) { + fn spawn_peer_adder(live: &Arc, peer_rx: Option) { live.spawn( error_span!(parent: live.meta().span.clone(), "external_peer_adder"), { let live = live.clone(); async move { - trace!("adding {} initial peers", initial_peers.len()); - for peer in initial_peers { - live.add_peer_if_not_seen(peer).context("torrent closed")?; - } - let live = { let weak = Arc::downgrade(&live); drop(live); @@ -284,7 +272,7 @@ impl ManagedTorrent { g.state = ManagedTorrentState::Live(live.clone()); spawn_fatal_errors_receiver(&t, rx, token); - spawn_peer_adder(&live, initial_peers, peer_rx); + spawn_peer_adder(&live, peer_rx); Ok(()) } @@ -304,7 +292,7 @@ impl ManagedTorrent { let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone()); g.state = ManagedTorrentState::Live(live.clone()); spawn_fatal_errors_receiver(self, rx, live_cancellation_token); - spawn_peer_adder(&live, initial_peers, peer_rx); + spawn_peer_adder(&live, peer_rx); Ok(()) } ManagedTorrentState::Error(_) => { @@ -316,12 +304,7 @@ impl ManagedTorrent { drop(g); // Recurse. - self.start( - initial_peers, - peer_rx, - start_paused, - live_cancellation_token, - ) + self.start(peer_rx, start_paused, live_cancellation_token) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), }