From 876afbf41bdc18a910663d2d3d758dfb72492647 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 24 Nov 2023 15:04:36 +0000 Subject: [PATCH] Initialization progress reporting --- crates/librqbit/src/file_ops.rs | 7 +- crates/librqbit/src/http_api.rs | 83 ++++++++++++++++--- crates/librqbit/src/session.rs | 2 +- .../src/torrent_state/initializing.rs | 28 +++---- crates/librqbit/src/torrent_state/live/mod.rs | 3 +- .../src/torrent_state/live/stats/snapshot.rs | 6 +- crates/librqbit/src/torrent_state/mod.rs | 10 ++- crates/librqbit/src/torrent_state/paused.rs | 1 - crates/librqbit_core/src/lengths.rs | 9 +- 9 files changed, 109 insertions(+), 40 deletions(-) diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index e413dc4..aee6625 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -2,7 +2,10 @@ use std::{ fs::File, io::{Read, Seek, SeekFrom, Write}, marker::PhantomData, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; use anyhow::Context; @@ -67,6 +70,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { pub fn initial_check( &self, only_files: Option<&[usize]>, + progress: &AtomicU64, ) -> anyhow::Result { let mut needed_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]); let mut have_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]); @@ -125,6 +129,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { let mut piece_remaining = piece_info.len as usize; let mut some_files_broken = false; let mut at_least_one_file_required = current_file.full_file_required; + progress.fetch_add(piece_info.len as u64, Ordering::Relaxed); while piece_remaining > 0 { let mut to_read_in_file = diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 179e2c6..87bb46c 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -11,6 +11,7 @@ use librqbit_core::id20::Id20; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::{info, warn}; @@ -23,7 +24,7 @@ use crate::session::{ }; use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; use crate::torrent_state::stats::snapshot::StatsSnapshot; -use crate::torrent_state::ManagedTorrentHandle; +use crate::torrent_state::{ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive}; // Public API #[derive(Clone)] @@ -100,11 +101,18 @@ impl HttpApi { state.api_dump_haves(idx) } - async fn torrent_stats( + async fn torrent_stats_v0( State(state): State, Path(idx): Path, ) -> Result { - state.api_stats(idx).map(axum::Json) + state.api_stats_v0(idx).map(axum::Json) + } + + async fn torrent_stats_v1( + State(state): State, + Path(idx): Path, + ) -> Result { + state.api_stats_v1(idx).map(axum::Json) } async fn peer_stats( @@ -137,7 +145,8 @@ impl HttpApi { .route("/torrents", get(torrents_list).post(torrents_post)) .route("/torrents/:id", get(torrent_details)) .route("/torrents/:id/haves", get(torrent_haves)) - .route("/torrents/:id/stats", get(torrent_stats)) + .route("/torrents/:id/stats", get(torrent_stats_v0)) + .route("/torrents/:id/stats/v1", get(torrent_stats_v1)) .route("/torrents/:id/peer_stats", get(peer_stats)) .route("/torrents/:id/pause", post(torrent_action_pause)) .route("/torrents/:id/start", post(torrent_action_start)); @@ -196,7 +205,7 @@ impl HttpApi { type Result = std::result::Result; -#[derive(Serialize)] +#[derive(Serialize, Default)] struct Speed { mbps: f64, human_readable: String, @@ -261,8 +270,8 @@ impl Serialize for DurationWithHumanReadable { } } -#[derive(Serialize)] -struct StatsResponse { +#[derive(Serialize, Default)] +struct LiveStats { snapshot: StatsSnapshot, average_piece_download_time: Option, download_speed: Speed, @@ -270,6 +279,15 @@ struct StatsResponse { time_remaining: Option, } +#[derive(Serialize)] +struct StatsResponse { + state: &'static str, + error: Option, + progress_bytes: u64, + total_bytes: u64, + live: Option, +} + #[derive(Serialize, Deserialize)] pub struct ApiAddTorrentResponse { pub id: Option, @@ -465,9 +483,7 @@ impl ApiInternal { Ok(dht.with_routing_table(|r| r.clone())) } - fn api_stats(&self, idx: TorrentId) -> Result { - let mgr = self.mgr_handle(idx)?; - let live = mgr.live().context("not live")?; + fn make_live_stats(&self, live: &TorrentStateLive) -> LiveStats { let snapshot = live.stats_snapshot(); let estimator = live.speed_estimator(); @@ -476,12 +492,57 @@ impl ApiInternal { let downloaded_bytes = snapshot.downloaded_and_checked_bytes; let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; - Ok(StatsResponse { + LiveStats { average_piece_download_time: snapshot.average_piece_download_time(), snapshot, all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(), download_speed: estimator.download_mbps().into(), time_remaining: estimator.time_remaining().map(DurationWithHumanReadable), + } + } + + fn api_stats_v0(&self, idx: TorrentId) -> Result { + let mgr = self.mgr_handle(idx)?; + let live = mgr.live().context("torrent not live")?; + Ok(self.make_live_stats(&live)) + } + + fn api_stats_v1(&self, idx: TorrentId) -> Result { + let mgr = self.mgr_handle(idx)?; + let mut resp = StatsResponse { + total_bytes: mgr.info().lengths.total_length(), + state: "", + error: None, + progress_bytes: 0, + live: None, + }; + + mgr.with_state(|s| { + match s { + ManagedTorrentState::Initializing(i) => { + resp.state = "initializing"; + resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed); + } + ManagedTorrentState::Paused(p) => { + resp.state = "paused"; + resp.progress_bytes = p.have_bytes; + } + ManagedTorrentState::Live(l) => { + resp.state = "live"; + let live_stats = self.make_live_stats(l); + resp.progress_bytes = live_stats.snapshot.downloaded_and_checked_bytes; + resp.live = Some(live_stats); + } + ManagedTorrentState::Error(e) => { + resp.state = "error"; + resp.error = Some(format!("{:?}", e)) + } + ManagedTorrentState::None => { + resp.state = "error"; + resp.error = Some("bug: torrent in broken \"None\" state".to_string()); + } + } + Ok(resp) }) } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 2809973..f3af02c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -412,7 +412,7 @@ impl Session { { return Ok(AddTorrentResponse::AlreadyManaged(id, handle.clone())); } - let managed_torrent = builder.build(); + let managed_torrent = builder.build()?; let id = g.add_torrent(managed_torrent.clone()); (managed_torrent, id) }; diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 1858878..c40d2db 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -1,12 +1,11 @@ use std::{ fs::{File, OpenOptions}, - sync::Arc, + sync::{atomic::AtomicU64, Arc}, time::Instant, }; use anyhow::Context; -use librqbit_core::{lengths::Lengths, torrent_metainfo::TorrentMetaV1Info}; use parking_lot::Mutex; use sha1w::Sha1; @@ -17,13 +16,6 @@ use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps}; use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; -fn make_lengths>( - torrent: &TorrentMetaV1Info, -) -> anyhow::Result { - let total_length = torrent.iter_file_lengths()?.sum(); - Lengths::new(total_length, torrent.piece_length, None) -} - fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> { Ok(file.set_len(length)?) } @@ -31,11 +23,16 @@ fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> { pub struct TorrentStateInitializing { pub(crate) meta: Arc, pub(crate) only_files: Option>, + pub(crate) checked_bytes: AtomicU64, } impl TorrentStateInitializing { pub fn new(meta: Arc, only_files: Option>) -> Self { - Self { meta, only_files } + Self { + meta, + only_files, + checked_bytes: AtomicU64::new(0), + } } pub async fn check(&self) -> anyhow::Result { @@ -72,14 +69,12 @@ impl TorrentStateInitializing { (files, filenames) }; - let lengths = - make_lengths(&self.meta.info).context("unable to compute Lengths from torrent")?; - debug!("computed lengths: {:?}", &lengths); + debug!("computed lengths: {:?}", &self.meta.lengths); info!("Doing initial checksum validation, this might take a while..."); let initial_check_results = self.meta.spawner.spawn_block_in_place(|| { - FileOps::::new(&self.meta.info, &files, &lengths) - .initial_check(self.only_files.as_deref()) + FileOps::::new(&self.meta.info, &files, &self.meta.lengths) + .initial_check(self.only_files.as_deref(), &self.checked_bytes) })?; info!( @@ -122,7 +117,7 @@ impl TorrentStateInitializing { let chunk_tracker = ChunkTracker::new( initial_check_results.needed_pieces, initial_check_results.have_pieces, - lengths, + self.meta.lengths, ); let paused = TorrentStatePaused { @@ -131,7 +126,6 @@ impl TorrentStateInitializing { filenames, chunk_tracker, have_bytes: initial_check_results.have_bytes, - needed_bytes: initial_check_results.needed_bytes, }; Ok(paused) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 957ca5d..68a96c8 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -163,7 +163,7 @@ impl TorrentStateLive { let speed_estimator = SpeedEstimator::new(5); let have_bytes = paused.have_bytes; - let needed_bytes = paused.needed_bytes; + let needed_bytes = paused.info.lengths.total_length() - have_bytes; let lengths = *paused.chunk_tracker.get_lengths(); let state = Arc::new(TorrentStateLive { @@ -533,7 +533,6 @@ impl TorrentStateLive { fetched_bytes: self.stats.fetched_bytes.load(Relaxed), uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed), total_bytes: self.have_plus_needed_bytes, - time: Instant::now(), initially_needed_bytes: self.needed_bytes, remaining_bytes: remaining, total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), diff --git a/crates/librqbit/src/torrent_state/live/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs index f22b8be..45dce43 100644 --- a/crates/librqbit/src/torrent_state/live/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs @@ -4,7 +4,7 @@ use serde::Serialize; use crate::torrent_state::live::peers::stats::snapshot::AggregatePeerStats; -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Default)] pub struct StatsSnapshot { pub have_bytes: u64, pub downloaded_and_checked_bytes: u64, @@ -14,8 +14,8 @@ pub struct StatsSnapshot { pub initially_needed_bytes: u64, pub remaining_bytes: u64, pub total_bytes: u64, - #[serde(skip)] - pub time: Instant, + // #[serde(skip)] + // pub time: Instant, pub total_piece_download_ms: u64, pub peer_stats: AggregatePeerStats, } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 1451da4..a81a59f 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -13,6 +13,7 @@ use anyhow::bail; use anyhow::Context; use buffers::ByteString; use librqbit_core::id20::Id20; +use librqbit_core::lengths::Lengths; use librqbit_core::peer_id::generate_peer_id; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; @@ -73,6 +74,7 @@ pub struct ManagedTorrentInfo { pub spawner: BlockingSpawner, pub trackers: Vec, pub peer_id: Id20, + pub lengths: Lengths, pub(crate) options: ManagedTorrentOptions, } @@ -286,7 +288,8 @@ impl ManagedTorrentBuilder { self } - pub(crate) fn build(self) -> ManagedTorrentHandle { + pub(crate) fn build(self) -> anyhow::Result { + let lengths = Lengths::from_torrent(&self.info)?; let info = Arc::new(ManagedTorrentInfo { info: self.info, info_hash: self.info_hash, @@ -294,6 +297,7 @@ impl ManagedTorrentBuilder { trackers: self.trackers.into_iter().collect(), spawner: self.spawner.unwrap_or_default(), peer_id: self.peer_id.unwrap_or_else(generate_peer_id), + lengths, options: ManagedTorrentOptions { force_tracker_interval: self.force_tracker_interval, peer_connect_timeout: self.peer_connect_timeout, @@ -305,13 +309,13 @@ impl ManagedTorrentBuilder { info.clone(), self.only_files.clone(), )); - Arc::new(ManagedTorrent { + Ok(Arc::new(ManagedTorrent { only_files: self.only_files, locked: RwLock::new(ManagedTorrentLocked { state: ManagedTorrentState::Initializing(initializing), }), info, - }) + })) } } diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index b8ac7c1..62a4553 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -12,7 +12,6 @@ pub struct TorrentStatePaused { pub(crate) filenames: Vec, pub(crate) chunk_tracker: ChunkTracker, pub(crate) have_bytes: u64, - pub(crate) needed_bytes: u64, } // impl TorrentStatePaused { diff --git a/crates/librqbit_core/src/lengths.rs b/crates/librqbit_core/src/lengths.rs index 05c7c82..64855ad 100644 --- a/crates/librqbit_core/src/lengths.rs +++ b/crates/librqbit_core/src/lengths.rs @@ -1,4 +1,4 @@ -use crate::constants::CHUNK_SIZE; +use crate::{constants::CHUNK_SIZE, torrent_metainfo::TorrentMetaV1Info}; const fn is_power_of_two(x: u64) -> bool { (x != 0) && ((x & (x - 1)) == 0) @@ -61,6 +61,13 @@ impl ValidPieceIndex { } impl Lengths { + pub fn from_torrent>( + torrent: &TorrentMetaV1Info, + ) -> anyhow::Result { + let total_length = torrent.iter_file_lengths()?.sum(); + Lengths::new(total_length, torrent.piece_length, None) + } + pub fn new( total_length: u64, piece_length: u32,