Some cleanups for utpex
This commit is contained in:
parent
5e09525dd5
commit
90bfb85bcc
5 changed files with 56 additions and 132 deletions
|
|
@ -251,7 +251,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended));
|
let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended));
|
||||||
trace!("sending extended handshake: {:?}", &my_extended);
|
trace!("sending extended handshake: {:?}", &my_extended);
|
||||||
my_extended
|
my_extended
|
||||||
.serialize(&mut write_buf, &|| Default::default())
|
.serialize(&mut write_buf, &Default::default)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
with_timeout(rwtimeout, conn.write_all(&write_buf))
|
with_timeout(rwtimeout, conn.write_all(&write_buf))
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
|
|
@ -896,7 +896,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize> {
|
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize> {
|
||||||
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
|
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
|
||||||
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes()));
|
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes()));
|
||||||
let len = msg.serialize(buf, &|| Default::default())?;
|
let len = msg.serialize(buf, &Default::default)?;
|
||||||
trace!("sending: {:?}, length={}", &msg, len);
|
trace!("sending: {:?}, length={}", &msg, len);
|
||||||
Ok(len)
|
Ok(len)
|
||||||
}
|
}
|
||||||
|
|
@ -1679,15 +1679,11 @@ impl PeerHandler {
|
||||||
B: AsRef<[u8]> + std::fmt::Debug,
|
B: AsRef<[u8]> + std::fmt::Debug,
|
||||||
{
|
{
|
||||||
// TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ...
|
// TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ...
|
||||||
if let Ok(peers) = msg.added_peers() {
|
msg.added_peers().for_each(|peer| {
|
||||||
peers.for_each(|peer| {
|
self.state
|
||||||
self.state
|
.add_peer_if_not_seen(peer.addr)
|
||||||
.add_peer_if_not_seen(peer.addr)
|
.inspect_err(|error| warn!(?peer, ?error, "failed to add peer"))
|
||||||
.inspect_err(|e| warn!("failed to add peer: {peer:?} due to: {e}"))
|
.ok();
|
||||||
.ok();
|
});
|
||||||
});
|
|
||||||
} else {
|
|
||||||
warn!("received invalid pex message: {:?}", msg);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,3 +20,4 @@ librqbit-core = { path = "../librqbit_core", default-features = false, version =
|
||||||
bitvec = "1"
|
bitvec = "1"
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
bytes = "1.7.1"
|
bytes = "1.7.1"
|
||||||
|
itertools = "0.12"
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ use std::net::{IpAddr, SocketAddr};
|
||||||
use byteorder::{ByteOrder, BE};
|
use byteorder::{ByteOrder, BE};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use clone_to_owned::CloneToOwned;
|
use clone_to_owned::CloneToOwned;
|
||||||
|
use itertools::{EitherOrBoth, Itertools};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -11,46 +12,18 @@ pub struct PexPeerInfo {
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PexPeerInfo {
|
|
||||||
pub fn from_bytes(buf: &[u8], flags: Option<u8>) -> anyhow::Result<Self> {
|
|
||||||
let (ip, port) = match buf.len() {
|
|
||||||
6 => {
|
|
||||||
let ip_bytes: &[u8; 4] = (&buf[0..4]).try_into()?;
|
|
||||||
let ip = IpAddr::from(*ip_bytes);
|
|
||||||
let port = BE::read_u16(&buf[4..6]);
|
|
||||||
(ip, port)
|
|
||||||
}
|
|
||||||
18 => {
|
|
||||||
let ip_bytes: &[u8; 16] = (&buf[0..16]).try_into()?;
|
|
||||||
let ip = IpAddr::from(*ip_bytes);
|
|
||||||
let port = BE::read_u16(&buf[16..18]);
|
|
||||||
(ip, port)
|
|
||||||
}
|
|
||||||
_ => anyhow::bail!("invalid pex peer info"),
|
|
||||||
};
|
|
||||||
Ok(Self {
|
|
||||||
flags: flags.unwrap_or(0),
|
|
||||||
addr: (ip, port).into(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Default, Deserialize)]
|
#[derive(Debug, Serialize, Default, Deserialize)]
|
||||||
pub struct UtPex<B> {
|
pub struct UtPex<B> {
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
added: B,
|
||||||
added: Option<B>,
|
|
||||||
#[serde(rename = "added.f")]
|
#[serde(rename = "added.f")]
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
added_f: Option<B>,
|
added_f: Option<B>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
added6: B,
|
||||||
added6: Option<B>,
|
|
||||||
#[serde(rename = "added6.f")]
|
#[serde(rename = "added6.f")]
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
added6_f: Option<B>,
|
added6_f: Option<B>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
dropped: B,
|
||||||
dropped: Option<B>,
|
dropped6: B,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
dropped6: Option<B>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B> CloneToOwned for UtPex<B>
|
impl<B> CloneToOwned for UtPex<B>
|
||||||
|
|
@ -70,97 +43,45 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PeerAddrIterator<'a> {
|
|
||||||
addrs: &'a [u8],
|
|
||||||
flags: &'a [u8],
|
|
||||||
offset: usize,
|
|
||||||
addr_size: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
impl<'a> Iterator for PeerAddrIterator<'a> {
|
|
||||||
type Item = PexPeerInfo;
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
if self.offset*self.addr_size >= self.addrs.len() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let addr = &self.addrs[self.offset*self.addr_size..(self.offset+1)*self.addr_size];
|
|
||||||
let flags = self.flags.get(self.offset);
|
|
||||||
self.offset += 1;
|
|
||||||
Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B> UtPex<B>
|
impl<B> UtPex<B>
|
||||||
where
|
where
|
||||||
B: AsRef<[u8]>,
|
B: AsRef<[u8]>,
|
||||||
{
|
{
|
||||||
pub fn added_peers<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
|
fn added_peers_inner<'a>(
|
||||||
if let Some(added) = &self.added {
|
&'a self,
|
||||||
if added.as_ref().len() % 6 != 0 {
|
buf: &'a B,
|
||||||
anyhow::bail!("invalid pex added peers");
|
flags: &'a Option<B>,
|
||||||
}
|
ip_len: usize,
|
||||||
return Ok(Box::new(PeerAddrIterator {
|
) -> impl Iterator<Item = PexPeerInfo> + 'a {
|
||||||
addrs: added.as_ref(),
|
let addrs = buf.as_ref().chunks_exact(ip_len + 2).map(move |c| {
|
||||||
flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]),
|
let ip = match ip_len {
|
||||||
offset: 0,
|
4 => IpAddr::from(TryInto::<[u8; 4]>::try_into(&c[..4]).unwrap()),
|
||||||
addr_size: 6,
|
16 => IpAddr::from(TryInto::<[u8; 16]>::try_into(&c[..16]).unwrap()),
|
||||||
}));
|
_ => unreachable!(),
|
||||||
} else {
|
};
|
||||||
return Ok(Box::new(std::iter::empty()));
|
let port = BE::read_u16(&c[ip_len..]);
|
||||||
};
|
SocketAddr::new(ip, port)
|
||||||
|
});
|
||||||
|
let flags = flags
|
||||||
|
.as_ref()
|
||||||
|
.map(|b| b.as_ref().iter().copied())
|
||||||
|
.into_iter()
|
||||||
|
.flatten();
|
||||||
|
addrs.zip_longest(flags).filter_map(|eob| match eob {
|
||||||
|
EitherOrBoth::Both(addr, flags) => Some(PexPeerInfo { flags, addr }),
|
||||||
|
EitherOrBoth::Left(addr) => Some(PexPeerInfo { flags: 0, addr }),
|
||||||
|
EitherOrBoth::Right(_) => None,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn added_peers_v6<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
|
pub fn added_peers(&self) -> impl Iterator<Item = PexPeerInfo> + '_ {
|
||||||
if let Some(added) = &self.added6 {
|
self.added_peers_inner(&self.added, &self.added_f, 4)
|
||||||
if added.as_ref().len() % 18 != 0 {
|
.chain(self.added_peers_inner(&self.added6, &self.added6_f, 16))
|
||||||
anyhow::bail!("invalid pex added6 peers");
|
|
||||||
}
|
|
||||||
return Ok(Box::new(PeerAddrIterator {
|
|
||||||
addrs: added.as_ref(),
|
|
||||||
flags: self.added6_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]),
|
|
||||||
offset: 0,
|
|
||||||
addr_size: 18,
|
|
||||||
}));
|
|
||||||
} else {
|
|
||||||
return Ok(Box::new(std::iter::empty()));
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dropped_peers<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
|
pub fn dropped_peers(&self) -> impl Iterator<Item = PexPeerInfo> + '_ {
|
||||||
if let Some(dropped) = &self.dropped {
|
self.added_peers_inner(&self.dropped, &None, 4)
|
||||||
if dropped.as_ref().len() % 6 != 0 {
|
.chain(self.added_peers_inner(&self.dropped6, &None, 16))
|
||||||
anyhow::bail!("invalid pex dropped peers");
|
|
||||||
}
|
|
||||||
return Ok(Box::new(PeerAddrIterator {
|
|
||||||
addrs: dropped.as_ref(),
|
|
||||||
flags: &[],
|
|
||||||
offset: 0,
|
|
||||||
addr_size: 6,
|
|
||||||
}));
|
|
||||||
} else {
|
|
||||||
return Ok(Box::new(std::iter::empty()));
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn dropped_peers_v6<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
|
|
||||||
if let Some(dropped) = &self.dropped6 {
|
|
||||||
if dropped.as_ref().len() % 18 != 0 {
|
|
||||||
anyhow::bail!("invalid pex dropped6 peers");
|
|
||||||
}
|
|
||||||
return Ok(Box::new(PeerAddrIterator {
|
|
||||||
addrs: dropped.as_ref(),
|
|
||||||
flags: &[],
|
|
||||||
offset: 0,
|
|
||||||
addr_size: 18,
|
|
||||||
}));
|
|
||||||
} else {
|
|
||||||
return Ok(Box::new(std::iter::empty()));
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -184,11 +105,17 @@ mod tests {
|
||||||
let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65";
|
let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65";
|
||||||
let bytes = decode_hex(msg);
|
let bytes = decode_hex(msg);
|
||||||
let pex = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();
|
let pex = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();
|
||||||
let addrs: Vec<_> = pex.added_peers().unwrap().collect();
|
let addrs: Vec<_> = pex.added_peers().collect();
|
||||||
assert_eq!(2, addrs.len());
|
assert_eq!(2, addrs.len());
|
||||||
assert_eq!("185.159.157.20:46439".parse::<SocketAddr>().unwrap(), addrs[0].addr);
|
assert_eq!(
|
||||||
|
"185.159.157.20:46439".parse::<SocketAddr>().unwrap(),
|
||||||
|
addrs[0].addr
|
||||||
|
);
|
||||||
assert_eq!(12, addrs[0].flags);
|
assert_eq!(12, addrs[0].flags);
|
||||||
assert_eq!("151.249.105.134:4240".parse::<SocketAddr>().unwrap(), addrs[1].addr);
|
assert_eq!(
|
||||||
|
"151.249.105.134:4240".parse::<SocketAddr>().unwrap(),
|
||||||
|
addrs[1].addr
|
||||||
|
);
|
||||||
assert_eq!(0, addrs[1].flags);
|
assert_eq!(0, addrs[1].flags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -640,7 +640,7 @@ mod tests {
|
||||||
fn test_extended_serialize() {
|
fn test_extended_serialize() {
|
||||||
let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
|
let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
|
||||||
let mut out = Vec::new();
|
let mut out = Vec::new();
|
||||||
msg.serialize(&mut out, &|| Default::default()).unwrap();
|
msg.serialize(&mut out, &Default::default).unwrap();
|
||||||
dbg!(out);
|
dbg!(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -656,7 +656,7 @@ mod tests {
|
||||||
let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap();
|
let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap();
|
||||||
assert_eq!(size, buf.len());
|
assert_eq!(size, buf.len());
|
||||||
let mut write_buf = Vec::new();
|
let mut write_buf = Vec::new();
|
||||||
msg.serialize(&mut write_buf, &|| Default::default()).unwrap();
|
msg.serialize(&mut write_buf, &Default::default).unwrap();
|
||||||
if buf != write_buf {
|
if buf != write_buf {
|
||||||
{
|
{
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue