From 1c9aa8ca72ea3586990e6f094a7be80d82634e49 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 20 Jul 2024 20:03:14 +0200 Subject: [PATCH 1/3] Fist naive attempt to improve streaming at leat inform executor about blocking call --- crates/librqbit/src/http_api.rs | 2 +- .../librqbit/src/torrent_state/streaming.rs | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 57c4d8b..dbd8274 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -170,7 +170,7 @@ impl HttpApi { let range_header = headers.get(http::header::RANGE); trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream"); - if let Some(range) = headers.get(http::header::RANGE) { + if let Some(range) = range_header { let offset: Option = range .to_str() .ok() diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 48b4b3e..e96e966 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -12,7 +12,10 @@ use anyhow::Context; use dashmap::DashMap; use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; -use tokio::io::{AsyncRead, AsyncSeek}; +use tokio::{ + io::{AsyncRead, AsyncSeek}, + task::block_in_place, +}; use tracing::{debug, trace}; use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; @@ -211,13 +214,13 @@ impl AsyncRead for FileStream { "will write bytes" ); - poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file( - self.file_id, - |files, _fi| { - files.pread_exact(self.file_id, self.position, buf)?; - Ok::<_, anyhow::Error>(()) - } - ))); + poll_try_io!(poll_try_io!(block_in_place(|| { + self.torrent + .with_storage_and_file(self.file_id, |files, _fi| { + files.pread_exact(self.file_id, self.position, buf)?; + Ok::<_, anyhow::Error>(()) + }) + }))); self.as_mut().advance(bytes_to_read as u64); tbuf.advance(bytes_to_read); From fdacb7bc16ebbe0b17dbfa0a7396e4b1d5a869cc Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 21 Jul 2024 15:08:23 +0200 Subject: [PATCH 2/3] Fix e2e streaming test --- crates/librqbit/src/tests/e2e_stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/tests/e2e_stream.rs b/crates/librqbit/src/tests/e2e_stream.rs index 97c65ea..770f4d1 100644 --- a/crates/librqbit/src/tests/e2e_stream.rs +++ b/crates/librqbit/src/tests/e2e_stream.rs @@ -1,6 +1,7 @@ use std::{net::SocketAddr, time::Duration}; use anyhow::Context; +use tempfile::TempDir; use tokio::{io::AsyncReadExt, time::timeout}; use tracing::info; @@ -22,7 +23,7 @@ async fn e2e_stream() -> anyhow::Result<()> { let orig_content = std::fs::read(files.path().join("0.data")).unwrap(); let server_session = Session::new_with_opts( - "/does-not-matter".into(), + files.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, @@ -63,8 +64,10 @@ async fn e2e_stream() -> anyhow::Result<()> { server_session.tcp_listen_port().unwrap(), ); + let client_dir = TempDir::with_prefix("test_e2e_stream_client")?; + let client_session = Session::new_with_opts( - "/does-not-matter".into(), + client_dir.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, From 7e8935388c450d131e6efd6f29ad351dbb9b88ae Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 27 Jul 2024 08:17:04 +0200 Subject: [PATCH 3/3] Use BlockingSpawner --- crates/librqbit/src/torrent_state/streaming.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index e96e966..19009eb 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -12,13 +12,12 @@ use anyhow::Context; use dashmap::DashMap; use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; -use tokio::{ - io::{AsyncRead, AsyncSeek}, - task::block_in_place, -}; +use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; -use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; +use crate::{ + file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent, +}; use super::ManagedTorrentHandle; @@ -139,6 +138,8 @@ pub struct FileStream { // file params file_len: u64, file_torrent_abs_offset: u64, + + spawner: BlockingSpawner, } macro_rules! map_io_err { @@ -214,7 +215,7 @@ impl AsyncRead for FileStream { "will write bytes" ); - poll_try_io!(poll_try_io!(block_in_place(|| { + poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| { self.torrent .with_storage_and_file(self.file_id, |files, _fi| { files.pread_exact(self.file_id, self.position, buf)?; @@ -327,6 +328,7 @@ impl ManagedTorrent { file_len: fd_len, file_torrent_abs_offset: fd_offset, torrent: self, + spawner: BlockingSpawner::default(), }; s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert(