diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 137618b..87ba127 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -279,6 +279,116 @@ impl ManagedTorrent { peer_rx: Option, start_paused: bool, ) -> anyhow::Result<()> { + // State machine transitions. + // + // - error -> initializing + // - initializing -> paused + // - paused -> live + // - live -> paused + // + // - initializing -> error + // - live -> error + + fn _start<'a>( + t: &'a Arc, + peer_rx: Option, + start_paused: bool, + session: Arc, + g: Option>, + token: CancellationToken, + ) -> anyhow::Result<()> { + let mut g = g.unwrap_or_else(|| t.locked.write()); + + match &g.state { + ManagedTorrentState::Live(_) => { + bail!("torrent is already live"); + } + ManagedTorrentState::Initializing(init) => { + let init = init.clone(); + drop(g); + let t = t.clone(); + let span = t.shared().span.clone(); + let token = token.clone(); + + spawn_with_cancel( + error_span!(parent: span.clone(), "initialize_and_start"), + token.clone(), + async move { + let concurrent_init_semaphore = + session.concurrent_initialize_semaphore.clone(); + let _permit = concurrent_init_semaphore + .acquire() + .await + .context("bug: concurrent init semaphore was closed")?; + + match init.check().await { + Ok(paused) => { + let mut g = t.locked.write(); + if let ManagedTorrentState::Initializing(_) = &g.state { + } else { + debug!("no need to start torrent anymore, as it switched state from initilizing"); + return Ok(()); + } + + g.state = ManagedTorrentState::Paused(paused); + t.state_change_notify.notify_waiters(); + + if start_paused { + return Ok(()); + } + + _start(&t, peer_rx, start_paused, session, Some(g), token) + } + Err(err) => { + let result = anyhow::anyhow!("{:?}", err); + t.locked.write().state = ManagedTorrentState::Error(err); + t.state_change_notify.notify_waiters(); + Err(result) + } + } + }, + ); + Ok(()) + } + ManagedTorrentState::Paused(_) => { + if start_paused { + warn!("start(start_paused=true) called, but torrent already paused"); + return Ok(()); + } + let paused = g.state.take().assert_paused(); + let (tx, rx) = tokio::sync::oneshot::channel(); + let live = TorrentStateLive::new(paused, tx, token.clone())?; + g.state = ManagedTorrentState::Live(live.clone()); + t.state_change_notify.notify_waiters(); + drop(g); + + spawn_fatal_errors_receiver(t, rx, token); + if let Some(peer_rx) = peer_rx { + spawn_peer_adder(&live, peer_rx); + } + Ok(()) + } + ManagedTorrentState::Error(_) => { + let metadata = t.metadata.load_full().expect("TODO"); + let initializing = Arc::new(TorrentStateInitializing::new( + t.shared.clone(), + metadata.clone(), + g.only_files.clone(), + t.shared + .storage_factory + .create_and_init(t.shared(), &metadata)?, + true, + )); + g.state = ManagedTorrentState::Initializing(initializing.clone()); + t.state_change_notify.notify_waiters(); + + // Recurse. + _start(t, peer_rx, start_paused, session, Some(g), token) + } + ManagedTorrentState::None => bail!("bug: torrent is in empty state"), + } + } + let session = self .shared .session @@ -288,165 +398,14 @@ impl ManagedTorrent { g.paused = start_paused; let cancellation_token = session.cancellation_token().child_token(); - let spawn_fatal_errors_receiver = - |state: &Arc, - rx: tokio::sync::oneshot::Receiver, - token: CancellationToken| { - let span = state.shared.span.clone(); - let state = Arc::downgrade(state); - spawn_with_cancel( - error_span!(parent: span, "fatal_errors_receiver"), - token, - async move { - let e = match rx.await { - Ok(e) => e, - Err(_) => return Ok(()), - }; - if let Some(state) = state.upgrade() { - state.stop_with_error(e); - } else { - warn!("tried to stop the torrent with error, but couldn't upgrade the arc"); - } - Ok(()) - }, - ); - }; - - fn spawn_peer_adder(live: &Arc, mut peer_rx: PeerStream) { - live.spawn( - error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), - { - let live = live.clone(); - async move { - let live = { - let weak = Arc::downgrade(&live); - drop(live); - weak - }; - - loop { - match timeout(Duration::from_secs(5), peer_rx.next()).await { - Ok(Some(peer)) => { - trace!(?peer, "received peer from peer_rx"); - let live = match live.upgrade() { - Some(live) => live, - None => return Ok(()), - }; - live.add_peer_if_not_seen(peer).context("torrent closed")?; - } - Ok(None) => { - debug!("peer_rx closed, closing peer adder"); - return Ok(()); - } - // If timeout, check if the torrent is live. - Err(_) if live.strong_count() == 0 => { - debug!("timed out waiting for peers, torrent isn't live, closing peer adder"); - return Ok(()); - } - Err(_) => continue, - } - } - } - }, - ); - } - - match &g.state { - ManagedTorrentState::Live(_) => { - bail!("torrent is already live"); - } - ManagedTorrentState::Initializing(init) => { - let init = init.clone(); - drop(g); - let t = self.clone(); - let span = self.shared().span.clone(); - let token = cancellation_token.clone(); - - spawn_with_cancel( - error_span!(parent: span.clone(), "initialize_and_start"), - token.clone(), - async move { - let concurrent_init_semaphore = - session.concurrent_initialize_semaphore.clone(); - let _permit = concurrent_init_semaphore - .acquire() - .await - .context("bug: concurrent init semaphore was closed")?; - - match init.check().await { - Ok(paused) => { - let mut g = t.locked.write(); - if let ManagedTorrentState::Initializing(_) = &g.state { - } else { - debug!("no need to start torrent anymore, as it switched state from initilizing"); - return Ok(()); - } - - if start_paused { - g.state = ManagedTorrentState::Paused(paused); - t.state_change_notify.notify_waiters(); - return Ok(()); - } - - let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, cancellation_token)?; - g.state = ManagedTorrentState::Live(live.clone()); - drop(g); - - t.state_change_notify.notify_waiters(); - - spawn_fatal_errors_receiver(&t, rx, token); - if let Some(peer_rx) = peer_rx { - spawn_peer_adder(&live, peer_rx); - } - - Ok(()) - } - Err(err) => { - let result = anyhow::anyhow!("{:?}", err); - t.locked.write().state = ManagedTorrentState::Error(err); - t.state_change_notify.notify_waiters(); - Err(result) - } - } - }, - ); - Ok(()) - } - ManagedTorrentState::Paused(_) => { - let paused = g.state.take().assert_paused(); - let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, cancellation_token.clone())?; - g.state = ManagedTorrentState::Live(live.clone()); - drop(g); - - spawn_fatal_errors_receiver(self, rx, cancellation_token); - if let Some(peer_rx) = peer_rx { - spawn_peer_adder(&live, peer_rx); - } - Ok(()) - } - ManagedTorrentState::Error(_) => { - let metadata = self.metadata.load_full().expect("TODO"); - let initializing = Arc::new(TorrentStateInitializing::new( - self.shared.clone(), - metadata.clone(), - g.only_files.clone(), - self.shared - .storage_factory - .create_and_init(self.shared(), &metadata)?, - true, - )); - g.state = ManagedTorrentState::Initializing(initializing.clone()); - drop(g); - - self.state_change_notify.notify_waiters(); - - // Recurse. - self.start(peer_rx, start_paused) - } - ManagedTorrentState::None => bail!("bug: torrent is in empty state"), - } + _start( + self, + peer_rx, + start_paused, + session, + Some(g), + cancellation_token, + ) } pub fn is_paused(&self) -> bool { @@ -618,3 +577,67 @@ impl ManagedTorrent { } pub type ManagedTorrentHandle = Arc; + +fn spawn_fatal_errors_receiver( + state: &Arc, + rx: tokio::sync::oneshot::Receiver, + token: CancellationToken, +) { + let span = state.shared.span.clone(); + let state = Arc::downgrade(state); + spawn_with_cancel( + error_span!(parent: span, "fatal_errors_receiver"), + token, + async move { + let e = match rx.await { + Ok(e) => e, + Err(_) => return Ok(()), + }; + if let Some(state) = state.upgrade() { + state.stop_with_error(e); + } else { + warn!("tried to stop the torrent with error, but couldn't upgrade the arc"); + } + Ok(()) + }, + ); +} + +fn spawn_peer_adder(live: &Arc, mut peer_rx: PeerStream) { + live.spawn( + error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), + { + let live = live.clone(); + async move { + let live = { + let weak = Arc::downgrade(&live); + drop(live); + weak + }; + + loop { + match timeout(Duration::from_secs(5), peer_rx.next()).await { + Ok(Some(peer)) => { + trace!(?peer, "received peer from peer_rx"); + let live = match live.upgrade() { + Some(live) => live, + None => return Ok(()), + }; + live.add_peer_if_not_seen(peer).context("torrent closed")?; + } + Ok(None) => { + debug!("peer_rx closed, closing peer adder"); + return Ok(()); + } + // If timeout, check if the torrent is live. + Err(_) if live.strong_count() == 0 => { + debug!("timed out waiting for peers, torrent isn't live, closing peer adder"); + return Ok(()); + } + Err(_) => continue, + } + } + } + }, + ); +}