From 325855ba56ae26bc8fc52dc80418ecbe9fb27c1a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 14 Dec 2023 11:58:09 +0000 Subject: [PATCH] Fix only files not working properly (#59) * 1/n fixing only files - tracking stats better * 2/n proper tracking of stats when only certain files selected --- crates/librqbit/src/chunk_tracker.rs | 24 +++++++++- crates/librqbit/src/file_ops.rs | 20 +++++++- .../src/torrent_state/initializing.rs | 7 ++- crates/librqbit/src/torrent_state/live/mod.rs | 16 ++++--- .../src/torrent_state/live/stats/snapshot.rs | 8 ++-- crates/librqbit/src/torrent_state/mod.rs | 14 ++++-- crates/librqbit/src/torrent_state/paused.rs | 1 + crates/rqbit/src/main.rs | 47 +++++++++---------- 8 files changed, 91 insertions(+), 46 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index dea12e7..4369963 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -24,6 +24,8 @@ pub struct ChunkTracker { // What pieces to download first. priority_piece_ids: Vec, + + total_selected_bytes: u64, } // TODO: this should be redone from "have" pieces, not from "needed" pieces. @@ -58,7 +60,12 @@ pub enum ChunkMarkingResult { } impl ChunkTracker { - pub fn new(needed_pieces: BF, have_pieces: BF, lengths: Lengths) -> Self { + pub fn new( + needed_pieces: BF, + have_pieces: BF, + lengths: Lengths, + total_selected_bytes: u64, + ) -> Self { // TODO: ideally this needs to be a list based on needed files, e.g. // last needed piece for each file. But let's keep simple for now. @@ -80,9 +87,14 @@ impl ChunkTracker { lengths, have: have_pieces, priority_piece_ids, + total_selected_bytes, } } + pub fn get_total_selected_bytes(&self) -> u64 { + self.total_selected_bytes + } + pub fn get_lengths(&self) -> &Lengths { &self.lengths } @@ -104,6 +116,16 @@ impl ChunkTracker { .sum() } + pub fn calc_needed_bytes(&self) -> u64 { + self.needed_pieces + .iter_ones() + .filter_map(|piece_id| { + let piece_id = self.lengths.validate_piece_index(piece_id as u32)?; + Some(self.lengths.piece_length(piece_id) as u64) + }) + .sum() + } + pub fn iter_needed_pieces(&self) -> impl Iterator + '_ { self.priority_piece_ids .iter() diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 80cb07c..6472426 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -22,10 +22,18 @@ use tracing::{debug, trace, warn}; use crate::type_aliases::{PeerHandle, BF}; pub(crate) struct InitialCheckResults { + // The pieces that we need to download. pub needed_pieces: BF, + // The pieces we have downloaded. pub have_pieces: BF, + // How many bytes we have. This can be MORE than "total_selected_bytes", + // if we downloaded some pieces, and later the "only_files" was changed. pub have_bytes: u64, + // How many bytes we need to download. pub needed_bytes: u64, + // How many bytes are in selected pieces. + // If all selected, this must be equal to total torrent length. + pub total_selected_bytes: u64, } pub fn update_hash_from_file( @@ -77,6 +85,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { let mut have_bytes = 0u64; let mut needed_bytes = 0u64; + let mut total_selected_bytes = 0u64; #[derive(Debug)] struct CurrentFile<'a> { @@ -135,6 +144,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { let mut to_read_in_file = std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; + // Keep changing the current file to next until we find a file that has greater than 0 length. while to_read_in_file == 0 { current_file = file_iterator .next() @@ -157,7 +167,8 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { let mut fd = current_file.fd.lock(); - fd.seek(SeekFrom::Start(pos)).unwrap(); + fd.seek(SeekFrom::Start(pos)) + .context("bug? error seeking")?; if let Err(err) = update_hash_from_file( &mut fd, &mut computed_hash, @@ -173,6 +184,10 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { } } + if at_least_one_file_required { + total_selected_bytes += piece_info.len as u64; + } + if at_least_one_file_required && some_files_broken { trace!( "piece {} had errors, marking as needed", @@ -187,7 +202,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { if self .torrent .compare_hash(piece_info.piece_index.get(), computed_hash.finish()) - .unwrap() + .context("bug: either torrent info broken or we have a bug - piece index invalid")? { trace!( "piece {} is fine, not marking as needed", @@ -215,6 +230,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { have_pieces, have_bytes, needed_bytes, + total_selected_bytes, }) } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 45d7ada..ffe80c4 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -86,9 +86,10 @@ impl TorrentStateInitializing { })?; info!( - "Initial check results: have {}, needed {}", + "Initial check results: have {}, needed {}, total selected {}", SF::new(initial_check_results.have_bytes), - SF::new(initial_check_results.needed_bytes) + SF::new(initial_check_results.needed_bytes), + SF::new(initial_check_results.total_selected_bytes) ); self.meta.spawner.spawn_block_in_place(|| { @@ -126,6 +127,7 @@ impl TorrentStateInitializing { initial_check_results.needed_pieces, initial_check_results.have_pieces, self.meta.lengths, + initial_check_results.total_selected_bytes, ); let paused = TorrentStatePaused { @@ -134,6 +136,7 @@ impl TorrentStateInitializing { filenames, chunk_tracker, have_bytes: initial_check_results.have_bytes, + needed_bytes: initial_check_results.needed_bytes, }; Ok(paused) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 67d4e8e..c439df2 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -174,6 +174,7 @@ pub struct TorrentStateLive { filenames: Vec, initially_needed_bytes: u64, + total_selected_bytes: u64, stats: AtomicStats, lengths: Lengths, @@ -203,7 +204,8 @@ impl TorrentStateLive { let up_speed_estimator = SpeedEstimator::new(5); let have_bytes = paused.have_bytes; - let needed_bytes = paused.info.lengths.total_length() - have_bytes; + let needed_bytes = paused.needed_bytes; + let total_selected_bytes = paused.chunk_tracker.get_total_selected_bytes(); let lengths = *paused.chunk_tracker.get_lengths(); let state = Arc::new(TorrentStateLive { @@ -222,6 +224,7 @@ impl TorrentStateLive { }, initially_needed_bytes: needed_bytes, lengths, + total_selected_bytes, peer_semaphore: Arc::new(Semaphore::new(128)), peer_queue_tx, finished_notify: Notify::new(), @@ -599,6 +602,10 @@ impl TorrentStateLive { }); } + pub fn get_total_selected_bytes(&self) -> u64 { + self.total_selected_bytes + } + pub fn get_uploaded_bytes(&self) -> u64 { self.stats.uploaded_bytes.load(Ordering::Relaxed) } @@ -690,16 +697,11 @@ impl TorrentStateLive { pub fn stats_snapshot(&self) -> StatsSnapshot { use Ordering::*; let downloaded_bytes = self.stats.downloaded_and_checked_bytes.load(Relaxed); - let remaining = self.initially_needed_bytes - downloaded_bytes; StatsSnapshot { - have_bytes: self.stats.have_bytes.load(Relaxed), downloaded_and_checked_bytes: downloaded_bytes, downloaded_and_checked_pieces: self.stats.downloaded_and_checked_pieces.load(Relaxed), fetched_bytes: self.stats.fetched_bytes.load(Relaxed), uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed), - total_bytes: self.lengths.total_length(), - initially_needed_bytes: self.initially_needed_bytes, - remaining_bytes: remaining, total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), peer_stats: self.peers.stats(), } @@ -750,6 +752,7 @@ impl TorrentStateLive { chunk_tracker.mark_piece_broken(piece_id); } let have_bytes = chunk_tracker.calc_have_bytes(); + let needed_bytes = chunk_tracker.calc_needed_bytes(); // g.chunks; Ok(TorrentStatePaused { @@ -758,6 +761,7 @@ impl TorrentStateLive { filenames, chunk_tracker, have_bytes, + needed_bytes, }) } diff --git a/crates/librqbit/src/torrent_state/live/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs index 2e5dd53..dc12998 100644 --- a/crates/librqbit/src/torrent_state/live/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/stats/snapshot.rs @@ -6,14 +6,12 @@ use crate::torrent_state::live::peers::stats::snapshot::AggregatePeerStats; #[derive(Debug, Serialize, Default)] pub struct StatsSnapshot { - pub have_bytes: u64, pub downloaded_and_checked_bytes: u64, - pub downloaded_and_checked_pieces: u64, + pub fetched_bytes: u64, pub uploaded_bytes: u64, - pub initially_needed_bytes: u64, - pub remaining_bytes: u64, - pub total_bytes: u64, + + pub downloaded_and_checked_pieces: u64, pub total_piece_download_ms: u64, pub peer_stats: AggregatePeerStats, } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 6df68fe..92f1c70 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -364,14 +364,20 @@ impl ManagedTorrent { } ManagedTorrentState::Paused(p) => { resp.state = "paused"; - resp.progress_bytes = p.have_bytes; - resp.finished = p.have_bytes == resp.total_bytes; + resp.total_bytes = p.chunk_tracker.get_total_selected_bytes(); + resp.progress_bytes = resp.total_bytes - p.needed_bytes; + resp.finished = resp.progress_bytes == resp.total_bytes; } ManagedTorrentState::Live(l) => { resp.state = "live"; let live_stats = LiveStats::from(l.as_ref()); - resp.progress_bytes = live_stats.snapshot.have_bytes; - resp.finished = resp.progress_bytes == resp.total_bytes; + let total = l.get_total_selected_bytes(); + let remaining = l.get_left_to_download_bytes(); + let progress = total - remaining; + + resp.progress_bytes = progress; + resp.total_bytes = total; + resp.finished = remaining == 0; resp.live = Some(live_stats); } ManagedTorrentState::Error(e) => { diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 62a4553..b8ac7c1 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -12,6 +12,7 @@ pub struct TorrentStatePaused { pub(crate) filenames: Vec, pub(crate) chunk_tracker: ChunkTracker, pub(crate) have_bytes: u64, + pub(crate) needed_bytes: u64, } // impl TorrentStatePaused { diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 873a09d..ceb5fa5 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -7,7 +7,7 @@ use librqbit::{ http_api::{HttpApi, HttpApiOptions}, http_api_client, librqbit_spawn, tracing_subscriber_config_utils::{init_logging, InitLoggingOptions}, - AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse, ManagedTorrentState, + AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse, PeerConnectionOptions, Session, SessionOptions, }; use size_format::SizeFormatterBinary as SF; @@ -271,29 +271,23 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { loop { session.with_torrents(|torrents| { for (idx, torrent) in torrents { - let live = torrent.with_state(|s| { - match s { - ManagedTorrentState::Initializing(i) => { - let total = torrent.get_total_bytes(); - let progress = i.get_checked_bytes(); - let pct = (progress as f64 / total as f64) * 100f64; - info!("[{}] initializing {:.2}%", idx, pct) - }, - ManagedTorrentState::Live(h) => return Some(h.clone()), - _ => {}, - }; - None - }); - let handle = match live { - Some(live) => live, - None => continue + let stats = torrent.stats(); + if stats.state == "initializing" { + let total = stats.total_bytes; + let progress = stats.progress_bytes; + let pct = (progress as f64 / total as f64) * 100f64; + info!("[{}] initializing {:.2}%", idx, pct); + continue; + } + let (live, live_stats) = match (torrent.live(), stats.live.as_ref()) { + (Some(live), Some(live_stats)) => (live, live_stats), + _ => continue }; - let stats = handle.stats_snapshot(); - let down_speed = handle.down_speed_estimator(); - let up_speed = handle.up_speed_estimator(); + let down_speed = live.down_speed_estimator(); + let up_speed = live.up_speed_estimator(); let total = stats.total_bytes; - let progress = stats.total_bytes - stats.remaining_bytes; - let downloaded_pct = if stats.remaining_bytes == 0 { + let progress = stats.progress_bytes; + let downloaded_pct = if stats.finished { 100f64 } else { (progress as f64 / total as f64) * 100f64 @@ -303,6 +297,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { Some(d) => format!(", ETA: {:?}", d), None => String::new() }; + let peer_stats = &live_stats.snapshot.peer_stats; info!( "[{}]: {:.2}% ({:.2} / {:.2}), ↓{:.2} MiB/s, ↑{:.2} MiB/s ({:.2}){}, {{live: {}, queued: {}, dead: {}}}", idx, @@ -311,11 +306,11 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { SF::new(total), down_speed.mbps(), up_speed.mbps(), - SF::new(stats.uploaded_bytes), + SF::new(live_stats.snapshot.uploaded_bytes), eta, - stats.peer_stats.live + stats.peer_stats.connecting, - stats.peer_stats.queued, - stats.peer_stats.dead, + peer_stats.live + peer_stats.connecting, + peer_stats.queued, + peer_stats.dead, ); } });