A bit of refactoring to store live_peers

This commit is contained in:
Igor Katson 2024-12-04 10:03:08 +00:00
parent 7a52721af9
commit adc2ca97b3
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 34 additions and 37 deletions

View file

@ -73,7 +73,6 @@ use peer_binary_protocol::{
}, },
Handshake, Message, MessageOwned, Piece, Request, Handshake, Message, MessageOwned, Piece, Request,
}; };
use peers::stats::atomic::AggregatePeerStatsAtomic;
use tokio::sync::{ use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, OwnedSemaphorePermit, Semaphore, Notify, OwnedSemaphorePermit, Semaphore,
@ -258,6 +257,7 @@ impl TorrentStateLive {
session_stats: session_stats.clone(), session_stats: session_stats.clone(),
stats: Default::default(), stats: Default::default(),
states: Default::default(), states: Default::default(),
live_peers: Default::default(),
}, },
locked: RwLock::new(TorrentStateLocked { locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker), chunks: Some(paused.chunk_tracker),
@ -337,10 +337,6 @@ impl TorrentStateLive {
spawn_with_cancel(span, self.cancellation_token.clone(), fut); spawn_with_cancel(span, self.cancellation_token.clone(), fut);
} }
fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] {
[&self.peers.stats, &self.peers.session_stats.peers]
}
pub fn down_speed_estimator(&self) -> &SpeedEstimator { pub fn down_speed_estimator(&self) -> &SpeedEstimator {
&self.down_speed_estimator &self.down_speed_estimator
} }
@ -377,7 +373,7 @@ impl TorrentStateLive {
.incoming_connection( .incoming_connection(
Id20::new(checked_peer.handshake.peer_id), Id20::new(checked_peer.handshake.peer_id),
tx.clone(), tx.clone(),
&self.peer_stats(), &self.peers,
) )
.context("peer already existed")?; .context("peer already existed")?;
peer.stats.counters.clone() peer.stats.counters.clone()
@ -387,7 +383,7 @@ impl TorrentStateLive {
let peer = Peer::new_live_for_incoming_connection( let peer = Peer::new_live_for_incoming_connection(
Id20::new(checked_peer.handshake.peer_id), Id20::new(checked_peer.handshake.peer_id),
tx.clone(), tx.clone(),
&self.peer_stats(), &self.peers,
); );
let counters = peer.stats.counters.clone(); let counters = peer.stats.counters.clone();
vac.insert(peer); vac.insert(peer);
@ -620,7 +616,7 @@ impl TorrentStateLive {
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) { fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
self.peers.with_peer_mut(handle, "set_peer_live", |p| { self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state p.state
.connecting_to_live(Id20::new(h.peer_id), &self.peer_stats()); .connecting_to_live(Id20::new(h.peer_id), &self.peers);
}); });
} }
@ -814,7 +810,7 @@ impl TorrentStateLive {
for mut pe in self.peers.states.iter_mut() { for mut pe in self.peers.states.iter_mut() {
if let PeerState::Live(l) = pe.value().state.get() { if let PeerState::Live(l) = pe.value().state.get() {
if l.has_full_torrent(self.lengths.total_pieces() as usize) { if l.has_full_torrent(self.lengths.total_pieces() as usize) {
let prev = pe.value_mut().state.set_not_needed(&self.peer_stats()); let prev = pe.value_mut().state.set_not_needed(&self.peers);
let _ = prev let _ = prev
.take_live_no_counters() .take_live_no_counters()
.unwrap() .unwrap()
@ -832,7 +828,7 @@ impl TorrentStateLive {
.filter_map(|mut p| { .filter_map(|mut p| {
let known_addr = *p.key(); let known_addr = *p.key();
p.value_mut() p.value_mut()
.reconnect_not_needed_peer(known_addr, &self.peer_stats()) .reconnect_not_needed_peer(known_addr, &self.peers)
}) })
.map(|socket_addr| self.peer_queue_tx.send(socket_addr)) .map(|socket_addr| self.peer_queue_tx.send(socket_addr))
.take_while(|r| r.is_ok()) .take_while(|r| r.is_ok())
@ -1113,7 +1109,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
impl PeerHandler { impl PeerHandler {
fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> { fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> {
let peers = &self.state.peers; let peers = &self.state.peers;
let pstats = self.state.peer_stats(); let pstats = {
let this = &self.state;
&this.peers
};
let handle = self.addr; let handle = self.addr;
let mut pe = match peers.states.get_mut(&handle) { let mut pe = match peers.states.get_mut(&handle) {
Some(peer) => TimedExistence::new(peer, "on_peer_died"), Some(peer) => TimedExistence::new(peer, "on_peer_died"),
@ -1122,7 +1121,7 @@ impl PeerHandler {
return Ok(()); return Ok(());
} }
}; };
let prev = pe.value_mut().state.take(&pstats); let prev = pe.value_mut().state.take(pstats);
match prev { match prev {
PeerState::Connecting(_) => {} PeerState::Connecting(_) => {}
@ -1141,7 +1140,7 @@ impl PeerHandler {
} }
PeerState::NotNeeded => { PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above. // Restore it as std::mem::take() replaced it above.
pe.value_mut().state.set(PeerState::NotNeeded, &pstats); pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(()); return Ok(());
} }
s @ PeerState::Queued | s @ PeerState::Dead => { s @ PeerState::Queued | s @ PeerState::Dead => {
@ -1157,7 +1156,7 @@ impl PeerHandler {
Some(e) => e, Some(e) => e,
None => { None => {
trace!("peer died without errors, not re-queueing"); trace!("peer died without errors, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, &pstats); pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(()); return Ok(());
} }
}; };
@ -1166,11 +1165,11 @@ impl PeerHandler {
if self.state.is_finished_and_no_active_streams() { if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing"); debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, &pstats); pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(()); return Ok(());
} }
pe.value_mut().state.set(PeerState::Dead, &pstats); pe.value_mut().state.set(PeerState::Dead, pstats);
if self.incoming { if self.incoming {
// do not retry incoming peers // do not retry incoming peers
@ -1203,7 +1202,7 @@ impl PeerHandler {
.with_peer_mut(handle, "dead_to_queued", |peer| { .with_peer_mut(handle, "dead_to_queued", |peer| {
match peer.state.get() { match peer.state.get() {
PeerState::Dead => { PeerState::Dead => {
peer.state.set(PeerState::Queued, &self.state.peer_stats()) peer.state.set(PeerState::Queued, &self.state.peers)
} }
other => bail!( other => bail!(
"peer is in unexpected state: {}. Expected dead", "peer is in unexpected state: {}. Expected dead",

View file

@ -12,7 +12,7 @@ use tracing::debug;
use crate::peer_connection::WriterRequest; use crate::peer_connection::WriterRequest;
use crate::type_aliases::BF; use crate::type_aliases::BF;
use super::peers::stats::atomic::AggregatePeerStatsAtomic; use super::PeerStates;
pub(crate) type InflightRequest = ChunkInfo; pub(crate) type InflightRequest = ChunkInfo;
pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>; pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>;
@ -29,10 +29,10 @@ impl Peer {
pub fn new_live_for_incoming_connection( pub fn new_live_for_incoming_connection(
peer_id: Id20, peer_id: Id20,
tx: PeerTx, tx: PeerTx,
counters: &[&AggregatePeerStatsAtomic], counters: &PeerStates,
) -> Self { ) -> Self {
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true)));
for counter in counters { for counter in [&counters.session_stats.peers, &counters.stats] {
counter.inc(&state.0); counter.inc(&state.0);
} }
Self { Self {
@ -52,7 +52,7 @@ impl Peer {
pub(crate) fn reconnect_not_needed_peer( pub(crate) fn reconnect_not_needed_peer(
&mut self, &mut self,
known_address: SocketAddr, known_address: SocketAddr,
counters: &[&AggregatePeerStatsAtomic], counters: &PeerStates,
) -> Option<SocketAddr> { ) -> Option<SocketAddr> {
if let PeerState::NotNeeded = self.state.get() { if let PeerState::NotNeeded = self.state.get() {
match self.outgoing_address { match self.outgoing_address {
@ -124,18 +124,18 @@ impl PeerStateNoMut {
&self.0 &self.0
} }
pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { pub fn take(&mut self, counters: &PeerStates) -> PeerState {
self.set(Default::default(), counters) self.set(Default::default(), counters)
} }
pub fn destroy(self, counters: &[&AggregatePeerStatsAtomic]) { pub fn destroy(self, counters: &PeerStates) {
for counter in counters { for counter in [&counters.session_stats.peers, &counters.stats] {
counter.dec(&self.0); counter.dec(&self.0);
} }
} }
pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { pub fn set(&mut self, new: PeerState, counters: &PeerStates) -> PeerState {
for counter in counters { for counter in [&counters.session_stats.peers, &counters.stats] {
counter.incdec(&self.0, &new); counter.incdec(&self.0, &new);
} }
std::mem::replace(&mut self.0, new) std::mem::replace(&mut self.0, new)
@ -159,10 +159,7 @@ impl PeerStateNoMut {
} }
} }
pub fn idle_to_connecting( pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> {
&mut self,
counters: &[&AggregatePeerStatsAtomic],
) -> Option<(PeerRx, PeerTx)> {
match &self.0 { match &self.0 {
PeerState::Queued | PeerState::NotNeeded => { PeerState::Queued | PeerState::NotNeeded => {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
@ -178,7 +175,7 @@ impl PeerStateNoMut {
&mut self, &mut self,
peer_id: Id20, peer_id: Id20,
tx: PeerTx, tx: PeerTx,
counters: &[&AggregatePeerStatsAtomic], counters: &PeerStates,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) {
anyhow::bail!("peer already active"); anyhow::bail!("peer already active");
@ -198,7 +195,7 @@ impl PeerStateNoMut {
pub fn connecting_to_live( pub fn connecting_to_live(
&mut self, &mut self,
peer_id: Id20, peer_id: Id20,
counters: &[&AggregatePeerStatsAtomic], counters: &PeerStates,
) -> Option<&mut LivePeerState> { ) -> Option<&mut LivePeerState> {
if let PeerState::Connecting(_) = &self.0 { if let PeerState::Connecting(_) = &self.0 {
let tx = match self.take(counters) { let tx = match self.take(counters) {
@ -215,7 +212,7 @@ impl PeerStateNoMut {
} }
} }
pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState {
self.set(PeerState::NotNeeded, counters) self.set(PeerState::NotNeeded, counters)
} }
} }

View file

@ -4,6 +4,7 @@ use anyhow::Context;
use backoff::backoff::Backoff; use backoff::backoff::Backoff;
use dashmap::DashMap; use dashmap::DashMap;
use librqbit_core::lengths::ValidPieceIndex; use librqbit_core::lengths::ValidPieceIndex;
use parking_lot::RwLock;
use peer_binary_protocol::{Message, Request}; use peer_binary_protocol::{Message, Request};
use crate::{ use crate::{
@ -21,6 +22,7 @@ pub mod stats;
pub(crate) struct PeerStates { pub(crate) struct PeerStates {
pub session_stats: Arc<AtomicSessionStats>, pub session_stats: Arc<AtomicSessionStats>,
pub live_peers: RwLock<Vec<PeerHandle>>,
pub stats: AggregatePeerStatsAtomic, pub stats: AggregatePeerStatsAtomic,
pub states: DashMap<PeerHandle, Peer>, pub states: DashMap<PeerHandle, Peer>,
} }
@ -28,7 +30,7 @@ pub(crate) struct PeerStates {
impl Drop for PeerStates { impl Drop for PeerStates {
fn drop(&mut self) { fn drop(&mut self) {
for (_, p) in std::mem::take(&mut self.states).into_iter() { for (_, p) in std::mem::take(&mut self.states).into_iter() {
p.state.destroy(&[&self.session_stats.peers]); p.state.destroy(self);
} }
} }
} }
@ -115,7 +117,7 @@ impl PeerStates {
let rx = self let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| { .with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state peer.state
.idle_to_connecting(&[&self.stats, &self.session_stats.peers]) .idle_to_connecting(self)
.context("invalid peer state") .context("invalid peer state")
}) })
.context("peer not found in states")??; .context("peer not found in states")??;
@ -130,8 +132,7 @@ impl PeerStates {
pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state peer.state.set_not_needed(self)
.set_not_needed(&[&self.stats, &self.session_stats.peers])
})?; })?;
Some(prev) Some(prev)
} }