Fixed bugs
This commit is contained in:
parent
87d6fe27ce
commit
0a640daba4
3 changed files with 88 additions and 67 deletions
|
|
@ -113,6 +113,9 @@ impl Lengths {
|
||||||
}
|
}
|
||||||
self.piece_length
|
self.piece_length
|
||||||
}
|
}
|
||||||
|
pub const fn chunk_absolute_offset(&self, chunk_info: &ChunkInfo) -> u64 {
|
||||||
|
self.piece_offset(chunk_info.piece_index) + chunk_info.offset as u64
|
||||||
|
}
|
||||||
pub const fn piece_offset(&self, index: ValidPieceIndex) -> u64 {
|
pub const fn piece_offset(&self, index: ValidPieceIndex) -> u64 {
|
||||||
index.0 as u64 * self.piece_length as u64
|
index.0 as u64 * self.piece_length as u64
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ use crate::{
|
||||||
buffers::ByteString,
|
buffers::ByteString,
|
||||||
chunk_tracker::ChunkTracker,
|
chunk_tracker::ChunkTracker,
|
||||||
clone_to_owned::CloneToOwned,
|
clone_to_owned::CloneToOwned,
|
||||||
lengths::{Lengths, ValidPieceIndex},
|
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||||
peer_comms::{
|
peer_comms::{
|
||||||
Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request,
|
Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request,
|
||||||
},
|
},
|
||||||
|
|
@ -615,12 +615,13 @@ impl TorrentManager {
|
||||||
fn check_piece_blocking(
|
fn check_piece_blocking(
|
||||||
&self,
|
&self,
|
||||||
who_sent: PeerHandle,
|
who_sent: PeerHandle,
|
||||||
index: ValidPieceIndex,
|
piece_index: ValidPieceIndex,
|
||||||
|
last_received_chunk: &ChunkInfo,
|
||||||
) -> anyhow::Result<bool> {
|
) -> anyhow::Result<bool> {
|
||||||
let mut h = sha1::Sha1::new();
|
let mut h = sha1::Sha1::new();
|
||||||
let piece_length = self.inner.lengths.piece_length(index);
|
let piece_length = self.inner.lengths.piece_length(piece_index);
|
||||||
let mut absolute_offset = self.inner.lengths.piece_offset(index);
|
let mut absolute_offset = self.inner.lengths.piece_offset(piece_index);
|
||||||
let mut buf = vec![0; std::cmp::min(8192, piece_length as usize)];
|
let mut buf = vec![0u8; std::cmp::min(8192, piece_length as usize)];
|
||||||
|
|
||||||
let mut left_to_read = piece_length as usize;
|
let mut left_to_read = piece_length as usize;
|
||||||
|
|
||||||
|
|
@ -631,10 +632,13 @@ impl TorrentManager {
|
||||||
}
|
}
|
||||||
let file_remaining_len = file_len - absolute_offset;
|
let file_remaining_len = file_len - absolute_offset;
|
||||||
|
|
||||||
let mut left_to_read_in_file =
|
let to_read_in_file = std::cmp::min(file_remaining_len, left_to_read as u64) as usize;
|
||||||
std::cmp::min(file_remaining_len, left_to_read as u64) as usize;
|
let mut left_to_read_in_file = to_read_in_file;
|
||||||
let mut file_g = self.inner.files[file_idx].lock();
|
let mut file_g = self.inner.files[file_idx].lock();
|
||||||
trace!("piece={}, seeking to {}", index, absolute_offset);
|
debug!(
|
||||||
|
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
|
||||||
|
piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk
|
||||||
|
);
|
||||||
file_g
|
file_g
|
||||||
.seek(std::io::SeekFrom::Start(absolute_offset))
|
.seek(std::io::SeekFrom::Start(absolute_offset))
|
||||||
.with_context(|| {
|
.with_context(|| {
|
||||||
|
|
@ -657,22 +661,7 @@ impl TorrentManager {
|
||||||
left_to_read_in_file -= chunk_length;
|
left_to_read_in_file -= chunk_length;
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.inner.torrent.info.compare_hash(index.get(), &h) {
|
left_to_read -= to_read_in_file;
|
||||||
Some(true) => {
|
|
||||||
debug!("piece={} hash matches", index);
|
|
||||||
}
|
|
||||||
Some(false) => {
|
|
||||||
warn!("the piece={} hash does not match", index);
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// this is probably a bug?
|
|
||||||
warn!("compare_hash() did not find the piece");
|
|
||||||
anyhow::bail!("compare_hash() did not find the piece");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
left_to_read -= left_to_read_in_file;
|
|
||||||
|
|
||||||
if left_to_read == 0 {
|
if left_to_read == 0 {
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
|
|
@ -680,7 +669,22 @@ impl TorrentManager {
|
||||||
|
|
||||||
absolute_offset = 0;
|
absolute_offset = 0;
|
||||||
}
|
}
|
||||||
Ok(true)
|
|
||||||
|
match self.inner.torrent.info.compare_hash(piece_index.get(), &h) {
|
||||||
|
Some(true) => {
|
||||||
|
debug!("piece={} hash matches", piece_index);
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
Some(false) => {
|
||||||
|
warn!("the piece={} hash does not match", piece_index);
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// this is probably a bug?
|
||||||
|
warn!("compare_hash() did not find the piece");
|
||||||
|
anyhow::bail!("compare_hash() did not find the piece");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this is a task per chunk, not good
|
// TODO: this is a task per chunk, not good
|
||||||
|
|
@ -712,12 +716,11 @@ impl TorrentManager {
|
||||||
fn write_chunk_blocking(
|
fn write_chunk_blocking(
|
||||||
&self,
|
&self,
|
||||||
who_sent: PeerHandle,
|
who_sent: PeerHandle,
|
||||||
chunk: &Piece<ByteString>,
|
data: &Piece<ByteString>,
|
||||||
|
chunk_info: &ChunkInfo,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut absolute_offset =
|
let mut buf = data.block.as_ref();
|
||||||
self.inner.torrent.info.piece_length as u64 * chunk.index as u64 + chunk.begin as u64;
|
let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info);
|
||||||
|
|
||||||
let mut buf = chunk.block.as_ref();
|
|
||||||
|
|
||||||
for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() {
|
for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() {
|
||||||
if absolute_offset > file_len {
|
if absolute_offset > file_len {
|
||||||
|
|
@ -730,10 +733,15 @@ impl TorrentManager {
|
||||||
|
|
||||||
let mut file_g = self.inner.files[file_idx].lock();
|
let mut file_g = self.inner.files[file_idx].lock();
|
||||||
debug!(
|
debug!(
|
||||||
"piece={}, handle={}, writing {} bytes to file {} at offset {}",
|
"piece={}, chunk={}, handle={}, begin={}, file={}, writing {} bytes at {}",
|
||||||
chunk.index, who_sent, to_write, file_idx, absolute_offset
|
chunk_info.piece_index,
|
||||||
|
chunk_info.chunk_index,
|
||||||
|
who_sent,
|
||||||
|
chunk_info.offset,
|
||||||
|
file_idx,
|
||||||
|
to_write,
|
||||||
|
absolute_offset
|
||||||
);
|
);
|
||||||
debug!("piece={}, seeking to {}", chunk.index, absolute_offset);
|
|
||||||
file_g.seek(std::io::SeekFrom::Start(absolute_offset))?;
|
file_g.seek(std::io::SeekFrom::Start(absolute_offset))?;
|
||||||
file_g.write_all(&buf[..to_write])?;
|
file_g.write_all(&buf[..to_write])?;
|
||||||
buf = &buf[to_write..];
|
buf = &buf[to_write..];
|
||||||
|
|
@ -782,10 +790,13 @@ impl TorrentManager {
|
||||||
|
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
spawn_blocking(
|
spawn_blocking(
|
||||||
format!("write_and_check(piece={}, block={:?})", piece.index, &piece),
|
format!(
|
||||||
|
"write_and_check(piece={}, peer={}, block={:?})",
|
||||||
|
piece.index, handle, &piece
|
||||||
|
),
|
||||||
move || {
|
move || {
|
||||||
let index = piece.index;
|
let index = piece.index;
|
||||||
this.write_chunk_blocking(handle, &piece)?;
|
this.write_chunk_blocking(handle, &piece, &chunk_info)?;
|
||||||
|
|
||||||
let piece_done = match this
|
let piece_done = match this
|
||||||
.inner
|
.inner
|
||||||
|
|
@ -825,7 +836,7 @@ impl TorrentManager {
|
||||||
|
|
||||||
let clone = this.clone();
|
let clone = this.clone();
|
||||||
match clone
|
match clone
|
||||||
.check_piece_blocking(handle, chunk_info.piece_index)
|
.check_piece_blocking(handle, chunk_info.piece_index, &chunk_info)
|
||||||
.with_context(|| format!("error checking piece={}", index))?
|
.with_context(|| format!("error checking piece={}", index))?
|
||||||
{
|
{
|
||||||
true => {
|
true => {
|
||||||
|
|
|
||||||
69
src/main.rs
69
src/main.rs
|
|
@ -3,11 +3,45 @@ use std::{fs::File, io::Read};
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use clap::Clap;
|
use clap::Clap;
|
||||||
use librqbit::{
|
use librqbit::{
|
||||||
clone_to_owned::CloneToOwned, torrent_manager::TorrentManagerBuilder,
|
clone_to_owned::CloneToOwned,
|
||||||
torrent_metainfo::torrent_from_bytes,
|
torrent_manager::TorrentManagerBuilder,
|
||||||
|
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned},
|
||||||
};
|
};
|
||||||
use log::info;
|
use log::info;
|
||||||
|
|
||||||
|
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
|
||||||
|
let response = reqwest::get(url)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("error downloading torrent metadata from {}", url))?;
|
||||||
|
if !response.status().is_success() {
|
||||||
|
anyhow::bail!("GET {} returned {}", url, response.status())
|
||||||
|
}
|
||||||
|
let b = response
|
||||||
|
.bytes()
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("error reading repsonse body from {}", url))?;
|
||||||
|
Ok(torrent_from_bytes(&b)
|
||||||
|
.context("error decoding torrent")?
|
||||||
|
.clone_to_owned())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn torrent_from_file(filename: &str) -> anyhow::Result<TorrentMetaV1Owned> {
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
if filename == "-" {
|
||||||
|
std::io::stdin()
|
||||||
|
.read_to_end(&mut buf)
|
||||||
|
.context("error reading stdin")?;
|
||||||
|
} else {
|
||||||
|
File::open(filename)
|
||||||
|
.with_context(|| format!("error opening {}", filename))?
|
||||||
|
.read_to_end(&mut buf)
|
||||||
|
.with_context(|| format!("error reading {}", filename))?;
|
||||||
|
}
|
||||||
|
Ok(torrent_from_bytes(&buf)
|
||||||
|
.context("error decoding torrent")?
|
||||||
|
.clone_to_owned())
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clap)]
|
#[derive(Clap)]
|
||||||
#[clap(version = "1.0", author = "Igor Katson <igor.katson@gmail.com>")]
|
#[clap(version = "1.0", author = "Igor Katson <igor.katson@gmail.com>")]
|
||||||
struct Opts {
|
struct Opts {
|
||||||
|
|
@ -30,36 +64,9 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let torrent =
|
let torrent =
|
||||||
if opts.torrent_path.starts_with("http://") || opts.torrent_path.starts_with("https://") {
|
if opts.torrent_path.starts_with("http://") || opts.torrent_path.starts_with("https://") {
|
||||||
let response = reqwest::get(&opts.torrent_path).await.with_context(|| {
|
torrent_from_url(&opts.torrent_path).await?
|
||||||
format!(
|
|
||||||
"error downloading torrent metadata from {}",
|
|
||||||
&opts.torrent_path
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
if !response.status().is_success() {
|
|
||||||
anyhow::bail!("GET {} returned {}", &opts.torrent_path, response.status())
|
|
||||||
}
|
|
||||||
let b = response.bytes().await.with_context(|| {
|
|
||||||
format!("error reading repsonse body from {}", &opts.torrent_path)
|
|
||||||
})?;
|
|
||||||
torrent_from_bytes(&b)
|
|
||||||
.context("error decoding torrent")?
|
|
||||||
.clone_to_owned()
|
|
||||||
} else {
|
} else {
|
||||||
let mut buf = Vec::new();
|
torrent_from_file(&opts.torrent_path)?
|
||||||
if opts.torrent_path == "-" {
|
|
||||||
std::io::stdin()
|
|
||||||
.read_to_end(&mut buf)
|
|
||||||
.context("error reading stdin")?;
|
|
||||||
} else {
|
|
||||||
File::open(&opts.torrent_path)
|
|
||||||
.with_context(|| format!("error opening {}", &opts.torrent_path))?
|
|
||||||
.read_to_end(&mut buf)
|
|
||||||
.with_context(|| format!("error reading {}", &opts.torrent_path))?;
|
|
||||||
}
|
|
||||||
torrent_from_bytes(&buf)
|
|
||||||
.context("error decoding torrent")?
|
|
||||||
.clone_to_owned()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Torrent metadata: {:#?}", &torrent);
|
info!("Torrent metadata: {:#?}", &torrent);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue