diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 49e4303..f4b96ec 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -5,10 +5,14 @@ use std::sync::atomic::Ordering; use std::time::Instant; use warp::Filter; +use crate::speed_estimator::SpeedEstimator; use crate::torrent_state::TorrentState; // This is just a stub for debugging, nothing useful here. -pub async fn make_and_run_http_api(state: Arc) -> anyhow::Result<()> { +pub async fn make_and_run_http_api( + state: Arc, + estimator: Arc, +) -> anyhow::Result<()> { let dump_haves = warp::path("haves").map({ let state = state.clone(); move || format!("{:?}", state.locked.read().chunks.get_have_pieces()) @@ -20,13 +24,12 @@ pub async fn make_and_run_http_api(state: Arc) -> anyhow::Result<( let initial_downloaded_and_checked = state.stats.downloaded_and_checked.load(Ordering::Relaxed); move || { - let stats = &state.stats; let mut buf = Vec::new(); - writeln!(buf, "{:#?}", &stats).unwrap(); + writeln!(buf, "{:#?}", state.stats_snapshot()).unwrap(); writeln!( buf, "Average download time: {:?}", - stats.average_piece_download_time() + state.stats.average_piece_download_time() ) .unwrap(); @@ -37,11 +40,20 @@ pub async fn make_and_run_http_api(state: Arc) -> anyhow::Result<( let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; writeln!( buf, - "Speed: {:.2}Mbps", + "Total download speed over all time: {:.2}Mbps", downloaded_mb / elapsed.as_secs_f64() ) .unwrap(); + writeln!(buf, "Download speed: {:.2}Mbps", estimator.download_mbps()).unwrap(); + match estimator.time_remaining() { + Some(time) => { + writeln!(buf, "Time remaining: {:?}", time).unwrap(); + } + None => { + writeln!(buf, "Time remaining: unknown").unwrap(); + } + } buf } }); diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 1074fcc..5c608fe 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -12,6 +12,7 @@ pub mod peer_state; pub mod serde_bencode; pub mod sha1w; pub mod spawn_utils; +pub mod speed_estimator; pub mod torrent_manager; pub mod torrent_metainfo; pub mod torrent_state; diff --git a/crates/librqbit/src/speed_estimator.rs b/crates/librqbit/src/speed_estimator.rs new file mode 100644 index 0000000..4f00f95 --- /dev/null +++ b/crates/librqbit/src/speed_estimator.rs @@ -0,0 +1,83 @@ +use std::{ + collections::VecDeque, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; + +use parking_lot::Mutex; +use tokio::time::sleep; + +use crate::torrent_state::{StatsSnapshot, TorrentState}; + +pub struct SpeedEstimator { + state: Arc, + latest_per_second_snapshots: Mutex>, + download_bytes_per_second: AtomicU64, + time_remaining_millis: AtomicU64, +} + +impl SpeedEstimator { + pub fn new(state: Arc, window_seconds: usize) -> Arc { + assert!(window_seconds > 1); + let estimator = Arc::new(Self { + state, + latest_per_second_snapshots: Mutex::new(VecDeque::with_capacity(window_seconds)), + download_bytes_per_second: Default::default(), + time_remaining_millis: Default::default(), + }); + estimator + } + + pub fn time_remaining(&self) -> Option { + let tr = self.time_remaining_millis.load(Ordering::Relaxed); + if tr == 0 { + return None; + } + Some(Duration::from_millis(tr)) + } + + pub fn download_bps(&self) -> u64 { + self.download_bytes_per_second.load(Ordering::Relaxed) + } + + pub fn download_mbps(&self) -> f64 { + self.download_bps() as f64 / 1024f64 / 1024f64 + } + + pub async fn run_forever(self: Arc) -> anyhow::Result<()> { + loop { + let current = self.state.stats_snapshot(); + { + let mut g = self.latest_per_second_snapshots.lock(); + if g.len() < g.capacity() { + g.push_back(current); + continue; + } + let first = g.pop_front().unwrap(); + + let downloaded_bytes = + current.downloaded_and_checked_bytes - first.downloaded_and_checked_bytes; + let elapsed = first.time.elapsed(); + let bps = downloaded_bytes as f64 / elapsed.as_secs_f64(); + + let time_remaining_millis_rounded: u64 = if downloaded_bytes > 0 { + let time_remaining_secs = current.remaining_bytes as f64 / bps; + (time_remaining_secs * 1000f64) as u64 + } else { + 0 + }; + self.time_remaining_millis + .store(time_remaining_millis_rounded, Ordering::Relaxed); + self.download_bytes_per_second + .store(bps as u64, Ordering::Relaxed); + + g.push_back(current); + } + + sleep(Duration::from_secs(1)).await; + } + } +} diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 69780a8..4776c02 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -22,6 +22,7 @@ use crate::{ http_api::make_and_run_http_api, lengths::Lengths, spawn_utils::spawn, + speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Owned, torrent_state::{AtomicStats, TorrentState, TorrentStateLocked}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, @@ -90,6 +91,7 @@ impl TorrentManagerHandle { #[derive(Clone)] struct TorrentManager { state: Arc, + speed_estimator: Arc, force_tracker_interval: Option, } @@ -168,29 +170,38 @@ impl TorrentManager { lengths, ); + let state = Arc::new(TorrentState { + info_hash: torrent.info_hash, + torrent, + peer_id, + locked: Arc::new(RwLock::new(TorrentStateLocked { + peers: Default::default(), + chunks: chunk_tracker, + })), + files, + stats: AtomicStats { + have: AtomicU64::new(initial_check_results.have_bytes), + ..Default::default() + }, + needed: initial_check_results.needed_bytes, + lengths, + }); + let estimator = SpeedEstimator::new(state.clone(), 5); + let mgr = Self { - state: Arc::new(TorrentState { - info_hash: torrent.info_hash, - torrent, - peer_id, - locked: Arc::new(RwLock::new(TorrentStateLocked { - peers: Default::default(), - chunks: chunk_tracker, - })), - files, - stats: AtomicStats { - have: AtomicU64::new(initial_check_results.have_bytes), - ..Default::default() - }, - needed: initial_check_results.needed_bytes, - lengths, - }), + state, + speed_estimator: estimator, force_tracker_interval, }; spawn("tracker monitor", mgr.clone().task_tracker_monitor()); spawn("stats printer", mgr.clone().stats_printer()); - spawn("http api", make_and_run_http_api(mgr.state.clone())); + spawn( + "http api", + make_and_run_http_api(mgr.state.clone(), mgr.speed_estimator.clone()), + ); + spawn("speed estimator", mgr.speed_estimator.clone().run_forever()); + Ok(mgr.into_handle()) } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 1746822..6e5ff9f 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -168,6 +168,21 @@ impl AtomicStats { } } +#[derive(Debug)] +pub struct StatsSnapshot { + pub have_bytes: u64, + pub downloaded_and_checked_bytes: u64, + pub downloaded_and_checked_pieces: u64, + pub fetched_bytes: u64, + pub uploaded_bytes: u64, + pub initially_needed_bytes: u64, + pub remaining_bytes: u64, + pub live_peers: u32, + pub seen_peers: u32, + pub connecting_peers: u32, + pub time: Instant, +} + pub struct TorrentState { pub torrent: TorrentMetaV1Owned, pub locked: Arc>, @@ -395,4 +410,32 @@ impl TorrentState { }); true } + + pub fn stats_snapshot(&self) -> StatsSnapshot { + let g = self.locked.read(); + use Ordering::*; + let (live, connecting) = + g.peers + .states + .values() + .fold((0u32, 0u32), |(live, connecting), p| match p { + PeerState::Connecting(_) => (live, connecting + 1), + PeerState::Live(_) => (live + 1, connecting), + }); + let downloaded = self.stats.downloaded_and_checked.load(Relaxed); + let remaining = self.needed - downloaded; + StatsSnapshot { + have_bytes: self.stats.have.load(Relaxed), + downloaded_and_checked_bytes: downloaded, + downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed), + fetched_bytes: self.stats.fetched_bytes.load(Relaxed), + uploaded_bytes: self.stats.fetched_bytes.load(Relaxed), + live_peers: live, + seen_peers: g.peers.seen.len() as u32, + connecting_peers: connecting, + time: Instant::now(), + initially_needed_bytes: self.needed, + remaining_bytes: remaining, + } + } }