From a483bc658604877336a7f1289831660ee9fe3d7b Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 25 Sep 2024 22:32:38 +0200 Subject: [PATCH 1/8] Update outgoing address for incoming peers --- crates/librqbit/src/torrent_state/live/mod.rs | 11 +++++++++++ crates/librqbit/src/torrent_state/live/peer/mod.rs | 3 +++ 2 files changed, 14 insertions(+) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 5cd70ad..13e10a5 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -919,6 +919,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { if let Some(peer_pex_msg_id) = hs.ut_pex() { trace!("peer supports pex at {peer_pex_msg_id}"); } + if let Some(port) = hs.p { + // Lets update outgoing Socket address for incoming connection + if self.incoming { + if let Ok(port) = >::try_into(port) { + let outgoing_addr = SocketAddr::new(self.addr.ip(), port); + self.state.peers.with_peer_mut(self.addr, "update outgoing addr", + |peer| peer.outgoing_address=Some(outgoing_addr)); + } + + } + } Ok(()) } diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 9efd6b0..d932f55 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -1,6 +1,7 @@ pub mod stats; use std::collections::HashSet; +use std::net::SocketAddr; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::ChunkInfo; @@ -20,6 +21,7 @@ pub(crate) type PeerTx = UnboundedSender; pub(crate) struct Peer { pub state: PeerStateNoMut, pub stats: stats::atomic::PeerStats, + pub outgoing_address: Option, } impl Peer { @@ -35,6 +37,7 @@ impl Peer { Self { state, stats: Default::default(), + outgoing_address: None, } } } From 4bd757173d0e4d6c8e22b7c0d8e6d04f8fce24ea Mon Sep 17 00:00:00 2001 From: Ivan Date: Thu, 26 Sep 2024 18:11:05 +0200 Subject: [PATCH 2/8] Set outgoing IP address for incomming peers after Ext. handshake --- crates/librqbit/src/torrent_state/live/mod.rs | 22 +++++++++++-------- .../src/extended/handshake.rs | 22 +++++++++++++++++++ 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 13e10a5..65e7918 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -784,10 +784,11 @@ impl TorrentStateLive { pub(crate) fn reconnect_all_not_needed_peers(&self) { for mut pe in self.peers.states.iter_mut() { - if pe.state.not_needed_to_queued(&self.peer_stats()) - && self.peer_queue_tx.send(*pe.key()).is_err() - { - return; + if pe.state.not_needed_to_queued(&self.peer_stats()) { + let retry_addr = pe.value().outgoing_address.unwrap_or_else(|| *pe.key()); + if self.peer_queue_tx.send(retry_addr).is_err() { + return; + } } } } @@ -922,12 +923,15 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { if let Some(port) = hs.p { // Lets update outgoing Socket address for incoming connection if self.incoming { - if let Ok(port) = >::try_into(port) { - let outgoing_addr = SocketAddr::new(self.addr.ip(), port); - self.state.peers.with_peer_mut(self.addr, "update outgoing addr", - |peer| peer.outgoing_address=Some(outgoing_addr)); + if let Ok(port) = hs.port() { + let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip()); + let outgoing_addr = SocketAddr::new(peer_ip, port); + self.state + .peers + .with_peer_mut(self.addr, "update outgoing addr", |peer| { + peer.outgoing_address = Some(outgoing_addr) + }); } - } } Ok(()) diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 721fff8..65af794 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -69,6 +69,28 @@ where ut_pex: self.ut_pex(), } } + + pub fn ip_addr(&self) -> Option { + if let Some(b) = self.ipv4 { + let b = b.as_slice(); + if b.len() == 4 { + let ip_bytes: &[u8; 4] = b[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length + return Some(IpAddr::from(*ip_bytes)); + } + } + if let Some(b) = self.ipv6 { + let b = b.as_slice(); + if b.len() == 16 { + let ip_bytes: &[u8; 16] = b[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length + return Some(IpAddr::from(*ip_bytes)); + } + } + None + } + + pub fn port(&self) -> Option { + self.p.and_then(|p| u16::try_from(p).ok()) + } } impl CloneToOwned for ExtendedHandshake From aec1eeef348a69ab4df0752636b9e8e56b9ca448 Mon Sep 17 00:00:00 2001 From: Ivan Date: Thu, 26 Sep 2024 19:15:16 +0200 Subject: [PATCH 3/8] Enum pro outgoing addr and some fixes --- crates/librqbit/src/torrent_state/live/mod.rs | 29 ++++++++++--------- .../src/torrent_state/live/peer/mod.rs | 12 ++++++-- .../src/extended/handshake.rs | 4 +-- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 65e7918..f58357c 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -66,6 +66,7 @@ use librqbit_core::{ torrent_metainfo::TorrentMetaV1Info, }; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use peer::OutgoingAddressType; use peer_binary_protocol::{ extended::{ handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, @@ -785,7 +786,11 @@ impl TorrentStateLive { pub(crate) fn reconnect_all_not_needed_peers(&self) { for mut pe in self.peers.states.iter_mut() { if pe.state.not_needed_to_queued(&self.peer_stats()) { - let retry_addr = pe.value().outgoing_address.unwrap_or_else(|| *pe.key()); + let retry_addr = match pe.value().outgoing_address { + peer::OutgoingAddressType::Default => *pe.key(), + peer::OutgoingAddressType::None => continue, + peer::OutgoingAddressType::Known(socket_addr) => socket_addr, + }; if self.peer_queue_tx.send(retry_addr).is_err() { return; } @@ -920,18 +925,16 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { if let Some(peer_pex_msg_id) = hs.ut_pex() { trace!("peer supports pex at {peer_pex_msg_id}"); } - if let Some(port) = hs.p { - // Lets update outgoing Socket address for incoming connection - if self.incoming { - if let Ok(port) = hs.port() { - let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip()); - let outgoing_addr = SocketAddr::new(peer_ip, port); - self.state - .peers - .with_peer_mut(self.addr, "update outgoing addr", |peer| { - peer.outgoing_address = Some(outgoing_addr) - }); - } + // Lets update outgoing Socket address for incoming connection + if self.incoming { + if let Some(port) = hs.port() { + let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip()); + let outgoing_addr = SocketAddr::new(peer_ip, port); + self.state + .peers + .with_peer_mut(self.addr, "update outgoing addr", |peer| { + peer.outgoing_address = OutgoingAddressType::Known(outgoing_addr) + }); } } Ok(()) diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index d932f55..ac2521b 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -17,11 +17,19 @@ pub(crate) type InflightRequest = ChunkInfo; pub(crate) type PeerRx = UnboundedReceiver; pub(crate) type PeerTx = UnboundedSender; +#[derive(Debug, Default)] +pub(crate) enum OutgoingAddressType { + #[default] + Default, + None, + Known(SocketAddr), +} + #[derive(Debug, Default)] pub(crate) struct Peer { pub state: PeerStateNoMut, pub stats: stats::atomic::PeerStats, - pub outgoing_address: Option, + pub outgoing_address: OutgoingAddressType, } impl Peer { @@ -37,7 +45,7 @@ impl Peer { Self { state, stats: Default::default(), - outgoing_address: None, + outgoing_address: OutgoingAddressType::None, } } } diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 65af794..66a004f 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -71,14 +71,14 @@ where } pub fn ip_addr(&self) -> Option { - if let Some(b) = self.ipv4 { + if let Some(ref b) = self.ipv4 { let b = b.as_slice(); if b.len() == 4 { let ip_bytes: &[u8; 4] = b[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length return Some(IpAddr::from(*ip_bytes)); } } - if let Some(b) = self.ipv6 { + if let Some(ref b) = self.ipv6 { let b = b.as_slice(); if b.len() == 16 { let ip_bytes: &[u8; 16] = b[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length From a31e8344b28de01df8de6605e131d5007a339522 Mon Sep 17 00:00:00 2001 From: Ivan Date: Thu, 26 Sep 2024 19:19:56 +0200 Subject: [PATCH 4/8] Format - delete trailing spaces --- crates/librqbit/src/session.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index c6803f1..8c2b749 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -895,7 +895,7 @@ impl Session { if let Some(custom_trackers) = opts.trackers.clone() { trackers.extend(custom_trackers); } - trackers + trackers }, announce_port, opts.force_tracker_interval, @@ -989,7 +989,7 @@ impl Session { if let Some(custom_trackers) = opts.trackers.clone() { trackers.extend(custom_trackers); } - + let peer_rx = if paused { None } else { From cd7349121c091d6c3f9b0fd07a966a7bc36a1b58 Mon Sep 17 00:00:00 2001 From: Ivan Date: Thu, 26 Sep 2024 21:30:25 +0200 Subject: [PATCH 5/8] Queuing only peers that will be retried --- crates/librqbit/src/torrent_state/live/mod.rs | 23 +++++++------- .../src/torrent_state/live/peer/mod.rs | 30 +++++++++++++++++++ 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index f58357c..fced756 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -784,18 +784,17 @@ impl TorrentStateLive { } pub(crate) fn reconnect_all_not_needed_peers(&self) { - for mut pe in self.peers.states.iter_mut() { - if pe.state.not_needed_to_queued(&self.peer_stats()) { - let retry_addr = match pe.value().outgoing_address { - peer::OutgoingAddressType::Default => *pe.key(), - peer::OutgoingAddressType::None => continue, - peer::OutgoingAddressType::Known(socket_addr) => socket_addr, - }; - if self.peer_queue_tx.send(retry_addr).is_err() { - return; - } - } - } + self.peers + .states + .iter_mut() + .filter_map(|mut p| { + let known_addr = *p.key(); + p.value_mut() + .reconnect_not_needed_peer(known_addr, &self.peer_stats()) + }) + .map(|socket_addr| self.peer_queue_tx.send(socket_addr)) + .take_while(|r| r.is_ok()) + .last(); } } diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index ac2521b..49b9f6b 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -48,6 +48,36 @@ impl Peer { outgoing_address: OutgoingAddressType::None, } } + + pub(crate) fn reconnect_not_needed_peer( + &mut self, + known_address: SocketAddr, + counters: &[&AggregatePeerStatsAtomic], + ) -> Option { + if let PeerState::NotNeeded = self.state.get() { + match self.outgoing_address { + OutgoingAddressType::Default => { + self.state.set(PeerState::Queued, counters); + Some(known_address) + } + OutgoingAddressType::None => None, + OutgoingAddressType::Known(socket_addr) => { + if known_address == socket_addr { + self.state.set(PeerState::Queued, counters); + } + Some(socket_addr) + }, + } + } else { + None + } + // pe.state.not_needed_to_queued(&self.peer_stats()) { + // let retry_addr = match pe.value().outgoing_address { + // peer::OutgoingAddressType::Default => *pe.key(), + // peer::OutgoingAddressType::None => unreachable!("bug"), // already filtered + // peer::OutgoingAddressType::Known(socket_addr) => socket_addr, + // }; + } } #[derive(Debug, Default)] From 1ce5971f07c36d7c0083036205c309889fc9d529 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 28 Sep 2024 19:36:54 +0200 Subject: [PATCH 6/8] Add debug and remove unneeded --- .../librqbit/src/torrent_state/live/peer/mod.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 49b9f6b..c2ec15c 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -7,6 +7,7 @@ use librqbit_core::hash_id::Id20; use librqbit_core::lengths::ChunkInfo; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tracing::debug; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; @@ -64,9 +65,14 @@ impl Peer { OutgoingAddressType::Known(socket_addr) => { if known_address == socket_addr { self.state.set(PeerState::Queued, counters); + } else { + debug!( + peer = known_address.to_string(), + "peer will by retried on different address {}", socket_addr + ); } Some(socket_addr) - }, + } } } else { None @@ -174,14 +180,6 @@ impl PeerStateNoMut { } } - pub fn not_needed_to_queued(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> bool { - if let PeerState::NotNeeded = &self.0 { - self.set(PeerState::Queued, counters); - return true; - } - false - } - pub fn incoming_connection( &mut self, peer_id: Id20, From 23bd537dd72a7a71404cb78e776323eb5714a762 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 28 Sep 2024 19:47:48 +0200 Subject: [PATCH 7/8] Remove commented code --- crates/librqbit/src/torrent_state/live/peer/mod.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index c2ec15c..90d32fe 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -77,12 +77,6 @@ impl Peer { } else { None } - // pe.state.not_needed_to_queued(&self.peer_stats()) { - // let retry_addr = match pe.value().outgoing_address { - // peer::OutgoingAddressType::Default => *pe.key(), - // peer::OutgoingAddressType::None => unreachable!("bug"), // already filtered - // peer::OutgoingAddressType::Known(socket_addr) => socket_addr, - // }; } } From 76402184887f2b2b34baa27de395e3cd806a3565 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 29 Sep 2024 22:28:36 +0200 Subject: [PATCH 8/8] Make outgoing addr Option --- crates/librqbit/src/torrent_state/live/mod.rs | 3 +- .../src/torrent_state/live/peer/mod.rs | 30 ++++++++----------- .../src/torrent_state/live/peers/mod.rs | 2 +- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index fced756..95a4681 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -66,7 +66,6 @@ use librqbit_core::{ torrent_metainfo::TorrentMetaV1Info, }; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use peer::OutgoingAddressType; use peer_binary_protocol::{ extended::{ handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, @@ -932,7 +931,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { self.state .peers .with_peer_mut(self.addr, "update outgoing addr", |peer| { - peer.outgoing_address = OutgoingAddressType::Known(outgoing_addr) + peer.outgoing_address = Some(outgoing_addr) }); } } diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 90d32fe..b0895f8 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -18,19 +18,11 @@ pub(crate) type InflightRequest = ChunkInfo; pub(crate) type PeerRx = UnboundedReceiver; pub(crate) type PeerTx = UnboundedSender; -#[derive(Debug, Default)] -pub(crate) enum OutgoingAddressType { - #[default] - Default, - None, - Known(SocketAddr), -} - #[derive(Debug, Default)] pub(crate) struct Peer { pub state: PeerStateNoMut, pub stats: stats::atomic::PeerStats, - pub outgoing_address: OutgoingAddressType, + pub outgoing_address: Option, } impl Peer { @@ -46,7 +38,14 @@ impl Peer { Self { state, stats: Default::default(), - outgoing_address: OutgoingAddressType::None, + outgoing_address: None, + } + } + + pub fn new_with_outgoing_address(addr: SocketAddr) -> Self { + Self { + outgoing_address: Some(addr), + ..Default::default() } } @@ -57,18 +56,15 @@ impl Peer { ) -> Option { if let PeerState::NotNeeded = self.state.get() { match self.outgoing_address { - OutgoingAddressType::Default => { - self.state.set(PeerState::Queued, counters); - Some(known_address) - } - OutgoingAddressType::None => None, - OutgoingAddressType::Known(socket_addr) => { + None => None, + Some(socket_addr) => { if known_address == socket_addr { self.state.set(PeerState::Queued, counters); } else { debug!( peer = known_address.to_string(), - "peer will by retried on different address {}", socket_addr + outgoing_addr = socket_addr.to_string(), + "peer will by retried on different address", ); } Some(socket_addr) diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 03c6c10..110df73 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -43,7 +43,7 @@ impl PeerStates { match self.states.entry(addr) { Entry::Occupied(_) => None, Entry::Vacant(vac) => { - vac.insert(Default::default()); + vac.insert(Peer::new_with_outgoing_address(addr)); atomic_inc(&self.stats.queued); atomic_inc(&self.session_stats.peers.queued);