Initialization progress reporting

This commit is contained in:
Igor Katson 2023-11-24 15:04:36 +00:00
parent b79a21179f
commit 876afbf41b
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
9 changed files with 109 additions and 40 deletions

View file

@ -2,7 +2,10 @@ use std::{
fs::File, fs::File,
io::{Read, Seek, SeekFrom, Write}, io::{Read, Seek, SeekFrom, Write},
marker::PhantomData, marker::PhantomData,
sync::Arc, sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
}; };
use anyhow::Context; use anyhow::Context;
@ -67,6 +70,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
pub fn initial_check( pub fn initial_check(
&self, &self,
only_files: Option<&[usize]>, only_files: Option<&[usize]>,
progress: &AtomicU64,
) -> anyhow::Result<InitialCheckResults> { ) -> anyhow::Result<InitialCheckResults> {
let mut needed_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]); let mut needed_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]);
let mut have_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]); let mut have_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]);
@ -125,6 +129,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
let mut piece_remaining = piece_info.len as usize; let mut piece_remaining = piece_info.len as usize;
let mut some_files_broken = false; let mut some_files_broken = false;
let mut at_least_one_file_required = current_file.full_file_required; let mut at_least_one_file_required = current_file.full_file_required;
progress.fetch_add(piece_info.len as u64, Ordering::Relaxed);
while piece_remaining > 0 { while piece_remaining > 0 {
let mut to_read_in_file = let mut to_read_in_file =

View file

@ -11,6 +11,7 @@ use librqbit_core::id20::Id20;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tracing::{info, warn}; use tracing::{info, warn};
@ -23,7 +24,7 @@ use crate::session::{
}; };
use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot};
use crate::torrent_state::stats::snapshot::StatsSnapshot; use crate::torrent_state::stats::snapshot::StatsSnapshot;
use crate::torrent_state::ManagedTorrentHandle; use crate::torrent_state::{ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive};
// Public API // Public API
#[derive(Clone)] #[derive(Clone)]
@ -100,11 +101,18 @@ impl HttpApi {
state.api_dump_haves(idx) state.api_dump_haves(idx)
} }
async fn torrent_stats( async fn torrent_stats_v0(
State(state): State<ApiState>, State(state): State<ApiState>,
Path(idx): Path<usize>, Path(idx): Path<usize>,
) -> Result<impl IntoResponse> { ) -> Result<impl IntoResponse> {
state.api_stats(idx).map(axum::Json) state.api_stats_v0(idx).map(axum::Json)
}
async fn torrent_stats_v1(
State(state): State<ApiState>,
Path(idx): Path<usize>,
) -> Result<impl IntoResponse> {
state.api_stats_v1(idx).map(axum::Json)
} }
async fn peer_stats( async fn peer_stats(
@ -137,7 +145,8 @@ impl HttpApi {
.route("/torrents", get(torrents_list).post(torrents_post)) .route("/torrents", get(torrents_list).post(torrents_post))
.route("/torrents/:id", get(torrent_details)) .route("/torrents/:id", get(torrent_details))
.route("/torrents/:id/haves", get(torrent_haves)) .route("/torrents/:id/haves", get(torrent_haves))
.route("/torrents/:id/stats", get(torrent_stats)) .route("/torrents/:id/stats", get(torrent_stats_v0))
.route("/torrents/:id/stats/v1", get(torrent_stats_v1))
.route("/torrents/:id/peer_stats", get(peer_stats)) .route("/torrents/:id/peer_stats", get(peer_stats))
.route("/torrents/:id/pause", post(torrent_action_pause)) .route("/torrents/:id/pause", post(torrent_action_pause))
.route("/torrents/:id/start", post(torrent_action_start)); .route("/torrents/:id/start", post(torrent_action_start));
@ -196,7 +205,7 @@ impl HttpApi {
type Result<T> = std::result::Result<T, ApiError>; type Result<T> = std::result::Result<T, ApiError>;
#[derive(Serialize)] #[derive(Serialize, Default)]
struct Speed { struct Speed {
mbps: f64, mbps: f64,
human_readable: String, human_readable: String,
@ -261,8 +270,8 @@ impl Serialize for DurationWithHumanReadable {
} }
} }
#[derive(Serialize)] #[derive(Serialize, Default)]
struct StatsResponse { struct LiveStats {
snapshot: StatsSnapshot, snapshot: StatsSnapshot,
average_piece_download_time: Option<Duration>, average_piece_download_time: Option<Duration>,
download_speed: Speed, download_speed: Speed,
@ -270,6 +279,15 @@ struct StatsResponse {
time_remaining: Option<DurationWithHumanReadable>, time_remaining: Option<DurationWithHumanReadable>,
} }
#[derive(Serialize)]
struct StatsResponse {
state: &'static str,
error: Option<String>,
progress_bytes: u64,
total_bytes: u64,
live: Option<LiveStats>,
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct ApiAddTorrentResponse { pub struct ApiAddTorrentResponse {
pub id: Option<usize>, pub id: Option<usize>,
@ -465,9 +483,7 @@ impl ApiInternal {
Ok(dht.with_routing_table(|r| r.clone())) Ok(dht.with_routing_table(|r| r.clone()))
} }
fn api_stats(&self, idx: TorrentId) -> Result<StatsResponse> { fn make_live_stats(&self, live: &TorrentStateLive) -> LiveStats {
let mgr = self.mgr_handle(idx)?;
let live = mgr.live().context("not live")?;
let snapshot = live.stats_snapshot(); let snapshot = live.stats_snapshot();
let estimator = live.speed_estimator(); let estimator = live.speed_estimator();
@ -476,12 +492,57 @@ impl ApiInternal {
let downloaded_bytes = snapshot.downloaded_and_checked_bytes; let downloaded_bytes = snapshot.downloaded_and_checked_bytes;
let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64;
Ok(StatsResponse { LiveStats {
average_piece_download_time: snapshot.average_piece_download_time(), average_piece_download_time: snapshot.average_piece_download_time(),
snapshot, snapshot,
all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(), all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(),
download_speed: estimator.download_mbps().into(), download_speed: estimator.download_mbps().into(),
time_remaining: estimator.time_remaining().map(DurationWithHumanReadable), time_remaining: estimator.time_remaining().map(DurationWithHumanReadable),
}
}
fn api_stats_v0(&self, idx: TorrentId) -> Result<LiveStats> {
let mgr = self.mgr_handle(idx)?;
let live = mgr.live().context("torrent not live")?;
Ok(self.make_live_stats(&live))
}
fn api_stats_v1(&self, idx: TorrentId) -> Result<StatsResponse> {
let mgr = self.mgr_handle(idx)?;
let mut resp = StatsResponse {
total_bytes: mgr.info().lengths.total_length(),
state: "",
error: None,
progress_bytes: 0,
live: None,
};
mgr.with_state(|s| {
match s {
ManagedTorrentState::Initializing(i) => {
resp.state = "initializing";
resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed);
}
ManagedTorrentState::Paused(p) => {
resp.state = "paused";
resp.progress_bytes = p.have_bytes;
}
ManagedTorrentState::Live(l) => {
resp.state = "live";
let live_stats = self.make_live_stats(l);
resp.progress_bytes = live_stats.snapshot.downloaded_and_checked_bytes;
resp.live = Some(live_stats);
}
ManagedTorrentState::Error(e) => {
resp.state = "error";
resp.error = Some(format!("{:?}", e))
}
ManagedTorrentState::None => {
resp.state = "error";
resp.error = Some("bug: torrent in broken \"None\" state".to_string());
}
}
Ok(resp)
}) })
} }

View file

@ -412,7 +412,7 @@ impl Session {
{ {
return Ok(AddTorrentResponse::AlreadyManaged(id, handle.clone())); return Ok(AddTorrentResponse::AlreadyManaged(id, handle.clone()));
} }
let managed_torrent = builder.build(); let managed_torrent = builder.build()?;
let id = g.add_torrent(managed_torrent.clone()); let id = g.add_torrent(managed_torrent.clone());
(managed_torrent, id) (managed_torrent, id)
}; };

View file

@ -1,12 +1,11 @@
use std::{ use std::{
fs::{File, OpenOptions}, fs::{File, OpenOptions},
sync::Arc, sync::{atomic::AtomicU64, Arc},
time::Instant, time::Instant,
}; };
use anyhow::Context; use anyhow::Context;
use librqbit_core::{lengths::Lengths, torrent_metainfo::TorrentMetaV1Info};
use parking_lot::Mutex; use parking_lot::Mutex;
use sha1w::Sha1; use sha1w::Sha1;
@ -17,13 +16,6 @@ use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps};
use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; use super::{paused::TorrentStatePaused, ManagedTorrentInfo};
fn make_lengths<ByteBuf: AsRef<[u8]>>(
torrent: &TorrentMetaV1Info<ByteBuf>,
) -> anyhow::Result<Lengths> {
let total_length = torrent.iter_file_lengths()?.sum();
Lengths::new(total_length, torrent.piece_length, None)
}
fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> { fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> {
Ok(file.set_len(length)?) Ok(file.set_len(length)?)
} }
@ -31,11 +23,16 @@ fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> {
pub struct TorrentStateInitializing { pub struct TorrentStateInitializing {
pub(crate) meta: Arc<ManagedTorrentInfo>, pub(crate) meta: Arc<ManagedTorrentInfo>,
pub(crate) only_files: Option<Vec<usize>>, pub(crate) only_files: Option<Vec<usize>>,
pub(crate) checked_bytes: AtomicU64,
} }
impl TorrentStateInitializing { impl TorrentStateInitializing {
pub fn new(meta: Arc<ManagedTorrentInfo>, only_files: Option<Vec<usize>>) -> Self { pub fn new(meta: Arc<ManagedTorrentInfo>, only_files: Option<Vec<usize>>) -> Self {
Self { meta, only_files } Self {
meta,
only_files,
checked_bytes: AtomicU64::new(0),
}
} }
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> { pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
@ -72,14 +69,12 @@ impl TorrentStateInitializing {
(files, filenames) (files, filenames)
}; };
let lengths = debug!("computed lengths: {:?}", &self.meta.lengths);
make_lengths(&self.meta.info).context("unable to compute Lengths from torrent")?;
debug!("computed lengths: {:?}", &lengths);
info!("Doing initial checksum validation, this might take a while..."); info!("Doing initial checksum validation, this might take a while...");
let initial_check_results = self.meta.spawner.spawn_block_in_place(|| { let initial_check_results = self.meta.spawner.spawn_block_in_place(|| {
FileOps::<Sha1>::new(&self.meta.info, &files, &lengths) FileOps::<Sha1>::new(&self.meta.info, &files, &self.meta.lengths)
.initial_check(self.only_files.as_deref()) .initial_check(self.only_files.as_deref(), &self.checked_bytes)
})?; })?;
info!( info!(
@ -122,7 +117,7 @@ impl TorrentStateInitializing {
let chunk_tracker = ChunkTracker::new( let chunk_tracker = ChunkTracker::new(
initial_check_results.needed_pieces, initial_check_results.needed_pieces,
initial_check_results.have_pieces, initial_check_results.have_pieces,
lengths, self.meta.lengths,
); );
let paused = TorrentStatePaused { let paused = TorrentStatePaused {
@ -131,7 +126,6 @@ impl TorrentStateInitializing {
filenames, filenames,
chunk_tracker, chunk_tracker,
have_bytes: initial_check_results.have_bytes, have_bytes: initial_check_results.have_bytes,
needed_bytes: initial_check_results.needed_bytes,
}; };
Ok(paused) Ok(paused)
} }

View file

@ -163,7 +163,7 @@ impl TorrentStateLive {
let speed_estimator = SpeedEstimator::new(5); let speed_estimator = SpeedEstimator::new(5);
let have_bytes = paused.have_bytes; let have_bytes = paused.have_bytes;
let needed_bytes = paused.needed_bytes; let needed_bytes = paused.info.lengths.total_length() - have_bytes;
let lengths = *paused.chunk_tracker.get_lengths(); let lengths = *paused.chunk_tracker.get_lengths();
let state = Arc::new(TorrentStateLive { let state = Arc::new(TorrentStateLive {
@ -533,7 +533,6 @@ impl TorrentStateLive {
fetched_bytes: self.stats.fetched_bytes.load(Relaxed), fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed), uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed),
total_bytes: self.have_plus_needed_bytes, total_bytes: self.have_plus_needed_bytes,
time: Instant::now(),
initially_needed_bytes: self.needed_bytes, initially_needed_bytes: self.needed_bytes,
remaining_bytes: remaining, remaining_bytes: remaining,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),

View file

@ -4,7 +4,7 @@ use serde::Serialize;
use crate::torrent_state::live::peers::stats::snapshot::AggregatePeerStats; use crate::torrent_state::live::peers::stats::snapshot::AggregatePeerStats;
#[derive(Debug, Serialize)] #[derive(Debug, Serialize, Default)]
pub struct StatsSnapshot { pub struct StatsSnapshot {
pub have_bytes: u64, pub have_bytes: u64,
pub downloaded_and_checked_bytes: u64, pub downloaded_and_checked_bytes: u64,
@ -14,8 +14,8 @@ pub struct StatsSnapshot {
pub initially_needed_bytes: u64, pub initially_needed_bytes: u64,
pub remaining_bytes: u64, pub remaining_bytes: u64,
pub total_bytes: u64, pub total_bytes: u64,
#[serde(skip)] // #[serde(skip)]
pub time: Instant, // pub time: Instant,
pub total_piece_download_ms: u64, pub total_piece_download_ms: u64,
pub peer_stats: AggregatePeerStats, pub peer_stats: AggregatePeerStats,
} }

View file

@ -13,6 +13,7 @@ use anyhow::bail;
use anyhow::Context; use anyhow::Context;
use buffers::ByteString; use buffers::ByteString;
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id; use librqbit_core::peer_id::generate_peer_id;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
@ -73,6 +74,7 @@ pub struct ManagedTorrentInfo {
pub spawner: BlockingSpawner, pub spawner: BlockingSpawner,
pub trackers: Vec<Url>, pub trackers: Vec<Url>,
pub peer_id: Id20, pub peer_id: Id20,
pub lengths: Lengths,
pub(crate) options: ManagedTorrentOptions, pub(crate) options: ManagedTorrentOptions,
} }
@ -286,7 +288,8 @@ impl ManagedTorrentBuilder {
self self
} }
pub(crate) fn build(self) -> ManagedTorrentHandle { pub(crate) fn build(self) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?;
let info = Arc::new(ManagedTorrentInfo { let info = Arc::new(ManagedTorrentInfo {
info: self.info, info: self.info,
info_hash: self.info_hash, info_hash: self.info_hash,
@ -294,6 +297,7 @@ impl ManagedTorrentBuilder {
trackers: self.trackers.into_iter().collect(), trackers: self.trackers.into_iter().collect(),
spawner: self.spawner.unwrap_or_default(), spawner: self.spawner.unwrap_or_default(),
peer_id: self.peer_id.unwrap_or_else(generate_peer_id), peer_id: self.peer_id.unwrap_or_else(generate_peer_id),
lengths,
options: ManagedTorrentOptions { options: ManagedTorrentOptions {
force_tracker_interval: self.force_tracker_interval, force_tracker_interval: self.force_tracker_interval,
peer_connect_timeout: self.peer_connect_timeout, peer_connect_timeout: self.peer_connect_timeout,
@ -305,13 +309,13 @@ impl ManagedTorrentBuilder {
info.clone(), info.clone(),
self.only_files.clone(), self.only_files.clone(),
)); ));
Arc::new(ManagedTorrent { Ok(Arc::new(ManagedTorrent {
only_files: self.only_files, only_files: self.only_files,
locked: RwLock::new(ManagedTorrentLocked { locked: RwLock::new(ManagedTorrentLocked {
state: ManagedTorrentState::Initializing(initializing), state: ManagedTorrentState::Initializing(initializing),
}), }),
info, info,
}) }))
} }
} }

View file

@ -12,7 +12,6 @@ pub struct TorrentStatePaused {
pub(crate) filenames: Vec<PathBuf>, pub(crate) filenames: Vec<PathBuf>,
pub(crate) chunk_tracker: ChunkTracker, pub(crate) chunk_tracker: ChunkTracker,
pub(crate) have_bytes: u64, pub(crate) have_bytes: u64,
pub(crate) needed_bytes: u64,
} }
// impl TorrentStatePaused { // impl TorrentStatePaused {

View file

@ -1,4 +1,4 @@
use crate::constants::CHUNK_SIZE; use crate::{constants::CHUNK_SIZE, torrent_metainfo::TorrentMetaV1Info};
const fn is_power_of_two(x: u64) -> bool { const fn is_power_of_two(x: u64) -> bool {
(x != 0) && ((x & (x - 1)) == 0) (x != 0) && ((x & (x - 1)) == 0)
@ -61,6 +61,13 @@ impl ValidPieceIndex {
} }
impl Lengths { impl Lengths {
pub fn from_torrent<ByteBuf: AsRef<[u8]>>(
torrent: &TorrentMetaV1Info<ByteBuf>,
) -> anyhow::Result<Lengths> {
let total_length = torrent.iter_file_lengths()?.sum();
Lengths::new(total_length, torrent.piece_length, None)
}
pub fn new( pub fn new(
total_length: u64, total_length: u64,
piece_length: u32, piece_length: u32,