Merge pull request #157 from izderadicka/naive_streaming_fix

Add spawn blocking to FileStream
This commit is contained in:
Igor Katson 2024-07-27 07:58:38 +01:00 committed by GitHub
commit 76083be286
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 19 additions and 11 deletions

View file

@ -170,7 +170,7 @@ impl HttpApi {
let range_header = headers.get(http::header::RANGE); let range_header = headers.get(http::header::RANGE);
trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream"); trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream");
if let Some(range) = headers.get(http::header::RANGE) { if let Some(range) = range_header {
let offset: Option<u64> = range let offset: Option<u64> = range
.to_str() .to_str()
.ok() .ok()

View file

@ -1,6 +1,7 @@
use std::{net::SocketAddr, time::Duration}; use std::{net::SocketAddr, time::Duration};
use anyhow::Context; use anyhow::Context;
use tempfile::TempDir;
use tokio::{io::AsyncReadExt, time::timeout}; use tokio::{io::AsyncReadExt, time::timeout};
use tracing::info; use tracing::info;
@ -22,7 +23,7 @@ async fn e2e_stream() -> anyhow::Result<()> {
let orig_content = std::fs::read(files.path().join("0.data")).unwrap(); let orig_content = std::fs::read(files.path().join("0.data")).unwrap();
let server_session = Session::new_with_opts( let server_session = Session::new_with_opts(
"/does-not-matter".into(), files.path().into(),
crate::SessionOptions { crate::SessionOptions {
disable_dht: true, disable_dht: true,
persistence: false, persistence: false,
@ -63,8 +64,10 @@ async fn e2e_stream() -> anyhow::Result<()> {
server_session.tcp_listen_port().unwrap(), server_session.tcp_listen_port().unwrap(),
); );
let client_dir = TempDir::with_prefix("test_e2e_stream_client")?;
let client_session = Session::new_with_opts( let client_session = Session::new_with_opts(
"/does-not-matter".into(), client_dir.path().into(),
crate::SessionOptions { crate::SessionOptions {
disable_dht: true, disable_dht: true,
persistence: false, persistence: false,

View file

@ -15,7 +15,9 @@ use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex};
use tokio::io::{AsyncRead, AsyncSeek}; use tokio::io::{AsyncRead, AsyncSeek};
use tracing::{debug, trace}; use tracing::{debug, trace};
use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; use crate::{
file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent,
};
use super::ManagedTorrentHandle; use super::ManagedTorrentHandle;
@ -136,6 +138,8 @@ pub struct FileStream {
// file params // file params
file_len: u64, file_len: u64,
file_torrent_abs_offset: u64, file_torrent_abs_offset: u64,
spawner: BlockingSpawner,
} }
macro_rules! map_io_err { macro_rules! map_io_err {
@ -211,13 +215,13 @@ impl AsyncRead for FileStream {
"will write bytes" "will write bytes"
); );
poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file( poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| {
self.file_id, self.torrent
|files, _fi| { .with_storage_and_file(self.file_id, |files, _fi| {
files.pread_exact(self.file_id, self.position, buf)?; files.pread_exact(self.file_id, self.position, buf)?;
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())
} })
))); })));
self.as_mut().advance(bytes_to_read as u64); self.as_mut().advance(bytes_to_read as u64);
tbuf.advance(bytes_to_read); tbuf.advance(bytes_to_read);
@ -324,6 +328,7 @@ impl ManagedTorrent {
file_len: fd_len, file_len: fd_len,
file_torrent_abs_offset: fd_offset, file_torrent_abs_offset: fd_offset,
torrent: self, torrent: self,
spawner: BlockingSpawner::default(),
}; };
s.torrent.maybe_reconnect_needed_peers_for_file(file_id); s.torrent.maybe_reconnect_needed_peers_for_file(file_id);
streams.streams.insert( streams.streams.insert(