DHT routing table tracking errors better

This commit is contained in:
Igor Katson 2023-11-29 10:40:29 +00:00
parent 74c11415f1
commit dc3da89b59
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 68 additions and 25 deletions

View file

@ -14,12 +14,13 @@
- [x] pause/unpause
- [x] remove including from disk
- [ ] DHT
- [ ] for torrents with a few seeds might be cool to re-query DHT once in a while.
- [x] many nodes in "Unknown" status, do smth about it
- [x] for torrents with a few seeds might be cool to re-query DHT once in a while.
- [ ] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted)
- [ ] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC)
- [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly
- [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent.
- [ ] Bad actors:
- [ ] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was.
- [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was.
someday:
- [x] cancellation from the client-side for the lib (i.e. stop the torrent manager)

View file

@ -55,10 +55,12 @@ pub struct WorkerSendRequest {
addr: SocketAddr,
}
#[derive(Debug)]
struct MaybeUsefulNode {
id: Id20,
addr: SocketAddr,
last_response: Option<Instant>,
returned_peers: bool,
}
fn make_rate_limiter() -> RateLimiter {
@ -226,6 +228,7 @@ impl DhtState {
request: Request,
response: Response<ByteString>,
) -> anyhow::Result<()> {
self.routing_table.write().mark_response(&response.id);
match request {
Request::FindNode(id) => {
let nodes = response
@ -281,10 +284,7 @@ impl DhtState {
let response_or_error = match msg.kind {
MessageKind::Error(e) => ResponseOrError::Error(e),
MessageKind::Response(r) => {
self.routing_table.write().mark_response(&r.id);
ResponseOrError::Response(r)
}
MessageKind::Response(r) => ResponseOrError::Response(r),
_ => unreachable!(),
};
match request.done.send(Ok(response_or_error)) {
@ -492,6 +492,7 @@ impl DhtState {
let resp = this.request(request, addr).await;
match resp {
Ok(ResponseOrError::Response(response)) => {
this.routing_table.write().mark_response(&target_node);
match this.on_response(addr, request, response) {
Ok(()) => {}
Err(e) => {
@ -500,9 +501,11 @@ impl DhtState {
}
}
Ok(ResponseOrError::Error(e)) => {
this.routing_table.write().mark_response(&target_node);
debug!("error response: {:?}", e);
}
Err(e) => {
this.routing_table.write().mark_error(&target_node);
debug!("error: {:?}", e);
}
};
@ -592,19 +595,24 @@ impl DhtState {
id: node_id,
addr,
last_response: None,
returned_peers: false,
};
match self.closest_responding_nodes_for_info_hash.entry(info_hash) {
Entry::Occupied(mut occ) => {
const LIMIT: usize = 128;
// How many nodes to query per torrent.
const LIMIT: usize = 256;
let v = occ.get_mut();
v.push(n);
v.sort_by_key(|n| {
let responded = Reverse(n.last_response.is_some() as u8);
let has_returned_peers_desc = Reverse(n.returned_peers);
let has_responded_desc = Reverse(n.last_response.is_some() as u8);
let distance = n.id.distance(&info_hash);
(responded, distance)
(has_returned_peers_desc, has_responded_desc, distance)
});
while v.len() > LIMIT {
if v.pop().unwrap().id == node_id {
if v.len() > LIMIT {
let popped = v.pop().unwrap();
if popped.id == node_id {
return false;
}
}
@ -626,7 +634,6 @@ impl DhtState {
data: bprotocol::Response<ByteString>,
) -> anyhow::Result<()> {
self.routing_table_add_node(source, source_addr);
self.routing_table.write().mark_response(&source);
let bsender = match self.get_peers_subscribers.get(&info_hash) {
Some(s) => s,
@ -645,6 +652,7 @@ impl DhtState {
id: source,
addr: source_addr,
last_response: Some(Instant::now()),
returned_peers: data.values.as_ref().map(|p| !p.is_empty()).unwrap_or(false),
};
match self.closest_responding_nodes_for_info_hash.entry(info_hash) {
Entry::Occupied(mut useful_nodes) => {

View file

@ -1,7 +1,10 @@
use std::{net::SocketAddr, time::Instant};
use librqbit_core::id20::Id20;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use serde::{
ser::{SerializeMap, SerializeStruct},
Deserialize, Serialize,
};
use tracing::debug;
use crate::{INACTIVITY_TIMEOUT, RESPONSE_TIMEOUT};
@ -320,7 +323,7 @@ impl BucketTree {
last_request: None,
last_response: None,
last_query: None,
outstanding_queries_in_a_row: 0,
errors_in_a_row: 0,
};
if nodes.len() < 8 {
@ -407,7 +410,7 @@ impl Default for BucketTree {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct RoutingTableNode {
#[serde(serialize_with = "crate::utils::serialize_id20")]
id: Id20,
@ -419,9 +422,33 @@ pub struct RoutingTableNode {
#[serde(skip)]
last_query: Option<Instant>,
#[serde(skip)]
outstanding_queries_in_a_row: usize,
errors_in_a_row: usize,
}
impl Serialize for RoutingTableNode {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_struct("RoutingTableNode", 3)?;
s.serialize_field("id", &self.id.as_string())?;
s.serialize_field("addr", &self.addr)?;
s.serialize_field("status", &self.status())?;
if let Some(l) = self.last_request {
s.serialize_field("last_request_ago", &l.elapsed())?;
}
if let Some(l) = self.last_response {
s.serialize_field("last_response_ago", &l.elapsed())?;
}
if let Some(l) = self.last_query {
s.serialize_field("last_query_ago", &l.elapsed())?;
}
s.serialize_field("errors_in_a_row", &self.errors_in_a_row)?;
s.end()
}
}
#[derive(Serialize, Debug)]
pub enum NodeStatus {
Good,
Questionable,
@ -440,12 +467,7 @@ impl RoutingTableNode {
match (self.last_request, self.last_response, self.last_query) {
(None, _, _) => NodeStatus::Unknown,
// Nodes become bad when they fail to respond to multiple queries in a row.
(Some(last_request), _, _)
if last_request.elapsed() > RESPONSE_TIMEOUT
&& self.outstanding_queries_in_a_row >= 2 =>
{
NodeStatus::Bad
}
(Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad,
// A good node is a node has responded to one of our queries within the last 15 minutes.
// A node is also good if it has ever responded to one of our queries and has sent
@ -468,7 +490,6 @@ impl RoutingTableNode {
pub fn mark_outgoing_request(&mut self) {
self.last_request = Some(Instant::now());
self.outstanding_queries_in_a_row += 1;
}
pub fn mark_last_query(&mut self) {
@ -481,7 +502,11 @@ impl RoutingTableNode {
if self.last_request.is_none() {
self.last_request = Some(now);
}
self.outstanding_queries_in_a_row = 0;
self.errors_in_a_row = 0;
}
pub fn mark_error(&mut self) {
self.errors_in_a_row += 1;
}
}
@ -554,6 +579,15 @@ impl RoutingTable {
true
}
pub fn mark_error(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id) {
Some(r) => r,
None => return false,
};
r.mark_error();
true
}
pub fn mark_last_query(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id) {
Some(r) => r,