diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 9a429bf..1f4bfeb 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -76,32 +76,6 @@ where } } -macro_rules! read_one { - ($conn:ident, $read_buf:ident, $read_so_far:ident, $rwtimeout:ident) => {{ - let (extended, size) = loop { - match MessageBorrowed::deserialize(&$read_buf[..$read_so_far]) { - Ok((msg, size)) => break (msg, size), - Err(MessageDeserializeError::NotEnoughData(d, _)) => { - if $read_buf.len() < $read_so_far + d { - $read_buf.reserve(d); - $read_buf.resize($read_buf.capacity(), 0); - } - - let size = with_timeout($rwtimeout, $conn.read(&mut $read_buf[$read_so_far..])) - .await - .context("error reading from peer")?; - if size == 0 { - anyhow::bail!("disconnected while reading, read so far: {}", $read_so_far) - } - $read_so_far += size; - } - Err(e) => return Err(e.into()), - } - }; - (extended, size) - }}; -} - impl PeerConnection { pub fn new( addr: SocketAddr, @@ -354,7 +328,31 @@ impl PeerConnection { let reader = async move { loop { - let (message, size) = read_one!(read_half, read_buf, read_so_far, rwtimeout); + let (message, size) = loop { + match MessageBorrowed::deserialize(&read_buf[..read_so_far]) { + Ok((msg, size)) => break (msg, size), + Err(MessageDeserializeError::NotEnoughData(d, _)) => { + if read_buf.len() < read_so_far + d { + read_buf.reserve(d); + read_buf.resize(read_buf.capacity(), 0); + } + let size = with_timeout( + rwtimeout, + read_half.read(&mut read_buf[read_so_far..]), + ) + .await + .context("error reading from peer")?; + if size == 0 { + anyhow::bail!( + "disconnected while reading, read so far: {}", + read_so_far + ) + } + read_so_far += size; + } + Err(e) => return Err(e.into()), + } + }; trace!("received: {:?}", &message); if let Message::Extended(ExtendedMessage::Handshake(h)) = &message { @@ -378,7 +376,7 @@ impl PeerConnection { Ok::<_, anyhow::Error>(()) }; - let r = tokio::select! { + tokio::select! { r = reader => { trace!("reader is done, exiting"); r @@ -387,7 +385,6 @@ impl PeerConnection { trace!("writer is done, exiting"); r } - }; - r + } } } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index c825988..46642aa 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -11,7 +11,7 @@ use std::{ use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; -use buffers::{ByteBufT, ByteString}; +use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; use dht::{ Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, @@ -22,7 +22,9 @@ use librqbit_core::{ magnet::Magnet, peer_id::generate_peer_id, spawn_utils::spawn_with_cancel, - torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, + torrent_metainfo::{ + torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned, + }, }; use parking_lot::RwLock; use peer_binary_protocol::{Handshake, PIECE_MESSAGE_DEFAULT_LEN}; @@ -49,6 +51,14 @@ pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; pub type TorrentId = usize; +fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result { + debug!( + "all fields in torrent: {:#?}", + bencode::dyn_from_bytes::(bytes) + ); + bencode_torrent_from_bytes(bytes) +} + #[derive(Default)] pub struct SessionDatabase { next_id: TorrentId,