File ops moved out
This commit is contained in:
parent
5f8100ebad
commit
e0ffb3afe1
4 changed files with 42 additions and 369 deletions
|
|
@ -1,195 +0,0 @@
|
||||||
use std::{
|
|
||||||
fs::File,
|
|
||||||
io::{Read, Seek, SeekFrom},
|
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use log::{debug, trace};
|
|
||||||
use parking_lot::Mutex;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
buffers::ByteString,
|
|
||||||
lengths::Lengths,
|
|
||||||
torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned},
|
|
||||||
type_aliases::BF,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct InitialCheckResults {
|
|
||||||
pub needed_pieces: BF,
|
|
||||||
pub have_pieces: BF,
|
|
||||||
pub have_bytes: u64,
|
|
||||||
pub needed_bytes: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update_hash_from_file(
|
|
||||||
file: &mut File,
|
|
||||||
hash: &mut sha1::Sha1,
|
|
||||||
buf: &mut [u8],
|
|
||||||
mut bytes_to_read: usize,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut read = 0;
|
|
||||||
while bytes_to_read > 0 {
|
|
||||||
let chunk = std::cmp::min(buf.len(), bytes_to_read);
|
|
||||||
file.read_exact(&mut buf[..chunk]).with_context(|| {
|
|
||||||
format!(
|
|
||||||
"failed reading chunk of size {}, read so far {}",
|
|
||||||
chunk, read
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
bytes_to_read -= chunk;
|
|
||||||
read += chunk;
|
|
||||||
hash.update(&buf[..chunk]);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn initial_check(
|
|
||||||
torrent: &TorrentMetaV1Owned,
|
|
||||||
files: &[Arc<Mutex<File>>],
|
|
||||||
only_files: Option<&[usize]>,
|
|
||||||
lengths: &Lengths,
|
|
||||||
) -> anyhow::Result<InitialCheckResults> {
|
|
||||||
let mut needed_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
|
|
||||||
let mut have_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
|
|
||||||
|
|
||||||
let mut have_bytes = 0u64;
|
|
||||||
let mut needed_bytes = 0u64;
|
|
||||||
|
|
||||||
struct CurrentFile<'a> {
|
|
||||||
index: usize,
|
|
||||||
fd: &'a Arc<Mutex<File>>,
|
|
||||||
len: u64,
|
|
||||||
name: FileIteratorName<'a, ByteString>,
|
|
||||||
full_file_required: bool,
|
|
||||||
processed_bytes: u64,
|
|
||||||
is_broken: bool,
|
|
||||||
}
|
|
||||||
impl<'a> CurrentFile<'a> {
|
|
||||||
fn remaining(&self) -> u64 {
|
|
||||||
self.len - self.processed_bytes
|
|
||||||
}
|
|
||||||
fn mark_processed_bytes(&mut self, bytes: u64) {
|
|
||||||
self.processed_bytes += bytes as u64
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut file_iterator = files
|
|
||||||
.iter()
|
|
||||||
.zip(torrent.info.iter_filenames_and_lengths())
|
|
||||||
.enumerate()
|
|
||||||
.map(|(idx, (fd, (name, len)))| {
|
|
||||||
let full_file_required = if let Some(only_files) = only_files {
|
|
||||||
only_files.contains(&idx)
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
};
|
|
||||||
CurrentFile {
|
|
||||||
index: idx,
|
|
||||||
fd,
|
|
||||||
len,
|
|
||||||
name,
|
|
||||||
full_file_required,
|
|
||||||
processed_bytes: 0,
|
|
||||||
is_broken: false,
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut current_file = file_iterator
|
|
||||||
.next()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("empty input file list"))?;
|
|
||||||
|
|
||||||
let mut read_buffer = vec![0u8; 65536];
|
|
||||||
|
|
||||||
for piece_info in lengths.iter_piece_infos() {
|
|
||||||
let mut computed_hash = sha1::Sha1::new();
|
|
||||||
let mut piece_remaining = piece_info.len as usize;
|
|
||||||
let mut some_files_broken = false;
|
|
||||||
let mut at_least_one_file_required = current_file.full_file_required;
|
|
||||||
|
|
||||||
while piece_remaining > 0 {
|
|
||||||
let mut to_read_in_file =
|
|
||||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
|
||||||
while to_read_in_file == 0 {
|
|
||||||
current_file = file_iterator
|
|
||||||
.next()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?;
|
|
||||||
|
|
||||||
at_least_one_file_required |= current_file.full_file_required;
|
|
||||||
|
|
||||||
to_read_in_file =
|
|
||||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
|
||||||
}
|
|
||||||
|
|
||||||
let pos = current_file.processed_bytes;
|
|
||||||
piece_remaining -= to_read_in_file;
|
|
||||||
current_file.mark_processed_bytes(to_read_in_file as u64);
|
|
||||||
|
|
||||||
if current_file.is_broken {
|
|
||||||
// no need to read.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut fd = current_file.fd.lock();
|
|
||||||
|
|
||||||
fd.seek(SeekFrom::Start(pos)).unwrap();
|
|
||||||
if let Err(err) = update_hash_from_file(
|
|
||||||
&mut fd,
|
|
||||||
&mut computed_hash,
|
|
||||||
&mut read_buffer,
|
|
||||||
to_read_in_file,
|
|
||||||
) {
|
|
||||||
debug!(
|
|
||||||
"error reading from file {} ({:?}) at {}: {:#}",
|
|
||||||
current_file.index, current_file.name, pos, &err
|
|
||||||
);
|
|
||||||
current_file.is_broken = true;
|
|
||||||
some_files_broken = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if at_least_one_file_required && some_files_broken {
|
|
||||||
trace!(
|
|
||||||
"piece {} had errors, marking as needed",
|
|
||||||
piece_info.piece_index
|
|
||||||
);
|
|
||||||
|
|
||||||
needed_bytes += piece_info.len as u64;
|
|
||||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if torrent
|
|
||||||
.info
|
|
||||||
.compare_hash(piece_info.piece_index.get(), &computed_hash)
|
|
||||||
.unwrap()
|
|
||||||
{
|
|
||||||
trace!(
|
|
||||||
"piece {} is fine, not marking as needed",
|
|
||||||
piece_info.piece_index
|
|
||||||
);
|
|
||||||
have_bytes += piece_info.len as u64;
|
|
||||||
have_pieces.set(piece_info.piece_index.get() as usize, true);
|
|
||||||
} else {
|
|
||||||
if at_least_one_file_required {
|
|
||||||
trace!(
|
|
||||||
"piece {} hash does not match, marking as needed",
|
|
||||||
piece_info.piece_index
|
|
||||||
);
|
|
||||||
needed_bytes += piece_info.len as u64;
|
|
||||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
|
||||||
} else {
|
|
||||||
trace!(
|
|
||||||
"piece {} hash does not match, but it is not required by any of the requested files, ignoring",
|
|
||||||
piece_info.piece_index
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(InitialCheckResults {
|
|
||||||
needed_pieces,
|
|
||||||
have_pieces,
|
|
||||||
have_bytes,
|
|
||||||
needed_bytes,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
@ -2,7 +2,7 @@ pub mod buffers;
|
||||||
pub mod chunk_tracker;
|
pub mod chunk_tracker;
|
||||||
pub mod clone_to_owned;
|
pub mod clone_to_owned;
|
||||||
pub mod constants;
|
pub mod constants;
|
||||||
pub mod file_checking;
|
pub mod files_ops;
|
||||||
pub mod lengths;
|
pub mod lengths;
|
||||||
pub mod peer_binary_protocol;
|
pub mod peer_binary_protocol;
|
||||||
pub mod peer_connection;
|
pub mod peer_connection;
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ use size_format::SizeFormatterBinary as SF;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
chunk_tracker::ChunkTracker,
|
chunk_tracker::ChunkTracker,
|
||||||
file_checking::initial_check,
|
files_ops::initial_check,
|
||||||
lengths::Lengths,
|
lengths::Lengths,
|
||||||
peer_binary_protocol::MessageOwned,
|
peer_binary_protocol::MessageOwned,
|
||||||
peer_connection::PeerConnection,
|
peer_connection::PeerConnection,
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
fs::File,
|
fs::File,
|
||||||
io::{Read, Seek, SeekFrom, Write},
|
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicU64, Ordering},
|
atomic::{AtomicU64, Ordering},
|
||||||
|
|
@ -9,16 +8,15 @@ use std::{
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use futures::{stream::FuturesUnordered, StreamExt};
|
use futures::{stream::FuturesUnordered, StreamExt};
|
||||||
use log::{debug, warn};
|
use log::warn;
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
buffers::ByteString,
|
buffers::ByteString,
|
||||||
chunk_tracker::ChunkTracker,
|
chunk_tracker::ChunkTracker,
|
||||||
file_checking::update_hash_from_file,
|
files_ops::{check_piece, read_chunk, write_chunk},
|
||||||
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||||
peer_binary_protocol::{Handshake, Message, MessageOwned, Piece},
|
peer_binary_protocol::{Handshake, Message, MessageOwned, Piece},
|
||||||
peer_state::{LivePeerState, PeerState},
|
peer_state::{LivePeerState, PeerState},
|
||||||
|
|
@ -162,55 +160,50 @@ pub struct TorrentState {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TorrentState {
|
impl TorrentState {
|
||||||
|
pub fn check_piece_blocking(
|
||||||
|
&self,
|
||||||
|
who_sent: PeerHandle,
|
||||||
|
piece_index: ValidPieceIndex,
|
||||||
|
last_received_chunk: &ChunkInfo,
|
||||||
|
) -> anyhow::Result<bool> {
|
||||||
|
check_piece(
|
||||||
|
&self.torrent,
|
||||||
|
&self.files,
|
||||||
|
&self.lengths,
|
||||||
|
who_sent,
|
||||||
|
piece_index,
|
||||||
|
last_received_chunk,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn read_chunk_blocking(
|
pub fn read_chunk_blocking(
|
||||||
&self,
|
&self,
|
||||||
who_sent: PeerHandle,
|
who_sent: PeerHandle,
|
||||||
chunk_info: ChunkInfo,
|
chunk_info: ChunkInfo,
|
||||||
) -> anyhow::Result<Vec<u8>> {
|
) -> anyhow::Result<Vec<u8>> {
|
||||||
let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info);
|
read_chunk(
|
||||||
let mut result_buf = vec![0u8; chunk_info.size as usize];
|
&self.torrent,
|
||||||
let mut buf = &mut result_buf[..];
|
&self.files,
|
||||||
|
&self.lengths,
|
||||||
|
who_sent,
|
||||||
|
chunk_info,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
for (file_idx, file_len) in self.torrent.info.iter_file_lengths().enumerate() {
|
pub fn write_chunk_blocking(
|
||||||
if absolute_offset > file_len {
|
&self,
|
||||||
absolute_offset -= file_len;
|
who_sent: PeerHandle,
|
||||||
continue;
|
data: &Piece<ByteString>,
|
||||||
}
|
chunk_info: &ChunkInfo,
|
||||||
let file_remaining_len = file_len - absolute_offset;
|
) -> anyhow::Result<()> {
|
||||||
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize;
|
write_chunk(
|
||||||
|
&self.torrent,
|
||||||
let mut file_g = self.files[file_idx].lock();
|
&self.files,
|
||||||
debug!(
|
&self.lengths,
|
||||||
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
|
who_sent,
|
||||||
chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info
|
data,
|
||||||
);
|
chunk_info,
|
||||||
file_g
|
)
|
||||||
.seek(SeekFrom::Start(absolute_offset))
|
|
||||||
.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"error seeking to {}, file id: {}",
|
|
||||||
absolute_offset, file_idx
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
file_g
|
|
||||||
.read_exact(&mut buf[..to_read_in_file])
|
|
||||||
.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"error reading {} bytes, file_id: {}",
|
|
||||||
file_idx, to_read_in_file
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
buf = &mut buf[to_read_in_file..];
|
|
||||||
|
|
||||||
if buf.is_empty() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
absolute_offset = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(result_buf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
|
pub fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
|
||||||
|
|
@ -256,78 +249,6 @@ impl TorrentState {
|
||||||
Some(n)
|
Some(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_piece_blocking(
|
|
||||||
&self,
|
|
||||||
who_sent: PeerHandle,
|
|
||||||
piece_index: ValidPieceIndex,
|
|
||||||
last_received_chunk: &ChunkInfo,
|
|
||||||
) -> anyhow::Result<bool> {
|
|
||||||
let mut h = sha1::Sha1::new();
|
|
||||||
let piece_length = self.lengths.piece_length(piece_index);
|
|
||||||
let mut absolute_offset = self.lengths.piece_offset(piece_index);
|
|
||||||
let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)];
|
|
||||||
|
|
||||||
let mut piece_remaining_bytes = piece_length as usize;
|
|
||||||
|
|
||||||
for (file_idx, (name, file_len)) in
|
|
||||||
self.torrent.info.iter_filenames_and_lengths().enumerate()
|
|
||||||
{
|
|
||||||
if absolute_offset > file_len {
|
|
||||||
absolute_offset -= file_len;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let file_remaining_len = file_len - absolute_offset;
|
|
||||||
|
|
||||||
let to_read_in_file =
|
|
||||||
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize;
|
|
||||||
let mut file_g = self.files[file_idx].lock();
|
|
||||||
debug!(
|
|
||||||
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
|
|
||||||
piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk
|
|
||||||
);
|
|
||||||
file_g
|
|
||||||
.seek(SeekFrom::Start(absolute_offset))
|
|
||||||
.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"error seeking to {}, file id: {}",
|
|
||||||
absolute_offset, file_idx
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
update_hash_from_file(&mut file_g, &mut h, &mut buf, to_read_in_file).with_context(
|
|
||||||
|| {
|
|
||||||
format!(
|
|
||||||
"error reading {} bytes, file_id: {} (\"{:?}\")",
|
|
||||||
to_read_in_file, file_idx, name
|
|
||||||
)
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
|
|
||||||
piece_remaining_bytes -= to_read_in_file;
|
|
||||||
|
|
||||||
if piece_remaining_bytes == 0 {
|
|
||||||
return Ok(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
absolute_offset = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.torrent.info.compare_hash(piece_index.get(), &h) {
|
|
||||||
Some(true) => {
|
|
||||||
debug!("piece={} hash matches", piece_index);
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
Some(false) => {
|
|
||||||
warn!("the piece={} hash does not match", piece_index);
|
|
||||||
Ok(false)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// this is probably a bug?
|
|
||||||
warn!("compare_hash() did not find the piece");
|
|
||||||
anyhow::bail!("compare_hash() did not find the piece");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
|
pub fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
|
||||||
self.get_next_needed_piece(handle).is_some()
|
self.get_next_needed_piece(handle).is_some()
|
||||||
}
|
}
|
||||||
|
|
@ -385,59 +306,6 @@ impl TorrentState {
|
||||||
self.needed - self.get_downloaded()
|
self.needed - self.get_downloaded()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_chunk_blocking(
|
|
||||||
&self,
|
|
||||||
who_sent: PeerHandle,
|
|
||||||
data: &Piece<ByteString>,
|
|
||||||
chunk_info: &ChunkInfo,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut buf = data.block.as_ref();
|
|
||||||
let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info);
|
|
||||||
|
|
||||||
for (file_idx, (name, file_len)) in
|
|
||||||
self.torrent.info.iter_filenames_and_lengths().enumerate()
|
|
||||||
{
|
|
||||||
if absolute_offset > file_len {
|
|
||||||
absolute_offset -= file_len;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let remaining_len = file_len - absolute_offset;
|
|
||||||
let to_write = std::cmp::min(buf.len(), remaining_len as usize);
|
|
||||||
|
|
||||||
let mut file_g = self.files[file_idx].lock();
|
|
||||||
debug!(
|
|
||||||
"piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
|
|
||||||
chunk_info.piece_index,
|
|
||||||
chunk_info,
|
|
||||||
who_sent,
|
|
||||||
chunk_info.offset,
|
|
||||||
file_idx,
|
|
||||||
to_write,
|
|
||||||
absolute_offset
|
|
||||||
);
|
|
||||||
file_g
|
|
||||||
.seek(SeekFrom::Start(absolute_offset))
|
|
||||||
.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"error seeking to {} in file {} (\"{:?}\")",
|
|
||||||
absolute_offset, file_idx, name
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
file_g
|
|
||||||
.write_all(&buf[..to_write])
|
|
||||||
.with_context(|| format!("error writing to file {} (\"{:?}\")", file_idx, name))?;
|
|
||||||
buf = &buf[to_write..];
|
|
||||||
if buf.is_empty() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
absolute_offset = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: this is a task per chunk, not good
|
// TODO: this is a task per chunk, not good
|
||||||
pub async fn task_transmit_haves(&self, index: u32) -> anyhow::Result<()> {
|
pub async fn task_transmit_haves(&self, index: u32) -> anyhow::Result<()> {
|
||||||
let mut unordered = FuturesUnordered::new();
|
let mut unordered = FuturesUnordered::new();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue