Notify for released pieces

This commit is contained in:
Igor Katson 2024-08-19 13:16:33 +01:00
parent 032b34c5d6
commit 0cb92eb333
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -177,6 +177,7 @@ pub struct TorrentStateLive {
peer_queue_tx: UnboundedSender<SocketAddr>, peer_queue_tx: UnboundedSender<SocketAddr>,
finished_notify: Notify, finished_notify: Notify,
new_pieces_notify: Notify,
down_speed_estimator: SpeedEstimator, down_speed_estimator: SpeedEstimator,
up_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator,
@ -233,6 +234,7 @@ impl TorrentStateLive {
}, },
lengths, lengths,
peer_semaphore: Arc::new(Semaphore::new(128)), peer_semaphore: Arc::new(Semaphore::new(128)),
new_pieces_notify: Notify::new(),
peer_queue_tx, peer_queue_tx,
finished_notify: Notify::new(), finished_notify: Notify::new(),
down_speed_estimator, down_speed_estimator,
@ -905,6 +907,7 @@ impl PeerHandler {
); );
g.get_chunks_mut()? g.get_chunks_mut()?
.mark_piece_broken_if_not_have(req.piece_index); .mark_piece_broken_if_not_have(req.piece_index);
self.state.new_pieces_notify.notify_waiters();
} }
} }
PeerState::NotNeeded => { PeerState::NotNeeded => {
@ -1226,6 +1229,7 @@ impl PeerHandler {
// to download early pieces. // to download early pieces.
// Then try get the next one in queue. // Then try get the next one in queue.
// Afterwards means we are close to completion, try stealing more aggressively. // Afterwards means we are close to completion, try stealing more aggressively.
let new_piece_notify = self.state.new_pieces_notify.notified();
let next = match self let next = match self
.try_steal_old_slow_piece(10.) .try_steal_old_slow_piece(10.)
.map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))? .map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))?
@ -1234,7 +1238,12 @@ impl PeerHandler {
Some(next) => next, Some(next) => next,
None => { None => {
debug!("no pieces to request"); debug!("no pieces to request");
async_backtrace::frame!(tokio::time::sleep(Duration::from_secs(10))).await; let _ = async_backtrace::frame!(tokio::time::timeout(
Duration::from_secs(10),
new_piece_notify
))
.await;
debug!("woken up");
continue; continue;
} }
}; };
@ -1502,6 +1511,7 @@ impl PeerHandler {
.lock_write("mark_piece_broken") .lock_write("mark_piece_broken")
.get_chunks_mut()? .get_chunks_mut()?
.mark_piece_broken_if_not_have(chunk_info.piece_index); .mark_piece_broken_if_not_have(chunk_info.piece_index);
state.new_pieces_notify.notify_waiters();
anyhow::bail!("i am probably a bogus peer. dying.") anyhow::bail!("i am probably a bogus peer. dying.")
} }
}; };