diff --git a/Cargo.lock b/Cargo.lock index af9f801..180adc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -515,6 +515,7 @@ dependencies = [ "futures", "log", "parking_lot", + "rand", "reqwest", "serde", "sha1", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 8c0abb9..6c42f09 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -19,6 +19,7 @@ bitvec = "0.22" parking_lot = "0.11" log = "0.4" size_format = "1" +rand = "0.8" uuid = {version = "0.8", features = ["v4"]} futures = "0.3" diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 6186ef4..fa520d4 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -2,7 +2,7 @@ use log::{debug, info}; use crate::{ buffers::ByteString, - lengths::{Lengths, ValidPieceIndex}, + lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_comms::Piece, type_aliases::BF, }; @@ -50,6 +50,12 @@ fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF { chunk_bf } +pub enum ChunkMarkingResult { + PreviouslyCompleted, + NotCompleted, + Completed, +} + impl ChunkTracker { pub fn new(needed_pieces: BF, have_pieces: BF, lengths: Lengths) -> Self { Self { @@ -84,19 +90,33 @@ impl ChunkTracker { self.have.set(idx.get() as usize, true) } - // return true if the whole piece is marked downloaded - pub fn mark_chunk_downloaded(&mut self, piece: &Piece) -> Option { - let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?; - self.chunk_status - .set(chunk_info.absolute_index as usize, true); - let chunk_range = self.lengths.chunk_range(chunk_info.piece_index); - let chunk_range = self.chunk_status.get(chunk_range).unwrap(); - let all = chunk_range.all(); + pub fn is_chunk_downloaded(&self, chunk: &ChunkInfo) -> bool { + *self + .chunk_status + .get(chunk.absolute_index as usize) + .unwrap() + } + // return true if the whole piece is marked downloaded + pub fn mark_chunk_downloaded( + &mut self, + piece: &Piece, + ) -> Option { + let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?; + let chunk_range = self.lengths.chunk_range(chunk_info.piece_index); + let chunk_range = self.chunk_status.get_mut(chunk_range).unwrap(); + if chunk_range.all() { + return Some(ChunkMarkingResult::PreviouslyCompleted); + } + chunk_range.set(chunk_info.chunk_index as usize, true); debug!( "piece={}, chunk_info={:?}, bits={:?}", piece.index, chunk_info, chunk_range, ); - Some(all) + + if chunk_range.all() { + return Some(ChunkMarkingResult::Completed); + } + return Some(ChunkMarkingResult::NotCompleted); } } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 6ad0b82..ea237df 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -18,11 +18,14 @@ use log::{debug, error, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use reqwest::Url; use size_format::SizeFormatterBinary as SF; -use tokio::sync::{mpsc::Sender, Notify, Semaphore}; +use tokio::{ + sync::{mpsc::Sender, Notify, Semaphore}, + time::timeout, +}; use crate::{ buffers::{ByteBuf, ByteString}, - chunk_tracker::ChunkTracker, + chunk_tracker::{ChunkMarkingResult, ChunkTracker}, clone_to_owned::CloneToOwned, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_comms::{ @@ -112,6 +115,7 @@ struct LivePeerState { struct PeerStates { states: HashMap, seen_peers: HashSet, + requested_pieces: HashSet, tx: HashMap>>, } @@ -789,6 +793,19 @@ impl TorrentManager { }) } + fn try_steal_piece(&self) -> Option { + let mut rng = rand::thread_rng(); + use rand::seq::IteratorRandom; + self.inner + .locked + .read() + .peers + .requested_pieces + .iter() + .choose(&mut rng) + .copied() + } + async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { let notify = match self.inner.locked.read().peers.get_live(handle) { Some(l) => l.have_notify.clone(), @@ -798,25 +815,43 @@ impl TorrentManager { // TODO: this might dangle, same below. #[allow(unused_must_use)] { - tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + timeout(Duration::from_secs(60), notify.notified()).await; } loop { - let next = match self.reserve_next_needed_piece(handle) { - Some(next) => next, - None => { - info!("no pieces to request from {}", handle); - let notify = match self.inner.locked.read().peers.get_live(handle) { - Some(l) => l.have_notify.clone(), - None => return Ok(()), - }; - + match self.am_i_choked(handle) { + Some(true) => { + warn!("we are choked by {}, can't reserve next piece", handle); #[allow(unused_must_use)] { - tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + timeout(Duration::from_secs(60), notify.notified()).await; } continue; } + Some(false) => {} + None => return Ok(()), + } + + let (next, is_stolen) = match self.reserve_next_needed_piece(handle) { + Some(next) => (next, false), + None => { + if self.get_left_to_download() == 0 { + info!("{}: nothing left to download, closing requester", handle); + return Ok(()); + } + + if let Some(piece) = self.try_steal_piece() { + info!("{}: stole a piece {}", handle, piece); + (piece, true) + } else { + info!("no pieces to request from {}", handle); + #[allow(unused_must_use)] + { + timeout(Duration::from_secs(60), notify.notified()).await; + } + continue; + } + } }; let tx = match self.inner.locked.read().peers.clone_tx(handle) { Some(tx) => tx, @@ -827,6 +862,9 @@ impl TorrentManager { None => return Ok(()), }; for chunk in self.inner.lengths.iter_chunk_infos(next) { + if is_stolen && self.inner.locked.read().chunks.is_chunk_downloaded(&chunk) { + continue; + } let request = Request { index: next.get(), begin: chunk.offset, @@ -880,6 +918,7 @@ impl TorrentManager { break; } } + self.inner.lengths.validate_piece_index(n_opt? as u32)? }; @@ -887,6 +926,7 @@ impl TorrentManager { .get_live_mut(peer_handle)? .requested_pieces .insert(n); + g.peers.requested_pieces.insert(n); g.chunks.reserve_needed_piece(n); Some(n) } @@ -1057,9 +1097,14 @@ impl TorrentManager { .fetch_add(piece.block.len() as u64, Ordering::Relaxed); if !h.requested_pieces.contains(&chunk_info.piece_index) { + // TODO: this is wrong, we need to distinguish between these cases. warn!( "peer {} sent us a piece that we did not ask for, dropping it. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece, ); + + // this prevents a deadlock. + drop(g); + self.drop_peer(handle); return None; } @@ -1075,21 +1120,24 @@ impl TorrentManager { let index = piece.index; this.write_chunk_blocking(handle, &piece, &chunk_info)?; - let piece_done = match this + match this .inner .locked .write() .chunks .mark_chunk_downloaded(&piece) { - Some(true) => { + Some(ChunkMarkingResult::Completed) => { + debug!("piece={} done by {}, will checksum", piece.index, handle); + } + Some(ChunkMarkingResult::PreviouslyCompleted) => { debug!( - "piece={} done, requesting a piece from {}", + "piece={} was done by someone else {}, ignoring", piece.index, handle ); - true + return Ok(()); } - Some(false) => false, + Some(ChunkMarkingResult::NotCompleted) => return Ok(()), None => { warn!( "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", @@ -1100,16 +1148,14 @@ impl TorrentManager { } }; - if !piece_done { - return Ok(()); - } // Ignore responses about this piece from now on. - this.inner - .locked - .write() - .peers - .get_live_mut(handle) - .map(|l| l.requested_pieces.remove(&chunk_info.piece_index)); + { + let mut g = this.inner.locked.write(); + g.peers + .get_live_mut(handle) + .map(|l| l.requested_pieces.remove(&chunk_info.piece_index)); + g.peers.requested_pieces.remove(&chunk_info.piece_index); + } let clone = this.clone(); match clone @@ -1309,14 +1355,13 @@ impl TorrentManager { } loop { - let msg = - match tokio::time::timeout(keep_alive_interval, outgoing_chan.recv()).await { - Ok(Some(msg)) => msg, - Ok(None) => { - anyhow::bail!("closing writer, channel closed") - } - Err(_) => MessageOwned::KeepAlive, - }; + let msg = match timeout(keep_alive_interval, outgoing_chan.recv()).await { + Ok(Some(msg)) => msg, + Ok(None) => { + anyhow::bail!("closing writer, channel closed") + } + Err(_) => MessageOwned::KeepAlive, + }; let uploaded_add = match &msg { Message::Piece(p) => Some(p.block.len()),