Merge pull request #235 from ikatson/fastresume-detect-missing-files

Fastresume: check at least one piece from each file + windows fix
This commit is contained in:
Igor Katson 2024-09-13 09:00:47 +01:00 committed by GitHub
commit 25ae54acd7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 59 additions and 19 deletions

View file

@ -8,6 +8,7 @@ use bitvec::{
vec::BitVec,
view::{AsBits, AsMutBits},
};
use tracing::trace;
pub trait BitV: Send + Sync {
fn as_slice(&self) -> &BitSlice<u8, Msb0>;
@ -24,6 +25,12 @@ pub struct MmapBitV {
mmap: memmap2::MmapMut,
}
impl Drop for MmapBitV {
fn drop(&mut self) {
trace!("dropping MmapBitV, this should unmap the .bitv file")
}
}
impl MmapBitV {
pub fn new(file: File) -> anyhow::Result<Self> {
let mmap =

View file

@ -1251,14 +1251,6 @@ impl Session {
debug!("error pausing torrent before deletion: {e:?}")
}
if let Some(p) = self.persistence.as_ref() {
if let Err(e) = p.delete(id).await {
error!(error=?e, "error deleting torrent from persistence database");
} else {
debug!(?id, "deleted torrent from persistence database")
}
}
let storage = removed
.with_state_mut(|s| match s.take() {
ManagedTorrentState::Initializing(p) => p.files.take().ok(),
@ -1277,6 +1269,14 @@ impl Session {
.map(Ok)
.unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared()));
if let Some(p) = self.persistence.as_ref() {
if let Err(e) = p.delete(id).await {
error!(error=?e, "error deleting torrent from persistence database");
} else {
debug!(?id, "deleted torrent from persistence database")
}
}
match (storage, delete_files) {
(Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"),
(Ok(storage), true) => {

View file

@ -100,27 +100,60 @@ impl TorrentStateInitializing {
use rand::seq::SliceRandom;
let mut have_pieces = hp
.as_slice()
.iter_ones()
.filter_map(|i| self.shared.lengths.validate_piece_index(i.try_into().ok()?))
.collect_vec();
have_pieces.shuffle(&mut rand::thread_rng());
let mut to_validate = BF::from_boxed_slice(
vec![0u8; self.shared.lengths.piece_bitfield_bytes()].into_boxed_slice(),
);
let mut queue = hp.as_slice().to_owned();
// Validate a certain threshold of fastresume pieces with decreasing probability of actual disk reads.
for (tmp_id, hpiece) in have_pieces.iter().enumerate() {
// Validate at least one piece from each file, if we claim we have it.
for fi in self.shared.file_infos.iter() {
let prange = fi.piece_range_usize();
let offset = prange.start;
for piece_id in hp
.as_slice()
.get(fi.piece_range_usize())
.into_iter()
.flat_map(|s| s.iter_ones())
.map(|pid| pid + offset)
.take(1)
{
to_validate.set(piece_id, true);
queue.set(piece_id, false);
}
}
// For all the remaining pieces we claim we have, validate them with decreasing probability.
let mut queue = queue.iter_ones().collect_vec();
queue.shuffle(&mut rand::thread_rng());
for (tmp_id, piece_id) in queue.into_iter().enumerate() {
let denom: u32 = (tmp_id + 1).min(50).try_into().unwrap();
if rand::thread_rng().gen_ratio(1, denom) && fo.check_piece(*hpiece).is_err() {
if rand::thread_rng().gen_ratio(1, denom) {
to_validate.set(piece_id, true);
}
}
let to_validate_count = to_validate.count_ones();
for (id, piece_id) in to_validate
.iter_ones()
.filter_map(|id| {
self.shared
.lengths
.validate_piece_index(id.try_into().ok()?)
})
.enumerate()
{
if fo.check_piece(piece_id).is_err() {
return true;
}
#[allow(clippy::cast_possible_truncation)]
let progress = (self.shared.lengths.total_length() as f64
/ have_pieces.len() as f64
* (tmp_id + 1) as f64) as u64;
/ to_validate_count as f64
* (id + 1) as f64) as u64;
let progress = progress.min(self.shared.lengths.total_length());
self.checked_bytes.store(progress, Ordering::Relaxed);
}
false
});