DHT instrumentation

This commit is contained in:
Igor Katson 2023-11-25 15:15:16 +00:00
parent 6f113c5137
commit d8fdb94305
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
11 changed files with 76 additions and 58 deletions

View file

@ -18,7 +18,7 @@ use bencode::ByteString;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use indexmap::IndexSet;
use leaky_bucket::RateLimiter;
use librqbit_core::{id20::Id20, peer_id::generate_peer_id};
use librqbit_core::{id20::Id20, peer_id::generate_peer_id, spawn_utils::spawn};
use parking_lot::RwLock;
use rand::Rng;
use serde::Serialize;
@ -27,7 +27,7 @@ use tokio::{
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::{debug, info, trace, warn};
use tracing::{debug, debug_span, error_span, info, trace, warn, Instrument};
#[derive(Debug, Serialize)]
pub struct DhtStats {
@ -513,7 +513,7 @@ impl DhtWorker {
bootstrap_addrs: &[String],
) -> anyhow::Result<()> {
let (out_tx, mut out_rx) = channel(1);
let framer = run_framer(&self.socket, in_rx, out_tx);
let framer = run_framer(&self.socket, in_rx, out_tx).instrument(debug_span!("dht_framer"));
let bootstrap = async {
let mut futs = FuturesUnordered::new();
@ -521,34 +521,40 @@ impl DhtWorker {
for addr in bootstrap_addrs.iter() {
let this = &self;
let in_tx = &in_tx;
futs.push(async move {
match tokio::net::lookup_host(addr).await {
Ok(addrs) => {
for addr in addrs {
let request = this
.state
.write()
.create_request(Request::FindNode(this.peer_id), addr);
in_tx.send((request, addr))?;
futs.push(
async move {
match tokio::net::lookup_host(addr).await {
Ok(addrs) => {
for addr in addrs {
let request = this
.state
.write()
.create_request(Request::FindNode(this.peer_id), addr);
in_tx.send((request, addr))?;
}
}
Err(e) => {
warn!("error looking up {}: {}", addr, e);
return Err(e.into());
}
}
Err(e) => warn!("error looking up {}: {}", addr, e),
Ok::<_, anyhow::Error>(())
}
Ok::<_, anyhow::Error>(())
});
.instrument(error_span!("dht_bootstrap", addr = addr)),
);
}
let mut successes = 0;
while let Some(resp) = futs.next().await {
match resp {
Ok(_) => successes += 1,
Err(e) => warn!("error in one of the bootstrappers: {}", e),
if resp.is_ok() {
successes += 1
}
}
if successes == 0 {
anyhow::bail!("bootstrapping did not succeed")
}
Ok(())
};
}
.instrument(debug_span!("dht_bootstrapper"));
let mut bootstrap_done = false;
let response_reader = {
@ -563,7 +569,8 @@ impl DhtWorker {
"closed response reader, nowhere to send results to, DHT closed"
))
}
};
}
.instrument(debug_span!("dht_responese_reader"));
tokio::pin!(framer);
tokio::pin!(bootstrap);
@ -676,7 +683,7 @@ impl Dht {
listen_addr,
)));
tokio::spawn({
spawn(error_span!("dht"), {
let state = state.clone();
async move {
let worker = DhtWorker {
@ -684,8 +691,8 @@ impl Dht {
peer_id,
state,
};
let result = worker.start(in_tx, in_rx, &bootstrap_addrs).await;
warn!("DHT worker finished with {:?}", result);
worker.start(in_tx, in_rx, &bootstrap_addrs).await?;
Ok(())
}
});
Ok(Dht { state })

View file

@ -1,5 +1,6 @@
// TODO: this now stores only the routing table, but we also need AT LEAST the same socket address...
use librqbit_core::spawn_utils::spawn;
use serde::{Deserialize, Serialize};
use std::fs::OpenOptions;
use std::io::{BufReader, BufWriter};
@ -8,8 +9,7 @@ use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::Context;
use tokio::spawn;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, error_span, info, trace, warn};
use crate::dht::{Dht, DhtConfig};
use crate::routing_table::RoutingTable;
@ -110,7 +110,7 @@ impl PersistentDht {
};
let dht = Dht::with_config(dht_config).await?;
spawn({
spawn(error_span!("dht_persistence"), {
let dht = dht.clone();
let dump_interval = config
.dump_interval