diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ac9f295..faa2955 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -274,7 +274,6 @@ impl TorrentStateLive { tokio::select! { r = fut => r, _ = cancel_rx.changed() => { - error!("canceled"); bail!("canceled") } } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index e2637e1..793e8f3 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -7,6 +7,7 @@ use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; +use std::sync::Weak; use std::time::Duration; use anyhow::bail; @@ -21,6 +22,7 @@ pub use live::*; use parking_lot::RwLock; use tokio_stream::StreamExt; +use tracing::error; use tracing::error_span; use url::Url; @@ -110,7 +112,10 @@ impl ManagedTorrent { let g = self.locked.read(); match &g.state { ManagedTorrentState::Paused(p) => Ok(f(&p.chunk_tracker)), - ManagedTorrentState::Live(l) => Ok(f(l.lock_read("chunk_tracker").get_chunks()?)), + ManagedTorrentState::Live(l) => Ok(f(l + .lock_read("chunk_tracker") + .get_chunks() + .context("error getting chunks")?)), _ => bail!("no chunk tracker, torrent neither paused nor live"), } } @@ -129,6 +134,31 @@ impl ManagedTorrent { peer_rx: Option + Unpin + Send + Sync + 'static>, ) -> anyhow::Result<()> { let mut g = self.locked.write(); + + let peer_adder = |live: Weak| async move { + { + let live: Arc = live.upgrade().context("no longer live")?; + for peer in initial_peers { + live.add_peer_if_not_seen(peer).context("torrent closed")?; + } + } + + if let Some(mut peer_rx) = peer_rx { + while let Some(peer) = peer_rx.next().await { + live.upgrade() + .context("no longer live")? + .add_peer_if_not_seen(peer) + .context("torrent closed")?; + } + } else { + error!("peer rx is not set"); + } + + Ok(()) + }; + + let span = self.info.span.clone(); + match &g.state { ManagedTorrentState::Live(_) => { bail!("torrent is already live"); @@ -136,7 +166,6 @@ impl ManagedTorrent { ManagedTorrentState::Initializing(init) => { let init = init.clone(); let t = self.clone(); - let span = self.info.span.clone(); spawn( error_span!(parent: span.clone(), "initialize_and_start"), async move { @@ -145,30 +174,9 @@ impl ManagedTorrent { let live = TorrentStateLive::new(paused); t.locked.write().state = ManagedTorrentState::Live(live.clone()); - let live = Arc::downgrade(&live); spawn( error_span!(parent: span.clone(), "external_peer_adder"), - async move { - { - let live: Arc = - live.upgrade().context("no longer live")?; - for peer in initial_peers { - live.add_peer_if_not_seen(peer) - .context("torrent closed")?; - } - } - - if let Some(mut peer_rx) = peer_rx { - while let Some(peer) = peer_rx.next().await { - live.upgrade() - .context("no longer live")? - .add_peer_if_not_seen(peer) - .context("torrent closed")?; - } - } - - Ok(()) - }, + peer_adder(Arc::downgrade(&live)), ); Ok(()) @@ -186,7 +194,11 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let live = TorrentStateLive::new(paused); - g.state = ManagedTorrentState::Live(live); + g.state = ManagedTorrentState::Live(live.clone()); + spawn( + error_span!(parent: span.clone(), "external_peer_adder"), + peer_adder(Arc::downgrade(&live)), + ); Ok(()) } ManagedTorrentState::Error(_) => {