From 4b6ed369277d75c1b78e33e12eb269456dfafdc9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 1 Jul 2021 10:07:12 +0100 Subject: [PATCH] De-entangle speed estimator from complex objects --- crates/librqbit/src/speed_estimator.rs | 84 ++++++++++++-------------- crates/librqbit/src/torrent_manager.rs | 22 +++++-- 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/crates/librqbit/src/speed_estimator.rs b/crates/librqbit/src/speed_estimator.rs index 4f00f95..0c7d834 100644 --- a/crates/librqbit/src/speed_estimator.rs +++ b/crates/librqbit/src/speed_estimator.rs @@ -1,34 +1,30 @@ use std::{ collections::VecDeque, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::Duration, + sync::atomic::{AtomicU64, Ordering}, + time::{Duration, Instant}, }; use parking_lot::Mutex; -use tokio::time::sleep; -use crate::torrent_state::{StatsSnapshot, TorrentState}; +struct ProgressSnapshot { + downloaded_bytes: u64, + instant: Instant, +} pub struct SpeedEstimator { - state: Arc, - latest_per_second_snapshots: Mutex>, + latest_per_second_snapshots: Mutex>, download_bytes_per_second: AtomicU64, time_remaining_millis: AtomicU64, } impl SpeedEstimator { - pub fn new(state: Arc, window_seconds: usize) -> Arc { + pub fn new(window_seconds: usize) -> Self { assert!(window_seconds > 1); - let estimator = Arc::new(Self { - state, + Self { latest_per_second_snapshots: Mutex::new(VecDeque::with_capacity(window_seconds)), download_bytes_per_second: Default::default(), time_remaining_millis: Default::default(), - }); - estimator + } } pub fn time_remaining(&self) -> Option { @@ -47,37 +43,35 @@ impl SpeedEstimator { self.download_bps() as f64 / 1024f64 / 1024f64 } - pub async fn run_forever(self: Arc) -> anyhow::Result<()> { - loop { - let current = self.state.stats_snapshot(); - { - let mut g = self.latest_per_second_snapshots.lock(); - if g.len() < g.capacity() { - g.push_back(current); - continue; - } - let first = g.pop_front().unwrap(); - - let downloaded_bytes = - current.downloaded_and_checked_bytes - first.downloaded_and_checked_bytes; - let elapsed = first.time.elapsed(); - let bps = downloaded_bytes as f64 / elapsed.as_secs_f64(); - - let time_remaining_millis_rounded: u64 = if downloaded_bytes > 0 { - let time_remaining_secs = current.remaining_bytes as f64 / bps; - (time_remaining_secs * 1000f64) as u64 - } else { - 0 - }; - self.time_remaining_millis - .store(time_remaining_millis_rounded, Ordering::Relaxed); - self.download_bytes_per_second - .store(bps as u64, Ordering::Relaxed); - - g.push_back(current); - } - - sleep(Duration::from_secs(1)).await; + pub fn add_snapshot(&self, downloaded_bytes: u64, remaining_bytes: u64, instant: Instant) { + let mut g = self.latest_per_second_snapshots.lock(); + if g.len() < g.capacity() { + g.push_back(ProgressSnapshot { + downloaded_bytes, + instant, + }); + return; } + let first = g.pop_front().unwrap(); + + let downloaded_bytes_diff = downloaded_bytes - first.downloaded_bytes; + let elapsed = instant - first.instant; + let bps = downloaded_bytes_diff as f64 / elapsed.as_secs_f64(); + + let time_remaining_millis_rounded: u64 = if downloaded_bytes_diff > 0 { + let time_remaining_secs = remaining_bytes as f64 / bps; + (time_remaining_secs * 1000f64) as u64 + } else { + 0 + }; + self.time_remaining_millis + .store(time_remaining_millis_rounded, Ordering::Relaxed); + self.download_bytes_per_second + .store(bps as u64, Ordering::Relaxed); + + g.push_back(ProgressSnapshot { + downloaded_bytes, + instant, + }); } } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 4776c02..ea47a85 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -6,7 +6,7 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, - time::Duration, + time::{Duration, Instant}, }; use anyhow::Context; @@ -186,11 +186,11 @@ impl TorrentManager { needed: initial_check_results.needed_bytes, lengths, }); - let estimator = SpeedEstimator::new(state.clone(), 5); + let estimator = Arc::new(SpeedEstimator::new(5)); let mgr = Self { state, - speed_estimator: estimator, + speed_estimator: estimator.clone(), force_tracker_interval, }; @@ -198,9 +198,21 @@ impl TorrentManager { spawn("stats printer", mgr.clone().stats_printer()); spawn( "http api", - make_and_run_http_api(mgr.state.clone(), mgr.speed_estimator.clone()), + make_and_run_http_api(mgr.state.clone(), estimator.clone()), ); - spawn("speed estimator", mgr.speed_estimator.clone().run_forever()); + spawn("speed estimator updater", { + let state = mgr.state.clone(); + let estimator = estimator.clone(); + async move { + loop { + let downloaded = state.stats.downloaded_and_checked.load(Ordering::Relaxed); + let needed = state.needed; + let remaining = needed - downloaded; + estimator.add_snapshot(downloaded, remaining, Instant::now()); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); Ok(mgr.into_handle()) }