This commit is contained in:
Igor Katson 2024-03-30 18:51:05 +00:00
parent efcffdd072
commit 51d1a0b0c7
7 changed files with 183 additions and 115 deletions

View file

@ -14,7 +14,9 @@ pub struct ChunkTracker {
//
// Initially this is the opposite of "have", until we start making requests.
// An in-flight request is not in "needed", and not in "have".
needed_pieces: BF,
//
// needed initial value = selected & !have
queue_pieces: BF,
// This has a bit set per each chunk (block) that we have written to the output file.
// It doesn't mean it's valid yet. Used to track how much is left in each piece.
@ -23,18 +25,35 @@ pub struct ChunkTracker {
// These are the pieces that we actually have, fully checked and downloaded.
have: BF,
// The pieces that the user selected. This doesn't change unless update_only_files
// was called.
selected: BF,
lengths: Lengths,
// What pieces to download first.
priority_piece_ids: Vec<usize>,
total_selected_bytes: u64,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct HaveNeeded {
pub struct HaveNeededSelected {
pub have_bytes: u64,
pub needed_bytes: u64,
pub selected_bytes: u64,
}
impl HaveNeededSelected {
pub const fn progress(&self) -> u64 {
self.selected_bytes - self.needed_bytes
}
pub const fn total(&self) -> u64 {
self.selected_bytes
}
pub const fn finished(&self) -> bool {
self.needed_bytes == 0
}
}
// Comput the have-status of chunks.
@ -68,6 +87,29 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Res
Ok(chunk_bf)
}
fn compute_queued_pieces_unchecked(have_pieces: &BF, selected_pieces: &BF) -> BF {
// it's needed ONLY if it's selected and we don't have it.
use core::ops::BitAnd;
use core::ops::Not;
have_pieces.clone().not().bitand(selected_pieces)
}
fn compute_queued_pieces(have_pieces: &BF, selected_pieces: &BF) -> anyhow::Result<BF> {
if have_pieces.len() != selected_pieces.len() {
anyhow::bail!(
"have_pieces.len() != selected_pieces.len(), {} != {}",
have_pieces.len(),
selected_pieces.len()
);
}
Ok(compute_queued_pieces_unchecked(
have_pieces,
selected_pieces,
))
}
pub enum ChunkMarkingResult {
PreviouslyCompleted,
NotCompleted,
@ -76,15 +118,15 @@ pub enum ChunkMarkingResult {
impl ChunkTracker {
pub fn new(
// Needed pieces are the ones we need to download. NOTE: if all files are selected,
// this is the inverse of have_pieces. But if partial files are selected, we may need more/less
// than we have.
needed_pieces: BF,
// Have pieces are the ones we have already downloaded and verified.
have_pieces: BF,
// Selected pieces are the ones the user has selected
selected_pieces: BF,
lengths: Lengths,
total_selected_bytes: u64,
) -> anyhow::Result<Self> {
let needed_pieces = compute_queued_pieces(&have_pieces, &selected_pieces)
.context("error computing needed pieces")?;
// TODO: ideally this needs to be a list based on needed files, e.g.
// last needed piece for each file. But let's keep simple for now.
@ -103,18 +145,14 @@ impl ChunkTracker {
Ok(Self {
chunk_status: compute_chunk_have_status(&lengths, &have_pieces)
.context("error computing chunk status")?,
needed_pieces,
queue_pieces: needed_pieces,
selected: selected_pieces,
lengths,
have: have_pieces,
priority_piece_ids,
total_selected_bytes,
})
}
pub fn get_total_selected_bytes(&self) -> u64 {
self.total_selected_bytes
}
pub fn get_lengths(&self) -> &Lengths {
&self.lengths
}
@ -123,7 +161,7 @@ impl ChunkTracker {
&self.have
}
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
self.needed_pieces.set(index.get() as usize, false)
self.queue_pieces.set(index.get() as usize, false)
}
pub fn calc_have_bytes(&self) -> u64 {
@ -137,22 +175,28 @@ impl ChunkTracker {
}
pub fn calc_needed_bytes(&self) -> u64 {
self.needed_pieces
.iter_ones()
.filter_map(|piece_id| {
let piece_id = self.lengths.validate_piece_index(piece_id as u32)?;
Some(self.lengths.piece_length(piece_id) as u64)
self.have
.iter()
.zip(self.selected.iter())
.enumerate()
.filter_map(|(piece_id, (have, selected))| {
if *selected && !*have {
let piece_id = self.lengths.validate_piece_index(piece_id as u32)?;
Some(self.lengths.piece_length(piece_id) as u64)
} else {
None
}
})
.sum()
}
pub fn iter_needed_pieces(&self) -> impl Iterator<Item = usize> + '_ {
pub fn iter_queued_pieces(&self) -> impl Iterator<Item = usize> + '_ {
self.priority_piece_ids
.iter()
.copied()
.filter(move |piece_id| self.needed_pieces[*piece_id])
.filter(move |piece_id| self.queue_pieces[*piece_id])
.chain(
self.needed_pieces
self.queue_pieces
.iter_ones()
.filter(move |id| !self.priority_piece_ids.contains(id)),
)
@ -172,7 +216,7 @@ impl ChunkTracker {
// This will trigger the requesters to re-check each chunk in this piece.
let chunk_range = self.lengths.chunk_range(index);
if !self.chunk_status.get(chunk_range)?.all() {
self.needed_pieces.set(index.get() as usize, true);
self.queue_pieces.set(index.get() as usize, true);
}
Some(true)
}
@ -187,7 +231,7 @@ impl ChunkTracker {
return;
}
debug!("remarking piece={} as broken", index);
self.needed_pieces.set(index.get() as usize, true);
self.queue_pieces.set(index.get() as usize, true);
if let Some(s) = self.chunk_status.get_mut(self.lengths.chunk_range(index)) {
s.fill(false);
}
@ -244,14 +288,15 @@ impl ChunkTracker {
file_lengths_iterator: impl IntoIterator<Item = u64>,
// TODO: maybe make this a BF
new_only_files: &HashSet<usize>,
) -> anyhow::Result<HaveNeeded> {
) -> anyhow::Result<HaveNeededSelected> {
let mut piece_it = self.lengths.iter_piece_infos();
let mut current_piece = piece_it
.next()
.context("bug: iter_piece_infos() returned empty iterator")?;
let mut current_piece_needed = false;
let mut current_piece_selected = false;
let mut current_piece_remaining = current_piece.len;
let mut have_bytes = 0u64;
let mut selected_bytes = 0u64;
let mut needed_bytes = 0u64;
for (idx, len) in file_lengths_iterator.into_iter().enumerate() {
@ -260,31 +305,38 @@ impl ChunkTracker {
let mut remaining_file_len = len;
while remaining_file_len > 0 {
current_piece_needed |= len > 0 && file_required;
current_piece_selected |= len > 0 && file_required;
let shift = std::cmp::min(current_piece_remaining as u64, remaining_file_len);
assert!(shift > 0);
remaining_file_len -= shift;
current_piece_remaining -= shift as u32;
dbg!(
idx,
shift,
remaining_file_len,
current_piece_remaining,
current_piece_needed,
file_required,
current_piece
);
// dbg!(
// idx,
// shift,
// remaining_file_len,
// current_piece_remaining,
// current_piece_needed,
// file_required,
// current_piece
// );
if current_piece_remaining == 0 {
let current_piece_have = self.have[current_piece.piece_index.get() as usize];
if current_piece_have {
have_bytes += current_piece.len as u64;
}
if current_piece_needed {
if current_piece_selected {
selected_bytes += current_piece.len as u64;
}
if current_piece_selected && !current_piece_have {
needed_bytes += current_piece.len as u64;
}
match (current_piece_needed, current_piece_have) {
self.selected.set(
current_piece.piece_index.get() as usize,
current_piece_selected,
);
match (current_piece_selected, current_piece_have) {
(true, true) => {}
(true, false) => {
dbg!(self.mark_piece_broken_if_not_have(current_piece.piece_index))
@ -293,7 +345,7 @@ impl ChunkTracker {
(false, false) => {
// don't need the piece, and don't have it - cancel downloading it
dbg!(self
.needed_pieces
.queue_pieces
.set(current_piece.piece_index.get() as usize, false));
}
}
@ -302,16 +354,17 @@ impl ChunkTracker {
current_piece = piece_it.next().context(
"bug: iter_piece_infos() pieces ended earlier than expected",
)?;
current_piece_needed = false;
current_piece_selected = false;
current_piece_remaining = current_piece.len;
}
}
}
}
Ok(HaveNeeded {
Ok(HaveNeededSelected {
have_bytes,
needed_bytes,
selected_bytes,
})
}
}
@ -322,7 +375,7 @@ mod tests {
use librqbit_core::{constants::CHUNK_SIZE, lengths::Lengths};
use crate::{chunk_tracker::HaveNeeded, type_aliases::BF};
use crate::{chunk_tracker::HaveNeededSelected, type_aliases::BF};
use super::{compute_chunk_have_status, ChunkTracker};
@ -444,103 +497,104 @@ mod tests {
let initial_needed = BF::from_boxed_slice(vec![u8::MAX; bf_len].into_boxed_slice());
// Initially, we need all files and all pieces.
let mut ct = ChunkTracker::new(
initial_needed.clone(),
initial_have.clone(),
l,
l.total_length(),
)
.unwrap();
let mut ct = ChunkTracker::new(initial_needed.clone(), initial_have.clone(), l).unwrap();
// Select all file, no changes.
assert_eq!(
ct.update_only_files(all_files.into_iter(), &HashSet::from_iter([0, 1, 2, 3]))
.unwrap(),
HaveNeeded {
HaveNeededSelected {
have_bytes: 0,
needed_bytes: total_len
selected_bytes: total_len,
needed_bytes: total_len,
}
);
assert_eq!(ct.have, initial_have);
assert_eq!(ct.needed_pieces, initial_needed);
assert_eq!(ct.queue_pieces, initial_needed);
// Select only the first file.
println!("Select only the first file.");
assert_eq!(
ct.update_only_files(all_files, &HashSet::from_iter([0]))
.unwrap(),
HaveNeeded {
HaveNeededSelected {
have_bytes: 0,
selected_bytes: all_files[0],
needed_bytes: all_files[0],
}
);
assert_eq!(ct.needed_pieces[0], true);
assert_eq!(ct.needed_pieces[1], false);
assert_eq!(ct.needed_pieces[2], false);
assert_eq!(ct.queue_pieces[0], true);
assert_eq!(ct.queue_pieces[1], false);
assert_eq!(ct.queue_pieces[2], false);
// Select only the second file.
assert_eq!(
ct.update_only_files(all_files, &HashSet::from_iter([1]))
.unwrap(),
HaveNeeded {
HaveNeededSelected {
have_bytes: 0,
selected_bytes: piece_len as u64,
needed_bytes: piece_len as u64,
}
);
assert_eq!(ct.needed_pieces[0], false);
assert_eq!(ct.needed_pieces[1], true);
assert_eq!(ct.needed_pieces[2], false);
assert_eq!(ct.queue_pieces[0], false);
assert_eq!(ct.queue_pieces[1], true);
assert_eq!(ct.queue_pieces[2], false);
// Select only the third file (zero sized one!).
assert_eq!(
ct.update_only_files(all_files, &HashSet::from_iter([2]))
.unwrap(),
HaveNeeded {
HaveNeededSelected {
have_bytes: 0,
selected_bytes: 0,
needed_bytes: 0,
}
);
assert_eq!(ct.needed_pieces[0], false);
assert_eq!(ct.needed_pieces[1], false);
assert_eq!(ct.needed_pieces[2], false);
assert_eq!(ct.queue_pieces[0], false);
assert_eq!(ct.queue_pieces[1], false);
assert_eq!(ct.queue_pieces[2], false);
// Select only the fourth file.
assert_eq!(
ct.update_only_files(all_files, &HashSet::from_iter([3]))
.unwrap(),
HaveNeeded {
HaveNeededSelected {
have_bytes: 0,
selected_bytes: (piece_len + 1) as u64,
needed_bytes: (piece_len + 1) as u64,
}
);
assert_eq!(ct.needed_pieces[0], false);
assert_eq!(ct.needed_pieces[1], true);
assert_eq!(ct.needed_pieces[2], true);
assert_eq!(ct.queue_pieces[0], false);
assert_eq!(ct.queue_pieces[1], true);
assert_eq!(ct.queue_pieces[2], true);
// Select first and last file
assert_eq!(
ct.update_only_files(all_files.clone(), &HashSet::from_iter([0, 3]))
.unwrap(),
HaveNeeded {
HaveNeededSelected {
have_bytes: 0,
selected_bytes: all_files[0] + all_files[3] + 1,
needed_bytes: all_files[0] + all_files[3] + 1,
}
);
assert_eq!(ct.needed_pieces[0], true);
assert_eq!(ct.needed_pieces[1], true);
assert_eq!(ct.needed_pieces[2], true);
assert_eq!(ct.queue_pieces[0], true);
assert_eq!(ct.queue_pieces[1], true);
assert_eq!(ct.queue_pieces[2], true);
// Select all files
assert_eq!(
ct.update_only_files(all_files.clone(), &HashSet::from_iter([0, 1, 2, 3]))
.unwrap(),
HaveNeeded {
HaveNeededSelected {
have_bytes: 0,
needed_bytes: total_len,
selected_bytes: total_len,
needed_bytes: total_len
}
);
assert_eq!(ct.needed_pieces[0], true);
assert_eq!(ct.needed_pieces[1], true);
assert_eq!(ct.needed_pieces[2], true);
assert_eq!(ct.queue_pieces[0], true);
assert_eq!(ct.queue_pieces[1], true);
assert_eq!(ct.queue_pieces[2], true);
}
}