diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 02f1376..2d55189 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -136,6 +136,7 @@ impl TorrentStateInitializing { info: self.meta.clone(), files, chunk_tracker, + streams: Arc::new(Default::default()), }; Ok(paused) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 60dd1d5..57b758e 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -104,6 +104,7 @@ use self::{ use super::{ paused::TorrentStatePaused, + streaming::TorrentStreams, utils::{timeit, TimedExistence}, ManagedTorrentInfo, }; @@ -174,6 +175,8 @@ pub struct TorrentStateLive { down_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator, cancellation_token: CancellationToken, + + pub(crate) streams: Arc, } impl TorrentStateLive { @@ -219,6 +222,7 @@ impl TorrentStateLive { down_speed_estimator, up_speed_estimator, cancellation_token, + streams: paused.streams, }); state.spawn( @@ -647,6 +651,7 @@ impl TorrentStateLive { info: self.meta.clone(), files, chunk_tracker, + streams: self.streams.clone(), }) } @@ -692,6 +697,8 @@ impl TorrentStateLive { } } + self.streams.wake_streams_on_piece_completed(id); + if self.is_finished() { info!("torrent finished downloading"); self.finished_notify.notify_waiters(); diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 65071ef..e203236 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -42,7 +42,6 @@ use initializing::TorrentStateInitializing; use self::paused::TorrentStatePaused; pub use self::stats::{TorrentStats, TorrentStatsState}; -use self::streaming::TorrentStreams; pub enum ManagedTorrentState { Initializing(Arc), @@ -94,7 +93,6 @@ pub struct ManagedTorrentInfo { pub struct ManagedTorrent { pub info: Arc, - pub(crate) streams: TorrentStreams, locked: RwLock, } @@ -551,7 +549,6 @@ impl ManagedTorrentBuilder { only_files: self.only_files, }), info, - streams: Default::default(), })) } } diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 23d8fd3..848d89d 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -5,12 +5,13 @@ use crate::{ type_aliases::OpenedFiles, }; -use super::ManagedTorrentInfo; +use super::{streaming::TorrentStreams, ManagedTorrentInfo}; pub struct TorrentStatePaused { pub(crate) info: Arc, pub(crate) files: OpenedFiles, pub(crate) chunk_tracker: ChunkTracker, + pub(crate) streams: Arc, } impl TorrentStatePaused { diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 555df7a..27836c9 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -53,6 +53,7 @@ impl TorrentStreams { struct FileStream { torrent: ManagedTorrentHandle, + streams: Arc, stream_id: usize, file_id: usize, position: u64, @@ -112,8 +113,7 @@ impl AsyncRead for FileStream { let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { let have = ct.get_have_pieces()[piece_id.get() as usize]; if !have { - self.torrent - .streams + self.streams .register_waker(self.stream_id, piece_id, cx.waker().clone()); } have @@ -185,7 +185,7 @@ impl AsyncSeek for FileStream { impl Drop for FileStream { fn drop(&mut self) { - self.torrent.streams.drop_stream(self.stream_id) + self.streams.drop_stream(self.stream_id) } } @@ -205,16 +205,21 @@ impl ManagedTorrent { }) } + fn streams(&self) -> anyhow::Result> { + self.with_state(|s| match s { + crate::ManagedTorrentState::Paused(p) => Ok(p.streams.clone()), + crate::ManagedTorrentState::Live(l) => Ok(l.streams.clone()), + _ => anyhow::bail!("invalid state"), + }) + } + pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { let (fd_len, fd_offset) = self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; + let streams = self.streams()?; Ok(FileStream { - stream_id: { - let this = &self; - &this.streams - } - .next_id(), - + stream_id: streams.next_id(), + streams, file_id, position: 0,