From 06e88c138f24e8a9763c9173750bc40af065e95b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 11:57:45 +0100 Subject: [PATCH] Session speed estimator --- crates/librqbit/src/chunk_tracker.rs | 1 + crates/librqbit/src/session.rs | 18 +++++---- crates/librqbit/src/session_stats/mod.rs | 49 ++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 7c25458..4d0a48f 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -79,6 +79,7 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Res lengths.total_pieces() ); } + let required_size = lengths.chunk_bitfield_bytes(); let vec = vec![0u8; required_size]; let mut chunk_bf = BF::from_boxed_slice(vec.into_boxed_slice()); diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index af1a892..d707b12 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -16,7 +16,7 @@ use crate::{ peer_connection::PeerConnectionOptions, read_buf::ReadBuf, session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore}, - session_stats::atomic::AtomicSessionStats, + session_stats::SessionStats, spawn_utils::BlockingSpawner, storage::{ filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, @@ -117,7 +117,7 @@ pub struct Session { root_span: Option, - stats: Arc, + pub(crate) stats: SessionStats, // This is stored for all tasks to stop when session is dropped. _cancellation_token_drop_guard: DropGuard, @@ -605,8 +605,8 @@ impl Session { reqwest_client, connector: stream_connector, root_span: opts.root_span, - stats: Default::default(), - concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) + stats: SessionStats::new(), + concurrent_initialize_semaphore: Arc::new( tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) }); if let Some(mut disk_write_rx) = disk_write_rx { @@ -638,7 +638,7 @@ impl Session { if let Some(persistence) = session.persistence.as_ref() { info!("will use {persistence:?} for session persistence"); - let mut ps = persistence.stream_all().await?; + let mut ps = persistence.stream_all().await?; let mut added_all = false; let mut futs = FuturesUnordered::new(); @@ -665,6 +665,8 @@ impl Session { } } + session.start_speed_estimator_updater(); + Ok(session) } .boxed() @@ -789,7 +791,7 @@ impl Session { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } - fn rs(&self) -> Option { + pub(crate) fn rs(&self) -> Option { self.root_span.as_ref().and_then(|s| s.id()) } @@ -1151,7 +1153,7 @@ impl Session { self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), self.bitv_factory.clone(), - self.stats.clone(), + self.stats.atomic.clone(), ) .context("error starting torrent")?; } @@ -1308,7 +1310,7 @@ impl Session { self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), self.bitv_factory.clone(), - self.stats.clone(), + self.stats.atomic.clone(), )?; self.try_update_persistence_metadata(handle).await; Ok(()) diff --git a/crates/librqbit/src/session_stats/mod.rs b/crates/librqbit/src/session_stats/mod.rs index 652223f..87afcf6 100644 --- a/crates/librqbit/src/session_stats/mod.rs +++ b/crates/librqbit/src/session_stats/mod.rs @@ -1 +1,50 @@ +use std::{ + sync::{atomic::Ordering, Arc}, + time::{Duration, Instant}, +}; + +use atomic::AtomicSessionStats; +use librqbit_core::speed_estimator::SpeedEstimator; +use tracing::error_span; + +use crate::Session; + pub mod atomic; + +pub struct SessionStats { + pub atomic: Arc, + pub down_speed_estimator: SpeedEstimator, + pub up_speed_estimator: SpeedEstimator, +} + +impl SessionStats { + pub fn new() -> Self { + SessionStats { + atomic: Default::default(), + down_speed_estimator: SpeedEstimator::new(5), + up_speed_estimator: SpeedEstimator::new(5), + } + } +} + +impl Session { + pub(crate) fn start_speed_estimator_updater(self: &Arc) { + self.spawn(error_span!(parent: self.rs(), "speed_estimator"), { + let s = self.clone(); + + async move { + let mut i = tokio::time::interval(Duration::from_secs(1)); + loop { + i.tick().await; + let now = Instant::now(); + let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed); + let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed); + s.stats + .down_speed_estimator + .add_snapshot(fetched, None, now); + s.stats.up_speed_estimator.add_snapshot(uploaded, None, now); + } + } + }) + } +}