From 6f113c5137ba194d8da7e1fd9f5444c4253f544a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 25 Nov 2023 13:46:50 +0000 Subject: [PATCH] Fatal error handling --- crates/librqbit/src/torrent_state/live/mod.rs | 28 ++++- crates/librqbit/src/torrent_state/mod.rs | 116 +++++++++++++----- 2 files changed, 109 insertions(+), 35 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 4045d85..5aba636 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -137,6 +137,9 @@ pub(crate) struct TorrentStateLocked { // At a moment in time, we are expecting a piece from only one peer. // inflight_pieces stores this information. inflight_pieces: HashMap, + + // If this is None, then it was already used + fatal_errors_tx: Option>, } impl TorrentStateLocked { @@ -187,7 +190,10 @@ pub struct TorrentStateLive { } impl TorrentStateLive { - pub(crate) fn new(paused: TorrentStatePaused) -> Arc { + pub(crate) fn new( + paused: TorrentStatePaused, + fatal_errors_tx: tokio::sync::oneshot::Sender, + ) -> Arc { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let speed_estimator = SpeedEstimator::new(5); @@ -204,6 +210,7 @@ impl TorrentStateLive { locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), inflight_pieces: Default::default(), + fatal_errors_tx: Some(fatal_errors_tx), }), files: paused.files, filenames: paused.filenames, @@ -438,6 +445,10 @@ impl TorrentStateLive { } } + pub fn meta(&self) -> &ManagedTorrentInfo { + &self.meta + } + pub fn info(&self) -> &TorrentMetaV1Info { &self.meta.info } @@ -668,6 +679,19 @@ impl TorrentStateLive { have_bytes, }) } + + fn on_fatal_error(&self, e: anyhow::Error) -> anyhow::Result<()> { + let mut g = self.lock_write("fatal_error"); + let tx = g + .fatal_errors_tx + .take() + .context("fatal_errors_tx already taken")?; + let res = anyhow::anyhow!("fatal error: {:?}", e); + if tx.send(e).is_err() { + warn!("there's nowhere to send fatal error, receiver is dead"); + } + Err(res) + } } struct PeerHandlerLocked { @@ -1286,7 +1310,7 @@ impl PeerHandler { Ok(()) => {} Err(e) => { error!("FATAL: error writing chunk to disk: {:?}", e); - panic!("{:?}", e); + return self.state.on_fatal_error(e); } } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 2cc9fcd..31b9ca5 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -10,7 +10,6 @@ use std::path::Path; use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::Weak; use std::time::Duration; use anyhow::bail; @@ -28,6 +27,7 @@ use tokio_stream::StreamExt; use tracing::debug; use tracing::error; use tracing::error_span; +use tracing::warn; use url::Url; use crate::chunk_tracker::ChunkTracker; @@ -138,6 +138,30 @@ impl ManagedTorrent { } } + fn stop_with_error(&self, error: anyhow::Error) { + let mut g = self.locked.write(); + + match g.state.take() { + ManagedTorrentState::Live(live) => { + if let Err(err) = live.pause() { + warn!( + "error pausing live torrent during fatal error handling: {:?}", + err + ); + } + } + ManagedTorrentState::Error(e) => { + warn!("bug: torrent already was in error state when trying to stop it. Previous error was: {:?}", e); + } + ManagedTorrentState::None => { + warn!("bug: torrent encountered in None state during fatal error handling") + } + _ => {} + }; + + g.state = ManagedTorrentState::Error(error) + } + pub fn start( self: &Arc, initial_peers: Vec, @@ -146,30 +170,59 @@ impl ManagedTorrent { ) -> anyhow::Result<()> { let mut g = self.locked.write(); - let peer_adder = |live: Weak| async move { - { - let live: Arc = live.upgrade().context("no longer live")?; - for peer in initial_peers { - live.add_peer_if_not_seen(peer).context("torrent closed")?; - } - } + let spawn_fatal_errors_receiver = + |state: &Arc, rx: tokio::sync::oneshot::Receiver| { + let span = state.info.span.clone(); + let state = Arc::downgrade(state); + spawn( + "fatal_errors_receiver", + error_span!(parent: span, "fatal_errors_receiver"), + async move { + let e = match rx.await { + Ok(e) => e, + Err(_) => return Ok(()), + }; + if let Some(state) = state.upgrade() { + state.stop_with_error(e); + } else { + warn!("tried to stop the torrent with error, but it's couldn't upgrade the arc"); + } + Ok(()) + }, + ); + }; - if let Some(mut peer_rx) = peer_rx { - while let Some(peer) = peer_rx.next().await { - live.upgrade() - .context("no longer live")? - .add_peer_if_not_seen(peer) - .context("torrent closed")?; - } - } else { - error!("peer rx is not set"); - } + let spawn_peer_adder = |live: &Arc| { + let span = live.meta().span.clone(); + let live = Arc::downgrade(live); + spawn( + "external_peer_adder", + error_span!(parent: span, "external_peer_adder"), + async move { + { + let live: Arc = + live.upgrade().context("no longer live")?; + for peer in initial_peers { + live.add_peer_if_not_seen(peer).context("torrent closed")?; + } + } - Ok(()) + if let Some(mut peer_rx) = peer_rx { + while let Some(peer) = peer_rx.next().await { + live.upgrade() + .context("no longer live")? + .add_peer_if_not_seen(peer) + .context("torrent closed")?; + } + } else { + error!("peer rx is not set"); + } + + Ok(()) + }, + ); }; - let span = self.info.span.clone(); - match &g.state { ManagedTorrentState::Live(_) => { bail!("torrent is already live"); @@ -177,6 +230,7 @@ impl ManagedTorrent { ManagedTorrentState::Initializing(init) => { let init = init.clone(); let t = self.clone(); + let span = self.info().span.clone(); spawn( "initialize_and_start", error_span!(parent: span.clone(), "initialize_and_start"), @@ -195,14 +249,12 @@ impl ManagedTorrent { return Ok(()); } - let live = TorrentStateLive::new(paused); + let (tx, rx) = tokio::sync::oneshot::channel(); + let live = TorrentStateLive::new(paused, tx); g.state = ManagedTorrentState::Live(live.clone()); - spawn( - "external_peer_adder", - error_span!(parent: span.clone(), "external_peer_adder"), - peer_adder(Arc::downgrade(&live)), - ); + spawn_fatal_errors_receiver(&t, rx); + spawn_peer_adder(&live); Ok(()) } @@ -218,13 +270,11 @@ impl ManagedTorrent { } ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); - let live = TorrentStateLive::new(paused); + let (tx, rx) = tokio::sync::oneshot::channel(); + let live = TorrentStateLive::new(paused, tx); g.state = ManagedTorrentState::Live(live.clone()); - spawn( - "external_peer_adder", - error_span!(parent: span.clone(), "external_peer_adder"), - peer_adder(Arc::downgrade(&live)), - ); + spawn_fatal_errors_receiver(self, rx); + spawn_peer_adder(&live); Ok(()) } ManagedTorrentState::Error(_) => {