diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 975844e..5dbfa7e 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -369,13 +369,12 @@ impl TorrentStateLive { let counters = match self.peers.states.entry(checked_peer.addr) { Entry::Occupied(mut occ) => { let peer = occ.get_mut(); - peer.state - .incoming_connection( - Id20::new(checked_peer.handshake.peer_id), - tx.clone(), - &self.peers, - ) - .context("peer already existed")?; + peer.incoming_connection( + Id20::new(checked_peer.handshake.peer_id), + tx.clone(), + &self.peers, + ) + .context("peer already existed")?; peer.stats.counters.clone() } Entry::Vacant(vac) => { @@ -616,8 +615,7 @@ impl TorrentStateLive { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { - p.state - .connecting_to_live(Id20::new(h.peer_id), &self.peers); + p.connecting_to_live(Id20::new(h.peer_id), &self.peers); }); } @@ -674,7 +672,7 @@ impl TorrentStateLive { .peers .states .iter() - .filter(|e| filter.state.matches(e.value().state.get_state())) + .filter(|e| filter.state.matches(e.value().get_state())) .map(|e| (e.key().to_string(), e.value().into())) .collect(), } @@ -809,9 +807,9 @@ impl TorrentStateLive { fn disconnect_all_peers_that_have_full_torrent(&self) { for mut pe in self.peers.states.iter_mut() { - if let PeerState::Live(l) = pe.value().state.get_state() { + if let PeerState::Live(l) = pe.value().get_state() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { - let prev = pe.value_mut().state.set_not_needed(&self.peers); + let prev = pe.value_mut().set_not_needed(&self.peers); let _ = prev .take_live_no_counters() .unwrap() @@ -874,7 +872,7 @@ impl TorrentStateLive { .load(Ordering::Relaxed) > 0; - let is_live = has_outgoing_connections && ps.value().state.is_live(); + let is_live = has_outgoing_connections && ps.value().is_live(); if is_live { live_peers.insert(addr); } else { @@ -1122,7 +1120,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take_state(pstats); + let prev = pe.value_mut().take_state(pstats); match prev { PeerState::Connecting(_) => {} @@ -1141,7 +1139,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, pstats); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -1157,7 +1155,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, pstats); return Ok(()); } }; @@ -1166,11 +1164,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().set_state(PeerState::NotNeeded, pstats); return Ok(()); } - pe.value_mut().state.set(PeerState::Dead, pstats); + pe.value_mut().set_state(PeerState::Dead, pstats); if self.incoming { // do not retry incoming peers @@ -1201,9 +1199,9 @@ impl PeerHandler { self.state .peers .with_peer_mut(handle, "dead_to_queued", |peer| { - match peer.state.get_state() { + match peer.get_state() { PeerState::Dead => { - peer.state.set(PeerState::Queued, &self.state.peers) + peer.set_state(PeerState::Queued, &self.state.peers) } other => bail!( "peer is in unexpected state: {}. Expected dead", diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 6c35abb..fa4a870 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -21,7 +21,7 @@ pub(crate) type PeerTx = UnboundedSender; #[derive(Debug)] pub(crate) struct Peer { pub addr: SocketAddr, - pub state: PeerStateNoMut, + state: PeerStateNoMut, pub stats: stats::atomic::PeerStats, pub outgoing_address: Option, } @@ -59,12 +59,12 @@ impl Peer { known_address: SocketAddr, counters: &PeerStates, ) -> Option { - if let PeerState::NotNeeded = self.state.get_state() { + if let PeerState::NotNeeded = self.get_state() { match self.outgoing_address { None => None, Some(socket_addr) => { if known_address == socket_addr { - self.state.set(PeerState::Queued, counters); + self.set_state(PeerState::Queued, counters); } else { debug!( peer = known_address.to_string(), @@ -124,52 +124,52 @@ impl PeerState { #[derive(Debug, Default)] pub(crate) struct PeerStateNoMut(PeerState); -impl PeerStateNoMut { +impl Peer { pub fn get_state(&self) -> &PeerState { - &self.0 + &self.state.0 } pub fn take_state(&mut self, counters: &PeerStates) -> PeerState { - self.set(Default::default(), counters) + self.set_state(Default::default(), counters) } pub fn destroy(self, counters: &PeerStates) { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.dec(&self.0); + counter.dec(&self.state.0); } } - pub fn set(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { + pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { for counter in [&counters.session_stats.peers, &counters.stats] { - counter.incdec(&self.0, &new); + counter.incdec(&self.state.0, &new); } - std::mem::replace(&mut self.0, new) + std::mem::replace(&mut self.state.0, new) } pub fn get_live(&self) -> Option<&LivePeerState> { - match &self.0 { + match &self.state.0 { PeerState::Live(l) => Some(l), _ => None, } } pub fn is_live(&self) -> bool { - matches!(&self.0, PeerState::Live(_)) + matches!(&self.state.0, PeerState::Live(_)) } pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { - match &mut self.0 { + match &mut self.state.0 { PeerState::Live(l) => Some(l), _ => None, } } pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { - match &self.0 { + match &self.state.0 { PeerState::Queued | PeerState::NotNeeded => { let (tx, rx) = unbounded_channel(); let tx_2 = tx.clone(); - self.set(PeerState::Connecting(tx), counters); + self.set_state(PeerState::Connecting(tx), counters); Some((rx, tx_2)) } _ => None, @@ -182,12 +182,15 @@ impl PeerStateNoMut { tx: PeerTx, counters: &PeerStates, ) -> anyhow::Result<()> { - if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { + if matches!( + &self.state.0, + PeerState::Connecting(..) | PeerState::Live(..) + ) { anyhow::bail!("peer already active"); } match self.take_state(counters) { PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { - self.set( + self.set_state( PeerState::Live(LivePeerState::new(peer_id, tx, true)), counters, ); @@ -202,12 +205,12 @@ impl PeerStateNoMut { peer_id: Id20, counters: &PeerStates, ) -> Option<&mut LivePeerState> { - if let PeerState::Connecting(_) = &self.0 { + if let PeerState::Connecting(_) = &self.state.0 { let tx = match self.take_state(counters) { PeerState::Connecting(tx) => tx, _ => unreachable!(), }; - self.set( + self.set_state( PeerState::Live(LivePeerState::new(peer_id, tx, false)), counters, ); @@ -218,7 +221,7 @@ impl PeerStateNoMut { } pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState { - self.set(PeerState::NotNeeded, counters) + self.set_state(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs index 602ef5d..4d6e2b3 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -51,7 +51,7 @@ impl From<&Peer> for PeerStats { fn from(peer: &Peer) -> Self { Self { counters: peer.stats.counters.as_ref().into(), - state: peer.state.get_state().name(), + state: peer.get_state().name(), } } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 602d59f..4311273 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -30,7 +30,7 @@ pub(crate) struct PeerStates { impl Drop for PeerStates { fn drop(&mut self) { for (_, p) in std::mem::take(&mut self.states).into_iter() { - p.state.destroy(self); + p.destroy(self); } } } @@ -71,7 +71,7 @@ impl PeerStates { } pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { - self.with_peer(addr, |peer| peer.state.get_live().map(f)) + self.with_peer(addr, |peer| peer.get_live().map(f)) .flatten() } @@ -81,13 +81,13 @@ impl PeerStates { reason: &'static str, f: impl FnOnce(&mut LivePeerState) -> R, ) -> Option { - self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) + self.with_peer_mut(addr, reason, |peer| peer.get_live_mut().map(f)) .flatten() } pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; - let s = p.state.get_state(); + let s = p.get_state(); self.stats.dec(s); self.session_stats.peers.dec(s); @@ -116,9 +116,7 @@ impl PeerStates { pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { - peer.state - .idle_to_connecting(self) - .context("invalid peer state") + peer.idle_to_connecting(self).context("invalid peer state") }) .context("peer not found in states")??; Ok(rx) @@ -132,7 +130,7 @@ impl PeerStates { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state.set_not_needed(self) + peer.set_not_needed(self) })?; Some(prev) }