PEX: Announce support and message definition

This commit is contained in:
Ivan 2024-08-24 18:18:07 +02:00 committed by Igor Katson
parent c758d0022d
commit bbc951733f
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
7 changed files with 213 additions and 35 deletions

View file

@ -1,16 +1,16 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
net::IpAddr,
};
use buffers::ByteBuf;
use byteorder::ByteOrder;
use byteorder::BE;
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use serde::{Deserialize, Deserializer, Serialize};
use crate::{EXTENDED_UT_METADATA_KEY, MY_EXTENDED_UT_METADATA};
use crate::{EXTENDED_UT_METADATA_KEY, EXTENDED_UT_PEX_KEY, MY_EXTENDED_UT_METADATA, MY_EXTENDED_UT_PEX};
use super::PeerExtendedMessageIds;
#[derive(Deserialize, Serialize, Debug, Default)]
pub struct ExtendedHandshake<ByteBuf: Eq + std::hash::Hash> {
@ -40,6 +40,7 @@ impl ExtendedHandshake<ByteBuf<'static>> {
pub fn new() -> Self {
let mut features = HashMap::new();
features.insert(ByteBuf(EXTENDED_UT_METADATA_KEY), MY_EXTENDED_UT_METADATA);
features.insert(ByteBuf(EXTENDED_UT_PEX_KEY), MY_EXTENDED_UT_PEX);
Self {
m: features,
..Default::default()
@ -58,6 +59,17 @@ where
pub fn ut_metadata(&self) -> Option<u8> {
self.get_msgid(EXTENDED_UT_METADATA_KEY)
}
pub fn ut_pex(&self) -> Option<u8> {
self.get_msgid(EXTENDED_UT_PEX_KEY)
}
pub fn peer_extended_messages(&self) -> PeerExtendedMessageIds {
PeerExtendedMessageIds {
ut_metadata: self.ut_metadata(),
ut_pex: self.ut_pex(),
}
}
}
impl<ByteBuf> CloneToOwned for ExtendedHandshake<ByteBuf>
@ -122,18 +134,11 @@ impl<'de> Deserialize<'de> for YourIP {
E: serde::de::Error,
{
if v.len() == 4 {
return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3]))));
let ip_bytes: &[u8; 4] = v[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length
return Ok(YourIP(IpAddr::from(*ip_bytes)));
} else if v.len() == 16 {
return Ok(YourIP(IpAddr::V6(Ipv6Addr::new(
BE::read_u16(&v[..2]),
BE::read_u16(&v[2..4]),
BE::read_u16(&v[4..6]),
BE::read_u16(&v[6..8]),
BE::read_u16(&v[8..10]),
BE::read_u16(&v[10..12]),
BE::read_u16(&v[12..14]),
BE::read_u16(&v[14..]),
))));
let ip_bytes: &[u8; 16] = v[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length
return Ok(YourIP(IpAddr::from(*ip_bytes)));
}
Err(E::custom("expected 4 or 16 byte address"))
}

View file

@ -4,6 +4,9 @@ use bencode::BencodeValue;
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use serde::{Deserialize, Serialize};
use ut_pex::UtPex;
use crate::MY_EXTENDED_UT_PEX;
use self::{handshake::ExtendedHandshake, ut_metadata::UtMetadata};
@ -11,13 +14,21 @@ use super::MessageDeserializeError;
pub mod handshake;
pub mod ut_metadata;
pub mod ut_pex;
use super::MY_EXTENDED_UT_METADATA;
#[derive(Debug, Default)]
pub struct PeerExtendedMessageIds {
pub ut_metadata: Option<u8>,
pub ut_pex: Option<u8>,
}
#[derive(Debug)]
pub enum ExtendedMessage<ByteBuf: std::hash::Hash + Eq> {
Handshake(ExtendedHandshake<ByteBuf>),
UtMetadata(UtMetadata<ByteBuf>),
UtPex(UtPex<ByteBuf>),
Dyn(u8, BencodeValue<ByteBuf>),
}
@ -37,6 +48,7 @@ where
ExtendedMessage::UtMetadata(m) => {
ExtendedMessage::UtMetadata(m.clone_to_owned(within_buffer))
}
ExtendedMessage::UtPex(m) => ExtendedMessage::UtPex(m.clone_to_owned(within_buffer)),
}
}
}
@ -45,7 +57,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf
pub fn serialize(
&self,
out: &mut Vec<u8>,
extended_handshake_ut_metadata: &dyn Fn() -> Option<u8>,
extended_handshake_ut_metadata: &dyn Fn() -> PeerExtendedMessageIds,
) -> anyhow::Result<()>
where
ByteBuf: AsRef<[u8]>,
@ -60,12 +72,20 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf
bencode_serialize_to_writer(h, out)?;
}
ExtendedMessage::UtMetadata(u) => {
let emsg_id = extended_handshake_ut_metadata().ok_or_else(|| {
let emsg_id = extended_handshake_ut_metadata().ut_metadata.ok_or_else(|| {
anyhow::anyhow!("need peer's handshake to serialize ut_metadata")
})?;
out.push(emsg_id);
u.serialize(out);
}
},
ExtendedMessage::UtPex(m) => {
let emsg_id = extended_handshake_ut_metadata().ut_pex.ok_or_else(|| {
anyhow::anyhow!("need peer's handshake to serialize ut_pex, or peer does't support ut_pex")
})?;
out.push(emsg_id);
bencode_serialize_to_writer(m, out)?;
},
}
Ok(())
}
@ -91,6 +111,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf
MY_EXTENDED_UT_METADATA => {
Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(buf)?))
}
MY_EXTENDED_UT_PEX => Ok(ExtendedMessage::UtPex(from_bytes(buf)?)),
_ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(buf)?)),
}
}

View file

@ -0,0 +1,143 @@
use std::net::{IpAddr, SocketAddr};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use serde::{Deserialize, Serialize};
#[derive(Debug)]
pub struct PexPeerInfo {
pub flags: u8,
pub addr: SocketAddr,
}
impl PexPeerInfo {
pub fn from_bytes(buf: &[u8], flags: Option<u8>) -> anyhow::Result<Self> {
let (ip, port) = match buf.len() {
6 => {
let ip_bytes: &[u8; 4] = (&buf[0..4]).try_into()?;
let ip = IpAddr::from(*ip_bytes);
let port = BE::read_u16(&buf[4..6]);
(ip, port)
}
18 => {
let ip_bytes: &[u8; 16] = (&buf[0..16]).try_into()?;
let ip = IpAddr::from(*ip_bytes);
let port = BE::read_u16(&buf[16..18]);
(ip, port)
}
_ => anyhow::bail!("invalid pex peer info"),
};
Ok(Self {
flags: flags.unwrap_or(0),
addr: (ip, port).into(),
})
}
}
#[derive(Debug, Serialize, Default, Deserialize)]
pub struct UtPex<B> {
#[serde(skip_serializing_if = "Option::is_none")]
added: Option<B>,
#[serde(rename = "added.f")]
#[serde(skip_serializing_if = "Option::is_none")]
added_f: Option<B>,
#[serde(skip_serializing_if = "Option::is_none")]
added6: Option<B>,
#[serde(rename = "added6.f")]
#[serde(skip_serializing_if = "Option::is_none")]
added6_f: Option<B>,
#[serde(skip_serializing_if = "Option::is_none")]
dropped: Option<B>,
#[serde(skip_serializing_if = "Option::is_none")]
dropped6: Option<B>,
}
impl<B> CloneToOwned for UtPex<B>
where
B: CloneToOwned,
{
type Target = UtPex<<B as CloneToOwned>::Target>;
fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target {
UtPex {
added: self.added.clone_to_owned(within_buffer),
added_f: self.added_f.clone_to_owned(within_buffer),
added6: self.added6.clone_to_owned(within_buffer),
added6_f: self.added6_f.clone_to_owned(within_buffer),
dropped: self.dropped.clone_to_owned(within_buffer),
dropped6: self.dropped6.clone_to_owned(within_buffer),
}
}
}
pub struct PeerAddrIterator<'a> {
addrs: &'a [u8],
flags: &'a [u8],
offset: usize,
}
impl<'a> Iterator for PeerAddrIterator<'a> {
type Item = PexPeerInfo;
fn next(&mut self) -> Option<Self::Item> {
const ADDR_SIZE: usize = 6;
if self.offset*ADDR_SIZE >= self.addrs.len() {
return None;
}
let addr = &self.addrs[self.offset*ADDR_SIZE..(self.offset+1)*ADDR_SIZE];
let flags = self.flags.get(self.offset);
self.offset += 1;
Some(PexPeerInfo::from_bytes(addr, flags.cloned()).unwrap()) // safe to unwrap as we assure slice length
}
}
impl<B> UtPex<B>
where
B: AsRef<[u8]>,
{
pub fn added_peers<'a>(&'a self) -> anyhow::Result<Box<dyn Iterator<Item = PexPeerInfo> + 'a>> {
if let Some(added) = &self.added {
if added.as_ref().len() % 6 != 0 {
anyhow::bail!("invalid pex added peers");
}
return Ok(Box::new(PeerAddrIterator {
addrs: added.as_ref(),
flags: self.added_f.as_ref().map(|f| f.as_ref()).unwrap_or(&[]),
offset: 0,
}));
} else {
return Ok(Box::new(std::iter::empty()));
};
}
}
#[cfg(test)]
mod tests {
use bencode::from_bytes;
use buffers::ByteBuf;
use super::*;
fn decode_hex(s: &str) -> Vec<u8> {
assert!(s.len() % 2 == 0);
(0..s.len())
.step_by(2)
.map(|i| u8::from_str_radix(&s[i..i + 2], 16).unwrap())
.collect()
}
#[test]
fn test_pex_deserialization() {
let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65";
let bytes = decode_hex(msg);
let pex = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();
let addrs: Vec<_> = pex.added_peers().unwrap().collect();
assert_eq!(2, addrs.len());
assert_eq!("185.159.157.20:46439".parse::<SocketAddr>().unwrap(), addrs[0].addr);
assert_eq!(12, addrs[0].flags);
assert_eq!("151.249.105.134:4240".parse::<SocketAddr>().unwrap(), addrs[1].addr);
assert_eq!(0, addrs[1].flags);
}
}

View file

@ -9,6 +9,7 @@ use buffers::{ByteBuf, ByteBufOwned};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use extended::PeerExtendedMessageIds;
use librqbit_core::{constants::CHUNK_SIZE, hash_id::Id20, lengths::ChunkInfo};
use serde::{Deserialize, Serialize};
@ -47,6 +48,9 @@ const MSGID_EXTENDED: u8 = 20;
pub const EXTENDED_UT_METADATA_KEY: &[u8] = b"ut_metadata";
pub const MY_EXTENDED_UT_METADATA: u8 = 3;
pub const EXTENDED_UT_PEX_KEY: &[u8] = b"ut_pex";
pub const MY_EXTENDED_UT_PEX: u8 = 1;
#[derive(Debug)]
pub enum MessageDeserializeError {
NotEnoughData(usize, &'static str),
@ -276,7 +280,7 @@ where
pub fn serialize(
&self,
out: &mut Vec<u8>,
extended_handshake_ut_metadata: &dyn Fn() -> Option<u8>,
peer_extended_messages: &dyn Fn() -> PeerExtendedMessageIds,
) -> anyhow::Result<usize> {
let (lp, msg_id) = self.len_prefix_and_msg_id();
@ -326,7 +330,7 @@ where
Ok(msg_len)
}
Message::Extended(e) => {
e.serialize(out, extended_handshake_ut_metadata)?;
e.serialize(out, peer_extended_messages)?;
let msg_size = out.len();
// no fucking idea why +1, but I tweaked that for it all to match up
// with real messages.
@ -636,7 +640,7 @@ mod tests {
fn test_extended_serialize() {
let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
let mut out = Vec::new();
msg.serialize(&mut out, &|| None).unwrap();
msg.serialize(&mut out, &|| Default::default()).unwrap();
dbg!(out);
}
@ -652,7 +656,7 @@ mod tests {
let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap();
assert_eq!(size, buf.len());
let mut write_buf = Vec::new();
msg.serialize(&mut write_buf, &|| None).unwrap();
msg.serialize(&mut write_buf, &|| Default::default()).unwrap();
if buf != write_buf {
{
use std::io::Write;