diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index fa4a870..f7f2dac 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -21,7 +21,7 @@ pub(crate) type PeerTx = UnboundedSender; #[derive(Debug)] pub(crate) struct Peer { pub addr: SocketAddr, - state: PeerStateNoMut, + state: PeerState, pub stats: stats::atomic::PeerStats, pub outgoing_address: Option, } @@ -33,9 +33,9 @@ impl Peer { tx: PeerTx, counters: &PeerStates, ) -> Self { - let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); + let state = PeerState::Live(LivePeerState::new(peer_id, tx, true)); for counter in [&counters.session_stats.peers, &counters.stats] { - counter.inc(&state.0); + counter.inc(&state); } Self { addr, @@ -121,12 +121,9 @@ impl PeerState { } } -#[derive(Debug, Default)] -pub(crate) struct PeerStateNoMut(PeerState); - impl Peer { pub fn get_state(&self) -> &PeerState { - &self.state.0 + &self.state } pub fn take_state(&mut self, counters: &PeerStates) -> PeerState { @@ -135,37 +132,46 @@ impl Peer { pub fn destroy(self, counters: &PeerStates) { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.dec(&self.state.0); + counter.dec(&self.state); + } + if matches!(&self.state, PeerState::Live(..)) { + counters.live_peers.write().retain(|a| *a != self.addr); } } pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.incdec(&self.state.0, &new); + counter.incdec(&self.state, &new); } - std::mem::replace(&mut self.state.0, new) + if matches!(&self.state, PeerState::Live(..)) { + counters.live_peers.write().retain(|a| *a != self.addr); + } + if matches!(&new, PeerState::Live(..)) { + counters.live_peers.write().push(self.addr); + } + std::mem::replace(&mut self.state, new) } pub fn get_live(&self) -> Option<&LivePeerState> { - match &self.state.0 { + match &self.state { PeerState::Live(l) => Some(l), _ => None, } } pub fn is_live(&self) -> bool { - matches!(&self.state.0, PeerState::Live(_)) + matches!(&self.state, PeerState::Live(_)) } pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { - match &mut self.state.0 { + match &mut self.state { PeerState::Live(l) => Some(l), _ => None, } } pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { - match &self.state.0 { + match &self.state { PeerState::Queued | PeerState::NotNeeded => { let (tx, rx) = unbounded_channel(); let tx_2 = tx.clone(); @@ -182,10 +188,7 @@ impl Peer { tx: PeerTx, counters: &PeerStates, ) -> anyhow::Result<()> { - if matches!( - &self.state.0, - PeerState::Connecting(..) | PeerState::Live(..) - ) { + if matches!(&self.state, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); } match self.take_state(counters) { @@ -205,7 +208,7 @@ impl Peer { peer_id: Id20, counters: &PeerStates, ) -> Option<&mut LivePeerState> { - if let PeerState::Connecting(_) = &self.state.0 { + if let PeerState::Connecting(_) = &self.state { let tx = match self.take_state(counters) { PeerState::Connecting(tx) => tx, _ => unreachable!(),