Bugs fixed

This commit is contained in:
Igor Katson 2021-06-26 17:29:59 +01:00
parent d546dfd1e6
commit 7ed532ae52
5 changed files with 54 additions and 43 deletions

View file

@ -14,13 +14,19 @@ pub struct ChunkTracker {
}
fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF {
let required_bits = lengths.total_chunks();
let required_size = (required_bits as usize + 1) / 8;
let required_size = lengths.chunk_bitfield_bytes();
let vec = vec![0u8; required_size];
let mut chunk_bf = BF::from_vec(vec);
for bit in needed_pieces.iter_zeros() {
let offset = bit * 8;
for i in 0..8 {
for piece_index in needed_pieces
.get(0..lengths.total_pieces() as usize)
.unwrap()
.iter_zeros()
{
let offset = piece_index * lengths.default_chunks_per_piece() as usize;
let chunks_per_piece = lengths
.chunks_per_piece(lengths.validate_piece_index(piece_index as u32).unwrap())
as usize;
for i in 0..chunks_per_piece {
chunk_bf.set(offset + i, true);
}
}

View file

@ -92,6 +92,9 @@ impl Lengths {
pub const fn piece_bitfield_bytes(&self) -> usize {
ceil_div_u64(self.total_pieces() as u64, 8) as usize
}
pub const fn chunk_bitfield_bytes(&self) -> usize {
ceil_div_u64(self.total_chunks() as u64, 8) as usize
}
pub const fn total_length(&self) -> u64 {
self.total_length
}
@ -107,6 +110,9 @@ impl Lengths {
pub const fn default_chunk_length(&self) -> u32 {
self.chunk_length
}
pub const fn default_chunks_per_piece(&self) -> u32 {
self.chunks_per_piece
}
pub const fn total_chunks(&self) -> u32 {
ceil_div_u64(self.total_length, self.chunk_length as u64) as u32
}

View file

@ -68,10 +68,11 @@ where
}
}
pub fn serialize(&self, buf: &mut [u8]) -> usize {
pub fn serialize(&self, mut buf: &mut [u8]) -> usize {
byteorder::BigEndian::write_u32(&mut buf[0..4], self.index);
byteorder::BigEndian::write_u32(&mut buf[4..8], self.begin);
(&mut buf[8..8 + self.block.as_ref().len()]).copy_from_slice(self.block.as_ref());
buf = &mut buf[8..];
buf.copy_from_slice(self.block.as_ref());
self.block.as_ref().len() + 8
}
pub fn deserialize<'a>(buf: &'a [u8]) -> Piece<ByteBuf>
@ -235,12 +236,17 @@ where
Message::Bitfield(_) => todo!(),
Message::Choke | Message::Unchoke | Message::Interested => PREAMBLE_LEN,
Message::Piece(p) => {
let msg_len = PREAMBLE_LEN + 8 + p.block.as_ref().len();
let payload_len = 8 + p.block.as_ref().len();
let msg_len = PREAMBLE_LEN + payload_len;
out.resize(msg_len, 0);
p.serialize(&mut out[PREAMBLE_LEN..(8 + p.block.as_ref().len())]);
let tmp = &mut out[PREAMBLE_LEN..];
p.serialize(&mut tmp[..payload_len]);
msg_len
}
Message::KeepAlive => 4,
Message::KeepAlive => {
// the len prefix was already written out to buf
4
}
Message::Have(v) => {
let msg_len = PREAMBLE_LEN + 4;
out.resize(msg_len, 0);

View file

@ -2,8 +2,7 @@ use std::{
collections::{HashMap, HashSet},
fmt::Display,
fs::{File, OpenOptions},
future::Future,
io::{self, Read, Seek, SeekFrom, Write},
io::{Read, Seek, SeekFrom, Write},
net::SocketAddr,
path::{Path, PathBuf},
sync::{
@ -14,14 +13,11 @@ use std::{
};
use anyhow::Context;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use reqwest::Url;
use tokio::{
sync::{mpsc::Sender, Notify, Semaphore},
task::JoinHandle,
};
use tokio::sync::{mpsc::Sender, Notify, Semaphore};
use crate::{
buffers::ByteString,
@ -171,16 +167,6 @@ impl PeerStates {
_ => return None,
}
}
fn mark_peer_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
match self.states.get_mut(&handle) {
Some(PeerState::Live(live)) => {
let prev = live.peer_choked;
live.peer_choked = is_choked;
return Some(prev);
}
_ => return None,
}
}
fn update_bitfield_from_vec(
&mut self,
handle: PeerHandle,
@ -196,9 +182,6 @@ impl PeerStates {
_ => None,
}
}
fn get_tx(&self, handle: PeerHandle) -> Option<&Sender<MessageOwned>> {
self.tx.get(&handle).map(|v| v.as_ref())
}
fn clone_tx(&self, handle: PeerHandle) -> Option<Arc<Sender<MessageOwned>>> {
Some(self.tx.get(&handle)?.clone())
}
@ -256,7 +239,7 @@ fn spawn<N: Display + 'static + Send>(
fn spawn_blocking<T: Send + Sync + 'static, N: Display + 'static + Send>(
name: N,
f: impl FnOnce() -> anyhow::Result<T> + Send + 'static,
) -> impl Future<Output = anyhow::Result<T>> {
) -> tokio::task::JoinHandle<anyhow::Result<T>> {
debug!("starting blocking task \"{}\"", name);
tokio::task::spawn_blocking(move || match f() {
Ok(v) => {
@ -268,7 +251,6 @@ fn spawn_blocking<T: Send + Sync + 'static, N: Display + 'static + Send>(
Err(e)
}
})
.map(|j| j.unwrap())
}
fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result<Lengths> {
@ -446,6 +428,10 @@ fn compute_needed_pieces(
}
if !at_least_one_file_required {
trace!(
"piece {} is not required by any of the requested files, ignoring",
piece_info.piece_index
);
continue;
}
@ -486,7 +472,7 @@ impl TorrentManager {
overwrite: bool,
only_files: Option<Vec<usize>>,
) -> anyhow::Result<TorrentManagerHandle> {
let mut files = {
let files = {
let mut files =
Vec::<Arc<Mutex<File>>>::with_capacity(torrent.info.iter_file_lengths().count());
@ -681,7 +667,7 @@ impl TorrentManager {
),
move || clone.read_chunk_blocking(peer_handle, chunk_info),
)
.await?;
.await??;
let tx = this
.inner
.locked
@ -708,7 +694,6 @@ impl TorrentManager {
who_sent: PeerHandle,
chunk_info: ChunkInfo,
) -> anyhow::Result<Vec<u8>> {
let mut h = sha1::Sha1::new();
let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info);
let mut result_buf = vec![0u8; chunk_info.size as usize];
let mut buf = &mut result_buf[..];
@ -842,8 +827,12 @@ impl TorrentManager {
Some(l) => l.have_notify.clone(),
None => return Ok(()),
};
// TODO: this might dangle
tokio::time::timeout(Duration::from_secs(60), notify.notified()).await;
// TODO: this might dangle, same below.
#[allow(unused_must_use)]
{
tokio::time::timeout(Duration::from_secs(60), notify.notified()).await;
}
loop {
let next = match self.reserve_next_needed_piece(handle) {
@ -854,8 +843,11 @@ impl TorrentManager {
Some(l) => l.have_notify.clone(),
None => return Ok(()),
};
// TODO: this might dangle
tokio::time::timeout(Duration::from_secs(60), notify.notified()).await;
#[allow(unused_must_use)]
{
tokio::time::timeout(Duration::from_secs(60), notify.notified()).await;
}
continue;
}
};
@ -1106,6 +1098,7 @@ impl TorrentManager {
}
let this = self.clone();
spawn_blocking(
format!(
"write_and_check(piece={}, peer={}, block={:?})",
@ -1264,7 +1257,7 @@ impl TorrentManager {
};
}
}
fn set_peer_live(&self, handle: PeerHandle, addr: SocketAddr, h: Handshake) {
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let mut g = self.inner.locked.write();
match g.peers.states.get_mut(&handle) {
Some(s @ &mut PeerState::Connecting(_)) => {
@ -1315,7 +1308,7 @@ impl TorrentManager {
anyhow::bail!("info hash does not match");
}
self.set_peer_live(handle, addr, h);
self.set_peer_live(handle, h);
if read_bytes > hlen {
read_buf.copy_within(hlen..read_bytes, 0);

View file

@ -1,4 +1,4 @@
use std::{fmt::Write, fs::File, ops::Deref, path::PathBuf};
use std::{fmt::Write, ops::Deref, path::PathBuf};
use serde::Deserialize;
@ -126,7 +126,7 @@ impl<'a, ByteBuf> FileIteratorName<'a, ByteBuf> {
}
impl<BufType: Clone + Deref<Target = [u8]>> TorrentMetaV1Info<BufType> {
pub fn get_hash(&self, piece: u32, hash: &sha1::Sha1) -> Option<&[u8]> {
pub fn get_hash(&self, piece: u32) -> Option<&[u8]> {
let start = piece as usize * 20;
let end = start + 20;
let expected_hash = self.pieces.deref().get(start..end)?;