Fix a very rare bug where DHT paniced

This commit is contained in:
Igor Katson 2025-06-06 14:51:33 +01:00
parent 559fca8552
commit 963f8167de
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 14 additions and 30 deletions

View file

@ -971,9 +971,10 @@ impl DhtWorker {
loop { loop {
interval.tick().await; interval.tick().await;
let mut found = 0; let mut found = 0;
let now = Instant::now();
for node in self.dht.routing_table.read().iter() { for node in self.dht.routing_table.read().iter() {
if matches!( if matches!(
node.status(), node.status(now),
NodeStatus::Questionable | NodeStatus::Unknown NodeStatus::Questionable | NodeStatus::Unknown
) { ) {
found += 1; found += 1;

View file

@ -286,10 +286,11 @@ impl BucketTree {
}; };
// Try replace a bad node // Try replace a bad node
let now = Instant::now();
if let Some(bad_node) = nodes if let Some(bad_node) = nodes
.nodes .nodes
.iter_mut() .iter_mut()
.find(|r| matches!(r.status(), NodeStatus::Bad)) .find(|r| matches!(r.status(now), NodeStatus::Bad))
{ {
std::mem::swap(bad_node, &mut new_node); std::mem::swap(bad_node, &mut new_node);
nodes.nodes.sort_by_key(|n| n.id); nodes.nodes.sort_by_key(|n| n.id);
@ -395,7 +396,7 @@ impl Serialize for RoutingTableNode {
let mut s = serializer.serialize_struct("RoutingTableNode", 3)?; let mut s = serializer.serialize_struct("RoutingTableNode", 3)?;
s.serialize_field("id", &self.id.as_string())?; s.serialize_field("id", &self.id.as_string())?;
s.serialize_field("addr", &self.addr)?; s.serialize_field("addr", &self.addr)?;
s.serialize_field("status", &self.status())?; s.serialize_field("status", &self.status(Instant::now()))?;
if let Some(l) = self.last_request { if let Some(l) = self.last_request {
s.serialize_field("last_request_ago", &l.elapsed())?; s.serialize_field("last_request_ago", &l.elapsed())?;
} }
@ -425,7 +426,7 @@ impl RoutingTableNode {
pub fn addr(&self) -> SocketAddr { pub fn addr(&self) -> SocketAddr {
self.addr self.addr
} }
pub fn status(&self) -> NodeStatus { pub fn status(&self, now: Instant) -> NodeStatus {
match (self.last_request, self.last_response, self.last_query) { match (self.last_request, self.last_response, self.last_query) {
// Nodes become bad when they fail to respond to multiple queries in a row. // Nodes become bad when they fail to respond to multiple queries in a row.
(Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad, (Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad,
@ -434,7 +435,7 @@ impl RoutingTableNode {
// A node is also good if it has ever responded to one of our queries and has sent // A node is also good if it has ever responded to one of our queries and has sent
// us a query within the last 15 minutes. // us a query within the last 15 minutes.
(Some(_), Some(last_incoming), _) | (Some(_), Some(_), Some(last_incoming)) (Some(_), Some(last_incoming), _) | (Some(_), Some(_), Some(last_incoming))
if last_incoming.elapsed() < INACTIVITY_TIMEOUT => if now - last_incoming < INACTIVITY_TIMEOUT =>
{ {
NodeStatus::Good NodeStatus::Good
} }
@ -442,9 +443,9 @@ impl RoutingTableNode {
// After 15 minutes of inactivity, a node becomes questionable. // After 15 minutes of inactivity, a node becomes questionable.
// The moment we send a request to it, it stops becoming questionable and becomes Unknown / Bad. // The moment we send a request to it, it stops becoming questionable and becomes Unknown / Bad.
(last_outgoing, _, Some(last_incoming)) | (last_outgoing, Some(last_incoming), _) (last_outgoing, _, Some(last_incoming)) | (last_outgoing, Some(last_incoming), _)
if last_incoming.elapsed() > INACTIVITY_TIMEOUT if now - last_incoming > INACTIVITY_TIMEOUT
&& last_outgoing && last_outgoing
.map(|e| e.elapsed() > INACTIVITY_TIMEOUT) .map(|e| now - e > INACTIVITY_TIMEOUT)
.unwrap_or(true) => .unwrap_or(true) =>
{ {
NodeStatus::Questionable NodeStatus::Questionable
@ -504,11 +505,12 @@ impl RoutingTable {
for node in self.buckets.iter() { for node in self.buckets.iter() {
result.push(node); result.push(node);
} }
let now = Instant::now();
result.sort_by_key(|n| { result.sort_by_key(|n| {
// Query decent nodes first. // Query decent nodes first.
let status = match n.status() { let status = match n.status(now) {
NodeStatus::Good => 0, NodeStatus::Good => 0,
NodeStatus::Questionable => 0, NodeStatus::Questionable => 1,
NodeStatus::Unknown => 2, NodeStatus::Unknown => 2,
NodeStatus::Bad => 3, NodeStatus::Bad => 3,
}; };

View file

@ -1,8 +1,8 @@
use data_encoding::BASE32; use data_encoding::BASE32;
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
use std::{cmp::Ordering, str::FromStr}; use std::str::FromStr;
#[derive(Clone, Copy, PartialEq, Eq, Hash)] #[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Id<const N: usize>(pub [u8; N]); pub struct Id<const N: usize>(pub [u8; N]);
impl<const N: usize> Id<N> { impl<const N: usize> Id<N> {
@ -166,25 +166,6 @@ impl<'de, const N: usize> Deserialize<'de> for Id<N> {
} }
} }
impl<const N: usize> PartialOrd<Id<N>> for Id<N> {
fn partial_cmp(&self, other: &Id<N>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<const N: usize> Ord for Id<N> {
fn cmp(&self, other: &Id<N>) -> Ordering {
for (s, o) in self.0.iter().copied().zip(other.0.iter().copied()) {
match s.cmp(&o) {
Ordering::Less => return Ordering::Less,
Ordering::Equal => continue,
Ordering::Greater => return Ordering::Greater,
}
}
Ordering::Equal
}
}
/// A 20-byte hash used throughout librqbit, for torrent info hashes, peer ids etc. /// A 20-byte hash used throughout librqbit, for torrent info hashes, peer ids etc.
pub type Id20 = Id<20>; pub type Id20 = Id<20>;
/// A 32-byte hash used in Bittorrent V2, for torrent info hashes, piece hashing, etc. /// A 32-byte hash used in Bittorrent V2, for torrent info hashes, piece hashing, etc.