With pinger its not entirely bad now, but still pretty horrible

This commit is contained in:
Igor Katson 2023-11-30 13:58:33 +00:00
parent 8d58a9f419
commit fee2690aae
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 49 additions and 59 deletions

View file

@ -14,7 +14,7 @@ use crate::{
self, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, Message,
MessageKind, Node, PingRequest, Response,
},
routing_table::{InsertResult, RoutingTable},
routing_table::{InsertResult, NodeStatus, RoutingTable},
INACTIVITY_TIMEOUT, REQUERY_INTERVAL, RESPONSE_TIMEOUT,
};
use anyhow::{bail, Context};
@ -109,12 +109,10 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers {
struct RecursiveRequestCallbacksFindNodes {}
impl RecursiveRequestCallbacks for RecursiveRequestCallbacksFindNodes {
fn on_request_start(&self, req: &RecursiveRequest<Self>, target_node: Id20, addr: SocketAddr) {
match req.dht.routing_table_add_node(target_node, addr) {
let mut rt = req.dht.routing_table.write();
match rt.add_node(target_node, addr) {
InsertResult::WasExisting | InsertResult::ReplacedBad(_) | InsertResult::Added => {
req.dht
.routing_table
.write()
.mark_outgoing_request(&target_node);
rt.mark_outgoing_request(&target_node);
}
InsertResult::Ignored => {}
}
@ -490,15 +488,12 @@ pub struct DhtState {
rate_limiter: RateLimiter,
// This is to send raw messages
worker_sender: UnboundedSender<WorkerSendRequest>,
// This is to send pings.
ping_sender: UnboundedSender<(Id20, SocketAddr)>,
}
impl DhtState {
fn new_internal(
id: Id20,
sender: UnboundedSender<WorkerSendRequest>,
ping_sender: UnboundedSender<(Id20, SocketAddr)>,
routing_table: Option<RoutingTable>,
listen_addr: SocketAddr,
) -> Self {
@ -510,7 +505,6 @@ impl DhtState {
routing_table: RwLock::new(routing_table),
worker_sender: sender,
listen_addr,
ping_sender,
rate_limiter: make_rate_limiter(),
}
}
@ -716,14 +710,6 @@ impl DhtState {
routing_table_size: self.routing_table.read().len(),
}
}
fn routing_table_add_node(self: &Arc<Self>, id: Id20, addr: SocketAddr) -> InsertResult {
let res = self.routing_table.write().add_node(id, addr, |id, addr| {
let _ = self.ping_sender.send((id, addr));
true
});
res
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -813,15 +799,20 @@ impl DhtWorker {
let filler = async {
let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT);
tokio::time::sleep(INACTIVITY_TIMEOUT).await;
let mut iteration = 0;
loop {
interval.tick().await;
let mut found = 0;
for bucket in self.dht.routing_table.read().iter_buckets() {
if bucket.leaf.last_refreshed.elapsed() < INACTIVITY_TIMEOUT {
continue;
}
found += 1;
let random_id = bucket.random_within();
tx.send(random_id).unwrap();
}
trace!("iteration {}, refreshing {} buckets", iteration, found);
iteration += 1;
}
};
@ -851,15 +842,36 @@ impl DhtWorker {
}
}
async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> {
async fn pinger(&self) -> anyhow::Result<()> {
let mut futs = FuturesUnordered::new();
let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT / 4);
let (tx, mut rx) = unbounded_channel();
let looper = async {
let mut iteration = 0;
loop {
interval.tick().await;
let mut found = 0;
for node in self.dht.routing_table.read().iter() {
if matches!(
node.status(),
NodeStatus::Questionable | NodeStatus::Unknown
) {
found += 1;
tx.send((node.id(), node.addr())).unwrap();
}
}
trace!("iteration {}, pinging {} nodes", iteration, found);
iteration += 1;
}
};
tokio::pin!(looper);
loop {
tokio::select! {
_ = &mut looper => {},
r = rx.recv() => {
let (id, addr) = match r {
Some(r) => r,
None => return Ok(()),
};
let (id, addr) = r.unwrap();
futs.push(async move {
self.dht.routing_table.write().mark_outgoing_request(&id);
match self.dht.request(Request::Ping, addr).await {
@ -944,7 +956,6 @@ impl DhtWorker {
async fn start(
self,
in_rx: UnboundedReceiver<WorkerSendRequest>,
ping_rx: UnboundedReceiver<(Id20, SocketAddr)>,
bootstrap_addrs: &[String],
) -> anyhow::Result<()> {
let (out_tx, mut out_rx) = channel(1);
@ -970,8 +981,10 @@ impl DhtWorker {
}
.instrument(debug_span!("dht_responese_reader"));
let pinger = self.pinger(ping_rx);
let bucket_refresher = self.bucket_refresher();
let pinger = self.pinger().instrument(error_span!("pinger"));
let bucket_refresher = self
.bucket_refresher()
.instrument(error_span!("bucket_refresher"));
tokio::pin!(framer);
tokio::pin!(bootstrap);
@ -1034,11 +1047,9 @@ impl DhtState {
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());
let (in_tx, in_rx) = unbounded_channel();
let (ping_tx, ping_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal(
peer_id,
in_tx,
ping_tx,
config.routing_table,
listen_addr,
));
@ -1047,7 +1058,7 @@ impl DhtState {
let state = state.clone();
async move {
let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, ping_rx, &bootstrap_addrs).await?;
worker.start(in_rx, &bootstrap_addrs).await?;
Ok(())
}
});