Enum pro outgoing addr and some fixes
This commit is contained in:
parent
4bd757173d
commit
aec1eeef34
3 changed files with 28 additions and 17 deletions
|
|
@ -66,6 +66,7 @@ use librqbit_core::{
|
||||||
torrent_metainfo::TorrentMetaV1Info,
|
torrent_metainfo::TorrentMetaV1Info,
|
||||||
};
|
};
|
||||||
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
|
use peer::OutgoingAddressType;
|
||||||
use peer_binary_protocol::{
|
use peer_binary_protocol::{
|
||||||
extended::{
|
extended::{
|
||||||
handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
|
handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage,
|
||||||
|
|
@ -785,7 +786,11 @@ impl TorrentStateLive {
|
||||||
pub(crate) fn reconnect_all_not_needed_peers(&self) {
|
pub(crate) fn reconnect_all_not_needed_peers(&self) {
|
||||||
for mut pe in self.peers.states.iter_mut() {
|
for mut pe in self.peers.states.iter_mut() {
|
||||||
if pe.state.not_needed_to_queued(&self.peer_stats()) {
|
if pe.state.not_needed_to_queued(&self.peer_stats()) {
|
||||||
let retry_addr = pe.value().outgoing_address.unwrap_or_else(|| *pe.key());
|
let retry_addr = match pe.value().outgoing_address {
|
||||||
|
peer::OutgoingAddressType::Default => *pe.key(),
|
||||||
|
peer::OutgoingAddressType::None => continue,
|
||||||
|
peer::OutgoingAddressType::Known(socket_addr) => socket_addr,
|
||||||
|
};
|
||||||
if self.peer_queue_tx.send(retry_addr).is_err() {
|
if self.peer_queue_tx.send(retry_addr).is_err() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -920,18 +925,16 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
if let Some(peer_pex_msg_id) = hs.ut_pex() {
|
if let Some(peer_pex_msg_id) = hs.ut_pex() {
|
||||||
trace!("peer supports pex at {peer_pex_msg_id}");
|
trace!("peer supports pex at {peer_pex_msg_id}");
|
||||||
}
|
}
|
||||||
if let Some(port) = hs.p {
|
// Lets update outgoing Socket address for incoming connection
|
||||||
// Lets update outgoing Socket address for incoming connection
|
if self.incoming {
|
||||||
if self.incoming {
|
if let Some(port) = hs.port() {
|
||||||
if let Ok(port) = hs.port() {
|
let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip());
|
||||||
let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip());
|
let outgoing_addr = SocketAddr::new(peer_ip, port);
|
||||||
let outgoing_addr = SocketAddr::new(peer_ip, port);
|
self.state
|
||||||
self.state
|
.peers
|
||||||
.peers
|
.with_peer_mut(self.addr, "update outgoing addr", |peer| {
|
||||||
.with_peer_mut(self.addr, "update outgoing addr", |peer| {
|
peer.outgoing_address = OutgoingAddressType::Known(outgoing_addr)
|
||||||
peer.outgoing_address = Some(outgoing_addr)
|
});
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -17,11 +17,19 @@ pub(crate) type InflightRequest = ChunkInfo;
|
||||||
pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>;
|
pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>;
|
||||||
pub(crate) type PeerTx = UnboundedSender<WriterRequest>;
|
pub(crate) type PeerTx = UnboundedSender<WriterRequest>;
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub(crate) enum OutgoingAddressType {
|
||||||
|
#[default]
|
||||||
|
Default,
|
||||||
|
None,
|
||||||
|
Known(SocketAddr),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub(crate) struct Peer {
|
pub(crate) struct Peer {
|
||||||
pub state: PeerStateNoMut,
|
pub state: PeerStateNoMut,
|
||||||
pub stats: stats::atomic::PeerStats,
|
pub stats: stats::atomic::PeerStats,
|
||||||
pub outgoing_address: Option<SocketAddr>,
|
pub outgoing_address: OutgoingAddressType,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Peer {
|
impl Peer {
|
||||||
|
|
@ -37,7 +45,7 @@ impl Peer {
|
||||||
Self {
|
Self {
|
||||||
state,
|
state,
|
||||||
stats: Default::default(),
|
stats: Default::default(),
|
||||||
outgoing_address: None,
|
outgoing_address: OutgoingAddressType::None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -71,14 +71,14 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn ip_addr(&self) -> Option<IpAddr> {
|
pub fn ip_addr(&self) -> Option<IpAddr> {
|
||||||
if let Some(b) = self.ipv4 {
|
if let Some(ref b) = self.ipv4 {
|
||||||
let b = b.as_slice();
|
let b = b.as_slice();
|
||||||
if b.len() == 4 {
|
if b.len() == 4 {
|
||||||
let ip_bytes: &[u8; 4] = b[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length
|
let ip_bytes: &[u8; 4] = b[0..4].try_into().unwrap(); // Safe to unwrap as we check slice length
|
||||||
return Some(IpAddr::from(*ip_bytes));
|
return Some(IpAddr::from(*ip_bytes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(b) = self.ipv6 {
|
if let Some(ref b) = self.ipv6 {
|
||||||
let b = b.as_slice();
|
let b = b.as_slice();
|
||||||
if b.len() == 16 {
|
if b.len() == 16 {
|
||||||
let ip_bytes: &[u8; 16] = b[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length
|
let ip_bytes: &[u8; 16] = b[0..16].try_into().unwrap(); // Safe to unwrap as we check slice length
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue