diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index ea237df..90f0e0d 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -115,7 +115,7 @@ struct LivePeerState { struct PeerStates { states: HashMap, seen_peers: HashSet, - requested_pieces: HashSet, + inflight_pieces: HashSet, tx: HashMap>>, } @@ -800,7 +800,7 @@ impl TorrentManager { .locked .read() .peers - .requested_pieces + .inflight_pieces .iter() .choose(&mut rng) .copied() @@ -926,7 +926,7 @@ impl TorrentManager { .get_live_mut(peer_handle)? .requested_pieces .insert(n); - g.peers.requested_pieces.insert(n); + g.peers.inflight_pieces.insert(n); g.chunks.reserve_needed_piece(n); Some(n) } @@ -1075,21 +1075,27 @@ impl TorrentManager { Ok(()) } - fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> Option<()> { + fn on_received_piece( + &self, + handle: PeerHandle, + piece: Piece, + ) -> anyhow::Result<()> { let chunk_info = match self.inner.lengths.chunk_info_from_received_piece(&piece) { Some(i) => i, None => { - warn!( - "peer {} sent us a piece that is invalid {:?}, dropping", - handle, &piece, + anyhow::bail!( + "peer {} sent us a piece that is invalid {:?}", + handle, + &piece, ); - self.drop_peer(handle); - return None; } }; let mut g = self.inner.locked.write(); - let h = g.peers.get_live_mut(handle)?; + let h = match g.peers.get_live_mut(handle) { + Some(l) => l, + None => anyhow::bail!("peer is not live anymore"), + }; h.outstanding_requests.add_permits(1); self.inner @@ -1097,18 +1103,38 @@ impl TorrentManager { .fetch_add(piece.block.len() as u64, Ordering::Relaxed); if !h.requested_pieces.contains(&chunk_info.piece_index) { - // TODO: this is wrong, we need to distinguish between these cases. - warn!( - "peer {} sent us a piece that we did not ask for, dropping it. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece, + anyhow::bail!( + "peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece, ); - - // this prevents a deadlock. - drop(g); - - self.drop_peer(handle); - return None; } + match g.chunks.mark_chunk_downloaded(&piece) { + Some(ChunkMarkingResult::Completed) => { + debug!( + "piece={} done by {}, will write and checksum", + piece.index, handle + ); + // This will prevent others from stealing it. + g.peers.inflight_pieces.remove(&chunk_info.piece_index); + } + Some(ChunkMarkingResult::PreviouslyCompleted) => { + // TODO: we might need to send cancellations here. + debug!( + "piece={} was done by someone else {}, ignoring", + piece.index, handle + ); + return Ok(()); + } + Some(ChunkMarkingResult::NotCompleted) => return Ok(()), + None => { + anyhow::bail!( + "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", + handle, + piece + ); + } + }; + let this = self.clone(); spawn_blocking( @@ -1118,45 +1144,11 @@ impl TorrentManager { ), move || { let index = piece.index; + + // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what + // should we really do? If we unmark it, it will get requested forever... this.write_chunk_blocking(handle, &piece, &chunk_info)?; - match this - .inner - .locked - .write() - .chunks - .mark_chunk_downloaded(&piece) - { - Some(ChunkMarkingResult::Completed) => { - debug!("piece={} done by {}, will checksum", piece.index, handle); - } - Some(ChunkMarkingResult::PreviouslyCompleted) => { - debug!( - "piece={} was done by someone else {}, ignoring", - piece.index, handle - ); - return Ok(()); - } - Some(ChunkMarkingResult::NotCompleted) => return Ok(()), - None => { - warn!( - "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", - handle, piece - ); - this.drop_peer(handle); - return Ok(()); - } - }; - - // Ignore responses about this piece from now on. - { - let mut g = this.inner.locked.write(); - g.peers - .get_live_mut(handle) - .map(|l| l.requested_pieces.remove(&chunk_info.piece_index)); - g.peers.requested_pieces.remove(&chunk_info.piece_index); - } - let clone = this.clone(); match clone .check_piece_blocking(handle, chunk_info.piece_index, &chunk_info) @@ -1194,13 +1186,12 @@ impl TorrentManager { .write() .chunks .mark_piece_needed(chunk_info.piece_index); - // this.drop_peer(handle); } }; Ok::<_, anyhow::Error>(()) }, ); - Some(()) + Ok(()) } fn into_handle(self) -> TorrentManagerHandle { TorrentManagerHandle { manager: self } @@ -1442,7 +1433,8 @@ impl TorrentManager { ) } Message::Piece(piece) => { - self.on_received_piece(handle, piece); + self.on_received_piece(handle, piece) + .context("error in on_received_piece()")?; } Message::KeepAlive => { debug!("keepalive received from {}", handle);