diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 716c4b2..436ccc5 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -7,10 +7,7 @@ use futures::Stream; use http::StatusCode; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use serde::{Deserialize, Serialize}; -use tokio::{ - io::{AsyncRead, AsyncSeek}, - sync::mpsc::UnboundedSender, -}; +use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tracing::warn; @@ -21,7 +18,7 @@ use crate::{ }, torrent_state::{ peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, - ManagedTorrentHandle, + FileStream, ManagedTorrentHandle, }, tracing_subscriber_config_utils::LineBroadcast, }; @@ -243,7 +240,7 @@ impl Api { Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) } - pub fn api_stream(&self, idx: TorrentId, file_id: usize) -> Result { + pub fn api_stream(&self, idx: TorrentId, file_id: usize) -> Result { let mgr = self.mgr_handle(idx)?; Ok(mgr.stream(file_id)?) } diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index d33a5ad..0214f6f 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -5,6 +5,7 @@ use axum::response::IntoResponse; use axum::routing::{get, post}; use futures::future::BoxFuture; use futures::{FutureExt, TryStreamExt}; +use http::{HeaderMap, HeaderValue, StatusCode}; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -162,6 +163,13 @@ impl HttpApi { headers: http::HeaderMap, ) -> Result { let mut stream = state.api_stream(idx, file_id)?; + + dbg!(&headers); + + let mut status = StatusCode::OK; + let mut output_headers = HeaderMap::new(); + output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes")); + if let Some(range) = headers.get(http::header::RANGE) { let offset: Option = range .to_str() @@ -170,15 +178,38 @@ impl HttpApi { .and_then(|s| s.strip_suffix('-')) .and_then(|s| s.parse().ok()); if let Some(offset) = offset { + status = StatusCode::PARTIAL_CONTENT; + info!(offset, "range request offset"); stream .seek(SeekFrom::Start(offset)) .await .context("error seeking")?; + + output_headers.insert( + http::header::CONTENT_LENGTH, + HeaderValue::from_str(&format!("{}", stream.len() - stream.position())) + .context("bug")?, + ); + output_headers.insert( + http::header::CONTENT_RANGE, + HeaderValue::from_str(&format!( + "bytes {}-{}/{}", + stream.position(), + stream.len().saturating_sub(1), + stream.len() + )) + .context("bug")?, + ); + } else { + output_headers.insert( + http::header::CONTENT_LENGTH, + HeaderValue::from_str(&format!("{}", stream.len())).context("bug")?, + ); } } let s = tokio_util::io::ReaderStream::new(stream); - Ok(axum::body::Body::from_stream(s)) + Ok((status, (output_headers, axum::body::Body::from_stream(s)))) } async fn torrent_action_pause( diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index e203236..752beb4 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -42,6 +42,7 @@ use initializing::TorrentStateInitializing; use self::paused::TorrentStatePaused; pub use self::stats::{TorrentStats, TorrentStatsState}; +pub use self::streaming::FileStream; pub enum ManagedTorrentState { Initializing(Arc), diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 27836c9..0e332fc 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -11,6 +11,7 @@ use anyhow::Context; use dashmap::DashMap; use librqbit_core::lengths::ValidPieceIndex; use tokio::io::{AsyncRead, AsyncSeek}; +use tracing::{debug, trace}; use crate::{opened_file::OpenedFile, ManagedTorrent}; @@ -37,6 +38,11 @@ impl TorrentStreams { let mut woken = Vec::new(); for w in self.wakers_by_stream.iter() { if w.value().0 == piece_id { + trace!( + stream_id = *w.key(), + piece_id = piece_id.get(), + "waking stream" + ); w.value().1.wake_by_ref(); woken.push(*w.key()); } @@ -47,11 +53,12 @@ impl TorrentStreams { } fn drop_stream(&self, stream_id: StreamId) { + trace!(stream_id, "dropping stream"); self.wakers_by_stream.remove(&stream_id); } } -struct FileStream { +pub struct FileStream { torrent: ManagedTorrentHandle, streams: Arc, stream_id: usize, @@ -74,7 +81,10 @@ macro_rules! poll_try_io { let e = map_io_err!($e); match e { Ok(r) => r, - Err(e) => return Poll::Ready(Err(e)), + Err(e) => { + debug!("stream error {e:?}"); + return Poll::Ready(Err(e)); + } } }}; } @@ -83,10 +93,15 @@ impl AsyncRead for FileStream { fn poll_read( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { // if the file is over, return 0 if self.position == self.file_len { + trace!( + stream_id = self.stream_id, + file_id = self.file_id, + "stream completed, EOF" + ); return Poll::Ready(Ok(())); } @@ -119,11 +134,12 @@ impl AsyncRead for FileStream { have })); if !have { + trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %piece_id, "poll pending, not have"); return Poll::Pending; } // actually stream the piece - let buf = buf.initialize_unfilled(); + let buf = tbuf.initialize_unfilled(); let file_remaining = self.file_len - self.position; let bytes_to_read: usize = poll_try_io!((piece_len as u64) .min(buf.len() as u64) @@ -132,6 +148,12 @@ impl AsyncRead for FileStream { .try_into()); let buf = &mut buf[..bytes_to_read]; + trace!( + buflen = buf.len(), + stream_id = self.stream_id, + file_id = self.file_id, + "will write bytes" + ); poll_try_io!(poll_try_io!(self.torrent.with_opened_file( self.file_id, @@ -144,6 +166,7 @@ impl AsyncRead for FileStream { ))); self.as_mut().position += buf.len() as u64; + tbuf.advance(bytes_to_read); Poll::Ready(Ok(())) } @@ -213,7 +236,7 @@ impl ManagedTorrent { }) } - pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { + pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { let (fd_len, fd_offset) = self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; let streams = self.streams()?; @@ -229,3 +252,13 @@ impl ManagedTorrent { }) } } + +impl FileStream { + pub fn position(&self) -> u64 { + self.position + } + + pub fn len(&self) -> u64 { + self.file_len + } +}