diff --git a/crates/librqbit/src/file_checking.rs b/crates/librqbit/src/file_checking.rs deleted file mode 100644 index 277eb08..0000000 --- a/crates/librqbit/src/file_checking.rs +++ /dev/null @@ -1,195 +0,0 @@ -use std::{ - fs::File, - io::{Read, Seek, SeekFrom}, - sync::Arc, -}; - -use anyhow::Context; -use log::{debug, trace}; -use parking_lot::Mutex; - -use crate::{ - buffers::ByteString, - lengths::Lengths, - torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned}, - type_aliases::BF, -}; - -pub struct InitialCheckResults { - pub needed_pieces: BF, - pub have_pieces: BF, - pub have_bytes: u64, - pub needed_bytes: u64, -} - -pub fn update_hash_from_file( - file: &mut File, - hash: &mut sha1::Sha1, - buf: &mut [u8], - mut bytes_to_read: usize, -) -> anyhow::Result<()> { - let mut read = 0; - while bytes_to_read > 0 { - let chunk = std::cmp::min(buf.len(), bytes_to_read); - file.read_exact(&mut buf[..chunk]).with_context(|| { - format!( - "failed reading chunk of size {}, read so far {}", - chunk, read - ) - })?; - bytes_to_read -= chunk; - read += chunk; - hash.update(&buf[..chunk]); - } - Ok(()) -} - -pub fn initial_check( - torrent: &TorrentMetaV1Owned, - files: &[Arc>], - only_files: Option<&[usize]>, - lengths: &Lengths, -) -> anyhow::Result { - let mut needed_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]); - let mut have_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]); - - let mut have_bytes = 0u64; - let mut needed_bytes = 0u64; - - struct CurrentFile<'a> { - index: usize, - fd: &'a Arc>, - len: u64, - name: FileIteratorName<'a, ByteString>, - full_file_required: bool, - processed_bytes: u64, - is_broken: bool, - } - impl<'a> CurrentFile<'a> { - fn remaining(&self) -> u64 { - self.len - self.processed_bytes - } - fn mark_processed_bytes(&mut self, bytes: u64) { - self.processed_bytes += bytes as u64 - } - } - let mut file_iterator = files - .iter() - .zip(torrent.info.iter_filenames_and_lengths()) - .enumerate() - .map(|(idx, (fd, (name, len)))| { - let full_file_required = if let Some(only_files) = only_files { - only_files.contains(&idx) - } else { - true - }; - CurrentFile { - index: idx, - fd, - len, - name, - full_file_required, - processed_bytes: 0, - is_broken: false, - } - }); - - let mut current_file = file_iterator - .next() - .ok_or_else(|| anyhow::anyhow!("empty input file list"))?; - - let mut read_buffer = vec![0u8; 65536]; - - for piece_info in lengths.iter_piece_infos() { - let mut computed_hash = sha1::Sha1::new(); - let mut piece_remaining = piece_info.len as usize; - let mut some_files_broken = false; - let mut at_least_one_file_required = current_file.full_file_required; - - while piece_remaining > 0 { - let mut to_read_in_file = - std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; - while to_read_in_file == 0 { - current_file = file_iterator - .next() - .ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?; - - at_least_one_file_required |= current_file.full_file_required; - - to_read_in_file = - std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; - } - - let pos = current_file.processed_bytes; - piece_remaining -= to_read_in_file; - current_file.mark_processed_bytes(to_read_in_file as u64); - - if current_file.is_broken { - // no need to read. - continue; - } - - let mut fd = current_file.fd.lock(); - - fd.seek(SeekFrom::Start(pos)).unwrap(); - if let Err(err) = update_hash_from_file( - &mut fd, - &mut computed_hash, - &mut read_buffer, - to_read_in_file, - ) { - debug!( - "error reading from file {} ({:?}) at {}: {:#}", - current_file.index, current_file.name, pos, &err - ); - current_file.is_broken = true; - some_files_broken = true; - } - } - - if at_least_one_file_required && some_files_broken { - trace!( - "piece {} had errors, marking as needed", - piece_info.piece_index - ); - - needed_bytes += piece_info.len as u64; - needed_pieces.set(piece_info.piece_index.get() as usize, true); - continue; - } - - if torrent - .info - .compare_hash(piece_info.piece_index.get(), &computed_hash) - .unwrap() - { - trace!( - "piece {} is fine, not marking as needed", - piece_info.piece_index - ); - have_bytes += piece_info.len as u64; - have_pieces.set(piece_info.piece_index.get() as usize, true); - } else { - if at_least_one_file_required { - trace!( - "piece {} hash does not match, marking as needed", - piece_info.piece_index - ); - needed_bytes += piece_info.len as u64; - needed_pieces.set(piece_info.piece_index.get() as usize, true); - } else { - trace!( - "piece {} hash does not match, but it is not required by any of the requested files, ignoring", - piece_info.piece_index - ); - } - } - } - - Ok(InitialCheckResults { - needed_pieces, - have_pieces, - have_bytes, - needed_bytes, - }) -} diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index a17687c..39c7dbf 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -2,7 +2,7 @@ pub mod buffers; pub mod chunk_tracker; pub mod clone_to_owned; pub mod constants; -pub mod file_checking; +pub mod files_ops; pub mod lengths; pub mod peer_binary_protocol; pub mod peer_connection; diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 71f5e9a..78f3dc6 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -19,7 +19,7 @@ use size_format::SizeFormatterBinary as SF; use crate::{ chunk_tracker::ChunkTracker, - file_checking::initial_check, + files_ops::initial_check, lengths::Lengths, peer_binary_protocol::MessageOwned, peer_connection::PeerConnection, diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index f2937e8..6a59720 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -1,7 +1,6 @@ use std::{ collections::{HashMap, HashSet}, fs::File, - io::{Read, Seek, SeekFrom, Write}, net::SocketAddr, sync::{ atomic::{AtomicU64, Ordering}, @@ -9,16 +8,15 @@ use std::{ }, }; -use anyhow::Context; use futures::{stream::FuturesUnordered, StreamExt}; -use log::{debug, warn}; +use log::warn; use parking_lot::{Mutex, RwLock}; use tokio::sync::mpsc::Sender; use crate::{ buffers::ByteString, chunk_tracker::ChunkTracker, - file_checking::update_hash_from_file, + files_ops::{check_piece, read_chunk, write_chunk}, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_binary_protocol::{Handshake, Message, MessageOwned, Piece}, peer_state::{LivePeerState, PeerState}, @@ -162,55 +160,50 @@ pub struct TorrentState { } impl TorrentState { + pub fn check_piece_blocking( + &self, + who_sent: PeerHandle, + piece_index: ValidPieceIndex, + last_received_chunk: &ChunkInfo, + ) -> anyhow::Result { + check_piece( + &self.torrent, + &self.files, + &self.lengths, + who_sent, + piece_index, + last_received_chunk, + ) + } + pub fn read_chunk_blocking( &self, who_sent: PeerHandle, chunk_info: ChunkInfo, ) -> anyhow::Result> { - let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info); - let mut result_buf = vec![0u8; chunk_info.size as usize]; - let mut buf = &mut result_buf[..]; + read_chunk( + &self.torrent, + &self.files, + &self.lengths, + who_sent, + chunk_info, + ) + } - for (file_idx, file_len) in self.torrent.info.iter_file_lengths().enumerate() { - if absolute_offset > file_len { - absolute_offset -= file_len; - continue; - } - let file_remaining_len = file_len - absolute_offset; - let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize; - - let mut file_g = self.files[file_idx].lock(); - debug!( - "piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}", - chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info - ); - file_g - .seek(SeekFrom::Start(absolute_offset)) - .with_context(|| { - format!( - "error seeking to {}, file id: {}", - absolute_offset, file_idx - ) - })?; - file_g - .read_exact(&mut buf[..to_read_in_file]) - .with_context(|| { - format!( - "error reading {} bytes, file_id: {}", - file_idx, to_read_in_file - ) - })?; - - buf = &mut buf[to_read_in_file..]; - - if buf.is_empty() { - break; - } - - absolute_offset = 0; - } - - return Ok(result_buf); + pub fn write_chunk_blocking( + &self, + who_sent: PeerHandle, + data: &Piece, + chunk_info: &ChunkInfo, + ) -> anyhow::Result<()> { + write_chunk( + &self.torrent, + &self.files, + &self.lengths, + who_sent, + data, + chunk_info, + ) } pub fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { @@ -256,78 +249,6 @@ impl TorrentState { Some(n) } - pub fn check_piece_blocking( - &self, - who_sent: PeerHandle, - piece_index: ValidPieceIndex, - last_received_chunk: &ChunkInfo, - ) -> anyhow::Result { - let mut h = sha1::Sha1::new(); - let piece_length = self.lengths.piece_length(piece_index); - let mut absolute_offset = self.lengths.piece_offset(piece_index); - let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)]; - - let mut piece_remaining_bytes = piece_length as usize; - - for (file_idx, (name, file_len)) in - self.torrent.info.iter_filenames_and_lengths().enumerate() - { - if absolute_offset > file_len { - absolute_offset -= file_len; - continue; - } - let file_remaining_len = file_len - absolute_offset; - - let to_read_in_file = - std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize; - let mut file_g = self.files[file_idx].lock(); - debug!( - "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", - piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk - ); - file_g - .seek(SeekFrom::Start(absolute_offset)) - .with_context(|| { - format!( - "error seeking to {}, file id: {}", - absolute_offset, file_idx - ) - })?; - update_hash_from_file(&mut file_g, &mut h, &mut buf, to_read_in_file).with_context( - || { - format!( - "error reading {} bytes, file_id: {} (\"{:?}\")", - to_read_in_file, file_idx, name - ) - }, - )?; - - piece_remaining_bytes -= to_read_in_file; - - if piece_remaining_bytes == 0 { - return Ok(true); - } - - absolute_offset = 0; - } - - match self.torrent.info.compare_hash(piece_index.get(), &h) { - Some(true) => { - debug!("piece={} hash matches", piece_index); - Ok(true) - } - Some(false) => { - warn!("the piece={} hash does not match", piece_index); - Ok(false) - } - None => { - // this is probably a bug? - warn!("compare_hash() did not find the piece"); - anyhow::bail!("compare_hash() did not find the piece"); - } - } - } - pub fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { self.get_next_needed_piece(handle).is_some() } @@ -385,59 +306,6 @@ impl TorrentState { self.needed - self.get_downloaded() } - pub fn write_chunk_blocking( - &self, - who_sent: PeerHandle, - data: &Piece, - chunk_info: &ChunkInfo, - ) -> anyhow::Result<()> { - let mut buf = data.block.as_ref(); - let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info); - - for (file_idx, (name, file_len)) in - self.torrent.info.iter_filenames_and_lengths().enumerate() - { - if absolute_offset > file_len { - absolute_offset -= file_len; - continue; - } - - let remaining_len = file_len - absolute_offset; - let to_write = std::cmp::min(buf.len(), remaining_len as usize); - - let mut file_g = self.files[file_idx].lock(); - debug!( - "piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}", - chunk_info.piece_index, - chunk_info, - who_sent, - chunk_info.offset, - file_idx, - to_write, - absolute_offset - ); - file_g - .seek(SeekFrom::Start(absolute_offset)) - .with_context(|| { - format!( - "error seeking to {} in file {} (\"{:?}\")", - absolute_offset, file_idx, name - ) - })?; - file_g - .write_all(&buf[..to_write]) - .with_context(|| format!("error writing to file {} (\"{:?}\")", file_idx, name))?; - buf = &buf[to_write..]; - if buf.is_empty() { - break; - } - - absolute_offset = 0; - } - - Ok(()) - } - // TODO: this is a task per chunk, not good pub async fn task_transmit_haves(&self, index: u32) -> anyhow::Result<()> { let mut unordered = FuturesUnordered::new();