From 6a9578cc0be83a799fcc2403e470de5504b3c4b9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 29 Apr 2024 18:26:36 +0100 Subject: [PATCH] Fix a bug --- crates/librqbit/src/http_api.rs | 5 ++- .../librqbit/src/torrent_state/streaming.rs | 36 +++++++++++++------ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 2bde54e..776e4f5 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -14,7 +14,7 @@ use std::net::SocketAddr; use std::str::FromStr; use std::time::Duration; use tokio::io::AsyncSeekExt; -use tracing::{debug, info}; +use tracing::{debug, info, trace}; use axum::Router; @@ -167,6 +167,9 @@ impl HttpApi { let mut output_headers = HeaderMap::new(); output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes")); + let range_header = headers.get(http::header::RANGE); + trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream"); + if let Some(range) = headers.get(http::header::RANGE) { let offset: Option = range .to_str() diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index f53d288..efd4818 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -113,8 +113,13 @@ impl TorrentStreams { } } - let all = self.streams.iter().map(|s| s.queue(lengths)).collect(); - Interleave { all } + let mut all: Vec<_> = self.streams.iter().map(|s| s.queue(lengths)).collect(); + + // Shuffle to decrease determinism and make queueing fairer. + use rand::seq::SliceRandom; + all.shuffle(&mut rand::thread_rng()); + + Interleave { all: all.into() } } pub(crate) fn wake_streams_on_piece_completed( @@ -241,14 +246,8 @@ impl AsyncRead for FileStream { } ))); - self.as_mut().position += buf.len() as u64; + self.as_mut().advance(bytes_to_read as u64); tbuf.advance(bytes_to_read); - self.streams - .streams - .get_mut(&self.stream_id) - .unwrap() - .value_mut() - .position = self.position; Poll::Ready(Ok(())) } @@ -273,7 +272,8 @@ impl AsyncSeek for FileStream { )); } - self.as_mut().position = map_io_err!(new_pos.try_into())?; + self.as_mut().set_position(map_io_err!(new_pos.try_into())?); + trace!(stream_id = self.stream_id, position = self.position, "seek"); Ok(()) } @@ -357,6 +357,8 @@ impl ManagedTorrent { }, ); + debug!(stream_id = s.stream_id, file_id, "started stream"); + Ok(s) } } @@ -366,6 +368,20 @@ impl FileStream { self.position } + fn advance(&mut self, diff: u64) { + self.set_position(self.position + diff) + } + + fn set_position(&mut self, new_pos: u64) { + self.position = new_pos; + self.streams + .streams + .get_mut(&self.stream_id) + .unwrap() + .value_mut() + .position = new_pos; + } + pub fn len(&self) -> u64 { self.file_len }