diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index c6803f1..8c2b749 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -895,7 +895,7 @@ impl Session { if let Some(custom_trackers) = opts.trackers.clone() { trackers.extend(custom_trackers); } - trackers + trackers }, announce_port, opts.force_tracker_interval, @@ -989,7 +989,7 @@ impl Session { if let Some(custom_trackers) = opts.trackers.clone() { trackers.extend(custom_trackers); } - + let peer_rx = if paused { None } else { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 5cd70ad..95a4681 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -783,13 +783,17 @@ impl TorrentStateLive { } pub(crate) fn reconnect_all_not_needed_peers(&self) { - for mut pe in self.peers.states.iter_mut() { - if pe.state.not_needed_to_queued(&self.peer_stats()) - && self.peer_queue_tx.send(*pe.key()).is_err() - { - return; - } - } + self.peers + .states + .iter_mut() + .filter_map(|mut p| { + let known_addr = *p.key(); + p.value_mut() + .reconnect_not_needed_peer(known_addr, &self.peer_stats()) + }) + .map(|socket_addr| self.peer_queue_tx.send(socket_addr)) + .take_while(|r| r.is_ok()) + .last(); } } @@ -919,6 +923,18 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { if let Some(peer_pex_msg_id) = hs.ut_pex() { trace!("peer supports pex at {peer_pex_msg_id}"); } + // Lets update outgoing Socket address for incoming connection + if self.incoming { + if let Some(port) = hs.port() { + let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip()); + let outgoing_addr = SocketAddr::new(peer_ip, port); + self.state + .peers + .with_peer_mut(self.addr, "update outgoing addr", |peer| { + peer.outgoing_address = Some(outgoing_addr) + }); + } + } Ok(()) } diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 9efd6b0..b0895f8 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -1,11 +1,13 @@ pub mod stats; use std::collections::HashSet; +use std::net::SocketAddr; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::ChunkInfo; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tracing::debug; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; @@ -20,6 +22,7 @@ pub(crate) type PeerTx = UnboundedSender; pub(crate) struct Peer { pub state: PeerStateNoMut, pub stats: stats::atomic::PeerStats, + pub outgoing_address: Option, } impl Peer { @@ -35,6 +38,40 @@ impl Peer { Self { state, stats: Default::default(), + outgoing_address: None, + } + } + + pub fn new_with_outgoing_address(addr: SocketAddr) -> Self { + Self { + outgoing_address: Some(addr), + ..Default::default() + } + } + + pub(crate) fn reconnect_not_needed_peer( + &mut self, + known_address: SocketAddr, + counters: &[&AggregatePeerStatsAtomic], + ) -> Option { + if let PeerState::NotNeeded = self.state.get() { + match self.outgoing_address { + None => None, + Some(socket_addr) => { + if known_address == socket_addr { + self.state.set(PeerState::Queued, counters); + } else { + debug!( + peer = known_address.to_string(), + outgoing_addr = socket_addr.to_string(), + "peer will by retried on different address", + ); + } + Some(socket_addr) + } + } + } else { + None } } } @@ -133,14 +170,6 @@ impl PeerStateNoMut { } } - pub fn not_needed_to_queued(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> bool { - if let PeerState::NotNeeded = &self.0 { - self.set(PeerState::Queued, counters); - return true; - } - false - } - pub fn incoming_connection( &mut self, peer_id: Id20, diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 03c6c10..110df73 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -43,7 +43,7 @@ impl PeerStates { match self.states.entry(addr) { Entry::Occupied(_) => None, Entry::Vacant(vac) => { - vac.insert(Default::default()); + vac.insert(Peer::new_with_outgoing_address(addr)); atomic_inc(&self.stats.queued); atomic_inc(&self.session_stats.peers.queued); diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 721fff8..66a004f 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -69,6 +69,28 @@ where ut_pex: self.ut_pex(), } } + + pub fn ip_addr(&self) -> Option { + if let Some(ref b) = self.ipv4 { + let b = b.as_slice(); + if b.len() == 4 { + let ip_bytes: &[u8; 4] = b[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length + return Some(IpAddr::from(*ip_bytes)); + } + } + if let Some(ref b) = self.ipv6 { + let b = b.as_slice(); + if b.len() == 16 { + let ip_bytes: &[u8; 16] = b[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length + return Some(IpAddr::from(*ip_bytes)); + } + } + None + } + + pub fn port(&self) -> Option { + self.p.and_then(|p| u16::try_from(p).ok()) + } } impl CloneToOwned for ExtendedHandshake