From 1fa05837fb5406cff522a6e43d06aaff5faba5c4 Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 23 Oct 2024 12:57:29 +0200 Subject: [PATCH] Include only addresses with known working connections --- crates/librqbit/src/torrent_state/live/mod.rs | 48 +++++++++------ .../src/extended/ut_pex.rs | 60 ++++++++++--------- 2 files changed, 61 insertions(+), 47 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ab1bbb2..e5e4571 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -846,7 +846,17 @@ impl TorrentStateLive { ) -> anyhow::Result<()> { let mut sent_peers_live: HashSet = HashSet::new(); const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) + const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); // As per BEP 11 recommended interval is min 60 seconds + let mut delay = Duration::from_secs(10); // Wait 10 seconds before sending the first message to assure that peer will stay with us + loop { + tokio::select! { + _ = tx.closed() => return Ok(()), + _ = tokio::time::sleep(delay) => {}, + + } + delay = PEX_MESSAGE_INTERVAL; + let addrs_live_to_sent = self .peers .states @@ -856,7 +866,16 @@ impl TorrentStateLive { let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); if *addr != peer_addr { - if peer.state.is_live() && !sent_peers_live.contains(addr) { + let has_outgoing_connections = peer + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0; // As per BEP 11 share only those we were able to connect + if peer.state.is_live() + && has_outgoing_connections + && !sent_peers_live.contains(addr) + { Some(*addr) } else { None @@ -865,7 +884,7 @@ impl TorrentStateLive { None } }) - .take(50) + .take(50) .collect::>(); let addrs_closed_to_sent = sent_peers_live @@ -881,28 +900,21 @@ impl TorrentStateLive { .take(MAX_SENT_PEERS) .collect::>(); - // BEP 11 - Dont send closed if they are now in live - // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, + // BEP 11 - Dont send closed if they are now in live + // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // and addrs_closed_to_sent are only filtered addresses from sent_peers_live if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + debug!(peer=?peer_addr, "sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); + let pex_msg = + extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); let msg = Message::Extended(ext_msg); - tx.send(WriterRequest::Message(msg))?; - - debug!(peer=?peer_addr, "sending PEX with {} live and {} closed peers", addrs_live_to_sent.len(), addrs_closed_to_sent.len()); - sent_peers_live.extend(&addrs_live_to_sent); - sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); - - - } - - tokio::select! { - _ = tx.closed() => return Ok(()), - _ = tokio::time::sleep(Duration::from_secs(60)) => {}, - + if tx.send(WriterRequest::Message(msg)).is_ok() { + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + } } } } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 0dc6b15..a7c24b9 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -123,42 +123,44 @@ where } impl UtPex { - - pub fn from_addrs<'a, I,J>(addrs_live: I, addrs_closed: J) -> Self + pub fn from_addrs<'a, I, J>(addrs_live: I, addrs_closed: J) -> Self where I: IntoIterator, J: IntoIterator, { - - - fn addrs_to_bytes<'a,I>(addrs: I) -> (Option, Option) + fn addrs_to_bytes<'a, I>(addrs: I) -> (Option, Option) where I: IntoIterator, - { - let mut ipv4_addrs = BytesMut::new(); - let mut ipv6_addrs = BytesMut::new(); - for addr in addrs { - match addr { - SocketAddr::V4(v4) => { - ipv4_addrs.extend_from_slice(&v4.ip().octets()); - ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); - } - SocketAddr::V6(v6) => { - ipv6_addrs.extend_from_slice(&v6.ip().octets()); - ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + { + let mut ipv4_addrs = BytesMut::new(); + let mut ipv6_addrs = BytesMut::new(); + for addr in addrs { + match addr { + SocketAddr::V4(v4) => { + ipv4_addrs.extend_from_slice(&v4.ip().octets()); + ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); + } + SocketAddr::V6(v6) => { + ipv6_addrs.extend_from_slice(&v6.ip().octets()); + ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + } } } + + let freeze = |buf: BytesMut| -> Option { + if !buf.is_empty() { + Some(buf.freeze().into()) + } else { + None + } + }; + + (freeze(ipv4_addrs), freeze(ipv6_addrs)) } - let freeze = |buf: BytesMut| -> Option { if !buf.is_empty() {Some(buf.freeze().into())} else {None} }; - - (freeze(ipv4_addrs), freeze(ipv6_addrs)) - } - let (added, added6) = addrs_to_bytes(addrs_live); let (dropped, dropped6) = addrs_to_bytes(addrs_closed); - Self { added, added6, @@ -166,9 +168,7 @@ impl UtPex { dropped6, ..Default::default() } - } - } #[cfg(test)] @@ -210,8 +210,12 @@ mod tests { let a1 = "185.159.157.20:46439".parse::().unwrap(); let a2 = "151.249.105.134:4240".parse::().unwrap(); //IPV6 - let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439".parse::().unwrap(); - let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240".parse::().unwrap(); + let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439" + .parse::() + .unwrap(); + let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240" + .parse::() + .unwrap(); let addrs = vec![a1, aa1, a2, aa2]; let pex = UtPex::from_addrs(&addrs, &addrs); @@ -230,7 +234,5 @@ mod tests { assert_eq!(a2, addrs2[1].addr); assert_eq!(aa1, addrs2[2].addr); assert_eq!(aa2, addrs2[3].addr); - - } }