From 658bbdb652b35764bd0ba2706175652a540133b8 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 30 Nov 2023 09:59:13 +0000 Subject: [PATCH] Add "last_refreshed" property on buckets --- crates/dht/src/dht.rs | 4 ++ crates/dht/src/routing_table.rs | 103 +++++++++++++++++++++++++------- 2 files changed, 86 insertions(+), 21 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index a0316c4..5cff57f 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -775,6 +775,10 @@ impl DhtWorker { Ok(()) } + async fn bucket_refresher(&self) -> anyhow::Result<()> { + todo!() + } + async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> { let mut futs = FuturesUnordered::new(); loop { diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 44f2fdd..db45de5 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -3,16 +3,61 @@ use std::{net::SocketAddr, time::Instant}; use librqbit_core::id20::Id20; use serde::{ ser::{SerializeMap, SerializeStruct}, - Deserialize, Serialize, + Deserialize, Serialize, Serializer, }; use tracing::debug; use crate::INACTIVITY_TIMEOUT; +#[derive(Clone, Debug)] +struct LeafBucket { + nodes: Vec, + last_refreshed: Instant, +} + +impl Serialize for LeafBucket { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut s = serializer.serialize_struct("LeafBucket", 2)?; + s.serialize_field("nodes", &self.nodes)?; + s.serialize_field( + "last_refreshed", + &format!("{:?}", self.last_refreshed.elapsed()), + )?; + s.end() + } +} + +impl<'de> Deserialize<'de> for LeafBucket { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + struct Tmp { + nodes: Vec, + } + Tmp::deserialize(deserializer).map(|t| Self { + nodes: t.nodes, + last_refreshed: Instant::now(), + }) + } +} + +impl Default for LeafBucket { + fn default() -> Self { + Self { + nodes: Default::default(), + last_refreshed: Instant::now(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] enum BucketTreeNodeData { - // TODO: maybe replace that with SmallVec<8>? - Leaf(Vec), + Leaf(LeafBucket), LeftRight(usize, usize), } @@ -144,7 +189,7 @@ impl<'a> BucketTreeIterator<'a> { let mut current = 0; let current_slice = loop { match &tree.data[current].data { - BucketTreeNodeData::Leaf(nodes) => break nodes.iter(), + BucketTreeNodeData::Leaf(leaf) => break leaf.nodes.iter(), BucketTreeNodeData::LeftRight(left, right) => { queue.push(*right); current = *left; @@ -170,8 +215,8 @@ impl<'a> Iterator for BucketTreeIterator<'a> { loop { let idx = self.queue.pop()?; match &self.tree.data[idx].data { - BucketTreeNodeData::Leaf(nodes) => { - self.current = nodes.iter(); + BucketTreeNodeData::Leaf(leaf) => { + self.current = leaf.nodes.iter(); match self.current.next() { Some(v) => return Some(v), None => continue, @@ -248,7 +293,7 @@ impl BucketTree { bits: 160, start: Id20([0u8; 20]), end_inclusive: Id20([0xff; 20]), - data: BucketTreeNodeData::Leaf(Vec::new()), + data: BucketTreeNodeData::Leaf(Default::default()), }], } } @@ -274,10 +319,16 @@ impl BucketTree { } } - pub fn get_mut(&mut self, id: &Id20) -> Option<&mut RoutingTableNode> { + pub fn get_mut(&mut self, id: &Id20, refresh: bool) -> Option<&mut RoutingTableNode> { let idx = self.get_leaf(id); match &mut self.data[idx].data { - BucketTreeNodeData::Leaf(nodes) => nodes.iter_mut().find(|b| b.id == *id), + BucketTreeNodeData::Leaf(leaf) => { + let r = leaf.nodes.iter_mut().find(|b| b.id == *id); + if r.is_some() && refresh { + leaf.last_refreshed = Instant::now() + } + r + } BucketTreeNodeData::LeftRight(_, _) => unreachable!(), } } @@ -313,7 +364,7 @@ impl BucketTree { BucketTreeNodeData::LeftRight(_, _) => unreachable!(), }; // if already found, quit - if nodes.iter().any(|r| r.id == id) { + if nodes.nodes.iter().any(|r| r.id == id) { return InsertResult::WasExisting; } @@ -326,14 +377,16 @@ impl BucketTree { errors_in_a_row: 0, }; - if nodes.len() < 8 { - nodes.push(new_node); - nodes.sort_by_key(|n| n.id); + if nodes.nodes.len() < 8 { + nodes.nodes.push(new_node); + nodes.nodes.sort_by_key(|n| n.id); + nodes.last_refreshed = Instant::now(); return InsertResult::Added; } // Ping first questionable node if let Some(questionable_node) = nodes + .nodes .iter_mut() .find(|r| matches!(r.status(), NodeStatus::Questionable)) { @@ -344,12 +397,14 @@ impl BucketTree { // Try replace a bad node if let Some(bad_node) = nodes + .nodes .iter_mut() .find(|r| matches!(r.status(), NodeStatus::Bad)) { std::mem::swap(bad_node, &mut new_node); - nodes.sort_by_key(|n| n.id); + nodes.nodes.sort_by_key(|n| n.id); debug!("replaced bad node {:?}", new_node); + nodes.last_refreshed = Instant::now(); return InsertResult::ReplacedBad(new_node); } @@ -362,7 +417,7 @@ impl BucketTree { let ((ls, le), (rs, re)) = compute_split_start_end(leaf.start, leaf.end_inclusive, leaf.bits); let (mut ld, mut rd) = (Vec::new(), Vec::new()); - for node in nodes.drain(0..) { + for node in nodes.nodes.drain(0..) { if node.id < rs { ld.push(node); } else { @@ -374,13 +429,19 @@ impl BucketTree { bits: leaf.bits - 1, start: ls, end_inclusive: le, - data: BucketTreeNodeData::Leaf(ld), + data: BucketTreeNodeData::Leaf(LeafBucket { + nodes: ld, + ..Default::default() + }), }; let right = BucketTreeNode { bits: leaf.bits - 1, start: rs, end_inclusive: re, - data: BucketTreeNodeData::Leaf(rd), + data: BucketTreeNodeData::Leaf(LeafBucket { + nodes: rd, + ..Default::default() + }), }; let left_idx = { @@ -562,7 +623,7 @@ impl RoutingTable { res } pub fn mark_outgoing_request(&mut self, id: &Id20) -> bool { - let r = match self.buckets.get_mut(id) { + let r = match self.buckets.get_mut(id, false) { Some(r) => r, None => return false, }; @@ -571,7 +632,7 @@ impl RoutingTable { } pub fn mark_response(&mut self, id: &Id20) -> bool { - let r = match self.buckets.get_mut(id) { + let r = match self.buckets.get_mut(id, true) { Some(r) => r, None => return false, }; @@ -580,7 +641,7 @@ impl RoutingTable { } pub fn mark_error(&mut self, id: &Id20) -> bool { - let r = match self.buckets.get_mut(id) { + let r = match self.buckets.get_mut(id, false) { Some(r) => r, None => return false, }; @@ -589,7 +650,7 @@ impl RoutingTable { } pub fn mark_last_query(&mut self, id: &Id20) -> bool { - let r = match self.buckets.get_mut(id) { + let r = match self.buckets.get_mut(id, false) { Some(r) => r, None => return false, };