From 2fee0ca8c27806ceedfc7440907f6441afb23cac Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 20 Aug 2024 21:09:58 +0100 Subject: [PATCH] flushing bitfield --- crates/librqbit/src/bitv.rs | 16 ++++------ crates/librqbit/src/chunk_tracker.rs | 4 +++ .../src/torrent_state/initializing.rs | 14 +++++++- crates/librqbit/src/torrent_state/live/mod.rs | 32 ++++++++++++++++++- 4 files changed, 54 insertions(+), 12 deletions(-) diff --git a/crates/librqbit/src/bitv.rs b/crates/librqbit/src/bitv.rs index a80c01d..dedc6e7 100644 --- a/crates/librqbit/src/bitv.rs +++ b/crates/librqbit/src/bitv.rs @@ -9,14 +9,12 @@ use bitvec::{ view::{AsBits, AsMutBits}, }; -#[async_trait::async_trait] pub trait BitV: Send + Sync { fn as_slice(&self) -> &BitSlice; fn as_slice_mut(&mut self) -> &mut BitSlice; fn into_dyn(self) -> Box; fn as_bytes(&self) -> &[u8]; - - async fn flush(&mut self) -> anyhow::Result<()>; + fn flush(&mut self) -> anyhow::Result<()>; } pub type BoxBitV = Box; @@ -48,7 +46,7 @@ impl BitV for BitVec { self.as_raw_slice() } - async fn flush(&mut self) -> anyhow::Result<()> { + fn flush(&mut self) -> anyhow::Result<()> { Ok(()) } @@ -71,7 +69,7 @@ impl BitV for BitBox { self.as_raw_slice() } - async fn flush(&mut self) -> anyhow::Result<()> { + fn flush(&mut self) -> anyhow::Result<()> { Ok(()) } @@ -80,7 +78,6 @@ impl BitV for BitBox { } } -#[async_trait::async_trait] impl BitV for MmapBitV { fn as_slice(&self) -> &BitSlice { self.mmap.as_bits() @@ -94,7 +91,7 @@ impl BitV for MmapBitV { &self.mmap } - async fn flush(&mut self) -> anyhow::Result<()> { + fn flush(&mut self) -> anyhow::Result<()> { Ok(self.mmap.flush()?) } @@ -103,7 +100,6 @@ impl BitV for MmapBitV { } } -#[async_trait::async_trait] impl BitV for Box { fn as_slice(&self) -> &BitSlice { (**self).as_slice() @@ -117,8 +113,8 @@ impl BitV for Box { (**self).as_bytes() } - async fn flush(&mut self) -> anyhow::Result<()> { - (**self).flush().await + fn flush(&mut self) -> anyhow::Result<()> { + (**self).flush() } fn into_dyn(self) -> Box { diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index ffab7c2..e113943 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -185,6 +185,10 @@ impl ChunkTracker { &*self.have } + pub fn get_have_pieces_mut(&mut self) -> &mut dyn BitV { + &mut *self.have + } + pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) { self.queue_pieces.set(index.get() as usize, false) } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 9ea781a..44c0941 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -70,10 +70,22 @@ impl TorrentStateInitializing { bitv_factory: Arc, ) -> anyhow::Result { let id: TorrentIdOrHash = self.meta.info_hash.into(); - let have_pieces = bitv_factory + let mut have_pieces = bitv_factory .load(id) .await .context("error loading have_pieces")?; + if let Some(hp) = have_pieces.as_ref() { + let actual = hp.as_bytes().len(); + let expected = self.meta.lengths.piece_bitfield_bytes(); + if actual != expected { + warn!( + actual, + expected, + "the bitfield loaded isn't of correct length, ignoring it, will do full check" + ); + have_pieces = None; + } + } let have_pieces = match have_pieces { Some(h) => h, None => { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index c11d16a..ffabf18 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -131,6 +131,8 @@ pub(crate) struct TorrentStateLocked { // If this is None, then it was already used fatal_errors_tx: Option>, + + unflushed_bitv_bytes: u64, } impl TorrentStateLocked { @@ -145,6 +147,23 @@ impl TorrentStateLocked { .as_mut() .context("chunk tracker empty, torrent was paused") } + + fn try_flush_bitv(&mut self) { + if self.unflushed_bitv_bytes == 0 { + return; + } + + if let Some(Err(e)) = self + .chunks + .as_mut() + .map(|ct| ct.get_have_pieces_mut().flush()) + { + warn!(error=?e, "error flushing bitfield"); + } else { + trace!("flushed bitfield"); + self.unflushed_bitv_bytes = 0; + } + } } #[derive(Default)] @@ -155,6 +174,8 @@ pub struct TorrentStateOptions { pub peer_read_write_timeout: Option, } +const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; + pub struct TorrentStateLive { peers: PeerStates, meta: Arc, @@ -223,6 +244,7 @@ impl TorrentStateLive { inflight_pieces: Default::default(), file_priorities, fatal_errors_tx: Some(fatal_errors_tx), + unflushed_bitv_bytes: 0, }), files: paused.files, stats: AtomicStats { @@ -684,6 +706,7 @@ impl TorrentStateLive { fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> { let mut g = self.lock_write("on_piece_completed"); + let g = &mut **g; let chunks = g.get_chunks_mut()?; // if we have all the pieces of the file, reopen it read only @@ -701,13 +724,20 @@ impl TorrentStateLive { self.streams .wake_streams_on_piece_completed(id, &self.meta.lengths); + g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64; + if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES { + g.try_flush_bitv() + } + + let chunks = g.get_chunks()?; if chunks.is_finished() { if chunks.get_selected_pieces()[id.get_usize()] { + g.try_flush_bitv(); info!("torrent finished downloading"); } self.finished_notify.notify_waiters(); - if !self.has_active_streams_unfinished_files(&g) { + if !self.has_active_streams_unfinished_files(g) { // There is not poing being connected to peers that have all the torrent, when // we don't need anything from them, and they don't need anything from us. self.disconnect_all_peers_that_have_full_torrent();