Check SOME fastresume data, dont trust it completely.
This commit is contained in:
parent
babe470f9a
commit
9225e126d9
5 changed files with 67 additions and 25 deletions
1
Makefile
1
Makefile
|
|
@ -14,6 +14,7 @@ webui-dev: webui-deps
|
|||
export RQBIT_UPNP_SERVER_ENABLE ?= true
|
||||
export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev
|
||||
export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030
|
||||
export RQBIT_FASTRESUME = true
|
||||
CARGO_RUN_FLAGS ?=
|
||||
RQBIT_OUTPUT_FOLDER ?= /tmp/scratch
|
||||
RQBIT_POSTGRES_CONNECTION_STRING ?= postgres:///rqbit
|
||||
|
|
|
|||
|
|
@ -173,12 +173,7 @@ impl<'a> FileOps<'a> {
|
|||
Ok(have_pieces)
|
||||
}
|
||||
|
||||
pub fn check_piece(
|
||||
&self,
|
||||
who_sent: PeerHandle,
|
||||
piece_index: ValidPieceIndex,
|
||||
last_received_chunk: &ChunkInfo,
|
||||
) -> anyhow::Result<bool> {
|
||||
pub fn check_piece(&self, piece_index: ValidPieceIndex) -> anyhow::Result<bool> {
|
||||
let mut h = Sha1::new();
|
||||
let piece_length = self.lengths.piece_length(piece_index);
|
||||
let mut absolute_offset = self.lengths.piece_offset(piece_index);
|
||||
|
|
@ -196,12 +191,10 @@ impl<'a> FileOps<'a> {
|
|||
let to_read_in_file: usize =
|
||||
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64).try_into()?;
|
||||
trace!(
|
||||
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
|
||||
"piece={}, file_idx={}, seeking to {}",
|
||||
piece_index,
|
||||
who_sent,
|
||||
file_idx,
|
||||
absolute_offset,
|
||||
&last_received_chunk
|
||||
);
|
||||
update_hash_from_file(
|
||||
file_idx,
|
||||
|
|
|
|||
|
|
@ -5,13 +5,16 @@ use std::{
|
|||
|
||||
use anyhow::Context;
|
||||
|
||||
use itertools::Itertools;
|
||||
use librqbit_core::lengths::Lengths;
|
||||
use rand::Rng;
|
||||
use size_format::SizeFormatterBinary as SF;
|
||||
use tracing::{info, trace, warn};
|
||||
|
||||
use crate::{
|
||||
api::TorrentIdOrHash,
|
||||
bitv::BitV,
|
||||
bitv_factory::BitVFactory,
|
||||
chunk_tracker::ChunkTracker,
|
||||
file_ops::FileOps,
|
||||
type_aliases::{FileStorage, BF},
|
||||
|
|
@ -67,6 +70,61 @@ impl TorrentStateInitializing {
|
|||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
async fn validate_fastresume(
|
||||
&self,
|
||||
bitv_factory: &dyn BitVFactory,
|
||||
have_pieces: Option<Box<dyn BitV>>,
|
||||
) -> Option<Box<dyn BitV>> {
|
||||
let hp = have_pieces?;
|
||||
let actual = hp.as_bytes().len();
|
||||
let expected = self.shared.lengths.piece_bitfield_bytes();
|
||||
if actual != expected {
|
||||
warn!(
|
||||
actual,
|
||||
expected,
|
||||
"the bitfield loaded isn't of correct length, ignoring it, will do full check"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
let is_broken = self.shared.spawner.spawn_block_in_place(|| {
|
||||
let fo = crate::file_ops::FileOps::new(
|
||||
&self.shared.info,
|
||||
&self.files,
|
||||
&self.shared.file_infos,
|
||||
&self.shared.lengths,
|
||||
);
|
||||
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
let mut have_pieces = hp
|
||||
.as_slice()
|
||||
.iter_ones()
|
||||
.filter_map(|i| self.shared.lengths.validate_piece_index(i.try_into().ok()?))
|
||||
.collect_vec();
|
||||
have_pieces.shuffle(&mut rand::thread_rng());
|
||||
|
||||
// Validate a certain threshold of fastresume pieces with decreasing probability of actual disk reads.
|
||||
for (denom_minus_one, hpiece) in have_pieces.into_iter().enumerate() {
|
||||
let denom: u32 = (denom_minus_one + 1).min(50).try_into().unwrap();
|
||||
if rand::thread_rng().gen_ratio(1, denom) && fo.check_piece(hpiece).is_err() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
});
|
||||
|
||||
if is_broken {
|
||||
warn!("data corrupted, ignoring fastresume data");
|
||||
if let Err(e) = bitv_factory.clear(self.shared.id.into()).await {
|
||||
warn!(error=?e, "error clearing bitfield");
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(hp)
|
||||
}
|
||||
|
||||
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
|
||||
let id: TorrentIdOrHash = self.shared.info_hash.into();
|
||||
let bitv_factory = self
|
||||
|
|
@ -76,7 +134,7 @@ impl TorrentStateInitializing {
|
|||
.context("session is dead")?
|
||||
.bitv_factory
|
||||
.clone();
|
||||
let mut have_pieces = if self.previously_errored {
|
||||
let have_pieces = if self.previously_errored {
|
||||
if let Err(e) = bitv_factory.clear(id).await {
|
||||
warn!(error=?e, "error clearing bitfield");
|
||||
}
|
||||
|
|
@ -88,18 +146,8 @@ impl TorrentStateInitializing {
|
|||
.context("error loading have_pieces")?
|
||||
};
|
||||
|
||||
if let Some(hp) = have_pieces.as_ref() {
|
||||
let actual = hp.as_bytes().len();
|
||||
let expected = self.shared.lengths.piece_bitfield_bytes();
|
||||
if actual != expected {
|
||||
warn!(
|
||||
actual,
|
||||
expected,
|
||||
"the bitfield loaded isn't of correct length, ignoring it, will do full check"
|
||||
);
|
||||
have_pieces = None;
|
||||
}
|
||||
}
|
||||
let have_pieces = self.validate_fastresume(&*bitv_factory, have_pieces).await;
|
||||
|
||||
let have_pieces = match have_pieces {
|
||||
Some(h) => h,
|
||||
None => {
|
||||
|
|
|
|||
|
|
@ -1557,7 +1557,7 @@ impl PeerHandler {
|
|||
|
||||
match state
|
||||
.file_ops()
|
||||
.check_piece(addr, chunk_info.piece_index, chunk_info)
|
||||
.check_piece(chunk_info.piece_index)
|
||||
.with_context(|| format!("error checking piece={index}"))?
|
||||
{
|
||||
true => {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
println!("{}: {s:#?}", r.location);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(location=%r.location, "error discovering")
|
||||
tracing::error!(error=?e, location=%r.location, "error discovering")
|
||||
}
|
||||
}
|
||||
drop(stx);
|
||||
|
|
@ -30,6 +30,6 @@ async fn main() -> anyhow::Result<()> {
|
|||
|
||||
let f3 = async move { while (srx.recv().await).is_some() {} };
|
||||
|
||||
tokio::join!(f1, f2, f3);
|
||||
tokio::join!(f1, f2, f3).0.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue