diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 57d02d3..716c4b2 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -7,7 +7,10 @@ use futures::Stream; use http::StatusCode; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::UnboundedSender; +use tokio::{ + io::{AsyncRead, AsyncSeek}, + sync::mpsc::UnboundedSender, +}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tracing::warn; @@ -239,6 +242,11 @@ impl Api { let mgr = self.mgr_handle(idx)?; Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) } + + pub fn api_stream(&self, idx: TorrentId, file_id: usize) -> Result { + let mgr = self.mgr_handle(idx)?; + Ok(mgr.stream(file_id)?) + } } #[derive(Serialize)] diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index ccb0a5e..d33a5ad 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -8,9 +8,11 @@ use futures::{FutureExt, TryStreamExt}; use itertools::Itertools; use serde::{Deserialize, Serialize}; +use std::io::SeekFrom; use std::net::SocketAddr; use std::str::FromStr; use std::time::Duration; +use tokio::io::AsyncSeekExt; use tracing::{debug, info}; use axum::Router; @@ -154,6 +156,31 @@ impl HttpApi { state.api_peer_stats(idx, filter).map(axum::Json) } + async fn torrent_stream_file( + State(state): State, + Path((idx, file_id)): Path<(usize, usize)>, + headers: http::HeaderMap, + ) -> Result { + let mut stream = state.api_stream(idx, file_id)?; + if let Some(range) = headers.get(http::header::RANGE) { + let offset: Option = range + .to_str() + .ok() + .and_then(|s| s.strip_prefix("bytes=")) + .and_then(|s| s.strip_suffix('-')) + .and_then(|s| s.parse().ok()); + if let Some(offset) = offset { + stream + .seek(SeekFrom::Start(offset)) + .await + .context("error seeking")?; + } + } + + let s = tokio_util::io::ReaderStream::new(stream); + Ok(axum::body::Body::from_stream(s)) + } + async fn torrent_action_pause( State(state): State, Path(idx): Path, @@ -223,7 +250,8 @@ impl HttpApi { .route("/torrents/:id/haves", get(torrent_haves)) .route("/torrents/:id/stats", get(torrent_stats_v0)) .route("/torrents/:id/stats/v1", get(torrent_stats_v1)) - .route("/torrents/:id/peer_stats", get(peer_stats)); + .route("/torrents/:id/peer_stats", get(peer_stats)) + .route("/torrents/:id/stream/:file_id", get(torrent_stream_file)); if !self.opts.read_only { app = app