From 3797a91be9749054cc9d1bca647f8e9e94ef3869 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 13:29:12 +0000 Subject: [PATCH] Add per-peer counters --- crates/librqbit/src/peer_connection.rs | 8 +- crates/librqbit/src/peer_state.rs | 16 +- crates/librqbit/src/torrent_state.rs | 285 +++++++++++++------------ 3 files changed, 173 insertions(+), 136 deletions(-) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 0814a9f..cd8ca38 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -1,4 +1,7 @@ -use std::{net::SocketAddr, time::Duration}; +use std::{ + net::SocketAddr, + time::{Duration, Instant}, +}; use anyhow::Context; use buffers::{ByteBuf, ByteString}; @@ -15,6 +18,7 @@ use tracing::{debug, trace}; use crate::spawn_utils::BlockingSpawner; pub trait PeerConnectionHandler { + fn on_connected(&self, _connection_time: Duration) {} fn get_have_bytes(&self) -> u64; fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option; fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>; @@ -127,9 +131,11 @@ impl PeerConnection { .connect_timeout .unwrap_or_else(|| Duration::from_secs(10)); + let now = Instant::now(); let mut conn = with_timeout(connect_timeout, tokio::net::TcpStream::connect(self.addr)) .await .context("error connecting")?; + self.handler.on_connected(now.elapsed()); let mut write_buf = Vec::::with_capacity(PIECE_MESSAGE_DEFAULT_LEN); let handshake = Handshake::new(self.info_hash, self.peer_id); diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 12c7a68..1a055e1 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::Arc; use std::time::Duration; use anyhow::Context; @@ -44,14 +45,27 @@ impl SendMany for PeerTx { } } +#[derive(Default, Debug)] +pub struct PeerCounters { + pub fetched_bytes: AtomicU64, + pub total_time_connecting_ms: AtomicU64, + pub connection_attempts: AtomicU32, + pub connections: AtomicU32, + pub errors: AtomicU32, + pub fetched_chunks: AtomicU32, + pub downloaded_and_checked_pieces: AtomicU32, +} + #[derive(Debug)] pub struct PeerStats { + pub counters: Arc, pub backoff: ExponentialBackoff, } impl Default for PeerStats { fn default() -> Self { Self { + counters: Arc::new(Default::default()), backoff: ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_secs(10)) .with_multiplier(6.) diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index c02b33e..c5d5d4b 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -84,8 +84,8 @@ use crate::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, peer_state::{ - atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerRx, - PeerState, PeerTx, SendMany, + atomic_inc, AggregatePeerStatsAtomic, InflightRequest, LivePeerState, Peer, PeerCounters, + PeerRx, PeerState, PeerTx, SendMany, }, spawn_utils::{spawn, BlockingSpawner}, type_aliases::{PeerHandle, BF}, @@ -174,12 +174,6 @@ impl PeerStates { .flatten() } - pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option> { - let prev = self.with_peer_mut(handle, "mark_peer_dead", |peer| { - peer.state.to_dead(&self.stats) - })?; - Some(prev.take_live_no_counters()) - } pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; self.stats.dec(p.state.get()); @@ -235,17 +229,15 @@ pub struct TorrentStateLocked { #[derive(Default, Debug)] struct AtomicStats { have: AtomicU64, - downloaded_and_checked: AtomicU64, + downloaded_and_checked_pieces: AtomicU64, uploaded: AtomicU64, fetched_bytes: AtomicU64, - - downloaded_pieces: AtomicU64, total_piece_download_ms: AtomicU64, } impl AtomicStats { fn average_piece_download_time(&self) -> Option { - let d = self.downloaded_pieces.load(Ordering::Relaxed); + let d = self.downloaded_and_checked_pieces.load(Ordering::Relaxed); let t = self.total_piece_download_ms.load(Ordering::Relaxed); if d == 0 { return None; @@ -465,6 +457,11 @@ impl TorrentState { let state = self; let (rx, tx) = state.peers.mark_peer_connecting(addr)?; + let counters = state + .peers + .with_peer(addr, |p| p.stats.counters.clone()) + .context("bug: peer not found")?; + let handler = PeerHandler { addr, on_bitfield_notify: Default::default(), @@ -477,13 +474,13 @@ impl TorrentState { state: state.clone(), tx, spawner, + counters, }; let options = PeerConnectionOptions { connect_timeout: state.options.peer_connect_timeout, read_write_timeout: state.options.peer_read_write_timeout, ..Default::default() }; - let peer_connection = PeerConnection::new( addr, state.info_hash, @@ -493,22 +490,26 @@ impl TorrentState { spawner, ); let requester = handler.task_peer_chunk_requester(addr); + + handler + .counters + .connection_attempts + .fetch_add(1, Ordering::Relaxed); let res = tokio::select! { r = requester => {r} r = peer_connection.manage_peer(rx) => {r} }; - let state = handler.state; - state.peer_semaphore.add_permits(1); + handler.state.peer_semaphore.add_permits(1); match res { // We disconnected the peer ourselves as we don't need it Ok(()) => { - state.on_peer_died(addr, None); + handler.on_peer_died(None); } Err(e) => { debug!("error managing peer: {:#}", e); - state.on_peer_died(addr, Some(e)); + handler.on_peer_died(Some(e)); } } Ok::<_, anyhow::Error>(()) @@ -599,109 +600,13 @@ impl TorrentState { } } - fn on_peer_died(self: &Arc, handle: PeerHandle, error: Option) { - let mut pe = match self.peers.states.get_mut(&handle) { - Some(peer) => TimedExistence::new(peer, "on_peer_died"), - None => { - warn!("bug: peer not found in table. Forgetting it forever"); - return; - } - }; - let prev = pe.value_mut().state.take(&self.peers.stats); - - match prev { - PeerState::Connecting(_) => {} - PeerState::Live(live) => { - let mut g = self.lock_write("mark_chunk_requests_canceled"); - for req in live.inflight_requests { - debug!( - "peer dead, marking chunk request cancelled, index={}, chunk={}", - req.piece.get(), - req.chunk - ); - g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); - } - } - PeerState::NotNeeded => { - // Restore it as std::mem::take() replaced it above. - pe.value_mut() - .state - .set(PeerState::NotNeeded, &self.peers.stats); - return; - } - s @ PeerState::Queued | s @ PeerState::Dead => { - warn!("bug: peer was in a wrong state {s:?}, ignoring it forever"); - // Prevent deadlocks. - drop(pe); - self.peers.drop_peer(handle); - return; - } - }; - - if error.is_none() { - debug!("peer died without errors, not re-queueing"); - pe.value_mut() - .state - .set(PeerState::NotNeeded, &self.peers.stats); - return; - } - - if self.is_finished() { - debug!("torrent finished, not re-queueing"); - pe.value_mut() - .state - .set(PeerState::NotNeeded, &self.peers.stats); - return; - } - - pe.value_mut().state.set(PeerState::Dead, &self.peers.stats); - let backoff = pe.value_mut().stats.backoff.next_backoff(); - - // Prevent deadlocks. - drop(pe); - - if let Some(dur) = backoff { - let state = self.clone(); - spawn( - span!( - parent: None, - Level::ERROR, - "wait_for_peer", - peer = handle.to_string(), - duration = format!("{dur:?}") - ), - async move { - tokio::time::sleep(dur).await; - state - .peers - .with_peer_mut(handle, "dead_to_queued", |peer| { - match peer.state.get() { - PeerState::Dead => { - peer.state.set(PeerState::Queued, &state.peers.stats) - } - other => bail!( - "peer is in unexpected state: {}. Expected dead", - other.name() - ), - }; - Ok(()) - }) - .context("bug: peer disappeared")??; - state.peer_queue_tx.send(handle)?; - Ok::<_, anyhow::Error>(()) - }, - ); - } else { - debug!("dropping peer, backoff exhausted"); - self.peers.drop_peer(handle); - } - } - pub fn get_uploaded(&self) -> u64 { self.stats.uploaded.load(Ordering::Relaxed) } pub fn get_downloaded(&self) -> u64 { - self.stats.downloaded_and_checked.load(Ordering::Acquire) + self.stats + .downloaded_and_checked_pieces + .load(Ordering::Acquire) } pub fn is_finished(&self) -> bool { @@ -779,12 +684,12 @@ impl TorrentState { pub fn stats_snapshot(&self) -> StatsSnapshot { use Ordering::*; - let downloaded = self.stats.downloaded_and_checked.load(Relaxed); + let downloaded = self.stats.downloaded_and_checked_pieces.load(Relaxed); let remaining = self.needed - downloaded; StatsSnapshot { have_bytes: self.stats.have.load(Relaxed), downloaded_and_checked_bytes: downloaded, - downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed), + downloaded_and_checked_pieces: self.stats.downloaded_and_checked_pieces.load(Relaxed), fetched_bytes: self.stats.fetched_bytes.load(Relaxed), uploaded_bytes: self.stats.uploaded.load(Relaxed), total_bytes: self.have_plus_needed, @@ -813,9 +718,10 @@ struct PeerHandlerLocked { } // All peer state that would never be used by other actors should pe put here. +// This state tracks a live peer. struct PeerHandler { state: Arc, - + counters: Arc, // Semantically, we don't need an RwLock here, as this is only requested from // one future (requester + manage_peer). // @@ -840,6 +746,12 @@ struct PeerHandler { } impl<'a> PeerConnectionHandler for &'a PeerHandler { + fn on_connected(&self, connection_time: Duration) { + self.counters.connections.fetch_add(1, Ordering::Relaxed); + self.counters + .total_time_connecting_ms + .fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed); + } fn on_received_message(&self, message: Message>) -> anyhow::Result<()> { match message { Message::Request(request) => { @@ -901,6 +813,102 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } impl PeerHandler { + fn on_peer_died(self, error: Option) { + let peers = &self.state.peers; + let pstats = &peers.stats; + let handle = self.addr; + let mut pe = match peers.states.get_mut(&handle) { + Some(peer) => TimedExistence::new(peer, "on_peer_died"), + None => { + warn!("bug: peer not found in table. Forgetting it forever"); + return; + } + }; + let prev = pe.value_mut().state.take(pstats); + + match prev { + PeerState::Connecting(_) => {} + PeerState::Live(live) => { + let mut g = self.state.lock_write("mark_chunk_requests_canceled"); + for req in live.inflight_requests { + debug!( + "peer dead, marking chunk request cancelled, index={}, chunk={}", + req.piece.get(), + req.chunk + ); + g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); + } + } + PeerState::NotNeeded => { + // Restore it as std::mem::take() replaced it above. + pe.value_mut().state.set(PeerState::NotNeeded, pstats); + return; + } + s @ PeerState::Queued | s @ PeerState::Dead => { + warn!("bug: peer was in a wrong state {s:?}, ignoring it forever"); + // Prevent deadlocks. + drop(pe); + self.state.peers.drop_peer(handle); + return; + } + }; + + if error.is_none() { + debug!("peer died without errors, not re-queueing"); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); + return; + } else { + self.counters.errors.fetch_add(1, Ordering::Relaxed); + } + + if self.state.is_finished() { + debug!("torrent finished, not re-queueing"); + pe.value_mut().state.set(PeerState::NotNeeded, pstats); + return; + } + + pe.value_mut().state.set(PeerState::Dead, pstats); + let backoff = pe.value_mut().stats.backoff.next_backoff(); + + // Prevent deadlocks. + drop(pe); + + if let Some(dur) = backoff { + spawn( + span!( + parent: None, + Level::ERROR, + "wait_for_peer", + peer = handle.to_string(), + duration = format!("{dur:?}") + ), + async move { + tokio::time::sleep(dur).await; + self.state + .peers + .with_peer_mut(handle, "dead_to_queued", |peer| { + match peer.state.get() { + PeerState::Dead => { + peer.state.set(PeerState::Queued, &self.state.peers.stats) + } + other => bail!( + "peer is in unexpected state: {}. Expected dead", + other.name() + ), + }; + Ok(()) + }) + .context("bug: peer disappeared")??; + self.state.peer_queue_tx.send(handle)?; + Ok::<_, anyhow::Error>(()) + }, + ); + } else { + debug!("dropping peer, backoff exhausted"); + self.state.peers.drop_peer(handle); + } + } + fn reserve_next_needed_piece(&self) -> Option { // TODO: locking one inside the other in different order results in deadlocks. self.state @@ -938,7 +946,11 @@ impl PeerHandler { } fn try_steal_old_slow_piece(&self, threshold: f64) -> Option { - let total = self.state.stats.downloaded_pieces.load(Ordering::Relaxed); + let total = self + .state + .stats + .downloaded_and_checked_pieces + .load(Ordering::Acquire); // heuristic for not enough precision in average time if total < 20 { @@ -1209,14 +1221,21 @@ impl PeerHandler { self.requests_sem.add_permits(1); + // Peer chunk/byte counters. + self.counters + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + self.counters.fetched_chunks.fetch_add(1, Ordering::Relaxed); + + // Global chunk/byte counters. + self.state + .stats + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + self.state .peers .with_live_mut(self.addr, "inflight_requests.remove", |h| { - self.state - .stats - .fetched_bytes - .fetch_add(piece.block.len() as u64, Ordering::Relaxed); - if !h .inflight_requests .remove(&InflightRequest::from(&chunk_info)) @@ -1312,11 +1331,12 @@ impl PeerHandler { .with_context(|| format!("error checking piece={index}"))? { true => { + // Global piece counters. let piece_len = self.state.lengths.piece_length(chunk_info.piece_index) as u64; self.state .stats - .downloaded_and_checked + .downloaded_and_checked_pieces // This counter is used to compute "is_finished", so using // stronger ordering. .fetch_add(piece_len, Ordering::Release); @@ -1324,18 +1344,15 @@ impl PeerHandler { .stats .have .fetch_add(piece_len, Ordering::Relaxed); - self.state - .stats - .downloaded_pieces - .fetch_add(1, Ordering::Relaxed); - self.state - .stats - .downloaded_pieces - .fetch_add(1, Ordering::Relaxed); self.state.stats.total_piece_download_ms.fetch_add( full_piece_download_time.as_millis() as u64, Ordering::Relaxed, ); + + // Per-peer piece counters. + self.counters + .downloaded_and_checked_pieces + .fetch_add(1, Ordering::Relaxed); { let mut g = self.state.lock_write("mark_piece_downloaded"); g.chunks.mark_piece_downloaded(chunk_info.piece_index);