Add "last_refreshed" property on buckets
This commit is contained in:
parent
f04277cc11
commit
658bbdb652
2 changed files with 86 additions and 21 deletions
|
|
@ -775,6 +775,10 @@ impl DhtWorker {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn bucket_refresher(&self) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> {
|
||||
let mut futs = FuturesUnordered::new();
|
||||
loop {
|
||||
|
|
|
|||
|
|
@ -3,16 +3,61 @@ use std::{net::SocketAddr, time::Instant};
|
|||
use librqbit_core::id20::Id20;
|
||||
use serde::{
|
||||
ser::{SerializeMap, SerializeStruct},
|
||||
Deserialize, Serialize,
|
||||
Deserialize, Serialize, Serializer,
|
||||
};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::INACTIVITY_TIMEOUT;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct LeafBucket {
|
||||
nodes: Vec<RoutingTableNode>,
|
||||
last_refreshed: Instant,
|
||||
}
|
||||
|
||||
impl Serialize for LeafBucket {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut s = serializer.serialize_struct("LeafBucket", 2)?;
|
||||
s.serialize_field("nodes", &self.nodes)?;
|
||||
s.serialize_field(
|
||||
"last_refreshed",
|
||||
&format!("{:?}", self.last_refreshed.elapsed()),
|
||||
)?;
|
||||
s.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for LeafBucket {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
#[derive(Deserialize)]
|
||||
struct Tmp {
|
||||
nodes: Vec<RoutingTableNode>,
|
||||
}
|
||||
Tmp::deserialize(deserializer).map(|t| Self {
|
||||
nodes: t.nodes,
|
||||
last_refreshed: Instant::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LeafBucket {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
nodes: Default::default(),
|
||||
last_refreshed: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
enum BucketTreeNodeData {
|
||||
// TODO: maybe replace that with SmallVec<8>?
|
||||
Leaf(Vec<RoutingTableNode>),
|
||||
Leaf(LeafBucket),
|
||||
LeftRight(usize, usize),
|
||||
}
|
||||
|
||||
|
|
@ -144,7 +189,7 @@ impl<'a> BucketTreeIterator<'a> {
|
|||
let mut current = 0;
|
||||
let current_slice = loop {
|
||||
match &tree.data[current].data {
|
||||
BucketTreeNodeData::Leaf(nodes) => break nodes.iter(),
|
||||
BucketTreeNodeData::Leaf(leaf) => break leaf.nodes.iter(),
|
||||
BucketTreeNodeData::LeftRight(left, right) => {
|
||||
queue.push(*right);
|
||||
current = *left;
|
||||
|
|
@ -170,8 +215,8 @@ impl<'a> Iterator for BucketTreeIterator<'a> {
|
|||
loop {
|
||||
let idx = self.queue.pop()?;
|
||||
match &self.tree.data[idx].data {
|
||||
BucketTreeNodeData::Leaf(nodes) => {
|
||||
self.current = nodes.iter();
|
||||
BucketTreeNodeData::Leaf(leaf) => {
|
||||
self.current = leaf.nodes.iter();
|
||||
match self.current.next() {
|
||||
Some(v) => return Some(v),
|
||||
None => continue,
|
||||
|
|
@ -248,7 +293,7 @@ impl BucketTree {
|
|||
bits: 160,
|
||||
start: Id20([0u8; 20]),
|
||||
end_inclusive: Id20([0xff; 20]),
|
||||
data: BucketTreeNodeData::Leaf(Vec::new()),
|
||||
data: BucketTreeNodeData::Leaf(Default::default()),
|
||||
}],
|
||||
}
|
||||
}
|
||||
|
|
@ -274,10 +319,16 @@ impl BucketTree {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self, id: &Id20) -> Option<&mut RoutingTableNode> {
|
||||
pub fn get_mut(&mut self, id: &Id20, refresh: bool) -> Option<&mut RoutingTableNode> {
|
||||
let idx = self.get_leaf(id);
|
||||
match &mut self.data[idx].data {
|
||||
BucketTreeNodeData::Leaf(nodes) => nodes.iter_mut().find(|b| b.id == *id),
|
||||
BucketTreeNodeData::Leaf(leaf) => {
|
||||
let r = leaf.nodes.iter_mut().find(|b| b.id == *id);
|
||||
if r.is_some() && refresh {
|
||||
leaf.last_refreshed = Instant::now()
|
||||
}
|
||||
r
|
||||
}
|
||||
BucketTreeNodeData::LeftRight(_, _) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
|
@ -313,7 +364,7 @@ impl BucketTree {
|
|||
BucketTreeNodeData::LeftRight(_, _) => unreachable!(),
|
||||
};
|
||||
// if already found, quit
|
||||
if nodes.iter().any(|r| r.id == id) {
|
||||
if nodes.nodes.iter().any(|r| r.id == id) {
|
||||
return InsertResult::WasExisting;
|
||||
}
|
||||
|
||||
|
|
@ -326,14 +377,16 @@ impl BucketTree {
|
|||
errors_in_a_row: 0,
|
||||
};
|
||||
|
||||
if nodes.len() < 8 {
|
||||
nodes.push(new_node);
|
||||
nodes.sort_by_key(|n| n.id);
|
||||
if nodes.nodes.len() < 8 {
|
||||
nodes.nodes.push(new_node);
|
||||
nodes.nodes.sort_by_key(|n| n.id);
|
||||
nodes.last_refreshed = Instant::now();
|
||||
return InsertResult::Added;
|
||||
}
|
||||
|
||||
// Ping first questionable node
|
||||
if let Some(questionable_node) = nodes
|
||||
.nodes
|
||||
.iter_mut()
|
||||
.find(|r| matches!(r.status(), NodeStatus::Questionable))
|
||||
{
|
||||
|
|
@ -344,12 +397,14 @@ impl BucketTree {
|
|||
|
||||
// Try replace a bad node
|
||||
if let Some(bad_node) = nodes
|
||||
.nodes
|
||||
.iter_mut()
|
||||
.find(|r| matches!(r.status(), NodeStatus::Bad))
|
||||
{
|
||||
std::mem::swap(bad_node, &mut new_node);
|
||||
nodes.sort_by_key(|n| n.id);
|
||||
nodes.nodes.sort_by_key(|n| n.id);
|
||||
debug!("replaced bad node {:?}", new_node);
|
||||
nodes.last_refreshed = Instant::now();
|
||||
return InsertResult::ReplacedBad(new_node);
|
||||
}
|
||||
|
||||
|
|
@ -362,7 +417,7 @@ impl BucketTree {
|
|||
let ((ls, le), (rs, re)) =
|
||||
compute_split_start_end(leaf.start, leaf.end_inclusive, leaf.bits);
|
||||
let (mut ld, mut rd) = (Vec::new(), Vec::new());
|
||||
for node in nodes.drain(0..) {
|
||||
for node in nodes.nodes.drain(0..) {
|
||||
if node.id < rs {
|
||||
ld.push(node);
|
||||
} else {
|
||||
|
|
@ -374,13 +429,19 @@ impl BucketTree {
|
|||
bits: leaf.bits - 1,
|
||||
start: ls,
|
||||
end_inclusive: le,
|
||||
data: BucketTreeNodeData::Leaf(ld),
|
||||
data: BucketTreeNodeData::Leaf(LeafBucket {
|
||||
nodes: ld,
|
||||
..Default::default()
|
||||
}),
|
||||
};
|
||||
let right = BucketTreeNode {
|
||||
bits: leaf.bits - 1,
|
||||
start: rs,
|
||||
end_inclusive: re,
|
||||
data: BucketTreeNodeData::Leaf(rd),
|
||||
data: BucketTreeNodeData::Leaf(LeafBucket {
|
||||
nodes: rd,
|
||||
..Default::default()
|
||||
}),
|
||||
};
|
||||
|
||||
let left_idx = {
|
||||
|
|
@ -562,7 +623,7 @@ impl RoutingTable {
|
|||
res
|
||||
}
|
||||
pub fn mark_outgoing_request(&mut self, id: &Id20) -> bool {
|
||||
let r = match self.buckets.get_mut(id) {
|
||||
let r = match self.buckets.get_mut(id, false) {
|
||||
Some(r) => r,
|
||||
None => return false,
|
||||
};
|
||||
|
|
@ -571,7 +632,7 @@ impl RoutingTable {
|
|||
}
|
||||
|
||||
pub fn mark_response(&mut self, id: &Id20) -> bool {
|
||||
let r = match self.buckets.get_mut(id) {
|
||||
let r = match self.buckets.get_mut(id, true) {
|
||||
Some(r) => r,
|
||||
None => return false,
|
||||
};
|
||||
|
|
@ -580,7 +641,7 @@ impl RoutingTable {
|
|||
}
|
||||
|
||||
pub fn mark_error(&mut self, id: &Id20) -> bool {
|
||||
let r = match self.buckets.get_mut(id) {
|
||||
let r = match self.buckets.get_mut(id, false) {
|
||||
Some(r) => r,
|
||||
None => return false,
|
||||
};
|
||||
|
|
@ -589,7 +650,7 @@ impl RoutingTable {
|
|||
}
|
||||
|
||||
pub fn mark_last_query(&mut self, id: &Id20) -> bool {
|
||||
let r = match self.buckets.get_mut(id) {
|
||||
let r = match self.buckets.get_mut(id, false) {
|
||||
Some(r) => r,
|
||||
None => return false,
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue