diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 4bbfdd3..6003966 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -363,7 +363,7 @@ impl Api { pub fn api_dump_haves(&self, idx: TorrentIdOrHash) -> Result { let mgr = self.mgr_handle(idx)?; - Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) + Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces().as_slice()))?) } pub fn api_stream(&self, idx: TorrentIdOrHash, file_id: usize) -> Result { diff --git a/crates/librqbit/src/bitv.rs b/crates/librqbit/src/bitv.rs index 7d2ff2b..a80c01d 100644 --- a/crates/librqbit/src/bitv.rs +++ b/crates/librqbit/src/bitv.rs @@ -1,17 +1,26 @@ use std::fs::File; use anyhow::Context; -use bitvec::{order::Lsb0, slice::BitSlice, vec::BitVec, view::AsBits, view::AsMutBits}; +use bitvec::{ + boxed::BitBox, + order::Lsb0, + slice::BitSlice, + vec::BitVec, + view::{AsBits, AsMutBits}, +}; #[async_trait::async_trait] -pub trait BitV: Send { +pub trait BitV: Send + Sync { fn as_slice(&self) -> &BitSlice; fn as_slice_mut(&mut self) -> &mut BitSlice; fn into_dyn(self) -> Box; + fn as_bytes(&self) -> &[u8]; async fn flush(&mut self) -> anyhow::Result<()>; } +pub type BoxBitV = Box; + pub struct MmapBitV { _file: File, mmap: memmap2::MmapMut, @@ -35,6 +44,33 @@ impl BitV for BitVec { self.as_mut_bitslice() } + fn as_bytes(&self) -> &[u8] { + self.as_raw_slice() + } + + async fn flush(&mut self) -> anyhow::Result<()> { + Ok(()) + } + + fn into_dyn(self) -> Box { + Box::new(self) + } +} + +#[async_trait::async_trait] +impl BitV for BitBox { + fn as_slice(&self) -> &BitSlice { + self.as_bitslice() + } + + fn as_slice_mut(&mut self) -> &mut BitSlice { + self.as_mut_bitslice() + } + + fn as_bytes(&self) -> &[u8] { + self.as_raw_slice() + } + async fn flush(&mut self) -> anyhow::Result<()> { Ok(()) } @@ -54,6 +90,10 @@ impl BitV for MmapBitV { self.mmap.as_mut_bits() } + fn as_bytes(&self) -> &[u8] { + &self.mmap + } + async fn flush(&mut self) -> anyhow::Result<()> { Ok(self.mmap.flush()?) } @@ -73,6 +113,10 @@ impl BitV for Box { (**self).as_slice_mut() } + fn as_bytes(&self) -> &[u8] { + (**self).as_bytes() + } + async fn flush(&mut self) -> anyhow::Result<()> { (**self).flush().await } diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index d3a87cb..de6d026 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -1,13 +1,15 @@ use std::collections::HashSet; use anyhow::Context; +use bitvec::{order::Lsb0, slice::BitSlice}; use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex}; use peer_binary_protocol::Piece; use tracing::{debug, trace}; use crate::{ + bitv::{BitV, BoxBitV}, file_info::FileInfo, - type_aliases::{FileInfos, FilePriorities, BF}, + type_aliases::{FileInfos, FilePriorities, BF, BS}, }; pub struct ChunkTracker { @@ -26,7 +28,7 @@ pub struct ChunkTracker { chunk_status: BF, // These are the pieces that we actually have, fully checked and downloaded. - have: BF, + have: BoxBitV, // The pieces that the user selected. This doesn't change unless update_only_files // was called. @@ -70,7 +72,7 @@ impl HaveNeededSelected { // Comput the have-status of chunks. // // Save as "have_pieces", but there's one bit per chunk (not per piece). -fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Result { +fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Result { if have_pieces.len() < lengths.total_pieces() as usize { anyhow::bail!( "bug: have_pieces.len() < lengths.total_pieces(); {} < {}", @@ -98,15 +100,19 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Res Ok(chunk_bf) } -fn compute_queued_pieces_unchecked(have_pieces: &BF, selected_pieces: &BF) -> BF { +fn compute_queued_pieces_unchecked(have_pieces: &BS, selected_pieces: &BS) -> BF { // it's needed ONLY if it's selected and we don't have it. use core::ops::BitAnd; use core::ops::Not; - have_pieces.clone().not().bitand(selected_pieces) + have_pieces + .to_bitvec() + .not() + .bitand(selected_pieces) + .into_boxed_bitslice() } -fn compute_queued_pieces(have_pieces: &BF, selected_pieces: &BF) -> anyhow::Result { +fn compute_queued_pieces(have_pieces: &BS, selected_pieces: &BS) -> anyhow::Result { if have_pieces.len() != selected_pieces.len() { anyhow::bail!( "have_pieces.len() != selected_pieces.len(), {} != {}", @@ -131,20 +137,20 @@ pub enum ChunkMarkingResult { impl ChunkTracker { pub fn new( // Have pieces are the ones we have already downloaded and verified. - have_pieces: BF, + have_pieces: BoxBitV, // Selected pieces are the ones the user has selected selected_pieces: BF, lengths: Lengths, file_infos: &FileInfos, ) -> anyhow::Result { - let needed_pieces = compute_queued_pieces(&have_pieces, &selected_pieces) + let needed_pieces = compute_queued_pieces(have_pieces.as_slice(), &selected_pieces) .context("error computing needed pieces")?; // TODO: ideally this needs to be a list based on needed files, e.g. // last needed piece for each file. But let's keep simple for now. let mut ct = Self { - chunk_status: compute_chunk_have_status(&lengths, &have_pieces) + chunk_status: compute_chunk_have_status(&lengths, have_pieces.as_slice()) .context("error computing chunk status")?, queue_pieces: needed_pieces, selected: selected_pieces, @@ -163,7 +169,7 @@ impl ChunkTracker { *slot = fi .piece_range .clone() - .filter(|p| self.have[*p as usize]) + .filter(|p| self.have.as_slice()[*p as usize]) .map(|id| { self.lengths .size_of_piece_in_file(id, fi.offset_in_torrent, fi.len) @@ -176,8 +182,8 @@ impl ChunkTracker { &self.lengths } - pub fn get_have_pieces(&self) -> &BF { - &self.have + pub fn get_have_pieces(&self) -> &dyn BitV { + &*self.have } pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) { @@ -193,7 +199,7 @@ impl ChunkTracker { for piece in self.lengths.iter_piece_infos() { let id = piece.piece_index.get() as usize; let len = piece.len as u64; - let is_have = self.have[id]; + let is_have = self.have.as_slice()[id]; let is_selected = self.selected[id]; let is_needed = is_selected && !is_have; hns.have_bytes += len * (is_have as u64); @@ -219,12 +225,13 @@ impl ChunkTracker { } pub(crate) fn is_piece_have(&self, id: ValidPieceIndex) -> bool { - self.have[id.get() as usize] + self.have.as_slice()[id.get() as usize] } pub fn mark_piece_broken_if_not_have(&mut self, index: ValidPieceIndex) { if self .have + .as_slice() .get(index.get() as usize) .map(|r| *r) .unwrap_or_default() @@ -240,8 +247,8 @@ impl ChunkTracker { pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) { let id = idx.get() as usize; - if !self.have[id] { - self.have.set(id, true); + if !self.have.as_slice()[id] { + self.have.as_slice_mut().set(id, true); let len = self.lengths.piece_length(idx) as u64; self.hns.have_bytes += len; if self.selected[id] { @@ -252,6 +259,7 @@ impl ChunkTracker { pub fn is_chunk_ready_to_upload(&self, chunk: &ChunkInfo) -> bool { self.have + .as_slice() .get(chunk.piece_index.get() as usize) .map(|b| *b) .unwrap_or(false) @@ -327,7 +335,8 @@ impl ChunkTracker { current_piece_remaining -= TryInto::::try_into(shift)?; if current_piece_remaining == 0 { - let current_piece_have = self.have[current_piece.piece_index.get() as usize]; + let current_piece_have = + self.have.as_slice()[current_piece.piece_index.get() as usize]; if current_piece_have { have_bytes += current_piece.len as u64; } @@ -380,6 +389,7 @@ impl ChunkTracker { pub fn is_file_finished(&self, file_info: &FileInfo) -> bool { self.have + .as_slice() .get(file_info.piece_range_usize()) .map(|r| r.all()) .unwrap_or(true) @@ -414,9 +424,10 @@ impl ChunkTracker { mod tests { use std::collections::HashSet; + use bitvec::{order::Lsb0, vec::BitVec}; use librqbit_core::{constants::CHUNK_SIZE, lengths::Lengths}; - use crate::{chunk_tracker::HaveNeededSelected, type_aliases::BF}; + use crate::{bitv::BitV, chunk_tracker::HaveNeededSelected, type_aliases::BF}; use super::{compute_chunk_have_status, ChunkTracker}; @@ -534,12 +545,12 @@ mod tests { ]; let bf_len = l.piece_bitfield_bytes(); - let initial_have = BF::from_boxed_slice(vec![0u8; bf_len].into_boxed_slice()); + let initial_have: BitVec = BitVec::from_vec(vec![0u8; bf_len]); let initial_selected = BF::from_boxed_slice(vec![u8::MAX; bf_len].into_boxed_slice()); // Initially, we need all files and all pieces. let mut ct = ChunkTracker::new( - initial_have.clone(), + initial_have.clone().into_dyn(), initial_selected.clone(), l, &Default::default(), @@ -556,7 +567,7 @@ mod tests { needed_bytes: total_len, } ); - assert_eq!(ct.have, initial_have); + assert_eq!(ct.have.as_slice(), initial_have.as_bitslice()); assert_eq!(ct.queue_pieces, initial_selected); // Select only the first file. diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 953bfd8..9e97c09 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -8,7 +8,9 @@ use anyhow::Context; use size_format::SizeFormatterBinary as SF; use tracing::{debug, info, warn}; -use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::FileStorage}; +use crate::{ + bitv::BitV, chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::FileStorage, +}; use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; @@ -86,7 +88,7 @@ impl TorrentStateInitializing { })?; let chunk_tracker = ChunkTracker::new( - initial_check_results.have_pieces, + initial_check_results.have_pieces.into_dyn(), initial_check_results.selected_pieces, self.meta.lengths, &self.meta.file_infos, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 7d913fe..c11d16a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -835,7 +835,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); - let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice())); + let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes())); let len = msg.serialize(buf, &|| None)?; trace!("sending: {:?}, length={}", &msg, len); Ok(len) diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 19009eb..cd711a2 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -187,7 +187,7 @@ impl AsyncRead for FileStream { // if the piece is not there, register to wake when it is // check if we have the piece for real let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { - let have = ct.get_have_pieces()[current.id.get() as usize]; + let have = ct.get_have_pieces().as_slice()[current.id.get() as usize]; if !have { self.streams .register_waker(self.stream_id, cx.waker().clone()); diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index be5741e..2f104e3 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -4,7 +4,8 @@ use futures::stream::BoxStream; use crate::{file_info::FileInfo, storage::TorrentStorage}; -pub type BF = bitvec::boxed::BitBox; +pub type BS = bitvec::slice::BitSlice; +pub type BF = bitvec::boxed::BitBox; pub type PeerHandle = SocketAddr; pub type PeerStream = BoxStream<'static, SocketAddr>;