diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index e96e966..19009eb 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -12,13 +12,12 @@ use anyhow::Context; use dashmap::DashMap; use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; -use tokio::{ - io::{AsyncRead, AsyncSeek}, - task::block_in_place, -}; +use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; -use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; +use crate::{ + file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent, +}; use super::ManagedTorrentHandle; @@ -139,6 +138,8 @@ pub struct FileStream { // file params file_len: u64, file_torrent_abs_offset: u64, + + spawner: BlockingSpawner, } macro_rules! map_io_err { @@ -214,7 +215,7 @@ impl AsyncRead for FileStream { "will write bytes" ); - poll_try_io!(poll_try_io!(block_in_place(|| { + poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| { self.torrent .with_storage_and_file(self.file_id, |files, _fi| { files.pread_exact(self.file_id, self.position, buf)?; @@ -327,6 +328,7 @@ impl ManagedTorrent { file_len: fd_len, file_torrent_abs_offset: fd_offset, torrent: self, + spawner: BlockingSpawner::default(), }; s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert(