DHT Rate limiting
This commit is contained in:
parent
50ca906421
commit
56311fb4df
6 changed files with 52 additions and 12 deletions
|
|
@ -3,6 +3,7 @@ use std::{
|
|||
net::SocketAddr,
|
||||
sync::Arc,
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
|
|
@ -16,6 +17,7 @@ use anyhow::Context;
|
|||
use bencode::ByteString;
|
||||
use futures::{stream::FuturesUnordered, Stream, StreamExt};
|
||||
use indexmap::IndexSet;
|
||||
use leaky_bucket::RateLimiter;
|
||||
use librqbit_core::{id20::Id20, peer_id::generate_peer_id};
|
||||
use parking_lot::RwLock;
|
||||
use rand::Rng;
|
||||
|
|
@ -404,6 +406,23 @@ impl DhtState {
|
|||
}
|
||||
}
|
||||
|
||||
fn make_rate_limiter() -> RateLimiter {
|
||||
// TODO: move to configuration, i'm lazy.
|
||||
let dht_queries_per_second = std::env::var("DHT_QUERIES_PER_SECOND")
|
||||
.map(|v| v.parse().expect("couldn't parse DHT_QUERIES_PER_SECOND"))
|
||||
.unwrap_or(250usize);
|
||||
|
||||
let per_100_ms = dht_queries_per_second / 10;
|
||||
|
||||
RateLimiter::builder()
|
||||
.initial(per_100_ms)
|
||||
.max(dht_queries_per_second)
|
||||
.interval(Duration::from_millis(100))
|
||||
.fair(false)
|
||||
.refill(per_100_ms)
|
||||
.build()
|
||||
}
|
||||
|
||||
async fn run_framer(
|
||||
socket: &UdpSocket,
|
||||
mut input_rx: UnboundedReceiver<(Message<ByteString>, SocketAddr)>,
|
||||
|
|
@ -411,11 +430,13 @@ async fn run_framer(
|
|||
) -> anyhow::Result<()> {
|
||||
let writer = async {
|
||||
let mut buf = Vec::new();
|
||||
let rate_limiter = make_rate_limiter();
|
||||
while let Some((msg, addr)) = input_rx.recv().await {
|
||||
let addr = match addr {
|
||||
SocketAddr::V4(v4) => v4,
|
||||
SocketAddr::V6(_) => continue,
|
||||
};
|
||||
rate_limiter.acquire_one().await;
|
||||
trace!("{}: sending {:?}", addr, &msg);
|
||||
buf.clear();
|
||||
bprotocol::serialize_message(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue