Sending PEX - initial concept
This commit is contained in:
parent
fb4e4c5741
commit
3c470e1670
2 changed files with 161 additions and 5 deletions
|
|
@ -69,7 +69,7 @@ use librqbit_core::{
|
|||
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use peer_binary_protocol::{
|
||||
extended::{
|
||||
handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
|
||||
self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
|
||||
},
|
||||
Handshake, Message, MessageOwned, Piece, Request,
|
||||
};
|
||||
|
|
@ -838,6 +838,74 @@ impl TorrentStateLive {
|
|||
.take_while(|r| r.is_ok())
|
||||
.last();
|
||||
}
|
||||
|
||||
async fn task_send_pex_to_peer(
|
||||
self: Arc<Self>,
|
||||
peer_addr: SocketAddr,
|
||||
tx: PeerTx,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut sent_peers_live: HashSet<SocketAddr> = HashSet::new();
|
||||
const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more)
|
||||
loop {
|
||||
let addrs_live_to_sent = self
|
||||
.peers
|
||||
.states
|
||||
.iter()
|
||||
.filter_map(|e| {
|
||||
let peer = e.value();
|
||||
let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key());
|
||||
|
||||
if *addr != peer_addr {
|
||||
if peer.state.is_live() && !sent_peers_live.contains(addr) {
|
||||
Some(*addr)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.take(50)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let addrs_closed_to_sent = sent_peers_live
|
||||
.iter()
|
||||
.filter(|addr| {
|
||||
self.peers
|
||||
.states
|
||||
.get(addr)
|
||||
.map(|p| !p.value().state.is_live())
|
||||
.unwrap_or(true)
|
||||
})
|
||||
.copied()
|
||||
.take(MAX_SENT_PEERS)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
// BEP 11 - Dont send closed if they are now in live
|
||||
// it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent,
|
||||
// and addrs_closed_to_sent are only filtered addresses from sent_peers_live
|
||||
|
||||
if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() {
|
||||
let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent);
|
||||
let ext_msg = extended::ExtendedMessage::UtPex(pex_msg);
|
||||
let msg = Message::Extended(ext_msg);
|
||||
|
||||
tx.send(WriterRequest::Message(msg))?;
|
||||
|
||||
debug!(peer=?peer_addr, "sending PEX with {} live and {} closed peers", addrs_live_to_sent.len(), addrs_closed_to_sent.len());
|
||||
sent_peers_live.extend(&addrs_live_to_sent);
|
||||
sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr));
|
||||
|
||||
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
_ = tx.closed() => return Ok(()),
|
||||
_ = tokio::time::sleep(Duration::from_secs(60)) => {},
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PeerHandlerLocked {
|
||||
|
|
@ -963,8 +1031,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
|||
}
|
||||
|
||||
fn on_extended_handshake(&self, hs: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
|
||||
if let Some(peer_pex_msg_id) = hs.ut_pex() {
|
||||
trace!("peer supports pex at {peer_pex_msg_id}");
|
||||
if let Some(_peer_pex_msg_id) = hs.ut_pex() {
|
||||
self.state.clone().spawn(
|
||||
error_span!(
|
||||
parent: self.state.torrent.span.clone(),
|
||||
"sending_pex_to_peer",
|
||||
peer = self.addr.to_string()
|
||||
),
|
||||
self.state
|
||||
.clone()
|
||||
.task_send_pex_to_peer(self.addr, self.tx.clone()),
|
||||
);
|
||||
}
|
||||
// Lets update outgoing Socket address for incoming connection
|
||||
if self.incoming {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
use std::net::{IpAddr, SocketAddr};
|
||||
|
||||
use buffers::ByteBufOwned;
|
||||
use byteorder::{ByteOrder, BE};
|
||||
use bytes::Bytes;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use clone_to_owned::CloneToOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
|
|
@ -121,9 +122,58 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl UtPex<ByteBufOwned> {
|
||||
|
||||
pub fn from_addrs<'a, I,J>(addrs_live: I, addrs_closed: J) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = &'a SocketAddr>,
|
||||
J: IntoIterator<Item = &'a SocketAddr>,
|
||||
{
|
||||
|
||||
|
||||
fn addrs_to_bytes<'a,I>(addrs: I) -> (Option<ByteBufOwned>, Option<ByteBufOwned>)
|
||||
where
|
||||
I: IntoIterator<Item = &'a SocketAddr>,
|
||||
{
|
||||
let mut ipv4_addrs = BytesMut::new();
|
||||
let mut ipv6_addrs = BytesMut::new();
|
||||
for addr in addrs {
|
||||
match addr {
|
||||
SocketAddr::V4(v4) => {
|
||||
ipv4_addrs.extend_from_slice(&v4.ip().octets());
|
||||
ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes());
|
||||
}
|
||||
SocketAddr::V6(v6) => {
|
||||
ipv6_addrs.extend_from_slice(&v6.ip().octets());
|
||||
ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let freeze = |buf: BytesMut| -> Option<ByteBufOwned> { if !buf.is_empty() {Some(buf.freeze().into())} else {None} };
|
||||
|
||||
(freeze(ipv4_addrs), freeze(ipv6_addrs))
|
||||
}
|
||||
|
||||
let (added, added6) = addrs_to_bytes(addrs_live);
|
||||
let (dropped, dropped6) = addrs_to_bytes(addrs_closed);
|
||||
|
||||
|
||||
Self {
|
||||
added,
|
||||
added6,
|
||||
dropped,
|
||||
dropped6,
|
||||
..Default::default()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bencode::from_bytes;
|
||||
use bencode::{bencode_serialize_to_writer, from_bytes};
|
||||
use buffers::ByteBuf;
|
||||
|
||||
use super::*;
|
||||
|
|
@ -154,4 +204,33 @@ mod tests {
|
|||
);
|
||||
assert_eq!(0, addrs[1].flags);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pex_roundtrip() {
|
||||
let a1 = "185.159.157.20:46439".parse::<SocketAddr>().unwrap();
|
||||
let a2 = "151.249.105.134:4240".parse::<SocketAddr>().unwrap();
|
||||
//IPV6
|
||||
let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439".parse::<SocketAddr>().unwrap();
|
||||
let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240".parse::<SocketAddr>().unwrap();
|
||||
|
||||
let addrs = vec![a1, aa1, a2, aa2];
|
||||
let pex = UtPex::from_addrs(&addrs, &addrs);
|
||||
let mut bytes = Vec::new();
|
||||
bencode_serialize_to_writer(&pex, &mut bytes).unwrap();
|
||||
let pex2 = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();
|
||||
assert_eq!(4, pex2.added_peers().count());
|
||||
assert_eq!(pex.added_peers().count(), pex2.added_peers().count());
|
||||
let addrs2: Vec<_> = pex2.added_peers().collect();
|
||||
assert_eq!(a1, addrs2[0].addr);
|
||||
assert_eq!(a2, addrs2[1].addr);
|
||||
assert_eq!(aa1, addrs2[2].addr);
|
||||
assert_eq!(aa2, addrs2[3].addr);
|
||||
let addrs2: Vec<_> = pex2.dropped_peers().collect();
|
||||
assert_eq!(a1, addrs2[0].addr);
|
||||
assert_eq!(a2, addrs2[1].addr);
|
||||
assert_eq!(aa1, addrs2[2].addr);
|
||||
assert_eq!(aa2, addrs2[3].addr);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue