WTF is going on with counters

This commit is contained in:
Igor Katson 2023-11-20 00:55:31 +00:00
parent 22ea146ff6
commit aa99872e52
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 181 additions and 103 deletions

View file

@ -1,3 +1,4 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use std::{collections::HashSet, sync::Arc};
@ -5,8 +6,10 @@ use anyhow::Context;
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use serde::Serialize;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::{Notify, Semaphore};
use tracing::trace;
use crate::peer_connection::WriterRequest;
use crate::type_aliases::BF;
@ -63,10 +66,61 @@ impl Default for PeerStats {
#[derive(Debug, Default)]
pub struct Peer {
pub state: PeerState,
pub state: PeerStateNoMut,
pub stats: PeerStats,
}
/// Per-state peer counters kept as atomics so they can be updated through a
/// shared reference.
///
/// Each field except `seen` is the counter returned by `counter()` for the
/// matching `PeerState` variant.
#[derive(Debug, Default, Serialize)]
pub struct AggregatePeerStatsAtomic {
/// Counter for peers in `PeerState::Queued`.
pub queued: AtomicU32,
/// Counter for peers in `PeerState::Connecting`.
pub connecting: AtomicU32,
/// Counter for peers in `PeerState::Live`.
pub live: AtomicU32,
/// Incremented when a new peer entry is inserted into the peer map; it is
/// not tied to any `PeerState` variant.
/// NOTE(review): no decrement of `seen` is visible in this diff — confirm
/// whether it is meant to be cumulative.
pub seen: AtomicU32,
/// Counter for peers in `PeerState::Dead`.
pub dead: AtomicU32,
/// Counter for peers in `PeerState::NotNeeded`.
pub not_needed: AtomicU32,
}
/// Atomically adds one to `c` using relaxed ordering.
///
/// Returns the value the counter held *before* the increment (the
/// `fetch_add` convention), not the updated value. Overflow wraps around.
pub fn atomic_inc(c: &AtomicU32) -> u32 {
    AtomicU32::fetch_add(c, 1, Ordering::Relaxed)
}
/// Atomically subtracts one from `c` using relaxed ordering.
///
/// Returns the value the counter held *before* the decrement (the
/// `fetch_sub` convention). Decrementing a counter that is already zero
/// wraps around to `u32::MAX`.
pub fn atomic_dec(c: &AtomicU32) -> u32 {
    AtomicU32::fetch_sub(c, 1, Ordering::Relaxed)
}
impl AggregatePeerStatsAtomic {
pub fn counter(&self, state: &PeerState) -> &AtomicU32 {
match state {
PeerState::Connecting(_) => &self.connecting,
PeerState::Live(_) => &self.live,
PeerState::Queued => &self.queued,
PeerState::Dead => &self.dead,
PeerState::NotNeeded => &self.not_needed,
}
}
pub fn inc(&self, state: &PeerState) {
trace!(
"inc, new value = {}, state = {}",
atomic_inc(self.counter(state)),
state
);
}
pub fn dec(&self, state: &PeerState) {
trace!(
"dec, new value = {}, state = {}",
atomic_dec(self.counter(state)),
state
);
}
pub fn incdec(&self, old: &PeerState, new: &PeerState) {
self.dec(old);
self.inc(new);
}
}
#[derive(Debug, Default)]
pub enum PeerState {
#[default]
@ -82,6 +136,12 @@ pub enum PeerState {
NotNeeded,
}
/// Renders a peer state as the static label returned by `PeerState::name`.
impl std::fmt::Display for PeerState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label: &'static str = self.name();
        f.write_str(label)
    }
}
impl PeerState {
pub fn name(&self) -> &'static str {
match self {
@ -93,70 +153,77 @@ impl PeerState {
}
}
fn take_connecting(&mut self) -> Option<PeerTx> {
if let PeerState::Connecting(_) = self {
match std::mem::take(self) {
PeerState::Connecting(tx) => Some(tx),
_ => unreachable!(),
}
} else {
None
/// Consumes the state and returns its `LivePeerState` payload if it was
/// `Live`; any other variant yields `None`.
///
/// No counters are touched here: the value is taken by ownership, so it is
/// already detached from wherever it was stored.
pub fn take_live_no_counters(self) -> Option<LivePeerState> {
    if let PeerState::Live(live) = self {
        Some(live)
    } else {
        None
    }
}
}
pub fn take_live(&mut self) -> Option<LivePeerState> {
if let PeerState::Live(_) = self {
match std::mem::take(self) {
PeerState::Live(l) => Some(l),
_ => unreachable!(),
}
} else {
None
}
#[derive(Debug, Default)]
pub struct PeerStateNoMut(PeerState);
impl PeerStateNoMut {
pub fn get(&self) -> &PeerState {
&self.0
}
pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
self.set(Default::default(), counters)
}
/// Replaces the wrapped state with `new` and returns the state that was
/// there before.
///
/// `counters` is updated for the transition: the previous state's counter
/// is decremented and the new state's counter is incremented.
pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState {
    // Swap first, then account for the transition using the displaced value
    // and the value now stored.
    let previous = std::mem::replace(&mut self.0, new);
    counters.incdec(&previous, &self.0);
    previous
}
pub fn get_live(&self) -> Option<&LivePeerState> {
match self {
match &self.0 {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match self {
match &mut self.0 {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn queued_to_connecting(&mut self) -> Option<PeerRx> {
if let PeerState::Queued = self {
pub fn queued_to_connecting(&mut self, counters: &AggregatePeerStatsAtomic) -> Option<PeerRx> {
if let PeerState::Queued = &self.0 {
let (tx, rx) = unbounded_channel();
*self = PeerState::Connecting(tx);
self.set(PeerState::Connecting(tx), counters);
Some(rx)
} else {
None
}
}
pub fn connecting_to_live(&mut self, peer_id: Id20) -> Option<&mut LivePeerState> {
let tx = self.take_connecting()?;
*self = PeerState::Live(LivePeerState::new(peer_id, tx));
self.get_live_mut()
}
pub fn to_dead(&mut self) -> Option<Option<LivePeerState>> {
match std::mem::replace(self, PeerState::Dead) {
PeerState::Live(l) => Some(Some(l)),
PeerState::Connecting(_) => Some(None),
_ => None,
pub fn connecting_to_live(
&mut self,
peer_id: Id20,
counters: &AggregatePeerStatsAtomic,
) -> Option<&mut LivePeerState> {
if let PeerState::Connecting(_) = &self.0 {
let tx = match self.take(counters) {
PeerState::Connecting(tx) => tx,
_ => unreachable!(),
};
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.get_live_mut()
} else {
None
}
}
pub fn to_not_needed(&mut self) -> Option<LivePeerState> {
match std::mem::replace(self, PeerState::NotNeeded) {
PeerState::Live(l) => Some(l),
_ => None,
}
/// Switches the peer to `PeerState::Dead`, updating `counters`, and hands
/// back whatever state it was in before.
pub fn to_dead(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
    let dead = PeerState::Dead;
    self.set(dead, counters)
}
/// Switches the peer to `PeerState::NotNeeded`, updating `counters`, and
/// hands back whatever state it was in before.
pub fn to_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
    let not_needed = PeerState::NotNeeded;
    self.set(not_needed, counters)
}
}

View file

@ -14,7 +14,7 @@ use std::{
net::SocketAddr,
path::PathBuf,
sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
@ -52,7 +52,10 @@ use crate::{
peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx, SendMany},
peer_state::{
atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerRx,
PeerState, PeerTx, SendMany,
},
spawn_utils::{spawn, BlockingSpawner},
type_aliases::{PeerHandle, BF},
};
@ -68,29 +71,7 @@ pub struct PeerStates {
states: DashMap<PeerHandle, Peer>,
}
#[derive(Debug, Default, Serialize)]
pub struct AggregatePeerStatsAtomic {
pub queued: AtomicU32,
pub connecting: AtomicU32,
pub live: AtomicU32,
pub seen: AtomicU32,
pub dead: AtomicU32,
pub not_needed: AtomicU32,
}
impl AggregatePeerStatsAtomic {
fn counter(&self, state: &PeerState) -> &AtomicU32 {
match state {
PeerState::Connecting(_) => &self.connecting,
PeerState::Live(_) => &self.live,
PeerState::Queued => &self.queued,
PeerState::Dead => &self.dead,
PeerState::NotNeeded => &self.not_needed,
}
}
}
#[derive(Debug, Default, Serialize)]
#[derive(Debug, Default, Serialize, PartialEq, Eq)]
pub struct AggregatePeerStats {
pub queued: usize,
pub connecting: usize,
@ -123,7 +104,7 @@ impl PeerStates {
.iter()
.fold(AggregatePeerStats::default(), |mut s, p| {
s.seen += 1;
match &p.value().state {
match &p.value().state.get() {
PeerState::Connecting(_) => s.connecting += 1,
PeerState::Live(_) => s.live += 1,
PeerState::Queued => s.queued += 1,
@ -143,7 +124,8 @@ impl PeerStates {
Entry::Occupied(_) => None,
Entry::Vacant(vac) => {
vac.insert(Default::default());
self.stats.queued.fetch_add(1, Ordering::Relaxed);
atomic_inc(&self.stats.queued);
atomic_inc(&self.stats.seen);
Some(addr)
}
}
@ -162,10 +144,12 @@ impl PeerStates {
.map(|e| f(TimedExistence::new(e, reason).value_mut()))
}
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.states.get(&addr).and_then(|e| match &e.value().state {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
self.states
.get(&addr)
.and_then(|e| match &e.value().state.get() {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
}
pub fn with_live_mut<R>(
&self,
@ -173,20 +157,19 @@ impl PeerStates {
reason: &'static str,
f: impl FnOnce(&mut LivePeerState) -> R,
) -> Option<R> {
self.with_peer_mut(addr, reason, |peer| match &mut peer.state {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
.flatten()
self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f))
.flatten()
}
pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option<Option<LivePeerState>> {
self.with_peer_mut(handle, "mark_peer_dead", |peer| peer.state.to_dead())
.flatten()
let prev = self.with_peer_mut(handle, "mark_peer_dead", |peer| {
peer.state.to_dead(&self.stats)
})?;
Some(prev.take_live_no_counters())
}
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
let p = self.states.remove(&handle).map(|r| r.1)?;
self.stats.counter(&p.state).fetch_sub(1, Ordering::Relaxed);
self.stats.dec(p.state.get());
Some(p)
}
pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
@ -216,12 +199,14 @@ impl PeerStates {
})
}
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> {
self.with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state
.queued_to_connecting()
.context("invalid peer state")
})
.context("peer not found in states")?
let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state
.queued_to_connecting(&self.stats)
.context("invalid peer state")
})
.context("peer not found in states")??;
Ok(rx)
}
pub fn clone_tx(&self, handle: PeerHandle) -> Option<PeerTx> {
@ -234,11 +219,11 @@ impl PeerStates {
});
}
fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<LivePeerState> {
self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state.to_not_needed()
})
.flatten()
fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state.to_not_needed(&self.stats)
})?;
Some(prev)
}
}
@ -289,6 +274,7 @@ pub struct StatsSnapshot {
pub time: Instant,
pub total_piece_download_ms: u64,
pub peer_stats: AggregatePeerStats,
pub new_peer_stats: AggregatePeerStats,
}
impl StatsSnapshot {
@ -677,10 +663,14 @@ impl TorrentState {
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state.connecting_to_live(Id20(h.peer_id)).is_some()
p.state
.connecting_to_live(Id20(h.peer_id), &self.peers.stats)
.is_some()
});
match result {
Some(true) => debug!("set peer to live"),
Some(true) => {
debug!("set peer to live")
}
Some(false) => debug!("can't set peer live, it was in wrong state"),
None => debug!("can't set peer live, it disappeared"),
}
@ -694,7 +684,9 @@ impl TorrentState {
return;
}
};
match std::mem::take(&mut pe.value_mut().state) {
let prev = pe.value_mut().state.take(&self.peers.stats);
match prev {
PeerState::Connecting(_) => {}
PeerState::Live(live) => {
let mut g = self.lock_write("mark_chunk_requests_canceled");
@ -709,7 +701,9 @@ impl TorrentState {
}
PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above.
pe.value_mut().state = PeerState::NotNeeded;
pe.value_mut()
.state
.set(PeerState::NotNeeded, &self.peers.stats);
return;
}
s @ PeerState::Queued | s @ PeerState::Dead => {
@ -723,17 +717,21 @@ impl TorrentState {
if error.is_none() {
debug!("peer died without errors, not re-queueing");
pe.value_mut().state = PeerState::NotNeeded;
pe.value_mut()
.state
.set(PeerState::NotNeeded, &self.peers.stats);
return;
}
if self.is_finished() {
debug!("torrent finished, not re-queueing");
pe.value_mut().state = PeerState::NotNeeded;
pe.value_mut()
.state
.set(PeerState::NotNeeded, &self.peers.stats);
return;
}
pe.value_mut().state = PeerState::Dead;
pe.value_mut().state.set(PeerState::Dead, &self.peers.stats);
let backoff = pe.value_mut().stats.backoff.next_backoff();
// Prevent deadlocks.
@ -754,8 +752,10 @@ impl TorrentState {
state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
match &peer.state {
PeerState::Dead => peer.state = PeerState::Queued,
match peer.state.get() {
PeerState::Dead => {
peer.state.set(PeerState::Queued, &state.peers.stats)
}
other => bail!(
"peer is in unexpected state: {}. Expected dead",
other.name()
@ -793,7 +793,7 @@ impl TorrentState {
let mut futures = Vec::new();
for pe in self.peers.states.iter() {
match &pe.value().state {
match &pe.value().state.get() {
PeerState::Live(live) => {
if !live.peer_interested {
continue;
@ -856,11 +856,17 @@ impl TorrentState {
pub fn stats_snapshot(&self, with_peer_stats: bool) -> StatsSnapshot {
use Ordering::*;
let new_peer_stats = self.peers.stats_from_atomic();
let peer_stats = if with_peer_stats {
self.peers.stats()
let old_stats = self.peers.stats();
if old_stats != new_peer_stats {
warn!("old != new: {old_stats:?} != {new_peer_stats:?}")
}
old_stats
} else {
Default::default()
};
let downloaded = self.stats.downloaded_and_checked.load(Relaxed);
let remaining = self.needed - downloaded;
StatsSnapshot {
@ -875,6 +881,7 @@ impl TorrentState {
remaining_bytes: remaining,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
peer_stats,
new_peer_stats,
}
}
@ -1367,10 +1374,14 @@ impl PeerHandler {
fn disconnect_all_peers_that_have_full_torrent(&self) {
for mut pe in self.state.peers.states.iter_mut() {
if let PeerState::Live(l) = &pe.value().state {
if let PeerState::Live(l) = pe.value().state.get() {
if l.has_full_torrent(self.state.lengths.total_pieces() as usize) {
let live = pe.value_mut().state.to_not_needed().unwrap();
let _ = live.tx.send(WriterRequest::Disconnect);
let prev = pe.value_mut().state.to_not_needed(&self.state.peers.stats);
let _ = prev
.take_live_no_counters()
.unwrap()
.tx
.send(WriterRequest::Disconnect);
}
}
}