Fatal error handling

This commit is contained in:
Igor Katson 2023-11-25 13:46:50 +00:00
parent 051a231482
commit 6f113c5137
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 109 additions and 35 deletions

View file

@ -137,6 +137,9 @@ pub(crate) struct TorrentStateLocked {
// At a moment in time, we are expecting a piece from only one peer.
// inflight_pieces stores this information.
inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
// If this is None, then it was already used
fatal_errors_tx: Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
}
impl TorrentStateLocked {
@ -187,7 +190,10 @@ pub struct TorrentStateLive {
}
impl TorrentStateLive {
pub(crate) fn new(paused: TorrentStatePaused) -> Arc<Self> {
pub(crate) fn new(
paused: TorrentStatePaused,
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
) -> Arc<Self> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let speed_estimator = SpeedEstimator::new(5);
@ -204,6 +210,7 @@ impl TorrentStateLive {
locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker),
inflight_pieces: Default::default(),
fatal_errors_tx: Some(fatal_errors_tx),
}),
files: paused.files,
filenames: paused.filenames,
@ -438,6 +445,10 @@ impl TorrentStateLive {
}
}
pub fn meta(&self) -> &ManagedTorrentInfo {
&self.meta
}
pub fn info(&self) -> &TorrentMetaV1Info<ByteString> {
&self.meta.info
}
@ -668,6 +679,19 @@ impl TorrentStateLive {
have_bytes,
})
}
fn on_fatal_error(&self, e: anyhow::Error) -> anyhow::Result<()> {
let mut g = self.lock_write("fatal_error");
let tx = g
.fatal_errors_tx
.take()
.context("fatal_errors_tx already taken")?;
let res = anyhow::anyhow!("fatal error: {:?}", e);
if tx.send(e).is_err() {
warn!("there's nowhere to send fatal error, receiver is dead");
}
Err(res)
}
}
struct PeerHandlerLocked {
@ -1286,7 +1310,7 @@ impl PeerHandler {
Ok(()) => {}
Err(e) => {
error!("FATAL: error writing chunk to disk: {:?}", e);
panic!("{:?}", e);
return self.state.on_fatal_error(e);
}
}

View file

@ -10,7 +10,6 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use anyhow::bail;
@ -28,6 +27,7 @@ use tokio_stream::StreamExt;
use tracing::debug;
use tracing::error;
use tracing::error_span;
use tracing::warn;
use url::Url;
use crate::chunk_tracker::ChunkTracker;
@ -138,6 +138,30 @@ impl ManagedTorrent {
}
}
fn stop_with_error(&self, error: anyhow::Error) {
let mut g = self.locked.write();
match g.state.take() {
ManagedTorrentState::Live(live) => {
if let Err(err) = live.pause() {
warn!(
"error pausing live torrent during fatal error handling: {:?}",
err
);
}
}
ManagedTorrentState::Error(e) => {
warn!("bug: torrent already was in error state when trying to stop it. Previous error was: {:?}", e);
}
ManagedTorrentState::None => {
warn!("bug: torrent encountered in None state during fatal error handling")
}
_ => {}
};
g.state = ManagedTorrentState::Error(error)
}
pub fn start(
self: &Arc<Self>,
initial_peers: Vec<SocketAddr>,
@ -146,30 +170,59 @@ impl ManagedTorrent {
) -> anyhow::Result<()> {
let mut g = self.locked.write();
let peer_adder = |live: Weak<TorrentStateLive>| async move {
{
let live: Arc<TorrentStateLive> = live.upgrade().context("no longer live")?;
for peer in initial_peers {
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
}
let spawn_fatal_errors_receiver =
|state: &Arc<Self>, rx: tokio::sync::oneshot::Receiver<anyhow::Error>| {
let span = state.info.span.clone();
let state = Arc::downgrade(state);
spawn(
"fatal_errors_receiver",
error_span!(parent: span, "fatal_errors_receiver"),
async move {
let e = match rx.await {
Ok(e) => e,
Err(_) => return Ok(()),
};
if let Some(state) = state.upgrade() {
state.stop_with_error(e);
} else {
warn!("tried to stop the torrent with error, but it's couldn't upgrade the arc");
}
Ok(())
},
);
};
if let Some(mut peer_rx) = peer_rx {
while let Some(peer) = peer_rx.next().await {
live.upgrade()
.context("no longer live")?
.add_peer_if_not_seen(peer)
.context("torrent closed")?;
}
} else {
error!("peer rx is not set");
}
let spawn_peer_adder = |live: &Arc<TorrentStateLive>| {
let span = live.meta().span.clone();
let live = Arc::downgrade(live);
spawn(
"external_peer_adder",
error_span!(parent: span, "external_peer_adder"),
async move {
{
let live: Arc<TorrentStateLive> =
live.upgrade().context("no longer live")?;
for peer in initial_peers {
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
}
Ok(())
if let Some(mut peer_rx) = peer_rx {
while let Some(peer) = peer_rx.next().await {
live.upgrade()
.context("no longer live")?
.add_peer_if_not_seen(peer)
.context("torrent closed")?;
}
} else {
error!("peer rx is not set");
}
Ok(())
},
);
};
let span = self.info.span.clone();
match &g.state {
ManagedTorrentState::Live(_) => {
bail!("torrent is already live");
@ -177,6 +230,7 @@ impl ManagedTorrent {
ManagedTorrentState::Initializing(init) => {
let init = init.clone();
let t = self.clone();
let span = self.info().span.clone();
spawn(
"initialize_and_start",
error_span!(parent: span.clone(), "initialize_and_start"),
@ -195,14 +249,12 @@ impl ManagedTorrent {
return Ok(());
}
let live = TorrentStateLive::new(paused);
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx);
g.state = ManagedTorrentState::Live(live.clone());
spawn(
"external_peer_adder",
error_span!(parent: span.clone(), "external_peer_adder"),
peer_adder(Arc::downgrade(&live)),
);
spawn_fatal_errors_receiver(&t, rx);
spawn_peer_adder(&live);
Ok(())
}
@ -218,13 +270,11 @@ impl ManagedTorrent {
}
ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused();
let live = TorrentStateLive::new(paused);
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx);
g.state = ManagedTorrentState::Live(live.clone());
spawn(
"external_peer_adder",
error_span!(parent: span.clone(), "external_peer_adder"),
peer_adder(Arc::downgrade(&live)),
);
spawn_fatal_errors_receiver(self, rx);
spawn_peer_adder(&live);
Ok(())
}
ManagedTorrentState::Error(_) => {