From d546dfd1e6f70fc3d80905b8b736b989d4673a21 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 26 Jun 2021 16:43:36 +0100 Subject: [PATCH] Things are pretty broken now --- Cargo.lock | 1 + Cargo.toml | 1 + TODO.md | 13 ++ crates/librqbit/src/lengths.rs | 17 ++ crates/librqbit/src/torrent_manager.rs | 270 +++++++++++++++++++++--- crates/librqbit/src/torrent_metainfo.rs | 43 +++- src/main.rs | 54 ++++- 7 files changed, 364 insertions(+), 35 deletions(-) create mode 100644 TODO.md diff --git a/Cargo.lock b/Cargo.lock index 992e9c1..f271561 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -904,6 +904,7 @@ dependencies = [ "librqbit", "log", "pretty_env_logger", + "regex", "reqwest", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index d10cb58..53d26d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ clap = "3.0.0-beta.2" log = "0.4" pretty_env_logger = "0.4" reqwest = "0.11" +regex = "1" [dev-dependencies] futures = {version = "0.3"} diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..6c7ca83 --- /dev/null +++ b/TODO.md @@ -0,0 +1,13 @@ +- [ ] Selective file downloading (mostly done) + - [ ] Seeking optimization + - If a file is not needed, no need to check its hash + - [ ] Proper counting of how much is left, and how much is downloaded + +- [ ] Refactor "needed pieces" into a bitfield +- [ ] Send bitfield at the start if I have something +- [ ] use the "update_hash" function in piece checking +- [ ] signaling when file is done + + +someday: +- [ ] cancellation \ No newline at end of file diff --git a/crates/librqbit/src/lengths.rs b/crates/librqbit/src/lengths.rs index 4b0feb8..93f7294 100644 --- a/crates/librqbit/src/lengths.rs +++ b/crates/librqbit/src/lengths.rs @@ -16,6 +16,12 @@ pub const fn last_element_size_u64(total: u64, chunk_size: u64) -> u64 { rem } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PieceInfo { + pub piece_index: ValidPieceIndex, + pub len: u32, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ChunkInfo { pub piece_index: ValidPieceIndex, @@ -119,6 +125,17 @@ impl Lengths { pub const fn piece_offset(&self, index: ValidPieceIndex) -> u64 { index.0 as u64 * self.piece_length as u64 } + + pub fn iter_piece_infos(&self) -> impl Iterator { + let last_id = self.last_piece_id; + let last_len = self.last_piece_length; + let pl = self.piece_length; + (0..self.total_pieces()).map(move |idx| PieceInfo { + piece_index: ValidPieceIndex(idx), + len: if idx == last_id { last_len } else { pl }, + }) + } + pub fn iter_chunk_infos(&self, index: ValidPieceIndex) -> impl Iterator { let mut remaining = self.piece_length(index); let chunk_size = self.chunk_length; diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index a835fb4..0469dd3 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -3,7 +3,7 @@ use std::{ fmt::Display, fs::{File, OpenOptions}, future::Future, - io::{Read, Seek, Write}, + io::{self, Read, Seek, SeekFrom, Write}, net::SocketAddr, path::{Path, PathBuf}, sync::{ @@ -32,13 +32,14 @@ use crate::{ Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request, }, peer_id::try_decode_peer_id, - torrent_metainfo::TorrentMetaV1Owned, + torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned}, tracker_comms::{CompactTrackerResponse, TrackerRequest, TrackerRequestEvent}, }; pub struct TorrentManagerBuilder { torrent: TorrentMetaV1Owned, overwrite: bool, output_folder: PathBuf, + only_files: Option>, } impl TorrentManagerBuilder { @@ -47,16 +48,27 @@ impl TorrentManagerBuilder { torrent, overwrite: false, output_folder: output_folder.as_ref().into(), + only_files: None, } } - pub fn overwrite(mut self, overwrite: bool) -> Self { + pub fn only_files(&mut self, only_files: Vec) -> &mut Self { + self.only_files = Some(only_files); + self + } + + pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { self.overwrite = overwrite; self } pub async fn start_manager(self) -> anyhow::Result { - TorrentManager::start(self.torrent, self.output_folder, self.overwrite) + TorrentManager::start( + self.torrent, + self.output_folder, + self.overwrite, + self.only_files, + ) } } @@ -264,15 +276,206 @@ fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result { Lengths::new(total_length, torrent.info.piece_length, None) } +fn update_hash_from_file( + file: &mut File, + hash: &mut sha1::Sha1, + buf: &mut [u8], + mut bytes_to_read: usize, +) -> anyhow::Result<()> { + let mut read = 0; + while bytes_to_read > 0 { + let chunk = std::cmp::min(buf.len(), bytes_to_read); + file.read_exact(&mut buf[..chunk]).with_context(|| { + format!( + "failed reading chunk of size {}, read so far {}", + chunk, read + ) + })?; + bytes_to_read -= chunk; + read += chunk; + hash.update(&buf[..chunk]); + } + Ok(()) +} + fn compute_needed_pieces( torrent: &TorrentMetaV1Owned, - files: &mut [Arc>], + files: &[Arc>], + only_files: Option<&[usize]>, lengths: &Lengths, ) -> anyhow::Result { - let needed_pieces = vec![u8::MAX; lengths.piece_bitfield_bytes()]; - let needed_pieces = BF::from_vec(needed_pieces); + let needed_pieces = vec![0u8; lengths.piece_bitfield_bytes()]; + let mut needed_pieces = BF::from_vec(needed_pieces); + struct CurrentFile<'a> { + index: usize, + fd: &'a Arc>, + len: u64, + name: FileIteratorName<'a, ByteString>, + full_file_required: bool, + processed_bytes: u64, + is_broken: bool, + } + impl<'a> CurrentFile<'a> { + fn remaining(&self) -> u64 { + self.len - self.processed_bytes + } + fn mark_processed_bytes(&mut self, bytes: u64) { + self.processed_bytes += bytes as u64 + } + } + let mut file_iterator = files + .iter() + .zip(torrent.info.iter_filenames_and_lengths()) + .enumerate() + .map(|(idx, (fd, (name, len)))| { + let full_file_required = if let Some(only_files) = only_files { + only_files.contains(&idx) + } else { + true + }; + CurrentFile { + index: idx, + fd, + len, + name, + full_file_required, + processed_bytes: 0, + is_broken: false, + } + }); + + let mut current_file = file_iterator + .next() + .ok_or_else(|| anyhow::anyhow!("empty input file list"))?; + + let mut read_buffer = vec![0u8; 65536]; + + for piece_info in lengths.iter_piece_infos() { + // We need to compute the hash (and afterwards mark the piece as NOT needed) if ANY of the following are true + // - the file is required + // - the current piece is required (i.e. it's a part of some other file that is required) + + // This means, that for an easy implementation: + // - we ALWAYS try to compute the hash from existing files + // - after the whole piece was processed, we mark the piece needed if: + // - at least one file that the piece owns was required + // - and (there were errors OR the hash does not match) + // + // If there's an error, it's fine only if none of the files was required. + + // let mut seek: Option = None; + + // Optimization for a common case: if the piece is wholy in the file, and the file is not required, continue + + // if !current_file.full_file_required && current_file.remaining() >= piece_info.len as u64 { + // seek = match seek { + // None => Some(piece_info.len as u64), + // Some(s) => { + // current_file.mark_processed_bytes(piece_info.len as u64); + // Some(s + piece_info.len as u64) + // } + // }; + // continue; + // } + + let mut computed_hash = sha1::Sha1::new(); + let mut piece_remaining = piece_info.len as usize; + let mut piece_is_needed = false; + let mut at_least_one_file_required = current_file.full_file_required; + + while piece_remaining > 0 { + let mut to_read_in_file = + std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; + while to_read_in_file == 0 { + current_file = file_iterator + .next() + .ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?; + + at_least_one_file_required |= current_file.full_file_required; + + to_read_in_file = + std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; + } + + let pos = current_file.processed_bytes; + piece_remaining -= to_read_in_file; + current_file.mark_processed_bytes(to_read_in_file as u64); + + if current_file.is_broken { + piece_is_needed = true; + continue; + } + + if piece_is_needed { + continue; + } + + let mut fd = current_file.fd.lock(); + + // if let Some(offset) = seek.take() { + // match fd.seek(SeekFrom::Start(offset)) { + // Ok(v) => { + // assert_eq!(v, offset) + // } + // Err(e) => { + // debug!( + // "error seeking in file {} to {}: {:#}", + // current_file.index, offset, &e + // ); + // piece_is_needed = true; + // current_file.is_broken = true; + // continue; + // } + // } + // } + + fd.seek(SeekFrom::Start(pos)).unwrap(); + if let Err(err) = update_hash_from_file( + &mut fd, + &mut computed_hash, + &mut read_buffer, + to_read_in_file, + ) { + debug!( + "error reading from file {} ({:?}) at {}: {:#}", + current_file.index, current_file.name, pos, &err + ); + piece_is_needed = true; + current_file.is_broken = true; + } + } + + if !at_least_one_file_required { + continue; + } + + if piece_is_needed { + trace!( + "piece {} had errors, marking as needed", + piece_info.piece_index + ); + needed_pieces.set(piece_info.piece_index.get() as usize, true); + continue; + } + + if torrent + .info + .compare_hash(piece_info.piece_index.get(), &computed_hash) + .unwrap() + { + trace!( + "piece {} is fine, not marking as needed", + piece_info.piece_index + ); + } else { + trace!( + "piece {} hash does not match, marking as needed", + piece_info.piece_index + ); + needed_pieces.set(piece_info.piece_index.get() as usize, true); + } + } - // TODO: read and validate existing files Ok(needed_pieces) } @@ -281,6 +484,7 @@ impl TorrentManager { torrent: TorrentMetaV1Owned, out: P, overwrite: bool, + only_files: Option>, ) -> anyhow::Result { let mut files = { let mut files = @@ -319,7 +523,8 @@ impl TorrentManager { let peer_id = generate_peer_id(); let lengths = make_lengths(&torrent).context("unable to compute Lengths from torrent")?; - let needed_pieces = compute_needed_pieces(&torrent, &mut files, &lengths)?; + let needed_pieces = + compute_needed_pieces(&torrent, &files, only_files.as_deref(), &lengths)?; debug!("computed lengths: {:?}", &lengths); let chunk_tracker = ChunkTracker::new(needed_pieces, lengths); @@ -522,7 +727,7 @@ impl TorrentManager { chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info ); file_g - .seek(std::io::SeekFrom::Start(absolute_offset)) + .seek(SeekFrom::Start(absolute_offset)) .with_context(|| { format!( "error seeking to {}, file id: {}", @@ -736,49 +941,50 @@ impl TorrentManager { let mut h = sha1::Sha1::new(); let piece_length = self.inner.lengths.piece_length(piece_index); let mut absolute_offset = self.inner.lengths.piece_offset(piece_index); - let mut buf = vec![0u8; std::cmp::min(8192, piece_length as usize)]; + let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)]; - let mut left_to_read = piece_length as usize; + let mut piece_remaining_bytes = piece_length as usize; - for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { + for (file_idx, (name, file_len)) in self + .inner + .torrent + .info + .iter_filenames_and_lengths() + .enumerate() + { if absolute_offset > file_len { absolute_offset -= file_len; continue; } let file_remaining_len = file_len - absolute_offset; - let to_read_in_file = std::cmp::min(file_remaining_len, left_to_read as u64) as usize; - let mut left_to_read_in_file = to_read_in_file; + let to_read_in_file = + std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize; let mut file_g = self.inner.files[file_idx].lock(); debug!( "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk ); file_g - .seek(std::io::SeekFrom::Start(absolute_offset)) + .seek(SeekFrom::Start(absolute_offset)) .with_context(|| { format!( "error seeking to {}, file id: {}", absolute_offset, file_idx ) })?; - while left_to_read_in_file > 0 { - let chunk_length = std::cmp::min(buf.len(), left_to_read_in_file); - file_g - .read_exact(&mut buf[..chunk_length]) - .with_context(|| { - format!( - "error reading {} bytes, file_id: {}, left_to_read_in_file: {}", - chunk_length, file_idx, left_to_read_in_file - ) - })?; - h.update(&buf[..chunk_length]); - left_to_read_in_file -= chunk_length; - } + update_hash_from_file(&mut file_g, &mut h, &mut buf, to_read_in_file).with_context( + || { + format!( + "error reading {} bytes, file_id: {} (\"{:?}\")", + to_read_in_file, file_idx, name + ) + }, + )?; - left_to_read -= to_read_in_file; + piece_remaining_bytes -= to_read_in_file; - if left_to_read == 0 { + if piece_remaining_bytes == 0 { return Ok(true); } @@ -857,7 +1063,7 @@ impl TorrentManager { to_write, absolute_offset ); - file_g.seek(std::io::SeekFrom::Start(absolute_offset))?; + file_g.seek(SeekFrom::Start(absolute_offset))?; file_g.write_all(&buf[..to_write])?; buf = &buf[to_write..]; if buf.is_empty() { diff --git a/crates/librqbit/src/torrent_metainfo.rs b/crates/librqbit/src/torrent_metainfo.rs index c348dbd..31e4529 100644 --- a/crates/librqbit/src/torrent_metainfo.rs +++ b/crates/librqbit/src/torrent_metainfo.rs @@ -1,4 +1,4 @@ -use std::{fs::File, ops::Deref, path::PathBuf}; +use std::{fmt::Write, fs::File, ops::Deref, path::PathBuf}; use serde::Deserialize; @@ -73,7 +73,42 @@ pub enum FileIteratorName<'a, ByteBuf> { Tree(&'a [ByteBuf]), } +impl<'a, ByteBuf> std::fmt::Debug for FileIteratorName<'a, ByteBuf> +where + ByteBuf: AsRef<[u8]>, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for (idx, item) in self.iter_components().enumerate() { + if idx > 0 { + f.write_char(std::path::MAIN_SEPARATOR)?; + } + match item { + Some(bit) => { + f.write_str(std::str::from_utf8(bit.as_ref()).unwrap_or(""))?; + } + None => f.write_str("output")?, + } + } + Ok(()) + } +} + impl<'a, ByteBuf> FileIteratorName<'a, ByteBuf> { + pub fn to_pathbuf(&self) -> anyhow::Result + where + ByteBuf: AsRef<[u8]>, + { + let mut buf = PathBuf::new(); + for part in self.iter_components() { + if let Some(part) = part { + buf.push(std::str::from_utf8(part.as_ref())?) + } else { + buf.push("output"); + break; + } + } + Ok(buf) + } pub fn iter_components(&self) -> impl Iterator> { let single_it = std::iter::once(match self { FileIteratorName::Single(n) => Some(*n), @@ -91,6 +126,12 @@ impl<'a, ByteBuf> FileIteratorName<'a, ByteBuf> { } impl> TorrentMetaV1Info { + pub fn get_hash(&self, piece: u32, hash: &sha1::Sha1) -> Option<&[u8]> { + let start = piece as usize * 20; + let end = start + 20; + let expected_hash = self.pieces.deref().get(start..end)?; + Some(expected_hash) + } pub fn compare_hash(&self, piece: u32, hash: &sha1::Sha1) -> Option { let start = piece as usize * 20; let end = start + 20; diff --git a/src/main.rs b/src/main.rs index b69ff23..3bfaecf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -51,11 +51,38 @@ struct Opts { /// The filename of the .torrent file. output_folder: String, + #[clap(short = 'r', long = "filename-re")] + only_files_matching_regex: Option, + /// Set if you are ok to write on top of existing files #[clap(long)] overwrite: bool, } +fn compute_only_files( + torrent: &TorrentMetaV1Owned, + filename_re: &str, +) -> anyhow::Result> { + let filename_re = regex::Regex::new(&filename_re).context("filename regex is incorrect")?; + let mut only_files = Vec::new(); + for (idx, (filename, _)) in torrent.info.iter_filenames_and_lengths().enumerate() { + let full_path = filename + .to_pathbuf() + .with_context(|| format!("filename of file {} is not valid utf8", idx))?; + if filename_re.is_match( + full_path + .to_str() + .ok_or_else(|| anyhow::anyhow!("filename of file {} is not valid utf8", idx))?, + ) { + only_files.push(idx); + } + } + if only_files.is_empty() { + anyhow::bail!("none of the filenames match the given regex") + } + Ok(only_files) +} + fn main() -> anyhow::Result<()> { pretty_env_logger::init(); @@ -79,10 +106,33 @@ fn main() -> anyhow::Result<()> { info!("Torrent metadata: {:#?}", &torrent); - let builder = - TorrentManagerBuilder::new(torrent, opts.output_folder).overwrite(opts.overwrite); + let only_files = if let Some(filename_re) = opts.only_files_matching_regex { + Some(compute_only_files(&torrent, &filename_re)?) + } else { + None + }; + + let mut builder = TorrentManagerBuilder::new(torrent, opts.output_folder); + builder.overwrite(opts.overwrite); + if let Some(only_files) = only_files { + builder.only_files(only_files); + } + let manager_handle = builder.start_manager().await?; manager_handle.wait_until_completed().await?; Ok(()) }) } + +#[cfg(test)] +mod tests { + use std::{fs::File, io::Read}; + + #[test] + fn test_bullshit() { + let mut buf = vec![0u8; 65536]; + let mut f = + File::open("/tmp/torrent-download/08.Comedy.Club.S17.WEB-DL.1080p.7turza.mkv").unwrap(); + f.read_exact(&mut buf[..]).unwrap(); + } +}