diff --git a/crates/librqbit/src/lengths.rs b/crates/librqbit/src/lengths.rs index 3e7c8dd..c0e9f69 100644 --- a/crates/librqbit/src/lengths.rs +++ b/crates/librqbit/src/lengths.rs @@ -113,6 +113,9 @@ impl Lengths { } self.piece_length } + pub const fn chunk_absolute_offset(&self, chunk_info: &ChunkInfo) -> u64 { + self.piece_offset(chunk_info.piece_index) + chunk_info.offset as u64 + } pub const fn piece_offset(&self, index: ValidPieceIndex) -> u64 { index.0 as u64 * self.piece_length as u64 } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 1462297..517ef27 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -23,7 +23,7 @@ use crate::{ buffers::ByteString, chunk_tracker::ChunkTracker, clone_to_owned::CloneToOwned, - lengths::{Lengths, ValidPieceIndex}, + lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_comms::{ Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request, }, @@ -615,12 +615,13 @@ impl TorrentManager { fn check_piece_blocking( &self, who_sent: PeerHandle, - index: ValidPieceIndex, + piece_index: ValidPieceIndex, + last_received_chunk: &ChunkInfo, ) -> anyhow::Result { let mut h = sha1::Sha1::new(); - let piece_length = self.inner.lengths.piece_length(index); - let mut absolute_offset = self.inner.lengths.piece_offset(index); - let mut buf = vec![0; std::cmp::min(8192, piece_length as usize)]; + let piece_length = self.inner.lengths.piece_length(piece_index); + let mut absolute_offset = self.inner.lengths.piece_offset(piece_index); + let mut buf = vec![0u8; std::cmp::min(8192, piece_length as usize)]; let mut left_to_read = piece_length as usize; @@ -631,10 +632,13 @@ impl TorrentManager { } let file_remaining_len = file_len - absolute_offset; - let mut left_to_read_in_file = - std::cmp::min(file_remaining_len, left_to_read as u64) as usize; + let to_read_in_file = std::cmp::min(file_remaining_len, left_to_read as u64) as usize; + let mut left_to_read_in_file = to_read_in_file; let mut file_g = self.inner.files[file_idx].lock(); - trace!("piece={}, seeking to {}", index, absolute_offset); + debug!( + "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", + piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk + ); file_g .seek(std::io::SeekFrom::Start(absolute_offset)) .with_context(|| { @@ -657,22 +661,7 @@ impl TorrentManager { left_to_read_in_file -= chunk_length; } - match self.inner.torrent.info.compare_hash(index.get(), &h) { - Some(true) => { - debug!("piece={} hash matches", index); - } - Some(false) => { - warn!("the piece={} hash does not match", index); - return Ok(false); - } - None => { - // this is probably a bug? - warn!("compare_hash() did not find the piece"); - anyhow::bail!("compare_hash() did not find the piece"); - } - } - - left_to_read -= left_to_read_in_file; + left_to_read -= to_read_in_file; if left_to_read == 0 { return Ok(true); @@ -680,7 +669,22 @@ impl TorrentManager { absolute_offset = 0; } - Ok(true) + + match self.inner.torrent.info.compare_hash(piece_index.get(), &h) { + Some(true) => { + debug!("piece={} hash matches", piece_index); + Ok(true) + } + Some(false) => { + warn!("the piece={} hash does not match", piece_index); + Ok(false) + } + None => { + // this is probably a bug? + warn!("compare_hash() did not find the piece"); + anyhow::bail!("compare_hash() did not find the piece"); + } + } } // TODO: this is a task per chunk, not good @@ -712,12 +716,11 @@ impl TorrentManager { fn write_chunk_blocking( &self, who_sent: PeerHandle, - chunk: &Piece, + data: &Piece, + chunk_info: &ChunkInfo, ) -> anyhow::Result<()> { - let mut absolute_offset = - self.inner.torrent.info.piece_length as u64 * chunk.index as u64 + chunk.begin as u64; - - let mut buf = chunk.block.as_ref(); + let mut buf = data.block.as_ref(); + let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info); for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { if absolute_offset > file_len { @@ -730,10 +733,15 @@ impl TorrentManager { let mut file_g = self.inner.files[file_idx].lock(); debug!( - "piece={}, handle={}, writing {} bytes to file {} at offset {}", - chunk.index, who_sent, to_write, file_idx, absolute_offset + "piece={}, chunk={}, handle={}, begin={}, file={}, writing {} bytes at {}", + chunk_info.piece_index, + chunk_info.chunk_index, + who_sent, + chunk_info.offset, + file_idx, + to_write, + absolute_offset ); - debug!("piece={}, seeking to {}", chunk.index, absolute_offset); file_g.seek(std::io::SeekFrom::Start(absolute_offset))?; file_g.write_all(&buf[..to_write])?; buf = &buf[to_write..]; @@ -782,10 +790,13 @@ impl TorrentManager { let this = self.clone(); spawn_blocking( - format!("write_and_check(piece={}, block={:?})", piece.index, &piece), + format!( + "write_and_check(piece={}, peer={}, block={:?})", + piece.index, handle, &piece + ), move || { let index = piece.index; - this.write_chunk_blocking(handle, &piece)?; + this.write_chunk_blocking(handle, &piece, &chunk_info)?; let piece_done = match this .inner @@ -825,7 +836,7 @@ impl TorrentManager { let clone = this.clone(); match clone - .check_piece_blocking(handle, chunk_info.piece_index) + .check_piece_blocking(handle, chunk_info.piece_index, &chunk_info) .with_context(|| format!("error checking piece={}", index))? { true => { diff --git a/src/main.rs b/src/main.rs index fea5d8a..8ef97a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,45 @@ use std::{fs::File, io::Read}; use anyhow::Context; use clap::Clap; use librqbit::{ - clone_to_owned::CloneToOwned, torrent_manager::TorrentManagerBuilder, - torrent_metainfo::torrent_from_bytes, + clone_to_owned::CloneToOwned, + torrent_manager::TorrentManagerBuilder, + torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned}, }; use log::info; +async fn torrent_from_url(url: &str) -> anyhow::Result { + let response = reqwest::get(url) + .await + .with_context(|| format!("error downloading torrent metadata from {}", url))?; + if !response.status().is_success() { + anyhow::bail!("GET {} returned {}", url, response.status()) + } + let b = response + .bytes() + .await + .with_context(|| format!("error reading repsonse body from {}", url))?; + Ok(torrent_from_bytes(&b) + .context("error decoding torrent")? + .clone_to_owned()) +} + +fn torrent_from_file(filename: &str) -> anyhow::Result { + let mut buf = Vec::new(); + if filename == "-" { + std::io::stdin() + .read_to_end(&mut buf) + .context("error reading stdin")?; + } else { + File::open(filename) + .with_context(|| format!("error opening {}", filename))? + .read_to_end(&mut buf) + .with_context(|| format!("error reading {}", filename))?; + } + Ok(torrent_from_bytes(&buf) + .context("error decoding torrent")? + .clone_to_owned()) +} + #[derive(Clap)] #[clap(version = "1.0", author = "Igor Katson ")] struct Opts { @@ -30,36 +64,9 @@ async fn main() -> anyhow::Result<()> { let torrent = if opts.torrent_path.starts_with("http://") || opts.torrent_path.starts_with("https://") { - let response = reqwest::get(&opts.torrent_path).await.with_context(|| { - format!( - "error downloading torrent metadata from {}", - &opts.torrent_path - ) - })?; - if !response.status().is_success() { - anyhow::bail!("GET {} returned {}", &opts.torrent_path, response.status()) - } - let b = response.bytes().await.with_context(|| { - format!("error reading repsonse body from {}", &opts.torrent_path) - })?; - torrent_from_bytes(&b) - .context("error decoding torrent")? - .clone_to_owned() + torrent_from_url(&opts.torrent_path).await? } else { - let mut buf = Vec::new(); - if opts.torrent_path == "-" { - std::io::stdin() - .read_to_end(&mut buf) - .context("error reading stdin")?; - } else { - File::open(&opts.torrent_path) - .with_context(|| format!("error opening {}", &opts.torrent_path))? - .read_to_end(&mut buf) - .with_context(|| format!("error reading {}", &opts.torrent_path))?; - } - torrent_from_bytes(&buf) - .context("error decoding torrent")? - .clone_to_owned() + torrent_from_file(&opts.torrent_path)? }; info!("Torrent metadata: {:#?}", &torrent);