From f0788f2c4a91fb986d76192dfe57a0580f53e331 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 24 Apr 2024 22:21:41 +0100 Subject: [PATCH] Works fine now finally --- crates/librqbit/src/chunk_tracker.rs | 8 ++ crates/librqbit/src/http_api.rs | 3 - crates/librqbit/src/torrent_state/live/mod.rs | 16 ++- .../librqbit/src/torrent_state/streaming.rs | 129 ++++++++++++++---- 4 files changed, 121 insertions(+), 35 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index ef8db7c..1bd0879 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -194,6 +194,14 @@ impl ChunkTracker { .filter_map(|id| self.lengths.validate_piece_index(id)) } + pub(crate) fn is_piece_queued(&self, id: ValidPieceIndex) -> bool { + self.queue_pieces[id.get() as usize] + } + + pub(crate) fn is_piece_have(&self, id: ValidPieceIndex) -> bool { + self.have[id.get() as usize] + } + // None if wrong chunk // true if did something // false if didn't do anything diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 0214f6f..f787c8a 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -163,9 +163,6 @@ impl HttpApi { headers: http::HeaderMap, ) -> Result { let mut stream = state.api_stream(idx, file_id)?; - - dbg!(&headers); - let mut status = StatusCode::OK; let mut output_headers = HeaderMap::new(); output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes")); diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 57b758e..6c69c58 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -965,10 +965,18 @@ impl PeerHandler { let n = { let mut n_opt = None; let bf = &live.bitfield; - for n in g - .get_chunks()? - .iter_queued_pieces(&g.file_priorities, &self.state.files) - { + let chunk_tracker = g.get_chunks()?; + let priority_streamed_pieces = self + .state + .streams + .iter_next_pieces(&self.state.lengths) + .filter(|pid| { + !chunk_tracker.is_piece_have(*pid) + && chunk_tracker.is_piece_queued(*pid) + }); + let natural_order_pieces = + chunk_tracker.iter_queued_pieces(&g.file_priorities, &self.state.files); + for n in priority_streamed_pieces.chain(natural_order_pieces) { if bf.get(n.get() as usize).map(|v| *v) == Some(true) { n_opt = Some(n); break; diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 0e332fc..1814d1a 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -9,7 +9,8 @@ use std::{ use anyhow::Context; use dashmap::DashMap; -use librqbit_core::lengths::ValidPieceIndex; +use itertools::Itertools; +use librqbit_core::lengths::{Lengths, ValidPieceIndex}; use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; @@ -19,10 +20,21 @@ use super::ManagedTorrentHandle; type StreamId = usize; +// Buffer either 1/10th of the file forward. +const PER_STREAM_BUF_PART: u64 = 10; +// Or 32 mb, whichever is larger +const PER_STREAM_BUF_MIN: u64 = 32 * 1024 * 1024; + +struct StreamState { + current_piece: ValidPieceIndex, + file_len: u64, + waker: Option, +} + #[derive(Default)] pub(crate) struct TorrentStreams { next_stream_id: AtomicUsize, - wakers_by_stream: DashMap, + streams: DashMap, } impl TorrentStreams { @@ -30,31 +42,83 @@ impl TorrentStreams { self.next_stream_id.fetch_add(1, Ordering::Relaxed) } - fn register_waker(&self, stream_id: StreamId, piece_id: ValidPieceIndex, waker: Waker) { - self.wakers_by_stream.insert(stream_id, (piece_id, waker)); + fn register_waker(&self, stream_id: StreamId, waker: Waker) { + if let Some(mut s) = self.streams.get_mut(&stream_id) { + let vm = s.value_mut(); + vm.waker = Some(waker); + } + } + + // Queue 1st, 2nd etc pieces from each stream in turn until they get 1/10th of the file . + pub(crate) fn iter_next_pieces( + &self, + lengths: &Lengths, + ) -> impl Iterator { + let all = self + .streams + .iter() + .map(|s| { + let remaining = (s.value().file_len + + lengths.piece_length(s.value().current_piece) as u64) + .div_ceil(PER_STREAM_BUF_PART) + .max(PER_STREAM_BUF_MIN); + (s.value().current_piece, remaining) + }) + .map(Some) + .collect_vec(); + + struct It { + all: Vec>, + lengths: Lengths, + } + + impl Iterator for It { + type Item = ValidPieceIndex; + + fn next(&mut self) -> Option { + for item in self.all.iter_mut() { + if let Some((p, remaining)) = item { + let y = *p; + let pl = self.lengths.piece_length(y); + *remaining = remaining.saturating_sub(pl as u64); + if *remaining == 0 { + *item = None; + } else if let Some(next_p) = self.lengths.validate_piece_index(y.get() + 1) + { + *item = Some((next_p, *remaining)) + } else { + *item = None; + } + return Some(y); + } + } + None + } + } + It { + all, + lengths: *lengths, + } } pub(crate) fn wake_streams_on_piece_completed(&self, piece_id: ValidPieceIndex) { - let mut woken = Vec::new(); - for w in self.wakers_by_stream.iter() { - if w.value().0 == piece_id { - trace!( - stream_id = *w.key(), - piece_id = piece_id.get(), - "waking stream" - ); - w.value().1.wake_by_ref(); - woken.push(*w.key()); + for mut w in self.streams.iter_mut() { + if w.value().current_piece == piece_id { + if let Some(waker) = w.value_mut().waker.take() { + trace!( + stream_id = *w.key(), + piece_id = piece_id.get(), + "waking stream" + ); + waker.wake(); + } } } - for w in woken { - self.wakers_by_stream.remove(&w); - } } fn drop_stream(&self, stream_id: StreamId) { trace!(stream_id, "dropping stream"); - self.wakers_by_stream.remove(&stream_id); + self.streams.remove(&stream_id); } } @@ -120,8 +184,10 @@ impl AsyncRead for FileStream { let piece_offset = abs_pos % dpl as u64; let piece_remaining = piece_len as u64 - piece_offset; - // queue N pieces after this if not yet - // TODO + // queue N pieces after this if not yet. The "if let" should never fail. + if let Some(mut s) = self.streams.streams.get_mut(&self.stream_id) { + s.value_mut().current_piece = piece_id; + } // if the piece is not there, register to wake when it is // check if we have the piece for real @@ -129,7 +195,7 @@ impl AsyncRead for FileStream { let have = ct.get_have_pieces()[piece_id.get() as usize]; if !have { self.streams - .register_waker(self.stream_id, piece_id, cx.waker().clone()); + .register_waker(self.stream_id, cx.waker().clone()); } have })); @@ -179,10 +245,7 @@ impl AsyncSeek for FileStream { ) -> std::io::Result<()> { let end_i64 = map_io_err!(TryInto::::try_into(self.file_len))?; let new_pos: i64 = match position { - SeekFrom::Start(s) => { - self.as_mut().position = s; - return Ok(()); - } + SeekFrom::Start(s) => map_io_err!(s.try_into())?, SeekFrom::End(e) => map_io_err!(TryInto::::try_into(self.file_len))? + e, SeekFrom::Current(o) => map_io_err!(TryInto::::try_into(self.position))? + o, }; @@ -240,16 +303,26 @@ impl ManagedTorrent { let (fd_len, fd_offset) = self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; let streams = self.streams()?; - Ok(FileStream { + let first_piece = self.info().lengths.validate_piece_index(0).context("bug")?; + let s = FileStream { stream_id: streams.next_id(), - streams, + streams: streams.clone(), file_id, position: 0, file_len: fd_len, file_torrent_abs_offset: fd_offset, torrent: self, - }) + }; + streams.streams.insert( + s.stream_id, + StreamState { + current_piece: first_piece, + waker: None, + file_len: s.file_len, + }, + ); + Ok(s) } }