diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 8afc060..ffbac0c 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -707,7 +707,8 @@ impl TorrentStateLive { } } - self.streams.wake_streams_on_piece_completed(id); + self.streams + .wake_streams_on_piece_completed(id, &self.meta.lengths); if self.is_finished() { if self.lock_read("chunks").get_chunks()?.get_selected_pieces()[id.get_usize()] { diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 692d0a4..ec98835 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -1,4 +1,5 @@ use std::{ + collections::VecDeque, io::{Read, Seek, SeekFrom}, sync::{ atomic::{AtomicUsize, Ordering}, @@ -9,7 +10,6 @@ use std::{ use anyhow::Context; use dashmap::DashMap; -use itertools::Itertools; use librqbit_core::lengths::{Lengths, ValidPieceIndex}; use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; @@ -20,24 +20,64 @@ use super::ManagedTorrentHandle; type StreamId = usize; -// Buffer either 1/10th of the file forward. -const PER_STREAM_BUF_PART: u64 = 10; -// Or 32 mb, whichever is larger -const PER_STREAM_BUF_MIN: u64 = 32 * 1024 * 1024; +// 32 mb lookahead by default. +const PER_STREAM_BUF_DEFAULT: u64 = 32 * 1024 * 1024; struct StreamState { file_id: usize, - current_piece: ValidPieceIndex, file_len: u64, + file_abs_offset: u64, + position: u64, waker: Option, } +impl StreamState { + fn current_piece(&self, lengths: &Lengths) -> Option { + compute_current_piece(lengths, self.position, self.file_abs_offset) + } + + fn queue<'a>(&self, lengths: &'a Lengths) -> impl Iterator + 'a { + let start = self.file_abs_offset + self.position; + let end = (start + PER_STREAM_BUF_DEFAULT).min(self.file_abs_offset + self.file_len); + let dpl = lengths.default_piece_length(); + let start_id = (start / dpl as u64).try_into().unwrap(); + let end_id = end.div_ceil(dpl as u64).try_into().unwrap(); + (start_id..end_id).filter_map(|i| lengths.validate_piece_index(i)) + } +} + #[derive(Default)] pub(crate) struct TorrentStreams { next_stream_id: AtomicUsize, streams: DashMap, } +struct CurrentPiece { + id: ValidPieceIndex, + piece_remaining: u32, +} + +fn compute_current_piece( + lengths: &Lengths, + file_pos: u64, + file_torrent_abs_offset: u64, +) -> Option { + let dpl = lengths.default_piece_length(); + + let abs_pos = file_torrent_abs_offset + file_pos; + let piece_id = abs_pos / dpl as u64; + let piece_id: u32 = piece_id.try_into().ok()?; + + let piece_id = lengths.validate_piece_index(piece_id)?; + let piece_len = lengths.piece_length(piece_id); + Some(CurrentPiece { + id: piece_id, + piece_remaining: (piece_len as u64 - (abs_pos % dpl as u64)) + .try_into() + .ok()?, + }) +} + impl TorrentStreams { fn next_id(&self) -> usize { self.next_stream_id.fetch_add(1, Ordering::Relaxed) @@ -50,61 +90,40 @@ impl TorrentStreams { } } - // Queue 1st, 2nd etc pieces from each stream in turn until they get 1/10th of the file . - pub(crate) fn iter_next_pieces( - &self, - lengths: &Lengths, - ) -> impl Iterator { - let all = self - .streams - .iter() - .map(|s| { - let remaining = (s.value().file_len - + lengths.piece_length(s.value().current_piece) as u64) - .div_ceil(PER_STREAM_BUF_PART) - .max(PER_STREAM_BUF_MIN); - (s.value().current_piece, remaining) - }) - .map(Some) - .collect_vec(); - - struct It { - all: Vec>, - lengths: Lengths, + // Interleave 1st, 2nd etc pieces from each active stream in turn until they get 1/10th of the file . + pub(crate) fn iter_next_pieces<'a>( + &'a self, + lengths: &'a Lengths, + ) -> impl Iterator + 'a { + struct Interleave { + all: VecDeque, } - impl Iterator for It { + impl> Iterator for Interleave { type Item = ValidPieceIndex; fn next(&mut self) -> Option { - for item in self.all.iter_mut() { - if let Some((p, remaining)) = item { - let y = *p; - let pl = self.lengths.piece_length(y); - *remaining = remaining.saturating_sub(pl as u64); - if *remaining == 0 { - *item = None; - } else if let Some(next_p) = self.lengths.validate_piece_index(y.get() + 1) - { - *item = Some((next_p, *remaining)) - } else { - *item = None; - } - return Some(y); + while let Some(mut it) = self.all.pop_front() { + if let Some(piece) = it.next() { + self.all.push_back(it); + return Some(piece); } } None } } - It { - all, - lengths: *lengths, - } + + let all = self.streams.iter().map(|s| s.queue(lengths)).collect(); + Interleave { all } } - pub(crate) fn wake_streams_on_piece_completed(&self, piece_id: ValidPieceIndex) { + pub(crate) fn wake_streams_on_piece_completed( + &self, + piece_id: ValidPieceIndex, + lengths: &Lengths, + ) { for mut w in self.streams.iter_mut() { - if w.value().current_piece == piece_id { + if w.value().current_piece(lengths).map(|p| p.id) == Some(piece_id) { if let Some(waker) = w.value_mut().waker.take() { trace!( stream_id = *w.key(), @@ -117,9 +136,9 @@ impl TorrentStreams { } } - fn drop_stream(&self, stream_id: StreamId) { + fn drop_stream(&self, stream_id: StreamId) -> Option { trace!(stream_id, "dropping stream"); - self.streams.remove(&stream_id); + self.streams.remove(&stream_id).map(|s| s.1) } pub(crate) fn streamed_file_ids(&self) -> impl Iterator + '_ { @@ -174,30 +193,17 @@ impl AsyncRead for FileStream { return Poll::Ready(Ok(())); } - // determine the piece that position is pointing to. - let lengths = &self.torrent.info().lengths; - let dpl = lengths.default_piece_length(); - - let abs_pos = self.file_torrent_abs_offset + self.position; - let piece_id = abs_pos / dpl as u64; - let piece_id: u32 = poll_try_io!(piece_id.try_into()); - - let piece_id = poll_try_io!(lengths - .validate_piece_index(piece_id) - .context("bug: invalid piece")); - let piece_len = lengths.piece_length(piece_id); - let piece_offset = abs_pos % dpl as u64; - let piece_remaining = piece_len as u64 - piece_offset; - - // queue N pieces after this if not yet. The "if let" should never fail. - if let Some(mut s) = self.streams.streams.get_mut(&self.stream_id) { - s.value_mut().current_piece = piece_id; - } + let current = poll_try_io!(compute_current_piece( + &self.torrent.info().lengths, + self.position, + self.file_torrent_abs_offset + ) + .context("invalid position")); // if the piece is not there, register to wake when it is // check if we have the piece for real let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { - let have = ct.get_have_pieces()[piece_id.get() as usize]; + let have = ct.get_have_pieces()[current.id.get() as usize]; if !have { self.streams .register_waker(self.stream_id, cx.waker().clone()); @@ -205,16 +211,15 @@ impl AsyncRead for FileStream { have })); if !have { - trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %piece_id, "poll pending, not have"); + trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have"); return Poll::Pending; } // actually stream the piece let buf = tbuf.initialize_unfilled(); let file_remaining = self.file_len - self.position; - let bytes_to_read: usize = poll_try_io!((piece_len as u64) - .min(buf.len() as u64) - .min(piece_remaining) + let bytes_to_read: usize = poll_try_io!((buf.len() as u64) + .min(current.piece_remaining as u64) .min(file_remaining) .try_into()); @@ -238,6 +243,12 @@ impl AsyncRead for FileStream { self.as_mut().position += buf.len() as u64; tbuf.advance(bytes_to_read); + self.streams + .streams + .get_mut(&self.stream_id) + .unwrap() + .value_mut() + .position = self.position; Poll::Ready(Ok(())) } @@ -276,7 +287,7 @@ impl AsyncSeek for FileStream { impl Drop for FileStream { fn drop(&mut self) { - self.streams.drop_stream(self.stream_id) + self.streams.drop_stream(self.stream_id); } } @@ -321,7 +332,6 @@ impl ManagedTorrent { let (fd_len, fd_offset) = self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; let streams = self.streams()?; - let first_piece = self.info().lengths.validate_piece_index(0).context("bug")?; let s = FileStream { stream_id: streams.next_id(), streams: streams.clone(), @@ -333,15 +343,17 @@ impl ManagedTorrent { torrent: self, }; if s.torrent.maybe_reconnect_needed_peers_for_file(file_id) { - s.torrent.with_opened_file(file_id, |f| f.reopen(false))??; + s.torrent + .with_opened_file(file_id, |fd| fd.reopen(false))??; } streams.streams.insert( s.stream_id, StreamState { file_id, - current_piece: first_piece, + position: 0, waker: None, - file_len: s.file_len, + file_len: fd_len, + file_abs_offset: fd_offset, }, );