diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 8401ae8..6d4ac1e 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -14,10 +14,7 @@ use librqbit_core::{ }; use parking_lot::RwLock; use peer_binary_protocol::{ - extended::{ - handshake::{ExtendedHandshake, YourIP}, - ExtendedMessage, - }, + extended::{handshake::ExtendedHandshake, ExtendedMessage}, serialize_piece_preamble, Handshake, Message, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, }; use serde::{Deserialize, Serialize}; @@ -248,14 +245,14 @@ impl PeerConnection { let supports_extended = handshake_supports_extended; if supports_extended { - let your_ip = self.addr.ip(); let mut my_extended = ExtendedHandshake::new(); - my_extended.yourip = Some(YourIP(your_ip)); self.handler .update_my_extended_handshake(&mut my_extended)?; let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended)); trace!("sending extended handshake: {:?}", &my_extended); - my_extended.serialize(&mut write_buf, &|| None).unwrap(); + my_extended + .serialize(&mut write_buf, &|| Default::default()) + .unwrap(); with_timeout(rwtimeout, conn.write_all(&write_buf)) .await .context("error writing extended handshake")?; @@ -318,7 +315,8 @@ impl PeerConnection { extended_handshake_ref .read() .as_ref() - .and_then(|e| e.ut_metadata()) + .map(|e| e.peer_extended_messages()) + .unwrap_or_default() })?, WriterRequest::ReadChunkRequest(chunk) => { #[allow(unused_mut)] @@ -397,7 +395,6 @@ impl PeerConnection { if let Message::Extended(ExtendedMessage::Handshake(h)) = &message { *extended_handshake_ref.write() = Some(h.clone_to_owned(None)); self.handler.on_extended_handshake(h)?; - trace!("remembered extended handshake for future serializing"); } else { self.handler .on_received_message(message) diff --git a/crates/librqbit/src/read_buf.rs b/crates/librqbit/src/read_buf.rs index 2d57007..4b1428e 100644 --- a/crates/librqbit/src/read_buf.rs +++ b/crates/librqbit/src/read_buf.rs @@ -57,7 +57,7 @@ impl ReadBuf { anyhow::bail!("peer disconnected while reading handshake"); } let (h, size) = Handshake::deserialize(&self.buf[..self.filled]) - .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?; + .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?} hadshake data {:?}", e, &self.buf[..self.filled.min(19)]))?; self.processed = size; Ok(h) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 385dd56..ad5c66b 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -67,7 +67,7 @@ use librqbit_core::{ }; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ - extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, + extended::{handshake::{ExtendedHandshake, YourIP}, ut_metadata::UtMetadata, ExtendedMessage}, Handshake, Message, MessageOwned, Piece, Request, }; use peers::stats::atomic::AggregatePeerStatsAtomic; @@ -877,6 +877,9 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { self.send_metadata_piece(metadata_piece_id) .with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?; } + Message::Extended(ExtendedMessage::UtPex(pex)) => { + trace!("received ut_pex: {:?} added peers v4: {:?}", pex, pex.added_peers().unwrap().collect::>()); + } message => { warn!("received unsupported message {:?}, ignoring", message); } @@ -887,7 +890,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes())); - let len = msg.serialize(buf, &|| None)?; + let len = msg.serialize(buf, &|| Default::default())?; trace!("sending: {:?}, length={}", &msg, len); Ok(len) } @@ -914,7 +917,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { self.state.file_ops().read_chunk(self.addr, chunk, buf) } - fn on_extended_handshake(&self, _: &ExtendedHandshake) -> anyhow::Result<()> { + fn on_extended_handshake(&self, hs: &ExtendedHandshake) -> anyhow::Result<()> { + if let Some(peer_pex_msg_id) = hs.ut_pex() { + trace!("peer supports pex at {peer_pex_msg_id}"); + } Ok(()) } @@ -937,6 +943,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { &self, handshake: &mut ExtendedHandshake, ) -> anyhow::Result<()> { + let your_ip = self.addr.ip(); + handshake.yourip = Some(YourIP(your_ip)); let info_bytes = &self.state.torrent().info_bytes; if !info_bytes.is_empty() { if let Ok(len) = info_bytes.len().try_into() { diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 7cc05df..8ccb80e 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -1,16 +1,16 @@ use std::{ collections::HashMap, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, + net::IpAddr, }; use buffers::ByteBuf; -use byteorder::ByteOrder; -use byteorder::BE; use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Deserializer, Serialize}; -use crate::{EXTENDED_UT_METADATA_KEY, MY_EXTENDED_UT_METADATA}; +use crate::{EXTENDED_UT_METADATA_KEY, EXTENDED_UT_PEX_KEY, MY_EXTENDED_UT_METADATA, MY_EXTENDED_UT_PEX}; + +use super::PeerExtendedMessageIds; #[derive(Deserialize, Serialize, Debug, Default)] pub struct ExtendedHandshake { @@ -40,6 +40,7 @@ impl ExtendedHandshake> { pub fn new() -> Self { let mut features = HashMap::new(); features.insert(ByteBuf(EXTENDED_UT_METADATA_KEY), MY_EXTENDED_UT_METADATA); + features.insert(ByteBuf(EXTENDED_UT_PEX_KEY), MY_EXTENDED_UT_PEX); Self { m: features, ..Default::default() @@ -58,6 +59,17 @@ where pub fn ut_metadata(&self) -> Option { self.get_msgid(EXTENDED_UT_METADATA_KEY) } + + pub fn ut_pex(&self) -> Option { + self.get_msgid(EXTENDED_UT_PEX_KEY) + } + + pub fn peer_extended_messages(&self) -> PeerExtendedMessageIds { + PeerExtendedMessageIds { + ut_metadata: self.ut_metadata(), + ut_pex: self.ut_pex(), + } + } } impl CloneToOwned for ExtendedHandshake @@ -122,18 +134,11 @@ impl<'de> Deserialize<'de> for YourIP { E: serde::de::Error, { if v.len() == 4 { - return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3])))); + let ip_bytes: &[u8; 4] = v[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length + return Ok(YourIP(IpAddr::from(*ip_bytes))); } else if v.len() == 16 { - return Ok(YourIP(IpAddr::V6(Ipv6Addr::new( - BE::read_u16(&v[..2]), - BE::read_u16(&v[2..4]), - BE::read_u16(&v[4..6]), - BE::read_u16(&v[6..8]), - BE::read_u16(&v[8..10]), - BE::read_u16(&v[10..12]), - BE::read_u16(&v[12..14]), - BE::read_u16(&v[14..]), - )))); + let ip_bytes: &[u8; 16] = v[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length + return Ok(YourIP(IpAddr::from(*ip_bytes))); } Err(E::custom("expected 4 or 16 byte address")) } diff --git a/crates/peer_binary_protocol/src/extended/mod.rs b/crates/peer_binary_protocol/src/extended/mod.rs index 2ac4607..5ffecac 100644 --- a/crates/peer_binary_protocol/src/extended/mod.rs +++ b/crates/peer_binary_protocol/src/extended/mod.rs @@ -4,6 +4,9 @@ use bencode::BencodeValue; use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Serialize}; +use ut_pex::UtPex; + +use crate::MY_EXTENDED_UT_PEX; use self::{handshake::ExtendedHandshake, ut_metadata::UtMetadata}; @@ -11,13 +14,21 @@ use super::MessageDeserializeError; pub mod handshake; pub mod ut_metadata; +pub mod ut_pex; use super::MY_EXTENDED_UT_METADATA; +#[derive(Debug, Default)] +pub struct PeerExtendedMessageIds { + pub ut_metadata: Option, + pub ut_pex: Option, +} + #[derive(Debug)] pub enum ExtendedMessage { Handshake(ExtendedHandshake), UtMetadata(UtMetadata), + UtPex(UtPex), Dyn(u8, BencodeValue), } @@ -37,6 +48,7 @@ where ExtendedMessage::UtMetadata(m) => { ExtendedMessage::UtMetadata(m.clone_to_owned(within_buffer)) } + ExtendedMessage::UtPex(m) => ExtendedMessage::UtPex(m.clone_to_owned(within_buffer)), } } } @@ -45,7 +57,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage, - extended_handshake_ut_metadata: &dyn Fn() -> Option, + extended_handshake_ut_metadata: &dyn Fn() -> PeerExtendedMessageIds, ) -> anyhow::Result<()> where ByteBuf: AsRef<[u8]>, @@ -60,12 +72,20 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { - let emsg_id = extended_handshake_ut_metadata().ok_or_else(|| { + let emsg_id = extended_handshake_ut_metadata().ut_metadata.ok_or_else(|| { anyhow::anyhow!("need peer's handshake to serialize ut_metadata") })?; out.push(emsg_id); u.serialize(out); - } + }, + ExtendedMessage::UtPex(m) => { + let emsg_id = extended_handshake_ut_metadata().ut_pex.ok_or_else(|| { + anyhow::anyhow!("need peer's handshake to serialize ut_pex, or peer does't support ut_pex") + })?; + out.push(emsg_id); + bencode_serialize_to_writer(m, out)?; + }, + } Ok(()) } @@ -91,6 +111,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(buf)?)) } + MY_EXTENDED_UT_PEX => Ok(ExtendedMessage::UtPex(from_bytes(buf)?)), _ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(buf)?)), } } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs new file mode 100644 index 0000000..470015d --- /dev/null +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -0,0 +1,143 @@ +use std::net::{IpAddr, SocketAddr}; + +use byteorder::{ByteOrder, BE}; +use bytes::Bytes; +use clone_to_owned::CloneToOwned; +use serde::{Deserialize, Serialize}; + +#[derive(Debug)] +pub struct PexPeerInfo { + pub flags: u8, + pub addr: SocketAddr, +} + +impl PexPeerInfo { + pub fn from_bytes(buf: &[u8], flags: Option) -> anyhow::Result { + let (ip, port) = match buf.len() { + 6 => { + let ip_bytes: &[u8; 4] = (&buf[0..4]).try_into()?; + let ip = IpAddr::from(*ip_bytes); + let port = BE::read_u16(&buf[4..6]); + (ip, port) + } + 18 => { + let ip_bytes: &[u8; 16] = (&buf[0..16]).try_into()?; + let ip = IpAddr::from(*ip_bytes); + let port = BE::read_u16(&buf[16..18]); + (ip, port) + } + _ => anyhow::bail!("invalid pex peer info"), + }; + Ok(Self { + flags: flags.unwrap_or(0), + addr: (ip, port).into(), + }) + } +} + +#[derive(Debug, Serialize, Default, Deserialize)] +pub struct UtPex { + #[serde(skip_serializing_if = "Option::is_none")] + added: Option, + #[serde(rename = "added.f")] + #[serde(skip_serializing_if = "Option::is_none")] + added_f: Option, + #[serde(skip_serializing_if = "Option::is_none")] + added6: Option, + #[serde(rename = "added6.f")] + #[serde(skip_serializing_if = "Option::is_none")] + added6_f: Option, + #[serde(skip_serializing_if = "Option::is_none")] + dropped: Option, + #[serde(skip_serializing_if = "Option::is_none")] + dropped6: Option, +} + +impl CloneToOwned for UtPex +where + B: CloneToOwned, +{ + type Target = UtPex<::Target>; + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { + UtPex { + added: self.added.clone_to_owned(within_buffer), + added_f: self.added_f.clone_to_owned(within_buffer), + added6: self.added6.clone_to_owned(within_buffer), + added6_f: self.added6_f.clone_to_owned(within_buffer), + dropped: self.dropped.clone_to_owned(within_buffer), + dropped6: self.dropped6.clone_to_owned(within_buffer), + } + } +} + +pub struct PeerAddrIterator<'a> { + addrs: &'a [u8], + flags: &'a [u8], + offset: usize, +} + +impl<'a> Iterator for PeerAddrIterator<'a> { + type Item = PexPeerInfo; + fn next(&mut self) -> Option { + const ADDR_SIZE: usize = 6; + if self.offset*ADDR_SIZE >= self.addrs.len() { + return None; + } + + let addr = &self.addrs[self.offset*ADDR_SIZE..(self.offset+1)*ADDR_SIZE]; + let flags = self.flags.get(self.offset); + self.offset += 1; + Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length + + + } +} + +impl UtPex +where + B: AsRef<[u8]>, +{ + pub fn added_peers<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(added) = &self.added { + if added.as_ref().len() % 6 != 0 { + anyhow::bail!("invalid pex added peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: added.as_ref(), + flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), + offset: 0, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } +} + +#[cfg(test)] +mod tests { + use bencode::from_bytes; + use buffers::ByteBuf; + + use super::*; + + fn decode_hex(s: &str) -> Vec { + assert!(s.len() % 2 == 0); + (0..s.len()) + .step_by(2) + .map(|i| u8::from_str_radix(&s[i..i + 2], 16).unwrap()) + .collect() + } + + #[test] + fn test_pex_deserialization() { + let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65"; + let bytes = decode_hex(msg); + let pex = from_bytes::>(&bytes).unwrap(); + let addrs: Vec<_> = pex.added_peers().unwrap().collect(); + assert_eq!(2, addrs.len()); + assert_eq!("185.159.157.20:46439".parse::().unwrap(), addrs[0].addr); + assert_eq!(12, addrs[0].flags); + assert_eq!("151.249.105.134:4240".parse::().unwrap(), addrs[1].addr); + assert_eq!(0, addrs[1].flags); + } +} diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index bd2d1a8..a093984 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -9,6 +9,7 @@ use buffers::{ByteBuf, ByteBufOwned}; use byteorder::{ByteOrder, BE}; use bytes::Bytes; use clone_to_owned::CloneToOwned; +use extended::PeerExtendedMessageIds; use librqbit_core::{constants::CHUNK_SIZE, hash_id::Id20, lengths::ChunkInfo}; use serde::{Deserialize, Serialize}; @@ -47,6 +48,9 @@ const MSGID_EXTENDED: u8 = 20; pub const EXTENDED_UT_METADATA_KEY: &[u8] = b"ut_metadata"; pub const MY_EXTENDED_UT_METADATA: u8 = 3; +pub const EXTENDED_UT_PEX_KEY: &[u8] = b"ut_pex"; +pub const MY_EXTENDED_UT_PEX: u8 = 1; + #[derive(Debug)] pub enum MessageDeserializeError { NotEnoughData(usize, &'static str), @@ -276,7 +280,7 @@ where pub fn serialize( &self, out: &mut Vec, - extended_handshake_ut_metadata: &dyn Fn() -> Option, + peer_extended_messages: &dyn Fn() -> PeerExtendedMessageIds, ) -> anyhow::Result { let (lp, msg_id) = self.len_prefix_and_msg_id(); @@ -326,7 +330,7 @@ where Ok(msg_len) } Message::Extended(e) => { - e.serialize(out, extended_handshake_ut_metadata)?; + e.serialize(out, peer_extended_messages)?; let msg_size = out.len(); // no fucking idea why +1, but I tweaked that for it all to match up // with real messages. @@ -636,7 +640,7 @@ mod tests { fn test_extended_serialize() { let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new())); let mut out = Vec::new(); - msg.serialize(&mut out, &|| None).unwrap(); + msg.serialize(&mut out, &|| Default::default()).unwrap(); dbg!(out); } @@ -652,7 +656,7 @@ mod tests { let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap(); assert_eq!(size, buf.len()); let mut write_buf = Vec::new(); - msg.serialize(&mut write_buf, &|| None).unwrap(); + msg.serialize(&mut write_buf, &|| Default::default()).unwrap(); if buf != write_buf { { use std::io::Write;