Add per-peer counters

This commit is contained in:
Igor Katson 2023-11-20 13:29:12 +00:00
parent ef441b18e6
commit 3797a91be9
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 173 additions and 136 deletions

View file

@ -1,4 +1,7 @@
use std::{net::SocketAddr, time::Duration};
use std::{
net::SocketAddr,
time::{Duration, Instant},
};
use anyhow::Context;
use buffers::{ByteBuf, ByteString};
@ -15,6 +18,7 @@ use tracing::{debug, trace};
use crate::spawn_utils::BlockingSpawner;
pub trait PeerConnectionHandler {
fn on_connected(&self, _connection_time: Duration) {}
fn get_have_bytes(&self) -> u64;
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize>;
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>;
@ -127,9 +131,11 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
.connect_timeout
.unwrap_or_else(|| Duration::from_secs(10));
let now = Instant::now();
let mut conn = with_timeout(connect_timeout, tokio::net::TcpStream::connect(self.addr))
.await
.context("error connecting")?;
self.handler.on_connected(now.elapsed());
let mut write_buf = Vec::<u8>::with_capacity(PIECE_MESSAGE_DEFAULT_LEN);
let handshake = Handshake::new(self.info_hash, self.peer_id);

View file

@ -1,5 +1,6 @@
use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
@ -44,14 +45,27 @@ impl SendMany for PeerTx {
}
}
#[derive(Default, Debug)]
pub struct PeerCounters {
pub fetched_bytes: AtomicU64,
pub total_time_connecting_ms: AtomicU64,
pub connection_attempts: AtomicU32,
pub connections: AtomicU32,
pub errors: AtomicU32,
pub fetched_chunks: AtomicU32,
pub downloaded_and_checked_pieces: AtomicU32,
}
#[derive(Debug)]
pub struct PeerStats {
pub counters: Arc<PeerCounters>,
pub backoff: ExponentialBackoff,
}
impl Default for PeerStats {
fn default() -> Self {
Self {
counters: Arc::new(Default::default()),
backoff: ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(10))
.with_multiplier(6.)

View file

@ -84,8 +84,8 @@ use crate::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
peer_state::{
atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerRx,
PeerState, PeerTx, SendMany,
atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerCounters,
PeerRx, PeerState, PeerTx, SendMany,
},
spawn_utils::{spawn, BlockingSpawner},
type_aliases::{PeerHandle, BF},
@ -174,12 +174,6 @@ impl PeerStates {
.flatten()
}
pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option<Option<LivePeerState>> {
let prev = self.with_peer_mut(handle, "mark_peer_dead", |peer| {
peer.state.to_dead(&self.stats)
})?;
Some(prev.take_live_no_counters())
}
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
let p = self.states.remove(&handle).map(|r| r.1)?;
self.stats.dec(p.state.get());
@ -235,17 +229,15 @@ pub struct TorrentStateLocked {
#[derive(Default, Debug)]
struct AtomicStats {
have: AtomicU64,
downloaded_and_checked: AtomicU64,
downloaded_and_checked_pieces: AtomicU64,
uploaded: AtomicU64,
fetched_bytes: AtomicU64,
downloaded_pieces: AtomicU64,
total_piece_download_ms: AtomicU64,
}
impl AtomicStats {
fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_pieces.load(Ordering::Relaxed);
let d = self.downloaded_and_checked_pieces.load(Ordering::Relaxed);
let t = self.total_piece_download_ms.load(Ordering::Relaxed);
if d == 0 {
return None;
@ -465,6 +457,11 @@ impl TorrentState {
let state = self;
let (rx, tx) = state.peers.mark_peer_connecting(addr)?;
let counters = state
.peers
.with_peer(addr, |p| p.stats.counters.clone())
.context("bug: peer not found")?;
let handler = PeerHandler {
addr,
on_bitfield_notify: Default::default(),
@ -477,13 +474,13 @@ impl TorrentState {
state: state.clone(),
tx,
spawner,
counters,
};
let options = PeerConnectionOptions {
connect_timeout: state.options.peer_connect_timeout,
read_write_timeout: state.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.info_hash,
@ -493,22 +490,26 @@ impl TorrentState {
spawner,
);
let requester = handler.task_peer_chunk_requester(addr);
handler
.counters
.connection_attempts
.fetch_add(1, Ordering::Relaxed);
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer(rx) => {r}
};
let state = handler.state;
state.peer_semaphore.add_permits(1);
handler.state.peer_semaphore.add_permits(1);
match res {
// We disconnected the peer ourselves as we don't need it
Ok(()) => {
state.on_peer_died(addr, None);
handler.on_peer_died(None);
}
Err(e) => {
debug!("error managing peer: {:#}", e);
state.on_peer_died(addr, Some(e));
handler.on_peer_died(Some(e));
}
}
Ok::<_, anyhow::Error>(())
@ -599,109 +600,13 @@ impl TorrentState {
}
}
fn on_peer_died(self: &Arc<Self>, handle: PeerHandle, error: Option<anyhow::Error>) {
let mut pe = match self.peers.states.get_mut(&handle) {
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
None => {
warn!("bug: peer not found in table. Forgetting it forever");
return;
}
};
let prev = pe.value_mut().state.take(&self.peers.stats);
match prev {
PeerState::Connecting(_) => {}
PeerState::Live(live) => {
let mut g = self.lock_write("mark_chunk_requests_canceled");
for req in live.inflight_requests {
debug!(
"peer dead, marking chunk request cancelled, index={}, chunk={}",
req.piece.get(),
req.chunk
);
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
}
PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above.
pe.value_mut()
.state
.set(PeerState::NotNeeded, &self.peers.stats);
return;
}
s @ PeerState::Queued | s @ PeerState::Dead => {
warn!("bug: peer was in a wrong state {s:?}, ignoring it forever");
// Prevent deadlocks.
drop(pe);
self.peers.drop_peer(handle);
return;
}
};
if error.is_none() {
debug!("peer died without errors, not re-queueing");
pe.value_mut()
.state
.set(PeerState::NotNeeded, &self.peers.stats);
return;
}
if self.is_finished() {
debug!("torrent finished, not re-queueing");
pe.value_mut()
.state
.set(PeerState::NotNeeded, &self.peers.stats);
return;
}
pe.value_mut().state.set(PeerState::Dead, &self.peers.stats);
let backoff = pe.value_mut().stats.backoff.next_backoff();
// Prevent deadlocks.
drop(pe);
if let Some(dur) = backoff {
let state = self.clone();
spawn(
span!(
parent: None,
Level::ERROR,
"wait_for_peer",
peer = handle.to_string(),
duration = format!("{dur:?}")
),
async move {
tokio::time::sleep(dur).await;
state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
match peer.state.get() {
PeerState::Dead => {
peer.state.set(PeerState::Queued, &state.peers.stats)
}
other => bail!(
"peer is in unexpected state: {}. Expected dead",
other.name()
),
};
Ok(())
})
.context("bug: peer disappeared")??;
state.peer_queue_tx.send(handle)?;
Ok::<_, anyhow::Error>(())
},
);
} else {
debug!("dropping peer, backoff exhausted");
self.peers.drop_peer(handle);
}
}
pub fn get_uploaded(&self) -> u64 {
self.stats.uploaded.load(Ordering::Relaxed)
}
pub fn get_downloaded(&self) -> u64 {
self.stats.downloaded_and_checked.load(Ordering::Acquire)
self.stats
.downloaded_and_checked_pieces
.load(Ordering::Acquire)
}
pub fn is_finished(&self) -> bool {
@ -779,12 +684,12 @@ impl TorrentState {
pub fn stats_snapshot(&self) -> StatsSnapshot {
use Ordering::*;
let downloaded = self.stats.downloaded_and_checked.load(Relaxed);
let downloaded = self.stats.downloaded_and_checked_pieces.load(Relaxed);
let remaining = self.needed - downloaded;
StatsSnapshot {
have_bytes: self.stats.have.load(Relaxed),
downloaded_and_checked_bytes: downloaded,
downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed),
downloaded_and_checked_pieces: self.stats.downloaded_and_checked_pieces.load(Relaxed),
fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.uploaded.load(Relaxed),
total_bytes: self.have_plus_needed,
@ -813,9 +718,10 @@ struct PeerHandlerLocked {
}
// All peer state that would never be used by other actors should pe put here.
// This state tracks a live peer.
struct PeerHandler {
state: Arc<TorrentState>,
counters: Arc<PeerCounters>,
// Semantically, we don't need an RwLock here, as this is only requested from
// one future (requester + manage_peer).
//
@ -840,6 +746,12 @@ struct PeerHandler {
}
impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_connected(&self, connection_time: Duration) {
self.counters.connections.fetch_add(1, Ordering::Relaxed);
self.counters
.total_time_connecting_ms
.fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed);
}
fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
match message {
Message::Request(request) => {
@ -901,6 +813,102 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
}
impl PeerHandler {
fn on_peer_died(self, error: Option<anyhow::Error>) {
let peers = &self.state.peers;
let pstats = &peers.stats;
let handle = self.addr;
let mut pe = match peers.states.get_mut(&handle) {
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
None => {
warn!("bug: peer not found in table. Forgetting it forever");
return;
}
};
let prev = pe.value_mut().state.take(pstats);
match prev {
PeerState::Connecting(_) => {}
PeerState::Live(live) => {
let mut g = self.state.lock_write("mark_chunk_requests_canceled");
for req in live.inflight_requests {
debug!(
"peer dead, marking chunk request cancelled, index={}, chunk={}",
req.piece.get(),
req.chunk
);
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
}
PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above.
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return;
}
s @ PeerState::Queued | s @ PeerState::Dead => {
warn!("bug: peer was in a wrong state {s:?}, ignoring it forever");
// Prevent deadlocks.
drop(pe);
self.state.peers.drop_peer(handle);
return;
}
};
if error.is_none() {
debug!("peer died without errors, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return;
} else {
self.counters.errors.fetch_add(1, Ordering::Relaxed);
}
if self.state.is_finished() {
debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return;
}
pe.value_mut().state.set(PeerState::Dead, pstats);
let backoff = pe.value_mut().stats.backoff.next_backoff();
// Prevent deadlocks.
drop(pe);
if let Some(dur) = backoff {
spawn(
span!(
parent: None,
Level::ERROR,
"wait_for_peer",
peer = handle.to_string(),
duration = format!("{dur:?}")
),
async move {
tokio::time::sleep(dur).await;
self.state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
match peer.state.get() {
PeerState::Dead => {
peer.state.set(PeerState::Queued, &self.state.peers.stats)
}
other => bail!(
"peer is in unexpected state: {}. Expected dead",
other.name()
),
};
Ok(())
})
.context("bug: peer disappeared")??;
self.state.peer_queue_tx.send(handle)?;
Ok::<_, anyhow::Error>(())
},
);
} else {
debug!("dropping peer, backoff exhausted");
self.state.peers.drop_peer(handle);
}
}
fn reserve_next_needed_piece(&self) -> Option<ValidPieceIndex> {
// TODO: locking one inside the other in different order results in deadlocks.
self.state
@ -938,7 +946,11 @@ impl PeerHandler {
}
fn try_steal_old_slow_piece(&self, threshold: f64) -> Option<ValidPieceIndex> {
let total = self.state.stats.downloaded_pieces.load(Ordering::Relaxed);
let total = self
.state
.stats
.downloaded_and_checked_pieces
.load(Ordering::Acquire);
// heuristic for not enough precision in average time
if total < 20 {
@ -1209,14 +1221,21 @@ impl PeerHandler {
self.requests_sem.add_permits(1);
// Peer chunk/byte counters.
self.counters
.fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
self.counters.fetched_chunks.fetch_add(1, Ordering::Relaxed);
// Global chunk/byte counters.
self.state
.stats
.fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
self.state
.peers
.with_live_mut(self.addr, "inflight_requests.remove", |h| {
self.state
.stats
.fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
if !h
.inflight_requests
.remove(&InflightRequest::from(&chunk_info))
@ -1312,11 +1331,12 @@ impl PeerHandler {
.with_context(|| format!("error checking piece={index}"))?
{
true => {
// Global piece counters.
let piece_len =
self.state.lengths.piece_length(chunk_info.piece_index) as u64;
self.state
.stats
.downloaded_and_checked
.downloaded_and_checked_pieces
// This counter is used to compute "is_finished", so using
// stronger ordering.
.fetch_add(piece_len, Ordering::Release);
@ -1324,18 +1344,15 @@ impl PeerHandler {
.stats
.have
.fetch_add(piece_len, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
// Per-peer piece counters.
self.counters
.downloaded_and_checked_pieces
.fetch_add(1, Ordering::Relaxed);
{
let mut g = self.state.lock_write("mark_piece_downloaded");
g.chunks.mark_piece_downloaded(chunk_info.piece_index);