From 5e09525dd534a8d9054627488487dbfdc71dce54 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 25 Aug 2024 09:38:40 +0200 Subject: [PATCH] Feeding peers from PEX added - PoC --- crates/librqbit/src/torrent_state/live/mod.rs | 27 ++++++++- .../src/extended/ut_pex.rs | 57 ++++++++++++++++++- 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ad5c66b..26fd846 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -67,7 +67,12 @@ use librqbit_core::{ }; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ - extended::{handshake::{ExtendedHandshake, YourIP}, ut_metadata::UtMetadata, ExtendedMessage}, + extended::{ + handshake::{ExtendedHandshake, YourIP}, + ut_metadata::UtMetadata, + ut_pex::UtPex, + ExtendedMessage, + }, Handshake, Message, MessageOwned, Piece, Request, }; use peers::stats::atomic::AggregatePeerStatsAtomic; @@ -878,7 +883,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { .with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?; } Message::Extended(ExtendedMessage::UtPex(pex)) => { - trace!("received ut_pex: {:?} added peers v4: {:?}", pex, pex.added_peers().unwrap().collect::>()); + trace!("received ut_pex: {:?}", pex); + self.on_pex_message(pex); } message => { warn!("received unsupported message {:?}, ignoring", message); @@ -1667,4 +1673,21 @@ impl PeerHandler { .context("error sending UtMetadata: channel closed")?; Ok(()) } + + fn on_pex_message(&self, msg: UtPex) + where + B: AsRef<[u8]> + std::fmt::Debug, + { + // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... + if let Ok(peers) = msg.added_peers() { + peers.for_each(|peer| { + self.state + .add_peer_if_not_seen(peer.addr) + .inspect_err(|e| warn!("failed to add peer: {peer:?} due to: {e}")) + .ok(); + }); + } else { + warn!("received invalid pex message: {:?}", msg); + } + } } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 470015d..c0312b6 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -74,17 +74,19 @@ pub struct PeerAddrIterator<'a> { addrs: &'a [u8], flags: &'a [u8], offset: usize, + addr_size: usize, } + + impl<'a> Iterator for PeerAddrIterator<'a> { type Item = PexPeerInfo; fn next(&mut self) -> Option { - const ADDR_SIZE: usize = 6; - if self.offset*ADDR_SIZE >= self.addrs.len() { + if self.offset*self.addr_size >= self.addrs.len() { return None; } - let addr = &self.addrs[self.offset*ADDR_SIZE..(self.offset+1)*ADDR_SIZE]; + let addr = &self.addrs[self.offset*self.addr_size..(self.offset+1)*self.addr_size]; let flags = self.flags.get(self.offset); self.offset += 1; Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length @@ -106,6 +108,55 @@ where addrs: added.as_ref(), flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), offset: 0, + addr_size: 6, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } + + pub fn added_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(added) = &self.added6 { + if added.as_ref().len() % 18 != 0 { + anyhow::bail!("invalid pex added6 peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: added.as_ref(), + flags: self.added6_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]), + offset: 0, + addr_size: 18, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } + + pub fn dropped_peers<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(dropped) = &self.dropped { + if dropped.as_ref().len() % 6 != 0 { + anyhow::bail!("invalid pex dropped peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: dropped.as_ref(), + flags: &[], + offset: 0, + addr_size: 6, + })); + } else { + return Ok(Box::new(std::iter::empty())); + }; + } + + pub fn dropped_peers_v6<'a>(&'a self) -> anyhow::Result + 'a>> { + if let Some(dropped) = &self.dropped6 { + if dropped.as_ref().len() % 18 != 0 { + anyhow::bail!("invalid pex dropped6 peers"); + } + return Ok(Box::new(PeerAddrIterator { + addrs: dropped.as_ref(), + flags: &[], + offset: 0, + addr_size: 18, })); } else { return Ok(Box::new(std::iter::empty()));