Remove some args in start() function

This commit is contained in:
Igor Katson 2024-08-21 15:57:18 +01:00
parent 80f4d3b1b2
commit ad7b59ea3c
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 20 additions and 42 deletions

View file

@ -101,7 +101,7 @@ pub struct Session {
peer_id: Id20, peer_id: Id20,
dht: Option<Dht>, dht: Option<Dht>,
persistence: Option<Arc<dyn SessionPersistenceStore>>, persistence: Option<Arc<dyn SessionPersistenceStore>>,
bitv_factory: Arc<dyn BitVFactory>, pub(crate) bitv_factory: Arc<dyn BitVFactory>,
peer_opts: PeerConnectionOptions, peer_opts: PeerConnectionOptions,
spawner: BlockingSpawner, spawner: BlockingSpawner,
next_id: AtomicUsize, next_id: AtomicUsize,
@ -117,9 +117,8 @@ pub struct Session {
default_storage_factory: Option<BoxStorageFactory>, default_storage_factory: Option<BoxStorageFactory>,
reqwest_client: reqwest::Client, reqwest_client: reqwest::Client,
connector: Arc<StreamConnector>, pub(crate) connector: Arc<StreamConnector>,
pub(crate) concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
root_span: Option<Span>, root_span: Option<Span>,
@ -1186,14 +1185,7 @@ impl Session {
let _ = span.enter(); let _ = span.enter();
managed_torrent managed_torrent
.start( .start(peer_rx, opts.paused)
peer_rx,
opts.paused,
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(),
self.stats.atomic.clone(),
)
.context("error starting torrent")?; .context("error starting torrent")?;
} }
@ -1343,14 +1335,7 @@ impl Session {
self.tcp_listen_port, self.tcp_listen_port,
handle.info().options.force_tracker_interval, handle.info().options.force_tracker_interval,
)?; )?;
handle.start( handle.start(peer_rx, false)?;
peer_rx,
false,
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(),
self.stats.atomic.clone(),
)?;
self.try_update_persistence_metadata(handle).await; self.try_update_persistence_metadata(handle).await;
Ok(()) Ok(())
} }

View file

@ -34,10 +34,8 @@ use tracing::debug;
use tracing::error_span; use tracing::error_span;
use tracing::warn; use tracing::warn;
use crate::bitv_factory::BitVFactory;
use crate::chunk_tracker::ChunkTracker; use crate::chunk_tracker::ChunkTracker;
use crate::session::TorrentId; use crate::session::TorrentId;
use crate::session_stats::atomic::AtomicSessionStats;
use crate::spawn_utils::BlockingSpawner; use crate::spawn_utils::BlockingSpawner;
use crate::storage::BoxStorageFactory; use crate::storage::BoxStorageFactory;
use crate::stream_connect::StreamConnector; use crate::stream_connect::StreamConnector;
@ -210,11 +208,11 @@ impl ManagedTorrent {
self: &Arc<Self>, self: &Arc<Self>,
peer_rx: Option<PeerStream>, peer_rx: Option<PeerStream>,
start_paused: bool, start_paused: bool,
live_cancellation_token: CancellationToken,
init_semaphore: Arc<tokio::sync::Semaphore>,
bitv_factory: Arc<dyn BitVFactory>,
session_stats: Arc<AtomicSessionStats>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let session = self
.session
.upgrade()
.context("session is dead, cannot start torrent")?;
let mut g = self.locked.write(); let mut g = self.locked.write();
let spawn_fatal_errors_receiver = let spawn_fatal_errors_receiver =
@ -295,18 +293,20 @@ impl ManagedTorrent {
drop(g); drop(g);
let t = self.clone(); let t = self.clone();
let span = self.info().span.clone(); let span = self.info().span.clone();
let token = live_cancellation_token.clone(); let token = session.cancellation_token().child_token().clone();
spawn_with_cancel( spawn_with_cancel(
error_span!(parent: span.clone(), "initialize_and_start"), error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(), token.clone(),
async move { async move {
let _permit = init_semaphore let concurrent_init_semaphore =
session.concurrent_initialize_semaphore.clone();
let _permit = concurrent_init_semaphore
.acquire() .acquire()
.await .await
.context("bug: concurrent init semaphore was closed")?; .context("bug: concurrent init semaphore was closed")?;
match init.check(bitv_factory).await { match init.check(session.bitv_factory.clone()).await {
Ok(paused) => { Ok(paused) => {
let mut g = t.locked.write(); let mut g = t.locked.write();
if let ManagedTorrentState::Initializing(_) = &g.state { if let ManagedTorrentState::Initializing(_) = &g.state {
@ -325,8 +325,8 @@ impl ManagedTorrent {
let live = TorrentStateLive::new( let live = TorrentStateLive::new(
paused, paused,
tx, tx,
live_cancellation_token, session.cancellation_token().child_token(),
session_stats, session.stats.atomic.clone(),
)?; )?;
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
drop(g); drop(g);
@ -355,13 +355,13 @@ impl ManagedTorrent {
let live = TorrentStateLive::new( let live = TorrentStateLive::new(
paused, paused,
tx, tx,
live_cancellation_token.clone(), session.cancellation_token().child_token().clone(),
session_stats, session.stats.atomic.clone(),
)?; )?;
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
drop(g); drop(g);
spawn_fatal_errors_receiver(self, rx, live_cancellation_token); spawn_fatal_errors_receiver(self, rx, session.cancellation_token().child_token());
spawn_peer_adder(&live, peer_rx); spawn_peer_adder(&live, peer_rx);
Ok(()) Ok(())
} }
@ -377,14 +377,7 @@ impl ManagedTorrent {
self.state_change_notify.notify_waiters(); self.state_change_notify.notify_waiters();
// Recurse. // Recurse.
self.start( self.start(peer_rx, start_paused)
peer_rx,
start_paused,
live_cancellation_token,
init_semaphore,
bitv_factory,
session_stats,
)
} }
ManagedTorrentState::None => bail!("bug: torrent is in empty state"), ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
} }