This commit is contained in:
Igor Katson 2023-11-28 09:23:05 +00:00
parent e012cd94a3
commit 0478577a72
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 31 additions and 20 deletions

View file

@ -11,12 +11,13 @@ use std::{
use crate::{ use crate::{
bprotocol::{ bprotocol::{
self, CompactNodeInfo, CompactPeerInfo, FindNodeRequest, GetPeersRequest, Message, self, CompactNodeInfo, CompactPeerInfo, FindNodeRequest, GetPeersRequest, Message,
MessageKind, Node, PingRequest, MessageKind, Node, PingRequest, Response,
}, },
routing_table::{InsertResult, RoutingTable}, routing_table::{InsertResult, RoutingTable},
RESPONSE_TIMEOUT,
}; };
use anyhow::Context; use anyhow::Context;
use bencode::ByteString; use bencode::{ByteBuf, ByteString};
use dashmap::DashMap; use dashmap::DashMap;
use futures::{stream::FuturesUnordered, Stream, StreamExt}; use futures::{stream::FuturesUnordered, Stream, StreamExt};
use indexmap::IndexSet; use indexmap::IndexSet;
@ -53,8 +54,6 @@ pub struct DhtState {
// Created requests: (transaction_id, addr) => Requests. // Created requests: (transaction_id, addr) => Requests.
// If we get a response, it gets removed from here. // If we get a response, it gets removed from here.
//
// TODO: clean up old entries
outstanding_requests_by_transaction_id: DashMap<(u16, SocketAddr), OutstandingRequest>, outstanding_requests_by_transaction_id: DashMap<(u16, SocketAddr), OutstandingRequest>,
// TODO: clean up old entries // TODO: clean up old entries
@ -112,7 +111,7 @@ impl DhtState {
spawn( spawn(
debug_span!("dht_request", tid = tid, addr = addr.to_string()), debug_span!("dht_request", tid = tid, addr = addr.to_string()),
async move { async move {
match tokio::time::timeout(Duration::from_secs(60), rx).await { match tokio::time::timeout(RESPONSE_TIMEOUT, rx).await {
Ok(Ok(_)) => {} Ok(Ok(_)) => {}
Ok(Err(e)) => { Ok(Err(e)) => {
this.outstanding_requests_by_transaction_id this.outstanding_requests_by_transaction_id
@ -165,6 +164,24 @@ impl DhtState {
(transaction_id, message) (transaction_id, message)
} }
fn on_response(
self: &Arc<Self>,
addr: SocketAddr,
request: Request,
response: Response<ByteString>,
) -> anyhow::Result<()> {
match request {
Request::FindNode(id) => {
let nodes = response
.nodes
.ok_or_else(|| anyhow::anyhow!("expected nodes for find_node requests"))?;
self.on_found_nodes(response.id, addr, id, nodes)
}
Request::GetPeers(id) => self.on_found_peers_or_nodes(response.id, addr, id, response),
Request::Ping => Ok(()),
}
}
fn on_incoming_from_remote( fn on_incoming_from_remote(
self: &Arc<Self>, self: &Arc<Self>,
msg: Message<ByteString>, msg: Message<ByteString>,
@ -220,18 +237,7 @@ impl DhtState {
_ => unreachable!(), _ => unreachable!(),
}; };
self.routing_table.write().mark_response(&response.id); self.routing_table.write().mark_response(&response.id);
match request { self.on_response(addr, request, response)
Request::FindNode(id) => {
let nodes = response.nodes.ok_or_else(|| {
anyhow::anyhow!("expected nodes for find_node requests")
})?;
self.on_found_nodes(response.id, addr, id, nodes)
}
Request::GetPeers(id) => {
self.on_found_peers_or_nodes(response.id, addr, id, response)
}
Request::Ping => Ok(()),
}
} }
MessageKind::PingRequest(_) => { MessageKind::PingRequest(_) => {
let message = Message { let message = Message {

View file

@ -5,6 +5,7 @@ mod routing_table;
mod utils; mod utils;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
pub use crate::dht::DhtStats; pub use crate::dht::DhtStats;
pub use crate::dht::{DhtConfig, DhtState}; pub use crate::dht::{DhtConfig, DhtState};
@ -13,6 +14,11 @@ pub use persistence::{PersistentDht, PersistentDhtConfig};
pub type Dht = Arc<DhtState>; pub type Dht = Arc<DhtState>;
// How long do we wait for a response from a DHT node.
pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(60);
// After how long should we ping the node again.
pub(crate) const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60);
pub struct DhtBuilder {} pub struct DhtBuilder {}
impl DhtBuilder { impl DhtBuilder {

View file

@ -7,6 +7,8 @@ use librqbit_core::id20::Id20;
use serde::{ser::SerializeMap, Deserialize, Serialize}; use serde::{ser::SerializeMap, Deserialize, Serialize};
use tracing::debug; use tracing::debug;
use crate::{INACTIVITY_TIMEOUT, RESPONSE_TIMEOUT};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
enum BucketTreeNodeData { enum BucketTreeNodeData {
// TODO: maybe replace that with SmallVec<8>? // TODO: maybe replace that with SmallVec<8>?
@ -438,9 +440,6 @@ impl RoutingTableNode {
self.addr self.addr
} }
pub fn status(&self) -> NodeStatus { pub fn status(&self) -> NodeStatus {
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60);
match (self.last_request, self.last_response, self.last_query) { match (self.last_request, self.last_response, self.last_query) {
(None, _, _) => NodeStatus::Unknown, (None, _, _) => NodeStatus::Unknown,
// Nodes become bad when they fail to respond to multiple queries in a row. // Nodes become bad when they fail to respond to multiple queries in a row.