diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index b965946..7fe2897 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -6,7 +6,9 @@ use futures::{stream::FuturesUnordered, Stream, StreamExt}; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use log::debug; -use crate::{peer_connection::PeerConnectionOptions, peer_info_reader}; +use crate::{ + peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner, +}; use librqbit_core::id20::Id20; #[derive(Debug)] @@ -45,6 +47,7 @@ pub async fn read_metainfo_from_peer_receiver + Unp peer_id, info_hash, peer_connection_options, + BlockingSpawner::new(true), ) .await .with_context(|| format!("error reading metainfo from {}", addr)); diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 6914d16..afadad4 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -12,6 +12,8 @@ use peer_binary_protocol::{ }; use tokio::time::timeout; +use crate::spawn_utils::BlockingSpawner; + pub trait PeerConnectionHandler { fn get_have_bytes(&self) -> u64; fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option; @@ -43,6 +45,7 @@ pub struct PeerConnection { info_hash: Id20, peer_id: Id20, options: PeerConnectionOptions, + spawner: BlockingSpawner, } // async fn read_one<'a, R: AsyncReadExt + Unpin>( @@ -107,12 +110,14 @@ impl PeerConnection { peer_id: Id20, handler: H, options: Option, + spawner: BlockingSpawner, ) -> Self { PeerConnection { handler, addr, info_hash, peer_id, + spawner, options: options.unwrap_or_default(), } } @@ -243,12 +248,16 @@ impl PeerConnection { WriterRequest::ReadChunkRequest(chunk) => { // this whole section is an optimization write_buf.resize(PIECE_MESSAGE_DEFAULT_LEN, 0); - let preamble_len = serialize_piece_preamble(&chunk, &mut write_buf); + let preamble_len = serialize_piece_preamble(chunk, &mut write_buf); let full_len = preamble_len + chunk.size as usize; write_buf.resize(full_len, 0); - self.handler - .read_chunk(chunk, &mut write_buf[preamble_len..]) + self.spawner + .spawn_block_in_place(|| { + self.handler + .read_chunk(chunk, &mut write_buf[preamble_len..]) + }) .with_context(|| format!("error reading chunk {:?}", chunk))?; + uploaded_add = Some(chunk.size); full_len } diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index 6bb2f9b..8a30cc1 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -17,8 +17,11 @@ use peer_binary_protocol::{ use sha1w::{ISha1, Sha1}; use tokio::sync::mpsc::UnboundedSender; -use crate::peer_connection::{ - PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, +use crate::{ + peer_connection::{ + PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, + }, + spawn_utils::BlockingSpawner, }; pub async fn read_metainfo_from_peer( @@ -26,6 +29,7 @@ pub async fn read_metainfo_from_peer( peer_id: Id20, info_hash: Id20, peer_connection_options: Option, + spawner: BlockingSpawner, ) -> anyhow::Result> { let (result_tx, result_rx) = tokio::sync::oneshot::channel::>>(); @@ -37,8 +41,14 @@ pub async fn read_metainfo_from_peer( result_tx: Mutex::new(Some(result_tx)), locked: RwLock::new(None), }; - let connection = - PeerConnection::new(addr, info_hash, peer_id, handler, peer_connection_options); + let connection = PeerConnection::new( + addr, + info_hash, + peer_id, + handler, + peer_connection_options, + spawner, + ); let result_reader = async move { result_rx.await? }; let connection_runner = async move { connection.manage_peer(writer_rx).await }; @@ -219,6 +229,8 @@ mod tests { use librqbit_core::id20::Id20; use librqbit_core::peer_id::generate_peer_id; + use crate::spawn_utils::BlockingSpawner; + use super::read_metainfo_from_peer; static LOG_INIT: Once = std::sync::Once::new(); @@ -234,8 +246,10 @@ mod tests { let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap(); let peer_id = generate_peer_id(); let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap(); - dbg!(read_metainfo_from_peer(addr, peer_id, info_hash, None) - .await - .unwrap()); + dbg!( + read_metainfo_from_peer(addr, peer_id, info_hash, None, BlockingSpawner::new(true)) + .await + .unwrap() + ); } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index e26d270..64c87fd 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -235,8 +235,6 @@ pub struct TorrentState { lengths: Lengths, needed: u64, stats: AtomicStats, - spawner: BlockingSpawner, - options: TorrentStateOptions, peer_semaphore: Semaphore, @@ -274,7 +272,6 @@ impl TorrentState { }, needed: needed_bytes, lengths, - spawner, options, peer_semaphore: Semaphore::new(128), @@ -299,7 +296,7 @@ impl TorrentState { let handler = PeerHandler { addr, state: state.clone(), - spawner: state.spawner, + spawner, }; let options = PeerConnectionOptions { connect_timeout: state.options.peer_connect_timeout, @@ -311,6 +308,7 @@ impl TorrentState { state.peer_id, handler, Some(options), + spawner, ); spawn(format!("manage_peer({})", addr), async move { if let Err(e) = peer_connection.manage_peer(out_rx).await { diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs index e3b3987..490b46b 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/librqbit/src/tracker_comms.rs @@ -45,6 +45,7 @@ pub struct DictPeer<'a> { #[serde(deserialize_with = "deserialize_ip_string")] ip: IpAddr, #[serde(borrow)] + #[allow(dead_code)] peer_id: Option>, port: u16, }