Add max size to routing table

This commit is contained in:
Igor Katson 2023-11-30 15:35:08 +00:00
parent 16a4d22b6b
commit 4af26ae246
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 32 additions and 126 deletions

View file

@ -497,7 +497,7 @@ impl DhtState {
routing_table: Option<RoutingTable>,
listen_addr: SocketAddr,
) -> Self {
let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id));
let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None));
Self {
id,
next_transaction_id: AtomicU16::new(0),
@ -798,7 +798,7 @@ impl DhtWorker {
let mut futs = FuturesUnordered::new();
let filler = async {
let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT);
tokio::time::sleep(INACTIVITY_TIMEOUT).await;
interval.tick().await;
let mut iteration = 0;
loop {
interval.tick().await;

View file

@ -2,11 +2,8 @@ use std::{net::SocketAddr, time::Instant};
use librqbit_core::id20::Id20;
use rand::RngCore;
use serde::{
ser::{SerializeMap, SerializeStruct},
Deserialize, Serialize, Serializer,
};
use tracing::debug;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use tracing::{debug, trace};
use crate::INACTIVITY_TIMEOUT;
@ -72,110 +69,11 @@ struct BucketTreeNode {
data: BucketTreeNodeData,
}
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BucketTree {
data: Vec<BucketTreeNode>,
}
impl<'de> Deserialize<'de> for BucketTree {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct Visitor;
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = BucketTree;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a map with key \"flat\"")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let mut data: Option<Vec<BucketTreeNode>> = None;
loop {
match map.next_key::<String>()?.as_deref() {
Some("flat") => {
let buckets = map.next_value::<Vec<BucketTreeNode>>()?;
data = Some(buckets)
}
Some(_) => {
map.next_value::<serde::de::IgnoredAny>()?;
}
None => {
use serde::de::Error;
match data.take() {
Some(data) => return Ok(BucketTree { data }),
None => return Err(A::Error::missing_field("flat")),
}
}
}
}
}
}
deserializer.deserialize_map(Visitor)
}
}
impl Serialize for BucketTree {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
struct Node<'a> {
tree: &'a BucketTree,
idx: usize,
}
impl<'a> Serialize for Node<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(None)?;
let node = &self.tree.data[self.idx];
map.serialize_entry("bits", &node.bits)?;
map.serialize_entry("start", &node.start.as_string())?;
map.serialize_entry("end", &node.end_inclusive.as_string())?;
match &node.data {
BucketTreeNodeData::Leaf(nodes) => {
map.serialize_entry("nodes", &nodes)?;
}
BucketTreeNodeData::LeftRight(l, r) => {
map.serialize_entry(
"left",
&(Node {
idx: *l,
tree: self.tree,
}),
)?;
map.serialize_entry(
"right",
&(Node {
idx: *r,
tree: self.tree,
}),
)?;
}
}
map.end()
}
}
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("nodes_len", &self.data.len())?;
map.serialize_entry("nodes_capacity", &self.data.capacity())?;
map.serialize_entry("node_memory_bytes", &std::mem::size_of::<BucketTreeNode>())?;
map.serialize_entry(
"nodes_memory_bytes",
&(std::mem::size_of::<BucketTreeNode>() * self.data.capacity()),
)?;
map.serialize_entry("tree", &Node { tree: self, idx: 0 })?;
map.serialize_entry("flat", &self.data)?;
map.end()
}
size: usize,
max_size: usize,
}
pub struct BucketTreeIteratorItem<'a> {
@ -297,7 +195,7 @@ pub enum InsertResult {
}
impl BucketTree {
pub fn new() -> Self {
pub fn new(max_size: usize) -> Self {
BucketTree {
data: vec![BucketTreeNode {
bits: 160,
@ -305,6 +203,8 @@ impl BucketTree {
end_inclusive: Id20([0xff; 20]),
data: BucketTreeNodeData::Leaf(Default::default()),
}],
size: 0,
max_size,
}
}
@ -385,13 +285,6 @@ impl BucketTree {
errors_in_a_row: 0,
};
if nodes.nodes.len() < 8 {
nodes.nodes.push(new_node);
nodes.nodes.sort_by_key(|n| n.id);
nodes.last_refreshed = Instant::now();
return InsertResult::Added;
}
// Try replace a bad node
if let Some(bad_node) = nodes
.nodes
@ -405,6 +298,23 @@ impl BucketTree {
return InsertResult::ReplacedBad(new_node);
}
// if max size reached, don't bother
if self.size == self.max_size {
trace!(
"can't add node to routing table, max size of {} reached",
self.max_size
);
return InsertResult::Ignored;
}
if nodes.nodes.len() < 8 {
nodes.nodes.push(new_node);
nodes.nodes.sort_by_key(|n| n.id);
nodes.last_refreshed = Instant::now();
self.size += 1;
return InsertResult::Added;
}
// if our id is not inside, don't bother.
if *self_id < leaf.start || *self_id > leaf.end_inclusive {
return InsertResult::Ignored;
@ -462,12 +372,6 @@ impl BucketTree {
}
}
impl Default for BucketTree {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct RoutingTableNode {
#[serde(serialize_with = "crate::utils::serialize_id20")]
@ -580,10 +484,12 @@ pub struct RoutingTable {
}
impl RoutingTable {
pub fn new(id: Id20) -> Self {
const DEFAULT_MAX_SIZE: usize = 512;
pub fn new(id: Id20, max_size: Option<usize>) -> Self {
Self {
id,
buckets: BucketTree::new(),
buckets: BucketTree::new(max_size.unwrap_or(Self::DEFAULT_MAX_SIZE)),
size: 0,
}
}
@ -757,7 +663,7 @@ mod tests {
fn generate_table(length: Option<usize>) -> RoutingTable {
let my_id = random_id_20();
let mut rtable = RoutingTable::new(my_id);
let mut rtable = RoutingTable::new(my_id, None);
for _ in 0..length.unwrap_or(16536) {
let other_id = random_id_20();
let addr = generate_socket_addr();