From dc3da89b59b4799fc0d61f6efee018944b9611ed Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 29 Nov 2023 10:40:29 +0000 Subject: [PATCH] DHT routing table tracking errors better --- TODO.md | 7 ++-- crates/dht/src/dht.rs | 28 ++++++++++------ crates/dht/src/routing_table.rs | 58 ++++++++++++++++++++++++++------- 3 files changed, 68 insertions(+), 25 deletions(-) diff --git a/TODO.md b/TODO.md index ac09aa6..017b3d6 100644 --- a/TODO.md +++ b/TODO.md @@ -14,12 +14,13 @@ - [x] pause/unpause - [x] remove including from disk - [ ] DHT - - [ ] for torrents with a few seeds might be cool to re-query DHT once in a while. + - [x] many nodes in "Unknown" status, do smth about it + - [x] for torrents with a few seeds might be cool to re-query DHT once in a while. + - [ ] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted) - [ ] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC) - [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly - [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent. - - [ ] Bad actors: - - [ ] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was. + - [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was. someday: - [x] cancellation from the client-side for the lib (i.e. stop the torrent manager) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 5adadf5..fea416a 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -55,10 +55,12 @@ pub struct WorkerSendRequest { addr: SocketAddr, } +#[derive(Debug)] struct MaybeUsefulNode { id: Id20, addr: SocketAddr, last_response: Option, + returned_peers: bool, } fn make_rate_limiter() -> RateLimiter { @@ -226,6 +228,7 @@ impl DhtState { request: Request, response: Response, ) -> anyhow::Result<()> { + self.routing_table.write().mark_response(&response.id); match request { Request::FindNode(id) => { let nodes = response @@ -281,10 +284,7 @@ impl DhtState { let response_or_error = match msg.kind { MessageKind::Error(e) => ResponseOrError::Error(e), - MessageKind::Response(r) => { - self.routing_table.write().mark_response(&r.id); - ResponseOrError::Response(r) - } + MessageKind::Response(r) => ResponseOrError::Response(r), _ => unreachable!(), }; match request.done.send(Ok(response_or_error)) { @@ -492,6 +492,7 @@ impl DhtState { let resp = this.request(request, addr).await; match resp { Ok(ResponseOrError::Response(response)) => { + this.routing_table.write().mark_response(&target_node); match this.on_response(addr, request, response) { Ok(()) => {} Err(e) => { @@ -500,9 +501,11 @@ impl DhtState { } } Ok(ResponseOrError::Error(e)) => { + this.routing_table.write().mark_response(&target_node); debug!("error response: {:?}", e); } Err(e) => { + this.routing_table.write().mark_error(&target_node); debug!("error: {:?}", e); } }; @@ -592,19 +595,24 @@ impl DhtState { id: node_id, addr, last_response: None, + + returned_peers: false, }; match self.closest_responding_nodes_for_info_hash.entry(info_hash) { Entry::Occupied(mut occ) => { - const LIMIT: usize = 128; + // How many nodes to query per torrent. + const LIMIT: usize = 256; let v = occ.get_mut(); v.push(n); v.sort_by_key(|n| { - let responded = Reverse(n.last_response.is_some() as u8); + let has_returned_peers_desc = Reverse(n.returned_peers); + let has_responded_desc = Reverse(n.last_response.is_some() as u8); let distance = n.id.distance(&info_hash); - (responded, distance) + (has_returned_peers_desc, has_responded_desc, distance) }); - while v.len() > LIMIT { - if v.pop().unwrap().id == node_id { + if v.len() > LIMIT { + let popped = v.pop().unwrap(); + if popped.id == node_id { return false; } } @@ -626,7 +634,6 @@ impl DhtState { data: bprotocol::Response, ) -> anyhow::Result<()> { self.routing_table_add_node(source, source_addr); - self.routing_table.write().mark_response(&source); let bsender = match self.get_peers_subscribers.get(&info_hash) { Some(s) => s, @@ -645,6 +652,7 @@ impl DhtState { id: source, addr: source_addr, last_response: Some(Instant::now()), + returned_peers: data.values.as_ref().map(|p| !p.is_empty()).unwrap_or(false), }; match self.closest_responding_nodes_for_info_hash.entry(info_hash) { Entry::Occupied(mut useful_nodes) => { diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 965a414..6cfb6da 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -1,7 +1,10 @@ use std::{net::SocketAddr, time::Instant}; use librqbit_core::id20::Id20; -use serde::{ser::SerializeMap, Deserialize, Serialize}; +use serde::{ + ser::{SerializeMap, SerializeStruct}, + Deserialize, Serialize, +}; use tracing::debug; use crate::{INACTIVITY_TIMEOUT, RESPONSE_TIMEOUT}; @@ -320,7 +323,7 @@ impl BucketTree { last_request: None, last_response: None, last_query: None, - outstanding_queries_in_a_row: 0, + errors_in_a_row: 0, }; if nodes.len() < 8 { @@ -407,7 +410,7 @@ impl Default for BucketTree { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Deserialize)] pub struct RoutingTableNode { #[serde(serialize_with = "crate::utils::serialize_id20")] id: Id20, @@ -419,9 +422,33 @@ pub struct RoutingTableNode { #[serde(skip)] last_query: Option, #[serde(skip)] - outstanding_queries_in_a_row: usize, + errors_in_a_row: usize, } +impl Serialize for RoutingTableNode { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut s = serializer.serialize_struct("RoutingTableNode", 3)?; + s.serialize_field("id", &self.id.as_string())?; + s.serialize_field("addr", &self.addr)?; + s.serialize_field("status", &self.status())?; + if let Some(l) = self.last_request { + s.serialize_field("last_request_ago", &l.elapsed())?; + } + if let Some(l) = self.last_response { + s.serialize_field("last_response_ago", &l.elapsed())?; + } + if let Some(l) = self.last_query { + s.serialize_field("last_query_ago", &l.elapsed())?; + } + s.serialize_field("errors_in_a_row", &self.errors_in_a_row)?; + s.end() + } +} + +#[derive(Serialize, Debug)] pub enum NodeStatus { Good, Questionable, @@ -440,12 +467,7 @@ impl RoutingTableNode { match (self.last_request, self.last_response, self.last_query) { (None, _, _) => NodeStatus::Unknown, // Nodes become bad when they fail to respond to multiple queries in a row. - (Some(last_request), _, _) - if last_request.elapsed() > RESPONSE_TIMEOUT - && self.outstanding_queries_in_a_row >= 2 => - { - NodeStatus::Bad - } + (Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad, // A good node is a node has responded to one of our queries within the last 15 minutes. // A node is also good if it has ever responded to one of our queries and has sent @@ -468,7 +490,6 @@ impl RoutingTableNode { pub fn mark_outgoing_request(&mut self) { self.last_request = Some(Instant::now()); - self.outstanding_queries_in_a_row += 1; } pub fn mark_last_query(&mut self) { @@ -481,7 +502,11 @@ impl RoutingTableNode { if self.last_request.is_none() { self.last_request = Some(now); } - self.outstanding_queries_in_a_row = 0; + self.errors_in_a_row = 0; + } + + pub fn mark_error(&mut self) { + self.errors_in_a_row += 1; } } @@ -554,6 +579,15 @@ impl RoutingTable { true } + pub fn mark_error(&mut self, id: &Id20) -> bool { + let r = match self.buckets.get_mut(id) { + Some(r) => r, + None => return false, + }; + r.mark_error(); + true + } + pub fn mark_last_query(&mut self, id: &Id20) -> bool { let r = match self.buckets.get_mut(id) { Some(r) => r,