Better API for stats printing

This commit is contained in:
Igor Katson 2023-11-25 10:11:40 +00:00
parent bec5e1be7f
commit 1bea1f9235
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
8 changed files with 273 additions and 137 deletions

View file

@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let dht = Dht::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash).await?;
let mut stream = dht.get_peers(info_hash)?;
let stats_printer = async {
loop {

View file

@ -16,6 +16,10 @@ const MAGNET_LINK: &str = "magnet:?xt=urn:btih:cab507494d02ebb1178b38f2e9d7be299
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// Output logs to console.
match std::env::var("RUST_LOG") {
Ok(_) => {}
Err(_) => std::env::set_var("RUST_LOG", "info"),
}
tracing_subscriber::fmt::init();
let output_dir = std::env::args()
@ -44,19 +48,21 @@ async fn main() -> Result<(), anyhow::Error> {
.await
.context("error adding torrent")?
{
AddTorrentResponse::Added(handle) => handle,
AddTorrentResponse::Added(_, handle) => handle,
// For a brand new session other variants won't happen.
_ => unreachable!(),
};
info!("Details: {:?}", &handle.info().info);
// Print stats periodically.
tokio::spawn({
let handle = handle.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let stats = handle.torrent_state().stats_snapshot();
info!("stats: {stats:?}");
let stats = handle.stats();
info!("{stats:}");
}
}
});

View file

@ -11,9 +11,7 @@ use librqbit_core::id20::Id20;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn};
@ -24,8 +22,8 @@ use crate::session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId,
};
use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot};
use crate::torrent_state::stats::snapshot::StatsSnapshot;
use crate::torrent_state::{ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive};
use crate::torrent_state::stats::{LiveStats, TorrentStats};
use crate::torrent_state::ManagedTorrentHandle;
// Public API
#[derive(Clone)]
@ -233,27 +231,6 @@ impl HttpApi {
type Result<T> = std::result::Result<T, ApiError>;
#[derive(Serialize, Default)]
struct Speed {
mbps: f64,
human_readable: String,
}
impl Speed {
fn new(mbps: f64) -> Self {
Self {
mbps,
human_readable: format!("{mbps:.2} MiB/s"),
}
}
}
impl From<f64> for Speed {
fn from(mbps: f64) -> Self {
Self::new(mbps)
}
}
#[derive(Serialize)]
struct TorrentListResponseItem {
id: usize,
@ -281,45 +258,6 @@ pub struct TorrentDetailsResponse {
pub files: Vec<TorrentDetailsResponseFile>,
}
struct DurationWithHumanReadable(Duration);
impl Serialize for DurationWithHumanReadable {
fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(Serialize)]
struct Tmp {
duration: Duration,
human_readable: String,
}
Tmp {
duration: self.0,
human_readable: format!("{:?}", self.0),
}
.serialize(serializer)
}
}
#[derive(Serialize, Default)]
struct LiveStats {
snapshot: StatsSnapshot,
average_piece_download_time: Option<Duration>,
download_speed: Speed,
all_time_download_speed: Speed,
time_remaining: Option<DurationWithHumanReadable>,
}
#[derive(Serialize)]
struct StatsResponse {
state: &'static str,
error: Option<String>,
progress_bytes: u64,
total_bytes: u64,
finished: bool,
live: Option<LiveStats>,
}
#[derive(Serialize, Deserialize)]
pub struct ApiAddTorrentResponse {
pub id: Option<usize>,
@ -393,7 +331,6 @@ impl TorrentAddQueryParams {
// Private HTTP API internals. Agnostic of web framework.
struct ApiInternal {
startup_time: Instant,
session: Arc<Session>,
rust_log_reload_tx: Option<UnboundedSender<String>>,
}
@ -403,7 +340,6 @@ type ApiState = Arc<ApiInternal>;
impl ApiInternal {
pub fn new(session: Arc<Session>, rust_log_reload_tx: Option<UnboundedSender<String>>) -> Self {
Self {
startup_time: Instant::now(),
session,
rust_log_reload_tx,
}
@ -543,70 +479,15 @@ impl ApiInternal {
Ok(dht.with_routing_table(|r| r.clone()))
}
fn make_live_stats(&self, live: &TorrentStateLive) -> LiveStats {
let snapshot = live.stats_snapshot();
let estimator = live.speed_estimator();
// Poor mans download speed computation
let elapsed = self.startup_time.elapsed();
let downloaded_bytes = snapshot.downloaded_and_checked_bytes;
let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64;
LiveStats {
average_piece_download_time: snapshot.average_piece_download_time(),
snapshot,
all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(),
download_speed: estimator.download_mbps().into(),
time_remaining: estimator.time_remaining().map(DurationWithHumanReadable),
}
}
fn api_stats_v0(&self, idx: TorrentId) -> Result<LiveStats> {
let mgr = self.mgr_handle(idx)?;
let live = mgr.live().context("torrent not live")?;
Ok(self.make_live_stats(&live))
Ok(LiveStats::from(&*live))
}
fn api_stats_v1(&self, idx: TorrentId) -> Result<StatsResponse> {
fn api_stats_v1(&self, idx: TorrentId) -> Result<TorrentStats> {
let mgr = self.mgr_handle(idx)?;
let mut resp = StatsResponse {
total_bytes: mgr.info().lengths.total_length(),
state: "",
error: None,
progress_bytes: 0,
finished: false,
live: None,
};
mgr.with_state(|s| {
match s {
ManagedTorrentState::Initializing(i) => {
resp.state = "initializing";
resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed);
}
ManagedTorrentState::Paused(p) => {
resp.state = "paused";
resp.progress_bytes = p.have_bytes;
resp.finished = p.have_bytes == resp.total_bytes;
}
ManagedTorrentState::Live(l) => {
resp.state = "live";
let live_stats = self.make_live_stats(l);
resp.progress_bytes = live_stats.snapshot.have_bytes;
resp.finished = resp.progress_bytes == resp.total_bytes;
resp.live = Some(live_stats);
}
ManagedTorrentState::Error(e) => {
resp.state = "error";
resp.error = Some(format!("{:?}", e))
}
ManagedTorrentState::None => {
resp.state = "error";
resp.error = Some("bug: torrent in broken \"None\" state".to_string());
}
}
Ok(resp)
})
Ok(mgr.stats())
}
fn api_dump_haves(&self, idx: usize) -> Result<String> {

View file

@ -8,7 +8,6 @@ pub mod peer_connection;
pub mod peer_info_reader;
pub mod session;
pub mod spawn_utils;
// pub mod torrent_manager;
pub mod torrent_state;
pub mod tracker_comms;
pub mod type_aliases;

View file

@ -1,7 +1,6 @@
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fs::{File, OpenOptions},
io::{BufReader, BufWriter, Read},
net::SocketAddr,
path::PathBuf,

View file

@ -14,8 +14,6 @@ pub struct StatsSnapshot {
pub initially_needed_bytes: u64,
pub remaining_bytes: u64,
pub total_bytes: u64,
// #[serde(skip)]
// pub time: Instant,
pub total_piece_download_ms: u64,
pub peer_stats: AggregatePeerStats,
}

View file

@ -1,12 +1,14 @@
pub mod initializing;
pub mod live;
pub mod paused;
pub mod stats;
pub mod utils;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
@ -31,10 +33,12 @@ use url::Url;
use crate::chunk_tracker::ChunkTracker;
use crate::spawn_utils::spawn;
use crate::spawn_utils::BlockingSpawner;
use crate::torrent_state::stats::LiveStats;
use initializing::TorrentStateInitializing;
use self::paused::TorrentStatePaused;
use self::stats::TorrentStats;
pub enum ManagedTorrentState {
Initializing(Arc<TorrentStateInitializing>),
@ -251,12 +255,63 @@ impl ManagedTorrent {
}
}
pub fn stats(&self) -> TorrentStats {
let mut resp = TorrentStats {
total_bytes: self.info().lengths.total_length(),
state: "",
error: None,
progress_bytes: 0,
finished: false,
live: None,
};
self.with_state(|s| {
match s {
ManagedTorrentState::Initializing(i) => {
resp.state = "initializing";
resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed);
}
ManagedTorrentState::Paused(p) => {
resp.state = "paused";
resp.progress_bytes = p.have_bytes;
resp.finished = p.have_bytes == resp.total_bytes;
}
ManagedTorrentState::Live(l) => {
resp.state = "live";
let live_stats = LiveStats::from(l.as_ref());
resp.progress_bytes = live_stats.snapshot.have_bytes;
resp.finished = resp.progress_bytes == resp.total_bytes;
resp.live = Some(live_stats);
}
ManagedTorrentState::Error(e) => {
resp.state = "error";
resp.error = Some(format!("{:?}", e))
}
ManagedTorrentState::None => {
resp.state = "error";
resp.error = Some("bug: torrent in broken \"None\" state".to_string());
}
}
resp
})
}
pub async fn wait_until_completed(&self) -> anyhow::Result<()> {
// TODO: rewrite
self.live()
.context("torrent isn't live")?
.wait_until_completed()
.await;
// TODO: rewrite, this polling is horrible
let live = loop {
let live = self.with_state(|s| match s {
ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => Ok(None),
ManagedTorrentState::Live(l) => Ok(Some(l.clone())),
ManagedTorrentState::Error(e) => bail!("{:?}", e),
ManagedTorrentState::None => bail!("bug: torrent state is None"),
})?;
if let Some(live) = live {
break live;
}
tokio::time::sleep(Duration::from_secs(1)).await;
};
live.wait_until_completed().await;
Ok(())
}
}

View file

@ -0,0 +1,198 @@
use std::time::Duration;
use serde::Serialize;
use super::{live::stats::snapshot::StatsSnapshot, TorrentStateLive};
use size_format::SizeFormatterBinary as SF;
#[derive(Serialize, Default, Debug)]
pub struct LiveStats {
pub snapshot: StatsSnapshot,
pub average_piece_download_time: Option<Duration>,
pub download_speed: Speed,
pub time_remaining: Option<DurationWithHumanReadable>,
}
impl std::fmt::Display for LiveStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "down speed: {}", self.download_speed)?;
if let Some(time_remaining) = &self.time_remaining {
write!(f, " eta: {time_remaining}")?;
}
Ok(())
}
}
impl From<&TorrentStateLive> for LiveStats {
fn from(live: &TorrentStateLive) -> Self {
let snapshot = live.stats_snapshot();
let estimator = live.speed_estimator();
Self {
average_piece_download_time: snapshot.average_piece_download_time(),
snapshot,
download_speed: estimator.download_mbps().into(),
time_remaining: estimator.time_remaining().map(DurationWithHumanReadable),
}
}
}
#[derive(Serialize, Debug)]
pub struct TorrentStats {
pub state: &'static str,
pub error: Option<String>,
pub progress_bytes: u64,
pub total_bytes: u64,
pub finished: bool,
pub live: Option<LiveStats>,
}
impl std::fmt::Display for TorrentStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: ", self.state)?;
if let Some(error) = &self.error {
return write!(f, "{error}");
}
write!(
f,
"{} ({})",
self.progress_percent_human_readable(),
self.progress_bytes_human_readable()
)?;
if let Some(live) = &self.live {
write!(f, " [{live}]")?;
}
Ok(())
}
}
impl TorrentStats {
pub fn progress_percent_human_readable(&self) -> impl std::fmt::Display {
struct Percents {
progress: u64,
total: u64,
}
impl std::fmt::Display for Percents {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.total == 0 {
return write!(f, "N/A");
}
let pct = self.progress as f64 / self.total as f64 * 100f64;
write!(f, "{pct:.2}%")
}
}
Percents {
progress: self.progress_bytes,
total: self.total_bytes,
}
}
pub fn progress_bytes_human_readable(&self) -> impl std::fmt::Display {
struct Progress {
progress: u64,
total: u64,
}
impl std::fmt::Display for Progress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} / {}", SF::new(self.progress), SF::new(self.total))
}
}
Progress {
progress: self.progress_bytes,
total: self.total_bytes,
}
}
}
fn format_seconds_to_time(seconds: u64, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let hours = seconds / 3600;
let minutes = (seconds % 3600) / 60;
let seconds = seconds % 60;
if hours > 0 {
write!(f, "{}h {}m", hours, minutes)
} else if minutes > 0 {
write!(f, "{}m {}s", minutes, seconds)
} else {
write!(f, "{}s", seconds)
}
}
pub struct DurationWithHumanReadable(Duration);
impl core::fmt::Display for DurationWithHumanReadable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
format_seconds_to_time(self.0.as_secs(), f)
}
}
impl core::fmt::Debug for DurationWithHumanReadable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl Serialize for DurationWithHumanReadable {
fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(Serialize)]
struct Tmp {
duration: Duration,
human_readable: String,
}
Tmp {
duration: self.0,
human_readable: format!("{}", self),
}
.serialize(serializer)
}
}
#[derive(Default)]
pub struct Speed {
pub mbps: f64,
}
impl Speed {
fn new(mbps: f64) -> Self {
Self { mbps }
}
}
impl From<f64> for Speed {
fn from(mbps: f64) -> Self {
Self::new(mbps)
}
}
impl core::fmt::Display for Speed {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:.2} MiB/s", self.mbps)
}
}
impl core::fmt::Debug for Speed {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl Serialize for Speed {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(Serialize)]
struct Tmp {
mbps: f64,
human_readable: String,
}
Tmp {
mbps: self.mbps,
human_readable: format!("{:?}", self.mbps),
}
.serialize(serializer)
}
}