diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index fa520d4..d1ab12a 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -74,8 +74,28 @@ impl ChunkTracker { pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) { self.needed_pieces.set(index.get() as usize, false) } - pub fn mark_piece_needed(&mut self, index: ValidPieceIndex) -> bool { - info!("remarking piece={} as needed", index); + + // None if wrong chunk + // true if did something + // false if didn't do anything + pub fn mark_chunk_request_cancelled( + &mut self, + index: ValidPieceIndex, + chunk: u32, + ) -> Option { + if *self.have.get(index.get() as usize)? { + return Some(false); + } + // This will trigger the requesters to re-check each chunk in this piece. + let chunk_range = self.lengths.chunk_range(index); + if !self.chunk_status.get(chunk_range)?.all() { + self.needed_pieces.set(index.get() as usize, true); + } + Some(true) + } + + pub fn mark_piece_broken(&mut self, index: ValidPieceIndex) -> bool { + info!("remarking piece={} as broken", index); self.needed_pieces.set(index.get() as usize, true); self.chunk_status .get_mut(self.lengths.chunk_range(index)) diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 90f0e0d..b8c4005 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -97,6 +97,21 @@ enum PeerState { type BF = bitvec::vec::BitVec; +#[derive(Debug, Hash, PartialEq, Eq)] +struct InflightRequest { + piece: ValidPieceIndex, + chunk: u32, +} + +impl From<&ChunkInfo> for InflightRequest { + fn from(c: &ChunkInfo) -> Self { + Self { + piece: c.piece_index, + chunk: c.chunk_index, + } + } +} + struct LivePeerState { #[allow(unused)] peer_id: [u8; 20], @@ -108,7 +123,7 @@ struct LivePeerState { outstanding_requests: Arc, have_notify: Arc, bitfield: Option, - requested_pieces: HashSet, + inflight_requests: HashSet, } #[derive(Default)] @@ -161,6 +176,10 @@ impl PeerStates { } None } + fn try_get_live_mut(&mut self, handle: PeerHandle) -> anyhow::Result<&mut LivePeerState> { + self.get_live_mut(handle) + .ok_or_else(|| anyhow::anyhow!("peer dropped")) + } fn add( &mut self, addr: SocketAddr, @@ -862,15 +881,24 @@ impl TorrentManager { None => return Ok(()), }; for chunk in self.inner.lengths.iter_chunk_infos(next) { - if is_stolen && self.inner.locked.read().chunks.is_chunk_downloaded(&chunk) { + if self.inner.locked.read().chunks.is_chunk_downloaded(&chunk) { continue; } + self.inner + .locked + .write() + .peers + .try_get_live_mut(handle)? + .inflight_requests + .insert(InflightRequest::from(&chunk)); + let request = Request { index: next.get(), begin: chunk.offset, length: chunk.size, }; sem.acquire().await?.forget(); + tx.send(MessageOwned::Request(request)) .await .context("peer dropped")?; @@ -921,11 +949,6 @@ impl TorrentManager { self.inner.lengths.validate_piece_index(n_opt? as u32)? }; - - g.peers - .get_live_mut(peer_handle)? - .requested_pieces - .insert(n); g.peers.inflight_pieces.insert(n); g.chunks.reserve_needed_piece(n); Some(n) @@ -1092,19 +1115,16 @@ impl TorrentManager { }; let mut g = self.inner.locked.write(); - let h = match g.peers.get_live_mut(handle) { - Some(l) => l, - None => anyhow::bail!("peer is not live anymore"), - }; + let h = g.peers.try_get_live_mut(handle)?; h.outstanding_requests.add_permits(1); self.inner .fetched_bytes .fetch_add(piece.block.len() as u64, Ordering::Relaxed); - if !h.requested_pieces.contains(&chunk_info.piece_index) { + if !h.inflight_requests.remove(&(&chunk_info).into()) { anyhow::bail!( - "peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece, + "peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.inflight_requests, &piece, ); } @@ -1185,7 +1205,7 @@ impl TorrentManager { .locked .write() .chunks - .mark_piece_needed(chunk_info.piece_index); + .mark_piece_broken(chunk_info.piece_index); } }; Ok::<_, anyhow::Error>(()) @@ -1270,7 +1290,7 @@ impl TorrentManager { bitfield: None, have_notify: Arc::new(Notify::new()), outstanding_requests: Arc::new(Semaphore::new(0)), - requested_pieces: Default::default(), + inflight_requests: Default::default(), }); } _ => { @@ -1467,8 +1487,8 @@ impl TorrentManager { match peer { PeerState::Connecting(_) => {} PeerState::Live(l) => { - for piece in l.requested_pieces { - g.chunks.mark_piece_needed(piece); + for req in l.inflight_requests { + g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); } } }