From e012cd94a35d60da129120c687d1c30f2371fabe Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 28 Nov 2023 08:56:27 +0000 Subject: [PATCH] Remove timed out DHT requests --- crates/dht/src/dht.rs | 47 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 5769893..52083b3 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -42,6 +42,11 @@ pub struct DhtStats { pub routing_table_size: usize, } +struct OutstandingRequest { + request: Request, + done: tokio::sync::oneshot::Sender<()>, +} + pub struct DhtState { id: Id20, next_transaction_id: AtomicU16, @@ -50,7 +55,7 @@ pub struct DhtState { // If we get a response, it gets removed from here. // // TODO: clean up old entries - outstanding_requests_by_transaction_id: DashMap<(u16, SocketAddr), Request>, + outstanding_requests_by_transaction_id: DashMap<(u16, SocketAddr), OutstandingRequest>, // TODO: clean up old entries made_requests_by_addr: DashMap<(Request, SocketAddr), Instant>, @@ -92,9 +97,39 @@ impl DhtState { fn send_request(self: &Arc, request: Request, addr: SocketAddr) -> anyhow::Result<()> { let (tid, msg) = self.create_request(request); + let (tx, rx) = tokio::sync::oneshot::channel(); self.outstanding_requests_by_transaction_id - .insert((tid, addr), request); - Ok(self.sender.send((msg, addr))?) + .insert((tid, addr), OutstandingRequest { request, done: tx }); + match self.sender.send((msg, addr)) { + Ok(_) => {} + Err(e) => { + self.outstanding_requests_by_transaction_id + .remove(&(tid, addr)); + return Err(e.into()); + } + }; + let this = self.clone(); + spawn( + debug_span!("dht_request", tid = tid, addr = addr.to_string()), + async move { + match tokio::time::timeout(Duration::from_secs(60), rx).await { + Ok(Ok(_)) => {} + Ok(Err(e)) => { + this.outstanding_requests_by_transaction_id + .remove(&(tid, addr)); + warn!("recv error, did not expect this: {:?}", e); + } + Err(e) => { + this.outstanding_requests_by_transaction_id + .remove(&(tid, addr)); + debug!("error: {:?}", e); + } + }; + + Ok(()) + }, + ); + Ok(()) } fn create_request(&self, request: Request) -> (u16, Message) { @@ -173,6 +208,10 @@ impl DhtState { Some(req) => req, None => anyhow::bail!("outstanding request not found. Message: {:?}", msg), }; + let request = { + let _ = request.done.send(()); + request.request + }; let response = match msg.kind { MessageKind::Error(e) => { anyhow::bail!("request {:?} received error response {:?}", request, e) @@ -266,7 +305,7 @@ impl DhtState { DhtStats { id: self.id, outstanding_requests: self.outstanding_requests_by_transaction_id.len(), - seen_peers: self.seen_peers.iter().map(|(e)| e.value().len()).sum(), + seen_peers: self.seen_peers.iter().map(|e| e.value().len()).sum(), made_requests: self.made_requests_by_addr.len(), routing_table_size: self.routing_table.read().len(), }