Only successful outgoing connections marked

This commit is contained in:
Igor Katson 2024-12-04 10:36:22 +00:00
parent ac775affef
commit fb760b282e
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 44 additions and 59 deletions

View file

@ -257,7 +257,7 @@ impl TorrentStateLive {
session_stats: session_stats.clone(), session_stats: session_stats.clone(),
stats: Default::default(), stats: Default::default(),
states: Default::default(), states: Default::default(),
live_peers: Default::default(), live_outgoing_peers: Default::default(),
}, },
locked: RwLock::new(TorrentStateLocked { locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker), chunks: Some(paused.chunk_tracker),
@ -841,7 +841,6 @@ impl TorrentStateLive {
// As per BEP 11 recommended interval is min 60 seconds // As per BEP 11 recommended interval is min 60 seconds
const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60);
let mut live_peers = HashSet::new();
let mut connected = Vec::with_capacity(MAX_SENT_PEERS); let mut connected = Vec::with_capacity(MAX_SENT_PEERS);
let mut dropped = Vec::with_capacity(MAX_SENT_PEERS); let mut dropped = Vec::with_capacity(MAX_SENT_PEERS);
let mut peer_view_of_live_peers = HashSet::new(); let mut peer_view_of_live_peers = HashSet::new();
@ -854,44 +853,25 @@ impl TorrentStateLive {
loop { loop {
interval.tick().await; interval.tick().await;
// TODO: store them in a shared place {
// Fill in live_peers let live_peers = self.peers.live_outgoing_peers.read();
for ps in self.peers.states.iter() { connected.clear();
let peer = ps.value(); dropped.clear();
let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key());
// As per BEP 11 share only those we were able to connect connected.extend(
let has_outgoing_connections = peer live_peers
.stats .difference(&peer_view_of_live_peers)
.counters .take(MAX_SENT_PEERS)
.outgoing_connections .copied(),
.load(Ordering::Relaxed) );
> 0; dropped.extend(
peer_view_of_live_peers
let is_live = has_outgoing_connections && ps.value().is_live(); .difference(&live_peers)
if is_live { .take(MAX_SENT_PEERS)
live_peers.insert(addr); .copied(),
} else { );
live_peers.remove(&addr);
}
} }
connected.clear();
dropped.clear();
connected.extend(
live_peers
.difference(&peer_view_of_live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
dropped.extend(
peer_view_of_live_peers
.difference(&live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
// BEP 11 - Dont send closed if they are now in live // BEP 11 - Dont send closed if they are now in live
// it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent,
// and addrs_closed_to_sent are only filtered addresses from sent_peers_live // and addrs_closed_to_sent are only filtered addresses from sent_peers_live
@ -1104,10 +1084,6 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
impl PeerHandler { impl PeerHandler {
fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> { fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> {
let peers = &self.state.peers; let peers = &self.state.peers;
let pstats = {
let this = &self.state;
&this.peers
};
let handle = self.addr; let handle = self.addr;
let mut pe = match peers.states.get_mut(&handle) { let mut pe = match peers.states.get_mut(&handle) {
Some(peer) => TimedExistence::new(peer, "on_peer_died"), Some(peer) => TimedExistence::new(peer, "on_peer_died"),
@ -1116,7 +1092,7 @@ impl PeerHandler {
return Ok(()); return Ok(());
} }
}; };
let prev = pe.value_mut().take_state(pstats); let prev = pe.value_mut().take_state(peers);
match prev { match prev {
PeerState::Connecting(_) => {} PeerState::Connecting(_) => {}
@ -1135,7 +1111,7 @@ impl PeerHandler {
} }
PeerState::NotNeeded => { PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above. // Restore it as std::mem::take() replaced it above.
pe.value_mut().set_state(PeerState::NotNeeded, pstats); pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(()); return Ok(());
} }
s @ PeerState::Queued | s @ PeerState::Dead => { s @ PeerState::Queued | s @ PeerState::Dead => {
@ -1151,7 +1127,7 @@ impl PeerHandler {
Some(e) => e, Some(e) => e,
None => { None => {
trace!("peer died without errors, not re-queueing"); trace!("peer died without errors, not re-queueing");
pe.value_mut().set_state(PeerState::NotNeeded, pstats); pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(()); return Ok(());
} }
}; };
@ -1160,11 +1136,11 @@ impl PeerHandler {
if self.state.is_finished_and_no_active_streams() { if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing"); debug!("torrent finished, not re-queueing");
pe.value_mut().set_state(PeerState::NotNeeded, pstats); pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(()); return Ok(());
} }
pe.value_mut().set_state(PeerState::Dead, pstats); pe.value_mut().set_state(PeerState::Dead, peers);
if self.incoming { if self.incoming {
// do not retry incoming peers // do not retry incoming peers

View file

@ -2,6 +2,7 @@ pub mod stats;
use std::collections::HashSet; use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use librqbit_core::hash_id::Id20; use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::ChunkInfo; use librqbit_core::lengths::ChunkInfo;
@ -133,8 +134,8 @@ impl Peer {
for counter in [&counters.session_stats.peers, &counters.stats] { for counter in [&counters.session_stats.peers, &counters.stats] {
counter.dec(&self.state); counter.dec(&self.state);
} }
if matches!(&self.state, PeerState::Live(..)) { if let (Some(addr), PeerState::Live(..)) = (self.outgoing_address, &self.state) {
counters.live_peers.write().retain(|a| *a != self.addr); counters.live_outgoing_peers.write().remove(&addr);
} }
} }
@ -142,12 +143,22 @@ impl Peer {
for counter in [&counters.session_stats.peers, &counters.stats] { for counter in [&counters.session_stats.peers, &counters.stats] {
counter.incdec(&self.state, &new); counter.incdec(&self.state, &new);
} }
if matches!(&self.state, PeerState::Live(..)) { if let Some(addr) = self.outgoing_address {
counters.live_peers.write().retain(|a| *a != self.addr); if matches!(&self.state, PeerState::Live(..)) {
} counters.live_outgoing_peers.write().remove(&addr);
if matches!(&new, PeerState::Live(..)) { }
counters.live_peers.write().push(self.addr); if matches!(&new, PeerState::Live(..))
&& self
.stats
.counters
.outgoing_connections
.load(Ordering::Relaxed)
> 0
{
counters.live_outgoing_peers.write().insert(addr);
}
} }
std::mem::replace(&mut self.state, new) std::mem::replace(&mut self.state, new)
} }
@ -158,10 +169,6 @@ impl Peer {
} }
} }
pub fn is_live(&self) -> bool {
matches!(&self.state, PeerState::Live(_))
}
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match &mut self.state { match &mut self.state {
PeerState::Live(l) => Some(l), PeerState::Live(l) => Some(l),

View file

@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc}; use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use anyhow::Context; use anyhow::Context;
use backoff::backoff::Backoff; use backoff::backoff::Backoff;
@ -22,7 +22,9 @@ pub mod stats;
pub(crate) struct PeerStates { pub(crate) struct PeerStates {
pub session_stats: Arc<AtomicSessionStats>, pub session_stats: Arc<AtomicSessionStats>,
pub live_peers: RwLock<Vec<PeerHandle>>,
// This keeps track of live addresses we connected to, for PEX.
pub live_outgoing_peers: RwLock<HashSet<PeerHandle>>,
pub stats: AggregatePeerStatsAtomic, pub stats: AggregatePeerStatsAtomic,
pub states: DashMap<PeerHandle, Peer>, pub states: DashMap<PeerHandle, Peer>,
} }