Incoming peers now respect concurrency limits

This commit is contained in:
Igor Katson 2023-12-06 00:39:52 +00:00
parent 91873ed287
commit 0cd875e740
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -76,7 +76,7 @@ use sha1w::Sha1;
use tokio::{ use tokio::{
sync::{ sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, Semaphore, Notify, OwnedSemaphorePermit, Semaphore,
}, },
time::timeout, time::timeout,
}; };
@ -178,7 +178,7 @@ pub struct TorrentStateLive {
lengths: Lengths, lengths: Lengths,
// Limits how many active (occupying network resources) peers there are at a moment in time. // Limits how many active (occupying network resources) peers there are at a moment in time.
peer_semaphore: Semaphore, peer_semaphore: Arc<Semaphore>,
// The queue for peer manager to connect to them. // The queue for peer manager to connect to them.
peer_queue_tx: UnboundedSender<SocketAddr>, peer_queue_tx: UnboundedSender<SocketAddr>,
@ -224,7 +224,7 @@ impl TorrentStateLive {
}, },
initially_needed_bytes: needed_bytes, initially_needed_bytes: needed_bytes,
lengths, lengths,
peer_semaphore: Semaphore::new(128), peer_semaphore: Arc::new(Semaphore::new(128)),
peer_queue_tx, peer_queue_tx,
finished_notify: Notify::new(), finished_notify: Notify::new(),
down_speed_estimator, down_speed_estimator,
@ -380,6 +380,16 @@ impl TorrentStateLive {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
use dashmap::mapref::entry::Entry; use dashmap::mapref::entry::Entry;
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let permit = match self.peer_semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
warn!("limit of live peers reached, dropping incoming peer");
self.peers.with_peer(checked_peer.addr, |p| {
atomic_inc(&p.stats.counters.incoming_connections);
});
return Ok(());
}
};
let counters = match self.peers.states.entry(checked_peer.addr) { let counters = match self.peers.states.entry(checked_peer.addr) {
Entry::Occupied(mut occ) => { Entry::Occupied(mut occ) => {
@ -411,7 +421,7 @@ impl TorrentStateLive {
"incoming peer", "incoming peer",
error_span!("manage_incoming_peer", addr = %checked_peer.addr), error_span!("manage_incoming_peer", addr = %checked_peer.addr),
self.clone() self.clone()
.task_manage_incoming_peer(checked_peer, counters, tx, rx), .task_manage_incoming_peer(checked_peer, counters, tx, rx, permit),
); );
Ok(()) Ok(())
} }
@ -422,6 +432,7 @@ impl TorrentStateLive {
counters: Arc<AtomicPeerCounters>, counters: Arc<AtomicPeerCounters>,
tx: PeerTx, tx: PeerTx,
rx: PeerRx, rx: PeerRx,
permit: OwnedSemaphorePermit,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// TODO: bump counters for incoming // TODO: bump counters for incoming
let handler = PeerHandler { let handler = PeerHandler {
@ -463,8 +474,6 @@ impl TorrentStateLive {
) => {r} ) => {r}
}; };
handler.state.peer_semaphore.add_permits(1);
match res { match res {
// We disconnected the peer ourselves as we don't need it // We disconnected the peer ourselves as we don't need it
Ok(()) => { Ok(()) => {
@ -475,10 +484,15 @@ impl TorrentStateLive {
handler.on_peer_died(Some(e))?; handler.on_peer_died(Some(e))?;
} }
}; };
drop(permit);
Ok(()) Ok(())
} }
async fn task_manage_outgoing_peer(self: Arc<Self>, addr: SocketAddr) -> anyhow::Result<()> { async fn task_manage_outgoing_peer(
self: Arc<Self>,
addr: SocketAddr,
permit: OwnedSemaphorePermit,
) -> anyhow::Result<()> {
let state = self; let state = self;
let (rx, tx) = state.peers.mark_peer_connecting(addr)?; let (rx, tx) = state.peers.mark_peer_connecting(addr)?;
let counters = state let counters = state
@ -523,8 +537,6 @@ impl TorrentStateLive {
r = peer_connection.manage_peer_outgoing(rx) => {r} r = peer_connection.manage_peer_outgoing(rx) => {r}
}; };
handler.state.peer_semaphore.add_permits(1);
match res { match res {
// We disconnected the peer ourselves as we don't need it // We disconnected the peer ourselves as we don't need it
Ok(()) => { Ok(()) => {
@ -535,6 +547,7 @@ impl TorrentStateLive {
handler.on_peer_died(Some(e))?; handler.on_peer_died(Some(e))?;
} }
} }
drop(permit);
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())
} }
@ -551,12 +564,11 @@ impl TorrentStateLive {
continue; continue;
} }
let permit = state.peer_semaphore.acquire().await?; let permit = state.peer_semaphore.clone().acquire_owned().await?;
permit.forget();
state.spawn( state.spawn(
"manage_peer", "manage_peer",
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
state.clone().task_manage_outgoing_peer(addr), state.clone().task_manage_outgoing_peer(addr, permit),
); );
} }
} }