Remove "read_one" macro

This commit is contained in:
Igor Katson 2023-12-29 18:54:08 -05:00
parent 396bacff0c
commit 09252c0397
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 39 additions and 32 deletions

View file

@ -76,32 +76,6 @@ where
}
}
macro_rules! read_one {
($conn:ident, $read_buf:ident, $read_so_far:ident, $rwtimeout:ident) => {{
let (extended, size) = loop {
match MessageBorrowed::deserialize(&$read_buf[..$read_so_far]) {
Ok((msg, size)) => break (msg, size),
Err(MessageDeserializeError::NotEnoughData(d, _)) => {
if $read_buf.len() < $read_so_far + d {
$read_buf.reserve(d);
$read_buf.resize($read_buf.capacity(), 0);
}
let size = with_timeout($rwtimeout, $conn.read(&mut $read_buf[$read_so_far..]))
.await
.context("error reading from peer")?;
if size == 0 {
anyhow::bail!("disconnected while reading, read so far: {}", $read_so_far)
}
$read_so_far += size;
}
Err(e) => return Err(e.into()),
}
};
(extended, size)
}};
}
impl<H: PeerConnectionHandler> PeerConnection<H> {
pub fn new(
addr: SocketAddr,
@ -354,7 +328,31 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let reader = async move {
loop {
let (message, size) = read_one!(read_half, read_buf, read_so_far, rwtimeout);
let (message, size) = loop {
match MessageBorrowed::deserialize(&read_buf[..read_so_far]) {
Ok((msg, size)) => break (msg, size),
Err(MessageDeserializeError::NotEnoughData(d, _)) => {
if read_buf.len() < read_so_far + d {
read_buf.reserve(d);
read_buf.resize(read_buf.capacity(), 0);
}
let size = with_timeout(
rwtimeout,
read_half.read(&mut read_buf[read_so_far..]),
)
.await
.context("error reading from peer")?;
if size == 0 {
anyhow::bail!(
"disconnected while reading, read so far: {}",
read_so_far
)
}
read_so_far += size;
}
Err(e) => return Err(e.into()),
}
};
trace!("received: {:?}", &message);
if let Message::Extended(ExtendedMessage::Handshake(h)) = &message {
@ -378,7 +376,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
Ok::<_, anyhow::Error>(())
};
let r = tokio::select! {
tokio::select! {
r = reader => {
trace!("reader is done, exiting");
r
@ -387,7 +385,6 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
trace!("writer is done, exiting");
r
}
};
r
}
}
}

View file

@ -11,7 +11,7 @@ use std::{
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBufT, ByteString};
use buffers::{ByteBuf, ByteBufT, ByteString};
use clone_to_owned::CloneToOwned;
use dht::{
Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream,
@ -22,7 +22,9 @@ use librqbit_core::{
magnet::Magnet,
peer_id::generate_peer_id,
spawn_utils::spawn_with_cancel,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
torrent_metainfo::{
torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned,
},
};
use parking_lot::RwLock;
use peer_binary_protocol::{Handshake, PIECE_MESSAGE_DEFAULT_LEN};
@ -49,6 +51,14 @@ pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
pub type TorrentId = usize;
fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result<TorrentMetaV1Owned> {
debug!(
"all fields in torrent: {:#?}",
bencode::dyn_from_bytes::<ByteBuf>(bytes)
);
bencode_torrent_from_bytes(bytes)
}
#[derive(Default)]
pub struct SessionDatabase {
next_id: TorrentId,