Merge pull request #221 from ikatson/pex-updates

PEX - Peer Exchange initial impl by @izderadicka
This commit is contained in:
Igor Katson 2024-08-28 10:30:18 +01:00 committed by GitHub
commit b612c379e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 325 additions and 116 deletions

9
Cargo.lock generated
View file

@ -1638,6 +1638,7 @@ dependencies = [
"bitvec",
"byteorder",
"bytes",
"itertools",
"librqbit-bencode",
"librqbit-buffers",
"librqbit-clone-to-owned",
@ -2598,9 +2599,9 @@ checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152"
[[package]]
name = "rustix"
version = "0.38.34"
version = "0.38.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
checksum = "a85d50532239da68e9addb745ba38ff4612a242c1c7ceea689c4bc7c2f43c36f"
dependencies = [
"bitflags 2.6.0",
"errno",
@ -2641,9 +2642,9 @@ checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
[[package]]
name = "rustls-webpki"
version = "0.102.6"
version = "0.102.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e"
checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56"
dependencies = [
"ring",
"rustls-pki-types",

View file

@ -3,8 +3,10 @@
//
// Not useful outside of librqbit.
use std::borrow::Borrow;
use bytes::Bytes;
use serde::{Deserialize, Deserializer};
use serde::{Deserialize, Deserializer, Serialize};
use clone_to_owned::CloneToOwned;
@ -15,7 +17,9 @@ pub struct ByteBufOwned(pub bytes::Bytes);
#[serde(transparent)]
pub struct ByteBuf<'a>(pub &'a [u8]);
pub trait ByteBufT {
pub trait ByteBufT:
AsRef<[u8]> + std::hash::Hash + Serialize + Eq + core::fmt::Debug + CloneToOwned + Borrow<[u8]>
{
fn as_slice(&self) -> &[u8];
}

View file

@ -14,10 +14,7 @@ use librqbit_core::{
};
use parking_lot::RwLock;
use peer_binary_protocol::{
extended::{
handshake::{ExtendedHandshake, YourIP},
ExtendedMessage,
},
extended::{handshake::ExtendedHandshake, ExtendedMessage},
serialize_piece_preamble, Handshake, Message, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
};
use serde::{Deserialize, Serialize};
@ -248,14 +245,14 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let supports_extended = handshake_supports_extended;
if supports_extended {
let your_ip = self.addr.ip();
let mut my_extended = ExtendedHandshake::new();
my_extended.yourip = Some(YourIP(your_ip));
self.handler
.update_my_extended_handshake(&mut my_extended)?;
let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended));
trace!("sending extended handshake: {:?}", &my_extended);
my_extended.serialize(&mut write_buf, &|| None).unwrap();
my_extended
.serialize(&mut write_buf, &Default::default)
.unwrap();
with_timeout(rwtimeout, conn.write_all(&write_buf))
.await
.context("error writing extended handshake")?;
@ -318,7 +315,8 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
extended_handshake_ref
.read()
.as_ref()
.and_then(|e| e.ut_metadata())
.map(|e| e.peer_extended_messages())
.unwrap_or_default()
})?,
WriterRequest::ReadChunkRequest(chunk) => {
#[allow(unused_mut)]
@ -397,7 +395,6 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
if let Message::Extended(ExtendedMessage::Handshake(h)) = &message {
*extended_handshake_ref.write() = Some(h.clone_to_owned(None));
self.handler.on_extended_handshake(h)?;
trace!("remembered extended handshake for future serializing");
} else {
self.handler
.on_received_message(message)

View file

@ -57,7 +57,7 @@ impl ReadBuf {
anyhow::bail!("peer disconnected while reading handshake");
}
let (h, size) = Handshake::deserialize(&self.buf[..self.filled])
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?} hadshake data {:?}", e, &self.buf[..self.filled.min(19)]))?;
self.processed = size;
Ok(h)
}

View file

@ -67,7 +67,12 @@ use librqbit_core::{
};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
extended::{
handshake::{ExtendedHandshake, YourIP},
ut_metadata::UtMetadata,
ut_pex::UtPex,
ExtendedMessage,
},
Handshake, Message, MessageOwned, Piece, Request,
};
use peers::stats::atomic::AggregatePeerStatsAtomic;
@ -877,6 +882,9 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
self.send_metadata_piece(metadata_piece_id)
.with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?;
}
Message::Extended(ExtendedMessage::UtPex(pex)) => {
self.on_pex_message(pex);
}
message => {
warn!("received unsupported message {:?}, ignoring", message);
}
@ -887,7 +895,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize> {
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes()));
let len = msg.serialize(buf, &|| None)?;
let len = msg.serialize(buf, &Default::default)?;
trace!("sending: {:?}, length={}", &msg, len);
Ok(len)
}
@ -914,7 +922,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
self.state.file_ops().read_chunk(self.addr, chunk, buf)
}
fn on_extended_handshake(&self, _: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
fn on_extended_handshake(&self, hs: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
if let Some(peer_pex_msg_id) = hs.ut_pex() {
trace!("peer supports pex at {peer_pex_msg_id}");
}
Ok(())
}
@ -937,6 +948,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
&self,
handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
let your_ip = self.addr.ip();
handshake.yourip = Some(YourIP(your_ip));
let info_bytes = &self.state.torrent().info_bytes;
if !info_bytes.is_empty() {
if let Ok(len) = info_bytes.len().try_into() {
@ -1659,4 +1672,20 @@ impl PeerHandler {
.context("error sending UtMetadata: channel closed")?;
Ok(())
}
fn on_pex_message<B>(&self, msg: UtPex<B>)
where
B: AsRef<[u8]> + std::fmt::Debug,
{
// TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ...
msg.added_peers().for_each(|peer| {
self.state
.add_peer_if_not_seen(peer.addr)
.map_err(|error| {
warn!(?peer, ?error, "failed to add peer");
error
})
.ok();
});
}
}

View file

@ -20,3 +20,4 @@ librqbit-core = { path = "../librqbit_core", default-features = false, version =
bitvec = "1"
anyhow = "1"
bytes = "1.7.1"
itertools = "0.12"

View file

@ -1,19 +1,18 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
};
use std::{collections::HashMap, net::IpAddr};
use buffers::ByteBuf;
use byteorder::ByteOrder;
use byteorder::BE;
use buffers::{ByteBuf, ByteBufT};
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use serde::{Deserialize, Deserializer, Serialize};
use crate::{EXTENDED_UT_METADATA_KEY, MY_EXTENDED_UT_METADATA};
use crate::{
EXTENDED_UT_METADATA_KEY, EXTENDED_UT_PEX_KEY, MY_EXTENDED_UT_METADATA, MY_EXTENDED_UT_PEX,
};
use super::PeerExtendedMessageIds;
#[derive(Deserialize, Serialize, Debug, Default)]
pub struct ExtendedHandshake<ByteBuf: Eq + std::hash::Hash> {
pub struct ExtendedHandshake<ByteBuf: ByteBufT> {
#[serde(bound(deserialize = "ByteBuf: From<&'de [u8]>"))]
pub m: HashMap<ByteBuf, u8>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -40,6 +39,7 @@ impl ExtendedHandshake<ByteBuf<'static>> {
pub fn new() -> Self {
let mut features = HashMap::new();
features.insert(ByteBuf(EXTENDED_UT_METADATA_KEY), MY_EXTENDED_UT_METADATA);
features.insert(ByteBuf(EXTENDED_UT_PEX_KEY), MY_EXTENDED_UT_PEX);
Self {
m: features,
..Default::default()
@ -49,7 +49,7 @@ impl ExtendedHandshake<ByteBuf<'static>> {
impl<'a, ByteBuf> ExtendedHandshake<ByteBuf>
where
ByteBuf: Eq + std::hash::Hash + std::borrow::Borrow<[u8]>,
ByteBuf: ByteBufT,
{
fn get_msgid(&self, msg_type: &'a [u8]) -> Option<u8> {
self.m.get(msg_type).copied()
@ -58,12 +58,23 @@ where
pub fn ut_metadata(&self) -> Option<u8> {
self.get_msgid(EXTENDED_UT_METADATA_KEY)
}
pub fn ut_pex(&self) -> Option<u8> {
self.get_msgid(EXTENDED_UT_PEX_KEY)
}
pub fn peer_extended_messages(&self) -> PeerExtendedMessageIds {
PeerExtendedMessageIds {
ut_metadata: self.ut_metadata(),
ut_pex: self.ut_pex(),
}
}
}
impl<ByteBuf> CloneToOwned for ExtendedHandshake<ByteBuf>
where
ByteBuf: CloneToOwned + Eq + std::hash::Hash,
<ByteBuf as CloneToOwned>::Target: Eq + std::hash::Hash,
ByteBuf: ByteBufT,
<ByteBuf as CloneToOwned>::Target: ByteBufT,
{
type Target = ExtendedHandshake<<ByteBuf as CloneToOwned>::Target>;
@ -122,18 +133,11 @@ impl<'de> Deserialize<'de> for YourIP {
E: serde::de::Error,
{
if v.len() == 4 {
return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3]))));
let ip_bytes: &[u8; 4] = v[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length
return Ok(YourIP(IpAddr::from(*ip_bytes)));
} else if v.len() == 16 {
return Ok(YourIP(IpAddr::V6(Ipv6Addr::new(
BE::read_u16(&v[..2]),
BE::read_u16(&v[2..4]),
BE::read_u16(&v[4..6]),
BE::read_u16(&v[6..8]),
BE::read_u16(&v[8..10]),
BE::read_u16(&v[10..12]),
BE::read_u16(&v[12..14]),
BE::read_u16(&v[14..]),
))));
let ip_bytes: &[u8; 16] = v[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length
return Ok(YourIP(IpAddr::from(*ip_bytes)));
}
Err(E::custom("expected 4 or 16 byte address"))
}

View file

@ -1,9 +1,13 @@
use bencode::bencode_serialize_to_writer;
use bencode::from_bytes;
use bencode::BencodeValue;
use buffers::ByteBufT;
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use ut_pex::UtPex;
use crate::MY_EXTENDED_UT_PEX;
use self::{handshake::ExtendedHandshake, ut_metadata::UtMetadata};
@ -11,20 +15,28 @@ use super::MessageDeserializeError;
pub mod handshake;
pub mod ut_metadata;
pub mod ut_pex;
use super::MY_EXTENDED_UT_METADATA;
#[derive(Debug, Default)]
pub struct PeerExtendedMessageIds {
pub ut_metadata: Option<u8>,
pub ut_pex: Option<u8>,
}
#[derive(Debug)]
pub enum ExtendedMessage<ByteBuf: std::hash::Hash + Eq> {
pub enum ExtendedMessage<ByteBuf: ByteBufT> {
Handshake(ExtendedHandshake<ByteBuf>),
UtMetadata(UtMetadata<ByteBuf>),
UtPex(UtPex<ByteBuf>),
Dyn(u8, BencodeValue<ByteBuf>),
}
impl<ByteBuf> CloneToOwned for ExtendedMessage<ByteBuf>
where
ByteBuf: CloneToOwned + std::hash::Hash + Eq,
<ByteBuf as CloneToOwned>::Target: std::hash::Hash + Eq,
ByteBuf: ByteBufT,
<ByteBuf as CloneToOwned>::Target: ByteBufT,
{
type Target = ExtendedMessage<<ByteBuf as CloneToOwned>::Target>;
@ -37,15 +49,16 @@ where
ExtendedMessage::UtMetadata(m) => {
ExtendedMessage::UtMetadata(m.clone_to_owned(within_buffer))
}
ExtendedMessage::UtPex(m) => ExtendedMessage::UtPex(m.clone_to_owned(within_buffer)),
}
}
}
impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf> {
impl<ByteBuf: ByteBufT> ExtendedMessage<ByteBuf> {
pub fn serialize(
&self,
out: &mut Vec<u8>,
extended_handshake_ut_metadata: &dyn Fn() -> Option<u8>,
extended_handshake_ut_metadata: &dyn Fn() -> PeerExtendedMessageIds,
) -> anyhow::Result<()>
where
ByteBuf: AsRef<[u8]>,
@ -60,17 +73,28 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf
bencode_serialize_to_writer(h, out)?;
}
ExtendedMessage::UtMetadata(u) => {
let emsg_id = extended_handshake_ut_metadata().ok_or_else(|| {
anyhow::anyhow!("need peer's handshake to serialize ut_metadata")
})?;
let emsg_id = extended_handshake_ut_metadata()
.ut_metadata
.ok_or_else(|| {
anyhow::anyhow!("need peer's handshake to serialize ut_metadata")
})?;
out.push(emsg_id);
u.serialize(out);
}
ExtendedMessage::UtPex(m) => {
let emsg_id = extended_handshake_ut_metadata().ut_pex.ok_or_else(|| {
anyhow::anyhow!(
"need peer's handshake to serialize ut_pex, or peer does't support ut_pex"
)
})?;
out.push(emsg_id);
bencode_serialize_to_writer(m, out)?;
}
}
Ok(())
}
pub fn deserialize(mut buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
pub fn deserialize<'a>(mut buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
where
ByteBuf: Deserialize<'a> + From<&'a [u8]>,
{
@ -91,6 +115,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf
MY_EXTENDED_UT_METADATA => {
Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(buf)?))
}
MY_EXTENDED_UT_PEX => Ok(ExtendedMessage::UtPex(from_bytes(buf)?)),
_ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(buf)?)),
}
}

View file

@ -1,5 +1,6 @@
use bencode::bencode_serialize_to_writer;
use bencode::BencodeDeserializer;
use buffers::ByteBufT;
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use serde::Deserialize;
@ -39,7 +40,7 @@ impl<ByteBuf: CloneToOwned> CloneToOwned for UtMetadata<ByteBuf> {
}
}
impl<'a, ByteBuf: 'a> UtMetadata<ByteBuf> {
impl<ByteBuf: ByteBufT> UtMetadata<ByteBuf> {
pub fn serialize(&self, buf: &mut Vec<u8>)
where
ByteBuf: AsRef<[u8]>,
@ -83,7 +84,7 @@ impl<'a, ByteBuf: 'a> UtMetadata<ByteBuf> {
}
}
}
pub fn deserialize(buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
pub fn deserialize<'a>(buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
where
ByteBuf: From<&'a [u8]>,
{

View file

@ -0,0 +1,151 @@
use std::net::{IpAddr, SocketAddr};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use itertools::{EitherOrBoth, Itertools};
use serde::{Deserialize, Serialize};
pub struct PexPeerInfo {
pub flags: u8,
pub addr: SocketAddr,
}
impl core::fmt::Debug for PexPeerInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.addr)?;
if self.flags != 0 {
write!(f, ";flags={}", self.flags)?;
}
Ok(())
}
}
#[derive(Serialize, Default, Deserialize)]
pub struct UtPex<B> {
added: B,
#[serde(rename = "added.f")]
#[serde(skip_serializing_if = "Option::is_none")]
added_f: Option<B>,
added6: B,
#[serde(rename = "added6.f")]
#[serde(skip_serializing_if = "Option::is_none")]
added6_f: Option<B>,
dropped: B,
dropped6: B,
}
impl<B> core::fmt::Debug for UtPex<B>
where
B: AsRef<[u8]>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
struct IterDebug<I>(I);
impl<I> core::fmt::Debug for IterDebug<I>
where
I: Iterator<Item = PexPeerInfo> + Clone,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_list().entries(self.0.clone()).finish()
}
}
f.debug_struct("UtPex")
.field("added", &IterDebug(self.added_peers()))
.field("dropped", &IterDebug(self.dropped_peers()))
.finish()
}
}
impl<B> CloneToOwned for UtPex<B>
where
B: CloneToOwned,
{
type Target = UtPex<<B as CloneToOwned>::Target>;
fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target {
UtPex {
added: self.added.clone_to_owned(within_buffer),
added_f: self.added_f.clone_to_owned(within_buffer),
added6: self.added6.clone_to_owned(within_buffer),
added6_f: self.added6_f.clone_to_owned(within_buffer),
dropped: self.dropped.clone_to_owned(within_buffer),
dropped6: self.dropped6.clone_to_owned(within_buffer),
}
}
}
impl<B> UtPex<B>
where
B: AsRef<[u8]>,
{
fn added_peers_inner<'a>(
&'a self,
buf: &'a B,
flags: &'a Option<B>,
ip_len: usize,
) -> impl Iterator<Item = PexPeerInfo> + Clone + 'a {
let addrs = buf.as_ref().chunks_exact(ip_len + 2).map(move |c| {
let ip = match ip_len {
4 => IpAddr::from(TryInto::<[u8; 4]>::try_into(&c[..4]).unwrap()),
16 => IpAddr::from(TryInto::<[u8; 16]>::try_into(&c[..16]).unwrap()),
_ => unreachable!(),
};
let port = BE::read_u16(&c[ip_len..]);
SocketAddr::new(ip, port)
});
let flags = flags
.as_ref()
.map(|b| b.as_ref().iter().copied())
.into_iter()
.flatten();
addrs.zip_longest(flags).filter_map(|eob| match eob {
EitherOrBoth::Both(addr, flags) => Some(PexPeerInfo { flags, addr }),
EitherOrBoth::Left(addr) => Some(PexPeerInfo { flags: 0, addr }),
EitherOrBoth::Right(_) => None,
})
}
pub fn added_peers(&self) -> impl Iterator<Item = PexPeerInfo> + Clone + '_ {
self.added_peers_inner(&self.added, &self.added_f, 4)
.chain(self.added_peers_inner(&self.added6, &self.added6_f, 16))
}
pub fn dropped_peers(&self) -> impl Iterator<Item = PexPeerInfo> + Clone + '_ {
self.added_peers_inner(&self.dropped, &None, 4)
.chain(self.added_peers_inner(&self.dropped6, &None, 16))
}
}
#[cfg(test)]
mod tests {
use bencode::from_bytes;
use buffers::ByteBuf;
use super::*;
fn decode_hex(s: &str) -> Vec<u8> {
assert!(s.len() % 2 == 0);
(0..s.len())
.step_by(2)
.map(|i| u8::from_str_radix(&s[i..i + 2], 16).unwrap())
.collect()
}
#[test]
fn test_pex_deserialization() {
let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65";
let bytes = decode_hex(msg);
let pex = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();
let addrs: Vec<_> = pex.added_peers().collect();
assert_eq!(2, addrs.len());
assert_eq!(
"185.159.157.20:46439".parse::<SocketAddr>().unwrap(),
addrs[0].addr
);
assert_eq!(12, addrs[0].flags);
assert_eq!(
"151.249.105.134:4240".parse::<SocketAddr>().unwrap(),
addrs[1].addr
);
assert_eq!(0, addrs[1].flags);
}
}

View file

@ -5,10 +5,11 @@
pub mod extended;
use bincode::Options;
use buffers::{ByteBuf, ByteBufOwned};
use buffers::{ByteBuf, ByteBufOwned, ByteBufT};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use extended::PeerExtendedMessageIds;
use librqbit_core::{constants::CHUNK_SIZE, hash_id::Id20, lengths::ChunkInfo};
use serde::{Deserialize, Serialize};
@ -47,6 +48,9 @@ const MSGID_EXTENDED: u8 = 20;
pub const EXTENDED_UT_METADATA_KEY: &[u8] = b"ut_metadata";
pub const MY_EXTENDED_UT_METADATA: u8 = 3;
pub const EXTENDED_UT_PEX_KEY: &[u8] = b"ut_pex";
pub const MY_EXTENDED_UT_PEX: u8 = 1;
#[derive(Debug)]
pub enum MessageDeserializeError {
NotEnoughData(usize, &'static str),
@ -182,7 +186,7 @@ impl From<anyhow::Error> for MessageDeserializeError {
}
#[derive(Debug)]
pub enum Message<ByteBuf: std::hash::Hash + Eq> {
pub enum Message<ByteBuf: ByteBufT> {
Request(Request),
Cancel(Request),
Bitfield(ByteBuf),
@ -208,8 +212,8 @@ pub struct Bitfield<'a> {
impl<ByteBuf> CloneToOwned for Message<ByteBuf>
where
ByteBuf: CloneToOwned + std::hash::Hash + Eq,
<ByteBuf as CloneToOwned>::Target: std::hash::Hash + Eq,
ByteBuf: ByteBufT,
<ByteBuf as CloneToOwned>::Target: ByteBufT,
{
type Target = Message<<ByteBuf as CloneToOwned>::Target>;
@ -253,7 +257,7 @@ impl<'a> std::fmt::Debug for Bitfield<'a> {
impl<ByteBuf> Message<ByteBuf>
where
ByteBuf: AsRef<[u8]> + std::hash::Hash + Eq + Serialize,
ByteBuf: ByteBufT,
{
pub fn len_prefix_and_msg_id(&self) -> (u32, u8) {
match self {
@ -276,7 +280,7 @@ where
pub fn serialize(
&self,
out: &mut Vec<u8>,
extended_handshake_ut_metadata: &dyn Fn() -> Option<u8>,
peer_extended_messages: &dyn Fn() -> PeerExtendedMessageIds,
) -> anyhow::Result<usize> {
let (lp, msg_id) = self.len_prefix_and_msg_id();
@ -326,7 +330,7 @@ where
Ok(msg_len)
}
Message::Extended(e) => {
e.serialize(out, extended_handshake_ut_metadata)?;
e.serialize(out, peer_extended_messages)?;
let msg_size = out.len();
// no fucking idea why +1, but I tweaked that for it all to match up
// with real messages.
@ -636,7 +640,7 @@ mod tests {
fn test_extended_serialize() {
let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
let mut out = Vec::new();
msg.serialize(&mut out, &|| None).unwrap();
msg.serialize(&mut out, &Default::default).unwrap();
dbg!(out);
}
@ -652,7 +656,7 @@ mod tests {
let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap();
assert_eq!(size, buf.len());
let mut write_buf = Vec::new();
msg.serialize(&mut write_buf, &|| None).unwrap();
msg.serialize(&mut write_buf, &Default::default).unwrap();
if buf != write_buf {
{
use std::io::Write;

View file

@ -68,17 +68,6 @@ version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "async-recursion"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -329,9 +318,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "bytemuck"
version = "1.17.0"
version = "1.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fd4c6dcc3b0aea2f5c0b4b82c2b15fe39ddbc76041a310848f4706edf76bb31"
checksum = "773d90827bc3feecfb67fab12e24de0749aad83c74b9504ecde46237b5cd24e2"
[[package]]
name = "byteorder"
@ -901,9 +890,9 @@ dependencies = [
[[package]]
name = "filetime"
version = "0.2.24"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550"
checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586"
dependencies = [
"cfg-if",
"libc",
@ -1842,7 +1831,7 @@ dependencies = [
[[package]]
name = "librqbit"
version = "7.0.0-beta.3"
version = "7.0.0"
dependencies = [
"anyhow",
"async-stream",
@ -1868,6 +1857,7 @@ dependencies = [
"librqbit-sha1-wrapper",
"librqbit-tracker-comms",
"librqbit-upnp",
"librqbit-upnp-serve",
"memmap2",
"mime_guess",
"parking_lot",
@ -1887,7 +1877,6 @@ dependencies = [
"tower-http",
"tracing",
"tracing-subscriber",
"upnp-serve",
"url",
"urlencoding",
"uuid",
@ -1895,7 +1884,7 @@ dependencies = [
[[package]]
name = "librqbit-bencode"
version = "3.0.0"
version = "3.0.1"
dependencies = [
"anyhow",
"bytes",
@ -1923,7 +1912,7 @@ dependencies = [
[[package]]
name = "librqbit-core"
version = "4.0.0"
version = "4.0.1"
dependencies = [
"anyhow",
"bytes",
@ -1978,6 +1967,7 @@ dependencies = [
"bitvec",
"byteorder",
"bytes",
"itertools",
"librqbit-bencode",
"librqbit-buffers",
"librqbit-clone-to-owned",
@ -1987,14 +1977,14 @@ dependencies = [
[[package]]
name = "librqbit-sha1-wrapper"
version = "3.0.0"
version = "4.0.0"
dependencies = [
"crypto-hash",
]
[[package]]
name = "librqbit-tracker-comms"
version = "1.0.3"
version = "2.0.0"
dependencies = [
"anyhow",
"async-stream",
@ -2017,7 +2007,6 @@ name = "librqbit-upnp"
version = "0.1.1"
dependencies = [
"anyhow",
"async-recursion",
"bstr",
"futures",
"httparse",
@ -2030,6 +2019,32 @@ dependencies = [
"url",
]
[[package]]
name = "librqbit-upnp-serve"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"bstr",
"gethostname",
"http 1.1.0",
"httparse",
"librqbit-core",
"librqbit-sha1-wrapper",
"librqbit-upnp",
"mime_guess",
"parking_lot",
"quick-xml 0.36.1",
"reqwest",
"serde",
"socket2",
"tokio",
"tokio-util",
"tracing",
"url",
"uuid",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
@ -2235,8 +2250,9 @@ dependencies = [
[[package]]
name = "network-interface"
version = "1.1.1"
source = "git+https://github.com/ikatson/network-interface?branch=compile-on-freebsd#aca8a95ab1bb41a27bc82c6a2425eb4824bf0352"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "433419f898328beca4f2c6c73a1b52540658d92b0a99f0269330457e0fd998d5"
dependencies = [
"cc",
"libc",
@ -2809,6 +2825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
dependencies = [
"memchr",
"serde",
]
[[package]]
@ -3054,18 +3071,18 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc_version"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
[[package]]
name = "rustix"
version = "0.38.34"
version = "0.38.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
checksum = "a85d50532239da68e9addb745ba38ff4612a242c1c7ceea689c4bc7c2f43c36f"
dependencies = [
"bitflags 2.6.0",
"errno",
@ -4241,31 +4258,6 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
[[package]]
name = "upnp-serve"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"bstr",
"gethostname",
"http 1.1.0",
"httparse",
"librqbit-core",
"librqbit-sha1-wrapper",
"librqbit-upnp",
"mime_guess",
"parking_lot",
"quick-xml 0.36.1",
"reqwest",
"socket2",
"tokio",
"tokio-util",
"tracing",
"url",
"uuid",
]
[[package]]
name = "url"
version = "2.5.2"