Move requests_sem to peer

This commit is contained in:
Igor Katson 2023-11-20 11:37:18 +00:00
parent 99dfb14895
commit f1cc9162e9
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 12 additions and 9 deletions

View file

@ -224,9 +224,6 @@ pub struct LivePeerState {
pub i_am_choked: bool, pub i_am_choked: bool,
pub peer_interested: bool, pub peer_interested: bool,
// This is used to limit the number of chunk requests we send to a peer at a time.
pub requests_sem: Arc<Semaphore>,
// This is used to track the pieces the peer has. // This is used to track the pieces the peer has.
pub bitfield: BF, pub bitfield: BF,
@ -249,7 +246,6 @@ impl LivePeerState {
peer_interested: false, peer_interested: false,
bitfield: BF::new(), bitfield: BF::new(),
previously_requested_pieces: BF::new(), previously_requested_pieces: BF::new(),
requests_sem: Arc::new(Semaphore::new(0)),
inflight_requests: Default::default(), inflight_requests: Default::default(),
tx, tx,
} }

View file

@ -480,6 +480,7 @@ impl TorrentState {
addr, addr,
on_bitfield_notify: Default::default(), on_bitfield_notify: Default::default(),
have_notify: Default::default(), have_notify: Default::default(),
requests_sem: Semaphore::new(0),
state: state.clone(), state: state.clone(),
spawner, spawner,
}; };
@ -908,8 +909,13 @@ struct PeerHandler {
// This is used to unpause chunk requester once the bitfield // This is used to unpause chunk requester once the bitfield
// is received. // is received.
on_bitfield_notify: Notify, on_bitfield_notify: Notify,
// This is used to unpause after we were choked. // This is used to unpause after we were choked.
have_notify: Notify, have_notify: Notify,
// This is used to limit the number of chunk requests we send to a peer at a time.
requests_sem: Semaphore,
addr: SocketAddr, addr: SocketAddr,
spawner: BlockingSpawner, spawner: BlockingSpawner,
} }
@ -1119,13 +1125,14 @@ impl PeerHandler {
}, },
}; };
let (tx, sem) = let sem = &self.requests_sem;
let tx =
match self match self
.state .state
.peers .peers
.with_live_mut(handle, "peer_setup_for_piece_request", |l| { .with_live_mut(handle, "peer_setup_for_piece_request", |l| {
l.previously_requested_pieces.set(next.get() as usize, true); l.previously_requested_pieces.set(next.get() as usize, true);
(l.tx.clone(), l.requests_sem.clone()) l.tx.clone()
}) { }) {
Some(res) => res, Some(res) => res,
None => return Ok(()), None => return Ok(()),
@ -1222,11 +1229,11 @@ impl PeerHandler {
fn on_i_am_unchoked(&self, handle: PeerHandle) { fn on_i_am_unchoked(&self, handle: PeerHandle) {
debug!("we are unchoked"); debug!("we are unchoked");
self.have_notify.notify_waiters(); self.have_notify.notify_waiters();
self.requests_sem.add_permits(16);
self.state self.state
.peers .peers
.with_live_mut(handle, "on_i_am_unchoked", |live| { .with_live_mut(handle, "on_i_am_unchoked", |live| {
live.i_am_choked = false; live.i_am_choked = false;
live.requests_sem.add_permits(16);
}); });
} }
@ -1242,11 +1249,11 @@ impl PeerHandler {
} }
}; };
self.requests_sem.add_permits(1);
self.state self.state
.peers .peers
.with_live_mut(handle, "inflight_requests.remove", |h| { .with_live_mut(handle, "inflight_requests.remove", |h| {
h.requests_sem.add_permits(1);
self.state self.state
.stats .stats
.fetched_bytes .fetched_bytes