From bbc951733f46f255cf9596fc71171db284982bfa Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 24 Aug 2024 18:18:07 +0200 Subject: [PATCH 1/8] PEX: Announce support and message definition --- crates/librqbit/src/peer_connection.rs | 15 +- crates/librqbit/src/read_buf.rs | 2 +- crates/librqbit/src/torrent_state/live/mod.rs | 14 +- .../src/extended/handshake.rs | 35 +++-- .../peer_binary_protocol/src/extended/mod.rs | 27 +++- .../src/extended/ut_pex.rs | 143 ++++++++++++++++++ crates/peer_binary_protocol/src/lib.rs | 12 +- 7 files changed, 213 insertions(+), 35 deletions(-) create mode 100644 crates/peer_binary_protocol/src/extended/ut_pex.rs diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 8401ae8..6d4ac1e 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -14,10 +14,7 @@ use librqbit_core::{ }; use parking_lot::RwLock; use peer_binary_protocol::{ - extended::{ - handshake::{ExtendedHandshake, YourIP}, - ExtendedMessage, - }, + extended::{handshake::ExtendedHandshake, ExtendedMessage}, serialize_piece_preamble, Handshake, Message, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, }; use serde::{Deserialize, Serialize}; @@ -248,14 +245,14 @@ impl PeerConnection { let supports_extended = handshake_supports_extended; if supports_extended { - let your_ip = self.addr.ip(); let mut my_extended = ExtendedHandshake::new(); - my_extended.yourip = Some(YourIP(your_ip)); self.handler .update_my_extended_handshake(&mut my_extended)?; let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended)); trace!("sending extended handshake: {:?}", &my_extended); - my_extended.serialize(&mut write_buf, &|| None).unwrap(); + my_extended + .serialize(&mut write_buf, &|| Default::default()) + .unwrap(); with_timeout(rwtimeout, conn.write_all(&write_buf)) .await .context("error writing extended handshake")?; @@ -318,7 +315,8 @@ impl PeerConnection { extended_handshake_ref .read() .as_ref() - .and_then(|e| e.ut_metadata()) + .map(|e| e.peer_extended_messages()) + .unwrap_or_default() })?, WriterRequest::ReadChunkRequest(chunk) => { #[allow(unused_mut)] @@ -397,7 +395,6 @@ impl PeerConnection { if let Message::Extended(ExtendedMessage::Handshake(h)) = &message { *extended_handshake_ref.write() = Some(h.clone_to_owned(None)); self.handler.on_extended_handshake(h)?; - trace!("remembered extended handshake for future serializing"); } else { self.handler .on_received_message(message) diff --git a/crates/librqbit/src/read_buf.rs b/crates/librqbit/src/read_buf.rs index 2d57007..4b1428e 100644 --- a/crates/librqbit/src/read_buf.rs +++ b/crates/librqbit/src/read_buf.rs @@ -57,7 +57,7 @@ impl ReadBuf { anyhow::bail!("peer disconnected while reading handshake"); } let (h, size) = Handshake::deserialize(&self.buf[..self.filled]) - .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?; + .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?} hadshake data {:?}", e, &self.buf[..self.filled.min(19)]))?; self.processed = size; Ok(h) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 385dd56..ad5c66b 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -67,7 +67,7 @@ use librqbit_core::{ }; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ - extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, + extended::{handshake::{ExtendedHandshake, YourIP}, ut_metadata::UtMetadata, ExtendedMessage}, Handshake, Message, MessageOwned, Piece, Request, }; use peers::stats::atomic::AggregatePeerStatsAtomic; @@ -877,6 +877,9 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { self.send_metadata_piece(metadata_piece_id) .with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?; } + Message::Extended(ExtendedMessage::UtPex(pex)) => { + trace!("received ut_pex: {:?} added peers v4: {:?}", pex, pex.added_peers().unwrap().collect::>()); + } message => { warn!("received unsupported message {:?}, ignoring", message); } @@ -887,7 +890,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes())); - let len = msg.serialize(buf, &|| None)?; + let len = msg.serialize(buf, &|| Default::default())?; trace!("sending: {:?}, length={}", &msg, len); Ok(len) } @@ -914,7 +917,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { self.state.file_ops().read_chunk(self.addr, chunk, buf) } - fn on_extended_handshake(&self, _: &ExtendedHandshake) -> anyhow::Result<()> { + fn on_extended_handshake(&self, hs: &ExtendedHandshake) -> anyhow::Result<()> { + if let Some(peer_pex_msg_id) = hs.ut_pex() { + trace!("peer supports pex at {peer_pex_msg_id}"); + } Ok(()) } @@ -937,6 +943,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { &self, handshake: &mut ExtendedHandshake, ) -> anyhow::Result<()> { + let your_ip = self.addr.ip(); + handshake.yourip = Some(YourIP(your_ip)); let info_bytes = &self.state.torrent().info_bytes; if !info_bytes.is_empty() { if let Ok(len) = info_bytes.len().try_into() { diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 7cc05df..8ccb80e 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -1,16 +1,16 @@ use std::{ collections::HashMap, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, + net::IpAddr, }; use buffers::ByteBuf; -use byteorder::ByteOrder; -use byteorder::BE; use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Deserializer, Serialize}; -use crate::{EXTENDED_UT_METADATA_KEY, MY_EXTENDED_UT_METADATA}; +use crate::{EXTENDED_UT_METADATA_KEY, EXTENDED_UT_PEX_KEY, MY_EXTENDED_UT_METADATA, MY_EXTENDED_UT_PEX}; + +use super::PeerExtendedMessageIds; #[derive(Deserialize, Serialize, Debug, Default)] pub struct ExtendedHandshake { @@ -40,6 +40,7 @@ impl ExtendedHandshake> { pub fn new() -> Self { let mut features = HashMap::new(); features.insert(ByteBuf(EXTENDED_UT_METADATA_KEY), MY_EXTENDED_UT_METADATA); + features.insert(ByteBuf(EXTENDED_UT_PEX_KEY), MY_EXTENDED_UT_PEX); Self { m: features, ..Default::default() @@ -58,6 +59,17 @@ where pub fn ut_metadata(&self) -> Option { self.get_msgid(EXTENDED_UT_METADATA_KEY) } + + pub fn ut_pex(&self) -> Option { + self.get_msgid(EXTENDED_UT_PEX_KEY) + } + + pub fn peer_extended_messages(&self) -> PeerExtendedMessageIds { + PeerExtendedMessageIds { + ut_metadata: self.ut_metadata(), + ut_pex: self.ut_pex(), + } + } } impl CloneToOwned for ExtendedHandshake @@ -122,18 +134,11 @@ impl<'de> Deserialize<'de> for YourIP { E: serde::de::Error, { if v.len() == 4 { - return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3])))); + let ip_bytes: &[u8; 4] = v[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length + return Ok(YourIP(IpAddr::from(*ip_bytes))); } else if v.len() == 16 { - return Ok(YourIP(IpAddr::V6(Ipv6Addr::new( - BE::read_u16(&v[..2]), - BE::read_u16(&v[2..4]), - BE::read_u16(&v[4..6]), - BE::read_u16(&v[6..8]), - BE::read_u16(&v[8..10]), - BE::read_u16(&v[10..12]), - BE::read_u16(&v[12..14]), - BE::read_u16(&v[14..]), - )))); + let ip_bytes: &[u8; 16] = v[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length + return Ok(YourIP(IpAddr::from(*ip_bytes))); } Err(E::custom("expected 4 or 16 byte address")) } diff --git a/crates/peer_binary_protocol/src/extended/mod.rs b/crates/peer_binary_protocol/src/extended/mod.rs index 2ac4607..5ffecac 100644 --- a/crates/peer_binary_protocol/src/extended/mod.rs +++ b/crates/peer_binary_protocol/src/extended/mod.rs @@ -4,6 +4,9 @@ use bencode::BencodeValue; use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Serialize}; +use ut_pex::UtPex; + +use crate::MY_EXTENDED_UT_PEX; use self::{handshake::ExtendedHandshake, ut_metadata::UtMetadata}; @@ -11,13 +14,21 @@ use super::MessageDeserializeError; pub mod handshake; pub mod ut_metadata; +pub mod ut_pex; use super::MY_EXTENDED_UT_METADATA; +#[derive(Debug, Default)] +pub struct PeerExtendedMessageIds { + pub ut_metadata: Option, + pub ut_pex: Option, +} + #[derive(Debug)] pub enum ExtendedMessage { Handshake(ExtendedHandshake), UtMetadata(UtMetadata), + UtPex(UtPex), Dyn(u8, BencodeValue), } @@ -37,6 +48,7 @@ where ExtendedMessage::UtMetadata(m) => { ExtendedMessage::UtMetadata(m.clone_to_owned(within_buffer)) } + ExtendedMessage::UtPex(m) => ExtendedMessage::UtPex(m.clone_to_owned(within_buffer)), } } } @@ -45,7 +57,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage, - extended_handshake_ut_metadata: &dyn Fn() -> Option, + extended_handshake_ut_metadata: &dyn Fn() -> PeerExtendedMessageIds, ) -> anyhow::Result<()> where ByteBuf: AsRef<[u8]>, @@ -60,12 +72,20 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { - let emsg_id = extended_handshake_ut_metadata().ok_or_else(|| { + let emsg_id = extended_handshake_ut_metadata().ut_metadata.ok_or_else(|| { anyhow::anyhow!("need peer's handshake to serialize ut_metadata") })?; out.push(emsg_id); u.serialize(out); - } + }, + ExtendedMessage::UtPex(m) => { + let emsg_id = extended_handshake_ut_metadata().ut_pex.ok_or_else(|| { + anyhow::anyhow!("need peer's handshake to serialize ut_pex, or peer does't support ut_pex") + })?; + out.push(emsg_id); + bencode_serialize_to_writer(m, out)?; + }, + } Ok(()) } @@ -91,6 +111,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(buf)?)) } + MY_EXTENDED_UT_PEX => Ok(ExtendedMessage::UtPex(from_bytes(buf)?)), _ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(buf)?)), } } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs new file mode 100644 index 0000000..470015d --- /dev/null +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -0,0 +1,143 @@ +use std::net::{IpAddr, SocketAddr}; + +use byteorder::{ByteOrder, BE}; +use bytes::Bytes; +use clone_to_owned::CloneToOwned; +use serde::{Deserialize, Serialize}; + +#[derive(Debug)] +pub struct PexPeerInfo { + pub flags: u8, + pub addr: SocketAddr, +} + +impl PexPeerInfo { + pub fn from_bytes(buf: &[u8], flags: Option) -> anyhow::Result { + let (ip, port) = match buf.len() { + 6 => { + let ip_bytes: &[u8; 4] = (&buf[0..4]).try_into()?; + let ip = IpAddr::from(*ip_bytes); + let port = BE::read_u16(&buf[4..6]); + (ip, port) + } + 18 => { + let ip_bytes: &[u8; 16] = (&buf[0..16]).try_into()?; + let ip = IpAddr::from(*ip_bytes); + let port = BE::read_u16(&buf[16..18]); + (ip, port) + } + _ => anyhow::bail!("invalid pex peer info"), + }; + Ok(Self { + flags: flags.unwrap_or(0), + addr: (ip, port).into(), + }) + } +} + +#[derive(Debug, Serialize, Default, Deserialize)] +pub struct UtPex { + #[serde(skip_serializing_if = "Option::is_none")] + added: Option, + #[serde(rename = "added.f")] + #[serde(skip_serializing_if = "Option::is_none")] + added_f: Option, + #[serde(skip_serializing_if = "Option::is_none")] + added6: Option, + #[serde(rename = "added6.f")] + #[serde(skip_serializing_if = "Option::is_none")] + added6_f: Option, + #[serde(skip_serializing_if = "Option::is_none")] + dropped: Option, + #[serde(skip_serializing_if = "Option::is_none")] + dropped6: Option, +} + +impl CloneToOwned for UtPex +where + B: CloneToOwned, +{ + type Target = UtPex<::Target>; + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { + UtPex { + added: self.added.clone_to_owned(within_buffer), + added_f: self.added_f.clone_to_owned(within_buffer), + added6: self.added6.clone_to_owned(within_buffer), + added6_f: self.added6_f.clone_to_owned(within_buffer), + dropped: self.dropped.clone_to_owned(within_buffer), + dropped6: self.dropped6.clone_to_owned(within_buffer), + } + } +} + +pub struct PeerAddrIterator<'a> { + addrs: &'a [u8], + flags: &'a [u8], + offset: usize, +} + +impl<'a> Iterator for PeerAddrIterator<'a> { + type Item = PexPeerInfo; + fn next(&mut self) -> Option { + const ADDR_SIZE: usize = 6; + if self.offset*ADDR_SIZE >= self.addrs.len() { + return None; + } + + let addr = &self.addrs[self.offset*ADDR_SIZE..(self.offset+1)*ADDR_SIZE]; + let flags = self.flags.get(self.offset); + self.offset += 1; + Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length + + + } +} + +impl UtPex +where + B: AsRef<[u8]>, +{ + pub fn added_peers<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(added) = &self.added { + if added.as_ref().len() % 6 != 0 { + anyhow::bail!("invalid pex added peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: added.as_ref(), + flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), + offset: 0, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } +} + +#[cfg(test)] +mod tests { + use bencode::from_bytes; + use buffers::ByteBuf; + + use super::*; + + fn decode_hex(s: &str) -> Vec { + assert!(s.len() % 2 == 0); + (0..s.len()) + .step_by(2) + .map(|i| u8::from_str_radix(&s[i..i + 2], 16).unwrap()) + .collect() + } + + #[test] + fn test_pex_deserialization() { + let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65"; + let bytes = decode_hex(msg); + let pex = from_bytes::>(&bytes).unwrap(); + let addrs: Vec<_> = pex.added_peers().unwrap().collect(); + assert_eq!(2, addrs.len()); + assert_eq!("185.159.157.20:46439".parse::().unwrap(), addrs[0].addr); + assert_eq!(12, addrs[0].flags); + assert_eq!("151.249.105.134:4240".parse::().unwrap(), addrs[1].addr); + assert_eq!(0, addrs[1].flags); + } +} diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index bd2d1a8..a093984 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -9,6 +9,7 @@ use buffers::{ByteBuf, ByteBufOwned}; use byteorder::{ByteOrder, BE}; use bytes::Bytes; use clone_to_owned::CloneToOwned; +use extended::PeerExtendedMessageIds; use librqbit_core::{constants::CHUNK_SIZE, hash_id::Id20, lengths::ChunkInfo}; use serde::{Deserialize, Serialize}; @@ -47,6 +48,9 @@ const MSGID_EXTENDED: u8 = 20; pub const EXTENDED_UT_METADATA_KEY: &[u8] = b"ut_metadata"; pub const MY_EXTENDED_UT_METADATA: u8 = 3; +pub const EXTENDED_UT_PEX_KEY: &[u8] = b"ut_pex"; +pub const MY_EXTENDED_UT_PEX: u8 = 1; + #[derive(Debug)] pub enum MessageDeserializeError { NotEnoughData(usize, &'static str), @@ -276,7 +280,7 @@ where pub fn serialize( &self, out: &mut Vec, - extended_handshake_ut_metadata: &dyn Fn() -> Option, + peer_extended_messages: &dyn Fn() -> PeerExtendedMessageIds, ) -> anyhow::Result { let (lp, msg_id) = self.len_prefix_and_msg_id(); @@ -326,7 +330,7 @@ where Ok(msg_len) } Message::Extended(e) => { - e.serialize(out, extended_handshake_ut_metadata)?; + e.serialize(out, peer_extended_messages)?; let msg_size = out.len(); // no fucking idea why +1, but I tweaked that for it all to match up // with real messages. @@ -636,7 +640,7 @@ mod tests { fn test_extended_serialize() { let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new())); let mut out = Vec::new(); - msg.serialize(&mut out, &|| None).unwrap(); + msg.serialize(&mut out, &|| Default::default()).unwrap(); dbg!(out); } @@ -652,7 +656,7 @@ mod tests { let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap(); assert_eq!(size, buf.len()); let mut write_buf = Vec::new(); - msg.serialize(&mut write_buf, &|| None).unwrap(); + msg.serialize(&mut write_buf, &|| Default::default()).unwrap(); if buf != write_buf { { use std::io::Write; From 5e09525dd534a8d9054627488487dbfdc71dce54 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 25 Aug 2024 09:38:40 +0200 Subject: [PATCH 2/8] Feeding peers from PEX added - PoC --- crates/librqbit/src/torrent_state/live/mod.rs | 27 ++++++++- .../src/extended/ut_pex.rs | 57 ++++++++++++++++++- 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ad5c66b..26fd846 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -67,7 +67,12 @@ use librqbit_core::{ }; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ - extended::{handshake::{ExtendedHandshake, YourIP}, ut_metadata::UtMetadata, ExtendedMessage}, + extended::{ + handshake::{ExtendedHandshake, YourIP}, + ut_metadata::UtMetadata, + ut_pex::UtPex, + ExtendedMessage, + }, Handshake, Message, MessageOwned, Piece, Request, }; use peers::stats::atomic::AggregatePeerStatsAtomic; @@ -878,7 +883,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { .with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?; } Message::Extended(ExtendedMessage::UtPex(pex)) => { - trace!("received ut_pex: {:?} added peers v4: {:?}", pex, pex.added_peers().unwrap().collect::>()); + trace!("received ut_pex: {:?}", pex); + self.on_pex_message(pex); } message => { warn!("received unsupported message {:?}, ignoring", message); @@ -1667,4 +1673,21 @@ impl PeerHandler { .context("error sending UtMetadata: channel closed")?; Ok(()) } + + fn on_pex_message(&self, msg: UtPex) + where + B: AsRef<[u8]> + std::fmt::Debug, + { + // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... + if let Ok(peers) = msg.added_peers() { + peers.for_each(|peer| { + self.state + .add_peer_if_not_seen(peer.addr) + .inspect_err(|e| warn!("failed to add peer: {peer:?} due to: {e}")) + .ok(); + }); + } else { + warn!("received invalid pex message: {:?}", msg); + } + } } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 470015d..c0312b6 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -74,17 +74,19 @@ pub struct PeerAddrIterator<'a> { addrs: &'a [u8], flags: &'a [u8], offset: usize, + addr_size: usize, } + + impl<'a> Iterator for PeerAddrIterator<'a> { type Item = PexPeerInfo; fn next(&mut self) -> Option { - const ADDR_SIZE: usize = 6; - if self.offset*ADDR_SIZE >= self.addrs.len() { + if self.offset*self.addr_size >= self.addrs.len() { return None; } - let addr = &self.addrs[self.offset*ADDR_SIZE..(self.offset+1)*ADDR_SIZE]; + let addr = &self.addrs[self.offset*self.addr_size..(self.offset+1)*self.addr_size]; let flags = self.flags.get(self.offset); self.offset += 1; Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length @@ -106,6 +108,55 @@ where addrs: added.as_ref(), flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), offset: 0, + addr_size: 6, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } + + pub fn added_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(added) = &self.added6 { + if added.as_ref().len() % 18 != 0 { + anyhow::bail!("invalid pex added6 peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: added.as_ref(), + flags: self.added6_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), + offset: 0, + addr_size: 18, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } + + pub fn dropped_peers<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(dropped) = &self.dropped { + if dropped.as_ref().len() % 6 != 0 { + anyhow::bail!("invalid pex dropped peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: dropped.as_ref(), + flags: &[], + offset: 0, + addr_size: 6, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } + + pub fn dropped_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(dropped) = &self.dropped6 { + if dropped.as_ref().len() % 18 != 0 { + anyhow::bail!("invalid pex dropped6 peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: dropped.as_ref(), + flags: &[], + offset: 0, + addr_size: 18, })); } else { return Ok(Box::new(std::iter::empty())); From 90bfb85bcc962ed7200828245db0f58811de5576 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 25 Aug 2024 12:53:28 +0100 Subject: [PATCH 3/8] Some cleanups for utpex --- crates/librqbit/src/peer_connection.rs | 2 +- crates/librqbit/src/torrent_state/live/mod.rs | 18 +- crates/peer_binary_protocol/Cargo.toml | 1 + .../src/extended/ut_pex.rs | 163 +++++------------- crates/peer_binary_protocol/src/lib.rs | 4 +- 5 files changed, 56 insertions(+), 132 deletions(-) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 6d4ac1e..9b3ae73 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -251,7 +251,7 @@ impl PeerConnection { let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended)); trace!("sending extended handshake: {:?}", &my_extended); my_extended - .serialize(&mut write_buf, &|| Default::default()) + .serialize(&mut write_buf, &Default::default) .unwrap(); with_timeout(rwtimeout, conn.write_all(&write_buf)) .await diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 26fd846..6d57bcb 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -896,7 +896,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes())); - let len = msg.serialize(buf, &|| Default::default())?; + let len = msg.serialize(buf, &Default::default)?; trace!("sending: {:?}, length={}", &msg, len); Ok(len) } @@ -1679,15 +1679,11 @@ impl PeerHandler { B: AsRef<[u8]> + std::fmt::Debug, { // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... - if let Ok(peers) = msg.added_peers() { - peers.for_each(|peer| { - self.state - .add_peer_if_not_seen(peer.addr) - .inspect_err(|e| warn!("failed to add peer: {peer:?} due to: {e}")) - .ok(); - }); - } else { - warn!("received invalid pex message: {:?}", msg); - } + msg.added_peers().for_each(|peer| { + self.state + .add_peer_if_not_seen(peer.addr) + .inspect_err(|error| warn!(?peer, ?error, "failed to add peer")) + .ok(); + }); } } diff --git a/crates/peer_binary_protocol/Cargo.toml b/crates/peer_binary_protocol/Cargo.toml index 4c2ebc5..4ef36a9 100644 --- a/crates/peer_binary_protocol/Cargo.toml +++ b/crates/peer_binary_protocol/Cargo.toml @@ -20,3 +20,4 @@ librqbit-core = { path = "../librqbit_core", default-features = false, version = bitvec = "1" anyhow = "1" bytes = "1.7.1" +itertools = "0.12" diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index c0312b6..00014c1 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -3,6 +3,7 @@ use std::net::{IpAddr, SocketAddr}; use byteorder::{ByteOrder, BE}; use bytes::Bytes; use clone_to_owned::CloneToOwned; +use itertools::{EitherOrBoth, Itertools}; use serde::{Deserialize, Serialize}; #[derive(Debug)] @@ -11,46 +12,18 @@ pub struct PexPeerInfo { pub addr: SocketAddr, } -impl PexPeerInfo { - pub fn from_bytes(buf: &[u8], flags: Option) -> anyhow::Result { - let (ip, port) = match buf.len() { - 6 => { - let ip_bytes: &[u8; 4] = (&buf[0..4]).try_into()?; - let ip = IpAddr::from(*ip_bytes); - let port = BE::read_u16(&buf[4..6]); - (ip, port) - } - 18 => { - let ip_bytes: &[u8; 16] = (&buf[0..16]).try_into()?; - let ip = IpAddr::from(*ip_bytes); - let port = BE::read_u16(&buf[16..18]); - (ip, port) - } - _ => anyhow::bail!("invalid pex peer info"), - }; - Ok(Self { - flags: flags.unwrap_or(0), - addr: (ip, port).into(), - }) - } -} - #[derive(Debug, Serialize, Default, Deserialize)] pub struct UtPex { - #[serde(skip_serializing_if = "Option::is_none")] - added: Option, + added: B, #[serde(rename = "added.f")] #[serde(skip_serializing_if = "Option::is_none")] added_f: Option, - #[serde(skip_serializing_if = "Option::is_none")] - added6: Option, + added6: B, #[serde(rename = "added6.f")] #[serde(skip_serializing_if = "Option::is_none")] added6_f: Option, - #[serde(skip_serializing_if = "Option::is_none")] - dropped: Option, - #[serde(skip_serializing_if = "Option::is_none")] - dropped6: Option, + dropped: B, + dropped6: B, } impl CloneToOwned for UtPex @@ -70,97 +43,45 @@ where } } -pub struct PeerAddrIterator<'a> { - addrs: &'a [u8], - flags: &'a [u8], - offset: usize, - addr_size: usize, -} - - - -impl<'a> Iterator for PeerAddrIterator<'a> { - type Item = PexPeerInfo; - fn next(&mut self) -> Option { - if self.offset*self.addr_size >= self.addrs.len() { - return None; - } - - let addr = &self.addrs[self.offset*self.addr_size..(self.offset+1)*self.addr_size]; - let flags = self.flags.get(self.offset); - self.offset += 1; - Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length - - - } -} - impl UtPex where B: AsRef<[u8]>, { - pub fn added_peers<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(added) = &self.added { - if added.as_ref().len() % 6 != 0 { - anyhow::bail!("invalid pex added peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: added.as_ref(), - flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), - offset: 0, - addr_size: 6, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; + fn added_peers_inner<'a>( + &'a self, + buf: &'a B, + flags: &'a Option, + ip_len: usize, + ) -> impl Iterator + 'a { + let addrs = buf.as_ref().chunks_exact(ip_len + 2).map(move |c| { + let ip = match ip_len { + 4 => IpAddr::from(TryInto::<[u8; 4]>::try_into(&c[..4]).unwrap()), + 16 => IpAddr::from(TryInto::<[u8; 16]>::try_into(&c[..16]).unwrap()), + _ => unreachable!(), + }; + let port = BE::read_u16(&c[ip_len..]); + SocketAddr::new(ip, port) + }); + let flags = flags + .as_ref() + .map(|b| b.as_ref().iter().copied()) + .into_iter() + .flatten(); + addrs.zip_longest(flags).filter_map(|eob| match eob { + EitherOrBoth::Both(addr, flags) => Some(PexPeerInfo { flags, addr }), + EitherOrBoth::Left(addr) => Some(PexPeerInfo { flags: 0, addr }), + EitherOrBoth::Right(_) => None, + }) } - pub fn added_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(added) = &self.added6 { - if added.as_ref().len() % 18 != 0 { - anyhow::bail!("invalid pex added6 peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: added.as_ref(), - flags: self.added6_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), - offset: 0, - addr_size: 18, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; + pub fn added_peers(&self) -> impl Iterator + '_ { + self.added_peers_inner(&self.added, &self.added_f, 4) + .chain(self.added_peers_inner(&self.added6, &self.added6_f, 16)) } - pub fn dropped_peers<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(dropped) = &self.dropped { - if dropped.as_ref().len() % 6 != 0 { - anyhow::bail!("invalid pex dropped peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: dropped.as_ref(), - flags: &[], - offset: 0, - addr_size: 6, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; - } - - pub fn dropped_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(dropped) = &self.dropped6 { - if dropped.as_ref().len() % 18 != 0 { - anyhow::bail!("invalid pex dropped6 peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: dropped.as_ref(), - flags: &[], - offset: 0, - addr_size: 18, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; + pub fn dropped_peers(&self) -> impl Iterator + '_ { + self.added_peers_inner(&self.dropped, &None, 4) + .chain(self.added_peers_inner(&self.dropped6, &None, 16)) } } @@ -184,11 +105,17 @@ mod tests { let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65"; let bytes = decode_hex(msg); let pex = from_bytes::>(&bytes).unwrap(); - let addrs: Vec<_> = pex.added_peers().unwrap().collect(); + let addrs: Vec<_> = pex.added_peers().collect(); assert_eq!(2, addrs.len()); - assert_eq!("185.159.157.20:46439".parse::().unwrap(), addrs[0].addr); + assert_eq!( + "185.159.157.20:46439".parse::().unwrap(), + addrs[0].addr + ); assert_eq!(12, addrs[0].flags); - assert_eq!("151.249.105.134:4240".parse::().unwrap(), addrs[1].addr); + assert_eq!( + "151.249.105.134:4240".parse::().unwrap(), + addrs[1].addr + ); assert_eq!(0, addrs[1].flags); } } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index a093984..64a39e5 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -640,7 +640,7 @@ mod tests { fn test_extended_serialize() { let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new())); let mut out = Vec::new(); - msg.serialize(&mut out, &|| Default::default()).unwrap(); + msg.serialize(&mut out, &Default::default).unwrap(); dbg!(out); } @@ -656,7 +656,7 @@ mod tests { let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap(); assert_eq!(size, buf.len()); let mut write_buf = Vec::new(); - msg.serialize(&mut write_buf, &|| Default::default()).unwrap(); + msg.serialize(&mut write_buf, &Default::default).unwrap(); if buf != write_buf { { use std::io::Write; From c7b7dc300ff065ec206bf06d10ab90e7069204b0 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 25 Aug 2024 13:20:13 +0100 Subject: [PATCH 4/8] impl Debug for UtPex --- .../peer_binary_protocol/src/extended/mod.rs | 27 +++++++++-------- .../src/extended/ut_pex.rs | 29 ++++++++++++++++--- crates/peer_binary_protocol/src/lib.rs | 6 ++-- 3 files changed, 43 insertions(+), 19 deletions(-) diff --git a/crates/peer_binary_protocol/src/extended/mod.rs b/crates/peer_binary_protocol/src/extended/mod.rs index 5ffecac..0be0647 100644 --- a/crates/peer_binary_protocol/src/extended/mod.rs +++ b/crates/peer_binary_protocol/src/extended/mod.rs @@ -25,7 +25,7 @@ pub struct PeerExtendedMessageIds { } #[derive(Debug)] -pub enum ExtendedMessage { +pub enum ExtendedMessage> { Handshake(ExtendedHandshake), UtMetadata(UtMetadata), UtPex(UtPex), @@ -34,8 +34,8 @@ pub enum ExtendedMessage { impl CloneToOwned for ExtendedMessage where - ByteBuf: CloneToOwned + std::hash::Hash + Eq, - ::Target: std::hash::Hash + Eq, + ByteBuf: CloneToOwned + std::hash::Hash + Eq + AsRef<[u8]>, + ::Target: std::hash::Hash + Eq + AsRef<[u8]>, { type Target = ExtendedMessage<::Target>; @@ -53,7 +53,7 @@ where } } -impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { +impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize + AsRef<[u8]>> ExtendedMessage { pub fn serialize( &self, out: &mut Vec, @@ -72,27 +72,30 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { - let emsg_id = extended_handshake_ut_metadata().ut_metadata.ok_or_else(|| { - anyhow::anyhow!("need peer's handshake to serialize ut_metadata") - })?; + let emsg_id = extended_handshake_ut_metadata() + .ut_metadata + .ok_or_else(|| { + anyhow::anyhow!("need peer's handshake to serialize ut_metadata") + })?; out.push(emsg_id); u.serialize(out); - }, + } ExtendedMessage::UtPex(m) => { let emsg_id = extended_handshake_ut_metadata().ut_pex.ok_or_else(|| { - anyhow::anyhow!("need peer's handshake to serialize ut_pex, or peer does't support ut_pex") + anyhow::anyhow!( + "need peer's handshake to serialize ut_pex, or peer does't support ut_pex" + ) })?; out.push(emsg_id); bencode_serialize_to_writer(m, out)?; - }, - + } } Ok(()) } pub fn deserialize(mut buf: &'a [u8]) -> Result where - ByteBuf: Deserialize<'a> + From<&'a [u8]>, + ByteBuf: Deserialize<'a> + From<&'a [u8]> + AsRef<[u8]>, { let emsg_id = buf.first().copied().ok_or_else(|| { MessageDeserializeError::Other(anyhow::anyhow!( diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 00014c1..149e8c0 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -12,7 +12,7 @@ pub struct PexPeerInfo { pub addr: SocketAddr, } -#[derive(Debug, Serialize, Default, Deserialize)] +#[derive(Serialize, Default, Deserialize)] pub struct UtPex { added: B, #[serde(rename = "added.f")] @@ -26,6 +26,27 @@ pub struct UtPex { dropped6: B, } +impl core::fmt::Debug for UtPex +where + B: AsRef<[u8]>, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + struct IterDebug(I); + impl core::fmt::Debug for IterDebug + where + I: Iterator + Clone, + { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.0.clone()).finish() + } + } + f.debug_struct("UtPex") + .field("added", &IterDebug(self.added_peers())) + .field("dropped", &IterDebug(self.dropped_peers())) + .finish() + } +} + impl CloneToOwned for UtPex where B: CloneToOwned, @@ -52,7 +73,7 @@ where buf: &'a B, flags: &'a Option, ip_len: usize, - ) -> impl Iterator + 'a { + ) -> impl Iterator + Clone + 'a { let addrs = buf.as_ref().chunks_exact(ip_len + 2).map(move |c| { let ip = match ip_len { 4 => IpAddr::from(TryInto::<[u8; 4]>::try_into(&c[..4]).unwrap()), @@ -74,12 +95,12 @@ where }) } - pub fn added_peers(&self) -> impl Iterator + '_ { + pub fn added_peers(&self) -> impl Iterator + Clone + '_ { self.added_peers_inner(&self.added, &self.added_f, 4) .chain(self.added_peers_inner(&self.added6, &self.added6_f, 16)) } - pub fn dropped_peers(&self) -> impl Iterator + '_ { + pub fn dropped_peers(&self) -> impl Iterator + Clone + '_ { self.added_peers_inner(&self.dropped, &None, 4) .chain(self.added_peers_inner(&self.dropped6, &None, 16)) } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 64a39e5..71a92c5 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -186,7 +186,7 @@ impl From for MessageDeserializeError { } #[derive(Debug)] -pub enum Message { +pub enum Message> { Request(Request), Cancel(Request), Bitfield(ByteBuf), @@ -212,8 +212,8 @@ pub struct Bitfield<'a> { impl CloneToOwned for Message where - ByteBuf: CloneToOwned + std::hash::Hash + Eq, - ::Target: std::hash::Hash + Eq, + ByteBuf: CloneToOwned + std::hash::Hash + Eq + AsRef<[u8]>, + ::Target: std::hash::Hash + Eq + AsRef<[u8]>, { type Target = Message<::Target>; From 9d1ef5c35c2dfb48fa47d022cc247f8536e42060 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 25 Aug 2024 13:29:28 +0100 Subject: [PATCH 5/8] Shorten ByteBuf trait bounds everywhere --- crates/buffers/src/lib.rs | 8 ++++++-- crates/librqbit/src/torrent_state/live/mod.rs | 5 ++++- .../src/extended/handshake.rs | 19 +++++++++---------- .../peer_binary_protocol/src/extended/mod.rs | 15 ++++++++------- .../src/extended/ut_metadata.rs | 5 +++-- crates/peer_binary_protocol/src/lib.rs | 10 +++++----- 6 files changed, 35 insertions(+), 27 deletions(-) diff --git a/crates/buffers/src/lib.rs b/crates/buffers/src/lib.rs index 814e843..f6cae86 100644 --- a/crates/buffers/src/lib.rs +++ b/crates/buffers/src/lib.rs @@ -3,8 +3,10 @@ // // Not useful outside of librqbit. +use std::borrow::Borrow; + use bytes::Bytes; -use serde::{Deserialize, Deserializer}; +use serde::{Deserialize, Deserializer, Serialize}; use clone_to_owned::CloneToOwned; @@ -15,7 +17,9 @@ pub struct ByteBufOwned(pub bytes::Bytes); #[serde(transparent)] pub struct ByteBuf<'a>(pub &'a [u8]); -pub trait ByteBufT { +pub trait ByteBufT: + AsRef<[u8]> + std::hash::Hash + Serialize + Eq + core::fmt::Debug + CloneToOwned + Borrow<[u8]> +{ fn as_slice(&self) -> &[u8]; } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 6d57bcb..dc86f11 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1682,7 +1682,10 @@ impl PeerHandler { msg.added_peers().for_each(|peer| { self.state .add_peer_if_not_seen(peer.addr) - .inspect_err(|error| warn!(?peer, ?error, "failed to add peer")) + .map_err(|error| { + warn!(?peer, ?error, "failed to add peer"); + error + }) .ok(); }); } diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 8ccb80e..721fff8 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -1,19 +1,18 @@ -use std::{ - collections::HashMap, - net::IpAddr, -}; +use std::{collections::HashMap, net::IpAddr}; -use buffers::ByteBuf; +use buffers::{ByteBuf, ByteBufT}; use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Deserializer, Serialize}; -use crate::{EXTENDED_UT_METADATA_KEY, EXTENDED_UT_PEX_KEY, MY_EXTENDED_UT_METADATA, MY_EXTENDED_UT_PEX}; +use crate::{ + EXTENDED_UT_METADATA_KEY, EXTENDED_UT_PEX_KEY, MY_EXTENDED_UT_METADATA, MY_EXTENDED_UT_PEX, +}; use super::PeerExtendedMessageIds; #[derive(Deserialize, Serialize, Debug, Default)] -pub struct ExtendedHandshake { +pub struct ExtendedHandshake { #[serde(bound(deserialize = "ByteBuf: From<&'de [u8]>"))] pub m: HashMap, #[serde(skip_serializing_if = "Option::is_none")] @@ -50,7 +49,7 @@ impl ExtendedHandshake> { impl<'a, ByteBuf> ExtendedHandshake where - ByteBuf: Eq + std::hash::Hash + std::borrow::Borrow<[u8]>, + ByteBuf: ByteBufT, { fn get_msgid(&self, msg_type: &'a [u8]) -> Option { self.m.get(msg_type).copied() @@ -74,8 +73,8 @@ where impl CloneToOwned for ExtendedHandshake where - ByteBuf: CloneToOwned + Eq + std::hash::Hash, - ::Target: Eq + std::hash::Hash, + ByteBuf: ByteBufT, + ::Target: ByteBufT, { type Target = ExtendedHandshake<::Target>; diff --git a/crates/peer_binary_protocol/src/extended/mod.rs b/crates/peer_binary_protocol/src/extended/mod.rs index 0be0647..85296ba 100644 --- a/crates/peer_binary_protocol/src/extended/mod.rs +++ b/crates/peer_binary_protocol/src/extended/mod.rs @@ -1,9 +1,10 @@ use bencode::bencode_serialize_to_writer; use bencode::from_bytes; use bencode::BencodeValue; +use buffers::ByteBufT; use bytes::Bytes; use clone_to_owned::CloneToOwned; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use ut_pex::UtPex; use crate::MY_EXTENDED_UT_PEX; @@ -25,7 +26,7 @@ pub struct PeerExtendedMessageIds { } #[derive(Debug)] -pub enum ExtendedMessage> { +pub enum ExtendedMessage { Handshake(ExtendedHandshake), UtMetadata(UtMetadata), UtPex(UtPex), @@ -34,8 +35,8 @@ pub enum ExtendedMessage> { impl CloneToOwned for ExtendedMessage where - ByteBuf: CloneToOwned + std::hash::Hash + Eq + AsRef<[u8]>, - ::Target: std::hash::Hash + Eq + AsRef<[u8]>, + ByteBuf: ByteBufT, + ::Target: ByteBufT, { type Target = ExtendedMessage<::Target>; @@ -53,7 +54,7 @@ where } } -impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize + AsRef<[u8]>> ExtendedMessage { +impl ExtendedMessage { pub fn serialize( &self, out: &mut Vec, @@ -93,9 +94,9 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize + AsRef<[u8]>> ExtendedM Ok(()) } - pub fn deserialize(mut buf: &'a [u8]) -> Result + pub fn deserialize<'a>(mut buf: &'a [u8]) -> Result where - ByteBuf: Deserialize<'a> + From<&'a [u8]> + AsRef<[u8]>, + ByteBuf: Deserialize<'a> + From<&'a [u8]>, { let emsg_id = buf.first().copied().ok_or_else(|| { MessageDeserializeError::Other(anyhow::anyhow!( diff --git a/crates/peer_binary_protocol/src/extended/ut_metadata.rs b/crates/peer_binary_protocol/src/extended/ut_metadata.rs index ac7e58b..face0bd 100644 --- a/crates/peer_binary_protocol/src/extended/ut_metadata.rs +++ b/crates/peer_binary_protocol/src/extended/ut_metadata.rs @@ -1,5 +1,6 @@ use bencode::bencode_serialize_to_writer; use bencode::BencodeDeserializer; +use buffers::ByteBufT; use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::Deserialize; @@ -39,7 +40,7 @@ impl CloneToOwned for UtMetadata { } } -impl<'a, ByteBuf: 'a> UtMetadata { +impl UtMetadata { pub fn serialize(&self, buf: &mut Vec) where ByteBuf: AsRef<[u8]>, @@ -83,7 +84,7 @@ impl<'a, ByteBuf: 'a> UtMetadata { } } } - pub fn deserialize(buf: &'a [u8]) -> Result + pub fn deserialize<'a>(buf: &'a [u8]) -> Result where ByteBuf: From<&'a [u8]>, { diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 71a92c5..27926df 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -5,7 +5,7 @@ pub mod extended; use bincode::Options; -use buffers::{ByteBuf, ByteBufOwned}; +use buffers::{ByteBuf, ByteBufOwned, ByteBufT}; use byteorder::{ByteOrder, BE}; use bytes::Bytes; use clone_to_owned::CloneToOwned; @@ -186,7 +186,7 @@ impl From for MessageDeserializeError { } #[derive(Debug)] -pub enum Message> { +pub enum Message { Request(Request), Cancel(Request), Bitfield(ByteBuf), @@ -212,8 +212,8 @@ pub struct Bitfield<'a> { impl CloneToOwned for Message where - ByteBuf: CloneToOwned + std::hash::Hash + Eq + AsRef<[u8]>, - ::Target: std::hash::Hash + Eq + AsRef<[u8]>, + ByteBuf: ByteBufT, + ::Target: ByteBufT, { type Target = Message<::Target>; @@ -257,7 +257,7 @@ impl<'a> std::fmt::Debug for Bitfield<'a> { impl Message where - ByteBuf: AsRef<[u8]> + std::hash::Hash + Eq + Serialize, + ByteBuf: ByteBufT, { pub fn len_prefix_and_msg_id(&self) -> (u32, u8) { match self { From 9b5c69f37838bed9db12ac6e5cee699bf8d63983 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 25 Aug 2024 13:32:46 +0100 Subject: [PATCH 6/8] Shorten debug for PexPeerInfo --- crates/peer_binary_protocol/src/extended/ut_pex.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 149e8c0..73698f2 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -6,12 +6,21 @@ use clone_to_owned::CloneToOwned; use itertools::{EitherOrBoth, Itertools}; use serde::{Deserialize, Serialize}; -#[derive(Debug)] pub struct PexPeerInfo { pub flags: u8, pub addr: SocketAddr, } +impl core::fmt::Debug for PexPeerInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.addr)?; + if self.flags != 0 { + write!(f, ";flags={}", self.flags)?; + } + Ok(()) + } +} + #[derive(Serialize, Default, Deserialize)] pub struct UtPex { added: B, From 14e0c9d83398abf14d187ed52611530a67c0d245 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 28 Aug 2024 09:59:32 +0100 Subject: [PATCH 7/8] Remove duplicate ut_pex message --- crates/librqbit/src/torrent_state/live/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index dc86f11..6afa729 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -883,7 +883,6 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { .with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?; } Message::Extended(ExtendedMessage::UtPex(pex)) => { - trace!("received ut_pex: {:?}", pex); self.on_pex_message(pex); } message => { From 322ff24edf4764cd3e3631d4fb8c9127c3dd3973 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 28 Aug 2024 10:23:28 +0100 Subject: [PATCH 8/8] Cargo update --- Cargo.lock | 9 ++-- desktop/src-tauri/Cargo.lock | 98 +++++++++++++++++------------------- 2 files changed, 50 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a15b72..df88fd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1638,6 +1638,7 @@ dependencies = [ "bitvec", "byteorder", "bytes", + "itertools", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", @@ -2598,9 +2599,9 @@ checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "a85d50532239da68e9addb745ba38ff4612a242c1c7ceea689c4bc7c2f43c36f" dependencies = [ "bitflags 2.6.0", "errno", @@ -2641,9 +2642,9 @@ checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" -version = "0.102.6" +version = "0.102.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" +checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56" dependencies = [ "ring", "rustls-pki-types", diff --git a/desktop/src-tauri/Cargo.lock b/desktop/src-tauri/Cargo.lock index 0169880..bdfd740 100644 --- a/desktop/src-tauri/Cargo.lock +++ b/desktop/src-tauri/Cargo.lock @@ -68,17 +68,6 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" -[[package]] -name = "async-recursion" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.76", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -329,9 +318,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.17.0" +version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fd4c6dcc3b0aea2f5c0b4b82c2b15fe39ddbc76041a310848f4706edf76bb31" +checksum = "773d90827bc3feecfb67fab12e24de0749aad83c74b9504ecde46237b5cd24e2" [[package]] name = "byteorder" @@ -901,9 +890,9 @@ dependencies = [ [[package]] name = "filetime" -version = "0.2.24" +version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" dependencies = [ "cfg-if", "libc", @@ -1842,7 +1831,7 @@ dependencies = [ [[package]] name = "librqbit" -version = "7.0.0-beta.3" +version = "7.0.0" dependencies = [ "anyhow", "async-stream", @@ -1868,6 +1857,7 @@ dependencies = [ "librqbit-sha1-wrapper", "librqbit-tracker-comms", "librqbit-upnp", + "librqbit-upnp-serve", "memmap2", "mime_guess", "parking_lot", @@ -1887,7 +1877,6 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", - "upnp-serve", "url", "urlencoding", "uuid", @@ -1895,7 +1884,7 @@ dependencies = [ [[package]] name = "librqbit-bencode" -version = "3.0.0" +version = "3.0.1" dependencies = [ "anyhow", "bytes", @@ -1923,7 +1912,7 @@ dependencies = [ [[package]] name = "librqbit-core" -version = "4.0.0" +version = "4.0.1" dependencies = [ "anyhow", "bytes", @@ -1978,6 +1967,7 @@ dependencies = [ "bitvec", "byteorder", "bytes", + "itertools", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", @@ -1987,14 +1977,14 @@ dependencies = [ [[package]] name = "librqbit-sha1-wrapper" -version = "3.0.0" +version = "4.0.0" dependencies = [ "crypto-hash", ] [[package]] name = "librqbit-tracker-comms" -version = "1.0.3" +version = "2.0.0" dependencies = [ "anyhow", "async-stream", @@ -2017,7 +2007,6 @@ name = "librqbit-upnp" version = "0.1.1" dependencies = [ "anyhow", - "async-recursion", "bstr", "futures", "httparse", @@ -2030,6 +2019,32 @@ dependencies = [ "url", ] +[[package]] +name = "librqbit-upnp-serve" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "bstr", + "gethostname", + "http 1.1.0", + "httparse", + "librqbit-core", + "librqbit-sha1-wrapper", + "librqbit-upnp", + "mime_guess", + "parking_lot", + "quick-xml 0.36.1", + "reqwest", + "serde", + "socket2", + "tokio", + "tokio-util", + "tracing", + "url", + "uuid", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -2235,8 +2250,9 @@ dependencies = [ [[package]] name = "network-interface" -version = "1.1.1" -source = "git+https://github.com/ikatson/network-interface?branch=compile-on-freebsd#aca8a95ab1bb41a27bc82c6a2425eb4824bf0352" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433419f898328beca4f2c6c73a1b52540658d92b0a99f0269330457e0fd998d5" dependencies = [ "cc", "libc", @@ -2809,6 +2825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" dependencies = [ "memchr", + "serde", ] [[package]] @@ -3054,18 +3071,18 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc_version" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ "semver", ] [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "a85d50532239da68e9addb745ba38ff4612a242c1c7ceea689c4bc7c2f43c36f" dependencies = [ "bitflags 2.6.0", "errno", @@ -4241,31 +4258,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" -[[package]] -name = "upnp-serve" -version = "0.1.0" -dependencies = [ - "anyhow", - "axum", - "bstr", - "gethostname", - "http 1.1.0", - "httparse", - "librqbit-core", - "librqbit-sha1-wrapper", - "librqbit-upnp", - "mime_guess", - "parking_lot", - "quick-xml 0.36.1", - "reqwest", - "socket2", - "tokio", - "tokio-util", - "tracing", - "url", - "uuid", -] - [[package]] name = "url" version = "2.5.2"