Remove timed out DHT requests

This commit is contained in:
Igor Katson 2023-11-28 08:56:27 +00:00
parent c7cf5eedef
commit e012cd94a3
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -42,6 +42,11 @@ pub struct DhtStats {
pub routing_table_size: usize,
}
struct OutstandingRequest {
request: Request,
done: tokio::sync::oneshot::Sender<()>,
}
pub struct DhtState {
id: Id20,
next_transaction_id: AtomicU16,
@ -50,7 +55,7 @@ pub struct DhtState {
// If we get a response, it gets removed from here.
//
// TODO: clean up old entries
outstanding_requests_by_transaction_id: DashMap<(u16, SocketAddr), Request>,
outstanding_requests_by_transaction_id: DashMap<(u16, SocketAddr), OutstandingRequest>,
// TODO: clean up old entries
made_requests_by_addr: DashMap<(Request, SocketAddr), Instant>,
@ -92,9 +97,39 @@ impl DhtState {
fn send_request(self: &Arc<Self>, request: Request, addr: SocketAddr) -> anyhow::Result<()> {
let (tid, msg) = self.create_request(request);
let (tx, rx) = tokio::sync::oneshot::channel();
self.outstanding_requests_by_transaction_id
.insert((tid, addr), request);
Ok(self.sender.send((msg, addr))?)
.insert((tid, addr), OutstandingRequest { request, done: tx });
match self.sender.send((msg, addr)) {
Ok(_) => {}
Err(e) => {
self.outstanding_requests_by_transaction_id
.remove(&(tid, addr));
return Err(e.into());
}
};
let this = self.clone();
spawn(
debug_span!("dht_request", tid = tid, addr = addr.to_string()),
async move {
match tokio::time::timeout(Duration::from_secs(60), rx).await {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
this.outstanding_requests_by_transaction_id
.remove(&(tid, addr));
warn!("recv error, did not expect this: {:?}", e);
}
Err(e) => {
this.outstanding_requests_by_transaction_id
.remove(&(tid, addr));
debug!("error: {:?}", e);
}
};
Ok(())
},
);
Ok(())
}
fn create_request(&self, request: Request) -> (u16, Message<ByteString>) {
@ -173,6 +208,10 @@ impl DhtState {
Some(req) => req,
None => anyhow::bail!("outstanding request not found. Message: {:?}", msg),
};
let request = {
let _ = request.done.send(());
request.request
};
let response = match msg.kind {
MessageKind::Error(e) => {
anyhow::bail!("request {:?} received error response {:?}", request, e)
@ -266,7 +305,7 @@ impl DhtState {
DhtStats {
id: self.id,
outstanding_requests: self.outstanding_requests_by_transaction_id.len(),
seen_peers: self.seen_peers.iter().map(|(e)| e.value().len()).sum(),
seen_peers: self.seen_peers.iter().map(|e| e.value().len()).sum(),
made_requests: self.made_requests_by_addr.len(),
routing_table_size: self.routing_table.read().len(),
}