Tiny refactor: simplify a few methods by removing initial_peers propagated too deep down

This commit is contained in:
Igor Katson 2024-03-02 10:57:32 +00:00
parent 8f711ed2e7
commit 3a1d0c3ac9
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 20 additions and 34 deletions

View file

@ -941,7 +941,6 @@ impl Session {
Ok::<_, anyhow::Error>(Some(PathBuf::from(longest)))
}
#[allow(clippy::too_many_arguments)]
async fn main_torrent_info(
&self,
info_hash: Id20,
@ -1017,17 +1016,26 @@ impl Session {
(managed_torrent, id)
};
// Merge "initial_peers" and "peer_rx" into one stream.
let peer_rx: Option<PeerStream> = if !initial_peers.is_empty() || peer_rx.is_some() {
use futures::future::Either;
Some(Box::new(
futures::stream::iter(initial_peers).chain(
peer_rx
.map(Either::Left)
.unwrap_or(Either::Right(futures::stream::empty())),
),
))
} else {
peer_rx
};
{
let span = managed_torrent.info.span.clone();
let _ = span.enter();
managed_torrent
.start(
initial_peers,
peer_rx,
opts.paused,
self.cancellation_token.child_token(),
)
.start(peer_rx, opts.paused, self.cancellation_token.child_token())
.context("error starting torrent")?;
}
@ -1119,12 +1127,7 @@ impl Session {
self.tcp_listen_port,
handle.info().options.force_tracker_interval,
)?;
handle.start(
Default::default(),
peer_rx,
false,
self.cancellation_token.child_token(),
)?;
handle.start(peer_rx, false, self.cancellation_token.child_token())?;
Ok(())
}
}

View file

@ -5,7 +5,6 @@ pub mod stats;
pub mod utils;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
@ -31,7 +30,6 @@ use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error_span;
use tracing::trace;
use tracing::warn;
use crate::chunk_tracker::ChunkTracker;
@ -172,7 +170,6 @@ impl ManagedTorrent {
pub(crate) fn start(
self: &Arc<Self>,
initial_peers: Vec<SocketAddr>,
peer_rx: Option<PeerStream>,
start_paused: bool,
live_cancellation_token: CancellationToken,
@ -203,21 +200,12 @@ impl ManagedTorrent {
);
};
fn spawn_peer_adder(
live: &Arc<TorrentStateLive>,
initial_peers: Vec<SocketAddr>,
peer_rx: Option<PeerStream>,
) {
fn spawn_peer_adder(live: &Arc<TorrentStateLive>, peer_rx: Option<PeerStream>) {
live.spawn(
error_span!(parent: live.meta().span.clone(), "external_peer_adder"),
{
let live = live.clone();
async move {
trace!("adding {} initial peers", initial_peers.len());
for peer in initial_peers {
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
let live = {
let weak = Arc::downgrade(&live);
drop(live);
@ -284,7 +272,7 @@ impl ManagedTorrent {
g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(&t, rx, token);
spawn_peer_adder(&live, initial_peers, peer_rx);
spawn_peer_adder(&live, peer_rx);
Ok(())
}
@ -304,7 +292,7 @@ impl ManagedTorrent {
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone());
g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
spawn_peer_adder(&live, initial_peers, peer_rx);
spawn_peer_adder(&live, peer_rx);
Ok(())
}
ManagedTorrentState::Error(_) => {
@ -316,12 +304,7 @@ impl ManagedTorrent {
drop(g);
// Recurse.
self.start(
initial_peers,
peer_rx,
start_paused,
live_cancellation_token,
)
self.start(peer_rx, start_paused, live_cancellation_token)
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
}