DHT stats

This commit is contained in:
Igor Katson 2021-07-14 00:48:53 +01:00
parent 4e31eb6547
commit fe4dcb226f
4 changed files with 140 additions and 101 deletions

View file

@ -290,7 +290,7 @@ pub struct FindNodeRequest {
pub target: Id20,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Response<BufT> {
pub id: Id20,
#[serde(skip_serializing_if = "Option::is_none")]

View file

@ -19,30 +19,26 @@ use librqbit_core::{id20::Id20, peer_id::generate_peer_id};
use log::{debug, info, trace, warn};
use parking_lot::Mutex;
use rand::Rng;
use serde::Serialize;
use tokio::{
net::UdpSocket,
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
};
use tokio_stream::wrappers::BroadcastStream;
struct OutstandingRequest {
transaction_id: u16,
addr: SocketAddr,
request: Request,
#[derive(Debug, Serialize)]
pub struct DhtStats {
pub id: Id20,
pub outstanding_requests: usize,
pub seen_peers: usize,
pub made_requests: usize,
pub routing_table_size: usize,
}
// TODO:
// - searching for peers - make it a set
// - peers - convert to broadcast
// - return a DHT handle.
// - flatten abstractions
// - framer is fine (I guess)
// - DhtHandle - straight out do things
struct DhtState {
id: Id20,
next_transaction_id: u16,
outstanding_requests: Vec<OutstandingRequest>,
outstanding_requests: HashMap<(u16, SocketAddr), Request>,
routing_table: RoutingTable,
// This sender sends requests to the worker.
@ -63,7 +59,7 @@ impl DhtState {
Self {
id,
next_transaction_id: 0,
outstanding_requests: Vec::new(),
outstanding_requests: Default::default(),
routing_table: RoutingTable::new(id),
sender,
seen_peers: Default::default(),
@ -96,12 +92,8 @@ impl DhtState {
}),
},
};
self.outstanding_requests.push(OutstandingRequest {
transaction_id,
addr,
request,
// time: Instant::now(),
});
self.outstanding_requests
.insert((transaction_id, addr), request);
message
}
fn on_incoming_from_remote(
@ -138,28 +130,19 @@ impl DhtState {
)
}
let tid = ((msg.transaction_id[0] as u16) << 8) + (msg.transaction_id[1] as u16);
// O(n) but whatever
let outstanding_id = self
.outstanding_requests
.iter()
.position(|req| req.transaction_id == tid && req.addr == addr)
.ok_or_else(|| {
anyhow::anyhow!("outstanding request not found. Message: {:?}", msg)
})?;
let outstanding = self.outstanding_requests.remove(outstanding_id);
let request = match self.outstanding_requests.remove(&(tid, addr)) {
Some(req) => req,
None => anyhow::bail!("outstanding request not found. Message: {:?}", msg),
};
let response = match msg.kind {
MessageKind::Error(e) => {
anyhow::bail!(
"request {:?} received error response {:?}",
outstanding.request,
e
)
anyhow::bail!("request {:?} received error response {:?}", request, e)
}
MessageKind::Response(r) => r,
_ => unreachable!(),
};
self.routing_table.mark_response(&response.id);
match outstanding.request {
match request {
Request::FindNode(id) => {
let nodes = response.nodes.ok_or_else(|| {
anyhow::anyhow!("expected nodes for find_node requests")
@ -172,17 +155,14 @@ impl DhtState {
}
}
MessageKind::PingRequest(_) => {
let response = bprotocol::Response {
id: self.id,
nodes: None,
values: None,
token: None,
};
let message = Message {
transaction_id: msg.transaction_id,
version: None,
ip: None,
kind: MessageKind::Response(response),
kind: MessageKind::Response(bprotocol::Response {
id: self.id,
..Default::default()
}),
};
self.sender.send((message, addr))?;
Ok(())
@ -208,34 +188,31 @@ impl DhtState {
None
};
let compact_node_info = generate_compact_nodes(req.info_hash);
let response = bprotocol::Response {
id: self.id,
nodes: Some(compact_node_info),
values: peers,
token,
};
let message = Message {
transaction_id: msg.transaction_id,
version: None,
ip: None,
kind: MessageKind::Response(response),
kind: MessageKind::Response(bprotocol::Response {
id: self.id,
nodes: Some(compact_node_info),
values: peers,
token,
}),
};
self.sender.send((message, addr))?;
Ok(())
}
MessageKind::FindNodeRequest(req) => {
let compact_node_info = generate_compact_nodes(req.target);
let response = bprotocol::Response {
id: self.id,
nodes: Some(compact_node_info),
values: None,
token: None,
};
let message = Message {
transaction_id: msg.transaction_id,
version: None,
ip: None,
kind: MessageKind::Response(response),
kind: MessageKind::Response(bprotocol::Response {
id: self.id,
nodes: Some(compact_node_info),
..Default::default()
}),
};
self.sender.send((message, addr))?;
Ok(())
@ -243,6 +220,20 @@ impl DhtState {
}
}
pub fn get_stats(&self) -> DhtStats {
DhtStats {
id: self.id,
outstanding_requests: self.outstanding_requests.len(),
seen_peers: self.seen_peers.values().map(|v| v.len()).sum(),
made_requests: self.made_requests.len(),
routing_table_size: self.routing_table.len(),
}
}
pub fn get_routing_table(&self) -> &RoutingTable {
&self.routing_table
}
pub fn get_peers(
&mut self,
info_hash: Id20,
@ -261,24 +252,21 @@ impl DhtState {
Ok((existing_peers, rx))
}
Entry::Vacant(v) => {
let (tx, rx) = tokio::sync::broadcast::channel(100);
// DHT sends peers REALLY fast, so the consumer of this broadcast should not lag behind.
// That's why capacity is so high.
let (tx, rx) = tokio::sync::broadcast::channel(1000);
v.insert(tx);
let mut addrs = Vec::new();
for node in self
// We don't need to allocate/collect here, but the borrow checker is not happy otherwise.
let nodes_to_query = self
.routing_table
.sorted_by_distance_from_mut(info_hash)
.into_iter()
.sorted_by_distance_from(info_hash)
.iter()
.map(|n| (n.id(), n.addr()))
.take(8)
{
node.mark_outgoing_request();
addrs.push(node.addr());
}
for addr in addrs {
let request = self.create_request(Request::GetPeers(info_hash), addr);
self.sender
.send((request, addr))
.context("DhtState: error sending to self.sender")?;
.collect::<Vec<_>>();
for (id, addr) in nodes_to_query {
self.send_find_peers_if_not_yet(info_hash, id, addr)?;
}
Ok((Vec::new(), rx))
@ -286,11 +274,41 @@ impl DhtState {
}
}
fn send_find_peers_if_not_yet(
&mut self,
info_hash: Id20,
target_node: Id20,
addr: SocketAddr,
) -> anyhow::Result<()> {
let request = Request::GetPeers(info_hash);
if self.made_requests.insert((request, addr)) {
self.routing_table.mark_outgoing_request(&target_node);
let msg = self.create_request(request, addr);
self.sender.send((msg, addr))?;
}
Ok(())
}
fn send_find_node_if_not_yet(
&mut self,
search_id: Id20,
target_node: Id20,
addr: SocketAddr,
) -> anyhow::Result<()> {
let request = Request::FindNode(search_id);
if self.made_requests.insert((request, addr)) {
self.routing_table.mark_outgoing_request(&target_node);
let msg = self.create_request(request, addr);
self.sender.send((msg, addr))?;
}
Ok(())
}
fn on_found_nodes(
&mut self,
source: Id20,
source_addr: SocketAddr,
_target: Id20,
target: Id20,
nodes: CompactNodeInfo,
) -> anyhow::Result<()> {
// We don't need to allocate/collect here, but the borrow checker is not happy
@ -301,15 +319,11 @@ impl DhtState {
.copied()
.collect::<Vec<_>>();
// On newly discovered nodes, ask them for peers that we are interested in.
match self.routing_table.add_node(source, source_addr) {
InsertResult::ReplacedBad(_) | InsertResult::Added => {
for info_hash in &searching_for_peers {
let request = Request::GetPeers(*info_hash);
if self.made_requests.insert((request, source_addr)) {
self.routing_table.mark_outgoing_request(&source);
let msg = self.create_request(request, source_addr);
self.sender.send((msg, source_addr))?;
}
self.send_find_peers_if_not_yet(*info_hash, source, source_addr)?;
}
}
_ => {}
@ -318,13 +332,10 @@ impl DhtState {
match self.routing_table.add_node(node.id, node.addr.into()) {
InsertResult::ReplacedBad(_) | InsertResult::Added => {
for info_hash in &searching_for_peers {
let request = Request::GetPeers(*info_hash);
if self.made_requests.insert((request, node.addr.into())) {
let msg = self.create_request(request, node.addr.into());
self.routing_table.mark_outgoing_request(&node.id);
self.sender.send((msg, node.addr.into()))?
}
self.send_find_peers_if_not_yet(*info_hash, node.id, node.addr.into())?;
}
// recursively find nodes closest to us until we can't find more.
self.send_find_node_if_not_yet(target, source, source_addr)?;
}
_ => {}
};
@ -366,12 +377,7 @@ impl DhtState {
if let Some(nodes) = data.nodes {
for node in nodes.nodes {
self.routing_table.add_node(node.id, node.addr.into());
let request = Request::GetPeers(target);
if self.made_requests.insert((request, node.addr.into())) {
let msg = self.create_request(Request::GetPeers(target), node.addr.into());
self.routing_table.mark_outgoing_request(&node.id);
self.sender.send((msg, node.addr.into()))?
}
self.send_find_peers_if_not_yet(target, node.id, node.addr.into())?;
}
};
Ok(())
@ -579,4 +585,16 @@ impl Dht {
let rx = futures::stream::iter(initial_peers).map(Ok).chain(rx);
Ok(rx)
}
pub fn stats(&self) -> DhtStats {
self.state.lock().get_stats()
}
pub fn with_routing_table<R, F: FnOnce(&RoutingTable) -> R>(&self, f: F) -> R {
f(&self.state.lock().routing_table)
}
pub fn clone_routing_table(&self) -> RoutingTable {
self.state.lock().routing_table.clone()
}
}

View file

@ -1,7 +1,8 @@
use std::{collections::HashSet, str::FromStr};
use std::{collections::HashSet, str::FromStr, time::Duration};
use anyhow::Context;
use dht::{Dht, Id20};
use log::info;
use tokio_stream::StreamExt;
#[tokio::main]
@ -12,11 +13,28 @@ async fn main() -> anyhow::Result<()> {
let dht = Dht::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash).await?;
let mut seen = HashSet::new();
while let Some(peer) = stream.next().await {
let peer = peer.context("error reading peer stream")?;
if seen.insert(peer) {
log::info!("peer found: {}", peer)
let stats_printer = async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
info!("DHT stats: {:?}", dht.stats());
}
}
Ok(())
Ok::<_, anyhow::Error>(())
};
let peer_printer = async move {
while let Some(peer) = stream.next().await {
let peer = peer.context("error reading peer stream")?;
if seen.insert(peer) {
log::info!("peer found: {}", peer)
}
}
Ok(())
};
let res = tokio::select! {
res = stats_printer => res,
res = peer_printer => res,
};
res
}

View file

@ -6,13 +6,13 @@ use std::{
use librqbit_core::id20::Id20;
use log::debug;
#[derive(Debug)]
#[derive(Debug, Clone)]
enum BucketTreeNode {
Leaf(Vec<RoutingTableNode>),
LeftRight(Box<BucketTree>, Box<BucketTree>),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BucketTree {
bits: u8,
start: Id20,
@ -302,7 +302,7 @@ impl Default for BucketTree {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RoutingTableNode {
id: Id20,
addr: SocketAddr,
@ -356,7 +356,7 @@ impl RoutingTableNode {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RoutingTable {
id: Id20,
size: usize,
@ -371,6 +371,9 @@ impl RoutingTable {
size: 0,
}
}
pub fn len(&self) -> usize {
self.size
}
pub fn sorted_by_distance_from(&self, id: Id20) -> Vec<&RoutingTableNode> {
let mut result = Vec::with_capacity(self.size);
for node in self.buckets.iter() {