From 0478577a728a1a7b44d48d4e7f8f632cb56f5cdd Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 28 Nov 2023 09:23:05 +0000 Subject: [PATCH] Nothing --- crates/dht/src/dht.rs | 40 +++++++++++++++++++-------------- crates/dht/src/lib.rs | 6 +++++ crates/dht/src/routing_table.rs | 5 ++--- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 52083b3..393fc38 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -11,12 +11,13 @@ use std::{ use crate::{ bprotocol::{ self, CompactNodeInfo, CompactPeerInfo, FindNodeRequest, GetPeersRequest, Message, - MessageKind, Node, PingRequest, + MessageKind, Node, PingRequest, Response, }, routing_table::{InsertResult, RoutingTable}, + RESPONSE_TIMEOUT, }; use anyhow::Context; -use bencode::ByteString; +use bencode::{ByteBuf, ByteString}; use dashmap::DashMap; use futures::{stream::FuturesUnordered, Stream, StreamExt}; use indexmap::IndexSet; @@ -53,8 +54,6 @@ pub struct DhtState { // Created requests: (transaction_id, addr) => Requests. // If we get a response, it gets removed from here. - // - // TODO: clean up old entries outstanding_requests_by_transaction_id: DashMap<(u16, SocketAddr), OutstandingRequest>, // TODO: clean up old entries @@ -112,7 +111,7 @@ impl DhtState { spawn( debug_span!("dht_request", tid = tid, addr = addr.to_string()), async move { - match tokio::time::timeout(Duration::from_secs(60), rx).await { + match tokio::time::timeout(RESPONSE_TIMEOUT, rx).await { Ok(Ok(_)) => {} Ok(Err(e)) => { this.outstanding_requests_by_transaction_id @@ -165,6 +164,24 @@ impl DhtState { (transaction_id, message) } + fn on_response( + self: &Arc, + addr: SocketAddr, + request: Request, + response: Response, + ) -> anyhow::Result<()> { + match request { + Request::FindNode(id) => { + let nodes = response + .nodes + .ok_or_else(|| anyhow::anyhow!("expected nodes for find_node requests"))?; + self.on_found_nodes(response.id, addr, id, nodes) + } + Request::GetPeers(id) => self.on_found_peers_or_nodes(response.id, addr, id, response), + Request::Ping => Ok(()), + } + } + fn on_incoming_from_remote( self: &Arc, msg: Message, @@ -220,18 +237,7 @@ impl DhtState { _ => unreachable!(), }; self.routing_table.write().mark_response(&response.id); - match request { - Request::FindNode(id) => { - let nodes = response.nodes.ok_or_else(|| { - anyhow::anyhow!("expected nodes for find_node requests") - })?; - self.on_found_nodes(response.id, addr, id, nodes) - } - Request::GetPeers(id) => { - self.on_found_peers_or_nodes(response.id, addr, id, response) - } - Request::Ping => Ok(()), - } + self.on_response(addr, request, response) } MessageKind::PingRequest(_) => { let message = Message { diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index 81713d3..9e5bfe4 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -5,6 +5,7 @@ mod routing_table; mod utils; use std::sync::Arc; +use std::time::Duration; pub use crate::dht::DhtStats; pub use crate::dht::{DhtConfig, DhtState}; @@ -13,6 +14,11 @@ pub use persistence::{PersistentDht, PersistentDhtConfig}; pub type Dht = Arc; +// How long do we wait for a response from a DHT node. +pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(60); +// After how long should we ping the node again. +pub(crate) const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60); + pub struct DhtBuilder {} impl DhtBuilder { diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 6e45ab9..fce5c14 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -7,6 +7,8 @@ use librqbit_core::id20::Id20; use serde::{ser::SerializeMap, Deserialize, Serialize}; use tracing::debug; +use crate::{INACTIVITY_TIMEOUT, RESPONSE_TIMEOUT}; + #[derive(Debug, Clone, Serialize, Deserialize)] enum BucketTreeNodeData { // TODO: maybe replace that with SmallVec<8>? @@ -438,9 +440,6 @@ impl RoutingTableNode { self.addr } pub fn status(&self) -> NodeStatus { - const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); - const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60); - match (self.last_request, self.last_response, self.last_query) { (None, _, _) => NodeStatus::Unknown, // Nodes become bad when they fail to respond to multiple queries in a row.