diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 896ca88..ed9f0fc 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -177,6 +177,7 @@ pub struct TorrentStateLive { peer_queue_tx: UnboundedSender, finished_notify: Notify, + new_pieces_notify: Notify, down_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator, @@ -233,6 +234,7 @@ impl TorrentStateLive { }, lengths, peer_semaphore: Arc::new(Semaphore::new(128)), + new_pieces_notify: Notify::new(), peer_queue_tx, finished_notify: Notify::new(), down_speed_estimator, @@ -905,6 +907,7 @@ impl PeerHandler { ); g.get_chunks_mut()? .mark_piece_broken_if_not_have(req.piece_index); + self.state.new_pieces_notify.notify_waiters(); } } PeerState::NotNeeded => { @@ -1226,6 +1229,7 @@ impl PeerHandler { // to download early pieces. // Then try get the next one in queue. // Afterwards means we are close to completion, try stealing more aggressively. + let new_piece_notify = self.state.new_pieces_notify.notified(); let next = match self .try_steal_old_slow_piece(10.) .map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))? @@ -1234,7 +1238,12 @@ impl PeerHandler { Some(next) => next, None => { debug!("no pieces to request"); - async_backtrace::frame!(tokio::time::sleep(Duration::from_secs(10))).await; + let _ = async_backtrace::frame!(tokio::time::timeout( + Duration::from_secs(10), + new_piece_notify + )) + .await; + debug!("woken up"); continue; } }; @@ -1502,6 +1511,7 @@ impl PeerHandler { .lock_write("mark_piece_broken") .get_chunks_mut()? .mark_piece_broken_if_not_have(chunk_info.piece_index); + state.new_pieces_notify.notify_waiters(); anyhow::bail!("i am probably a bogus peer. dying.") } };