Reorganize _start() a bit

This commit is contained in:
Igor Katson 2024-12-06 12:08:03 +00:00
parent 7ed037cba8
commit 38fec48879
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -279,6 +279,116 @@ impl ManagedTorrent {
peer_rx: Option<PeerStream>,
start_paused: bool,
) -> anyhow::Result<()> {
// State machine transitions.
//
// - error -> initializing
// - initializing -> paused
// - paused -> live
// - live -> paused
//
// - initializing -> error
// - live -> error
fn _start<'a>(
t: &'a Arc<ManagedTorrent>,
peer_rx: Option<PeerStream>,
start_paused: bool,
session: Arc<Session>,
g: Option<parking_lot::RwLockWriteGuard<'a, ManagedTorrentLocked>>,
token: CancellationToken,
) -> anyhow::Result<()> {
let mut g = g.unwrap_or_else(|| t.locked.write());
match &g.state {
ManagedTorrentState::Live(_) => {
bail!("torrent is already live");
}
ManagedTorrentState::Initializing(init) => {
let init = init.clone();
drop(g);
let t = t.clone();
let span = t.shared().span.clone();
let token = token.clone();
spawn_with_cancel(
error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(),
async move {
let concurrent_init_semaphore =
session.concurrent_initialize_semaphore.clone();
let _permit = concurrent_init_semaphore
.acquire()
.await
.context("bug: concurrent init semaphore was closed")?;
match init.check().await {
Ok(paused) => {
let mut g = t.locked.write();
if let ManagedTorrentState::Initializing(_) = &g.state {
} else {
debug!("no need to start torrent anymore, as it switched state from initilizing");
return Ok(());
}
g.state = ManagedTorrentState::Paused(paused);
t.state_change_notify.notify_waiters();
if start_paused {
return Ok(());
}
_start(&t, peer_rx, start_paused, session, Some(g), token)
}
Err(err) => {
let result = anyhow::anyhow!("{:?}", err);
t.locked.write().state = ManagedTorrentState::Error(err);
t.state_change_notify.notify_waiters();
Err(result)
}
}
},
);
Ok(())
}
ManagedTorrentState::Paused(_) => {
if start_paused {
warn!("start(start_paused=true) called, but torrent already paused");
return Ok(());
}
let paused = g.state.take().assert_paused();
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, token.clone())?;
g.state = ManagedTorrentState::Live(live.clone());
t.state_change_notify.notify_waiters();
drop(g);
spawn_fatal_errors_receiver(t, rx, token);
if let Some(peer_rx) = peer_rx {
spawn_peer_adder(&live, peer_rx);
}
Ok(())
}
ManagedTorrentState::Error(_) => {
let metadata = t.metadata.load_full().expect("TODO");
let initializing = Arc::new(TorrentStateInitializing::new(
t.shared.clone(),
metadata.clone(),
g.only_files.clone(),
t.shared
.storage_factory
.create_and_init(t.shared(), &metadata)?,
true,
));
g.state = ManagedTorrentState::Initializing(initializing.clone());
t.state_change_notify.notify_waiters();
// Recurse.
_start(t, peer_rx, start_paused, session, Some(g), token)
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
}
}
let session = self
.shared
.session
@ -288,165 +398,14 @@ impl ManagedTorrent {
g.paused = start_paused;
let cancellation_token = session.cancellation_token().child_token();
let spawn_fatal_errors_receiver =
|state: &Arc<Self>,
rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
token: CancellationToken| {
let span = state.shared.span.clone();
let state = Arc::downgrade(state);
spawn_with_cancel(
error_span!(parent: span, "fatal_errors_receiver"),
token,
async move {
let e = match rx.await {
Ok(e) => e,
Err(_) => return Ok(()),
};
if let Some(state) = state.upgrade() {
state.stop_with_error(e);
} else {
warn!("tried to stop the torrent with error, but couldn't upgrade the arc");
}
Ok(())
},
);
};
fn spawn_peer_adder(live: &Arc<TorrentStateLive>, mut peer_rx: PeerStream) {
live.spawn(
error_span!(parent: live.torrent().span.clone(), "external_peer_adder"),
{
let live = live.clone();
async move {
let live = {
let weak = Arc::downgrade(&live);
drop(live);
weak
};
loop {
match timeout(Duration::from_secs(5), peer_rx.next()).await {
Ok(Some(peer)) => {
trace!(?peer, "received peer from peer_rx");
let live = match live.upgrade() {
Some(live) => live,
None => return Ok(()),
};
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
Ok(None) => {
debug!("peer_rx closed, closing peer adder");
return Ok(());
}
// If timeout, check if the torrent is live.
Err(_) if live.strong_count() == 0 => {
debug!("timed out waiting for peers, torrent isn't live, closing peer adder");
return Ok(());
}
Err(_) => continue,
}
}
}
},
);
}
match &g.state {
ManagedTorrentState::Live(_) => {
bail!("torrent is already live");
}
ManagedTorrentState::Initializing(init) => {
let init = init.clone();
drop(g);
let t = self.clone();
let span = self.shared().span.clone();
let token = cancellation_token.clone();
spawn_with_cancel(
error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(),
async move {
let concurrent_init_semaphore =
session.concurrent_initialize_semaphore.clone();
let _permit = concurrent_init_semaphore
.acquire()
.await
.context("bug: concurrent init semaphore was closed")?;
match init.check().await {
Ok(paused) => {
let mut g = t.locked.write();
if let ManagedTorrentState::Initializing(_) = &g.state {
} else {
debug!("no need to start torrent anymore, as it switched state from initilizing");
return Ok(());
}
if start_paused {
g.state = ManagedTorrentState::Paused(paused);
t.state_change_notify.notify_waiters();
return Ok(());
}
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, cancellation_token)?;
g.state = ManagedTorrentState::Live(live.clone());
drop(g);
t.state_change_notify.notify_waiters();
spawn_fatal_errors_receiver(&t, rx, token);
if let Some(peer_rx) = peer_rx {
spawn_peer_adder(&live, peer_rx);
}
Ok(())
}
Err(err) => {
let result = anyhow::anyhow!("{:?}", err);
t.locked.write().state = ManagedTorrentState::Error(err);
t.state_change_notify.notify_waiters();
Err(result)
}
}
},
);
Ok(())
}
ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused();
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, cancellation_token.clone())?;
g.state = ManagedTorrentState::Live(live.clone());
drop(g);
spawn_fatal_errors_receiver(self, rx, cancellation_token);
if let Some(peer_rx) = peer_rx {
spawn_peer_adder(&live, peer_rx);
}
Ok(())
}
ManagedTorrentState::Error(_) => {
let metadata = self.metadata.load_full().expect("TODO");
let initializing = Arc::new(TorrentStateInitializing::new(
self.shared.clone(),
metadata.clone(),
g.only_files.clone(),
self.shared
.storage_factory
.create_and_init(self.shared(), &metadata)?,
true,
));
g.state = ManagedTorrentState::Initializing(initializing.clone());
drop(g);
self.state_change_notify.notify_waiters();
// Recurse.
self.start(peer_rx, start_paused)
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
}
_start(
self,
peer_rx,
start_paused,
session,
Some(g),
cancellation_token,
)
}
pub fn is_paused(&self) -> bool {
@ -618,3 +577,67 @@ impl ManagedTorrent {
}
pub type ManagedTorrentHandle = Arc<ManagedTorrent>;
fn spawn_fatal_errors_receiver(
state: &Arc<ManagedTorrent>,
rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
token: CancellationToken,
) {
let span = state.shared.span.clone();
let state = Arc::downgrade(state);
spawn_with_cancel(
error_span!(parent: span, "fatal_errors_receiver"),
token,
async move {
let e = match rx.await {
Ok(e) => e,
Err(_) => return Ok(()),
};
if let Some(state) = state.upgrade() {
state.stop_with_error(e);
} else {
warn!("tried to stop the torrent with error, but couldn't upgrade the arc");
}
Ok(())
},
);
}
fn spawn_peer_adder(live: &Arc<TorrentStateLive>, mut peer_rx: PeerStream) {
live.spawn(
error_span!(parent: live.torrent().span.clone(), "external_peer_adder"),
{
let live = live.clone();
async move {
let live = {
let weak = Arc::downgrade(&live);
drop(live);
weak
};
loop {
match timeout(Duration::from_secs(5), peer_rx.next()).await {
Ok(Some(peer)) => {
trace!(?peer, "received peer from peer_rx");
let live = match live.upgrade() {
Some(live) => live,
None => return Ok(()),
};
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
Ok(None) => {
debug!("peer_rx closed, closing peer adder");
return Ok(());
}
// If timeout, check if the torrent is live.
Err(_) if live.strong_count() == 0 => {
debug!("timed out waiting for peers, torrent isn't live, closing peer adder");
return Ok(());
}
Err(_) => continue,
}
}
}
},
);
}