From f1cc9162e9dff50c1db378bc0b811afc11f48f91 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 11:37:18 +0000 Subject: [PATCH] Move requests_sem to peer --- crates/librqbit/src/peer_state.rs | 4 ---- crates/librqbit/src/torrent_state.rs | 17 ++++++++++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 7525874..1f883ed 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -224,9 +224,6 @@ pub struct LivePeerState { pub i_am_choked: bool, pub peer_interested: bool, - // This is used to limit the number of chunk requests we send to a peer at a time. - pub requests_sem: Arc, - // This is used to track the pieces the peer has. pub bitfield: BF, @@ -249,7 +246,6 @@ impl LivePeerState { peer_interested: false, bitfield: BF::new(), previously_requested_pieces: BF::new(), - requests_sem: Arc::new(Semaphore::new(0)), inflight_requests: Default::default(), tx, } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 7fa009f..21ea526 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -480,6 +480,7 @@ impl TorrentState { addr, on_bitfield_notify: Default::default(), have_notify: Default::default(), + requests_sem: Semaphore::new(0), state: state.clone(), spawner, }; @@ -908,8 +909,13 @@ struct PeerHandler { // This is used to unpause chunk requester once the bitfield // is received. on_bitfield_notify: Notify, + // This is used to unpause after we were choked. have_notify: Notify, + + // This is used to limit the number of chunk requests we send to a peer at a time. + requests_sem: Semaphore, + addr: SocketAddr, spawner: BlockingSpawner, } @@ -1119,13 +1125,14 @@ impl PeerHandler { }, }; - let (tx, sem) = + let sem = &self.requests_sem; + let tx = match self .state .peers .with_live_mut(handle, "peer_setup_for_piece_request", |l| { l.previously_requested_pieces.set(next.get() as usize, true); - (l.tx.clone(), l.requests_sem.clone()) + l.tx.clone() }) { Some(res) => res, None => return Ok(()), @@ -1222,11 +1229,11 @@ impl PeerHandler { fn on_i_am_unchoked(&self, handle: PeerHandle) { debug!("we are unchoked"); self.have_notify.notify_waiters(); + self.requests_sem.add_permits(16); self.state .peers .with_live_mut(handle, "on_i_am_unchoked", |live| { live.i_am_choked = false; - live.requests_sem.add_permits(16); }); } @@ -1242,11 +1249,11 @@ impl PeerHandler { } }; + self.requests_sem.add_permits(1); + self.state .peers .with_live_mut(handle, "inflight_requests.remove", |h| { - h.requests_sem.add_permits(1); - self.state .stats .fetched_bytes