diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index ae25d70..1f2beeb 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -76,6 +76,7 @@ pub enum PeerState { Live(LivePeerState), // There was an error, and it's waiting for exponential backoff. Dead, + // We don't need to do anything with the peer any longer. // The peer has the full torrent, and we have the full torrent, so no need // to keep talking to it. NotNeeded, @@ -103,7 +104,7 @@ impl PeerState { } } - fn take_live(&mut self) -> Option { + pub fn take_live(&mut self) -> Option { if let PeerState::Live(_) = self { match std::mem::take(self) { PeerState::Live(l) => Some(l), @@ -152,10 +153,11 @@ impl PeerState { } } - pub fn live_to(&mut self, new_state: PeerState) -> Option { - let l = self.take_live()?; - *self = new_state; - Some(l) + pub fn to_not_needed(&mut self) -> Option { + match std::mem::replace(self, PeerState::NotNeeded) { + PeerState::Live(l) => Some(l), + _ => None, + } } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index e948692..d85edf0 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -184,6 +184,10 @@ impl PeerStates { }; p.stats.backoff.reset(); } + + fn mark_peer_not_needed(&mut self, handle: PeerHandle) -> Option { + self.states.get_mut(&handle)?.state.to_not_needed() + } } pub struct TorrentStateLocked { @@ -316,6 +320,11 @@ impl TorrentState { async move { loop { let addr = peer_queue_rx.recv().await.unwrap(); + if state.is_finished() { + debug!("ignoring peer {} as we are finished", addr); + state.locked.write().peers.mark_peer_not_needed(addr); + continue; + } let permit = state.peer_semaphore.acquire().await.unwrap(); permit.forget(); @@ -499,8 +508,16 @@ impl TorrentState { fn on_peer_died(self: &Arc, handle: PeerHandle, error: Option) { let mut g = self.locked.write(); - match g.peers.mark_peer_dead(handle) { - Some(Some(live)) => { + let peer = match g.peers.states.get_mut(&handle) { + Some(peer) => peer, + None => { + warn!("bug: peer not found in table. Forgetting it forever"); + return; + } + }; + match std::mem::take(&mut peer.state) { + PeerState::Connecting(_) => {} + PeerState::Live(live) => { for req in live.inflight_requests { debug!( "peer dead, marking chunk request cancelled, index={}, chunk={}", @@ -510,17 +527,30 @@ impl TorrentState { g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); } } - // Other valid state to transition to dead. - Some(None) => {} - // Peer was in an unexpected state. - None => return, - } + PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { + warn!("bug: peer was in a wrong state, ignoring it forever"); + g.peers.drop_peer(handle); + return; + } + }; + + // Re-borrow as we were modifying states above + // (otherwise borrow checker rightfully says we're wrong). + let peer = g.peers.states.get_mut(&handle).unwrap(); if error.is_none() { debug!("peer died without errors, not re-queueing"); + peer.state = PeerState::NotNeeded; return; } + if self.is_finished() { + debug!("torrent finished, not re-queueing"); + peer.state = PeerState::NotNeeded; + return; + } + + peer.state = PeerState::Dead; let backoff = { let peer = match g.peers.states.get_mut(&handle) { Some(p) => p, @@ -576,6 +606,10 @@ impl TorrentState { self.stats.downloaded_and_checked.load(Ordering::Relaxed) } + pub fn is_finished(&self) -> bool { + self.get_left_to_download() == 0 + } + pub fn get_left_to_download(&self) -> u64 { self.needed - self.get_downloaded() } @@ -675,7 +709,7 @@ impl TorrentState { } pub async fn wait_until_completed(&self) { - if self.get_left_to_download() == 0 { + if self.is_finished() { return; } self.finished_notify.notified().await; @@ -836,10 +870,11 @@ impl PeerHandler { .peers .clone_tx(handle) .context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::Unchoke)) - .context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::NotInterested)) - .context("peer dropped")?; + tx.send(WriterRequest::Message(MessageOwned::Unchoke))?; + tx.send(WriterRequest::Message(MessageOwned::NotInterested))?; + if self.state.is_finished() { + tx.send(WriterRequest::Disconnect)?; + } return Ok(()); } @@ -1129,7 +1164,7 @@ impl PeerHandler { debug!("piece={} successfully downloaded and verified", index); - if self.state.get_left_to_download() == 0 { + if self.state.is_finished() { self.state.finished_notify.notify_waiters(); self.disconnect_all_peers_that_have_full_torrent(); self.reopen_read_only()?; @@ -1157,7 +1192,7 @@ impl PeerHandler { for (_, peer) in g.peers.states.iter_mut() { if let PeerState::Live(l) = &peer.state { if l.has_full_torrent(self.state.lengths.total_pieces() as usize) { - let live = peer.state.live_to(PeerState::NotNeeded).unwrap(); + let live = peer.state.to_not_needed().unwrap(); let _ = live.tx.send(WriterRequest::Disconnect); } }