Streaming test

This commit is contained in:
Igor Katson 2024-04-24 18:58:30 +01:00
parent 10974888fa
commit d1c6a5b584
5 changed files with 24 additions and 13 deletions

View file

@ -136,6 +136,7 @@ impl TorrentStateInitializing {
info: self.meta.clone(), info: self.meta.clone(),
files, files,
chunk_tracker, chunk_tracker,
streams: Arc::new(Default::default()),
}; };
Ok(paused) Ok(paused)
} }

View file

@ -104,6 +104,7 @@ use self::{
use super::{ use super::{
paused::TorrentStatePaused, paused::TorrentStatePaused,
streaming::TorrentStreams,
utils::{timeit, TimedExistence}, utils::{timeit, TimedExistence},
ManagedTorrentInfo, ManagedTorrentInfo,
}; };
@ -174,6 +175,8 @@ pub struct TorrentStateLive {
down_speed_estimator: SpeedEstimator, down_speed_estimator: SpeedEstimator,
up_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
pub(crate) streams: Arc<TorrentStreams>,
} }
impl TorrentStateLive { impl TorrentStateLive {
@ -219,6 +222,7 @@ impl TorrentStateLive {
down_speed_estimator, down_speed_estimator,
up_speed_estimator, up_speed_estimator,
cancellation_token, cancellation_token,
streams: paused.streams,
}); });
state.spawn( state.spawn(
@ -647,6 +651,7 @@ impl TorrentStateLive {
info: self.meta.clone(), info: self.meta.clone(),
files, files,
chunk_tracker, chunk_tracker,
streams: self.streams.clone(),
}) })
} }
@ -692,6 +697,8 @@ impl TorrentStateLive {
} }
} }
self.streams.wake_streams_on_piece_completed(id);
if self.is_finished() { if self.is_finished() {
info!("torrent finished downloading"); info!("torrent finished downloading");
self.finished_notify.notify_waiters(); self.finished_notify.notify_waiters();

View file

@ -42,7 +42,6 @@ use initializing::TorrentStateInitializing;
use self::paused::TorrentStatePaused; use self::paused::TorrentStatePaused;
pub use self::stats::{TorrentStats, TorrentStatsState}; pub use self::stats::{TorrentStats, TorrentStatsState};
use self::streaming::TorrentStreams;
pub enum ManagedTorrentState { pub enum ManagedTorrentState {
Initializing(Arc<TorrentStateInitializing>), Initializing(Arc<TorrentStateInitializing>),
@ -94,7 +93,6 @@ pub struct ManagedTorrentInfo {
pub struct ManagedTorrent { pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>, pub info: Arc<ManagedTorrentInfo>,
pub(crate) streams: TorrentStreams,
locked: RwLock<ManagedTorrentLocked>, locked: RwLock<ManagedTorrentLocked>,
} }
@ -551,7 +549,6 @@ impl ManagedTorrentBuilder {
only_files: self.only_files, only_files: self.only_files,
}), }),
info, info,
streams: Default::default(),
})) }))
} }
} }

View file

@ -5,12 +5,13 @@ use crate::{
type_aliases::OpenedFiles, type_aliases::OpenedFiles,
}; };
use super::ManagedTorrentInfo; use super::{streaming::TorrentStreams, ManagedTorrentInfo};
pub struct TorrentStatePaused { pub struct TorrentStatePaused {
pub(crate) info: Arc<ManagedTorrentInfo>, pub(crate) info: Arc<ManagedTorrentInfo>,
pub(crate) files: OpenedFiles, pub(crate) files: OpenedFiles,
pub(crate) chunk_tracker: ChunkTracker, pub(crate) chunk_tracker: ChunkTracker,
pub(crate) streams: Arc<TorrentStreams>,
} }
impl TorrentStatePaused { impl TorrentStatePaused {

View file

@ -53,6 +53,7 @@ impl TorrentStreams {
struct FileStream { struct FileStream {
torrent: ManagedTorrentHandle, torrent: ManagedTorrentHandle,
streams: Arc<TorrentStreams>,
stream_id: usize, stream_id: usize,
file_id: usize, file_id: usize,
position: u64, position: u64,
@ -112,8 +113,7 @@ impl AsyncRead for FileStream {
let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| {
let have = ct.get_have_pieces()[piece_id.get() as usize]; let have = ct.get_have_pieces()[piece_id.get() as usize];
if !have { if !have {
self.torrent self.streams
.streams
.register_waker(self.stream_id, piece_id, cx.waker().clone()); .register_waker(self.stream_id, piece_id, cx.waker().clone());
} }
have have
@ -185,7 +185,7 @@ impl AsyncSeek for FileStream {
impl Drop for FileStream { impl Drop for FileStream {
fn drop(&mut self) { fn drop(&mut self) {
self.torrent.streams.drop_stream(self.stream_id) self.streams.drop_stream(self.stream_id)
} }
} }
@ -205,16 +205,21 @@ impl ManagedTorrent {
}) })
} }
fn streams(&self) -> anyhow::Result<Arc<TorrentStreams>> {
self.with_state(|s| match s {
crate::ManagedTorrentState::Paused(p) => Ok(p.streams.clone()),
crate::ManagedTorrentState::Live(l) => Ok(l.streams.clone()),
_ => anyhow::bail!("invalid state"),
})
}
pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<impl AsyncRead + AsyncSeek> { pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<impl AsyncRead + AsyncSeek> {
let (fd_len, fd_offset) = let (fd_len, fd_offset) =
self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?;
let streams = self.streams()?;
Ok(FileStream { Ok(FileStream {
stream_id: { stream_id: streams.next_id(),
let this = &self; streams,
&this.streams
}
.next_id(),
file_id, file_id,
position: 0, position: 0,