diff --git a/Makefile b/Makefile index 79501fb..4a1f9dc 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ webui-dev: webui-deps export RQBIT_UPNP_SERVER_ENABLE ?= true export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030 +export RQBIT_FASTRESUME = true CARGO_RUN_FLAGS ?= RQBIT_OUTPUT_FOLDER ?= /tmp/scratch RQBIT_POSTGRES_CONNECTION_STRING ?= postgres:///rqbit diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 27052f9..377afe3 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -173,12 +173,7 @@ impl<'a> FileOps<'a> { Ok(have_pieces) } - pub fn check_piece( - &self, - who_sent: PeerHandle, - piece_index: ValidPieceIndex, - last_received_chunk: &ChunkInfo, - ) -> anyhow::Result { + pub fn check_piece(&self, piece_index: ValidPieceIndex) -> anyhow::Result { let mut h = Sha1::new(); let piece_length = self.lengths.piece_length(piece_index); let mut absolute_offset = self.lengths.piece_offset(piece_index); @@ -196,12 +191,10 @@ impl<'a> FileOps<'a> { let to_read_in_file: usize = std::cmp::min(file_remaining_len, piece_remaining_bytes as u64).try_into()?; trace!( - "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", + "piece={}, file_idx={}, seeking to {}", piece_index, - who_sent, file_idx, absolute_offset, - &last_received_chunk ); update_hash_from_file( file_idx, diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 89341db..868cea6 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -1,17 +1,23 @@ use std::{ - sync::{atomic::AtomicU64, Arc}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::Instant, }; use anyhow::Context; +use itertools::Itertools; use librqbit_core::lengths::Lengths; +use rand::Rng; use size_format::SizeFormatterBinary as SF; use tracing::{info, trace, warn}; use crate::{ api::TorrentIdOrHash, bitv::BitV, + bitv_factory::BitVFactory, chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::{FileStorage, BF}, @@ -67,6 +73,69 @@ impl TorrentStateInitializing { .load(std::sync::atomic::Ordering::Relaxed) } + async fn validate_fastresume( + &self, + bitv_factory: &dyn BitVFactory, + have_pieces: Option>, + ) -> Option> { + let hp = have_pieces?; + let actual = hp.as_bytes().len(); + let expected = self.shared.lengths.piece_bitfield_bytes(); + if actual != expected { + warn!( + actual, + expected, + "the bitfield loaded isn't of correct length, ignoring it, will do full check" + ); + return None; + } + + let is_broken = self.shared.spawner.spawn_block_in_place(|| { + let fo = crate::file_ops::FileOps::new( + &self.shared.info, + &self.files, + &self.shared.file_infos, + &self.shared.lengths, + ); + + use rand::seq::SliceRandom; + + let mut have_pieces = hp + .as_slice() + .iter_ones() + .filter_map(|i| self.shared.lengths.validate_piece_index(i.try_into().ok()?)) + .collect_vec(); + have_pieces.shuffle(&mut rand::thread_rng()); + + // Validate a certain threshold of fastresume pieces with decreasing probability of actual disk reads. + for (tmp_id, hpiece) in have_pieces.iter().enumerate() { + let denom: u32 = (tmp_id + 1).min(50).try_into().unwrap(); + if rand::thread_rng().gen_ratio(1, denom) && fo.check_piece(*hpiece).is_err() { + return true; + } + + #[allow(clippy::cast_possible_truncation)] + let progress = (self.shared.lengths.total_length() as f64 + / have_pieces.len() as f64 + * (tmp_id + 1) as f64) as u64; + let progress = progress.min(self.shared.lengths.total_length()); + self.checked_bytes.store(progress, Ordering::Relaxed); + } + false + }); + + if is_broken { + warn!("data corrupted, ignoring fastresume data"); + if let Err(e) = bitv_factory.clear(self.shared.id.into()).await { + warn!(error=?e, "error clearing bitfield"); + } + self.checked_bytes.store(0, Ordering::Relaxed); + return None; + } + + Some(hp) + } + pub async fn check(&self) -> anyhow::Result { let id: TorrentIdOrHash = self.shared.info_hash.into(); let bitv_factory = self @@ -76,7 +145,7 @@ impl TorrentStateInitializing { .context("session is dead")? .bitv_factory .clone(); - let mut have_pieces = if self.previously_errored { + let have_pieces = if self.previously_errored { if let Err(e) = bitv_factory.clear(id).await { warn!(error=?e, "error clearing bitfield"); } @@ -88,18 +157,8 @@ impl TorrentStateInitializing { .context("error loading have_pieces")? }; - if let Some(hp) = have_pieces.as_ref() { - let actual = hp.as_bytes().len(); - let expected = self.shared.lengths.piece_bitfield_bytes(); - if actual != expected { - warn!( - actual, - expected, - "the bitfield loaded isn't of correct length, ignoring it, will do full check" - ); - have_pieces = None; - } - } + let have_pieces = self.validate_fastresume(&*bitv_factory, have_pieces).await; + let have_pieces = match have_pieces { Some(h) => h, None => { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 89bc729..8e83bdd 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1557,7 +1557,7 @@ impl PeerHandler { match state .file_ops() - .check_piece(addr, chunk_info.piece_index, chunk_info) + .check_piece(chunk_info.piece_index) .with_context(|| format!("error checking piece={index}"))? { true => { diff --git a/crates/upnp/examples/discover.rs b/crates/upnp/examples/discover.rs index a445476..c5c7a45 100644 --- a/crates/upnp/examples/discover.rs +++ b/crates/upnp/examples/discover.rs @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> { println!("{}: {s:#?}", r.location); } Err(e) => { - tracing::error!(location=%r.location, "error discovering") + tracing::error!(error=?e, location=%r.location, "error discovering") } } drop(stx); @@ -30,6 +30,6 @@ async fn main() -> anyhow::Result<()> { let f3 = async move { while (srx.recv().await).is_some() {} }; - tokio::join!(f1, f2, f3); + tokio::join!(f1, f2, f3).0.unwrap(); Ok(()) }