From 0cd875e740911377aba147a4e23fb90ee42fe25a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 6 Dec 2023 00:39:52 +0000 Subject: [PATCH] Incoming peers now respect concurrency limits --- crates/librqbit/src/torrent_state/live/mod.rs | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 1f26493..5557c4a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -76,7 +76,7 @@ use sha1w::Sha1; use tokio::{ sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Notify, Semaphore, + Notify, OwnedSemaphorePermit, Semaphore, }, time::timeout, }; @@ -178,7 +178,7 @@ pub struct TorrentStateLive { lengths: Lengths, // Limits how many active (occupying network resources) peers there are at a moment in time. - peer_semaphore: Semaphore, + peer_semaphore: Arc, // The queue for peer manager to connect to them. peer_queue_tx: UnboundedSender, @@ -224,7 +224,7 @@ impl TorrentStateLive { }, initially_needed_bytes: needed_bytes, lengths, - peer_semaphore: Semaphore::new(128), + peer_semaphore: Arc::new(Semaphore::new(128)), peer_queue_tx, finished_notify: Notify::new(), down_speed_estimator, @@ -380,6 +380,16 @@ impl TorrentStateLive { ) -> anyhow::Result<()> { use dashmap::mapref::entry::Entry; let (tx, rx) = unbounded_channel(); + let permit = match self.peer_semaphore.clone().try_acquire_owned() { + Ok(permit) => permit, + Err(_) => { + warn!("limit of live peers reached, dropping incoming peer"); + self.peers.with_peer(checked_peer.addr, |p| { + atomic_inc(&p.stats.counters.incoming_connections); + }); + return Ok(()); + } + }; let counters = match self.peers.states.entry(checked_peer.addr) { Entry::Occupied(mut occ) => { @@ -411,7 +421,7 @@ impl TorrentStateLive { "incoming peer", error_span!("manage_incoming_peer", addr = %checked_peer.addr), self.clone() - .task_manage_incoming_peer(checked_peer, counters, tx, rx), + .task_manage_incoming_peer(checked_peer, counters, tx, rx, permit), ); Ok(()) } @@ -422,6 +432,7 @@ impl TorrentStateLive { counters: Arc, tx: PeerTx, rx: PeerRx, + permit: OwnedSemaphorePermit, ) -> anyhow::Result<()> { // TODO: bump counters for incoming let handler = PeerHandler { @@ -463,8 +474,6 @@ impl TorrentStateLive { ) => {r} }; - handler.state.peer_semaphore.add_permits(1); - match res { // We disconnected the peer ourselves as we don't need it Ok(()) => { @@ -475,10 +484,15 @@ impl TorrentStateLive { handler.on_peer_died(Some(e))?; } }; + drop(permit); Ok(()) } - async fn task_manage_outgoing_peer(self: Arc, addr: SocketAddr) -> anyhow::Result<()> { + async fn task_manage_outgoing_peer( + self: Arc, + addr: SocketAddr, + permit: OwnedSemaphorePermit, + ) -> anyhow::Result<()> { let state = self; let (rx, tx) = state.peers.mark_peer_connecting(addr)?; let counters = state @@ -523,8 +537,6 @@ impl TorrentStateLive { r = peer_connection.manage_peer_outgoing(rx) => {r} }; - handler.state.peer_semaphore.add_permits(1); - match res { // We disconnected the peer ourselves as we don't need it Ok(()) => { @@ -535,6 +547,7 @@ impl TorrentStateLive { handler.on_peer_died(Some(e))?; } } + drop(permit); Ok::<_, anyhow::Error>(()) } @@ -551,12 +564,11 @@ impl TorrentStateLive { continue; } - let permit = state.peer_semaphore.acquire().await?; - permit.forget(); + let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( "manage_peer", error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), - state.clone().task_manage_outgoing_peer(addr), + state.clone().task_manage_outgoing_peer(addr, permit), ); } }