Merge pull request #204 from ikatson/session_stats

[Feature] session stats
This commit is contained in:
Igor Katson 2024-08-21 13:28:30 +01:00 committed by GitHub
commit 715149db4d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 396 additions and 72 deletions

View file

@ -20,7 +20,7 @@ devserver:
echo -n '' > /tmp/rqbit-log && cargo run -- \ echo -n '' > /tmp/rqbit-log && cargo run -- \
--log-file /tmp/rqbit-log \ --log-file /tmp/rqbit-log \
--log-file-rust-log=debug,librqbit=trace \ --log-file-rust-log=debug,librqbit=trace \
server start --fastresume /tmp/scratch/ server start /tmp/scratch/
@PHONY: devserver @PHONY: devserver
devserver-postgres: devserver-postgres:

View file

@ -14,6 +14,7 @@ use crate::{
session::{ session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId, AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId,
}, },
session_stats::snapshot::SessionStatsSnapshot,
torrent_state::{ torrent_state::{
peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
FileStream, ManagedTorrentHandle, FileStream, ManagedTorrentHandle,
@ -68,6 +69,14 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {
struct V<'de> { struct V<'de> {
p: PhantomData<&'de ()>, p: PhantomData<&'de ()>,
} }
macro_rules! visit_int {
($v:expr) => {{
let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?;
Ok(TorrentIdOrHash::from(tid))
}};
}
impl<'de> serde::de::Visitor<'de> for V<'de> { impl<'de> serde::de::Visitor<'de> for V<'de> {
type Value = TorrentIdOrHash; type Value = TorrentIdOrHash;
@ -75,16 +84,47 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {
f.write_str("integer or 40 byte info hash") f.write_str("integer or 40 byte info hash")
} }
fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
visit_int!(v)
}
fn visit_i128<E>(self, v: i128) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
visit_int!(v)
}
fn visit_u128<E>(self, v: u128) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
visit_int!(v)
}
fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
visit_int!(v)
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E> fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where where
E: serde::de::Error, E: serde::de::Error,
{ {
TorrentIdOrHash::parse(v) TorrentIdOrHash::parse(v).map_err(|e| {
.map_err(|_| E::custom("expected integer or 40 byte info hash")) E::custom(format!(
"expected integer or 40 byte info hash, couldn't parse string: {e:?}"
))
})
} }
} }
deserializer.deserialize_str(V::default()) deserializer.deserialize_any(V::default())
} }
} }
@ -171,6 +211,10 @@ impl Api {
make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref())
} }
pub fn api_session_stats(&self) -> SessionStatsSnapshot {
self.session().stats_snapshot()
}
pub fn torrent_file_mime_type( pub fn torrent_file_mime_type(
&self, &self,
idx: TorrentIdOrHash, idx: TorrentIdOrHash,

View file

@ -79,6 +79,7 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Res
lengths.total_pieces() lengths.total_pieces()
); );
} }
let required_size = lengths.chunk_bitfield_bytes(); let required_size = lengths.chunk_bitfield_bytes();
let vec = vec![0u8; required_size]; let vec = vec![0u8; required_size];
let mut chunk_bf = BF::from_boxed_slice(vec.into_boxed_slice()); let mut chunk_bf = BF::from_boxed_slice(vec.into_boxed_slice());

View file

@ -62,10 +62,15 @@ impl HttpApi {
"GET /dht/stats": "DHT stats", "GET /dht/stats": "DHT stats",
"GET /dht/table": "DHT routing table", "GET /dht/table": "DHT routing table",
"GET /torrents": "List torrents", "GET /torrents": "List torrents",
"GET /torrents/playlist": "Generate M3U8 playlist for all files in all torrents",
"GET /stats": "Global session stats",
"POST /torrents/resolve_magnet": "Resolve a magnet to torrent file bytes",
"GET /torrents/{id_or_infohash}": "Torrent details", "GET /torrents/{id_or_infohash}": "Torrent details",
"GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces", "GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces",
"GET /torrents/{id_or_infohash}/playlist": "Generate M3U8 playlist for this torrent",
"GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats", "GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats",
"GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats", "GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats",
"GET /torrents/{id_or_infohash}/stream/{file_idx}": "Stream a file. Accepts Range header to seek.",
"POST /torrents/{id_or_infohash}/pause": "Pause torrent", "POST /torrents/{id_or_infohash}/pause": "Pause torrent",
"POST /torrents/{id_or_infohash}/start": "Resume torrent", "POST /torrents/{id_or_infohash}/start": "Resume torrent",
"POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files", "POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files",
@ -88,6 +93,10 @@ impl HttpApi {
state.api_dht_table().map(axum::Json) state.api_dht_table().map(axum::Json)
} }
async fn session_stats(State(state): State<ApiState>) -> impl IntoResponse {
axum::Json(state.api_session_stats())
}
async fn torrents_list(State(state): State<ApiState>) -> impl IntoResponse { async fn torrents_list(State(state): State<ApiState>) -> impl IntoResponse {
axum::Json(state.api_torrent_list()) axum::Json(state.api_torrent_list())
} }
@ -446,6 +455,7 @@ impl HttpApi {
.route("/rust_log", post(set_rust_log)) .route("/rust_log", post(set_rust_log))
.route("/dht/stats", get(dht_stats)) .route("/dht/stats", get(dht_stats))
.route("/dht/table", get(dht_table)) .route("/dht/table", get(dht_table))
.route("/stats", get(session_stats))
.route("/torrents", get(torrents_list)) .route("/torrents", get(torrents_list))
.route("/torrents/:id", get(torrent_details)) .route("/torrents/:id", get(torrent_details))
.route("/torrents/:id/haves", get(torrent_haves)) .route("/torrents/:id/haves", get(torrent_haves))

View file

@ -57,6 +57,7 @@ mod peer_info_reader;
mod read_buf; mod read_buf;
mod session; mod session;
mod session_persistence; mod session_persistence;
pub mod session_stats;
mod spawn_utils; mod spawn_utils;
pub mod storage; pub mod storage;
mod stream_connect; mod stream_connect;

View file

@ -16,6 +16,7 @@ use crate::{
peer_connection::PeerConnectionOptions, peer_connection::PeerConnectionOptions,
read_buf::ReadBuf, read_buf::ReadBuf,
session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore}, session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore},
session_stats::SessionStats,
spawn_utils::BlockingSpawner, spawn_utils::BlockingSpawner,
storage::{ storage::{
filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage,
@ -116,6 +117,8 @@ pub struct Session {
root_span: Option<Span>, root_span: Option<Span>,
pub(crate) stats: SessionStats,
// This is stored for all tasks to stop when session is dropped. // This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard, _cancellation_token_drop_guard: DropGuard,
} }
@ -509,8 +512,10 @@ impl Session {
async fn persistence_factory( async fn persistence_factory(
opts: &SessionOptions, opts: &SessionOptions,
) -> anyhow::Result<(Option<Arc<dyn SessionPersistenceStore>>, Arc<dyn BitVFactory>)> { ) -> anyhow::Result<(
Option<Arc<dyn SessionPersistenceStore>>,
Arc<dyn BitVFactory>,
)> {
macro_rules! make_result { macro_rules! make_result {
($store:expr) => { ($store:expr) => {
if opts.fastresume { if opts.fastresume {
@ -535,7 +540,7 @@ impl Session {
); );
make_result!(s) make_result!(s)
}, }
#[cfg(feature = "postgres")] #[cfg(feature = "postgres")]
Some(SessionPersistenceConfig::Postgres { connection_string }) => { Some(SessionPersistenceConfig::Postgres { connection_string }) => {
use crate::session_persistence::postgres::PostgresSessionStorage; use crate::session_persistence::postgres::PostgresSessionStorage;
@ -602,17 +607,23 @@ impl Session {
reqwest_client, reqwest_client,
connector: stream_connector, connector: stream_connector,
root_span: opts.root_span, root_span: opts.root_span,
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) stats: SessionStats::new(),
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(
opts.concurrent_init_limit.unwrap_or(3),
)),
}); });
if let Some(mut disk_write_rx) = disk_write_rx { if let Some(mut disk_write_rx) = disk_write_rx {
session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move { session.spawn(
while let Some(work) = disk_write_rx.recv().await { error_span!(parent: session.rs(), "disk_writer"),
trace!(disk_write_rx_queue_len = disk_write_rx.len()); async move {
spawner.spawn_block_in_place(work); while let Some(work) = disk_write_rx.recv().await {
} trace!(disk_write_rx_queue_len = disk_write_rx.len());
Ok(()) spawner.spawn_block_in_place(work);
}); }
Ok(())
},
);
} }
if let Some(tcp_listener) = tcp_listener { if let Some(tcp_listener) = tcp_listener {
@ -639,28 +650,36 @@ impl Session {
let mut futs = FuturesUnordered::new(); let mut futs = FuturesUnordered::new();
while !added_all || !futs.is_empty() { while !added_all || !futs.is_empty() {
// NOTE: this closure exists purely to workaround rustfmt screwing up when inlining it.
let add_torrent_span = |info_hash: &Id20| -> tracing::Span {
error_span!(parent: session.rs(), "add_torrent", info_hash=?info_hash)
};
tokio::select! { tokio::select! {
Some(res) = futs.next(), if !futs.is_empty() => { Some(res) = futs.next(), if !futs.is_empty() => {
if let Err(e) = res { if let Err(e) = res {
error!("error adding torrent to session: {e:?}"); error!("error adding torrent to session: {e:?}");
} }
}, }
st = ps.next(), if !added_all => { st = ps.next(), if !added_all => {
if let Some(st) = st { match st {
let (id, st) = st?; Some(st) => {
let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash()); let (id, st) = st?;
let (add_torrent, mut opts) = st.into_add_torrent()?; let span = add_torrent_span(st.info_hash());
opts.preferred_id = Some(id); let (add_torrent, mut opts) = st.into_add_torrent()?;
let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span); opts.preferred_id = Some(id);
futs.push(fut); let fut = session.add_torrent(add_torrent, Some(opts));
} else { let fut = fut.instrument(span);
added_all = true; futs.push(fut);
} },
}, None => added_all = true
} };
}
};
} }
} }
session.start_speed_estimator_updater();
Ok(session) Ok(session)
} }
.boxed() .boxed()
@ -785,7 +804,7 @@ impl Session {
spawn_with_cancel(span, self.cancellation_token.clone(), fut); spawn_with_cancel(span, self.cancellation_token.clone(), fut);
} }
fn rs(&self) -> Option<tracing::Id> { pub(crate) fn rs(&self) -> Option<tracing::Id> {
self.root_span.as_ref().and_then(|s| s.id()) self.root_span.as_ref().and_then(|s| s.id())
} }
@ -1147,6 +1166,7 @@ impl Session {
self.cancellation_token.child_token(), self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(), self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(), self.bitv_factory.clone(),
self.stats.atomic.clone(),
) )
.context("error starting torrent")?; .context("error starting torrent")?;
} }
@ -1303,6 +1323,7 @@ impl Session {
self.cancellation_token.child_token(), self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(), self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(), self.bitv_factory.clone(),
self.stats.atomic.clone(),
)?; )?;
self.try_update_persistence_metadata(handle).await; self.try_update_persistence_metadata(handle).await;
Ok(()) Ok(())

View file

@ -0,0 +1,10 @@
use std::sync::atomic::AtomicU64;
use crate::torrent_state::live::peers::stats::atomic::AggregatePeerStatsAtomic;
#[derive(Default, Debug)]
pub struct AtomicSessionStats {
pub fetched_bytes: AtomicU64,
pub uploaded_bytes: AtomicU64,
pub(crate) peers: AggregatePeerStatsAtomic,
}

View file

@ -0,0 +1,62 @@
use std::{
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};
use atomic::AtomicSessionStats;
use librqbit_core::speed_estimator::SpeedEstimator;
use snapshot::SessionStatsSnapshot;
use tracing::error_span;
use crate::Session;
pub mod atomic;
pub mod snapshot;
pub struct SessionStats {
pub atomic: Arc<AtomicSessionStats>,
pub down_speed_estimator: SpeedEstimator,
pub up_speed_estimator: SpeedEstimator,
}
impl SessionStats {
pub fn new() -> Self {
SessionStats {
atomic: Default::default(),
down_speed_estimator: SpeedEstimator::new(5),
up_speed_estimator: SpeedEstimator::new(5),
}
}
}
impl Default for SessionStats {
fn default() -> Self {
Self::new()
}
}
impl Session {
pub(crate) fn start_speed_estimator_updater(self: &Arc<Self>) {
self.spawn(error_span!(parent: self.rs(), "speed_estimator"), {
let s = self.clone();
async move {
let mut i = tokio::time::interval(Duration::from_secs(1));
loop {
i.tick().await;
let now = Instant::now();
let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed);
let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed);
s.stats
.down_speed_estimator
.add_snapshot(fetched, None, now);
s.stats.up_speed_estimator.add_snapshot(uploaded, None, now);
}
}
})
}
pub fn stats_snapshot(&self) -> SessionStatsSnapshot {
SessionStatsSnapshot::from(&self.stats)
}
}

View file

@ -0,0 +1,22 @@
use serde::Serialize;
use crate::torrent_state::{peers::stats::snapshot::AggregatePeerStats, stats::Speed};
use super::SessionStats;
#[derive(Debug, Serialize)]
pub struct SessionStatsSnapshot {
download_speed: Speed,
upload_speed: Speed,
peers: AggregatePeerStats,
}
impl From<&SessionStats> for SessionStatsSnapshot {
fn from(s: &SessionStats) -> Self {
Self {
download_speed: s.down_speed_estimator.mbps().into(),
upload_speed: s.up_speed_estimator.mbps().into(),
peers: AggregatePeerStats::from(&s.atomic.peers),
}
}
}

View file

@ -70,6 +70,7 @@ use peer_binary_protocol::{
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
Handshake, Message, MessageOwned, Piece, Request, Handshake, Message, MessageOwned, Piece, Request,
}; };
use peers::stats::atomic::AggregatePeerStatsAtomic;
use tokio::sync::{ use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, OwnedSemaphorePermit, Semaphore, Notify, OwnedSemaphorePermit, Semaphore,
@ -84,6 +85,7 @@ use crate::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
}, },
session::CheckedIncomingConnection, session::CheckedIncomingConnection,
session_stats::atomic::AtomicSessionStats,
torrent_state::{peer::Peer, utils::atomic_inc}, torrent_state::{peer::Peer, utils::atomic_inc},
type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF}, type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF},
}; };
@ -201,6 +203,8 @@ pub struct TorrentStateLive {
up_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
session_stats: Arc<AtomicSessionStats>,
pub(crate) streams: Arc<TorrentStreams>, pub(crate) streams: Arc<TorrentStreams>,
have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>, have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>,
} }
@ -210,6 +214,7 @@ impl TorrentStateLive {
paused: TorrentStatePaused, paused: TorrentStatePaused,
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>, fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
session_stats: Arc<AtomicSessionStats>,
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
@ -237,7 +242,11 @@ impl TorrentStateLive {
let state = Arc::new(TorrentStateLive { let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(), meta: paused.info.clone(),
peers: Default::default(), peers: PeerStates {
session_stats: session_stats.clone(),
stats: Default::default(),
states: Default::default(),
},
locked: RwLock::new(TorrentStateLocked { locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker), chunks: Some(paused.chunk_tracker),
// TODO: move under per_piece_locks? // TODO: move under per_piece_locks?
@ -260,6 +269,7 @@ impl TorrentStateLive {
up_speed_estimator, up_speed_estimator,
cancellation_token, cancellation_token,
have_broadcast_tx, have_broadcast_tx,
session_stats,
streams: paused.streams, streams: paused.streams,
per_piece_locks: (0..lengths.total_pieces()) per_piece_locks: (0..lengths.total_pieces())
.map(|_| RwLock::new(())) .map(|_| RwLock::new(()))
@ -307,6 +317,10 @@ impl TorrentStateLive {
spawn_with_cancel(span, self.cancellation_token.clone(), fut); spawn_with_cancel(span, self.cancellation_token.clone(), fut);
} }
fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] {
[&self.peers.stats, &self.peers.session_stats.peers]
}
pub fn down_speed_estimator(&self) -> &SpeedEstimator { pub fn down_speed_estimator(&self) -> &SpeedEstimator {
&self.down_speed_estimator &self.down_speed_estimator
} }
@ -343,7 +357,7 @@ impl TorrentStateLive {
.incoming_connection( .incoming_connection(
Id20::new(checked_peer.handshake.peer_id), Id20::new(checked_peer.handshake.peer_id),
tx.clone(), tx.clone(),
&self.peers.stats, &self.peer_stats(),
) )
.context("peer already existed")?; .context("peer already existed")?;
peer.stats.counters.clone() peer.stats.counters.clone()
@ -353,7 +367,7 @@ impl TorrentStateLive {
let peer = Peer::new_live_for_incoming_connection( let peer = Peer::new_live_for_incoming_connection(
Id20::new(checked_peer.handshake.peer_id), Id20::new(checked_peer.handshake.peer_id),
tx.clone(), tx.clone(),
&self.peers.stats, &self.peer_stats(),
); );
let counters = peer.stats.counters.clone(); let counters = peer.stats.counters.clone();
vac.insert(peer); vac.insert(peer);
@ -562,7 +576,7 @@ impl TorrentStateLive {
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) { fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
self.peers.with_peer_mut(handle, "set_peer_live", |p| { self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state p.state
.connecting_to_live(Id20::new(h.peer_id), &self.peers.stats); .connecting_to_live(Id20::new(h.peer_id), &self.peer_stats());
}); });
} }
@ -750,7 +764,7 @@ impl TorrentStateLive {
for mut pe in self.peers.states.iter_mut() { for mut pe in self.peers.states.iter_mut() {
if let PeerState::Live(l) = pe.value().state.get() { if let PeerState::Live(l) = pe.value().state.get() {
if l.has_full_torrent(self.lengths.total_pieces() as usize) { if l.has_full_torrent(self.lengths.total_pieces() as usize) {
let prev = pe.value_mut().state.set_not_needed(&self.peers.stats); let prev = pe.value_mut().state.set_not_needed(&self.peer_stats());
let _ = prev let _ = prev
.take_live_no_counters() .take_live_no_counters()
.unwrap() .unwrap()
@ -763,7 +777,7 @@ impl TorrentStateLive {
pub(crate) fn reconnect_all_not_needed_peers(&self) { pub(crate) fn reconnect_all_not_needed_peers(&self) {
for mut pe in self.peers.states.iter_mut() { for mut pe in self.peers.states.iter_mut() {
if pe.state.not_needed_to_queued(&self.peers.stats) if pe.state.not_needed_to_queued(&self.peer_stats())
&& self.peer_queue_tx.send(*pe.key()).is_err() && self.peer_queue_tx.send(*pe.key()).is_err()
{ {
return; return;
@ -883,6 +897,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
.stats .stats
.uploaded_bytes .uploaded_bytes
.fetch_add(bytes as u64, Ordering::Relaxed); .fetch_add(bytes as u64, Ordering::Relaxed);
self.state
.session_stats
.uploaded_bytes
.fetch_add(bytes as u64, Ordering::Relaxed);
} }
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> { fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> {
@ -925,7 +943,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
impl PeerHandler { impl PeerHandler {
fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> { fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> {
let peers = &self.state.peers; let peers = &self.state.peers;
let pstats = &peers.stats; let pstats = self.state.peer_stats();
let handle = self.addr; let handle = self.addr;
let mut pe = match peers.states.get_mut(&handle) { let mut pe = match peers.states.get_mut(&handle) {
Some(peer) => TimedExistence::new(peer, "on_peer_died"), Some(peer) => TimedExistence::new(peer, "on_peer_died"),
@ -934,7 +952,7 @@ impl PeerHandler {
return Ok(()); return Ok(());
} }
}; };
let prev = pe.value_mut().state.take(pstats); let prev = pe.value_mut().state.take(&pstats);
match prev { match prev {
PeerState::Connecting(_) => {} PeerState::Connecting(_) => {}
@ -953,7 +971,7 @@ impl PeerHandler {
} }
PeerState::NotNeeded => { PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above. // Restore it as std::mem::take() replaced it above.
pe.value_mut().state.set(PeerState::NotNeeded, pstats); pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
return Ok(()); return Ok(());
} }
s @ PeerState::Queued | s @ PeerState::Dead => { s @ PeerState::Queued | s @ PeerState::Dead => {
@ -969,7 +987,7 @@ impl PeerHandler {
Some(e) => e, Some(e) => e,
None => { None => {
trace!("peer died without errors, not re-queueing"); trace!("peer died without errors, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats); pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
return Ok(()); return Ok(());
} }
}; };
@ -978,11 +996,11 @@ impl PeerHandler {
if self.state.is_finished_and_no_active_streams() { if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing"); debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats); pe.value_mut().state.set(PeerState::NotNeeded, &pstats);
return Ok(()); return Ok(());
} }
pe.value_mut().state.set(PeerState::Dead, pstats); pe.value_mut().state.set(PeerState::Dead, &pstats);
let backoff = pe.value_mut().stats.backoff.next_backoff(); let backoff = pe.value_mut().stats.backoff.next_backoff();
@ -1006,7 +1024,7 @@ impl PeerHandler {
.with_peer_mut(handle, "dead_to_queued", |peer| { .with_peer_mut(handle, "dead_to_queued", |peer| {
match peer.state.get() { match peer.state.get() {
PeerState::Dead => { PeerState::Dead => {
peer.state.set(PeerState::Queued, &self.state.peers.stats) peer.state.set(PeerState::Queued, &self.state.peer_stats())
} }
other => bail!( other => bail!(
"peer is in unexpected state: {}. Expected dead", "peer is in unexpected state: {}. Expected dead",
@ -1415,6 +1433,10 @@ impl PeerHandler {
.stats .stats
.fetched_bytes .fetched_bytes
.fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed); .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
self.state
.session_stats
.fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
fn write_to_disk( fn write_to_disk(
state: &TorrentStateLive, state: &TorrentStateLive,

View file

@ -26,10 +26,12 @@ impl Peer {
pub fn new_live_for_incoming_connection( pub fn new_live_for_incoming_connection(
peer_id: Id20, peer_id: Id20,
tx: PeerTx, tx: PeerTx,
counters: &AggregatePeerStatsAtomic, counters: &[&AggregatePeerStatsAtomic],
) -> Self { ) -> Self {
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true)));
counters.inc(&state.0); for counter in counters {
counter.inc(&state.0);
}
Self { Self {
state, state,
stats: Default::default(), stats: Default::default(),
@ -85,12 +87,20 @@ impl PeerStateNoMut {
&self.0 &self.0
} }
pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState {
self.set(Default::default(), counters) self.set(Default::default(), counters)
} }
pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState { pub fn destroy(self, counters: &[&AggregatePeerStatsAtomic]) {
counters.incdec(&self.0, &new); for counter in counters {
counter.dec(&self.0);
}
}
pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState {
for counter in counters {
counter.incdec(&self.0, &new);
}
std::mem::replace(&mut self.0, new) std::mem::replace(&mut self.0, new)
} }
@ -110,7 +120,7 @@ impl PeerStateNoMut {
pub fn idle_to_connecting( pub fn idle_to_connecting(
&mut self, &mut self,
counters: &AggregatePeerStatsAtomic, counters: &[&AggregatePeerStatsAtomic],
) -> Option<(PeerRx, PeerTx)> { ) -> Option<(PeerRx, PeerTx)> {
match &self.0 { match &self.0 {
PeerState::Queued | PeerState::NotNeeded => { PeerState::Queued | PeerState::NotNeeded => {
@ -123,7 +133,7 @@ impl PeerStateNoMut {
} }
} }
pub fn not_needed_to_queued(&mut self, counters: &AggregatePeerStatsAtomic) -> bool { pub fn not_needed_to_queued(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> bool {
if let PeerState::NotNeeded = &self.0 { if let PeerState::NotNeeded = &self.0 {
self.set(PeerState::Queued, counters); self.set(PeerState::Queued, counters);
return true; return true;
@ -135,7 +145,7 @@ impl PeerStateNoMut {
&mut self, &mut self,
peer_id: Id20, peer_id: Id20,
tx: PeerTx, tx: PeerTx,
counters: &AggregatePeerStatsAtomic, counters: &[&AggregatePeerStatsAtomic],
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) {
anyhow::bail!("peer already active"); anyhow::bail!("peer already active");
@ -155,7 +165,7 @@ impl PeerStateNoMut {
pub fn connecting_to_live( pub fn connecting_to_live(
&mut self, &mut self,
peer_id: Id20, peer_id: Id20,
counters: &AggregatePeerStatsAtomic, counters: &[&AggregatePeerStatsAtomic],
) -> Option<&mut LivePeerState> { ) -> Option<&mut LivePeerState> {
if let PeerState::Connecting(_) = &self.0 { if let PeerState::Connecting(_) = &self.0 {
let tx = match self.take(counters) { let tx = match self.take(counters) {
@ -172,7 +182,7 @@ impl PeerStateNoMut {
} }
} }
pub fn set_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState {
self.set(PeerState::NotNeeded, counters) self.set(PeerState::NotNeeded, counters)
} }
} }

View file

@ -1,4 +1,4 @@
use std::net::SocketAddr; use std::{net::SocketAddr, sync::Arc};
use anyhow::Context; use anyhow::Context;
use backoff::backoff::Backoff; use backoff::backoff::Backoff;
@ -8,6 +8,7 @@ use peer_binary_protocol::{Message, Request};
use crate::{ use crate::{
peer_connection::WriterRequest, peer_connection::WriterRequest,
session_stats::atomic::AtomicSessionStats,
torrent_state::utils::{atomic_inc, TimedExistence}, torrent_state::utils::{atomic_inc, TimedExistence},
type_aliases::{PeerHandle, BF}, type_aliases::{PeerHandle, BF},
}; };
@ -18,12 +19,20 @@ use super::peer::{LivePeerState, Peer, PeerRx, PeerState, PeerTx};
pub mod stats; pub mod stats;
#[derive(Default)]
pub(crate) struct PeerStates { pub(crate) struct PeerStates {
pub session_stats: Arc<AtomicSessionStats>,
pub stats: AggregatePeerStatsAtomic, pub stats: AggregatePeerStatsAtomic,
pub states: DashMap<PeerHandle, Peer>, pub states: DashMap<PeerHandle, Peer>,
} }
impl Drop for PeerStates {
fn drop(&mut self) {
for (_, p) in std::mem::take(&mut self.states).into_iter() {
p.state.destroy(&[&self.session_stats.peers]);
}
}
}
impl PeerStates { impl PeerStates {
pub fn stats(&self) -> AggregatePeerStats { pub fn stats(&self) -> AggregatePeerStats {
AggregatePeerStats::from(&self.stats) AggregatePeerStats::from(&self.stats)
@ -36,7 +45,10 @@ impl PeerStates {
Entry::Vacant(vac) => { Entry::Vacant(vac) => {
vac.insert(Default::default()); vac.insert(Default::default());
atomic_inc(&self.stats.queued); atomic_inc(&self.stats.queued);
atomic_inc(&self.session_stats.peers.queued);
atomic_inc(&self.stats.seen); atomic_inc(&self.stats.seen);
atomic_inc(&self.session_stats.peers.seen);
Some(addr) Some(addr)
} }
} }
@ -73,7 +85,10 @@ impl PeerStates {
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> { pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
let p = self.states.remove(&handle).map(|r| r.1)?; let p = self.states.remove(&handle).map(|r| r.1)?;
self.stats.dec(p.state.get()); let s = p.state.get();
self.stats.dec(s);
self.session_stats.peers.dec(s);
Some(p) Some(p)
} }
@ -99,7 +114,7 @@ impl PeerStates {
let rx = self let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| { .with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state peer.state
.idle_to_connecting(&self.stats) .idle_to_connecting(&[&self.stats, &self.session_stats.peers])
.context("invalid peer state") .context("invalid peer state")
}) })
.context("peer not found in states")??; .context("peer not found in states")??;
@ -114,7 +129,8 @@ impl PeerStates {
pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<PeerState> {
let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state.set_not_needed(&self.stats) peer.state
.set_not_needed(&[&self.stats, &self.session_stats.peers])
})?; })?;
Some(prev) Some(prev)
} }
@ -132,6 +148,7 @@ impl PeerStates {
atomic_inc(&p.stats.counters.times_stolen_from_me); atomic_inc(&p.stats.counters.times_stolen_from_me);
}); });
self.stats.inc_steals(); self.stats.inc_steals();
self.session_stats.peers.inc_steals();
self.with_live_mut(from_peer, "send_cancellations", |live| { self.with_live_mut(from_peer, "send_cancellations", |live| {
let to_remove = live let to_remove = live

View file

@ -38,6 +38,7 @@ use crate::bitv_factory::BitVFactory;
use crate::chunk_tracker::ChunkTracker; use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo; use crate::file_info::FileInfo;
use crate::session::TorrentId; use crate::session::TorrentId;
use crate::session_stats::atomic::AtomicSessionStats;
use crate::spawn_utils::BlockingSpawner; use crate::spawn_utils::BlockingSpawner;
use crate::storage::BoxStorageFactory; use crate::storage::BoxStorageFactory;
use crate::stream_connect::StreamConnector; use crate::stream_connect::StreamConnector;
@ -211,6 +212,7 @@ impl ManagedTorrent {
live_cancellation_token: CancellationToken, live_cancellation_token: CancellationToken,
init_semaphore: Arc<tokio::sync::Semaphore>, init_semaphore: Arc<tokio::sync::Semaphore>,
bitv_factory: Arc<dyn BitVFactory>, bitv_factory: Arc<dyn BitVFactory>,
session_stats: Arc<AtomicSessionStats>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut g = self.locked.write(); let mut g = self.locked.write();
@ -319,8 +321,12 @@ impl ManagedTorrent {
} }
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let live = let live = TorrentStateLive::new(
TorrentStateLive::new(paused, tx, live_cancellation_token)?; paused,
tx,
live_cancellation_token,
session_stats,
)?;
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
drop(g); drop(g);
@ -345,7 +351,12 @@ impl ManagedTorrent {
ManagedTorrentState::Paused(_) => { ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused(); let paused = g.state.take().assert_paused();
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?; let live = TorrentStateLive::new(
paused,
tx,
live_cancellation_token.clone(),
session_stats,
)?;
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
drop(g); drop(g);
@ -371,6 +382,7 @@ impl ManagedTorrent {
live_cancellation_token, live_cancellation_token,
init_semaphore, init_semaphore,
bitv_factory, bitv_factory,
session_stats,
) )
} }
ManagedTorrentState::None => bail!("bug: torrent is in empty state"), ManagedTorrentState::None => bail!("bug: torrent is in empty state"),

View file

@ -34,6 +34,21 @@ export interface Speed {
human_readable: string; human_readable: string;
} }
export interface AggregatePeerStats {
queued: number;
connecting: number;
live: number;
seen: number;
dead: number;
not_needed: number;
}
export interface SessionStats {
download_speed: Speed;
upload_speed: Speed;
peers: AggregatePeerStats;
}
// Interface for the Torrent Stats API response // Interface for the Torrent Stats API response
export interface LiveTorrentStats { export interface LiveTorrentStats {
snapshot: { snapshot: {
@ -46,14 +61,7 @@ export interface LiveTorrentStats {
remaining_bytes: number; remaining_bytes: number;
total_bytes: number; total_bytes: number;
total_piece_download_ms: number; total_piece_download_ms: number;
peer_stats: { peer_stats: AggregatePeerStats;
queued: number;
connecting: number;
live: number;
seen: number;
dead: number;
not_needed: number;
};
}; };
average_piece_download_time: { average_piece_download_time: {
secs: number; secs: number;
@ -182,4 +190,5 @@ export interface RqbitAPI {
start: (index: number) => Promise<void>; start: (index: number) => Promise<void>;
forget: (index: number) => Promise<void>; forget: (index: number) => Promise<void>;
delete: (index: number) => Promise<void>; delete: (index: number) => Promise<void>;
stats: () => Promise<SessionStats>;
} }

View file

@ -0,0 +1,12 @@
import { useStatsStore } from "../stores/statsStore";
import { Speed } from "./Speed";
export const Footer: React.FC<{}> = () => {
let stats = useStatsStore((stats) => stats.stats);
return (
<div className="sticky bottom-0 bg-white/10 dark:text-gray-200 backdrop-blur text-nowrap text-xs font-medium text-gray-500 flex p-1 gap-x-3 justify-center">
<div> {stats.download_speed.human_readable}</div>
<div> {stats.upload_speed.human_readable}</div>
</div>
);
};

View file

@ -19,7 +19,9 @@ export const TorrentsList = (props: {
<p className="text-center">No existing torrents found.</p> <p className="text-center">No existing torrents found.</p>
) : ( ) : (
props.torrents.map((t: TorrentId) => ( props.torrents.map((t: TorrentId) => (
<Torrent id={t.id} key={t.id} torrent={t} /> <>
<Torrent id={t.id} key={t.id} torrent={t} />
</>
)) ))
)} )}
</div> </div>

View file

@ -1,5 +1,5 @@
import { createContext } from "react"; import { createContext } from "react";
import { RqbitAPI } from "./api-types"; import { RqbitAPI, SessionStats } from "./api-types";
export const APIContext = createContext<RqbitAPI>({ export const APIContext = createContext<RqbitAPI>({
listTorrents: () => { listTorrents: () => {
@ -38,5 +38,8 @@ export const APIContext = createContext<RqbitAPI>({
getPlaylistUrl: function (index: number): string | null { getPlaylistUrl: function (index: number): string | null {
throw new Error("Function not implemented."); throw new Error("Function not implemented.");
}, },
stats: function (): Promise<SessionStats> {
throw new Error("Function not implemented.");
},
}); });
export const RefreshTorrentStatsContext = createContext({ refresh: () => {} }); export const RefreshTorrentStatsContext = createContext({ refresh: () => {} });

View file

@ -3,6 +3,7 @@ import {
ErrorDetails, ErrorDetails,
ListTorrentsResponse, ListTorrentsResponse,
RqbitAPI, RqbitAPI,
SessionStats,
TorrentDetails, TorrentDetails,
TorrentStats, TorrentStats,
} from "./api-types"; } from "./api-types";
@ -82,6 +83,9 @@ export const API: RqbitAPI & { getVersion: () => Promise<string> } = {
getTorrentStats: (index: number): Promise<TorrentStats> => { getTorrentStats: (index: number): Promise<TorrentStats> => {
return makeRequest("GET", `/torrents/${index}/stats/v1`); return makeRequest("GET", `/torrents/${index}/stats/v1`);
}, },
stats: (): Promise<SessionStats> => {
return makeRequest("GET", "/stats");
},
uploadTorrent: (data, opts): Promise<AddTorrentResponse> => { uploadTorrent: (data, opts): Promise<AddTorrentResponse> => {
let url = "/torrents?&overwrite=true"; let url = "/torrents?&overwrite=true";
@ -152,6 +156,6 @@ export const API: RqbitAPI & { getVersion: () => Promise<string> } = {
return url; return url;
}, },
getPlaylistUrl: (index: number) => { getPlaylistUrl: (index: number) => {
return (apiUrl || window.origin) + `/torrents/${index}/playlist`; return (apiUrl || window.origin) + `/torrents/${index}/playlist`;
}, },
}; };

View file

@ -11,6 +11,8 @@ import { DarkMode } from "./helper/darkMode";
import { useTorrentStore } from "./stores/torrentStore"; import { useTorrentStore } from "./stores/torrentStore";
import { useErrorStore } from "./stores/errorStore"; import { useErrorStore } from "./stores/errorStore";
import { AlertModal } from "./components/modal/AlertModal"; import { AlertModal } from "./components/modal/AlertModal";
import { useStatsStore } from "./stores/statsStore";
import { Footer } from "./components/Footer";
export interface ErrorWithLabel { export interface ErrorWithLabel {
text: string; text: string;
@ -49,6 +51,8 @@ export const RqbitWebUI = (props: {
}; };
setRefreshTorrents(refreshTorrents); setRefreshTorrents(refreshTorrents);
const setStats = useStatsStore((state) => state.setStats);
useEffect(() => { useEffect(() => {
return customSetInterval( return customSetInterval(
async () => async () =>
@ -67,8 +71,25 @@ export const RqbitWebUI = (props: {
); );
}, []); }, []);
useEffect(() => {
return customSetInterval(
async () =>
API.stats().then(
(stats) => {
setStats(stats);
return 1000;
},
(e) => {
console.error(e);
return 5000;
}
),
0
);
}, []);
return ( return (
<div className="dark:bg-gray-900 dark:text-gray-200 min-h-screen"> <div className="dark:bg-gray-900 dark:text-gray-200 min-h-screen flex flex-col">
<Header title={props.title} version={props.version} /> <Header title={props.title} version={props.version} />
<div className="relative"> <div className="relative">
{/* Menu buttons */} {/* Menu buttons */}
@ -82,10 +103,14 @@ export const RqbitWebUI = (props: {
<BsMoon /> <BsMoon />
</IconButton> </IconButton>
</div> </div>
</div>
<div className="grow">
<RootContent /> <RootContent />
</div> </div>
<Footer />
<LogStreamModal show={logsOpened} onClose={() => setLogsOpened(false)} /> <LogStreamModal show={logsOpened} onClose={() => setLogsOpened(false)} />
<AlertModal /> <AlertModal />
</div> </div>

View file

@ -0,0 +1,26 @@
import { create } from "zustand";
import { SessionStats } from "../api-types";
export interface StatsStore {
stats: SessionStats;
setStats: (stats: SessionStats) => void;
}
export const useStatsStore = create<StatsStore>((set) => ({
stats: {
download_speed: { human_readable: "N/A", mbps: 0 },
upload_speed: { human_readable: "N/A", mbps: 0 },
peers: {
connecting: 0,
dead: 0,
live: 0,
not_needed: 0,
queued: 0,
seen: 0,
},
},
setStats: (stats) => {
set({ stats });
},
}));

View file

@ -19,6 +19,7 @@ use librqbit::{
TorrentListResponse, TorrentStats, TorrentListResponse, TorrentStats,
}, },
dht::PersistentDhtConfig, dht::PersistentDhtConfig,
session_stats::snapshot::SessionStatsSnapshot,
tracing_subscriber_config_utils::{init_logging, InitLoggingOptions, InitLoggingResult}, tracing_subscriber_config_utils::{init_logging, InitLoggingOptions, InitLoggingResult},
AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, SessionOptions, AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, SessionOptions,
SessionPersistenceConfig, SessionPersistenceConfig,
@ -318,6 +319,11 @@ async fn torrent_action_configure(
.await .await
} }
#[tauri::command]
async fn stats(state: tauri::State<'_, State>) -> Result<SessionStatsSnapshot, ApiError> {
Ok(state.api()?.api_session_stats())
}
#[tauri::command] #[tauri::command]
fn get_version() -> &'static str { fn get_version() -> &'static str {
env!("CARGO_PKG_VERSION") env!("CARGO_PKG_VERSION")
@ -352,6 +358,7 @@ async fn start() {
torrent_action_start, torrent_action_start,
torrent_action_configure, torrent_action_configure,
torrent_create_from_base64_file, torrent_create_from_base64_file,
stats,
get_version, get_version,
config_default, config_default,
config_current, config_current,

View file

@ -6,6 +6,7 @@ import {
TorrentDetails, TorrentDetails,
TorrentStats, TorrentStats,
ErrorDetails, ErrorDetails,
SessionStats,
} from "rqbit-webui/src/api-types"; } from "rqbit-webui/src/api-types";
import { InvokeArgs, invoke } from "@tauri-apps/api/tauri"; import { InvokeArgs, invoke } from "@tauri-apps/api/tauri";
@ -141,5 +142,8 @@ export const makeAPI = (configuration: RqbitDesktopConfig): RqbitAPI => {
} }
return `${httpBase}/torrents/${index}/playlist`; return `${httpBase}/torrents/${index}/playlist`;
}, },
stats: () => {
return invokeAPI<SessionStats>("stats");
},
}; };
}; };