Move all methods to Peer, not PeerStateNoMut

This commit is contained in:
Igor Katson 2024-12-04 10:12:04 +00:00
parent 85b65dcef5
commit 956e6d4f08
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 48 additions and 49 deletions

View file

@ -369,13 +369,12 @@ impl TorrentStateLive {
let counters = match self.peers.states.entry(checked_peer.addr) { let counters = match self.peers.states.entry(checked_peer.addr) {
Entry::Occupied(mut occ) => { Entry::Occupied(mut occ) => {
let peer = occ.get_mut(); let peer = occ.get_mut();
peer.state peer.incoming_connection(
.incoming_connection( Id20::new(checked_peer.handshake.peer_id),
Id20::new(checked_peer.handshake.peer_id), tx.clone(),
tx.clone(), &self.peers,
&self.peers, )
) .context("peer already existed")?;
.context("peer already existed")?;
peer.stats.counters.clone() peer.stats.counters.clone()
} }
Entry::Vacant(vac) => { Entry::Vacant(vac) => {
@ -616,8 +615,7 @@ impl TorrentStateLive {
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) { fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
self.peers.with_peer_mut(handle, "set_peer_live", |p| { self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state p.connecting_to_live(Id20::new(h.peer_id), &self.peers);
.connecting_to_live(Id20::new(h.peer_id), &self.peers);
}); });
} }
@ -674,7 +672,7 @@ impl TorrentStateLive {
.peers .peers
.states .states
.iter() .iter()
.filter(|e| filter.state.matches(e.value().state.get_state())) .filter(|e| filter.state.matches(e.value().get_state()))
.map(|e| (e.key().to_string(), e.value().into())) .map(|e| (e.key().to_string(), e.value().into()))
.collect(), .collect(),
} }
@ -809,9 +807,9 @@ impl TorrentStateLive {
fn disconnect_all_peers_that_have_full_torrent(&self) { fn disconnect_all_peers_that_have_full_torrent(&self) {
for mut pe in self.peers.states.iter_mut() { for mut pe in self.peers.states.iter_mut() {
if let PeerState::Live(l) = pe.value().state.get_state() { if let PeerState::Live(l) = pe.value().get_state() {
if l.has_full_torrent(self.lengths.total_pieces() as usize) { if l.has_full_torrent(self.lengths.total_pieces() as usize) {
let prev = pe.value_mut().state.set_not_needed(&self.peers); let prev = pe.value_mut().set_not_needed(&self.peers);
let _ = prev let _ = prev
.take_live_no_counters() .take_live_no_counters()
.unwrap() .unwrap()
@ -874,7 +872,7 @@ impl TorrentStateLive {
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
> 0; > 0;
let is_live = has_outgoing_connections && ps.value().state.is_live(); let is_live = has_outgoing_connections && ps.value().is_live();
if is_live { if is_live {
live_peers.insert(addr); live_peers.insert(addr);
} else { } else {
@ -1122,7 +1120,7 @@ impl PeerHandler {
return Ok(()); return Ok(());
} }
}; };
let prev = pe.value_mut().state.take_state(pstats); let prev = pe.value_mut().take_state(pstats);
match prev { match prev {
PeerState::Connecting(_) => {} PeerState::Connecting(_) => {}
@ -1141,7 +1139,7 @@ impl PeerHandler {
} }
PeerState::NotNeeded => { PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above. // Restore it as std::mem::take() replaced it above.
pe.value_mut().state.set(PeerState::NotNeeded, pstats); pe.value_mut().set_state(PeerState::NotNeeded, pstats);
return Ok(()); return Ok(());
} }
s @ PeerState::Queued | s @ PeerState::Dead => { s @ PeerState::Queued | s @ PeerState::Dead => {
@ -1157,7 +1155,7 @@ impl PeerHandler {
Some(e) => e, Some(e) => e,
None => { None => {
trace!("peer died without errors, not re-queueing"); trace!("peer died without errors, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats); pe.value_mut().set_state(PeerState::NotNeeded, pstats);
return Ok(()); return Ok(());
} }
}; };
@ -1166,11 +1164,11 @@ impl PeerHandler {
if self.state.is_finished_and_no_active_streams() { if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing"); debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats); pe.value_mut().set_state(PeerState::NotNeeded, pstats);
return Ok(()); return Ok(());
} }
pe.value_mut().state.set(PeerState::Dead, pstats); pe.value_mut().set_state(PeerState::Dead, pstats);
if self.incoming { if self.incoming {
// do not retry incoming peers // do not retry incoming peers
@ -1201,9 +1199,9 @@ impl PeerHandler {
self.state self.state
.peers .peers
.with_peer_mut(handle, "dead_to_queued", |peer| { .with_peer_mut(handle, "dead_to_queued", |peer| {
match peer.state.get_state() { match peer.get_state() {
PeerState::Dead => { PeerState::Dead => {
peer.state.set(PeerState::Queued, &self.state.peers) peer.set_state(PeerState::Queued, &self.state.peers)
} }
other => bail!( other => bail!(
"peer is in unexpected state: {}. Expected dead", "peer is in unexpected state: {}. Expected dead",

View file

@ -21,7 +21,7 @@ pub(crate) type PeerTx = UnboundedSender<WriterRequest>;
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Peer { pub(crate) struct Peer {
pub addr: SocketAddr, pub addr: SocketAddr,
pub state: PeerStateNoMut, state: PeerStateNoMut,
pub stats: stats::atomic::PeerStats, pub stats: stats::atomic::PeerStats,
pub outgoing_address: Option<SocketAddr>, pub outgoing_address: Option<SocketAddr>,
} }
@ -59,12 +59,12 @@ impl Peer {
known_address: SocketAddr, known_address: SocketAddr,
counters: &PeerStates, counters: &PeerStates,
) -> Option<SocketAddr> { ) -> Option<SocketAddr> {
if let PeerState::NotNeeded = self.state.get_state() { if let PeerState::NotNeeded = self.get_state() {
match self.outgoing_address { match self.outgoing_address {
None => None, None => None,
Some(socket_addr) => { Some(socket_addr) => {
if known_address == socket_addr { if known_address == socket_addr {
self.state.set(PeerState::Queued, counters); self.set_state(PeerState::Queued, counters);
} else { } else {
debug!( debug!(
peer = known_address.to_string(), peer = known_address.to_string(),
@ -124,52 +124,52 @@ impl PeerState {
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub(crate) struct PeerStateNoMut(PeerState); pub(crate) struct PeerStateNoMut(PeerState);
impl PeerStateNoMut { impl Peer {
pub fn get_state(&self) -> &PeerState { pub fn get_state(&self) -> &PeerState {
&self.0 &self.state.0
} }
pub fn take_state(&mut self, counters: &PeerStates) -> PeerState { pub fn take_state(&mut self, counters: &PeerStates) -> PeerState {
self.set(Default::default(), counters) self.set_state(Default::default(), counters)
} }
pub fn destroy(self, counters: &PeerStates) { pub fn destroy(self, counters: &PeerStates) {
for counter in [&counters.session_stats.peers, &counters.stats] { for counter in [&counters.session_stats.peers, &counters.stats] {
counter.dec(&self.0); counter.dec(&self.state.0);
} }
} }
pub fn set(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState {
for counter in [&counters.session_stats.peers, &counters.stats] { for counter in [&counters.session_stats.peers, &counters.stats] {
counter.incdec(&self.0, &new); counter.incdec(&self.state.0, &new);
} }
std::mem::replace(&mut self.0, new) std::mem::replace(&mut self.state.0, new)
} }
pub fn get_live(&self) -> Option<&LivePeerState> { pub fn get_live(&self) -> Option<&LivePeerState> {
match &self.0 { match &self.state.0 {
PeerState::Live(l) => Some(l), PeerState::Live(l) => Some(l),
_ => None, _ => None,
} }
} }
pub fn is_live(&self) -> bool { pub fn is_live(&self) -> bool {
matches!(&self.0, PeerState::Live(_)) matches!(&self.state.0, PeerState::Live(_))
} }
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match &mut self.0 { match &mut self.state.0 {
PeerState::Live(l) => Some(l), PeerState::Live(l) => Some(l),
_ => None, _ => None,
} }
} }
pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> {
match &self.0 { match &self.state.0 {
PeerState::Queued | PeerState::NotNeeded => { PeerState::Queued | PeerState::NotNeeded => {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let tx_2 = tx.clone(); let tx_2 = tx.clone();
self.set(PeerState::Connecting(tx), counters); self.set_state(PeerState::Connecting(tx), counters);
Some((rx, tx_2)) Some((rx, tx_2))
} }
_ => None, _ => None,
@ -182,12 +182,15 @@ impl PeerStateNoMut {
tx: PeerTx, tx: PeerTx,
counters: &PeerStates, counters: &PeerStates,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { if matches!(
&self.state.0,
PeerState::Connecting(..) | PeerState::Live(..)
) {
anyhow::bail!("peer already active"); anyhow::bail!("peer already active");
} }
match self.take_state(counters) { match self.take_state(counters) {
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
self.set( self.set_state(
PeerState::Live(LivePeerState::new(peer_id, tx, true)), PeerState::Live(LivePeerState::new(peer_id, tx, true)),
counters, counters,
); );
@ -202,12 +205,12 @@ impl PeerStateNoMut {
peer_id: Id20, peer_id: Id20,
counters: &PeerStates, counters: &PeerStates,
) -> Option<&mut LivePeerState> { ) -> Option<&mut LivePeerState> {
if let PeerState::Connecting(_) = &self.0 { if let PeerState::Connecting(_) = &self.state.0 {
let tx = match self.take_state(counters) { let tx = match self.take_state(counters) {
PeerState::Connecting(tx) => tx, PeerState::Connecting(tx) => tx,
_ => unreachable!(), _ => unreachable!(),
}; };
self.set( self.set_state(
PeerState::Live(LivePeerState::new(peer_id, tx, false)), PeerState::Live(LivePeerState::new(peer_id, tx, false)),
counters, counters,
); );
@ -218,7 +221,7 @@ impl PeerStateNoMut {
} }
pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState { pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState {
self.set(PeerState::NotNeeded, counters) self.set_state(PeerState::NotNeeded, counters)
} }
} }

View file

@ -51,7 +51,7 @@ impl From<&Peer> for PeerStats {
fn from(peer: &Peer) -> Self { fn from(peer: &Peer) -> Self {
Self { Self {
counters: peer.stats.counters.as_ref().into(), counters: peer.stats.counters.as_ref().into(),
state: peer.state.get_state().name(), state: peer.get_state().name(),
} }
} }
} }

View file

@ -30,7 +30,7 @@ pub(crate) struct PeerStates {
impl Drop for PeerStates { impl Drop for PeerStates {
fn drop(&mut self) { fn drop(&mut self) {
for (_, p) in std::mem::take(&mut self.states).into_iter() { for (_, p) in std::mem::take(&mut self.states).into_iter() {
p.state.destroy(self); p.destroy(self);
} }
} }
} }
@ -71,7 +71,7 @@ impl PeerStates {
} }
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> { pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.with_peer(addr, |peer| peer.state.get_live().map(f)) self.with_peer(addr, |peer| peer.get_live().map(f))
.flatten() .flatten()
} }
@ -81,13 +81,13 @@ impl PeerStates {
reason: &'static str, reason: &'static str,
f: impl FnOnce(&mut LivePeerState) -> R, f: impl FnOnce(&mut LivePeerState) -> R,
) -> Option<R> { ) -> Option<R> {
self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) self.with_peer_mut(addr, reason, |peer| peer.get_live_mut().map(f))
.flatten() .flatten()
} }
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> { pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
let p = self.states.remove(&handle).map(|r| r.1)?; let p = self.states.remove(&handle).map(|r| r.1)?;
let s = p.state.get_state(); let s = p.get_state();
self.stats.dec(s); self.stats.dec(s);
self.session_stats.peers.dec(s); self.session_stats.peers.dec(s);
@ -116,9 +116,7 @@ impl PeerStates {
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> { pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> {
let rx = self let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| { .with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state peer.idle_to_connecting(self).context("invalid peer state")
.idle_to_connecting(self)
.context("invalid peer state")
}) })
.context("peer not found in states")??; .context("peer not found in states")??;
Ok(rx) Ok(rx)
@ -132,7 +130,7 @@ impl PeerStates {
pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state.set_not_needed(self) peer.set_not_needed(self)
})?; })?;
Some(prev) Some(prev)
} }