From fee2690aae7bdf11b9f4e3e7a5d6ad249ee0d854 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 30 Nov 2023 13:58:33 +0000 Subject: [PATCH] With pinger its not entirely bad now, but still pretty horrible --- crates/dht/src/dht.rs | 69 +++++++++++++++++++-------------- crates/dht/src/routing_table.rs | 39 +++++-------------- 2 files changed, 49 insertions(+), 59 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 77601d7..ab0f6ce 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -14,7 +14,7 @@ use crate::{ self, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, Message, MessageKind, Node, PingRequest, Response, }, - routing_table::{InsertResult, RoutingTable}, + routing_table::{InsertResult, NodeStatus, RoutingTable}, INACTIVITY_TIMEOUT, REQUERY_INTERVAL, RESPONSE_TIMEOUT, }; use anyhow::{bail, Context}; @@ -109,12 +109,10 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers { struct RecursiveRequestCallbacksFindNodes {} impl RecursiveRequestCallbacks for RecursiveRequestCallbacksFindNodes { fn on_request_start(&self, req: &RecursiveRequest, target_node: Id20, addr: SocketAddr) { - match req.dht.routing_table_add_node(target_node, addr) { + let mut rt = req.dht.routing_table.write(); + match rt.add_node(target_node, addr) { InsertResult::WasExisting | InsertResult::ReplacedBad(_) | InsertResult::Added => { - req.dht - .routing_table - .write() - .mark_outgoing_request(&target_node); + rt.mark_outgoing_request(&target_node); } InsertResult::Ignored => {} } @@ -490,15 +488,12 @@ pub struct DhtState { rate_limiter: RateLimiter, // This is to send raw messages worker_sender: UnboundedSender, - // This is to send pings. - ping_sender: UnboundedSender<(Id20, SocketAddr)>, } impl DhtState { fn new_internal( id: Id20, sender: UnboundedSender, - ping_sender: UnboundedSender<(Id20, SocketAddr)>, routing_table: Option, listen_addr: SocketAddr, ) -> Self { @@ -510,7 +505,6 @@ impl DhtState { routing_table: RwLock::new(routing_table), worker_sender: sender, listen_addr, - ping_sender, rate_limiter: make_rate_limiter(), } } @@ -716,14 +710,6 @@ impl DhtState { routing_table_size: self.routing_table.read().len(), } } - - fn routing_table_add_node(self: &Arc, id: Id20, addr: SocketAddr) -> InsertResult { - let res = self.routing_table.write().add_node(id, addr, |id, addr| { - let _ = self.ping_sender.send((id, addr)); - true - }); - res - } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -813,15 +799,20 @@ impl DhtWorker { let filler = async { let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT); tokio::time::sleep(INACTIVITY_TIMEOUT).await; + let mut iteration = 0; loop { interval.tick().await; + let mut found = 0; for bucket in self.dht.routing_table.read().iter_buckets() { if bucket.leaf.last_refreshed.elapsed() < INACTIVITY_TIMEOUT { continue; } + found += 1; let random_id = bucket.random_within(); tx.send(random_id).unwrap(); } + trace!("iteration {}, refreshing {} buckets", iteration, found); + iteration += 1; } }; @@ -851,15 +842,36 @@ impl DhtWorker { } } - async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> { + async fn pinger(&self) -> anyhow::Result<()> { let mut futs = FuturesUnordered::new(); + let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT / 4); + let (tx, mut rx) = unbounded_channel(); + let looper = async { + let mut iteration = 0; + loop { + interval.tick().await; + let mut found = 0; + for node in self.dht.routing_table.read().iter() { + if matches!( + node.status(), + NodeStatus::Questionable | NodeStatus::Unknown + ) { + found += 1; + tx.send((node.id(), node.addr())).unwrap(); + } + } + trace!("iteration {}, pinging {} nodes", iteration, found); + iteration += 1; + } + }; + + tokio::pin!(looper); + loop { tokio::select! { + _ = &mut looper => {}, r = rx.recv() => { - let (id, addr) = match r { - Some(r) => r, - None => return Ok(()), - }; + let (id, addr) = r.unwrap(); futs.push(async move { self.dht.routing_table.write().mark_outgoing_request(&id); match self.dht.request(Request::Ping, addr).await { @@ -944,7 +956,6 @@ impl DhtWorker { async fn start( self, in_rx: UnboundedReceiver, - ping_rx: UnboundedReceiver<(Id20, SocketAddr)>, bootstrap_addrs: &[String], ) -> anyhow::Result<()> { let (out_tx, mut out_rx) = channel(1); @@ -970,8 +981,10 @@ impl DhtWorker { } .instrument(debug_span!("dht_responese_reader")); - let pinger = self.pinger(ping_rx); - let bucket_refresher = self.bucket_refresher(); + let pinger = self.pinger().instrument(error_span!("pinger")); + let bucket_refresher = self + .bucket_refresher() + .instrument(error_span!("bucket_refresher")); tokio::pin!(framer); tokio::pin!(bootstrap); @@ -1034,11 +1047,9 @@ impl DhtState { .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); let (in_tx, in_rx) = unbounded_channel(); - let (ping_tx, ping_rx) = unbounded_channel(); let state = Arc::new(Self::new_internal( peer_id, in_tx, - ping_tx, config.routing_table, listen_addr, )); @@ -1047,7 +1058,7 @@ impl DhtState { let state = state.clone(); async move { let worker = DhtWorker { socket, dht: state }; - worker.start(in_rx, ping_rx, &bootstrap_addrs).await?; + worker.start(in_rx, &bootstrap_addrs).await?; Ok(()) } }); diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 1da4808..325d93c 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -348,15 +348,9 @@ impl BucketTree { } } - pub fn add_node( - &mut self, - self_id: &Id20, - id: Id20, - addr: SocketAddr, - on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool, - ) -> InsertResult { + pub fn add_node(&mut self, self_id: &Id20, id: Id20, addr: SocketAddr) -> InsertResult { let idx = self.get_leaf(&id); - self.insert_into_leaf(idx, self_id, id, addr, on_questionable_node) + self.insert_into_leaf(idx, self_id, id, addr) } fn insert_into_leaf( &mut self, @@ -364,7 +358,6 @@ impl BucketTree { self_id: &Id20, id: Id20, addr: SocketAddr, - mut on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool, ) -> InsertResult { // The loop here is for this case: // in case we split a node into two, and it degenerates into all the leaves @@ -399,17 +392,6 @@ impl BucketTree { return InsertResult::Added; } - // Ping first questionable node - if let Some(questionable_node) = nodes - .nodes - .iter_mut() - .find(|r| matches!(r.status(), NodeStatus::Questionable)) - { - if on_questionable_node(questionable_node.id, questionable_node.addr) { - questionable_node.mark_outgoing_request(); - } - } - // Try replace a bad node if let Some(bad_node) = nodes .nodes @@ -633,15 +615,12 @@ impl RoutingTable { self.buckets.iter_leaves() } - pub fn add_node( - &mut self, - id: Id20, - addr: SocketAddr, - on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool, - ) -> InsertResult { - let res = self - .buckets - .add_node(&self.id, id, addr, on_questionable_node); + pub fn iter(&self) -> impl Iterator + '_ { + self.buckets.iter() + } + + pub fn add_node(&mut self, id: Id20, addr: SocketAddr) -> InsertResult { + let res = self.buckets.add_node(&self.id, id, addr); let replaced = match &res { InsertResult::WasExisting => false, InsertResult::ReplacedBad(..) => true, @@ -782,7 +761,7 @@ mod tests { for _ in 0..length.unwrap_or(16536) { let other_id = random_id_20(); let addr = generate_socket_addr(); - rtable.add_node(other_id, addr, |_, _| false); + rtable.add_node(other_id, addr); } rtable }