Return back rate limiting and not re-querying same nodes
This commit is contained in:
parent
3b3af34152
commit
74c11415f1
1 changed files with 33 additions and 13 deletions
|
|
@ -61,6 +61,23 @@ struct MaybeUsefulNode {
|
|||
last_response: Option<Instant>,
|
||||
}
|
||||
|
||||
fn make_rate_limiter() -> RateLimiter {
|
||||
// TODO: move to configuration, i'm lazy.
|
||||
let dht_queries_per_second = std::env::var("DHT_QUERIES_PER_SECOND")
|
||||
.map(|v| v.parse().expect("couldn't parse DHT_QUERIES_PER_SECOND"))
|
||||
.unwrap_or(250usize);
|
||||
|
||||
let per_100_ms = dht_queries_per_second / 10;
|
||||
|
||||
RateLimiter::builder()
|
||||
.initial(per_100_ms)
|
||||
.max(dht_queries_per_second)
|
||||
.interval(Duration::from_millis(100))
|
||||
.fair(false)
|
||||
.refill(per_100_ms)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub struct DhtState {
|
||||
id: Id20,
|
||||
next_transaction_id: AtomicU16,
|
||||
|
|
@ -76,6 +93,7 @@ pub struct DhtState {
|
|||
listen_addr: SocketAddr,
|
||||
|
||||
// Sending requests to the worker.
|
||||
rate_limiter: RateLimiter,
|
||||
sender: UnboundedSender<WorkerSendRequest>,
|
||||
|
||||
seen_peers: DashMap<Id20, IndexSet<SocketAddr>>,
|
||||
|
|
@ -101,6 +119,7 @@ impl DhtState {
|
|||
listen_addr,
|
||||
seen_peers: Default::default(),
|
||||
get_peers_subscribers: Default::default(),
|
||||
rate_limiter: make_rate_limiter(),
|
||||
closest_responding_nodes_for_info_hash: Default::default(),
|
||||
recent_requests: Default::default(),
|
||||
}
|
||||
|
|
@ -137,7 +156,7 @@ impl DhtState {
|
|||
}
|
||||
|
||||
async fn request(&self, request: Request, addr: SocketAddr) -> anyhow::Result<ResponseOrError> {
|
||||
// self.rate_limiter.acquire_one().await;
|
||||
self.rate_limiter.acquire_one().await;
|
||||
let (tid, message) = self.create_request(request);
|
||||
let key = (tid, addr);
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
|
|
@ -449,18 +468,19 @@ impl DhtState {
|
|||
) -> anyhow::Result<()> {
|
||||
let key = (request, addr);
|
||||
|
||||
// use dashmap::mapref::entry::Entry;
|
||||
// match self.recent_requests.entry(key) {
|
||||
// Entry::Occupied(mut o) => {
|
||||
// if o.get().elapsed() < REQUERY_INTERVAL {
|
||||
// return Ok(());
|
||||
// }
|
||||
// o.insert(Instant::now());
|
||||
// }
|
||||
// Entry::Vacant(v) => {
|
||||
// v.insert(Instant::now());
|
||||
// }
|
||||
// }
|
||||
use dashmap::mapref::entry::Entry;
|
||||
match self.recent_requests.entry(key) {
|
||||
Entry::Occupied(mut o) => {
|
||||
// minus to account for randomness
|
||||
if o.get().elapsed() < REQUERY_INTERVAL - Duration::from_secs(1) {
|
||||
return Ok(());
|
||||
}
|
||||
o.insert(Instant::now());
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
let this = self.clone();
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue