diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 280c7d6..e1dff8c 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -170,7 +170,7 @@ impl HttpApi { let range_header = headers.get(http::header::RANGE); trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream"); - if let Some(range) = headers.get(http::header::RANGE) { + if let Some(range) = range_header { let offset: Option = range .to_str() .ok() diff --git a/crates/librqbit/src/tests/e2e_stream.rs b/crates/librqbit/src/tests/e2e_stream.rs index 97c65ea..770f4d1 100644 --- a/crates/librqbit/src/tests/e2e_stream.rs +++ b/crates/librqbit/src/tests/e2e_stream.rs @@ -1,6 +1,7 @@ use std::{net::SocketAddr, time::Duration}; use anyhow::Context; +use tempfile::TempDir; use tokio::{io::AsyncReadExt, time::timeout}; use tracing::info; @@ -22,7 +23,7 @@ async fn e2e_stream() -> anyhow::Result<()> { let orig_content = std::fs::read(files.path().join("0.data")).unwrap(); let server_session = Session::new_with_opts( - "/does-not-matter".into(), + files.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, @@ -63,8 +64,10 @@ async fn e2e_stream() -> anyhow::Result<()> { server_session.tcp_listen_port().unwrap(), ); + let client_dir = TempDir::with_prefix("test_e2e_stream_client")?; + let client_session = Session::new_with_opts( - "/does-not-matter".into(), + client_dir.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 48b4b3e..19009eb 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -15,7 +15,9 @@ use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; -use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; +use crate::{ + file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent, +}; use super::ManagedTorrentHandle; @@ -136,6 +138,8 @@ pub struct FileStream { // file params file_len: u64, file_torrent_abs_offset: u64, + + spawner: BlockingSpawner, } macro_rules! map_io_err { @@ -211,13 +215,13 @@ impl AsyncRead for FileStream { "will write bytes" ); - poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file( - self.file_id, - |files, _fi| { - files.pread_exact(self.file_id, self.position, buf)?; - Ok::<_, anyhow::Error>(()) - } - ))); + poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| { + self.torrent + .with_storage_and_file(self.file_id, |files, _fi| { + files.pread_exact(self.file_id, self.position, buf)?; + Ok::<_, anyhow::Error>(()) + }) + }))); self.as_mut().advance(bytes_to_read as u64); tbuf.advance(bytes_to_read); @@ -324,6 +328,7 @@ impl ManagedTorrent { file_len: fd_len, file_torrent_abs_offset: fd_offset, torrent: self, + spawner: BlockingSpawner::default(), }; s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert(