Merge pull request #244 from izderadicka/correct_peer_address

Correct peer outgoing address for incomming peers
This commit is contained in:
Igor Katson 2024-10-01 09:27:14 +01:00 committed by GitHub
commit abe4cf58a5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 85 additions and 18 deletions

View file

@ -895,7 +895,7 @@ impl Session {
if let Some(custom_trackers) = opts.trackers.clone() { if let Some(custom_trackers) = opts.trackers.clone() {
trackers.extend(custom_trackers); trackers.extend(custom_trackers);
} }
trackers trackers
}, },
announce_port, announce_port,
opts.force_tracker_interval, opts.force_tracker_interval,
@ -989,7 +989,7 @@ impl Session {
if let Some(custom_trackers) = opts.trackers.clone() { if let Some(custom_trackers) = opts.trackers.clone() {
trackers.extend(custom_trackers); trackers.extend(custom_trackers);
} }
let peer_rx = if paused { let peer_rx = if paused {
None None
} else { } else {

View file

@ -783,13 +783,17 @@ impl TorrentStateLive {
} }
pub(crate) fn reconnect_all_not_needed_peers(&self) { pub(crate) fn reconnect_all_not_needed_peers(&self) {
for mut pe in self.peers.states.iter_mut() { self.peers
if pe.state.not_needed_to_queued(&self.peer_stats()) .states
&& self.peer_queue_tx.send(*pe.key()).is_err() .iter_mut()
{ .filter_map(|mut p| {
return; let known_addr = *p.key();
} p.value_mut()
} .reconnect_not_needed_peer(known_addr, &self.peer_stats())
})
.map(|socket_addr| self.peer_queue_tx.send(socket_addr))
.take_while(|r| r.is_ok())
.last();
} }
} }
@ -919,6 +923,18 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
if let Some(peer_pex_msg_id) = hs.ut_pex() { if let Some(peer_pex_msg_id) = hs.ut_pex() {
trace!("peer supports pex at {peer_pex_msg_id}"); trace!("peer supports pex at {peer_pex_msg_id}");
} }
// Lets update outgoing Socket address for incoming connection
if self.incoming {
if let Some(port) = hs.port() {
let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip());
let outgoing_addr = SocketAddr::new(peer_ip, port);
self.state
.peers
.with_peer_mut(self.addr, "update outgoing addr", |peer| {
peer.outgoing_address = Some(outgoing_addr)
});
}
}
Ok(()) Ok(())
} }

View file

@ -1,11 +1,13 @@
pub mod stats; pub mod stats;
use std::collections::HashSet; use std::collections::HashSet;
use std::net::SocketAddr;
use librqbit_core::hash_id::Id20; use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::ChunkInfo; use librqbit_core::lengths::ChunkInfo;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::debug;
use crate::peer_connection::WriterRequest; use crate::peer_connection::WriterRequest;
use crate::type_aliases::BF; use crate::type_aliases::BF;
@ -20,6 +22,7 @@ pub(crate) type PeerTx = UnboundedSender<WriterRequest>;
pub(crate) struct Peer { pub(crate) struct Peer {
pub state: PeerStateNoMut, pub state: PeerStateNoMut,
pub stats: stats::atomic::PeerStats, pub stats: stats::atomic::PeerStats,
pub outgoing_address: Option<SocketAddr>,
} }
impl Peer { impl Peer {
@ -35,6 +38,40 @@ impl Peer {
Self { Self {
state, state,
stats: Default::default(), stats: Default::default(),
outgoing_address: None,
}
}
pub fn new_with_outgoing_address(addr: SocketAddr) -> Self {
Self {
outgoing_address: Some(addr),
..Default::default()
}
}
pub(crate) fn reconnect_not_needed_peer(
&mut self,
known_address: SocketAddr,
counters: &[&AggregatePeerStatsAtomic],
) -> Option<SocketAddr> {
if let PeerState::NotNeeded = self.state.get() {
match self.outgoing_address {
None => None,
Some(socket_addr) => {
if known_address == socket_addr {
self.state.set(PeerState::Queued, counters);
} else {
debug!(
peer = known_address.to_string(),
outgoing_addr = socket_addr.to_string(),
"peer will by retried on different address",
);
}
Some(socket_addr)
}
}
} else {
None
} }
} }
} }
@ -133,14 +170,6 @@ impl PeerStateNoMut {
} }
} }
pub fn not_needed_to_queued(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> bool {
if let PeerState::NotNeeded = &self.0 {
self.set(PeerState::Queued, counters);
return true;
}
false
}
pub fn incoming_connection( pub fn incoming_connection(
&mut self, &mut self,
peer_id: Id20, peer_id: Id20,

View file

@ -43,7 +43,7 @@ impl PeerStates {
match self.states.entry(addr) { match self.states.entry(addr) {
Entry::Occupied(_) => None, Entry::Occupied(_) => None,
Entry::Vacant(vac) => { Entry::Vacant(vac) => {
vac.insert(Default::default()); vac.insert(Peer::new_with_outgoing_address(addr));
atomic_inc(&self.stats.queued); atomic_inc(&self.stats.queued);
atomic_inc(&self.session_stats.peers.queued); atomic_inc(&self.session_stats.peers.queued);

View file

@ -69,6 +69,28 @@ where
ut_pex: self.ut_pex(), ut_pex: self.ut_pex(),
} }
} }
pub fn ip_addr(&self) -> Option<IpAddr> {
if let Some(ref b) = self.ipv4 {
let b = b.as_slice();
if b.len() == 4 {
let ip_bytes: &[u8; 4] = b[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length
return Some(IpAddr::from(*ip_bytes));
}
}
if let Some(ref b) = self.ipv6 {
let b = b.as_slice();
if b.len() == 16 {
let ip_bytes: &[u8; 16] = b[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length
return Some(IpAddr::from(*ip_bytes));
}
}
None
}
pub fn port(&self) -> Option<u16> {
self.p.and_then(|p| u16::try_from(p).ok())
}
} }
impl<ByteBuf> CloneToOwned for ExtendedHandshake<ByteBuf> impl<ByteBuf> CloneToOwned for ExtendedHandshake<ByteBuf>