Bucket refresher. Broken

This commit is contained in:
Igor Katson 2023-11-30 11:38:15 +00:00
parent 658bbdb652
commit c8967f2469
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 125 additions and 48 deletions

View file

@ -15,7 +15,7 @@ use crate::{
MessageKind, Node, PingRequest, Response,
},
routing_table::{InsertResult, RoutingTable},
REQUERY_INTERVAL, RESPONSE_TIMEOUT,
INACTIVITY_TIMEOUT, REQUERY_INTERVAL, RESPONSE_TIMEOUT,
};
use anyhow::{bail, Context};
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
@ -190,10 +190,11 @@ impl Stream for RequestPeersStream {
}
impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
async fn bootstrap(dht: Arc<DhtState>, target: Id20, hostname: &str) -> anyhow::Result<()> {
let addrs = tokio::net::lookup_host(hostname)
.await
.with_context(|| format!("error looking up {}", hostname))?;
async fn find_node_for_routing_table(
dht: Arc<DhtState>,
target: Id20,
addrs: impl Iterator<Item = SocketAddr>,
) -> anyhow::Result<()> {
let (node_tx, mut node_rx) = unbounded_channel();
let req = RecursiveRequest {
info_hash: target,
@ -728,9 +729,10 @@ impl DhtWorker {
}
async fn bootstrap_hostname(&self, hostname: &str) -> anyhow::Result<()> {
RecursiveRequest::bootstrap(self.dht.clone(), self.dht.id, hostname)
.instrument(error_span!("bootstrap", hostname = hostname))
let addrs = tokio::net::lookup_host(hostname)
.await
.with_context(|| format!("error looking up {}", hostname))?;
RecursiveRequest::find_node_for_routing_table(self.dht.clone(), self.dht.id, addrs).await
}
async fn bootstrap_hostname_with_backoff(&self, addr: &str) -> anyhow::Result<()> {
@ -742,7 +744,11 @@ impl DhtWorker {
.build();
loop {
let backoff = match self.bootstrap_hostname(addr).await {
let backoff = match self
.bootstrap_hostname(addr)
.instrument(error_span!("bootstrap", hostname = addr))
.await
{
Ok(_) => return Ok(()),
Err(e) => {
warn!("error: {}", e);
@ -776,7 +782,48 @@ impl DhtWorker {
}
async fn bucket_refresher(&self) -> anyhow::Result<()> {
todo!()
let (tx, mut rx) = unbounded_channel();
let mut futs = FuturesUnordered::new();
let filler = async {
let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT);
tokio::time::sleep(INACTIVITY_TIMEOUT).await;
loop {
interval.tick().await;
for bucket in self.dht.routing_table.read().iter_buckets() {
if bucket.leaf.last_refreshed.elapsed() < INACTIVITY_TIMEOUT {
continue;
}
let random_id = bucket.random_within();
tx.send(random_id).unwrap();
}
}
};
tokio::pin!(filler);
loop {
tokio::select! {
_ = &mut filler => {},
random_id = rx.recv() => {
let random_id = random_id.unwrap();
let addrs = self
.dht
.routing_table
.read()
.sorted_by_distance_from(random_id)
.iter()
.map(|n| n.addr())
.take(8).collect::<Vec<_>>();
futs.push(
RecursiveRequest::find_node_for_routing_table(
self.dht.clone(), random_id, addrs.into_iter()
).instrument(error_span!("refresh_bucket", random_id=format!("{:?}", random_id)))
);
},
_ = futs.next(), if !futs.is_empty() => {},
}
}
}
async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> {
@ -902,11 +949,13 @@ impl DhtWorker {
.instrument(debug_span!("dht_responese_reader"));
let pinger = self.pinger(ping_rx);
let bucket_refresher = self.bucket_refresher();
tokio::pin!(framer);
tokio::pin!(bootstrap);
tokio::pin!(response_reader);
tokio::pin!(pinger);
tokio::pin!(bucket_refresher);
loop {
tokio::select! {
@ -920,6 +969,9 @@ impl DhtWorker {
err = &mut pinger => {
anyhow::bail!("pinger quit: {:?}", err)
},
err = &mut bucket_refresher => {
anyhow::bail!("bucket_refresher quit: {:?}", err)
},
err = &mut response_reader => {anyhow::bail!("response reader quit: {:?}", err)}
}
}