From 692fef13944f1bf76d6034f7d616c1385d03ae5c Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 27 Nov 2023 19:03:39 +0000 Subject: [PATCH] Poor mans algo to resend requests --- crates/dht/src/dht.rs | 55 +++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 779c214..533c221 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -1,9 +1,9 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap}, net::SocketAddr, sync::Arc, task::Poll, - time::Duration, + time::{Duration, Instant}, }; use crate::{ @@ -42,7 +42,16 @@ pub struct DhtStats { struct DhtState { id: Id20, next_transaction_id: u16, - outstanding_requests: HashMap<(u16, SocketAddr), Request>, + + // Created requests: (transaction_id, addr) => Requests. + // If we get a response, it gets removed from here. + // + // TODO: clean up old entries + outstanding_requests_by_transaction_id: HashMap<(u16, SocketAddr), Request>, + + // TODO: clean up old entries + made_requests_by_addr: HashMap<(Request, SocketAddr), Instant>, + routing_table: RoutingTable, listen_addr: SocketAddr, @@ -55,8 +64,6 @@ struct DhtState { seen_peers: HashMap>, get_peers_subscribers: HashMap>, - - made_requests: HashSet<(Request, SocketAddr)>, } impl DhtState { @@ -70,13 +77,13 @@ impl DhtState { Self { id, next_transaction_id: 0, - outstanding_requests: Default::default(), + outstanding_requests_by_transaction_id: Default::default(), routing_table, sender, listen_addr, seen_peers: Default::default(), get_peers_subscribers: Default::default(), - made_requests: Default::default(), + made_requests_by_addr: Default::default(), } } @@ -115,10 +122,11 @@ impl DhtState { kind: MessageKind::PingRequest(PingRequest { id: self.id }), }, }; - self.outstanding_requests + self.outstanding_requests_by_transaction_id .insert((transaction_id, addr), request); message } + fn on_incoming_from_remote( &mut self, msg: Message, @@ -153,7 +161,10 @@ impl DhtState { ) } let tid = ((msg.transaction_id[0] as u16) << 8) + (msg.transaction_id[1] as u16); - let request = match self.outstanding_requests.remove(&(tid, addr)) { + let request = match self + .outstanding_requests_by_transaction_id + .remove(&(tid, addr)) + { Some(req) => req, None => anyhow::bail!("outstanding request not found. Message: {:?}", msg), }; @@ -249,9 +260,9 @@ impl DhtState { pub fn get_stats(&self) -> DhtStats { DhtStats { id: self.id, - outstanding_requests: self.outstanding_requests.len(), + outstanding_requests: self.outstanding_requests_by_transaction_id.len(), seen_peers: self.seen_peers.values().map(|v| v.len()).sum(), - made_requests: self.made_requests.len(), + made_requests: self.made_requests_by_addr.len(), routing_table_size: self.routing_table.len(), } } @@ -299,6 +310,24 @@ impl DhtState { } } + fn should_request(&mut self, request: Request, addr: SocketAddr) -> bool { + const RE_REQUEST_TIME: Duration = Duration::from_secs(10 * 60); + match self.made_requests_by_addr.entry((request, addr)) { + Entry::Occupied(mut o) => { + if o.get().elapsed() > RE_REQUEST_TIME { + o.insert(Instant::now()); + true + } else { + false + } + } + Entry::Vacant(v) => { + v.insert(Instant::now()); + true + } + } + } + fn send_find_peers_if_not_yet( &mut self, info_hash: Id20, @@ -306,7 +335,7 @@ impl DhtState { addr: SocketAddr, ) -> anyhow::Result<()> { let request = Request::GetPeers(info_hash); - if self.made_requests.insert((request, addr)) { + if self.should_request(request, addr) { self.routing_table.mark_outgoing_request(&target_node); let msg = self.create_request(request, addr); self.sender.send((msg, addr))?; @@ -321,7 +350,7 @@ impl DhtState { addr: SocketAddr, ) -> anyhow::Result<()> { let request = Request::FindNode(search_id); - if self.made_requests.insert((request, addr)) { + if self.should_request(request, addr) { self.routing_table.mark_outgoing_request(&target_node); let msg = self.create_request(request, addr); self.sender.send((msg, addr))?;