No sleeps, wait properly now

This commit is contained in:
Igor Katson 2024-04-30 10:19:28 +01:00
parent e3254f97bf
commit aec1b94544

View file

@ -26,6 +26,7 @@ use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
pub use live::*;
use parking_lot::RwLock;
use tokio::sync::Notify;
use tokio::time::timeout;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
@ -108,6 +109,8 @@ pub struct ManagedTorrentInfo {
pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>,
pub(crate) storage_factory: Box<dyn StorageFactory>,
state_change_notify: Notify,
locked: RwLock<ManagedTorrentLocked>,
}
@ -181,6 +184,8 @@ impl ManagedTorrent {
_ => {}
};
self.state_change_notify.notify_waiters();
g.state = ManagedTorrentState::Error(error)
}
@ -279,6 +284,7 @@ impl ManagedTorrent {
if start_paused {
g.state = ManagedTorrentState::Paused(paused);
t.state_change_notify.notify_waiters();
return Ok(());
}
@ -286,6 +292,7 @@ impl ManagedTorrent {
let live =
TorrentStateLive::new(paused, tx, live_cancellation_token)?;
g.state = ManagedTorrentState::Live(live.clone());
t.state_change_notify.notify_waiters();
spawn_fatal_errors_receiver(&t, rx, token);
spawn_peer_adder(&live, peer_rx);
@ -295,6 +302,7 @@ impl ManagedTorrent {
Err(err) => {
let result = anyhow::anyhow!("{:?}", err);
t.locked.write().state = ManagedTorrentState::Error(err);
t.state_change_notify.notify_waiters();
Err(result)
}
}
@ -317,6 +325,7 @@ impl ManagedTorrent {
g.only_files.clone(),
));
g.state = ManagedTorrentState::Initializing(initializing.clone());
self.state_change_notify.notify_waiters();
drop(g);
// Recurse.
@ -333,6 +342,7 @@ impl ManagedTorrent {
ManagedTorrentState::Live(live) => {
let paused = live.pause()?;
g.state = ManagedTorrentState::Paused(paused);
self.state_change_notify.notify_waiters();
Ok(())
}
ManagedTorrentState::Initializing(_) => {
@ -419,7 +429,7 @@ impl ManagedTorrent {
if done {
return Ok(());
}
tokio::time::sleep(Duration::from_secs(1)).await;
let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await;
}
}
.boxed()
@ -441,7 +451,7 @@ impl ManagedTorrent {
if let Some(live) = live {
break live;
}
tokio::time::sleep(Duration::from_secs(1)).await;
let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await;
};
live.wait_until_completed().await;
@ -645,6 +655,7 @@ impl ManagedTorrentBuilder {
state: ManagedTorrentState::Initializing(initializing),
only_files: self.only_files,
}),
state_change_notify: Notify::new(),
storage_factory,
info,
}))