rqbit/crates/dht/src/routing_table.rs

535 lines
15 KiB
Rust
Raw Normal View History

2021-07-12 16:24:26 +01:00
use std::{
net::SocketAddr,
time::{Duration, Instant},
};
2021-07-12 11:56:26 +01:00
use librqbit_core::id20::Id20;
2021-07-12 19:42:48 +01:00
use log::debug;
2021-07-14 13:40:56 +01:00
use serde::Serialize;
2021-07-16 14:49:36 +01:00
use smallvec::SmallVec;
2021-07-12 19:42:48 +01:00
2021-07-14 01:03:39 +01:00
#[derive(Debug, Clone, Serialize)]
2021-07-16 14:49:36 +01:00
enum BucketTreeNodeData {
// TODO: maybe replace that with SmallVec<8>?
Leaf(SmallVec<[RoutingTableNode; 8]>),
LeftRight(usize, usize),
2021-07-12 13:59:58 +01:00
}
2021-07-14 01:03:39 +01:00
#[derive(Debug, Clone, Serialize)]
2021-07-16 14:49:36 +01:00
struct BucketTreeNode {
2021-07-12 13:59:58 +01:00
bits: u8,
2021-07-14 13:40:56 +01:00
#[serde(serialize_with = "crate::utils::serialize_id20")]
2021-07-12 13:59:58 +01:00
start: Id20,
2021-07-14 13:40:56 +01:00
#[serde(serialize_with = "crate::utils::serialize_id20")]
2021-07-12 13:59:58 +01:00
end_inclusive: Id20,
2021-07-16 14:49:36 +01:00
data: BucketTreeNodeData,
}
/// Binary tree of buckets stored flat in a vector.
///
/// Index 0 is the root; inner nodes refer to their children by index into `data`.
#[derive(Debug, Clone, Serialize)]
pub struct BucketTree {
    /// All tree nodes (inner nodes and leaves); children are appended on split.
    data: Vec<BucketTreeNode>,
}
2021-07-16 14:49:36 +01:00
pub struct BucketTreeIterator<'a> {
tree: &'a BucketTree,
2021-07-12 13:59:58 +01:00
current: std::slice::Iter<'a, RoutingTableNode>,
2021-07-16 14:49:36 +01:00
queue: Vec<usize>,
2021-07-12 13:59:58 +01:00
}
2021-07-16 14:49:36 +01:00
impl<'a> BucketTreeIterator<'a> {
fn new(tree: &'a BucketTree) -> Self {
2021-07-12 13:59:58 +01:00
let mut queue = Vec::new();
2021-07-16 14:49:36 +01:00
let mut current = 0;
let current_slice = loop {
match &tree.data[current].data {
BucketTreeNodeData::Leaf(nodes) => break nodes.iter(),
BucketTreeNodeData::LeftRight(left, right) => {
queue.push(*right);
current = *left;
2021-07-12 13:59:58 +01:00
}
}
};
2021-07-16 14:49:36 +01:00
BucketTreeIterator {
tree,
current: current_slice,
queue,
}
2021-07-12 13:59:58 +01:00
}
}
2021-07-16 14:49:36 +01:00
impl<'a> Iterator for BucketTreeIterator<'a> {
2021-07-12 13:59:58 +01:00
type Item = &'a RoutingTableNode;
fn next(&mut self) -> Option<Self::Item> {
if let Some(v) = self.current.next() {
return Some(v);
};
loop {
2021-07-16 14:49:36 +01:00
let idx = self.queue.pop()?;
match &self.tree.data[idx].data {
BucketTreeNodeData::Leaf(nodes) => {
2021-07-12 13:59:58 +01:00
self.current = nodes.iter();
match self.current.next() {
Some(v) => return Some(v),
None => continue,
}
}
2021-07-16 14:49:36 +01:00
BucketTreeNodeData::LeftRight(left, right) => {
self.queue.push(*right);
self.queue.push(*left);
2021-07-12 16:24:26 +01:00
continue;
}
}
}
}
}
2021-07-12 13:59:58 +01:00
/// Splits the inclusive range `start..=end_inclusive` into a left and a right half.
///
/// The bit at index `160 - bits` is cleared in a copy of `end_inclusive` to obtain the
/// end of the left half, and set in a copy of `start` to obtain the start of the right
/// half.
///
/// Returns `((left_start, left_end), (right_start, right_end))`, both ends inclusive.
/// In debug builds, asserts that `start < left_end < right_start < end_inclusive`.
fn compute_split_start_end(
    start: Id20,
    end_inclusive: Id20,
    bits: u8,
) -> ((Id20, Id20), (Id20, Id20)) {
    let changing_bit = 160 - bits;

    let mut new_left_end = end_inclusive;
    new_left_end.set_bit(changing_bit, false);

    let mut new_right_start = start;
    new_right_start.set_bit(changing_bit, true);

    debug_assert!(
        start < new_left_end,
        "expected start({:?}) < new_left_end({:?}); start={:?}, end={:?}, bits={}",
        start,
        new_left_end,
        start,
        end_inclusive,
        bits
    );
    debug_assert!(
        new_left_end < new_right_start,
        "expected new_left_end({:?}) < new_right_start({:?}); start={:?}, end={:?}, bits={}",
        new_left_end,
        new_right_start,
        start,
        end_inclusive,
        bits
    );
    debug_assert!(
        new_right_start < end_inclusive,
        "expected new_right_start({:?}) < end_inclusive({:?}); start={:?}, end={:?}, bits={}",
        new_right_start,
        end_inclusive,
        start,
        end_inclusive,
        bits
    );

    let left_half = (start, new_left_end);
    let right_half = (new_right_start, end_inclusive);
    (left_half, right_half)
}
2021-07-12 14:38:55 +01:00
/// Outcome of trying to insert a node into the bucket tree.
#[derive(Debug)]
pub enum InsertResult {
    /// A node with the same id was already present; nothing changed.
    WasExisting,
    /// The bucket was full and a node with `NodeStatus::Bad` was swapped out for the
    /// new one. Carries the evicted node.
    ReplacedBad(RoutingTableNode),
    /// The node was appended to a bucket that still had room.
    Added,
    /// The bucket was full, had no bad node, and does not cover our own id, so the
    /// new node was dropped.
    Ignored,
}
2021-07-12 13:59:58 +01:00
impl BucketTree {
pub fn new() -> Self {
2021-07-16 14:49:36 +01:00
let mut data = Vec::with_capacity(64);
data.push(BucketTreeNode {
2021-07-12 13:59:58 +01:00
bits: 160,
start: Id20([0u8; 20]),
end_inclusive: Id20([0xff; 20]),
2021-07-16 14:49:36 +01:00
data: BucketTreeNodeData::Leaf(SmallVec::with_capacity(8)),
});
BucketTree { data }
2021-07-12 13:59:58 +01:00
}
2021-07-16 14:49:36 +01:00
pub fn iter(&self) -> BucketTreeIterator<'_> {
BucketTreeIterator::new(self)
2021-07-12 16:24:26 +01:00
}
pub fn get_mut(&mut self, id: &Id20) -> Option<&mut RoutingTableNode> {
2021-07-16 14:49:36 +01:00
let mut idx = 0;
loop {
let node = &self.data[idx];
if !(*id >= node.start && *id <= node.end_inclusive) {
return None;
};
match &node.data {
BucketTreeNodeData::Leaf(_) => {
// re-borrow mutably
if let BucketTreeNodeData::Leaf(nodes) = &mut self.data[idx].data {
return nodes.iter_mut().find(|b| b.id == *id);
}
unreachable!()
}
BucketTreeNodeData::LeftRight(left_idx, right_idx) => {
let left_idx = *left_idx;
let right_idx = *right_idx;
let left = &self.data[left_idx];
if *id >= left.start && *id <= left.end_inclusive {
idx = left_idx;
continue;
};
idx = right_idx;
}
2021-07-12 16:24:26 +01:00
}
}
}
2021-07-12 14:38:55 +01:00
pub fn add_node(&mut self, self_id: &Id20, id: Id20, addr: SocketAddr) -> InsertResult {
2021-07-16 14:49:36 +01:00
let mut current = 0;
2021-07-12 13:59:58 +01:00
loop {
2021-07-16 14:49:36 +01:00
let node = &self.data[current];
debug_assert!(id >= node.start && id <= node.end_inclusive);
match &node.data {
BucketTreeNodeData::Leaf(_) => {
return self.insert_into_leaf(current, self_id, id, addr);
2021-07-12 13:59:58 +01:00
}
2021-07-16 14:49:36 +01:00
BucketTreeNodeData::LeftRight(left_idx, right_idx) => {
let left = &self.data[*left_idx];
if id <= left.end_inclusive {
current = *left_idx;
2021-07-12 13:59:58 +01:00
continue;
}
2021-07-16 14:49:36 +01:00
current = *right_idx;
2021-07-12 13:59:58 +01:00
}
}
}
}
2021-07-16 14:49:36 +01:00
fn insert_into_leaf(
&mut self,
mut idx: usize,
self_id: &Id20,
id: Id20,
addr: SocketAddr,
) -> InsertResult {
loop {
let leaf = &mut self.data[idx];
let nodes = match &mut leaf.data {
BucketTreeNodeData::Leaf(nodes) => nodes,
BucketTreeNodeData::LeftRight(_, _) => unreachable!(),
};
// if already found, quit
if nodes.iter().any(|r| r.id == id) {
return InsertResult::WasExisting;
}
2021-07-12 13:59:58 +01:00
2021-07-16 14:49:36 +01:00
let mut new_node = RoutingTableNode {
id,
addr,
last_request: None,
last_response: None,
outstanding_queries_in_a_row: 0,
};
if nodes.len() < 8 {
nodes.push(new_node);
nodes.sort_by_key(|n| n.id);
return InsertResult::Added;
}
2021-07-12 14:38:55 +01:00
2021-07-16 14:49:36 +01:00
// Try replace a bad node
if let Some(bad_node) = nodes
.iter_mut()
.find(|r| matches!(r.status(), NodeStatus::Bad))
{
std::mem::swap(bad_node, &mut new_node);
nodes.sort_by_key(|n| n.id);
debug!("replaced bad node {:?}", new_node);
return InsertResult::ReplacedBad(new_node);
}
2021-07-12 14:38:55 +01:00
2021-07-16 14:49:36 +01:00
// if our id is not inside, don't bother.
if *self_id < leaf.start || *self_id > leaf.end_inclusive {
return InsertResult::Ignored;
}
2021-07-12 13:59:58 +01:00
2021-07-16 14:49:36 +01:00
// Split
let ((ls, le), (rs, re)) =
compute_split_start_end(leaf.start, leaf.end_inclusive, leaf.bits);
let (mut ld, mut rd) = (SmallVec::with_capacity(8), SmallVec::with_capacity(8));
for node in nodes.drain(0..) {
if node.id < rs {
ld.push(node);
} else {
rd.push(node)
}
}
2021-07-12 14:38:55 +01:00
2021-07-16 14:49:36 +01:00
let left = BucketTreeNode {
bits: leaf.bits - 1,
start: ls,
end_inclusive: le,
data: BucketTreeNodeData::Leaf(ld),
};
let right = BucketTreeNode {
bits: leaf.bits - 1,
start: rs,
end_inclusive: re,
data: BucketTreeNodeData::Leaf(rd),
};
let left_idx = {
let l = self.data.len();
self.data.push(left);
l
};
let right_idx = {
let l = self.data.len();
self.data.push(right);
l
};
self.data[idx].data = BucketTreeNodeData::LeftRight(left_idx, right_idx);
if id < rs {
idx = left_idx
2021-07-12 14:38:55 +01:00
} else {
2021-07-16 14:49:36 +01:00
idx = right_idx
2021-07-12 14:38:55 +01:00
}
2021-07-12 13:59:58 +01:00
}
}
}
impl Default for BucketTree {
fn default() -> Self {
Self::new()
}
}
2021-07-14 01:03:39 +01:00
#[derive(Debug, Clone, Serialize)]
2021-07-12 11:56:26 +01:00
pub struct RoutingTableNode {
2021-07-14 13:40:56 +01:00
#[serde(serialize_with = "crate::utils::serialize_id20")]
2021-07-12 11:56:26 +01:00
id: Id20,
addr: SocketAddr,
2021-07-14 01:03:39 +01:00
#[serde(skip)]
2021-07-12 11:56:26 +01:00
last_request: Option<Instant>,
2021-07-14 01:03:39 +01:00
#[serde(skip)]
2021-07-12 11:56:26 +01:00
last_response: Option<Instant>,
2021-07-14 01:03:39 +01:00
#[serde(skip)]
2021-07-12 11:56:26 +01:00
outstanding_queries_in_a_row: usize,
}
/// Health classification of a [`RoutingTableNode`], as computed by its `status()` method.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    /// The node has responded and has no timed-out outstanding queries.
    Good,
    /// A request has been recorded but the node has never responded.
    Questionable,
    /// There are unanswered queries and the last request was more than 10 seconds ago.
    Bad,
    /// No request has ever been recorded for the node.
    Unknown,
}
impl RoutingTableNode {
pub fn id(&self) -> Id20 {
self.id
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
pub fn status(&self) -> NodeStatus {
// TODO: this is just a stub with simpler logic
let last_request = match self.last_request {
Some(v) => v,
None => return NodeStatus::Unknown,
};
2021-07-12 16:24:26 +01:00
if self.outstanding_queries_in_a_row > 0 && last_request.elapsed() > Duration::from_secs(10)
{
return NodeStatus::Bad;
}
2021-07-12 11:56:26 +01:00
if self.last_response.is_some() {
return NodeStatus::Good;
}
NodeStatus::Questionable
}
2021-07-12 16:24:26 +01:00
pub fn mark_outgoing_request(&mut self) {
self.last_request = Some(Instant::now());
self.outstanding_queries_in_a_row += 1;
}
pub fn mark_response(&mut self) {
2021-07-12 19:42:48 +01:00
let now = Instant::now();
self.last_response = Some(now);
if self.last_request.is_none() {
self.last_request = Some(now);
}
2021-07-12 16:24:26 +01:00
self.outstanding_queries_in_a_row = 0;
}
2021-07-12 11:56:26 +01:00
}
2021-07-14 01:03:39 +01:00
#[derive(Debug, Clone, Serialize)]
2021-07-12 11:56:26 +01:00
pub struct RoutingTable {
2021-07-14 13:40:56 +01:00
#[serde(serialize_with = "crate::utils::serialize_id20")]
2021-07-12 11:56:26 +01:00
id: Id20,
size: usize,
2021-07-12 13:59:58 +01:00
buckets: BucketTree,
2021-07-12 11:56:26 +01:00
}
impl RoutingTable {
pub fn new(id: Id20) -> Self {
Self {
id,
2021-07-12 13:59:58 +01:00
buckets: BucketTree::new(),
2021-07-12 11:56:26 +01:00
size: 0,
}
}
2021-07-14 00:48:53 +01:00
pub fn len(&self) -> usize {
self.size
}
2021-07-12 11:56:26 +01:00
pub fn sorted_by_distance_from(&self, id: Id20) -> Vec<&RoutingTableNode> {
let mut result = Vec::with_capacity(self.size);
2021-07-12 13:59:58 +01:00
for node in self.buckets.iter() {
result.push(node);
2021-07-12 11:56:26 +01:00
}
result.sort_by_key(|n| id.distance(&n.id));
result
}
2021-07-12 16:24:26 +01:00
2021-07-12 14:38:55 +01:00
pub fn add_node(&mut self, id: Id20, addr: SocketAddr) -> InsertResult {
let res = self.buckets.add_node(&self.id, id, addr);
let replaced = match &res {
InsertResult::WasExisting => false,
2021-07-12 19:42:48 +01:00
InsertResult::ReplacedBad(..) => true,
2021-07-12 14:38:55 +01:00
InsertResult::Added => true,
InsertResult::Ignored => false,
};
if replaced {
self.size += 1;
}
res
2021-07-12 13:59:58 +01:00
}
2021-07-12 16:24:26 +01:00
pub fn mark_outgoing_request(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id) {
Some(r) => r,
None => return false,
};
r.mark_outgoing_request();
true
}
pub fn mark_response(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id) {
Some(r) => r,
None => return false,
};
r.mark_response();
true
}
2021-07-12 13:59:58 +01:00
}
#[cfg(test)]
mod tests {
    use std::net::SocketAddrV4;

    use librqbit_core::id20::Id20;
    use rand::Rng;

    use crate::routing_table::compute_split_start_end;

    use super::RoutingTable;

    /// Splitting the root range yields the `0x00..=0x7f…` and `0x80…..=0xff…` halves.
    #[test]
    fn compute_split_start_end_root() {
        let start = Id20([0u8; 20]);
        let end = Id20([0xff; 20]);
        assert_eq!(
            compute_split_start_end(start, end, 160),
            (
                (
                    start,
                    Id20([
                        0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
                        0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
                    ])
                ),
                (
                    Id20([
                        0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
                    ]),
                    end
                )
            )
        )
    }

    /// Splitting the right half of the root (`0x80…..=0xff…`) at the second bit.
    #[test]
    fn compute_split_start_end_second_split() {
        let start = Id20([
            0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ]);
        let end = Id20([0xff; 20]);
        assert_eq!(
            compute_split_start_end(start, end, 159),
            (
                (
                    start,
                    Id20([
                        0xbf, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
                        0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
                    ])
                ),
                (
                    Id20([
                        0xc0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
                    ]),
                    end
                )
            )
        )
    }

    /// Splitting the left half of the root (`0x00…..=0x7f…`) at the second bit.
    #[test]
    fn compute_split_start_end_3() {
        let start = Id20([0u8; 20]);
        let end = Id20([
            0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
            0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
        ]);
        assert_eq!(
            compute_split_start_end(start, end, 159),
            (
                (
                    start,
                    Id20([
                        0x3f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
                        0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
                    ])
                ),
                (
                    Id20([
                        0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
                    ]),
                    end
                )
            )
        )
    }

    /// Generates a uniformly random 20-byte id.
    fn random_id_20() -> Id20 {
        let mut id20 = [0u8; 20];
        rand::thread_rng().fill(&mut id20);
        Id20(id20)
    }

    /// Inserts many random nodes and checks that the tracked size matches the number
    /// of nodes reachable by iteration.
    #[test]
    fn simulate_tree() {
        let my_id = random_id_20();
        let mut rtable = RoutingTable::new(my_id);
        for i in 0..u16::MAX {
            let other_id = random_id_20();
            let addr = std::net::SocketAddr::V4(SocketAddrV4::new("0.0.0.0".parse().unwrap(), i));
            rtable.add_node(other_id, addr);
        }
        assert_eq!(rtable.sorted_by_distance_from(my_id).len(), rtable.size);
    }
}