diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 365352c..5adadf5 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -61,6 +61,23 @@ struct MaybeUsefulNode { last_response: Option, } +fn make_rate_limiter() -> RateLimiter { + // TODO: move to configuration, i'm lazy. + let dht_queries_per_second = std::env::var("DHT_QUERIES_PER_SECOND") + .map(|v| v.parse().expect("couldn't parse DHT_QUERIES_PER_SECOND")) + .unwrap_or(250usize); + + let per_100_ms = dht_queries_per_second / 10; + + RateLimiter::builder() + .initial(per_100_ms) + .max(dht_queries_per_second) + .interval(Duration::from_millis(100)) + .fair(false) + .refill(per_100_ms) + .build() +} + pub struct DhtState { id: Id20, next_transaction_id: AtomicU16, @@ -76,6 +93,7 @@ pub struct DhtState { listen_addr: SocketAddr, // Sending requests to the worker. + rate_limiter: RateLimiter, sender: UnboundedSender, seen_peers: DashMap>, @@ -101,6 +119,7 @@ impl DhtState { listen_addr, seen_peers: Default::default(), get_peers_subscribers: Default::default(), + rate_limiter: make_rate_limiter(), closest_responding_nodes_for_info_hash: Default::default(), recent_requests: Default::default(), } @@ -137,7 +156,7 @@ impl DhtState { } async fn request(&self, request: Request, addr: SocketAddr) -> anyhow::Result { - // self.rate_limiter.acquire_one().await; + self.rate_limiter.acquire_one().await; let (tid, message) = self.create_request(request); let key = (tid, addr); let (tx, rx) = tokio::sync::oneshot::channel(); @@ -449,18 +468,19 @@ impl DhtState { ) -> anyhow::Result<()> { let key = (request, addr); - // use dashmap::mapref::entry::Entry; - // match self.recent_requests.entry(key) { - // Entry::Occupied(mut o) => { - // if o.get().elapsed() < REQUERY_INTERVAL { - // return Ok(()); - // } - // o.insert(Instant::now()); - // } - // Entry::Vacant(v) => { - // v.insert(Instant::now()); - // } - // } + use dashmap::mapref::entry::Entry; + match self.recent_requests.entry(key) { + Entry::Occupied(mut o) => { + // minus to account for randomness + if o.get().elapsed() < REQUERY_INTERVAL - Duration::from_secs(1) { + return Ok(()); + } + o.insert(Instant::now()); + } + Entry::Vacant(v) => { + v.insert(Instant::now()); + } + } let this = self.clone();