From 3c470e1670ce7cd52c266fbc1769429864192faf Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 21 Oct 2024 13:12:05 +0200 Subject: [PATCH 01/14] Sending PEX - initial concept --- crates/librqbit/src/torrent_state/live/mod.rs | 83 ++++++++++++++++++- .../src/extended/ut_pex.rs | 83 ++++++++++++++++++- 2 files changed, 161 insertions(+), 5 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index da9037b..ab1bbb2 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -69,7 +69,7 @@ use librqbit_core::{ use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ extended::{ - handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, + self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, }, Handshake, Message, MessageOwned, Piece, Request, }; @@ -838,6 +838,74 @@ impl TorrentStateLive { .take_while(|r| r.is_ok()) .last(); } + + async fn task_send_pex_to_peer( + self: Arc, + peer_addr: SocketAddr, + tx: PeerTx, + ) -> anyhow::Result<()> { + let mut sent_peers_live: HashSet = HashSet::new(); + const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) + loop { + let addrs_live_to_sent = self + .peers + .states + .iter() + .filter_map(|e| { + let peer = e.value(); + let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); + + if *addr != peer_addr { + if peer.state.is_live() && !sent_peers_live.contains(addr) { + Some(*addr) + } else { + None + } + } else { + None + } + }) + .take(50) + .collect::>(); + + let addrs_closed_to_sent = sent_peers_live + .iter() + .filter(|addr| { + self.peers + .states + .get(addr) + .map(|p| !p.value().state.is_live()) + .unwrap_or(true) + }) + .copied() + .take(MAX_SENT_PEERS) + .collect::>(); + + // BEP 11 - Dont send closed if they are now in live + // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, + // and addrs_closed_to_sent are only filtered addresses from sent_peers_live + + if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { + let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); + let msg = Message::Extended(ext_msg); + + tx.send(WriterRequest::Message(msg))?; + + debug!(peer=?peer_addr, "sending PEX with {} live and {} closed peers", addrs_live_to_sent.len(), addrs_closed_to_sent.len()); + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + + + } + + tokio::select! { + _ = tx.closed() => return Ok(()), + _ = tokio::time::sleep(Duration::from_secs(60)) => {}, + + } + } + } } struct PeerHandlerLocked { @@ -963,8 +1031,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } fn on_extended_handshake(&self, hs: &ExtendedHandshake) -> anyhow::Result<()> { - if let Some(peer_pex_msg_id) = hs.ut_pex() { - trace!("peer supports pex at {peer_pex_msg_id}"); + if let Some(_peer_pex_msg_id) = hs.ut_pex() { + self.state.clone().spawn( + error_span!( + parent: self.state.torrent.span.clone(), + "sending_pex_to_peer", + peer = self.addr.to_string() + ), + self.state + .clone() + .task_send_pex_to_peer(self.addr, self.tx.clone()), + ); } // Lets update outgoing Socket address for incoming connection if self.incoming { diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 8e1b0a8..0dc6b15 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -1,7 +1,8 @@ use std::net::{IpAddr, SocketAddr}; +use buffers::ByteBufOwned; use byteorder::{ByteOrder, BE}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Serialize}; @@ -121,9 +122,58 @@ where } } +impl UtPex { + + pub fn from_addrs<'a, I,J>(addrs_live: I, addrs_closed: J) -> Self + where + I: IntoIterator, + J: IntoIterator, + { + + + fn addrs_to_bytes<'a,I>(addrs: I) -> (Option, Option) + where + I: IntoIterator, + { + let mut ipv4_addrs = BytesMut::new(); + let mut ipv6_addrs = BytesMut::new(); + for addr in addrs { + match addr { + SocketAddr::V4(v4) => { + ipv4_addrs.extend_from_slice(&v4.ip().octets()); + ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); + } + SocketAddr::V6(v6) => { + ipv6_addrs.extend_from_slice(&v6.ip().octets()); + ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + } + } + } + + let freeze = |buf: BytesMut| -> Option { if !buf.is_empty() {Some(buf.freeze().into())} else {None} }; + + (freeze(ipv4_addrs), freeze(ipv6_addrs)) + } + + let (added, added6) = addrs_to_bytes(addrs_live); + let (dropped, dropped6) = addrs_to_bytes(addrs_closed); + + + Self { + added, + added6, + dropped, + dropped6, + ..Default::default() + } + + } + +} + #[cfg(test)] mod tests { - use bencode::from_bytes; + use bencode::{bencode_serialize_to_writer, from_bytes}; use buffers::ByteBuf; use super::*; @@ -154,4 +204,33 @@ mod tests { ); assert_eq!(0, addrs[1].flags); } + + #[test] + fn test_pex_roundtrip() { + let a1 = "185.159.157.20:46439".parse::().unwrap(); + let a2 = "151.249.105.134:4240".parse::().unwrap(); + //IPV6 + let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439".parse::().unwrap(); + let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240".parse::().unwrap(); + + let addrs = vec![a1, aa1, a2, aa2]; + let pex = UtPex::from_addrs(&addrs, &addrs); + let mut bytes = Vec::new(); + bencode_serialize_to_writer(&pex, &mut bytes).unwrap(); + let pex2 = from_bytes::>(&bytes).unwrap(); + assert_eq!(4, pex2.added_peers().count()); + assert_eq!(pex.added_peers().count(), pex2.added_peers().count()); + let addrs2: Vec<_> = pex2.added_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + let addrs2: Vec<_> = pex2.dropped_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + + + } } From 1129d5b6e62a8be4f0ce29ee00f619fa373ffe8b Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 21 Oct 2024 17:05:52 +0200 Subject: [PATCH 02/14] Fix missing fn --- crates/librqbit/src/torrent_state/live/peer/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index b0895f8..3c6e6af 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -148,6 +148,10 @@ impl PeerStateNoMut { } } + pub fn is_live(&self) -> bool { + matches!(&self.0, PeerState::Live(_)) + } + pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { match &mut self.0 { PeerState::Live(l) => Some(l), From 1fa05837fb5406cff522a6e43d06aaff5faba5c4 Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 23 Oct 2024 12:57:29 +0200 Subject: [PATCH 03/14] Include only addresses with known working connections --- crates/librqbit/src/torrent_state/live/mod.rs | 48 +++++++++------ .../src/extended/ut_pex.rs | 60 ++++++++++--------- 2 files changed, 61 insertions(+), 47 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ab1bbb2..e5e4571 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -846,7 +846,17 @@ impl TorrentStateLive { ) -> anyhow::Result<()> { let mut sent_peers_live: HashSet = HashSet::new(); const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) + const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); // As per BEP 11 recommended interval is min 60 seconds + let mut delay = Duration::from_secs(10); // Wait 10 seconds before sending the first message to assure that peer will stay with us + loop { + tokio::select! { + _ = tx.closed() => return Ok(()), + _ = tokio::time::sleep(delay) => {}, + + } + delay = PEX_MESSAGE_INTERVAL; + let addrs_live_to_sent = self .peers .states @@ -856,7 +866,16 @@ impl TorrentStateLive { let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); if *addr != peer_addr { - if peer.state.is_live() && !sent_peers_live.contains(addr) { + let has_outgoing_connections = peer + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0; // As per BEP 11 share only those we were able to connect + if peer.state.is_live() + && has_outgoing_connections + && !sent_peers_live.contains(addr) + { Some(*addr) } else { None @@ -865,7 +884,7 @@ impl TorrentStateLive { None } }) - .take(50) + .take(50) .collect::>(); let addrs_closed_to_sent = sent_peers_live @@ -881,28 +900,21 @@ impl TorrentStateLive { .take(MAX_SENT_PEERS) .collect::>(); - // BEP 11 - Dont send closed if they are now in live - // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, + // BEP 11 - Dont send closed if they are now in live + // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // and addrs_closed_to_sent are only filtered addresses from sent_peers_live if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + debug!(peer=?peer_addr, "sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); + let pex_msg = + extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); let msg = Message::Extended(ext_msg); - tx.send(WriterRequest::Message(msg))?; - - debug!(peer=?peer_addr, "sending PEX with {} live and {} closed peers", addrs_live_to_sent.len(), addrs_closed_to_sent.len()); - sent_peers_live.extend(&addrs_live_to_sent); - sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); - - - } - - tokio::select! { - _ = tx.closed() => return Ok(()), - _ = tokio::time::sleep(Duration::from_secs(60)) => {}, - + if tx.send(WriterRequest::Message(msg)).is_ok() { + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + } } } } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 0dc6b15..a7c24b9 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -123,42 +123,44 @@ where } impl UtPex { - - pub fn from_addrs<'a, I,J>(addrs_live: I, addrs_closed: J) -> Self + pub fn from_addrs<'a, I, J>(addrs_live: I, addrs_closed: J) -> Self where I: IntoIterator, J: IntoIterator, { - - - fn addrs_to_bytes<'a,I>(addrs: I) -> (Option, Option) + fn addrs_to_bytes<'a, I>(addrs: I) -> (Option, Option) where I: IntoIterator, - { - let mut ipv4_addrs = BytesMut::new(); - let mut ipv6_addrs = BytesMut::new(); - for addr in addrs { - match addr { - SocketAddr::V4(v4) => { - ipv4_addrs.extend_from_slice(&v4.ip().octets()); - ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); - } - SocketAddr::V6(v6) => { - ipv6_addrs.extend_from_slice(&v6.ip().octets()); - ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + { + let mut ipv4_addrs = BytesMut::new(); + let mut ipv6_addrs = BytesMut::new(); + for addr in addrs { + match addr { + SocketAddr::V4(v4) => { + ipv4_addrs.extend_from_slice(&v4.ip().octets()); + ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); + } + SocketAddr::V6(v6) => { + ipv6_addrs.extend_from_slice(&v6.ip().octets()); + ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + } } } + + let freeze = |buf: BytesMut| -> Option { + if !buf.is_empty() { + Some(buf.freeze().into()) + } else { + None + } + }; + + (freeze(ipv4_addrs), freeze(ipv6_addrs)) } - let freeze = |buf: BytesMut| -> Option { if !buf.is_empty() {Some(buf.freeze().into())} else {None} }; - - (freeze(ipv4_addrs), freeze(ipv6_addrs)) - } - let (added, added6) = addrs_to_bytes(addrs_live); let (dropped, dropped6) = addrs_to_bytes(addrs_closed); - Self { added, added6, @@ -166,9 +168,7 @@ impl UtPex { dropped6, ..Default::default() } - } - } #[cfg(test)] @@ -210,8 +210,12 @@ mod tests { let a1 = "185.159.157.20:46439".parse::().unwrap(); let a2 = "151.249.105.134:4240".parse::().unwrap(); //IPV6 - let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439".parse::().unwrap(); - let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240".parse::().unwrap(); + let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439" + .parse::() + .unwrap(); + let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240" + .parse::() + .unwrap(); let addrs = vec![a1, aa1, a2, aa2]; let pex = UtPex::from_addrs(&addrs, &addrs); @@ -230,7 +234,5 @@ mod tests { assert_eq!(a2, addrs2[1].addr); assert_eq!(aa1, addrs2[2].addr); assert_eq!(aa2, addrs2[3].addr); - - } } From 42d184ff6b0b0476696fa70a26c3595eef158a73 Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 23 Oct 2024 13:42:52 +0200 Subject: [PATCH 04/14] Logging --- crates/librqbit/src/torrent_state/live/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index e5e4571..408d3b9 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -905,7 +905,7 @@ impl TorrentStateLive { // and addrs_closed_to_sent are only filtered addresses from sent_peers_live if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - debug!(peer=?peer_addr, "sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); + debug!("sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); @@ -1859,6 +1859,7 @@ impl PeerHandler { B: AsRef<[u8]> + std::fmt::Debug, { // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... + debug!("received PEX message with {} added peers and {} dropped peers", msg.added_peers().count(), msg.dropped_peers().count()); msg.dropped_peers() .chain(msg.added_peers()) .for_each(|peer| { From 90a823b3edb8db584233276e4b1a8c8badb109d4 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 9 Nov 2024 11:42:48 +0100 Subject: [PATCH 05/14] Small fixes base on comments in PR #261 --- crates/librqbit/src/torrent_state/live/mod.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 408d3b9..4b1806c 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -884,7 +884,7 @@ impl TorrentStateLive { None } }) - .take(50) + .take(MAX_SENT_PEERS) .collect::>(); let addrs_closed_to_sent = sent_peers_live @@ -905,16 +905,23 @@ impl TorrentStateLive { // and addrs_closed_to_sent are only filtered addresses from sent_peers_live if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - debug!("sending PEX with {} live ({:?})and {} closed peers", addrs_live_to_sent.len(), addrs_live_to_sent,addrs_closed_to_sent.len()); + debug!( + "sending PEX with {} live ({:?})and {} closed peers", + addrs_live_to_sent.len(), + addrs_live_to_sent, + addrs_closed_to_sent.len() + ); let pex_msg = extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); let msg = Message::Extended(ext_msg); - if tx.send(WriterRequest::Message(msg)).is_ok() { - sent_peers_live.extend(&addrs_live_to_sent); - sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + if tx.send(WriterRequest::Message(msg)).is_err() { + return Ok(()); // Peer disconnected } + + sent_peers_live.extend(&addrs_live_to_sent); + sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); } } } @@ -1859,7 +1866,6 @@ impl PeerHandler { B: AsRef<[u8]> + std::fmt::Debug, { // TODO: this is just first attempt at pex - will need more sophistication on adding peers - BEP 40, check number of live, seen peers ... - debug!("received PEX message with {} added peers and {} dropped peers", msg.added_peers().count(), msg.dropped_peers().count()); msg.dropped_peers() .chain(msg.added_peers()) .for_each(|peer| { From c8cd17ce8e853564ac54c33afa1b08e32b0cbe1e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 28 Nov 2024 09:58:44 +0100 Subject: [PATCH 06/14] Format comments --- crates/librqbit/src/torrent_state/live/mod.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 4b1806c..b98d3e7 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -845,9 +845,13 @@ impl TorrentStateLive { tx: PeerTx, ) -> anyhow::Result<()> { let mut sent_peers_live: HashSet = HashSet::new(); - const MAX_SENT_PEERS: usize = 50; // As per BEP 11 we should not send more than 50 peers at once (here it also applies to fist message, should be OK as we anyhow really have more) - const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); // As per BEP 11 recommended interval is min 60 seconds - let mut delay = Duration::from_secs(10); // Wait 10 seconds before sending the first message to assure that peer will stay with us + // As per BEP 11 we should not send more than 50 peers at once + // (here it also applies to fist message, should be OK as we anyhow really have more) + const MAX_SENT_PEERS: usize = 50; + // As per BEP 11 recommended interval is min 60 seconds + const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); + // Wait 10 seconds before sending the first message to assure that peer will stay with us + let mut delay = Duration::from_secs(10); loop { tokio::select! { From 7a52721af9e27358b3f5dc1f456e0631cf3b7cb7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 28 Nov 2024 10:32:58 +0100 Subject: [PATCH 07/14] Refactor for readability --- crates/librqbit/src/torrent_state/live/mod.rs | 117 ++++++++---------- 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index b98d3e7..ca2922a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -841,91 +841,84 @@ impl TorrentStateLive { async fn task_send_pex_to_peer( self: Arc, - peer_addr: SocketAddr, + _peer_addr: SocketAddr, tx: PeerTx, ) -> anyhow::Result<()> { - let mut sent_peers_live: HashSet = HashSet::new(); // As per BEP 11 we should not send more than 50 peers at once // (here it also applies to fist message, should be OK as we anyhow really have more) const MAX_SENT_PEERS: usize = 50; // As per BEP 11 recommended interval is min 60 seconds const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); + + let mut live_peers = HashSet::new(); + let mut connected = Vec::with_capacity(MAX_SENT_PEERS); + let mut dropped = Vec::with_capacity(MAX_SENT_PEERS); + let mut peer_view_of_live_peers = HashSet::new(); + // Wait 10 seconds before sending the first message to assure that peer will stay with us - let mut delay = Duration::from_secs(10); + tokio::time::sleep(Duration::from_secs(10)).await; + + let mut interval = tokio::time::interval(PEX_MESSAGE_INTERVAL); loop { - tokio::select! { - _ = tx.closed() => return Ok(()), - _ = tokio::time::sleep(delay) => {}, + interval.tick().await; + // TODO: store them in a shared place + // Fill in live_peers + for ps in self.peers.states.iter() { + let peer = ps.value(); + let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key()); + + // As per BEP 11 share only those we were able to connect + let has_outgoing_connections = peer + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0; + + let is_live = has_outgoing_connections && ps.value().state.is_live(); + if is_live { + live_peers.insert(addr); + } else { + live_peers.remove(&addr); + } } - delay = PEX_MESSAGE_INTERVAL; - let addrs_live_to_sent = self - .peers - .states - .iter() - .filter_map(|e| { - let peer = e.value(); - let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); + connected.clear(); + dropped.clear(); - if *addr != peer_addr { - let has_outgoing_connections = peer - .stats - .counters - .outgoing_connections - .load(Ordering::Relaxed) - > 0; // As per BEP 11 share only those we were able to connect - if peer.state.is_live() - && has_outgoing_connections - && !sent_peers_live.contains(addr) - { - Some(*addr) - } else { - None - } - } else { - None - } - }) - .take(MAX_SENT_PEERS) - .collect::>(); - - let addrs_closed_to_sent = sent_peers_live - .iter() - .filter(|addr| { - self.peers - .states - .get(addr) - .map(|p| !p.value().state.is_live()) - .unwrap_or(true) - }) - .copied() - .take(MAX_SENT_PEERS) - .collect::>(); + connected.extend( + live_peers + .difference(&peer_view_of_live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); + dropped.extend( + peer_view_of_live_peers + .difference(&live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); // BEP 11 - Dont send closed if they are now in live // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // and addrs_closed_to_sent are only filtered addresses from sent_peers_live - if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - debug!( - "sending PEX with {} live ({:?})and {} closed peers", - addrs_live_to_sent.len(), - addrs_live_to_sent, - addrs_closed_to_sent.len() - ); - let pex_msg = - extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + if !connected.is_empty() || !dropped.is_empty() { + let pex_msg = extended::ut_pex::UtPex::from_addrs(&connected, &dropped); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); - let msg = Message::Extended(ext_msg); - - if tx.send(WriterRequest::Message(msg)).is_err() { + if tx + .send(WriterRequest::Message(Message::Extended(ext_msg))) + .is_err() + { return Ok(()); // Peer disconnected } - sent_peers_live.extend(&addrs_live_to_sent); - sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + for addr in &dropped { + peer_view_of_live_peers.remove(addr); + } + peer_view_of_live_peers.extend(connected.iter().copied()); } } } From adc2ca97b3cb4beabd62cd2b9ccc77c8e658d167 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:03:08 +0000 Subject: [PATCH 08/14] A bit of refactoring to store live_peers --- crates/librqbit/src/torrent_state/live/mod.rs | 33 +++++++++---------- .../src/torrent_state/live/peer/mod.rs | 29 ++++++++-------- .../src/torrent_state/live/peers/mod.rs | 9 ++--- 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ca2922a..0712cb3 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -73,7 +73,6 @@ use peer_binary_protocol::{ }, Handshake, Message, MessageOwned, Piece, Request, }; -use peers::stats::atomic::AggregatePeerStatsAtomic; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, Notify, OwnedSemaphorePermit, Semaphore, @@ -258,6 +257,7 @@ impl TorrentStateLive { session_stats: session_stats.clone(), stats: Default::default(), states: Default::default(), + live_peers: Default::default(), }, locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), @@ -337,10 +337,6 @@ impl TorrentStateLive { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } - fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] { - [&self.peers.stats, &self.peers.session_stats.peers] - } - pub fn down_speed_estimator(&self) -> &SpeedEstimator { &self.down_speed_estimator } @@ -377,7 +373,7 @@ impl TorrentStateLive { .incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peer_stats(), + &self.peers, ) .context("peer already existed")?; peer.stats.counters.clone() @@ -387,7 +383,7 @@ impl TorrentStateLive { let peer = Peer::new_live_for_incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peer_stats(), + &self.peers, ); let counters = peer.stats.counters.clone(); vac.insert(peer); @@ -620,7 +616,7 @@ impl TorrentStateLive { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state - .connecting_to_live(Id20::new(h.peer_id), &self.peer_stats()); + .connecting_to_live(Id20::new(h.peer_id), &self.peers); }); } @@ -814,7 +810,7 @@ impl TorrentStateLive { for mut pe in self.peers.states.iter_mut() { if let PeerState::Live(l) = pe.value().state.get() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { - let prev = pe.value_mut().state.set_not_needed(&self.peer_stats()); + let prev = pe.value_mut().state.set_not_needed(&self.peers); let _ = prev .take_live_no_counters() .unwrap() @@ -832,7 +828,7 @@ impl TorrentStateLive { .filter_map(|mut p| { let known_addr = *p.key(); p.value_mut() - .reconnect_not_needed_peer(known_addr, &self.peer_stats()) + .reconnect_not_needed_peer(known_addr, &self.peers) }) .map(|socket_addr| self.peer_queue_tx.send(socket_addr)) .take_while(|r| r.is_ok()) @@ -1113,7 +1109,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { impl PeerHandler { fn on_peer_died(self, error: Option) -> anyhow::Result<()> { let peers = &self.state.peers; - let pstats = self.state.peer_stats(); + let pstats = { + let this = &self.state; + &this.peers + }; let handle = self.addr; let mut pe = match peers.states.get_mut(&handle) { Some(peer) => TimedExistence::new(peer, "on_peer_died"), @@ -1122,7 +1121,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take(&pstats); + let prev = pe.value_mut().state.take(pstats); match prev { PeerState::Connecting(_) => {} @@ -1141,7 +1140,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -1157,7 +1156,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } }; @@ -1166,11 +1165,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } - pe.value_mut().state.set(PeerState::Dead, &pstats); + pe.value_mut().state.set(PeerState::Dead, pstats); if self.incoming { // do not retry incoming peers @@ -1203,7 +1202,7 @@ impl PeerHandler { .with_peer_mut(handle, "dead_to_queued", |peer| { match peer.state.get() { PeerState::Dead => { - peer.state.set(PeerState::Queued, &self.state.peer_stats()) + peer.state.set(PeerState::Queued, &self.state.peers) } other => bail!( "peer is in unexpected state: {}. Expected dead", diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 3c6e6af..94e4daf 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -12,7 +12,7 @@ use tracing::debug; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; -use super::peers::stats::atomic::AggregatePeerStatsAtomic; +use super::PeerStates; pub(crate) type InflightRequest = ChunkInfo; pub(crate) type PeerRx = UnboundedReceiver; @@ -29,10 +29,10 @@ impl Peer { pub fn new_live_for_incoming_connection( peer_id: Id20, tx: PeerTx, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Self { let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); - for counter in counters { + for counter in [&counters.session_stats.peers, &counters.stats] { counter.inc(&state.0); } Self { @@ -52,7 +52,7 @@ impl Peer { pub(crate) fn reconnect_not_needed_peer( &mut self, known_address: SocketAddr, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Option { if let PeerState::NotNeeded = self.state.get() { match self.outgoing_address { @@ -124,18 +124,18 @@ impl PeerStateNoMut { &self.0 } - pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { + pub fn take(&mut self, counters: &PeerStates) -> PeerState { self.set(Default::default(), counters) } - pub fn destroy(self, counters: &[&AggregatePeerStatsAtomic]) { - for counter in counters { + pub fn destroy(self, counters: &PeerStates) { + for counter in [&counters.session_stats.peers, &counters.stats] { counter.dec(&self.0); } } - pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { - for counter in counters { + pub fn set(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { + for counter in [&counters.session_stats.peers, &counters.stats] { counter.incdec(&self.0, &new); } std::mem::replace(&mut self.0, new) @@ -159,10 +159,7 @@ impl PeerStateNoMut { } } - pub fn idle_to_connecting( - &mut self, - counters: &[&AggregatePeerStatsAtomic], - ) -> Option<(PeerRx, PeerTx)> { + pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { match &self.0 { PeerState::Queued | PeerState::NotNeeded => { let (tx, rx) = unbounded_channel(); @@ -178,7 +175,7 @@ impl PeerStateNoMut { &mut self, peer_id: Id20, tx: PeerTx, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> anyhow::Result<()> { if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); @@ -198,7 +195,7 @@ impl PeerStateNoMut { pub fn connecting_to_live( &mut self, peer_id: Id20, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Option<&mut LivePeerState> { if let PeerState::Connecting(_) = &self.0 { let tx = match self.take(counters) { @@ -215,7 +212,7 @@ impl PeerStateNoMut { } } - pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { + pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState { self.set(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 110df73..e68dafb 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -4,6 +4,7 @@ use anyhow::Context; use backoff::backoff::Backoff; use dashmap::DashMap; use librqbit_core::lengths::ValidPieceIndex; +use parking_lot::RwLock; use peer_binary_protocol::{Message, Request}; use crate::{ @@ -21,6 +22,7 @@ pub mod stats; pub(crate) struct PeerStates { pub session_stats: Arc, + pub live_peers: RwLock>, pub stats: AggregatePeerStatsAtomic, pub states: DashMap, } @@ -28,7 +30,7 @@ pub(crate) struct PeerStates { impl Drop for PeerStates { fn drop(&mut self) { for (_, p) in std::mem::take(&mut self.states).into_iter() { - p.state.destroy(&[&self.session_stats.peers]); + p.state.destroy(self); } } } @@ -115,7 +117,7 @@ impl PeerStates { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { peer.state - .idle_to_connecting(&[&self.stats, &self.session_stats.peers]) + .idle_to_connecting(self) .context("invalid peer state") }) .context("peer not found in states")??; @@ -130,8 +132,7 @@ impl PeerStates { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state - .set_not_needed(&[&self.stats, &self.session_stats.peers]) + peer.state.set_not_needed(self) })?; Some(prev) } From fa3e8d949bb2e16b6e5b42679f85e568f0faa723 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:06:08 +0000 Subject: [PATCH 09/14] Peer struct knows its addr --- crates/librqbit/src/torrent_state/live/mod.rs | 1 + crates/librqbit/src/torrent_state/live/peer/mod.rs | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 0712cb3..eec629f 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -381,6 +381,7 @@ impl TorrentStateLive { Entry::Vacant(vac) => { atomic_inc(&self.peers.stats.seen); let peer = Peer::new_live_for_incoming_connection( + *vac.key(), Id20::new(checked_peer.handshake.peer_id), tx.clone(), &self.peers, diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 94e4daf..e173b96 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -18,8 +18,9 @@ pub(crate) type InflightRequest = ChunkInfo; pub(crate) type PeerRx = UnboundedReceiver; pub(crate) type PeerTx = UnboundedSender; -#[derive(Debug, Default)] +#[derive(Debug)] pub(crate) struct Peer { + pub addr: SocketAddr, pub state: PeerStateNoMut, pub stats: stats::atomic::PeerStats, pub outgoing_address: Option, @@ -27,6 +28,7 @@ pub(crate) struct Peer { impl Peer { pub fn new_live_for_incoming_connection( + addr: SocketAddr, peer_id: Id20, tx: PeerTx, counters: &PeerStates, @@ -36,6 +38,7 @@ impl Peer { counter.inc(&state.0); } Self { + addr, state, stats: Default::default(), outgoing_address: None, @@ -44,8 +47,10 @@ impl Peer { pub fn new_with_outgoing_address(addr: SocketAddr) -> Self { Self { + addr, outgoing_address: Some(addr), - ..Default::default() + stats: Default::default(), + state: Default::default(), } } From 85b65dcef5a3f477231dc49306ae566783cd7648 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:08:51 +0000 Subject: [PATCH 10/14] Rename a couple methods --- crates/librqbit/src/torrent_state/live/mod.rs | 8 ++++---- crates/librqbit/src/torrent_state/live/peer/mod.rs | 10 +++++----- .../src/torrent_state/live/peer/stats/snapshot.rs | 2 +- crates/librqbit/src/torrent_state/live/peers/mod.rs | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index eec629f..975844e 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -674,7 +674,7 @@ impl TorrentStateLive { .peers .states .iter() - .filter(|e| filter.state.matches(e.value().state.get())) + .filter(|e| filter.state.matches(e.value().state.get_state())) .map(|e| (e.key().to_string(), e.value().into())) .collect(), } @@ -809,7 +809,7 @@ impl TorrentStateLive { fn disconnect_all_peers_that_have_full_torrent(&self) { for mut pe in self.peers.states.iter_mut() { - if let PeerState::Live(l) = pe.value().state.get() { + if let PeerState::Live(l) = pe.value().state.get_state() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { let prev = pe.value_mut().state.set_not_needed(&self.peers); let _ = prev @@ -1122,7 +1122,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take(pstats); + let prev = pe.value_mut().state.take_state(pstats); match prev { PeerState::Connecting(_) => {} @@ -1201,7 +1201,7 @@ impl PeerHandler { self.state .peers .with_peer_mut(handle, "dead_to_queued", |peer| { - match peer.state.get() { + match peer.state.get_state() { PeerState::Dead => { peer.state.set(PeerState::Queued, &self.state.peers) } diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index e173b96..6c35abb 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -59,7 +59,7 @@ impl Peer { known_address: SocketAddr, counters: &PeerStates, ) -> Option { - if let PeerState::NotNeeded = self.state.get() { + if let PeerState::NotNeeded = self.state.get_state() { match self.outgoing_address { None => None, Some(socket_addr) => { @@ -125,11 +125,11 @@ impl PeerState { pub(crate) struct PeerStateNoMut(PeerState); impl PeerStateNoMut { - pub fn get(&self) -> &PeerState { + pub fn get_state(&self) -> &PeerState { &self.0 } - pub fn take(&mut self, counters: &PeerStates) -> PeerState { + pub fn take_state(&mut self, counters: &PeerStates) -> PeerState { self.set(Default::default(), counters) } @@ -185,7 +185,7 @@ impl PeerStateNoMut { if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); } - match self.take(counters) { + match self.take_state(counters) { PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { self.set( PeerState::Live(LivePeerState::new(peer_id, tx, true)), @@ -203,7 +203,7 @@ impl PeerStateNoMut { counters: &PeerStates, ) -> Option<&mut LivePeerState> { if let PeerState::Connecting(_) = &self.0 { - let tx = match self.take(counters) { + let tx = match self.take_state(counters) { PeerState::Connecting(tx) => tx, _ => unreachable!(), }; diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs index 471e3f7..602ef5d 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -51,7 +51,7 @@ impl From<&Peer> for PeerStats { fn from(peer: &Peer) -> Self { Self { counters: peer.stats.counters.as_ref().into(), - state: peer.state.get().name(), + state: peer.state.get_state().name(), } } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index e68dafb..602d59f 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -87,7 +87,7 @@ impl PeerStates { pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; - let s = p.state.get(); + let s = p.state.get_state(); self.stats.dec(s); self.session_stats.peers.dec(s); From 956e6d4f080ea1bb3e28216afdd52217b1bbbe04 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:12:04 +0000 Subject: [PATCH 11/14] Move all methods to Peer, not PeerStateNoMut --- crates/librqbit/src/torrent_state/live/mod.rs | 38 ++++++++-------- .../src/torrent_state/live/peer/mod.rs | 43 ++++++++++--------- .../torrent_state/live/peer/stats/snapshot.rs | 2 +- .../src/torrent_state/live/peers/mod.rs | 14 +++--- 4 files changed, 48 insertions(+), 49 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 975844e..5dbfa7e 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -369,13 +369,12 @@ impl TorrentStateLive { let counters = match self.peers.states.entry(checked_peer.addr) { Entry::Occupied(mut occ) => { let peer = occ.get_mut(); - peer.state - .incoming_connection( - Id20::new(checked_peer.handshake.peer_id), - tx.clone(), - &self.peers, - ) - .context("peer already existed")?; + peer.incoming_connection( + Id20::new(checked_peer.handshake.peer_id), + tx.clone(), + &self.peers, + ) + .context("peer already existed")?; peer.stats.counters.clone() } Entry::Vacant(vac) => { @@ -616,8 +615,7 @@ impl TorrentStateLive { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { - p.state - .connecting_to_live(Id20::new(h.peer_id), &self.peers); + p.connecting_to_live(Id20::new(h.peer_id), &self.peers); }); } @@ -674,7 +672,7 @@ impl TorrentStateLive { .peers .states .iter() - .filter(|e| filter.state.matches(e.value().state.get_state())) + .filter(|e| filter.state.matches(e.value().get_state())) .map(|e| (e.key().to_string(), e.value().into())) .collect(), } @@ -809,9 +807,9 @@ impl TorrentStateLive { fn disconnect_all_peers_that_have_full_torrent(&self) { for mut pe in self.peers.states.iter_mut() { - if let PeerState::Live(l) = pe.value().state.get_state() { + if let PeerState::Live(l) = pe.value().get_state() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { - let prev = pe.value_mut().state.set_not_needed(&self.peers); + let prev = pe.value_mut().set_not_needed(&self.peers); let _ = prev .take_live_no_counters() .unwrap() @@ -874,7 +872,7 @@ impl TorrentStateLive { .load(Ordering::Relaxed) > 0; - let is_live = has_outgoing_connections && ps.value().state.is_live(); + let is_live = has_outgoing_connections && ps.value().is_live(); if is_live { live_peers.insert(addr); } else { @@ -1122,7 +1120,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take_state(pstats); + let prev = pe.value_mut().take_state(pstats); match prev { PeerState::Connecting(_) => {} @@ -1141,7 +1139,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, pstats); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -1157,7 +1155,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, pstats); return Ok(()); } }; @@ -1166,11 +1164,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, pstats); return Ok(()); } - pe.value_mut().state.set(PeerState::Dead, pstats); + pe.value_mut().set_state(PeerState::Dead, pstats); if self.incoming { // do not retry incoming peers @@ -1201,9 +1199,9 @@ impl PeerHandler { self.state .peers .with_peer_mut(handle, "dead_to_queued", |peer| { - match peer.state.get_state() { + match peer.get_state() { PeerState::Dead => { - peer.state.set(PeerState::Queued, &self.state.peers) + peer.set_state(PeerState::Queued, &self.state.peers) } other => bail!( "peer is in unexpected state: {}. Expected dead", diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 6c35abb..fa4a870 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -21,7 +21,7 @@ pub(crate) type PeerTx = UnboundedSender; #[derive(Debug)] pub(crate) struct Peer { pub addr: SocketAddr, - pub state: PeerStateNoMut, + state: PeerStateNoMut, pub stats: stats::atomic::PeerStats, pub outgoing_address: Option, } @@ -59,12 +59,12 @@ impl Peer { known_address: SocketAddr, counters: &PeerStates, ) -> Option { - if let PeerState::NotNeeded = self.state.get_state() { + if let PeerState::NotNeeded = self.get_state() { match self.outgoing_address { None => None, Some(socket_addr) => { if known_address == socket_addr { - self.state.set(PeerState::Queued, counters); + self.set_state(PeerState::Queued, counters); } else { debug!( peer = known_address.to_string(), @@ -124,52 +124,52 @@ impl PeerState { #[derive(Debug, Default)] pub(crate) struct PeerStateNoMut(PeerState); -impl PeerStateNoMut { +impl Peer { pub fn get_state(&self) -> &PeerState { - &self.0 + &self.state.0 } pub fn take_state(&mut self, counters: &PeerStates) -> PeerState { - self.set(Default::default(), counters) + self.set_state(Default::default(), counters) } pub fn destroy(self, counters: &PeerStates) { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.dec(&self.0); + counter.dec(&self.state.0); } } - pub fn set(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { + pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.incdec(&self.0, &new); + counter.incdec(&self.state.0, &new); } - std::mem::replace(&mut self.0, new) + std::mem::replace(&mut self.state.0, new) } pub fn get_live(&self) -> Option<&LivePeerState> { - match &self.0 { + match &self.state.0 { PeerState::Live(l) => Some(l), _ => None, } } pub fn is_live(&self) -> bool { - matches!(&self.0, PeerState::Live(_)) + matches!(&self.state.0, PeerState::Live(_)) } pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { - match &mut self.0 { + match &mut self.state.0 { PeerState::Live(l) => Some(l), _ => None, } } pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { - match &self.0 { + match &self.state.0 { PeerState::Queued | PeerState::NotNeeded => { let (tx, rx) = unbounded_channel(); let tx_2 = tx.clone(); - self.set(PeerState::Connecting(tx), counters); + self.set_state(PeerState::Connecting(tx), counters); Some((rx, tx_2)) } _ => None, @@ -182,12 +182,15 @@ impl PeerStateNoMut { tx: PeerTx, counters: &PeerStates, ) -> anyhow::Result<()> { - if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { + if matches!( + &self.state.0, + PeerState::Connecting(..) | PeerState::Live(..) + ) { anyhow::bail!("peer already active"); } match self.take_state(counters) { PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { - self.set( + self.set_state( PeerState::Live(LivePeerState::new(peer_id, tx, true)), counters, ); @@ -202,12 +205,12 @@ impl PeerStateNoMut { peer_id: Id20, counters: &PeerStates, ) -> Option<&mut LivePeerState> { - if let PeerState::Connecting(_) = &self.0 { + if let PeerState::Connecting(_) = &self.state.0 { let tx = match self.take_state(counters) { PeerState::Connecting(tx) => tx, _ => unreachable!(), }; - self.set( + self.set_state( PeerState::Live(LivePeerState::new(peer_id, tx, false)), counters, ); @@ -218,7 +221,7 @@ impl PeerStateNoMut { } pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState { - self.set(PeerState::NotNeeded, counters) + self.set_state(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs index 602ef5d..4d6e2b3 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -51,7 +51,7 @@ impl From<&Peer> for PeerStats { fn from(peer: &Peer) -> Self { Self { counters: peer.stats.counters.as_ref().into(), - state: peer.state.get_state().name(), + state: peer.get_state().name(), } } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 602d59f..4311273 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -30,7 +30,7 @@ pub(crate) struct PeerStates { impl Drop for PeerStates { fn drop(&mut self) { for (_, p) in std::mem::take(&mut self.states).into_iter() { - p.state.destroy(self); + p.destroy(self); } } } @@ -71,7 +71,7 @@ impl PeerStates { } pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { - self.with_peer(addr, |peer| peer.state.get_live().map(f)) + self.with_peer(addr, |peer| peer.get_live().map(f)) .flatten() } @@ -81,13 +81,13 @@ impl PeerStates { reason: &'static str, f: impl FnOnce(&mut LivePeerState) -> R, ) -> Option { - self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) + self.with_peer_mut(addr, reason, |peer| peer.get_live_mut().map(f)) .flatten() } pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; - let s = p.state.get_state(); + let s = p.get_state(); self.stats.dec(s); self.session_stats.peers.dec(s); @@ -116,9 +116,7 @@ impl PeerStates { pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { - peer.state - .idle_to_connecting(self) - .context("invalid peer state") + peer.idle_to_connecting(self).context("invalid peer state") }) .context("peer not found in states")??; Ok(rx) @@ -132,7 +130,7 @@ impl PeerStates { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state.set_not_needed(self) + peer.set_not_needed(self) })?; Some(prev) } From 9d71eb487205dd28cb37ff2534f40ad7e916963b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:17:42 +0000 Subject: [PATCH 12/14] Remove PeerStateNoMut --- .../src/torrent_state/live/peer/mod.rs | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index fa4a870..f7f2dac 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -21,7 +21,7 @@ pub(crate) type PeerTx = UnboundedSender; #[derive(Debug)] pub(crate) struct Peer { pub addr: SocketAddr, - state: PeerStateNoMut, + state: PeerState, pub stats: stats::atomic::PeerStats, pub outgoing_address: Option, } @@ -33,9 +33,9 @@ impl Peer { tx: PeerTx, counters: &PeerStates, ) -> Self { - let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); + let state = PeerState::Live(LivePeerState::new(peer_id, tx, true)); for counter in [&counters.session_stats.peers, &counters.stats] { - counter.inc(&state.0); + counter.inc(&state); } Self { addr, @@ -121,12 +121,9 @@ impl PeerState { } } -#[derive(Debug, Default)] -pub(crate) struct PeerStateNoMut(PeerState); - impl Peer { pub fn get_state(&self) -> &PeerState { - &self.state.0 + &self.state } pub fn take_state(&mut self, counters: &PeerStates) -> PeerState { @@ -135,37 +132,46 @@ impl Peer { pub fn destroy(self, counters: &PeerStates) { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.dec(&self.state.0); + counter.dec(&self.state); + } + if matches!(&self.state, PeerState::Live(..)) { + counters.live_peers.write().retain(|a| *a != self.addr); } } pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.incdec(&self.state.0, &new); + counter.incdec(&self.state, &new); } - std::mem::replace(&mut self.state.0, new) + if matches!(&self.state, PeerState::Live(..)) { + counters.live_peers.write().retain(|a| *a != self.addr); + } + if matches!(&new, PeerState::Live(..)) { + counters.live_peers.write().push(self.addr); + } + std::mem::replace(&mut self.state, new) } pub fn get_live(&self) -> Option<&LivePeerState> { - match &self.state.0 { + match &self.state { PeerState::Live(l) => Some(l), _ => None, } } pub fn is_live(&self) -> bool { - matches!(&self.state.0, PeerState::Live(_)) + matches!(&self.state, PeerState::Live(_)) } pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { - match &mut self.state.0 { + match &mut self.state { PeerState::Live(l) => Some(l), _ => None, } } pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { - match &self.state.0 { + match &self.state { PeerState::Queued | PeerState::NotNeeded => { let (tx, rx) = unbounded_channel(); let tx_2 = tx.clone(); @@ -182,10 +188,7 @@ impl Peer { tx: PeerTx, counters: &PeerStates, ) -> anyhow::Result<()> { - if matches!( - &self.state.0, - PeerState::Connecting(..) | PeerState::Live(..) - ) { + if matches!(&self.state, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); } match self.take_state(counters) { @@ -205,7 +208,7 @@ impl Peer { peer_id: Id20, counters: &PeerStates, ) -> Option<&mut LivePeerState> { - if let PeerState::Connecting(_) = &self.state.0 { + if let PeerState::Connecting(_) = &self.state { let tx = match self.take_state(counters) { PeerState::Connecting(tx) => tx, _ => unreachable!(), From ac775affefdde6d3d8d4aa47a03e7dae03850ed2 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:21:10 +0000 Subject: [PATCH 13/14] nit: decrease branch nesting --- crates/librqbit/src/torrent_state/live/mod.rs | 6 +----- .../src/torrent_state/live/peer/mod.rs | 19 +++++++++---------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 5dbfa7e..b0e17c7 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -824,11 +824,7 @@ impl TorrentStateLive { self.peers .states .iter_mut() - .filter_map(|mut p| { - let known_addr = *p.key(); - p.value_mut() - .reconnect_not_needed_peer(known_addr, &self.peers) - }) + .filter_map(|mut p| p.value_mut().reconnect_not_needed_peer(&self.peers)) .map(|socket_addr| self.peer_queue_tx.send(socket_addr)) .take_while(|r| r.is_ok()) .last(); diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index f7f2dac..c495f43 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -56,22 +56,21 @@ impl Peer { pub(crate) fn reconnect_not_needed_peer( &mut self, - known_address: SocketAddr, counters: &PeerStates, ) -> Option { if let PeerState::NotNeeded = self.get_state() { match self.outgoing_address { None => None, + Some(socket_addr) if self.addr == socket_addr => { + self.set_state(PeerState::Queued, counters); + Some(socket_addr) + } Some(socket_addr) => { - if known_address == socket_addr { - self.set_state(PeerState::Queued, counters); - } else { - debug!( - peer = known_address.to_string(), - outgoing_addr = socket_addr.to_string(), - "peer will by retried on different address", - ); - } + debug!( + peer = %self.addr, + outgoing_addr = %socket_addr, + "peer will by retried on different address", + ); Some(socket_addr) } } From fb760b282ed62aa49ac09f0bd3ed8cc0a6104fc5 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 4 Dec 2024 10:36:22 +0000 Subject: [PATCH 14/14] Only successful outgoing connections marked --- crates/librqbit/src/torrent_state/live/mod.rs | 68 ++++++------------- .../src/torrent_state/live/peer/mod.rs | 29 +++++--- .../src/torrent_state/live/peers/mod.rs | 6 +- 3 files changed, 44 insertions(+), 59 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index b0e17c7..159bb60 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -257,7 +257,7 @@ impl TorrentStateLive { session_stats: session_stats.clone(), stats: Default::default(), states: Default::default(), - live_peers: Default::default(), + live_outgoing_peers: Default::default(), }, locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), @@ -841,7 +841,6 @@ impl TorrentStateLive { // As per BEP 11 recommended interval is min 60 seconds const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); - let mut live_peers = HashSet::new(); let mut connected = Vec::with_capacity(MAX_SENT_PEERS); let mut dropped = Vec::with_capacity(MAX_SENT_PEERS); let mut peer_view_of_live_peers = HashSet::new(); @@ -854,44 +853,25 @@ impl TorrentStateLive { loop { interval.tick().await; - // TODO: store them in a shared place - // Fill in live_peers - for ps in self.peers.states.iter() { - let peer = ps.value(); - let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key()); + { + let live_peers = self.peers.live_outgoing_peers.read(); + connected.clear(); + dropped.clear(); - // As per BEP 11 share only those we were able to connect - let has_outgoing_connections = peer - .stats - .counters - .outgoing_connections - .load(Ordering::Relaxed) - > 0; - - let is_live = has_outgoing_connections && ps.value().is_live(); - if is_live { - live_peers.insert(addr); - } else { - live_peers.remove(&addr); - } + connected.extend( + live_peers + .difference(&peer_view_of_live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); + dropped.extend( + peer_view_of_live_peers + .difference(&live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); } - connected.clear(); - dropped.clear(); - - connected.extend( - live_peers - .difference(&peer_view_of_live_peers) - .take(MAX_SENT_PEERS) - .copied(), - ); - dropped.extend( - peer_view_of_live_peers - .difference(&live_peers) - .take(MAX_SENT_PEERS) - .copied(), - ); - // BEP 11 - Dont send closed if they are now in live // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // and addrs_closed_to_sent are only filtered addresses from sent_peers_live @@ -1104,10 +1084,6 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { impl PeerHandler { fn on_peer_died(self, error: Option) -> anyhow::Result<()> { let peers = &self.state.peers; - let pstats = { - let this = &self.state; - &this.peers - }; let handle = self.addr; let mut pe = match peers.states.get_mut(&handle) { Some(peer) => TimedExistence::new(peer, "on_peer_died"), @@ -1116,7 +1092,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().take_state(pstats); + let prev = pe.value_mut().take_state(peers); match prev { PeerState::Connecting(_) => {} @@ -1135,7 +1111,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().set_state(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, peers); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -1151,7 +1127,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().set_state(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, peers); return Ok(()); } }; @@ -1160,11 +1136,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().set_state(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, peers); return Ok(()); } - pe.value_mut().set_state(PeerState::Dead, pstats); + pe.value_mut().set_state(PeerState::Dead, peers); if self.incoming { // do not retry incoming peers diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index c495f43..fc1d04f 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -2,6 +2,7 @@ pub mod stats; use std::collections::HashSet; use std::net::SocketAddr; +use std::sync::atomic::Ordering; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::ChunkInfo; @@ -133,8 +134,8 @@ impl Peer { for counter in [&counters.session_stats.peers, &counters.stats] { counter.dec(&self.state); } - if matches!(&self.state, PeerState::Live(..)) { - counters.live_peers.write().retain(|a| *a != self.addr); + if let (Some(addr), PeerState::Live(..)) = (self.outgoing_address, &self.state) { + counters.live_outgoing_peers.write().remove(&addr); } } @@ -142,12 +143,22 @@ impl Peer { for counter in [&counters.session_stats.peers, &counters.stats] { counter.incdec(&self.state, &new); } - if matches!(&self.state, PeerState::Live(..)) { - counters.live_peers.write().retain(|a| *a != self.addr); - } - if matches!(&new, PeerState::Live(..)) { - counters.live_peers.write().push(self.addr); + if let Some(addr) = self.outgoing_address { + if matches!(&self.state, PeerState::Live(..)) { + counters.live_outgoing_peers.write().remove(&addr); + } + if matches!(&new, PeerState::Live(..)) + && self + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0 + { + counters.live_outgoing_peers.write().insert(addr); + } } + std::mem::replace(&mut self.state, new) } @@ -158,10 +169,6 @@ impl Peer { } } - pub fn is_live(&self) -> bool { - matches!(&self.state, PeerState::Live(_)) - } - pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { match &mut self.state { PeerState::Live(l) => Some(l), diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 4311273..17c0758 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use anyhow::Context; use backoff::backoff::Backoff; @@ -22,7 +22,9 @@ pub mod stats; pub(crate) struct PeerStates { pub session_stats: Arc, - pub live_peers: RwLock>, + + // This keeps track of live addresses we connected to, for PEX. + pub live_outgoing_peers: RwLock>, pub stats: AggregatePeerStatsAtomic, pub states: DashMap, }