Session speed estimator

This commit is contained in:
Igor Katson 2024-08-21 11:57:45 +01:00
parent 0fdf6ad429
commit 06e88c138f
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 60 additions and 8 deletions

View file

@ -79,6 +79,7 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Res
lengths.total_pieces() lengths.total_pieces()
); );
} }
let required_size = lengths.chunk_bitfield_bytes(); let required_size = lengths.chunk_bitfield_bytes();
let vec = vec![0u8; required_size]; let vec = vec![0u8; required_size];
let mut chunk_bf = BF::from_boxed_slice(vec.into_boxed_slice()); let mut chunk_bf = BF::from_boxed_slice(vec.into_boxed_slice());

View file

@ -16,7 +16,7 @@ use crate::{
peer_connection::PeerConnectionOptions, peer_connection::PeerConnectionOptions,
read_buf::ReadBuf, read_buf::ReadBuf,
session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore}, session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore},
session_stats::atomic::AtomicSessionStats, session_stats::SessionStats,
spawn_utils::BlockingSpawner, spawn_utils::BlockingSpawner,
storage::{ storage::{
filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage,
@ -117,7 +117,7 @@ pub struct Session {
root_span: Option<Span>, root_span: Option<Span>,
stats: Arc<AtomicSessionStats>, pub(crate) stats: SessionStats,
// This is stored for all tasks to stop when session is dropped. // This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard, _cancellation_token_drop_guard: DropGuard,
@ -605,8 +605,8 @@ impl Session {
reqwest_client, reqwest_client,
connector: stream_connector, connector: stream_connector,
root_span: opts.root_span, root_span: opts.root_span,
stats: Default::default(), stats: SessionStats::new(),
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) concurrent_initialize_semaphore: Arc::new( tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
}); });
if let Some(mut disk_write_rx) = disk_write_rx { if let Some(mut disk_write_rx) = disk_write_rx {
@ -638,7 +638,7 @@ impl Session {
if let Some(persistence) = session.persistence.as_ref() { if let Some(persistence) = session.persistence.as_ref() {
info!("will use {persistence:?} for session persistence"); info!("will use {persistence:?} for session persistence");
let mut ps = persistence.stream_all().await?; let mut ps = persistence.stream_all().await?;
let mut added_all = false; let mut added_all = false;
let mut futs = FuturesUnordered::new(); let mut futs = FuturesUnordered::new();
@ -665,6 +665,8 @@ impl Session {
} }
} }
session.start_speed_estimator_updater();
Ok(session) Ok(session)
} }
.boxed() .boxed()
@ -789,7 +791,7 @@ impl Session {
spawn_with_cancel(span, self.cancellation_token.clone(), fut); spawn_with_cancel(span, self.cancellation_token.clone(), fut);
} }
fn rs(&self) -> Option<tracing::Id> { pub(crate) fn rs(&self) -> Option<tracing::Id> {
self.root_span.as_ref().and_then(|s| s.id()) self.root_span.as_ref().and_then(|s| s.id())
} }
@ -1151,7 +1153,7 @@ impl Session {
self.cancellation_token.child_token(), self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(), self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(), self.bitv_factory.clone(),
self.stats.clone(), self.stats.atomic.clone(),
) )
.context("error starting torrent")?; .context("error starting torrent")?;
} }
@ -1308,7 +1310,7 @@ impl Session {
self.cancellation_token.child_token(), self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(), self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(), self.bitv_factory.clone(),
self.stats.clone(), self.stats.atomic.clone(),
)?; )?;
self.try_update_persistence_metadata(handle).await; self.try_update_persistence_metadata(handle).await;
Ok(()) Ok(())

View file

@ -1 +1,50 @@
use std::{
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};
use atomic::AtomicSessionStats;
use librqbit_core::speed_estimator::SpeedEstimator;
use tracing::error_span;
use crate::Session;
pub mod atomic; pub mod atomic;
pub struct SessionStats {
pub atomic: Arc<AtomicSessionStats>,
pub down_speed_estimator: SpeedEstimator,
pub up_speed_estimator: SpeedEstimator,
}
impl SessionStats {
pub fn new() -> Self {
SessionStats {
atomic: Default::default(),
down_speed_estimator: SpeedEstimator::new(5),
up_speed_estimator: SpeedEstimator::new(5),
}
}
}
impl Session {
pub(crate) fn start_speed_estimator_updater(self: &Arc<Self>) {
self.spawn(error_span!(parent: self.rs(), "speed_estimator"), {
let s = self.clone();
async move {
let mut i = tokio::time::interval(Duration::from_secs(1));
loop {
i.tick().await;
let now = Instant::now();
let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed);
let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed);
s.stats
.down_speed_estimator
.add_snapshot(fetched, None, now);
s.stats.up_speed_estimator.add_snapshot(uploaded, None, now);
}
}
})
}
}