Add a struct for session stats + bump the counters
This commit is contained in:
parent
ee2ad7138e
commit
67afdb0aa5
8 changed files with 96 additions and 31 deletions
|
|
@ -57,6 +57,7 @@ mod peer_info_reader;
|
||||||
mod read_buf;
|
mod read_buf;
|
||||||
mod session;
|
mod session;
|
||||||
mod session_persistence;
|
mod session_persistence;
|
||||||
|
mod session_stats;
|
||||||
mod spawn_utils;
|
mod spawn_utils;
|
||||||
pub mod storage;
|
pub mod storage;
|
||||||
mod stream_connect;
|
mod stream_connect;
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ use crate::{
|
||||||
peer_connection::PeerConnectionOptions,
|
peer_connection::PeerConnectionOptions,
|
||||||
read_buf::ReadBuf,
|
read_buf::ReadBuf,
|
||||||
session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore},
|
session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore},
|
||||||
|
session_stats::atomic::AtomicSessionStats,
|
||||||
spawn_utils::BlockingSpawner,
|
spawn_utils::BlockingSpawner,
|
||||||
storage::{
|
storage::{
|
||||||
filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage,
|
filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage,
|
||||||
|
|
@ -116,6 +117,8 @@ pub struct Session {
|
||||||
|
|
||||||
root_span: Option<Span>,
|
root_span: Option<Span>,
|
||||||
|
|
||||||
|
stats: Arc<AtomicSessionStats>,
|
||||||
|
|
||||||
// This is stored for all tasks to stop when session is dropped.
|
// This is stored for all tasks to stop when session is dropped.
|
||||||
_cancellation_token_drop_guard: DropGuard,
|
_cancellation_token_drop_guard: DropGuard,
|
||||||
}
|
}
|
||||||
|
|
@ -602,6 +605,7 @@ impl Session {
|
||||||
reqwest_client,
|
reqwest_client,
|
||||||
connector: stream_connector,
|
connector: stream_connector,
|
||||||
root_span: opts.root_span,
|
root_span: opts.root_span,
|
||||||
|
stats: Default::default(),
|
||||||
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
|
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -1147,6 +1151,7 @@ impl Session {
|
||||||
self.cancellation_token.child_token(),
|
self.cancellation_token.child_token(),
|
||||||
self.concurrent_initialize_semaphore.clone(),
|
self.concurrent_initialize_semaphore.clone(),
|
||||||
self.bitv_factory.clone(),
|
self.bitv_factory.clone(),
|
||||||
|
self.stats.clone(),
|
||||||
)
|
)
|
||||||
.context("error starting torrent")?;
|
.context("error starting torrent")?;
|
||||||
}
|
}
|
||||||
|
|
@ -1303,6 +1308,7 @@ impl Session {
|
||||||
self.cancellation_token.child_token(),
|
self.cancellation_token.child_token(),
|
||||||
self.concurrent_initialize_semaphore.clone(),
|
self.concurrent_initialize_semaphore.clone(),
|
||||||
self.bitv_factory.clone(),
|
self.bitv_factory.clone(),
|
||||||
|
self.stats.clone(),
|
||||||
)?;
|
)?;
|
||||||
self.try_update_persistence_metadata(handle).await;
|
self.try_update_persistence_metadata(handle).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
10
crates/librqbit/src/session_stats/atomic.rs
Normal file
10
crates/librqbit/src/session_stats/atomic.rs
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
|
|
||||||
|
use crate::torrent_state::live::peers::stats::atomic::AggregatePeerStatsAtomic;
|
||||||
|
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
pub struct AtomicSessionStats {
|
||||||
|
pub fetched_bytes: AtomicU64,
|
||||||
|
pub uploaded_bytes: AtomicU64,
|
||||||
|
pub peers: AggregatePeerStatsAtomic,
|
||||||
|
}
|
||||||
1
crates/librqbit/src/session_stats/mod.rs
Normal file
1
crates/librqbit/src/session_stats/mod.rs
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
pub mod atomic;
|
||||||
|
|
@ -70,6 +70,7 @@ use peer_binary_protocol::{
|
||||||
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
|
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
|
||||||
Handshake, Message, MessageOwned, Piece, Request,
|
Handshake, Message, MessageOwned, Piece, Request,
|
||||||
};
|
};
|
||||||
|
use peers::stats::atomic::AggregatePeerStatsAtomic;
|
||||||
use tokio::sync::{
|
use tokio::sync::{
|
||||||
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||||
Notify, OwnedSemaphorePermit, Semaphore,
|
Notify, OwnedSemaphorePermit, Semaphore,
|
||||||
|
|
@ -84,6 +85,7 @@ use crate::{
|
||||||
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
|
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
|
||||||
},
|
},
|
||||||
session::CheckedIncomingConnection,
|
session::CheckedIncomingConnection,
|
||||||
|
session_stats::atomic::AtomicSessionStats,
|
||||||
torrent_state::{peer::Peer, utils::atomic_inc},
|
torrent_state::{peer::Peer, utils::atomic_inc},
|
||||||
type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF},
|
type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF},
|
||||||
};
|
};
|
||||||
|
|
@ -201,6 +203,8 @@ pub struct TorrentStateLive {
|
||||||
up_speed_estimator: SpeedEstimator,
|
up_speed_estimator: SpeedEstimator,
|
||||||
cancellation_token: CancellationToken,
|
cancellation_token: CancellationToken,
|
||||||
|
|
||||||
|
session_stats: Arc<AtomicSessionStats>,
|
||||||
|
|
||||||
pub(crate) streams: Arc<TorrentStreams>,
|
pub(crate) streams: Arc<TorrentStreams>,
|
||||||
have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>,
|
have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>,
|
||||||
}
|
}
|
||||||
|
|
@ -210,6 +214,7 @@ impl TorrentStateLive {
|
||||||
paused: TorrentStatePaused,
|
paused: TorrentStatePaused,
|
||||||
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
|
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
|
||||||
cancellation_token: CancellationToken,
|
cancellation_token: CancellationToken,
|
||||||
|
session_stats: Arc<AtomicSessionStats>,
|
||||||
) -> anyhow::Result<Arc<Self>> {
|
) -> anyhow::Result<Arc<Self>> {
|
||||||
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
|
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
|
||||||
|
|
||||||
|
|
@ -237,7 +242,11 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
let state = Arc::new(TorrentStateLive {
|
let state = Arc::new(TorrentStateLive {
|
||||||
meta: paused.info.clone(),
|
meta: paused.info.clone(),
|
||||||
peers: Default::default(),
|
peers: PeerStates {
|
||||||
|
session_stats: session_stats.clone(),
|
||||||
|
stats: Default::default(),
|
||||||
|
states: Default::default(),
|
||||||
|
},
|
||||||
locked: RwLock::new(TorrentStateLocked {
|
locked: RwLock::new(TorrentStateLocked {
|
||||||
chunks: Some(paused.chunk_tracker),
|
chunks: Some(paused.chunk_tracker),
|
||||||
// TODO: move under per_piece_locks?
|
// TODO: move under per_piece_locks?
|
||||||
|
|
@ -260,6 +269,7 @@ impl TorrentStateLive {
|
||||||
up_speed_estimator,
|
up_speed_estimator,
|
||||||
cancellation_token,
|
cancellation_token,
|
||||||
have_broadcast_tx,
|
have_broadcast_tx,
|
||||||
|
session_stats,
|
||||||
streams: paused.streams,
|
streams: paused.streams,
|
||||||
per_piece_locks: (0..lengths.total_pieces())
|
per_piece_locks: (0..lengths.total_pieces())
|
||||||
.map(|_| RwLock::new(()))
|
.map(|_| RwLock::new(()))
|
||||||
|
|
@ -307,6 +317,10 @@ impl TorrentStateLive {
|
||||||
spawn_with_cancel(span, self.cancellation_token.clone(), fut);
|
spawn_with_cancel(span, self.cancellation_token.clone(), fut);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] {
|
||||||
|
[&self.peers.stats, &self.peers.session_stats.peers]
|
||||||
|
}
|
||||||
|
|
||||||
pub fn down_speed_estimator(&self) -> &SpeedEstimator {
|
pub fn down_speed_estimator(&self) -> &SpeedEstimator {
|
||||||
&self.down_speed_estimator
|
&self.down_speed_estimator
|
||||||
}
|
}
|
||||||
|
|
@ -343,7 +357,7 @@ impl TorrentStateLive {
|
||||||
.incoming_connection(
|
.incoming_connection(
|
||||||
Id20::new(checked_peer.handshake.peer_id),
|
Id20::new(checked_peer.handshake.peer_id),
|
||||||
tx.clone(),
|
tx.clone(),
|
||||||
&self.peers.stats,
|
&self.peer_stats(),
|
||||||
)
|
)
|
||||||
.context("peer already existed")?;
|
.context("peer already existed")?;
|
||||||
peer.stats.counters.clone()
|
peer.stats.counters.clone()
|
||||||
|
|
@ -353,7 +367,7 @@ impl TorrentStateLive {
|
||||||
let peer = Peer::new_live_for_incoming_connection(
|
let peer = Peer::new_live_for_incoming_connection(
|
||||||
Id20::new(checked_peer.handshake.peer_id),
|
Id20::new(checked_peer.handshake.peer_id),
|
||||||
tx.clone(),
|
tx.clone(),
|
||||||
&self.peers.stats,
|
&self.peer_stats(),
|
||||||
);
|
);
|
||||||
let counters = peer.stats.counters.clone();
|
let counters = peer.stats.counters.clone();
|
||||||
vac.insert(peer);
|
vac.insert(peer);
|
||||||
|
|
@ -562,7 +576,7 @@ impl TorrentStateLive {
|
||||||
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
|
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
|
||||||
self.peers.with_peer_mut(handle, "set_peer_live", |p| {
|
self.peers.with_peer_mut(handle, "set_peer_live", |p| {
|
||||||
p.state
|
p.state
|
||||||
.connecting_to_live(Id20::new(h.peer_id), &self.peers.stats);
|
.connecting_to_live(Id20::new(h.peer_id), &self.peer_stats());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -750,7 +764,7 @@ impl TorrentStateLive {
|
||||||
for mut pe in self.peers.states.iter_mut() {
|
for mut pe in self.peers.states.iter_mut() {
|
||||||
if let PeerState::Live(l) = pe.value().state.get() {
|
if let PeerState::Live(l) = pe.value().state.get() {
|
||||||
if l.has_full_torrent(self.lengths.total_pieces() as usize) {
|
if l.has_full_torrent(self.lengths.total_pieces() as usize) {
|
||||||
let prev = pe.value_mut().state.set_not_needed(&self.peers.stats);
|
let prev = pe.value_mut().state.set_not_needed(&self.peer_stats());
|
||||||
let _ = prev
|
let _ = prev
|
||||||
.take_live_no_counters()
|
.take_live_no_counters()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|
@ -763,7 +777,7 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
pub(crate) fn reconnect_all_not_needed_peers(&self) {
|
pub(crate) fn reconnect_all_not_needed_peers(&self) {
|
||||||
for mut pe in self.peers.states.iter_mut() {
|
for mut pe in self.peers.states.iter_mut() {
|
||||||
if pe.state.not_needed_to_queued(&self.peers.stats)
|
if pe.state.not_needed_to_queued(&self.peer_stats())
|
||||||
&& self.peer_queue_tx.send(*pe.key()).is_err()
|
&& self.peer_queue_tx.send(*pe.key()).is_err()
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
|
@ -883,6 +897,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
.stats
|
.stats
|
||||||
.uploaded_bytes
|
.uploaded_bytes
|
||||||
.fetch_add(bytes as u64, Ordering::Relaxed);
|
.fetch_add(bytes as u64, Ordering::Relaxed);
|
||||||
|
self.state
|
||||||
|
.session_stats
|
||||||
|
.uploaded_bytes
|
||||||
|
.fetch_add(bytes as u64, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> {
|
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> {
|
||||||
|
|
@ -925,7 +943,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
impl PeerHandler {
|
impl PeerHandler {
|
||||||
fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> {
|
fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> {
|
||||||
let peers = &self.state.peers;
|
let peers = &self.state.peers;
|
||||||
let pstats = &peers.stats;
|
let pstats = self.state.peer_stats();
|
||||||
let handle = self.addr;
|
let handle = self.addr;
|
||||||
let mut pe = match peers.states.get_mut(&handle) {
|
let mut pe = match peers.states.get_mut(&handle) {
|
||||||
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
|
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
|
||||||
|
|
@ -934,7 +952,7 @@ impl PeerHandler {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let prev = pe.value_mut().state.take(pstats);
|
let prev = pe.value_mut().state.take(&pstats);
|
||||||
|
|
||||||
match prev {
|
match prev {
|
||||||
PeerState::Connecting(_) => {}
|
PeerState::Connecting(_) => {}
|
||||||
|
|
@ -953,7 +971,7 @@ impl PeerHandler {
|
||||||
}
|
}
|
||||||
PeerState::NotNeeded => {
|
PeerState::NotNeeded => {
|
||||||
// Restore it as std::mem::take() replaced it above.
|
// Restore it as std::mem::take() replaced it above.
|
||||||
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
|
pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
s @ PeerState::Queued | s @ PeerState::Dead => {
|
s @ PeerState::Queued | s @ PeerState::Dead => {
|
||||||
|
|
@ -969,7 +987,7 @@ impl PeerHandler {
|
||||||
Some(e) => e,
|
Some(e) => e,
|
||||||
None => {
|
None => {
|
||||||
trace!("peer died without errors, not re-queueing");
|
trace!("peer died without errors, not re-queueing");
|
||||||
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
|
pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -978,11 +996,11 @@ impl PeerHandler {
|
||||||
|
|
||||||
if self.state.is_finished_and_no_active_streams() {
|
if self.state.is_finished_and_no_active_streams() {
|
||||||
debug!("torrent finished, not re-queueing");
|
debug!("torrent finished, not re-queueing");
|
||||||
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
|
pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
pe.value_mut().state.set(PeerState::Dead, pstats);
|
pe.value_mut().state.set(PeerState::Dead, &pstats);
|
||||||
|
|
||||||
let backoff = pe.value_mut().stats.backoff.next_backoff();
|
let backoff = pe.value_mut().stats.backoff.next_backoff();
|
||||||
|
|
||||||
|
|
@ -1006,7 +1024,7 @@ impl PeerHandler {
|
||||||
.with_peer_mut(handle, "dead_to_queued", |peer| {
|
.with_peer_mut(handle, "dead_to_queued", |peer| {
|
||||||
match peer.state.get() {
|
match peer.state.get() {
|
||||||
PeerState::Dead => {
|
PeerState::Dead => {
|
||||||
peer.state.set(PeerState::Queued, &self.state.peers.stats)
|
peer.state.set(PeerState::Queued, &self.state.peer_stats())
|
||||||
}
|
}
|
||||||
other => bail!(
|
other => bail!(
|
||||||
"peer is in unexpected state: {}. Expected dead",
|
"peer is in unexpected state: {}. Expected dead",
|
||||||
|
|
@ -1415,6 +1433,10 @@ impl PeerHandler {
|
||||||
.stats
|
.stats
|
||||||
.fetched_bytes
|
.fetched_bytes
|
||||||
.fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
|
.fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
|
||||||
|
self.state
|
||||||
|
.session_stats
|
||||||
|
.fetched_bytes
|
||||||
|
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
|
||||||
|
|
||||||
fn write_to_disk(
|
fn write_to_disk(
|
||||||
state: &TorrentStateLive,
|
state: &TorrentStateLive,
|
||||||
|
|
|
||||||
|
|
@ -26,10 +26,12 @@ impl Peer {
|
||||||
pub fn new_live_for_incoming_connection(
|
pub fn new_live_for_incoming_connection(
|
||||||
peer_id: Id20,
|
peer_id: Id20,
|
||||||
tx: PeerTx,
|
tx: PeerTx,
|
||||||
counters: &AggregatePeerStatsAtomic,
|
counters: &[&AggregatePeerStatsAtomic],
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true)));
|
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true)));
|
||||||
counters.inc(&state.0);
|
for counter in counters {
|
||||||
|
counter.inc(&state.0);
|
||||||
|
}
|
||||||
Self {
|
Self {
|
||||||
state,
|
state,
|
||||||
stats: Default::default(),
|
stats: Default::default(),
|
||||||
|
|
@ -85,12 +87,14 @@ impl PeerStateNoMut {
|
||||||
&self.0
|
&self.0
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
|
pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState {
|
||||||
self.set(Default::default(), counters)
|
self.set(Default::default(), counters)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState {
|
pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState {
|
||||||
counters.incdec(&self.0, &new);
|
for counter in counters {
|
||||||
|
counter.incdec(&self.0, &new);
|
||||||
|
}
|
||||||
std::mem::replace(&mut self.0, new)
|
std::mem::replace(&mut self.0, new)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -110,7 +114,7 @@ impl PeerStateNoMut {
|
||||||
|
|
||||||
pub fn idle_to_connecting(
|
pub fn idle_to_connecting(
|
||||||
&mut self,
|
&mut self,
|
||||||
counters: &AggregatePeerStatsAtomic,
|
counters: &[&AggregatePeerStatsAtomic],
|
||||||
) -> Option<(PeerRx, PeerTx)> {
|
) -> Option<(PeerRx, PeerTx)> {
|
||||||
match &self.0 {
|
match &self.0 {
|
||||||
PeerState::Queued | PeerState::NotNeeded => {
|
PeerState::Queued | PeerState::NotNeeded => {
|
||||||
|
|
@ -123,7 +127,7 @@ impl PeerStateNoMut {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn not_needed_to_queued(&mut self, counters: &AggregatePeerStatsAtomic) -> bool {
|
pub fn not_needed_to_queued(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> bool {
|
||||||
if let PeerState::NotNeeded = &self.0 {
|
if let PeerState::NotNeeded = &self.0 {
|
||||||
self.set(PeerState::Queued, counters);
|
self.set(PeerState::Queued, counters);
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -135,7 +139,7 @@ impl PeerStateNoMut {
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: Id20,
|
peer_id: Id20,
|
||||||
tx: PeerTx,
|
tx: PeerTx,
|
||||||
counters: &AggregatePeerStatsAtomic,
|
counters: &[&AggregatePeerStatsAtomic],
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) {
|
if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) {
|
||||||
anyhow::bail!("peer already active");
|
anyhow::bail!("peer already active");
|
||||||
|
|
@ -155,7 +159,7 @@ impl PeerStateNoMut {
|
||||||
pub fn connecting_to_live(
|
pub fn connecting_to_live(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: Id20,
|
peer_id: Id20,
|
||||||
counters: &AggregatePeerStatsAtomic,
|
counters: &[&AggregatePeerStatsAtomic],
|
||||||
) -> Option<&mut LivePeerState> {
|
) -> Option<&mut LivePeerState> {
|
||||||
if let PeerState::Connecting(_) = &self.0 {
|
if let PeerState::Connecting(_) = &self.0 {
|
||||||
let tx = match self.take(counters) {
|
let tx = match self.take(counters) {
|
||||||
|
|
@ -172,7 +176,7 @@ impl PeerStateNoMut {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
|
pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState {
|
||||||
self.set(PeerState::NotNeeded, counters)
|
self.set(PeerState::NotNeeded, counters)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use std::net::SocketAddr;
|
use std::{net::SocketAddr, sync::Arc};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use backoff::backoff::Backoff;
|
use backoff::backoff::Backoff;
|
||||||
|
|
@ -8,6 +8,7 @@ use peer_binary_protocol::{Message, Request};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
peer_connection::WriterRequest,
|
peer_connection::WriterRequest,
|
||||||
|
session_stats::atomic::AtomicSessionStats,
|
||||||
torrent_state::utils::{atomic_inc, TimedExistence},
|
torrent_state::utils::{atomic_inc, TimedExistence},
|
||||||
type_aliases::{PeerHandle, BF},
|
type_aliases::{PeerHandle, BF},
|
||||||
};
|
};
|
||||||
|
|
@ -18,8 +19,8 @@ use super::peer::{LivePeerState, Peer, PeerRx, PeerState, PeerTx};
|
||||||
|
|
||||||
pub mod stats;
|
pub mod stats;
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub(crate) struct PeerStates {
|
pub(crate) struct PeerStates {
|
||||||
|
pub session_stats: Arc<AtomicSessionStats>,
|
||||||
pub stats: AggregatePeerStatsAtomic,
|
pub stats: AggregatePeerStatsAtomic,
|
||||||
pub states: DashMap<PeerHandle, Peer>,
|
pub states: DashMap<PeerHandle, Peer>,
|
||||||
}
|
}
|
||||||
|
|
@ -36,7 +37,10 @@ impl PeerStates {
|
||||||
Entry::Vacant(vac) => {
|
Entry::Vacant(vac) => {
|
||||||
vac.insert(Default::default());
|
vac.insert(Default::default());
|
||||||
atomic_inc(&self.stats.queued);
|
atomic_inc(&self.stats.queued);
|
||||||
|
atomic_inc(&self.session_stats.peers.queued);
|
||||||
|
|
||||||
atomic_inc(&self.stats.seen);
|
atomic_inc(&self.stats.seen);
|
||||||
|
atomic_inc(&self.session_stats.peers.queued);
|
||||||
Some(addr)
|
Some(addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -73,7 +77,10 @@ impl PeerStates {
|
||||||
|
|
||||||
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
|
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
|
||||||
let p = self.states.remove(&handle).map(|r| r.1)?;
|
let p = self.states.remove(&handle).map(|r| r.1)?;
|
||||||
self.stats.dec(p.state.get());
|
let s = p.state.get();
|
||||||
|
self.stats.dec(s);
|
||||||
|
self.session_stats.peers.dec(s);
|
||||||
|
|
||||||
Some(p)
|
Some(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -99,7 +106,7 @@ impl PeerStates {
|
||||||
let rx = self
|
let rx = self
|
||||||
.with_peer_mut(h, "mark_peer_connecting", |peer| {
|
.with_peer_mut(h, "mark_peer_connecting", |peer| {
|
||||||
peer.state
|
peer.state
|
||||||
.idle_to_connecting(&self.stats)
|
.idle_to_connecting(&[&self.stats, &self.session_stats.peers])
|
||||||
.context("invalid peer state")
|
.context("invalid peer state")
|
||||||
})
|
})
|
||||||
.context("peer not found in states")??;
|
.context("peer not found in states")??;
|
||||||
|
|
@ -114,7 +121,8 @@ impl PeerStates {
|
||||||
|
|
||||||
pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
|
pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
|
||||||
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
|
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
|
||||||
peer.state.set_not_needed(&self.stats)
|
peer.state
|
||||||
|
.set_not_needed(&[&self.stats, &self.session_stats.peers])
|
||||||
})?;
|
})?;
|
||||||
Some(prev)
|
Some(prev)
|
||||||
}
|
}
|
||||||
|
|
@ -132,6 +140,7 @@ impl PeerStates {
|
||||||
atomic_inc(&p.stats.counters.times_stolen_from_me);
|
atomic_inc(&p.stats.counters.times_stolen_from_me);
|
||||||
});
|
});
|
||||||
self.stats.inc_steals();
|
self.stats.inc_steals();
|
||||||
|
self.session_stats.peers.inc_steals();
|
||||||
|
|
||||||
self.with_live_mut(from_peer, "send_cancellations", |live| {
|
self.with_live_mut(from_peer, "send_cancellations", |live| {
|
||||||
let to_remove = live
|
let to_remove = live
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ use crate::bitv_factory::BitVFactory;
|
||||||
use crate::chunk_tracker::ChunkTracker;
|
use crate::chunk_tracker::ChunkTracker;
|
||||||
use crate::file_info::FileInfo;
|
use crate::file_info::FileInfo;
|
||||||
use crate::session::TorrentId;
|
use crate::session::TorrentId;
|
||||||
|
use crate::session_stats::atomic::AtomicSessionStats;
|
||||||
use crate::spawn_utils::BlockingSpawner;
|
use crate::spawn_utils::BlockingSpawner;
|
||||||
use crate::storage::BoxStorageFactory;
|
use crate::storage::BoxStorageFactory;
|
||||||
use crate::stream_connect::StreamConnector;
|
use crate::stream_connect::StreamConnector;
|
||||||
|
|
@ -211,6 +212,7 @@ impl ManagedTorrent {
|
||||||
live_cancellation_token: CancellationToken,
|
live_cancellation_token: CancellationToken,
|
||||||
init_semaphore: Arc<tokio::sync::Semaphore>,
|
init_semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
bitv_factory: Arc<dyn BitVFactory>,
|
bitv_factory: Arc<dyn BitVFactory>,
|
||||||
|
session_stats: Arc<AtomicSessionStats>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut g = self.locked.write();
|
let mut g = self.locked.write();
|
||||||
|
|
||||||
|
|
@ -319,8 +321,12 @@ impl ManagedTorrent {
|
||||||
}
|
}
|
||||||
|
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
let live =
|
let live = TorrentStateLive::new(
|
||||||
TorrentStateLive::new(paused, tx, live_cancellation_token)?;
|
paused,
|
||||||
|
tx,
|
||||||
|
live_cancellation_token,
|
||||||
|
session_stats,
|
||||||
|
)?;
|
||||||
g.state = ManagedTorrentState::Live(live.clone());
|
g.state = ManagedTorrentState::Live(live.clone());
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
||||||
|
|
@ -345,7 +351,12 @@ impl ManagedTorrent {
|
||||||
ManagedTorrentState::Paused(_) => {
|
ManagedTorrentState::Paused(_) => {
|
||||||
let paused = g.state.take().assert_paused();
|
let paused = g.state.take().assert_paused();
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?;
|
let live = TorrentStateLive::new(
|
||||||
|
paused,
|
||||||
|
tx,
|
||||||
|
live_cancellation_token.clone(),
|
||||||
|
session_stats,
|
||||||
|
)?;
|
||||||
g.state = ManagedTorrentState::Live(live.clone());
|
g.state = ManagedTorrentState::Live(live.clone());
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
||||||
|
|
@ -371,6 +382,7 @@ impl ManagedTorrent {
|
||||||
live_cancellation_token,
|
live_cancellation_token,
|
||||||
init_semaphore,
|
init_semaphore,
|
||||||
bitv_factory,
|
bitv_factory,
|
||||||
|
session_stats,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
|
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue