Refactor
This commit is contained in:
parent
205ce3265a
commit
bde18ab734
5 changed files with 204 additions and 184 deletions
|
|
@ -81,7 +81,7 @@ impl ChunkTracker {
|
|||
pub fn mark_chunk_request_cancelled(
|
||||
&mut self,
|
||||
index: ValidPieceIndex,
|
||||
chunk: u32,
|
||||
_chunk: u32,
|
||||
) -> Option<bool> {
|
||||
if *self.have.get(index.get() as usize)? {
|
||||
return Some(false);
|
||||
|
|
|
|||
195
crates/librqbit/src/file_checking.rs
Normal file
195
crates/librqbit/src/file_checking.rs
Normal file
|
|
@ -0,0 +1,195 @@
|
|||
use std::{
|
||||
fs::File,
|
||||
io::{Read, Seek, SeekFrom},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use log::{debug, trace};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::{
|
||||
buffers::ByteString,
|
||||
lengths::Lengths,
|
||||
torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned},
|
||||
type_aliases::BF,
|
||||
};
|
||||
|
||||
pub struct InitialCheckResults {
|
||||
pub needed_pieces: BF,
|
||||
pub have_pieces: BF,
|
||||
pub have_bytes: u64,
|
||||
pub needed_bytes: u64,
|
||||
}
|
||||
|
||||
pub fn update_hash_from_file(
|
||||
file: &mut File,
|
||||
hash: &mut sha1::Sha1,
|
||||
buf: &mut [u8],
|
||||
mut bytes_to_read: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut read = 0;
|
||||
while bytes_to_read > 0 {
|
||||
let chunk = std::cmp::min(buf.len(), bytes_to_read);
|
||||
file.read_exact(&mut buf[..chunk]).with_context(|| {
|
||||
format!(
|
||||
"failed reading chunk of size {}, read so far {}",
|
||||
chunk, read
|
||||
)
|
||||
})?;
|
||||
bytes_to_read -= chunk;
|
||||
read += chunk;
|
||||
hash.update(&buf[..chunk]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn initial_check(
|
||||
torrent: &TorrentMetaV1Owned,
|
||||
files: &[Arc<Mutex<File>>],
|
||||
only_files: Option<&[usize]>,
|
||||
lengths: &Lengths,
|
||||
) -> anyhow::Result<InitialCheckResults> {
|
||||
let mut needed_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
|
||||
let mut have_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
|
||||
|
||||
let mut have_bytes = 0u64;
|
||||
let mut needed_bytes = 0u64;
|
||||
|
||||
struct CurrentFile<'a> {
|
||||
index: usize,
|
||||
fd: &'a Arc<Mutex<File>>,
|
||||
len: u64,
|
||||
name: FileIteratorName<'a, ByteString>,
|
||||
full_file_required: bool,
|
||||
processed_bytes: u64,
|
||||
is_broken: bool,
|
||||
}
|
||||
impl<'a> CurrentFile<'a> {
|
||||
fn remaining(&self) -> u64 {
|
||||
self.len - self.processed_bytes
|
||||
}
|
||||
fn mark_processed_bytes(&mut self, bytes: u64) {
|
||||
self.processed_bytes += bytes as u64
|
||||
}
|
||||
}
|
||||
let mut file_iterator = files
|
||||
.iter()
|
||||
.zip(torrent.info.iter_filenames_and_lengths())
|
||||
.enumerate()
|
||||
.map(|(idx, (fd, (name, len)))| {
|
||||
let full_file_required = if let Some(only_files) = only_files {
|
||||
only_files.contains(&idx)
|
||||
} else {
|
||||
true
|
||||
};
|
||||
CurrentFile {
|
||||
index: idx,
|
||||
fd,
|
||||
len,
|
||||
name,
|
||||
full_file_required,
|
||||
processed_bytes: 0,
|
||||
is_broken: false,
|
||||
}
|
||||
});
|
||||
|
||||
let mut current_file = file_iterator
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("empty input file list"))?;
|
||||
|
||||
let mut read_buffer = vec![0u8; 65536];
|
||||
|
||||
for piece_info in lengths.iter_piece_infos() {
|
||||
let mut computed_hash = sha1::Sha1::new();
|
||||
let mut piece_remaining = piece_info.len as usize;
|
||||
let mut some_files_broken = false;
|
||||
let mut at_least_one_file_required = current_file.full_file_required;
|
||||
|
||||
while piece_remaining > 0 {
|
||||
let mut to_read_in_file =
|
||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
||||
while to_read_in_file == 0 {
|
||||
current_file = file_iterator
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?;
|
||||
|
||||
at_least_one_file_required |= current_file.full_file_required;
|
||||
|
||||
to_read_in_file =
|
||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
||||
}
|
||||
|
||||
let pos = current_file.processed_bytes;
|
||||
piece_remaining -= to_read_in_file;
|
||||
current_file.mark_processed_bytes(to_read_in_file as u64);
|
||||
|
||||
if current_file.is_broken {
|
||||
// no need to read.
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut fd = current_file.fd.lock();
|
||||
|
||||
fd.seek(SeekFrom::Start(pos)).unwrap();
|
||||
if let Err(err) = update_hash_from_file(
|
||||
&mut fd,
|
||||
&mut computed_hash,
|
||||
&mut read_buffer,
|
||||
to_read_in_file,
|
||||
) {
|
||||
debug!(
|
||||
"error reading from file {} ({:?}) at {}: {:#}",
|
||||
current_file.index, current_file.name, pos, &err
|
||||
);
|
||||
current_file.is_broken = true;
|
||||
some_files_broken = true;
|
||||
}
|
||||
}
|
||||
|
||||
if at_least_one_file_required && some_files_broken {
|
||||
trace!(
|
||||
"piece {} had errors, marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
|
||||
needed_bytes += piece_info.len as u64;
|
||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
if torrent
|
||||
.info
|
||||
.compare_hash(piece_info.piece_index.get(), &computed_hash)
|
||||
.unwrap()
|
||||
{
|
||||
trace!(
|
||||
"piece {} is fine, not marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
have_bytes += piece_info.len as u64;
|
||||
have_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
} else {
|
||||
if at_least_one_file_required {
|
||||
trace!(
|
||||
"piece {} hash does not match, marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
needed_bytes += piece_info.len as u64;
|
||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
} else {
|
||||
trace!(
|
||||
"piece {} hash does not match, but it is not required by any of the requested files, ignoring",
|
||||
piece_info.piece_index
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(InitialCheckResults {
|
||||
needed_pieces,
|
||||
have_pieces,
|
||||
have_bytes,
|
||||
needed_bytes,
|
||||
})
|
||||
}
|
||||
|
|
@ -2,8 +2,10 @@ pub mod buffers;
|
|||
pub mod chunk_tracker;
|
||||
pub mod clone_to_owned;
|
||||
pub mod constants;
|
||||
pub mod file_checking;
|
||||
pub mod lengths;
|
||||
pub mod peer_binary_protocol;
|
||||
pub mod peer_connection;
|
||||
pub mod peer_id;
|
||||
pub mod serde_bencode;
|
||||
pub mod torrent_manager;
|
||||
|
|
|
|||
1
crates/librqbit/src/peer_connection.rs
Normal file
1
crates/librqbit/src/peer_connection.rs
Normal file
|
|
@ -0,0 +1 @@
|
|||
// TODO
|
||||
|
|
@ -27,12 +27,13 @@ use crate::{
|
|||
buffers::{ByteBuf, ByteString},
|
||||
chunk_tracker::{ChunkMarkingResult, ChunkTracker},
|
||||
clone_to_owned::CloneToOwned,
|
||||
file_checking::{initial_check, update_hash_from_file},
|
||||
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||
peer_binary_protocol::{
|
||||
Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request,
|
||||
},
|
||||
peer_id::try_decode_peer_id,
|
||||
torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned},
|
||||
torrent_metainfo::TorrentMetaV1Owned,
|
||||
tracker_comms::{CompactTrackerResponse, TrackerRequest, TrackerRequestEvent},
|
||||
};
|
||||
pub struct TorrentManagerBuilder {
|
||||
|
|
@ -300,185 +301,6 @@ fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result<Lengths> {
|
|||
Lengths::new(total_length, torrent.info.piece_length, None)
|
||||
}
|
||||
|
||||
fn update_hash_from_file(
|
||||
file: &mut File,
|
||||
hash: &mut sha1::Sha1,
|
||||
buf: &mut [u8],
|
||||
mut bytes_to_read: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut read = 0;
|
||||
while bytes_to_read > 0 {
|
||||
let chunk = std::cmp::min(buf.len(), bytes_to_read);
|
||||
file.read_exact(&mut buf[..chunk]).with_context(|| {
|
||||
format!(
|
||||
"failed reading chunk of size {}, read so far {}",
|
||||
chunk, read
|
||||
)
|
||||
})?;
|
||||
bytes_to_read -= chunk;
|
||||
read += chunk;
|
||||
hash.update(&buf[..chunk]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct InitialCheckResults {
|
||||
needed_pieces: BF,
|
||||
have_pieces: BF,
|
||||
have_bytes: u64,
|
||||
needed_bytes: u64,
|
||||
}
|
||||
|
||||
fn initial_check(
|
||||
torrent: &TorrentMetaV1Owned,
|
||||
files: &[Arc<Mutex<File>>],
|
||||
only_files: Option<&[usize]>,
|
||||
lengths: &Lengths,
|
||||
) -> anyhow::Result<InitialCheckResults> {
|
||||
let mut needed_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
|
||||
let mut have_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
|
||||
|
||||
let mut have_bytes = 0u64;
|
||||
let mut needed_bytes = 0u64;
|
||||
|
||||
struct CurrentFile<'a> {
|
||||
index: usize,
|
||||
fd: &'a Arc<Mutex<File>>,
|
||||
len: u64,
|
||||
name: FileIteratorName<'a, ByteString>,
|
||||
full_file_required: bool,
|
||||
processed_bytes: u64,
|
||||
is_broken: bool,
|
||||
}
|
||||
impl<'a> CurrentFile<'a> {
|
||||
fn remaining(&self) -> u64 {
|
||||
self.len - self.processed_bytes
|
||||
}
|
||||
fn mark_processed_bytes(&mut self, bytes: u64) {
|
||||
self.processed_bytes += bytes as u64
|
||||
}
|
||||
}
|
||||
let mut file_iterator = files
|
||||
.iter()
|
||||
.zip(torrent.info.iter_filenames_and_lengths())
|
||||
.enumerate()
|
||||
.map(|(idx, (fd, (name, len)))| {
|
||||
let full_file_required = if let Some(only_files) = only_files {
|
||||
only_files.contains(&idx)
|
||||
} else {
|
||||
true
|
||||
};
|
||||
CurrentFile {
|
||||
index: idx,
|
||||
fd,
|
||||
len,
|
||||
name,
|
||||
full_file_required,
|
||||
processed_bytes: 0,
|
||||
is_broken: false,
|
||||
}
|
||||
});
|
||||
|
||||
let mut current_file = file_iterator
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("empty input file list"))?;
|
||||
|
||||
let mut read_buffer = vec![0u8; 65536];
|
||||
|
||||
for piece_info in lengths.iter_piece_infos() {
|
||||
let mut computed_hash = sha1::Sha1::new();
|
||||
let mut piece_remaining = piece_info.len as usize;
|
||||
let mut some_files_broken = false;
|
||||
let mut at_least_one_file_required = current_file.full_file_required;
|
||||
|
||||
while piece_remaining > 0 {
|
||||
let mut to_read_in_file =
|
||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
||||
while to_read_in_file == 0 {
|
||||
current_file = file_iterator
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?;
|
||||
|
||||
at_least_one_file_required |= current_file.full_file_required;
|
||||
|
||||
to_read_in_file =
|
||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
||||
}
|
||||
|
||||
let pos = current_file.processed_bytes;
|
||||
piece_remaining -= to_read_in_file;
|
||||
current_file.mark_processed_bytes(to_read_in_file as u64);
|
||||
|
||||
if current_file.is_broken {
|
||||
// no need to read.
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut fd = current_file.fd.lock();
|
||||
|
||||
fd.seek(SeekFrom::Start(pos)).unwrap();
|
||||
if let Err(err) = update_hash_from_file(
|
||||
&mut fd,
|
||||
&mut computed_hash,
|
||||
&mut read_buffer,
|
||||
to_read_in_file,
|
||||
) {
|
||||
debug!(
|
||||
"error reading from file {} ({:?}) at {}: {:#}",
|
||||
current_file.index, current_file.name, pos, &err
|
||||
);
|
||||
current_file.is_broken = true;
|
||||
some_files_broken = true;
|
||||
}
|
||||
}
|
||||
|
||||
if at_least_one_file_required && some_files_broken {
|
||||
trace!(
|
||||
"piece {} had errors, marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
|
||||
needed_bytes += piece_info.len as u64;
|
||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
if torrent
|
||||
.info
|
||||
.compare_hash(piece_info.piece_index.get(), &computed_hash)
|
||||
.unwrap()
|
||||
{
|
||||
trace!(
|
||||
"piece {} is fine, not marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
have_bytes += piece_info.len as u64;
|
||||
have_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
} else {
|
||||
if at_least_one_file_required {
|
||||
trace!(
|
||||
"piece {} hash does not match, marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
needed_bytes += piece_info.len as u64;
|
||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
} else {
|
||||
trace!(
|
||||
"piece {} hash does not match, but it is not required by any of the requested files, ignoring",
|
||||
piece_info.piece_index
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(InitialCheckResults {
|
||||
needed_pieces,
|
||||
have_pieces,
|
||||
have_bytes,
|
||||
needed_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
impl TorrentManager {
|
||||
pub fn start<P: AsRef<Path>>(
|
||||
torrent: TorrentMetaV1Owned,
|
||||
|
|
@ -863,8 +685,8 @@ impl TorrentManager {
|
|||
None => return Ok(()),
|
||||
}
|
||||
|
||||
let (next, is_stolen) = match self.reserve_next_needed_piece(handle) {
|
||||
Some(next) => (next, false),
|
||||
let next = match self.reserve_next_needed_piece(handle) {
|
||||
Some(next) => next,
|
||||
None => {
|
||||
if self.get_left_to_download() == 0 {
|
||||
info!("{}: nothing left to download, closing requester", handle);
|
||||
|
|
@ -873,7 +695,7 @@ impl TorrentManager {
|
|||
|
||||
if let Some(piece) = self.try_steal_piece(handle) {
|
||||
info!("{}: stole a piece {}", handle, piece);
|
||||
(piece, true)
|
||||
piece
|
||||
} else {
|
||||
info!("no pieces to request from {}", handle);
|
||||
#[allow(unused_must_use)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue