Add speed estimator

This commit is contained in:
Igor Katson 2021-06-30 23:26:22 +01:00
parent c1f34a6599
commit 47966e094c
5 changed files with 172 additions and 22 deletions

View file

@ -5,10 +5,14 @@ use std::sync::atomic::Ordering;
use std::time::Instant;
use warp::Filter;
use crate::speed_estimator::SpeedEstimator;
use crate::torrent_state::TorrentState;
// This is just a stub for debugging, nothing useful here.
pub async fn make_and_run_http_api(state: Arc<TorrentState>) -> anyhow::Result<()> {
pub async fn make_and_run_http_api(
state: Arc<TorrentState>,
estimator: Arc<SpeedEstimator>,
) -> anyhow::Result<()> {
let dump_haves = warp::path("haves").map({
let state = state.clone();
move || format!("{:?}", state.locked.read().chunks.get_have_pieces())
@ -20,13 +24,12 @@ pub async fn make_and_run_http_api(state: Arc<TorrentState>) -> anyhow::Result<(
let initial_downloaded_and_checked =
state.stats.downloaded_and_checked.load(Ordering::Relaxed);
move || {
let stats = &state.stats;
let mut buf = Vec::new();
writeln!(buf, "{:#?}", &stats).unwrap();
writeln!(buf, "{:#?}", state.stats_snapshot()).unwrap();
writeln!(
buf,
"Average download time: {:?}",
stats.average_piece_download_time()
state.stats.average_piece_download_time()
)
.unwrap();
@ -37,11 +40,20 @@ pub async fn make_and_run_http_api(state: Arc<TorrentState>) -> anyhow::Result<(
let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64;
writeln!(
buf,
"Speed: {:.2}Mbps",
"Total download speed over all time: {:.2}Mbps",
downloaded_mb / elapsed.as_secs_f64()
)
.unwrap();
writeln!(buf, "Download speed: {:.2}Mbps", estimator.download_mbps()).unwrap();
match estimator.time_remaining() {
Some(time) => {
writeln!(buf, "Time remaining: {:?}", time).unwrap();
}
None => {
writeln!(buf, "Time remaining: unknown").unwrap();
}
}
buf
}
});

View file

@ -12,6 +12,7 @@ pub mod peer_state;
pub mod serde_bencode;
pub mod sha1w;
pub mod spawn_utils;
pub mod speed_estimator;
pub mod torrent_manager;
pub mod torrent_metainfo;
pub mod torrent_state;

View file

@ -0,0 +1,83 @@
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use parking_lot::Mutex;
use tokio::time::sleep;
use crate::torrent_state::{StatsSnapshot, TorrentState};
pub struct SpeedEstimator {
state: Arc<TorrentState>,
latest_per_second_snapshots: Mutex<VecDeque<StatsSnapshot>>,
download_bytes_per_second: AtomicU64,
time_remaining_millis: AtomicU64,
}
impl SpeedEstimator {
pub fn new(state: Arc<TorrentState>, window_seconds: usize) -> Arc<Self> {
assert!(window_seconds > 1);
let estimator = Arc::new(Self {
state,
latest_per_second_snapshots: Mutex::new(VecDeque::with_capacity(window_seconds)),
download_bytes_per_second: Default::default(),
time_remaining_millis: Default::default(),
});
estimator
}
pub fn time_remaining(&self) -> Option<Duration> {
let tr = self.time_remaining_millis.load(Ordering::Relaxed);
if tr == 0 {
return None;
}
Some(Duration::from_millis(tr))
}
pub fn download_bps(&self) -> u64 {
self.download_bytes_per_second.load(Ordering::Relaxed)
}
pub fn download_mbps(&self) -> f64 {
self.download_bps() as f64 / 1024f64 / 1024f64
}
pub async fn run_forever(self: Arc<Self>) -> anyhow::Result<()> {
loop {
let current = self.state.stats_snapshot();
{
let mut g = self.latest_per_second_snapshots.lock();
if g.len() < g.capacity() {
g.push_back(current);
continue;
}
let first = g.pop_front().unwrap();
let downloaded_bytes =
current.downloaded_and_checked_bytes - first.downloaded_and_checked_bytes;
let elapsed = first.time.elapsed();
let bps = downloaded_bytes as f64 / elapsed.as_secs_f64();
let time_remaining_millis_rounded: u64 = if downloaded_bytes > 0 {
let time_remaining_secs = current.remaining_bytes as f64 / bps;
(time_remaining_secs * 1000f64) as u64
} else {
0
};
self.time_remaining_millis
.store(time_remaining_millis_rounded, Ordering::Relaxed);
self.download_bytes_per_second
.store(bps as u64, Ordering::Relaxed);
g.push_back(current);
}
sleep(Duration::from_secs(1)).await;
}
}
}

View file

@ -22,6 +22,7 @@ use crate::{
http_api::make_and_run_http_api,
lengths::Lengths,
spawn_utils::spawn,
speed_estimator::SpeedEstimator,
torrent_metainfo::TorrentMetaV1Owned,
torrent_state::{AtomicStats, TorrentState, TorrentStateLocked},
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
@ -90,6 +91,7 @@ impl TorrentManagerHandle {
#[derive(Clone)]
struct TorrentManager {
state: Arc<TorrentState>,
speed_estimator: Arc<SpeedEstimator>,
force_tracker_interval: Option<Duration>,
}
@ -168,29 +170,38 @@ impl TorrentManager {
lengths,
);
let state = Arc::new(TorrentState {
info_hash: torrent.info_hash,
torrent,
peer_id,
locked: Arc::new(RwLock::new(TorrentStateLocked {
peers: Default::default(),
chunks: chunk_tracker,
})),
files,
stats: AtomicStats {
have: AtomicU64::new(initial_check_results.have_bytes),
..Default::default()
},
needed: initial_check_results.needed_bytes,
lengths,
});
let estimator = SpeedEstimator::new(state.clone(), 5);
let mgr = Self {
state: Arc::new(TorrentState {
info_hash: torrent.info_hash,
torrent,
peer_id,
locked: Arc::new(RwLock::new(TorrentStateLocked {
peers: Default::default(),
chunks: chunk_tracker,
})),
files,
stats: AtomicStats {
have: AtomicU64::new(initial_check_results.have_bytes),
..Default::default()
},
needed: initial_check_results.needed_bytes,
lengths,
}),
state,
speed_estimator: estimator,
force_tracker_interval,
};
spawn("tracker monitor", mgr.clone().task_tracker_monitor());
spawn("stats printer", mgr.clone().stats_printer());
spawn("http api", make_and_run_http_api(mgr.state.clone()));
spawn(
"http api",
make_and_run_http_api(mgr.state.clone(), mgr.speed_estimator.clone()),
);
spawn("speed estimator", mgr.speed_estimator.clone().run_forever());
Ok(mgr.into_handle())
}

View file

@ -168,6 +168,21 @@ impl AtomicStats {
}
}
#[derive(Debug)]
pub struct StatsSnapshot {
pub have_bytes: u64,
pub downloaded_and_checked_bytes: u64,
pub downloaded_and_checked_pieces: u64,
pub fetched_bytes: u64,
pub uploaded_bytes: u64,
pub initially_needed_bytes: u64,
pub remaining_bytes: u64,
pub live_peers: u32,
pub seen_peers: u32,
pub connecting_peers: u32,
pub time: Instant,
}
pub struct TorrentState {
pub torrent: TorrentMetaV1Owned,
pub locked: Arc<RwLock<TorrentStateLocked>>,
@ -395,4 +410,32 @@ impl TorrentState {
});
true
}
pub fn stats_snapshot(&self) -> StatsSnapshot {
let g = self.locked.read();
use Ordering::*;
let (live, connecting) =
g.peers
.states
.values()
.fold((0u32, 0u32), |(live, connecting), p| match p {
PeerState::Connecting(_) => (live, connecting + 1),
PeerState::Live(_) => (live + 1, connecting),
});
let downloaded = self.stats.downloaded_and_checked.load(Relaxed);
let remaining = self.needed - downloaded;
StatsSnapshot {
have_bytes: self.stats.have.load(Relaxed),
downloaded_and_checked_bytes: downloaded,
downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed),
fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.fetched_bytes.load(Relaxed),
live_peers: live,
seen_peers: g.peers.seen.len() as u32,
connecting_peers: connecting,
time: Instant::now(),
initially_needed_bytes: self.needed,
remaining_bytes: remaining,
}
}
}