From 90bfb85bcc962ed7200828245db0f58811de5576 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 25 Aug 2024 12:53:28 +0100 Subject: [PATCH] Some cleanups for utpex --- crates/librqbit/src/peer_connection.rs | 2 +- crates/librqbit/src/torrent_state/live/mod.rs | 18 +- crates/peer_binary_protocol/Cargo.toml | 1 + .../src/extended/ut_pex.rs | 163 +++++------------- crates/peer_binary_protocol/src/lib.rs | 4 +- 5 files changed, 56 insertions(+), 132 deletions(-) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 6d4ac1e..9b3ae73 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -251,7 +251,7 @@ impl PeerConnection { let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended)); trace!("sending extended handshake: {:?}", &my_extended); my_extended - .serialize(&mut write_buf, &|| Default::default()) + .serialize(&mut write_buf, &Default::default) .unwrap(); with_timeout(rwtimeout, conn.write_all(&write_buf)) .await diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 26fd846..6d57bcb 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -896,7 +896,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes())); - let len = msg.serialize(buf, &|| Default::default())?; + let len = msg.serialize(buf, &Default::default)?; trace!("sending: {:?}, length={}", &msg, len); Ok(len) } @@ -1679,15 +1679,11 @@ impl PeerHandler { B: AsRef<[u8]> + std::fmt::Debug, { // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... - if let Ok(peers) = msg.added_peers() { - peers.for_each(|peer| { - self.state - .add_peer_if_not_seen(peer.addr) - .inspect_err(|e| warn!("failed to add peer: {peer:?} due to: {e}")) - .ok(); - }); - } else { - warn!("received invalid pex message: {:?}", msg); - } + msg.added_peers().for_each(|peer| { + self.state + .add_peer_if_not_seen(peer.addr) + .inspect_err(|error| warn!(?peer, ?error, "failed to add peer")) + .ok(); + }); } } diff --git a/crates/peer_binary_protocol/Cargo.toml b/crates/peer_binary_protocol/Cargo.toml index 4c2ebc5..4ef36a9 100644 --- a/crates/peer_binary_protocol/Cargo.toml +++ b/crates/peer_binary_protocol/Cargo.toml @@ -20,3 +20,4 @@ librqbit-core = { path = "../librqbit_core", default-features = false, version = bitvec = "1" anyhow = "1" bytes = "1.7.1" +itertools = "0.12" diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index c0312b6..00014c1 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -3,6 +3,7 @@ use std::net::{IpAddr, SocketAddr}; use byteorder::{ByteOrder, BE}; use bytes::Bytes; use clone_to_owned::CloneToOwned; +use itertools::{EitherOrBoth, Itertools}; use serde::{Deserialize, Serialize}; #[derive(Debug)] @@ -11,46 +12,18 @@ pub struct PexPeerInfo { pub addr: SocketAddr, } -impl PexPeerInfo { - pub fn from_bytes(buf: &[u8], flags: Option) -> anyhow::Result { - let (ip, port) = match buf.len() { - 6 => { - let ip_bytes: &[u8; 4] = (&buf[0..4]).try_into()?; - let ip = IpAddr::from(*ip_bytes); - let port = BE::read_u16(&buf[4..6]); - (ip, port) - } - 18 => { - let ip_bytes: &[u8; 16] = (&buf[0..16]).try_into()?; - let ip = IpAddr::from(*ip_bytes); - let port = BE::read_u16(&buf[16..18]); - (ip, port) - } - _ => anyhow::bail!("invalid pex peer info"), - }; - Ok(Self { - flags: flags.unwrap_or(0), - addr: (ip, port).into(), - }) - } -} - #[derive(Debug, Serialize, Default, Deserialize)] pub struct UtPex { - #[serde(skip_serializing_if = "Option::is_none")] - added: Option, + added: B, #[serde(rename = "added.f")] #[serde(skip_serializing_if = "Option::is_none")] added_f: Option, - #[serde(skip_serializing_if = "Option::is_none")] - added6: Option, + added6: B, #[serde(rename = "added6.f")] #[serde(skip_serializing_if = "Option::is_none")] added6_f: Option, - #[serde(skip_serializing_if = "Option::is_none")] - dropped: Option, - #[serde(skip_serializing_if = "Option::is_none")] - dropped6: Option, + dropped: B, + dropped6: B, } impl CloneToOwned for UtPex @@ -70,97 +43,45 @@ where } } -pub struct PeerAddrIterator<'a> { - addrs: &'a [u8], - flags: &'a [u8], - offset: usize, - addr_size: usize, -} - - - -impl<'a> Iterator for PeerAddrIterator<'a> { - type Item = PexPeerInfo; - fn next(&mut self) -> Option { - if self.offset*self.addr_size >= self.addrs.len() { - return None; - } - - let addr = &self.addrs[self.offset*self.addr_size..(self.offset+1)*self.addr_size]; - let flags = self.flags.get(self.offset); - self.offset += 1; - Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length - - - } -} - impl UtPex where B: AsRef<[u8]>, { - pub fn added_peers<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(added) = &self.added { - if added.as_ref().len() % 6 != 0 { - anyhow::bail!("invalid pex added peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: added.as_ref(), - flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), - offset: 0, - addr_size: 6, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; + fn added_peers_inner<'a>( + &'a self, + buf: &'a B, + flags: &'a Option, + ip_len: usize, + ) -> impl Iterator + 'a { + let addrs = buf.as_ref().chunks_exact(ip_len + 2).map(move |c| { + let ip = match ip_len { + 4 => IpAddr::from(TryInto::<[u8; 4]>::try_into(&c[..4]).unwrap()), + 16 => IpAddr::from(TryInto::<[u8; 16]>::try_into(&c[..16]).unwrap()), + _ => unreachable!(), + }; + let port = BE::read_u16(&c[ip_len..]); + SocketAddr::new(ip, port) + }); + let flags = flags + .as_ref() + .map(|b| b.as_ref().iter().copied()) + .into_iter() + .flatten(); + addrs.zip_longest(flags).filter_map(|eob| match eob { + EitherOrBoth::Both(addr, flags) => Some(PexPeerInfo { flags, addr }), + EitherOrBoth::Left(addr) => Some(PexPeerInfo { flags: 0, addr }), + EitherOrBoth::Right(_) => None, + }) } - pub fn added_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(added) = &self.added6 { - if added.as_ref().len() % 18 != 0 { - anyhow::bail!("invalid pex added6 peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: added.as_ref(), - flags: self.added6_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), - offset: 0, - addr_size: 18, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; + pub fn added_peers(&self) -> impl Iterator + '_ { + self.added_peers_inner(&self.added, &self.added_f, 4) + .chain(self.added_peers_inner(&self.added6, &self.added6_f, 16)) } - pub fn dropped_peers<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(dropped) = &self.dropped { - if dropped.as_ref().len() % 6 != 0 { - anyhow::bail!("invalid pex dropped peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: dropped.as_ref(), - flags: &[], - offset: 0, - addr_size: 6, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; - } - - pub fn dropped_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { - if let Some(dropped) = &self.dropped6 { - if dropped.as_ref().len() % 18 != 0 { - anyhow::bail!("invalid pex dropped6 peers"); - } - return Ok(Box::new(PeerAddrIterator { - addrs: dropped.as_ref(), - flags: &[], - offset: 0, - addr_size: 18, - })); - } else { - return Ok(Box::new(std::iter::empty())); - }; + pub fn dropped_peers(&self) -> impl Iterator + '_ { + self.added_peers_inner(&self.dropped, &None, 4) + .chain(self.added_peers_inner(&self.dropped6, &None, 16)) } } @@ -184,11 +105,17 @@ mod tests { let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65"; let bytes = decode_hex(msg); let pex = from_bytes::>(&bytes).unwrap(); - let addrs: Vec<_> = pex.added_peers().unwrap().collect(); + let addrs: Vec<_> = pex.added_peers().collect(); assert_eq!(2, addrs.len()); - assert_eq!("185.159.157.20:46439".parse::().unwrap(), addrs[0].addr); + assert_eq!( + "185.159.157.20:46439".parse::().unwrap(), + addrs[0].addr + ); assert_eq!(12, addrs[0].flags); - assert_eq!("151.249.105.134:4240".parse::().unwrap(), addrs[1].addr); + assert_eq!( + "151.249.105.134:4240".parse::().unwrap(), + addrs[1].addr + ); assert_eq!(0, addrs[1].flags); } } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index a093984..64a39e5 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -640,7 +640,7 @@ mod tests { fn test_extended_serialize() { let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new())); let mut out = Vec::new(); - msg.serialize(&mut out, &|| Default::default()).unwrap(); + msg.serialize(&mut out, &Default::default).unwrap(); dbg!(out); } @@ -656,7 +656,7 @@ mod tests { let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap(); assert_eq!(size, buf.len()); let mut write_buf = Vec::new(); - msg.serialize(&mut write_buf, &|| Default::default()).unwrap(); + msg.serialize(&mut write_buf, &Default::default).unwrap(); if buf != write_buf { { use std::io::Write;