Feeding peers from PEX added - PoC
This commit is contained in:
parent
bbc951733f
commit
5e09525dd5
2 changed files with 79 additions and 5 deletions
|
|
@ -67,7 +67,12 @@ use librqbit_core::{
|
||||||
};
|
};
|
||||||
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
use peer_binary_protocol::{
|
use peer_binary_protocol::{
|
||||||
extended::{handshake::{ExtendedHandshake, YourIP}, ut_metadata::UtMetadata, ExtendedMessage},
|
extended::{
|
||||||
|
handshake::{ExtendedHandshake, YourIP},
|
||||||
|
ut_metadata::UtMetadata,
|
||||||
|
ut_pex::UtPex,
|
||||||
|
ExtendedMessage,
|
||||||
|
},
|
||||||
Handshake, Message, MessageOwned, Piece, Request,
|
Handshake, Message, MessageOwned, Piece, Request,
|
||||||
};
|
};
|
||||||
use peers::stats::atomic::AggregatePeerStatsAtomic;
|
use peers::stats::atomic::AggregatePeerStatsAtomic;
|
||||||
|
|
@ -878,7 +883,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
.with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?;
|
.with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?;
|
||||||
}
|
}
|
||||||
Message::Extended(ExtendedMessage::UtPex(pex)) => {
|
Message::Extended(ExtendedMessage::UtPex(pex)) => {
|
||||||
trace!("received ut_pex: {:?} added peers v4: {:?}", pex, pex.added_peers().unwrap().collect::<Vec<_>>());
|
trace!("received ut_pex: {:?}", pex);
|
||||||
|
self.on_pex_message(pex);
|
||||||
}
|
}
|
||||||
message => {
|
message => {
|
||||||
warn!("received unsupported message {:?}, ignoring", message);
|
warn!("received unsupported message {:?}, ignoring", message);
|
||||||
|
|
@ -1667,4 +1673,21 @@ impl PeerHandler {
|
||||||
.context("error sending UtMetadata: channel closed")?;
|
.context("error sending UtMetadata: channel closed")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn on_pex_message<B>(&self, msg: UtPex<B>)
|
||||||
|
where
|
||||||
|
B: AsRef<[u8]> + std::fmt::Debug,
|
||||||
|
{
|
||||||
|
// TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ...
|
||||||
|
if let Ok(peers) = msg.added_peers() {
|
||||||
|
peers.for_each(|peer| {
|
||||||
|
self.state
|
||||||
|
.add_peer_if_not_seen(peer.addr)
|
||||||
|
.inspect_err(|e| warn!("failed to add peer: {peer:?} due to: {e}"))
|
||||||
|
.ok();
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
warn!("received invalid pex message: {:?}", msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -74,17 +74,19 @@ pub struct PeerAddrIterator<'a> {
|
||||||
addrs: &'a [u8],
|
addrs: &'a [u8],
|
||||||
flags: &'a [u8],
|
flags: &'a [u8],
|
||||||
offset: usize,
|
offset: usize,
|
||||||
|
addr_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
impl<'a> Iterator for PeerAddrIterator<'a> {
|
impl<'a> Iterator for PeerAddrIterator<'a> {
|
||||||
type Item = PexPeerInfo;
|
type Item = PexPeerInfo;
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
const ADDR_SIZE: usize = 6;
|
if self.offset*self.addr_size >= self.addrs.len() {
|
||||||
if self.offset*ADDR_SIZE >= self.addrs.len() {
|
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
let addr = &self.addrs[self.offset*ADDR_SIZE..(self.offset+1)*ADDR_SIZE];
|
let addr = &self.addrs[self.offset*self.addr_size..(self.offset+1)*self.addr_size];
|
||||||
let flags = self.flags.get(self.offset);
|
let flags = self.flags.get(self.offset);
|
||||||
self.offset += 1;
|
self.offset += 1;
|
||||||
Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length
|
Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length
|
||||||
|
|
@ -106,6 +108,55 @@ where
|
||||||
addrs: added.as_ref(),
|
addrs: added.as_ref(),
|
||||||
flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]),
|
flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]),
|
||||||
offset: 0,
|
offset: 0,
|
||||||
|
addr_size: 6,
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
return Ok(Box::new(std::iter::empty()));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn added_peers_v6<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
|
||||||
|
if let Some(added) = &self.added6 {
|
||||||
|
if added.as_ref().len() % 18 != 0 {
|
||||||
|
anyhow::bail!("invalid pex added6 peers");
|
||||||
|
}
|
||||||
|
return Ok(Box::new(PeerAddrIterator {
|
||||||
|
addrs: added.as_ref(),
|
||||||
|
flags: self.added6_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]),
|
||||||
|
offset: 0,
|
||||||
|
addr_size: 18,
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
return Ok(Box::new(std::iter::empty()));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dropped_peers<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
|
||||||
|
if let Some(dropped) = &self.dropped {
|
||||||
|
if dropped.as_ref().len() % 6 != 0 {
|
||||||
|
anyhow::bail!("invalid pex dropped peers");
|
||||||
|
}
|
||||||
|
return Ok(Box::new(PeerAddrIterator {
|
||||||
|
addrs: dropped.as_ref(),
|
||||||
|
flags: &[],
|
||||||
|
offset: 0,
|
||||||
|
addr_size: 6,
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
return Ok(Box::new(std::iter::empty()));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dropped_peers_v6<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
|
||||||
|
if let Some(dropped) = &self.dropped6 {
|
||||||
|
if dropped.as_ref().len() % 18 != 0 {
|
||||||
|
anyhow::bail!("invalid pex dropped6 peers");
|
||||||
|
}
|
||||||
|
return Ok(Box::new(PeerAddrIterator {
|
||||||
|
addrs: dropped.as_ref(),
|
||||||
|
flags: &[],
|
||||||
|
offset: 0,
|
||||||
|
addr_size: 18,
|
||||||
}));
|
}));
|
||||||
} else {
|
} else {
|
||||||
return Ok(Box::new(std::iter::empty()));
|
return Ok(Box::new(std::iter::empty()));
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue