diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index aedd24b..82e4e2c 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -26,6 +26,7 @@ use librqbit_core::torrent_metainfo::TorrentMetaV1Info; pub use live::*; use parking_lot::RwLock; +use tokio::sync::Notify; use tokio::time::timeout; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; @@ -108,6 +109,8 @@ pub struct ManagedTorrentInfo { pub struct ManagedTorrent { pub info: Arc, pub(crate) storage_factory: Box, + + state_change_notify: Notify, locked: RwLock, } @@ -181,6 +184,8 @@ impl ManagedTorrent { _ => {} }; + self.state_change_notify.notify_waiters(); + g.state = ManagedTorrentState::Error(error) } @@ -279,6 +284,7 @@ impl ManagedTorrent { if start_paused { g.state = ManagedTorrentState::Paused(paused); + t.state_change_notify.notify_waiters(); return Ok(()); } @@ -286,6 +292,7 @@ impl ManagedTorrent { let live = TorrentStateLive::new(paused, tx, live_cancellation_token)?; g.state = ManagedTorrentState::Live(live.clone()); + t.state_change_notify.notify_waiters(); spawn_fatal_errors_receiver(&t, rx, token); spawn_peer_adder(&live, peer_rx); @@ -295,6 +302,7 @@ impl ManagedTorrent { Err(err) => { let result = anyhow::anyhow!("{:?}", err); t.locked.write().state = ManagedTorrentState::Error(err); + t.state_change_notify.notify_waiters(); Err(result) } } @@ -317,6 +325,7 @@ impl ManagedTorrent { g.only_files.clone(), )); g.state = ManagedTorrentState::Initializing(initializing.clone()); + self.state_change_notify.notify_waiters(); drop(g); // Recurse. @@ -333,6 +342,7 @@ impl ManagedTorrent { ManagedTorrentState::Live(live) => { let paused = live.pause()?; g.state = ManagedTorrentState::Paused(paused); + self.state_change_notify.notify_waiters(); Ok(()) } ManagedTorrentState::Initializing(_) => { @@ -419,7 +429,7 @@ impl ManagedTorrent { if done { return Ok(()); } - tokio::time::sleep(Duration::from_secs(1)).await; + let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await; } } .boxed() @@ -441,7 +451,7 @@ impl ManagedTorrent { if let Some(live) = live { break live; } - tokio::time::sleep(Duration::from_secs(1)).await; + let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await; }; live.wait_until_completed().await; @@ -645,6 +655,7 @@ impl ManagedTorrentBuilder { state: ManagedTorrentState::Initializing(initializing), only_files: self.only_files, }), + state_change_notify: Notify::new(), storage_factory, info, }))