From 51d1a0b0c7a4c993710653a2b2c18e066d7354fa Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 30 Mar 2024 18:51:05 +0000 Subject: [PATCH] Jeez... --- crates/librqbit/src/chunk_tracker.rs | 212 +++++++++++------- crates/librqbit/src/file_ops.rs | 34 +-- crates/librqbit/src/tests/e2e.rs | 2 +- .../src/torrent_state/initializing.rs | 17 +- crates/librqbit/src/torrent_state/live/mod.rs | 17 +- crates/librqbit/src/torrent_state/mod.rs | 6 +- crates/librqbit/src/torrent_state/paused.rs | 10 +- 7 files changed, 183 insertions(+), 115 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 72e86ee..4f4644d 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -14,7 +14,9 @@ pub struct ChunkTracker { // // Initially this is the opposite of "have", until we start making requests. // An in-flight request is not in "needed", and not in "have". - needed_pieces: BF, + // + // needed initial value = selected & !have + queue_pieces: BF, // This has a bit set per each chunk (block) that we have written to the output file. // It doesn't mean it's valid yet. Used to track how much is left in each piece. @@ -23,18 +25,35 @@ pub struct ChunkTracker { // These are the pieces that we actually have, fully checked and downloaded. have: BF, + // The pieces that the user selected. This doesn't change unless update_only_files + // was called. + selected: BF, + lengths: Lengths, // What pieces to download first. priority_piece_ids: Vec, - - total_selected_bytes: u64, } #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct HaveNeeded { +pub struct HaveNeededSelected { pub have_bytes: u64, pub needed_bytes: u64, + pub selected_bytes: u64, +} + +impl HaveNeededSelected { + pub const fn progress(&self) -> u64 { + self.selected_bytes - self.needed_bytes + } + + pub const fn total(&self) -> u64 { + self.selected_bytes + } + + pub const fn finished(&self) -> bool { + self.needed_bytes == 0 + } } // Comput the have-status of chunks. @@ -68,6 +87,29 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Res Ok(chunk_bf) } +fn compute_queued_pieces_unchecked(have_pieces: &BF, selected_pieces: &BF) -> BF { + // it's needed ONLY if it's selected and we don't have it. + use core::ops::BitAnd; + use core::ops::Not; + + have_pieces.clone().not().bitand(selected_pieces) +} + +fn compute_queued_pieces(have_pieces: &BF, selected_pieces: &BF) -> anyhow::Result { + if have_pieces.len() != selected_pieces.len() { + anyhow::bail!( + "have_pieces.len() != selected_pieces.len(), {} != {}", + have_pieces.len(), + selected_pieces.len() + ); + } + + Ok(compute_queued_pieces_unchecked( + have_pieces, + selected_pieces, + )) +} + pub enum ChunkMarkingResult { PreviouslyCompleted, NotCompleted, @@ -76,15 +118,15 @@ pub enum ChunkMarkingResult { impl ChunkTracker { pub fn new( - // Needed pieces are the ones we need to download. NOTE: if all files are selected, - // this is the inverse of have_pieces. But if partial files are selected, we may need more/less - // than we have. - needed_pieces: BF, // Have pieces are the ones we have already downloaded and verified. have_pieces: BF, + // Selected pieces are the ones the user has selected + selected_pieces: BF, lengths: Lengths, - total_selected_bytes: u64, ) -> anyhow::Result { + let needed_pieces = compute_queued_pieces(&have_pieces, &selected_pieces) + .context("error computing needed pieces")?; + // TODO: ideally this needs to be a list based on needed files, e.g. // last needed piece for each file. But let's keep simple for now. @@ -103,18 +145,14 @@ impl ChunkTracker { Ok(Self { chunk_status: compute_chunk_have_status(&lengths, &have_pieces) .context("error computing chunk status")?, - needed_pieces, + queue_pieces: needed_pieces, + selected: selected_pieces, lengths, have: have_pieces, priority_piece_ids, - total_selected_bytes, }) } - pub fn get_total_selected_bytes(&self) -> u64 { - self.total_selected_bytes - } - pub fn get_lengths(&self) -> &Lengths { &self.lengths } @@ -123,7 +161,7 @@ impl ChunkTracker { &self.have } pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) { - self.needed_pieces.set(index.get() as usize, false) + self.queue_pieces.set(index.get() as usize, false) } pub fn calc_have_bytes(&self) -> u64 { @@ -137,22 +175,28 @@ impl ChunkTracker { } pub fn calc_needed_bytes(&self) -> u64 { - self.needed_pieces - .iter_ones() - .filter_map(|piece_id| { - let piece_id = self.lengths.validate_piece_index(piece_id as u32)?; - Some(self.lengths.piece_length(piece_id) as u64) + self.have + .iter() + .zip(self.selected.iter()) + .enumerate() + .filter_map(|(piece_id, (have, selected))| { + if *selected && !*have { + let piece_id = self.lengths.validate_piece_index(piece_id as u32)?; + Some(self.lengths.piece_length(piece_id) as u64) + } else { + None + } }) .sum() } - pub fn iter_needed_pieces(&self) -> impl Iterator + '_ { + pub fn iter_queued_pieces(&self) -> impl Iterator + '_ { self.priority_piece_ids .iter() .copied() - .filter(move |piece_id| self.needed_pieces[*piece_id]) + .filter(move |piece_id| self.queue_pieces[*piece_id]) .chain( - self.needed_pieces + self.queue_pieces .iter_ones() .filter(move |id| !self.priority_piece_ids.contains(id)), ) @@ -172,7 +216,7 @@ impl ChunkTracker { // This will trigger the requesters to re-check each chunk in this piece. let chunk_range = self.lengths.chunk_range(index); if !self.chunk_status.get(chunk_range)?.all() { - self.needed_pieces.set(index.get() as usize, true); + self.queue_pieces.set(index.get() as usize, true); } Some(true) } @@ -187,7 +231,7 @@ impl ChunkTracker { return; } debug!("remarking piece={} as broken", index); - self.needed_pieces.set(index.get() as usize, true); + self.queue_pieces.set(index.get() as usize, true); if let Some(s) = self.chunk_status.get_mut(self.lengths.chunk_range(index)) { s.fill(false); } @@ -244,14 +288,15 @@ impl ChunkTracker { file_lengths_iterator: impl IntoIterator, // TODO: maybe make this a BF new_only_files: &HashSet, - ) -> anyhow::Result { + ) -> anyhow::Result { let mut piece_it = self.lengths.iter_piece_infos(); let mut current_piece = piece_it .next() .context("bug: iter_piece_infos() returned empty iterator")?; - let mut current_piece_needed = false; + let mut current_piece_selected = false; let mut current_piece_remaining = current_piece.len; let mut have_bytes = 0u64; + let mut selected_bytes = 0u64; let mut needed_bytes = 0u64; for (idx, len) in file_lengths_iterator.into_iter().enumerate() { @@ -260,31 +305,38 @@ impl ChunkTracker { let mut remaining_file_len = len; while remaining_file_len > 0 { - current_piece_needed |= len > 0 && file_required; + current_piece_selected |= len > 0 && file_required; let shift = std::cmp::min(current_piece_remaining as u64, remaining_file_len); assert!(shift > 0); remaining_file_len -= shift; current_piece_remaining -= shift as u32; - dbg!( - idx, - shift, - remaining_file_len, - current_piece_remaining, - current_piece_needed, - file_required, - current_piece - ); + // dbg!( + // idx, + // shift, + // remaining_file_len, + // current_piece_remaining, + // current_piece_needed, + // file_required, + // current_piece + // ); if current_piece_remaining == 0 { let current_piece_have = self.have[current_piece.piece_index.get() as usize]; if current_piece_have { have_bytes += current_piece.len as u64; } - if current_piece_needed { + if current_piece_selected { + selected_bytes += current_piece.len as u64; + } + if current_piece_selected && !current_piece_have { needed_bytes += current_piece.len as u64; } - match (current_piece_needed, current_piece_have) { + self.selected.set( + current_piece.piece_index.get() as usize, + current_piece_selected, + ); + match (current_piece_selected, current_piece_have) { (true, true) => {} (true, false) => { dbg!(self.mark_piece_broken_if_not_have(current_piece.piece_index)) @@ -293,7 +345,7 @@ impl ChunkTracker { (false, false) => { // don't need the piece, and don't have it - cancel downloading it dbg!(self - .needed_pieces + .queue_pieces .set(current_piece.piece_index.get() as usize, false)); } } @@ -302,16 +354,17 @@ impl ChunkTracker { current_piece = piece_it.next().context( "bug: iter_piece_infos() pieces ended earlier than expected", )?; - current_piece_needed = false; + current_piece_selected = false; current_piece_remaining = current_piece.len; } } } } - Ok(HaveNeeded { + Ok(HaveNeededSelected { have_bytes, needed_bytes, + selected_bytes, }) } } @@ -322,7 +375,7 @@ mod tests { use librqbit_core::{constants::CHUNK_SIZE, lengths::Lengths}; - use crate::{chunk_tracker::HaveNeeded, type_aliases::BF}; + use crate::{chunk_tracker::HaveNeededSelected, type_aliases::BF}; use super::{compute_chunk_have_status, ChunkTracker}; @@ -444,103 +497,104 @@ mod tests { let initial_needed = BF::from_boxed_slice(vec![u8::MAX; bf_len].into_boxed_slice()); // Initially, we need all files and all pieces. - let mut ct = ChunkTracker::new( - initial_needed.clone(), - initial_have.clone(), - l, - l.total_length(), - ) - .unwrap(); + let mut ct = ChunkTracker::new(initial_needed.clone(), initial_have.clone(), l).unwrap(); // Select all file, no changes. assert_eq!( ct.update_only_files(all_files.into_iter(), &HashSet::from_iter([0, 1, 2, 3])) .unwrap(), - HaveNeeded { + HaveNeededSelected { have_bytes: 0, - needed_bytes: total_len + selected_bytes: total_len, + needed_bytes: total_len, } ); assert_eq!(ct.have, initial_have); - assert_eq!(ct.needed_pieces, initial_needed); + assert_eq!(ct.queue_pieces, initial_needed); // Select only the first file. println!("Select only the first file."); assert_eq!( ct.update_only_files(all_files, &HashSet::from_iter([0])) .unwrap(), - HaveNeeded { + HaveNeededSelected { have_bytes: 0, + selected_bytes: all_files[0], needed_bytes: all_files[0], } ); - assert_eq!(ct.needed_pieces[0], true); - assert_eq!(ct.needed_pieces[1], false); - assert_eq!(ct.needed_pieces[2], false); + assert_eq!(ct.queue_pieces[0], true); + assert_eq!(ct.queue_pieces[1], false); + assert_eq!(ct.queue_pieces[2], false); // Select only the second file. assert_eq!( ct.update_only_files(all_files, &HashSet::from_iter([1])) .unwrap(), - HaveNeeded { + HaveNeededSelected { have_bytes: 0, + selected_bytes: piece_len as u64, needed_bytes: piece_len as u64, } ); - assert_eq!(ct.needed_pieces[0], false); - assert_eq!(ct.needed_pieces[1], true); - assert_eq!(ct.needed_pieces[2], false); + assert_eq!(ct.queue_pieces[0], false); + assert_eq!(ct.queue_pieces[1], true); + assert_eq!(ct.queue_pieces[2], false); // Select only the third file (zero sized one!). assert_eq!( ct.update_only_files(all_files, &HashSet::from_iter([2])) .unwrap(), - HaveNeeded { + HaveNeededSelected { have_bytes: 0, + selected_bytes: 0, needed_bytes: 0, } ); - assert_eq!(ct.needed_pieces[0], false); - assert_eq!(ct.needed_pieces[1], false); - assert_eq!(ct.needed_pieces[2], false); + assert_eq!(ct.queue_pieces[0], false); + assert_eq!(ct.queue_pieces[1], false); + assert_eq!(ct.queue_pieces[2], false); // Select only the fourth file. assert_eq!( ct.update_only_files(all_files, &HashSet::from_iter([3])) .unwrap(), - HaveNeeded { + HaveNeededSelected { have_bytes: 0, + selected_bytes: (piece_len + 1) as u64, needed_bytes: (piece_len + 1) as u64, } ); - assert_eq!(ct.needed_pieces[0], false); - assert_eq!(ct.needed_pieces[1], true); - assert_eq!(ct.needed_pieces[2], true); + assert_eq!(ct.queue_pieces[0], false); + assert_eq!(ct.queue_pieces[1], true); + assert_eq!(ct.queue_pieces[2], true); // Select first and last file assert_eq!( ct.update_only_files(all_files.clone(), &HashSet::from_iter([0, 3])) .unwrap(), - HaveNeeded { + HaveNeededSelected { have_bytes: 0, + selected_bytes: all_files[0] + all_files[3] + 1, needed_bytes: all_files[0] + all_files[3] + 1, } ); - assert_eq!(ct.needed_pieces[0], true); - assert_eq!(ct.needed_pieces[1], true); - assert_eq!(ct.needed_pieces[2], true); + assert_eq!(ct.queue_pieces[0], true); + assert_eq!(ct.queue_pieces[1], true); + assert_eq!(ct.queue_pieces[2], true); // Select all files assert_eq!( ct.update_only_files(all_files.clone(), &HashSet::from_iter([0, 1, 2, 3])) .unwrap(), - HaveNeeded { + HaveNeededSelected { have_bytes: 0, - needed_bytes: total_len, + selected_bytes: total_len, + needed_bytes: total_len } ); - assert_eq!(ct.needed_pieces[0], true); - assert_eq!(ct.needed_pieces[1], true); - assert_eq!(ct.needed_pieces[2], true); + assert_eq!(ct.queue_pieces[0], true); + assert_eq!(ct.queue_pieces[1], true); + assert_eq!(ct.queue_pieces[2], true); } } diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 4d051d9..f395218 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -22,18 +22,26 @@ use tracing::{debug, trace, warn}; use crate::type_aliases::{PeerHandle, BF}; pub(crate) struct InitialCheckResults { - // The pieces that we need to download. - pub needed_pieces: BF, + // A piece as flags based on these dimensions: + // - if the asked for it or not (only_files) + // - if we have it downloaded and verified + // - if we need to queue it for downloading + // this one depends if we queued it already or not. + // The pieces we have downloaded. pub have_pieces: BF, + // The pieces that the user selected to download. + pub selected_pieces: BF, + // How many bytes we have. This can be MORE than "total_selected_bytes", // if we downloaded some pieces, and later the "only_files" was changed. pub have_bytes: u64, // How many bytes we need to download. pub needed_bytes: u64, + // How many bytes are in selected pieces. // If all selected, this must be equal to total torrent length. - pub total_selected_bytes: u64, + pub selected_bytes: u64, } pub fn update_hash_from_file( @@ -82,8 +90,8 @@ impl<'a> FileOps<'a> { ) -> anyhow::Result { let mut needed_pieces = BF::from_boxed_slice(vec![0u8; self.lengths.piece_bitfield_bytes()].into()); - let mut have_pieces = - BF::from_boxed_slice(vec![0u8; self.lengths.piece_bitfield_bytes()].into()); + let mut have_pieces = needed_pieces.clone(); + let mut selected_pieces = needed_pieces.clone(); let mut have_bytes = 0u64; let mut needed_bytes = 0u64; @@ -139,7 +147,7 @@ impl<'a> FileOps<'a> { let mut computed_hash = Sha1::new(); let mut piece_remaining = piece_info.len as usize; let mut some_files_broken = false; - let mut at_least_one_file_required = current_file.full_file_required; + let mut piece_selected = current_file.full_file_required; progress.fetch_add(piece_info.len as u64, Ordering::Relaxed); while piece_remaining > 0 { @@ -152,7 +160,7 @@ impl<'a> FileOps<'a> { .next() .ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?; - at_least_one_file_required |= current_file.full_file_required; + piece_selected |= current_file.full_file_required; to_read_in_file = std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; @@ -186,18 +194,18 @@ impl<'a> FileOps<'a> { } } - if at_least_one_file_required { + if piece_selected { total_selected_bytes += piece_info.len as u64; + selected_pieces.set(piece_info.piece_index.get() as usize, true); } - if at_least_one_file_required && some_files_broken { + if piece_selected && some_files_broken { trace!( "piece {} had errors, marking as needed", piece_info.piece_index ); needed_bytes += piece_info.len as u64; - needed_pieces.set(piece_info.piece_index.get() as usize, true); continue; } @@ -212,7 +220,7 @@ impl<'a> FileOps<'a> { ); have_bytes += piece_info.len as u64; have_pieces.set(piece_info.piece_index.get() as usize, true); - } else if at_least_one_file_required { + } else if piece_selected { trace!( "piece {} hash does not match, marking as needed", piece_info.piece_index @@ -228,11 +236,11 @@ impl<'a> FileOps<'a> { } Ok(InitialCheckResults { - needed_pieces, have_pieces, + selected_pieces, have_bytes, needed_bytes, - total_selected_bytes, + selected_bytes: total_selected_bytes, }) } diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 3920709..8498920 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -253,7 +253,7 @@ async fn test_e2e() { .with_state(|s| match s { crate::ManagedTorrentState::Initializing(_) => Ok(false), crate::ManagedTorrentState::Paused(p) => { - assert_eq!(p.needed_bytes, 0); + assert_eq!(p.hns.needed_bytes, 0); Ok(true) } _ => bail!("bugged state"), diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index fa02232..b4aa62b 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -11,7 +11,10 @@ use parking_lot::Mutex; use size_format::SizeFormatterBinary as SF; use tracing::{debug, info, warn}; -use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps}; +use crate::{ + chunk_tracker::{ChunkTracker, HaveNeededSelected}, + file_ops::FileOps, +}; use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; @@ -88,7 +91,7 @@ impl TorrentStateInitializing { "Initial check results: have {}, needed {}, total selected {}", SF::new(initial_check_results.have_bytes), SF::new(initial_check_results.needed_bytes), - SF::new(initial_check_results.total_selected_bytes) + SF::new(initial_check_results.selected_bytes) ); self.meta.spawner.spawn_block_in_place(|| { @@ -123,10 +126,9 @@ impl TorrentStateInitializing { }); let chunk_tracker = ChunkTracker::new( - initial_check_results.needed_pieces, initial_check_results.have_pieces, + initial_check_results.selected_pieces, self.meta.lengths, - initial_check_results.total_selected_bytes, ) .context("error creating chunk tracker")?; @@ -135,8 +137,11 @@ impl TorrentStateInitializing { files, filenames, chunk_tracker, - have_bytes: initial_check_results.have_bytes, - needed_bytes: initial_check_results.needed_bytes, + hns: HaveNeededSelected { + have_bytes: initial_check_results.have_bytes, + needed_bytes: initial_check_results.needed_bytes, + selected_bytes: initial_check_results.selected_bytes, + }, }; Ok(paused) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 85d4f1c..0da2308 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -83,7 +83,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, error_span, info, trace, warn}; use crate::{ - chunk_tracker::{ChunkMarkingResult, ChunkTracker}, + chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected}, file_ops::FileOps, peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, @@ -203,9 +203,9 @@ impl TorrentStateLive { let down_speed_estimator = SpeedEstimator::new(5); let up_speed_estimator = SpeedEstimator::new(5); - let have_bytes = paused.have_bytes; - let needed_bytes = paused.needed_bytes; - let total_selected_bytes = paused.chunk_tracker.get_total_selected_bytes(); + let have_bytes = paused.hns.have_bytes; + let needed_bytes = paused.hns.needed_bytes; + let total_selected_bytes = paused.hns.selected_bytes; let lengths = *paused.chunk_tracker.get_lengths(); let state = Arc::new(TorrentStateLive { @@ -676,8 +676,11 @@ impl TorrentStateLive { files, filenames, chunk_tracker, - have_bytes, - needed_bytes, + hns: HaveNeededSelected { + have_bytes, + needed_bytes, + selected_bytes: self.total_selected_bytes, + }, }) } @@ -916,7 +919,7 @@ impl PeerHandler { let n = { let mut n_opt = None; let bf = &live.bitfield; - for n in g.get_chunks()?.iter_needed_pieces() { + for n in g.get_chunks()?.iter_queued_pieces() { if bf.get(n).map(|v| *v) == Some(true) { n_opt = Some(n); break; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index e698374..eaeab1c 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -353,9 +353,9 @@ impl ManagedTorrent { } ManagedTorrentState::Paused(p) => { resp.state = S::Paused; - resp.total_bytes = p.chunk_tracker.get_total_selected_bytes(); - resp.progress_bytes = resp.total_bytes - p.needed_bytes; - resp.finished = resp.progress_bytes == resp.total_bytes; + resp.total_bytes = p.hns.total(); + resp.progress_bytes = p.hns.progress(); + resp.finished = p.hns.finished(); } ManagedTorrentState::Live(l) => { resp.state = S::Live; diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 5f06278..c5c079e 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -2,7 +2,7 @@ use std::{collections::HashSet, fs::File, path::PathBuf, sync::Arc}; use parking_lot::Mutex; -use crate::chunk_tracker::ChunkTracker; +use crate::chunk_tracker::{ChunkTracker, HaveNeededSelected}; use super::ManagedTorrentInfo; @@ -11,17 +11,15 @@ pub struct TorrentStatePaused { pub(crate) files: Vec>>, pub(crate) filenames: Vec, pub(crate) chunk_tracker: ChunkTracker, - pub(crate) have_bytes: u64, - pub(crate) needed_bytes: u64, + pub(crate) hns: HaveNeededSelected, } impl TorrentStatePaused { pub(crate) fn update_only_files(&mut self, only_files: &HashSet) -> anyhow::Result<()> { - let hn = self + let hns = self .chunk_tracker .update_only_files(self.info.info.iter_file_lengths()?, only_files)?; - self.have_bytes = hn.have_bytes; - self.needed_bytes = hn.needed_bytes; + self.hns = hns; Ok(()) } }