De-entangle speed estimator from complex objects

This commit is contained in:
Igor Katson 2021-07-01 10:07:12 +01:00
parent 47966e094c
commit 4b6ed36927
2 changed files with 56 additions and 50 deletions

View file

@ -1,34 +1,30 @@
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
sync::{ sync::atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, Ordering}, time::{Duration, Instant},
Arc,
},
time::Duration,
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::time::sleep;
use crate::torrent_state::{StatsSnapshot, TorrentState}; struct ProgressSnapshot {
downloaded_bytes: u64,
instant: Instant,
}
pub struct SpeedEstimator { pub struct SpeedEstimator {
state: Arc<TorrentState>, latest_per_second_snapshots: Mutex<VecDeque<ProgressSnapshot>>,
latest_per_second_snapshots: Mutex<VecDeque<StatsSnapshot>>,
download_bytes_per_second: AtomicU64, download_bytes_per_second: AtomicU64,
time_remaining_millis: AtomicU64, time_remaining_millis: AtomicU64,
} }
impl SpeedEstimator { impl SpeedEstimator {
pub fn new(state: Arc<TorrentState>, window_seconds: usize) -> Arc<Self> { pub fn new(window_seconds: usize) -> Self {
assert!(window_seconds > 1); assert!(window_seconds > 1);
let estimator = Arc::new(Self { Self {
state,
latest_per_second_snapshots: Mutex::new(VecDeque::with_capacity(window_seconds)), latest_per_second_snapshots: Mutex::new(VecDeque::with_capacity(window_seconds)),
download_bytes_per_second: Default::default(), download_bytes_per_second: Default::default(),
time_remaining_millis: Default::default(), time_remaining_millis: Default::default(),
}); }
estimator
} }
pub fn time_remaining(&self) -> Option<Duration> { pub fn time_remaining(&self) -> Option<Duration> {
@ -47,37 +43,35 @@ impl SpeedEstimator {
self.download_bps() as f64 / 1024f64 / 1024f64 self.download_bps() as f64 / 1024f64 / 1024f64
} }
pub async fn run_forever(self: Arc<Self>) -> anyhow::Result<()> { pub fn add_snapshot(&self, downloaded_bytes: u64, remaining_bytes: u64, instant: Instant) {
loop { let mut g = self.latest_per_second_snapshots.lock();
let current = self.state.stats_snapshot(); if g.len() < g.capacity() {
{ g.push_back(ProgressSnapshot {
let mut g = self.latest_per_second_snapshots.lock(); downloaded_bytes,
if g.len() < g.capacity() { instant,
g.push_back(current); });
continue; return;
}
let first = g.pop_front().unwrap();
let downloaded_bytes =
current.downloaded_and_checked_bytes - first.downloaded_and_checked_bytes;
let elapsed = first.time.elapsed();
let bps = downloaded_bytes as f64 / elapsed.as_secs_f64();
let time_remaining_millis_rounded: u64 = if downloaded_bytes > 0 {
let time_remaining_secs = current.remaining_bytes as f64 / bps;
(time_remaining_secs * 1000f64) as u64
} else {
0
};
self.time_remaining_millis
.store(time_remaining_millis_rounded, Ordering::Relaxed);
self.download_bytes_per_second
.store(bps as u64, Ordering::Relaxed);
g.push_back(current);
}
sleep(Duration::from_secs(1)).await;
} }
let first = g.pop_front().unwrap();
let downloaded_bytes_diff = downloaded_bytes - first.downloaded_bytes;
let elapsed = instant - first.instant;
let bps = downloaded_bytes_diff as f64 / elapsed.as_secs_f64();
let time_remaining_millis_rounded: u64 = if downloaded_bytes_diff > 0 {
let time_remaining_secs = remaining_bytes as f64 / bps;
(time_remaining_secs * 1000f64) as u64
} else {
0
};
self.time_remaining_millis
.store(time_remaining_millis_rounded, Ordering::Relaxed);
self.download_bytes_per_second
.store(bps as u64, Ordering::Relaxed);
g.push_back(ProgressSnapshot {
downloaded_bytes,
instant,
});
} }
} }

View file

@ -6,7 +6,7 @@ use std::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Arc,
}, },
time::Duration, time::{Duration, Instant},
}; };
use anyhow::Context; use anyhow::Context;
@ -186,11 +186,11 @@ impl TorrentManager {
needed: initial_check_results.needed_bytes, needed: initial_check_results.needed_bytes,
lengths, lengths,
}); });
let estimator = SpeedEstimator::new(state.clone(), 5); let estimator = Arc::new(SpeedEstimator::new(5));
let mgr = Self { let mgr = Self {
state, state,
speed_estimator: estimator, speed_estimator: estimator.clone(),
force_tracker_interval, force_tracker_interval,
}; };
@ -198,9 +198,21 @@ impl TorrentManager {
spawn("stats printer", mgr.clone().stats_printer()); spawn("stats printer", mgr.clone().stats_printer());
spawn( spawn(
"http api", "http api",
make_and_run_http_api(mgr.state.clone(), mgr.speed_estimator.clone()), make_and_run_http_api(mgr.state.clone(), estimator.clone()),
); );
spawn("speed estimator", mgr.speed_estimator.clone().run_forever()); spawn("speed estimator updater", {
let state = mgr.state.clone();
let estimator = estimator.clone();
async move {
loop {
let downloaded = state.stats.downloaded_and_checked.load(Ordering::Relaxed);
let needed = state.needed;
let remaining = needed - downloaded;
estimator.add_snapshot(downloaded, remaining, Instant::now());
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
Ok(mgr.into_handle()) Ok(mgr.into_handle())
} }