From 7ed532ae52552a7a68fd2f6518ace27f8dad83a4 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 26 Jun 2021 17:29:59 +0100 Subject: [PATCH] Bugs fixed --- crates/librqbit/src/chunk_tracker.rs | 16 ++++--- crates/librqbit/src/lengths.rs | 6 +++ crates/librqbit/src/peer_comms.rs | 16 ++++--- crates/librqbit/src/torrent_manager.rs | 55 +++++++++++-------------- crates/librqbit/src/torrent_metainfo.rs | 4 +- 5 files changed, 54 insertions(+), 43 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index d13fc47..fc682ce 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -14,13 +14,19 @@ pub struct ChunkTracker { } fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF { - let required_bits = lengths.total_chunks(); - let required_size = (required_bits as usize + 1) / 8; + let required_size = lengths.chunk_bitfield_bytes(); let vec = vec![0u8; required_size]; let mut chunk_bf = BF::from_vec(vec); - for bit in needed_pieces.iter_zeros() { - let offset = bit * 8; - for i in 0..8 { + for piece_index in needed_pieces + .get(0..lengths.total_pieces() as usize) + .unwrap() + .iter_zeros() + { + let offset = piece_index * lengths.default_chunks_per_piece() as usize; + let chunks_per_piece = lengths + .chunks_per_piece(lengths.validate_piece_index(piece_index as u32).unwrap()) + as usize; + for i in 0..chunks_per_piece { chunk_bf.set(offset + i, true); } } diff --git a/crates/librqbit/src/lengths.rs b/crates/librqbit/src/lengths.rs index 93f7294..b3967c1 100644 --- a/crates/librqbit/src/lengths.rs +++ b/crates/librqbit/src/lengths.rs @@ -92,6 +92,9 @@ impl Lengths { pub const fn piece_bitfield_bytes(&self) -> usize { ceil_div_u64(self.total_pieces() as u64, 8) as usize } + pub const fn chunk_bitfield_bytes(&self) -> usize { + ceil_div_u64(self.total_chunks() as u64, 8) as usize + } pub const fn total_length(&self) -> u64 { self.total_length } @@ -107,6 +110,9 @@ impl Lengths { pub const fn default_chunk_length(&self) -> u32 { self.chunk_length } + pub const fn default_chunks_per_piece(&self) -> u32 { + self.chunks_per_piece + } pub const fn total_chunks(&self) -> u32 { ceil_div_u64(self.total_length, self.chunk_length as u64) as u32 } diff --git a/crates/librqbit/src/peer_comms.rs b/crates/librqbit/src/peer_comms.rs index 51ce816..aab31ee 100644 --- a/crates/librqbit/src/peer_comms.rs +++ b/crates/librqbit/src/peer_comms.rs @@ -68,10 +68,11 @@ where } } - pub fn serialize(&self, buf: &mut [u8]) -> usize { + pub fn serialize(&self, mut buf: &mut [u8]) -> usize { byteorder::BigEndian::write_u32(&mut buf[0..4], self.index); byteorder::BigEndian::write_u32(&mut buf[4..8], self.begin); - (&mut buf[8..8 + self.block.as_ref().len()]).copy_from_slice(self.block.as_ref()); + buf = &mut buf[8..]; + buf.copy_from_slice(self.block.as_ref()); self.block.as_ref().len() + 8 } pub fn deserialize<'a>(buf: &'a [u8]) -> Piece @@ -235,12 +236,17 @@ where Message::Bitfield(_) => todo!(), Message::Choke | Message::Unchoke | Message::Interested => PREAMBLE_LEN, Message::Piece(p) => { - let msg_len = PREAMBLE_LEN + 8 + p.block.as_ref().len(); + let payload_len = 8 + p.block.as_ref().len(); + let msg_len = PREAMBLE_LEN + payload_len; out.resize(msg_len, 0); - p.serialize(&mut out[PREAMBLE_LEN..(8 + p.block.as_ref().len())]); + let tmp = &mut out[PREAMBLE_LEN..]; + p.serialize(&mut tmp[..payload_len]); msg_len } - Message::KeepAlive => 4, + Message::KeepAlive => { + // the len prefix was already written out to buf + 4 + } Message::Have(v) => { let msg_len = PREAMBLE_LEN + 4; out.resize(msg_len, 0); diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 0469dd3..942d8fe 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -2,8 +2,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::Display, fs::{File, OpenOptions}, - future::Future, - io::{self, Read, Seek, SeekFrom, Write}, + io::{Read, Seek, SeekFrom, Write}, net::SocketAddr, path::{Path, PathBuf}, sync::{ @@ -14,14 +13,11 @@ use std::{ }; use anyhow::Context; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{stream::FuturesUnordered, StreamExt}; use log::{debug, error, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use reqwest::Url; -use tokio::{ - sync::{mpsc::Sender, Notify, Semaphore}, - task::JoinHandle, -}; +use tokio::sync::{mpsc::Sender, Notify, Semaphore}; use crate::{ buffers::ByteString, @@ -171,16 +167,6 @@ impl PeerStates { _ => return None, } } - fn mark_peer_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { - match self.states.get_mut(&handle) { - Some(PeerState::Live(live)) => { - let prev = live.peer_choked; - live.peer_choked = is_choked; - return Some(prev); - } - _ => return None, - } - } fn update_bitfield_from_vec( &mut self, handle: PeerHandle, @@ -196,9 +182,6 @@ impl PeerStates { _ => None, } } - fn get_tx(&self, handle: PeerHandle) -> Option<&Sender> { - self.tx.get(&handle).map(|v| v.as_ref()) - } fn clone_tx(&self, handle: PeerHandle) -> Option>> { Some(self.tx.get(&handle)?.clone()) } @@ -256,7 +239,7 @@ fn spawn( fn spawn_blocking( name: N, f: impl FnOnce() -> anyhow::Result + Send + 'static, -) -> impl Future> { +) -> tokio::task::JoinHandle> { debug!("starting blocking task \"{}\"", name); tokio::task::spawn_blocking(move || match f() { Ok(v) => { @@ -268,7 +251,6 @@ fn spawn_blocking( Err(e) } }) - .map(|j| j.unwrap()) } fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result { @@ -446,6 +428,10 @@ fn compute_needed_pieces( } if !at_least_one_file_required { + trace!( + "piece {} is not required by any of the requested files, ignoring", + piece_info.piece_index + ); continue; } @@ -486,7 +472,7 @@ impl TorrentManager { overwrite: bool, only_files: Option>, ) -> anyhow::Result { - let mut files = { + let files = { let mut files = Vec::>>::with_capacity(torrent.info.iter_file_lengths().count()); @@ -681,7 +667,7 @@ impl TorrentManager { ), move || clone.read_chunk_blocking(peer_handle, chunk_info), ) - .await?; + .await??; let tx = this .inner .locked @@ -708,7 +694,6 @@ impl TorrentManager { who_sent: PeerHandle, chunk_info: ChunkInfo, ) -> anyhow::Result> { - let mut h = sha1::Sha1::new(); let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info); let mut result_buf = vec![0u8; chunk_info.size as usize]; let mut buf = &mut result_buf[..]; @@ -842,8 +827,12 @@ impl TorrentManager { Some(l) => l.have_notify.clone(), None => return Ok(()), }; - // TODO: this might dangle - tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + + // TODO: this might dangle, same below. + #[allow(unused_must_use)] + { + tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + } loop { let next = match self.reserve_next_needed_piece(handle) { @@ -854,8 +843,11 @@ impl TorrentManager { Some(l) => l.have_notify.clone(), None => return Ok(()), }; - // TODO: this might dangle - tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + + #[allow(unused_must_use)] + { + tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + } continue; } }; @@ -1106,6 +1098,7 @@ impl TorrentManager { } let this = self.clone(); + spawn_blocking( format!( "write_and_check(piece={}, peer={}, block={:?})", @@ -1264,7 +1257,7 @@ impl TorrentManager { }; } } - fn set_peer_live(&self, handle: PeerHandle, addr: SocketAddr, h: Handshake) { + fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { let mut g = self.inner.locked.write(); match g.peers.states.get_mut(&handle) { Some(s @ &mut PeerState::Connecting(_)) => { @@ -1315,7 +1308,7 @@ impl TorrentManager { anyhow::bail!("info hash does not match"); } - self.set_peer_live(handle, addr, h); + self.set_peer_live(handle, h); if read_bytes > hlen { read_buf.copy_within(hlen..read_bytes, 0); diff --git a/crates/librqbit/src/torrent_metainfo.rs b/crates/librqbit/src/torrent_metainfo.rs index 31e4529..f0f1d7d 100644 --- a/crates/librqbit/src/torrent_metainfo.rs +++ b/crates/librqbit/src/torrent_metainfo.rs @@ -1,4 +1,4 @@ -use std::{fmt::Write, fs::File, ops::Deref, path::PathBuf}; +use std::{fmt::Write, ops::Deref, path::PathBuf}; use serde::Deserialize; @@ -126,7 +126,7 @@ impl<'a, ByteBuf> FileIteratorName<'a, ByteBuf> { } impl> TorrentMetaV1Info { - pub fn get_hash(&self, piece: u32, hash: &sha1::Sha1) -> Option<&[u8]> { + pub fn get_hash(&self, piece: u32) -> Option<&[u8]> { let start = piece as usize * 20; let end = start + 20; let expected_hash = self.pieces.deref().get(start..end)?;