diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index ab0f6ce..465555f 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -497,7 +497,7 @@ impl DhtState { routing_table: Option, listen_addr: SocketAddr, ) -> Self { - let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id)); + let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None)); Self { id, next_transaction_id: AtomicU16::new(0), @@ -798,7 +798,7 @@ impl DhtWorker { let mut futs = FuturesUnordered::new(); let filler = async { let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT); - tokio::time::sleep(INACTIVITY_TIMEOUT).await; + interval.tick().await; let mut iteration = 0; loop { interval.tick().await; diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index fe447af..1fe85bc 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -2,11 +2,8 @@ use std::{net::SocketAddr, time::Instant}; use librqbit_core::id20::Id20; use rand::RngCore; -use serde::{ - ser::{SerializeMap, SerializeStruct}, - Deserialize, Serialize, Serializer, -}; -use tracing::debug; +use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; +use tracing::{debug, trace}; use crate::INACTIVITY_TIMEOUT; @@ -72,110 +69,11 @@ struct BucketTreeNode { data: BucketTreeNodeData, } -#[derive(Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct BucketTree { data: Vec, -} - -impl<'de> Deserialize<'de> for BucketTree { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct Visitor; - impl<'de> serde::de::Visitor<'de> for Visitor { - type Value = BucketTree; - - fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "a map with key \"flat\"") - } - - fn visit_map(self, mut map: A) -> Result - where - A: serde::de::MapAccess<'de>, - { - let mut data: Option> = None; - loop { - match map.next_key::()?.as_deref() { - Some("flat") => { - let buckets = map.next_value::>()?; - data = Some(buckets) - } - Some(_) => { - map.next_value::()?; - } - None => { - use serde::de::Error; - match data.take() { - Some(data) => return Ok(BucketTree { data }), - None => return Err(A::Error::missing_field("flat")), - } - } - } - } - } - } - deserializer.deserialize_map(Visitor) - } -} - -impl Serialize for BucketTree { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - struct Node<'a> { - tree: &'a BucketTree, - idx: usize, - } - - impl<'a> Serialize for Node<'a> { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut map = serializer.serialize_map(None)?; - let node = &self.tree.data[self.idx]; - map.serialize_entry("bits", &node.bits)?; - map.serialize_entry("start", &node.start.as_string())?; - map.serialize_entry("end", &node.end_inclusive.as_string())?; - match &node.data { - BucketTreeNodeData::Leaf(nodes) => { - map.serialize_entry("nodes", &nodes)?; - } - BucketTreeNodeData::LeftRight(l, r) => { - map.serialize_entry( - "left", - &(Node { - idx: *l, - tree: self.tree, - }), - )?; - map.serialize_entry( - "right", - &(Node { - idx: *r, - tree: self.tree, - }), - )?; - } - } - map.end() - } - } - - let mut map = serializer.serialize_map(None)?; - map.serialize_entry("nodes_len", &self.data.len())?; - map.serialize_entry("nodes_capacity", &self.data.capacity())?; - map.serialize_entry("node_memory_bytes", &std::mem::size_of::())?; - map.serialize_entry( - "nodes_memory_bytes", - &(std::mem::size_of::() * self.data.capacity()), - )?; - map.serialize_entry("tree", &Node { tree: self, idx: 0 })?; - map.serialize_entry("flat", &self.data)?; - map.end() - } + size: usize, + max_size: usize, } pub struct BucketTreeIteratorItem<'a> { @@ -297,7 +195,7 @@ pub enum InsertResult { } impl BucketTree { - pub fn new() -> Self { + pub fn new(max_size: usize) -> Self { BucketTree { data: vec![BucketTreeNode { bits: 160, @@ -305,6 +203,8 @@ impl BucketTree { end_inclusive: Id20([0xff; 20]), data: BucketTreeNodeData::Leaf(Default::default()), }], + size: 0, + max_size, } } @@ -385,13 +285,6 @@ impl BucketTree { errors_in_a_row: 0, }; - if nodes.nodes.len() < 8 { - nodes.nodes.push(new_node); - nodes.nodes.sort_by_key(|n| n.id); - nodes.last_refreshed = Instant::now(); - return InsertResult::Added; - } - // Try replace a bad node if let Some(bad_node) = nodes .nodes @@ -405,6 +298,23 @@ impl BucketTree { return InsertResult::ReplacedBad(new_node); } + // if max size reached, don't bother + if self.size == self.max_size { + trace!( + "can't add node to routing table, max size of {} reached", + self.max_size + ); + return InsertResult::Ignored; + } + + if nodes.nodes.len() < 8 { + nodes.nodes.push(new_node); + nodes.nodes.sort_by_key(|n| n.id); + nodes.last_refreshed = Instant::now(); + self.size += 1; + return InsertResult::Added; + } + // if our id is not inside, don't bother. if *self_id < leaf.start || *self_id > leaf.end_inclusive { return InsertResult::Ignored; @@ -462,12 +372,6 @@ impl BucketTree { } } -impl Default for BucketTree { - fn default() -> Self { - Self::new() - } -} - #[derive(Debug, Clone, Deserialize)] pub struct RoutingTableNode { #[serde(serialize_with = "crate::utils::serialize_id20")] @@ -580,10 +484,12 @@ pub struct RoutingTable { } impl RoutingTable { - pub fn new(id: Id20) -> Self { + const DEFAULT_MAX_SIZE: usize = 512; + + pub fn new(id: Id20, max_size: Option) -> Self { Self { id, - buckets: BucketTree::new(), + buckets: BucketTree::new(max_size.unwrap_or(Self::DEFAULT_MAX_SIZE)), size: 0, } } @@ -757,7 +663,7 @@ mod tests { fn generate_table(length: Option) -> RoutingTable { let my_id = random_id_20(); - let mut rtable = RoutingTable::new(my_id); + let mut rtable = RoutingTable::new(my_id, None); for _ in 0..length.unwrap_or(16536) { let other_id = random_id_20(); let addr = generate_socket_addr();