diff --git a/crates/dht/examples/dht.rs b/crates/dht/examples/dht.rs index cac7bd6..8862cdc 100644 --- a/crates/dht/examples/dht.rs +++ b/crates/dht/examples/dht.rs @@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let dht = Dht::new().await.context("error initializing DHT")?; - let mut stream = dht.get_peers(info_hash).await?; + let mut stream = dht.get_peers(info_hash)?; let stats_printer = async { loop { diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs index c4932b0..7c136de 100644 --- a/crates/librqbit/examples/ubuntu.rs +++ b/crates/librqbit/examples/ubuntu.rs @@ -16,6 +16,10 @@ const MAGNET_LINK: &str = "magnet:?xt=urn:btih:cab507494d02ebb1178b38f2e9d7be299 #[tokio::main] async fn main() -> Result<(), anyhow::Error> { // Output logs to console. + match std::env::var("RUST_LOG") { + Ok(_) => {} + Err(_) => std::env::set_var("RUST_LOG", "info"), + } tracing_subscriber::fmt::init(); let output_dir = std::env::args() @@ -44,19 +48,21 @@ async fn main() -> Result<(), anyhow::Error> { .await .context("error adding torrent")? { - AddTorrentResponse::Added(handle) => handle, + AddTorrentResponse::Added(_, handle) => handle, // For a brand new session other variants won't happen. _ => unreachable!(), }; + info!("Details: {:?}", &handle.info().info); + // Print stats periodically. tokio::spawn({ let handle = handle.clone(); async move { loop { tokio::time::sleep(Duration::from_secs(1)).await; - let stats = handle.torrent_state().stats_snapshot(); - info!("stats: {stats:?}"); + let stats = handle.stats(); + info!("{stats:}"); } } }); diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 4266c6a..c44e68d 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -11,9 +11,7 @@ use librqbit_core::id20::Id20; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; -use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::{Duration, Instant}; use tokio::sync::mpsc::UnboundedSender; use tracing::{info, warn}; @@ -24,8 +22,8 @@ use crate::session::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId, }; use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; -use crate::torrent_state::stats::snapshot::StatsSnapshot; -use crate::torrent_state::{ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive}; +use crate::torrent_state::stats::{LiveStats, TorrentStats}; +use crate::torrent_state::ManagedTorrentHandle; // Public API #[derive(Clone)] @@ -233,27 +231,6 @@ impl HttpApi { type Result = std::result::Result; -#[derive(Serialize, Default)] -struct Speed { - mbps: f64, - human_readable: String, -} - -impl Speed { - fn new(mbps: f64) -> Self { - Self { - mbps, - human_readable: format!("{mbps:.2} MiB/s"), - } - } -} - -impl From for Speed { - fn from(mbps: f64) -> Self { - Self::new(mbps) - } -} - #[derive(Serialize)] struct TorrentListResponseItem { id: usize, @@ -281,45 +258,6 @@ pub struct TorrentDetailsResponse { pub files: Vec, } -struct DurationWithHumanReadable(Duration); - -impl Serialize for DurationWithHumanReadable { - fn serialize(&self, serializer: S) -> core::result::Result - where - S: serde::Serializer, - { - #[derive(Serialize)] - struct Tmp { - duration: Duration, - human_readable: String, - } - Tmp { - duration: self.0, - human_readable: format!("{:?}", self.0), - } - .serialize(serializer) - } -} - -#[derive(Serialize, Default)] -struct LiveStats { - snapshot: StatsSnapshot, - average_piece_download_time: Option, - download_speed: Speed, - all_time_download_speed: Speed, - time_remaining: Option, -} - -#[derive(Serialize)] -struct StatsResponse { - state: &'static str, - error: Option, - progress_bytes: u64, - total_bytes: u64, - finished: bool, - live: Option, -} - #[derive(Serialize, Deserialize)] pub struct ApiAddTorrentResponse { pub id: Option, @@ -393,7 +331,6 @@ impl TorrentAddQueryParams { // Private HTTP API internals. Agnostic of web framework. struct ApiInternal { - startup_time: Instant, session: Arc, rust_log_reload_tx: Option>, } @@ -403,7 +340,6 @@ type ApiState = Arc; impl ApiInternal { pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { Self { - startup_time: Instant::now(), session, rust_log_reload_tx, } @@ -543,70 +479,15 @@ impl ApiInternal { Ok(dht.with_routing_table(|r| r.clone())) } - fn make_live_stats(&self, live: &TorrentStateLive) -> LiveStats { - let snapshot = live.stats_snapshot(); - let estimator = live.speed_estimator(); - - // Poor mans download speed computation - let elapsed = self.startup_time.elapsed(); - let downloaded_bytes = snapshot.downloaded_and_checked_bytes; - let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; - - LiveStats { - average_piece_download_time: snapshot.average_piece_download_time(), - snapshot, - all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(), - download_speed: estimator.download_mbps().into(), - time_remaining: estimator.time_remaining().map(DurationWithHumanReadable), - } - } - fn api_stats_v0(&self, idx: TorrentId) -> Result { let mgr = self.mgr_handle(idx)?; let live = mgr.live().context("torrent not live")?; - Ok(self.make_live_stats(&live)) + Ok(LiveStats::from(&*live)) } - fn api_stats_v1(&self, idx: TorrentId) -> Result { + fn api_stats_v1(&self, idx: TorrentId) -> Result { let mgr = self.mgr_handle(idx)?; - let mut resp = StatsResponse { - total_bytes: mgr.info().lengths.total_length(), - state: "", - error: None, - progress_bytes: 0, - finished: false, - live: None, - }; - - mgr.with_state(|s| { - match s { - ManagedTorrentState::Initializing(i) => { - resp.state = "initializing"; - resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed); - } - ManagedTorrentState::Paused(p) => { - resp.state = "paused"; - resp.progress_bytes = p.have_bytes; - resp.finished = p.have_bytes == resp.total_bytes; - } - ManagedTorrentState::Live(l) => { - resp.state = "live"; - let live_stats = self.make_live_stats(l); - resp.progress_bytes = live_stats.snapshot.have_bytes; - resp.finished = resp.progress_bytes == resp.total_bytes; - resp.live = Some(live_stats); - } - ManagedTorrentState::Error(e) => { - resp.state = "error"; - resp.error = Some(format!("{:?}", e)) - } - ManagedTorrentState::None => { - resp.state = "error"; - resp.error = Some("bug: torrent in broken \"None\" state".to_string()); - } - } - Ok(resp) - }) + Ok(mgr.stats()) } fn api_dump_haves(&self, idx: usize) -> Result { diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 4be7c2d..ac5c18a 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -8,7 +8,6 @@ pub mod peer_connection; pub mod peer_info_reader; pub mod session; pub mod spawn_utils; -// pub mod torrent_manager; pub mod torrent_state; pub mod tracker_comms; pub mod type_aliases; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index ddf82dc..0e3cc4e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1,7 +1,6 @@ use std::{ borrow::Cow, collections::{HashMap, HashSet}, - fs::{File, OpenOptions}, io::{BufReader, BufWriter, Read}, net::SocketAddr, path::PathBuf, diff --git a/crates/librqbit/src/torrent_state/live/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs index 6331b6a..2e5dd53 100644 --- a/crates/librqbit/src/torrent_state/live/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs @@ -14,8 +14,6 @@ pub struct StatsSnapshot { pub initially_needed_bytes: u64, pub remaining_bytes: u64, pub total_bytes: u64, - // #[serde(skip)] - // pub time: Instant, pub total_piece_download_ms: u64, pub peer_stats: AggregatePeerStats, } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index e95c5b3..2cc9fcd 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -1,12 +1,14 @@ pub mod initializing; pub mod live; pub mod paused; +pub mod stats; pub mod utils; use std::collections::HashSet; use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Weak; use std::time::Duration; @@ -31,10 +33,12 @@ use url::Url; use crate::chunk_tracker::ChunkTracker; use crate::spawn_utils::spawn; use crate::spawn_utils::BlockingSpawner; +use crate::torrent_state::stats::LiveStats; use initializing::TorrentStateInitializing; use self::paused::TorrentStatePaused; +use self::stats::TorrentStats; pub enum ManagedTorrentState { Initializing(Arc), @@ -251,12 +255,63 @@ impl ManagedTorrent { } } + pub fn stats(&self) -> TorrentStats { + let mut resp = TorrentStats { + total_bytes: self.info().lengths.total_length(), + state: "", + error: None, + progress_bytes: 0, + finished: false, + live: None, + }; + + self.with_state(|s| { + match s { + ManagedTorrentState::Initializing(i) => { + resp.state = "initializing"; + resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed); + } + ManagedTorrentState::Paused(p) => { + resp.state = "paused"; + resp.progress_bytes = p.have_bytes; + resp.finished = p.have_bytes == resp.total_bytes; + } + ManagedTorrentState::Live(l) => { + resp.state = "live"; + let live_stats = LiveStats::from(l.as_ref()); + resp.progress_bytes = live_stats.snapshot.have_bytes; + resp.finished = resp.progress_bytes == resp.total_bytes; + resp.live = Some(live_stats); + } + ManagedTorrentState::Error(e) => { + resp.state = "error"; + resp.error = Some(format!("{:?}", e)) + } + ManagedTorrentState::None => { + resp.state = "error"; + resp.error = Some("bug: torrent in broken \"None\" state".to_string()); + } + } + resp + }) + } + pub async fn wait_until_completed(&self) -> anyhow::Result<()> { - // TODO: rewrite - self.live() - .context("torrent isn't live")? - .wait_until_completed() - .await; + // TODO: rewrite, this polling is horrible + let live = loop { + let live = self.with_state(|s| match s { + ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => Ok(None), + ManagedTorrentState::Live(l) => Ok(Some(l.clone())), + ManagedTorrentState::Error(e) => bail!("{:?}", e), + ManagedTorrentState::None => bail!("bug: torrent state is None"), + })?; + if let Some(live) = live { + break live; + } + tokio::time::sleep(Duration::from_secs(1)).await; + }; + + live.wait_until_completed().await; Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/stats.rs b/crates/librqbit/src/torrent_state/stats.rs new file mode 100644 index 0000000..8940aa7 --- /dev/null +++ b/crates/librqbit/src/torrent_state/stats.rs @@ -0,0 +1,198 @@ +use std::time::Duration; + +use serde::Serialize; + +use super::{live::stats::snapshot::StatsSnapshot, TorrentStateLive}; +use size_format::SizeFormatterBinary as SF; + +#[derive(Serialize, Default, Debug)] +pub struct LiveStats { + pub snapshot: StatsSnapshot, + pub average_piece_download_time: Option, + pub download_speed: Speed, + pub time_remaining: Option, +} + +impl std::fmt::Display for LiveStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "down speed: {}", self.download_speed)?; + if let Some(time_remaining) = &self.time_remaining { + write!(f, " eta: {time_remaining}")?; + } + Ok(()) + } +} + +impl From<&TorrentStateLive> for LiveStats { + fn from(live: &TorrentStateLive) -> Self { + let snapshot = live.stats_snapshot(); + let estimator = live.speed_estimator(); + + Self { + average_piece_download_time: snapshot.average_piece_download_time(), + snapshot, + download_speed: estimator.download_mbps().into(), + time_remaining: estimator.time_remaining().map(DurationWithHumanReadable), + } + } +} + +#[derive(Serialize, Debug)] +pub struct TorrentStats { + pub state: &'static str, + pub error: Option, + pub progress_bytes: u64, + pub total_bytes: u64, + pub finished: bool, + pub live: Option, +} + +impl std::fmt::Display for TorrentStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}: ", self.state)?; + if let Some(error) = &self.error { + return write!(f, "{error}"); + } + write!( + f, + "{} ({})", + self.progress_percent_human_readable(), + self.progress_bytes_human_readable() + )?; + if let Some(live) = &self.live { + write!(f, " [{live}]")?; + } + Ok(()) + } +} + +impl TorrentStats { + pub fn progress_percent_human_readable(&self) -> impl std::fmt::Display { + struct Percents { + progress: u64, + total: u64, + } + impl std::fmt::Display for Percents { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.total == 0 { + return write!(f, "N/A"); + } + let pct = self.progress as f64 / self.total as f64 * 100f64; + write!(f, "{pct:.2}%") + } + } + Percents { + progress: self.progress_bytes, + total: self.total_bytes, + } + } + + pub fn progress_bytes_human_readable(&self) -> impl std::fmt::Display { + struct Progress { + progress: u64, + total: u64, + } + impl std::fmt::Display for Progress { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} / {}", SF::new(self.progress), SF::new(self.total)) + } + } + Progress { + progress: self.progress_bytes, + total: self.total_bytes, + } + } +} + +fn format_seconds_to_time(seconds: u64, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let hours = seconds / 3600; + let minutes = (seconds % 3600) / 60; + let seconds = seconds % 60; + + if hours > 0 { + write!(f, "{}h {}m", hours, minutes) + } else if minutes > 0 { + write!(f, "{}m {}s", minutes, seconds) + } else { + write!(f, "{}s", seconds) + } +} + +pub struct DurationWithHumanReadable(Duration); + +impl core::fmt::Display for DurationWithHumanReadable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result { + format_seconds_to_time(self.0.as_secs(), f) + } +} + +impl core::fmt::Debug for DurationWithHumanReadable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + +impl Serialize for DurationWithHumanReadable { + fn serialize(&self, serializer: S) -> core::result::Result + where + S: serde::Serializer, + { + #[derive(Serialize)] + struct Tmp { + duration: Duration, + human_readable: String, + } + Tmp { + duration: self.0, + human_readable: format!("{}", self), + } + .serialize(serializer) + } +} + +#[derive(Default)] +pub struct Speed { + pub mbps: f64, +} + +impl Speed { + fn new(mbps: f64) -> Self { + Self { mbps } + } +} + +impl From for Speed { + fn from(mbps: f64) -> Self { + Self::new(mbps) + } +} + +impl core::fmt::Display for Speed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:.2} MiB/s", self.mbps) + } +} + +impl core::fmt::Debug for Speed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + +impl Serialize for Speed { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + #[derive(Serialize)] + struct Tmp { + mbps: f64, + human_readable: String, + } + Tmp { + mbps: self.mbps, + human_readable: format!("{:?}", self.mbps), + } + .serialize(serializer) + } +}