Use in chunk_tracker

This commit is contained in:
Igor Katson 2024-08-20 17:15:37 +01:00
parent e771162fa7
commit 8135b31a5d
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
7 changed files with 87 additions and 29 deletions

View file

@ -363,7 +363,7 @@ impl Api {
pub fn api_dump_haves(&self, idx: TorrentIdOrHash) -> Result<String> {
let mgr = self.mgr_handle(idx)?;
Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?)
Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces().as_slice()))?)
}
pub fn api_stream(&self, idx: TorrentIdOrHash, file_id: usize) -> Result<FileStream> {

View file

@ -1,17 +1,26 @@
use std::fs::File;
use anyhow::Context;
use bitvec::{order::Lsb0, slice::BitSlice, vec::BitVec, view::AsBits, view::AsMutBits};
use bitvec::{
boxed::BitBox,
order::Lsb0,
slice::BitSlice,
vec::BitVec,
view::{AsBits, AsMutBits},
};
#[async_trait::async_trait]
pub trait BitV: Send {
pub trait BitV: Send + Sync {
fn as_slice(&self) -> &BitSlice<u8, Lsb0>;
fn as_slice_mut(&mut self) -> &mut BitSlice<u8, Lsb0>;
fn into_dyn(self) -> Box<dyn BitV>;
fn as_bytes(&self) -> &[u8];
async fn flush(&mut self) -> anyhow::Result<()>;
}
pub type BoxBitV = Box<dyn BitV>;
pub struct MmapBitV {
_file: File,
mmap: memmap2::MmapMut,
@ -35,6 +44,33 @@ impl BitV for BitVec<u8, Lsb0> {
self.as_mut_bitslice()
}
fn as_bytes(&self) -> &[u8] {
self.as_raw_slice()
}
async fn flush(&mut self) -> anyhow::Result<()> {
Ok(())
}
fn into_dyn(self) -> Box<dyn BitV> {
Box::new(self)
}
}
#[async_trait::async_trait]
impl BitV for BitBox<u8, Lsb0> {
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
self.as_bitslice()
}
fn as_slice_mut(&mut self) -> &mut BitSlice<u8, Lsb0> {
self.as_mut_bitslice()
}
fn as_bytes(&self) -> &[u8] {
self.as_raw_slice()
}
async fn flush(&mut self) -> anyhow::Result<()> {
Ok(())
}
@ -54,6 +90,10 @@ impl BitV for MmapBitV {
self.mmap.as_mut_bits()
}
fn as_bytes(&self) -> &[u8] {
&self.mmap
}
async fn flush(&mut self) -> anyhow::Result<()> {
Ok(self.mmap.flush()?)
}
@ -73,6 +113,10 @@ impl BitV for Box<dyn BitV> {
(**self).as_slice_mut()
}
fn as_bytes(&self) -> &[u8] {
(**self).as_bytes()
}
async fn flush(&mut self) -> anyhow::Result<()> {
(**self).flush().await
}

View file

@ -1,13 +1,15 @@
use std::collections::HashSet;
use anyhow::Context;
use bitvec::{order::Lsb0, slice::BitSlice};
use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex};
use peer_binary_protocol::Piece;
use tracing::{debug, trace};
use crate::{
bitv::{BitV, BoxBitV},
file_info::FileInfo,
type_aliases::{FileInfos, FilePriorities, BF},
type_aliases::{FileInfos, FilePriorities, BF, BS},
};
pub struct ChunkTracker {
@ -26,7 +28,7 @@ pub struct ChunkTracker {
chunk_status: BF,
// These are the pieces that we actually have, fully checked and downloaded.
have: BF,
have: BoxBitV,
// The pieces that the user selected. This doesn't change unless update_only_files
// was called.
@ -70,7 +72,7 @@ impl HaveNeededSelected {
// Comput the have-status of chunks.
//
// Save as "have_pieces", but there's one bit per chunk (not per piece).
fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Result<BF> {
fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Result<BF> {
if have_pieces.len() < lengths.total_pieces() as usize {
anyhow::bail!(
"bug: have_pieces.len() < lengths.total_pieces(); {} < {}",
@ -98,15 +100,19 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Res
Ok(chunk_bf)
}
fn compute_queued_pieces_unchecked(have_pieces: &BF, selected_pieces: &BF) -> BF {
fn compute_queued_pieces_unchecked(have_pieces: &BS, selected_pieces: &BS) -> BF {
// it's needed ONLY if it's selected and we don't have it.
use core::ops::BitAnd;
use core::ops::Not;
have_pieces.clone().not().bitand(selected_pieces)
have_pieces
.to_bitvec()
.not()
.bitand(selected_pieces)
.into_boxed_bitslice()
}
fn compute_queued_pieces(have_pieces: &BF, selected_pieces: &BF) -> anyhow::Result<BF> {
fn compute_queued_pieces(have_pieces: &BS, selected_pieces: &BS) -> anyhow::Result<BF> {
if have_pieces.len() != selected_pieces.len() {
anyhow::bail!(
"have_pieces.len() != selected_pieces.len(), {} != {}",
@ -131,20 +137,20 @@ pub enum ChunkMarkingResult {
impl ChunkTracker {
pub fn new(
// Have pieces are the ones we have already downloaded and verified.
have_pieces: BF,
have_pieces: BoxBitV,
// Selected pieces are the ones the user has selected
selected_pieces: BF,
lengths: Lengths,
file_infos: &FileInfos,
) -> anyhow::Result<Self> {
let needed_pieces = compute_queued_pieces(&have_pieces, &selected_pieces)
let needed_pieces = compute_queued_pieces(have_pieces.as_slice(), &selected_pieces)
.context("error computing needed pieces")?;
// TODO: ideally this needs to be a list based on needed files, e.g.
// last needed piece for each file. But let's keep simple for now.
let mut ct = Self {
chunk_status: compute_chunk_have_status(&lengths, &have_pieces)
chunk_status: compute_chunk_have_status(&lengths, have_pieces.as_slice())
.context("error computing chunk status")?,
queue_pieces: needed_pieces,
selected: selected_pieces,
@ -163,7 +169,7 @@ impl ChunkTracker {
*slot = fi
.piece_range
.clone()
.filter(|p| self.have[*p as usize])
.filter(|p| self.have.as_slice()[*p as usize])
.map(|id| {
self.lengths
.size_of_piece_in_file(id, fi.offset_in_torrent, fi.len)
@ -176,8 +182,8 @@ impl ChunkTracker {
&self.lengths
}
pub fn get_have_pieces(&self) -> &BF {
&self.have
pub fn get_have_pieces(&self) -> &dyn BitV {
&*self.have
}
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
@ -193,7 +199,7 @@ impl ChunkTracker {
for piece in self.lengths.iter_piece_infos() {
let id = piece.piece_index.get() as usize;
let len = piece.len as u64;
let is_have = self.have[id];
let is_have = self.have.as_slice()[id];
let is_selected = self.selected[id];
let is_needed = is_selected && !is_have;
hns.have_bytes += len * (is_have as u64);
@ -219,12 +225,13 @@ impl ChunkTracker {
}
pub(crate) fn is_piece_have(&self, id: ValidPieceIndex) -> bool {
self.have[id.get() as usize]
self.have.as_slice()[id.get() as usize]
}
pub fn mark_piece_broken_if_not_have(&mut self, index: ValidPieceIndex) {
if self
.have
.as_slice()
.get(index.get() as usize)
.map(|r| *r)
.unwrap_or_default()
@ -240,8 +247,8 @@ impl ChunkTracker {
pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) {
let id = idx.get() as usize;
if !self.have[id] {
self.have.set(id, true);
if !self.have.as_slice()[id] {
self.have.as_slice_mut().set(id, true);
let len = self.lengths.piece_length(idx) as u64;
self.hns.have_bytes += len;
if self.selected[id] {
@ -252,6 +259,7 @@ impl ChunkTracker {
pub fn is_chunk_ready_to_upload(&self, chunk: &ChunkInfo) -> bool {
self.have
.as_slice()
.get(chunk.piece_index.get() as usize)
.map(|b| *b)
.unwrap_or(false)
@ -327,7 +335,8 @@ impl ChunkTracker {
current_piece_remaining -= TryInto::<u32>::try_into(shift)?;
if current_piece_remaining == 0 {
let current_piece_have = self.have[current_piece.piece_index.get() as usize];
let current_piece_have =
self.have.as_slice()[current_piece.piece_index.get() as usize];
if current_piece_have {
have_bytes += current_piece.len as u64;
}
@ -380,6 +389,7 @@ impl ChunkTracker {
pub fn is_file_finished(&self, file_info: &FileInfo) -> bool {
self.have
.as_slice()
.get(file_info.piece_range_usize())
.map(|r| r.all())
.unwrap_or(true)
@ -414,9 +424,10 @@ impl ChunkTracker {
mod tests {
use std::collections::HashSet;
use bitvec::{order::Lsb0, vec::BitVec};
use librqbit_core::{constants::CHUNK_SIZE, lengths::Lengths};
use crate::{chunk_tracker::HaveNeededSelected, type_aliases::BF};
use crate::{bitv::BitV, chunk_tracker::HaveNeededSelected, type_aliases::BF};
use super::{compute_chunk_have_status, ChunkTracker};
@ -534,12 +545,12 @@ mod tests {
];
let bf_len = l.piece_bitfield_bytes();
let initial_have = BF::from_boxed_slice(vec![0u8; bf_len].into_boxed_slice());
let initial_have: BitVec<u8, Lsb0> = BitVec::from_vec(vec![0u8; bf_len]);
let initial_selected = BF::from_boxed_slice(vec![u8::MAX; bf_len].into_boxed_slice());
// Initially, we need all files and all pieces.
let mut ct = ChunkTracker::new(
initial_have.clone(),
initial_have.clone().into_dyn(),
initial_selected.clone(),
l,
&Default::default(),
@ -556,7 +567,7 @@ mod tests {
needed_bytes: total_len,
}
);
assert_eq!(ct.have, initial_have);
assert_eq!(ct.have.as_slice(), initial_have.as_bitslice());
assert_eq!(ct.queue_pieces, initial_selected);
// Select only the first file.

View file

@ -8,7 +8,9 @@ use anyhow::Context;
use size_format::SizeFormatterBinary as SF;
use tracing::{debug, info, warn};
use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::FileStorage};
use crate::{
bitv::BitV, chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::FileStorage,
};
use super::{paused::TorrentStatePaused, ManagedTorrentInfo};
@ -86,7 +88,7 @@ impl TorrentStateInitializing {
})?;
let chunk_tracker = ChunkTracker::new(
initial_check_results.have_pieces,
initial_check_results.have_pieces.into_dyn(),
initial_check_results.selected_pieces,
self.meta.lengths,
&self.meta.file_infos,

View file

@ -835,7 +835,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize> {
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice()));
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes()));
let len = msg.serialize(buf, &|| None)?;
trace!("sending: {:?}, length={}", &msg, len);
Ok(len)

View file

@ -187,7 +187,7 @@ impl AsyncRead for FileStream {
// if the piece is not there, register to wake when it is
// check if we have the piece for real
let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| {
let have = ct.get_have_pieces()[current.id.get() as usize];
let have = ct.get_have_pieces().as_slice()[current.id.get() as usize];
if !have {
self.streams
.register_waker(self.stream_id, cx.waker().clone());

View file

@ -4,7 +4,8 @@ use futures::stream::BoxStream;
use crate::{file_info::FileInfo, storage::TorrentStorage};
pub type BF = bitvec::boxed::BitBox<u8, bitvec::order::Msb0>;
pub type BS = bitvec::slice::BitSlice<u8, bitvec::order::Lsb0>;
pub type BF = bitvec::boxed::BitBox<u8, bitvec::order::Lsb0>;
pub type PeerHandle = SocketAddr;
pub type PeerStream = BoxStream<'static, SocketAddr>;