From fe4dcb226f386a62c2f51d5e0bd5e807501a0935 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 14 Jul 2021 00:48:53 +0100 Subject: [PATCH] DHT stats --- crates/dht/src/bprotocol.rs | 2 +- crates/dht/src/dht.rs | 196 +++++++++++++++++--------------- crates/dht/src/main.rs | 32 ++++-- crates/dht/src/routing_table.rs | 11 +- 4 files changed, 140 insertions(+), 101 deletions(-) diff --git a/crates/dht/src/bprotocol.rs b/crates/dht/src/bprotocol.rs index 236b30a..2c90aab 100644 --- a/crates/dht/src/bprotocol.rs +++ b/crates/dht/src/bprotocol.rs @@ -290,7 +290,7 @@ pub struct FindNodeRequest { pub target: Id20, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Default)] pub struct Response { pub id: Id20, #[serde(skip_serializing_if = "Option::is_none")] diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 935097c..89cbf40 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -19,30 +19,26 @@ use librqbit_core::{id20::Id20, peer_id::generate_peer_id}; use log::{debug, info, trace, warn}; use parking_lot::Mutex; use rand::Rng; +use serde::Serialize; use tokio::{ net::UdpSocket, sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, }; use tokio_stream::wrappers::BroadcastStream; -struct OutstandingRequest { - transaction_id: u16, - addr: SocketAddr, - request: Request, +#[derive(Debug, Serialize)] +pub struct DhtStats { + pub id: Id20, + pub outstanding_requests: usize, + pub seen_peers: usize, + pub made_requests: usize, + pub routing_table_size: usize, } -// TODO: -// - searching for peers - make it a set -// - peers - convert to broadcast -// - return a DHT handle. -// - flatten abstractions -// - framer is fine (I guess) -// - DhtHandle - straight out do things - struct DhtState { id: Id20, next_transaction_id: u16, - outstanding_requests: Vec, + outstanding_requests: HashMap<(u16, SocketAddr), Request>, routing_table: RoutingTable, // This sender sends requests to the worker. @@ -63,7 +59,7 @@ impl DhtState { Self { id, next_transaction_id: 0, - outstanding_requests: Vec::new(), + outstanding_requests: Default::default(), routing_table: RoutingTable::new(id), sender, seen_peers: Default::default(), @@ -96,12 +92,8 @@ impl DhtState { }), }, }; - self.outstanding_requests.push(OutstandingRequest { - transaction_id, - addr, - request, - // time: Instant::now(), - }); + self.outstanding_requests + .insert((transaction_id, addr), request); message } fn on_incoming_from_remote( @@ -138,28 +130,19 @@ impl DhtState { ) } let tid = ((msg.transaction_id[0] as u16) << 8) + (msg.transaction_id[1] as u16); - // O(n) but whatever - let outstanding_id = self - .outstanding_requests - .iter() - .position(|req| req.transaction_id == tid && req.addr == addr) - .ok_or_else(|| { - anyhow::anyhow!("outstanding request not found. Message: {:?}", msg) - })?; - let outstanding = self.outstanding_requests.remove(outstanding_id); + let request = match self.outstanding_requests.remove(&(tid, addr)) { + Some(req) => req, + None => anyhow::bail!("outstanding request not found. Message: {:?}", msg), + }; let response = match msg.kind { MessageKind::Error(e) => { - anyhow::bail!( - "request {:?} received error response {:?}", - outstanding.request, - e - ) + anyhow::bail!("request {:?} received error response {:?}", request, e) } MessageKind::Response(r) => r, _ => unreachable!(), }; self.routing_table.mark_response(&response.id); - match outstanding.request { + match request { Request::FindNode(id) => { let nodes = response.nodes.ok_or_else(|| { anyhow::anyhow!("expected nodes for find_node requests") @@ -172,17 +155,14 @@ impl DhtState { } } MessageKind::PingRequest(_) => { - let response = bprotocol::Response { - id: self.id, - nodes: None, - values: None, - token: None, - }; let message = Message { transaction_id: msg.transaction_id, version: None, ip: None, - kind: MessageKind::Response(response), + kind: MessageKind::Response(bprotocol::Response { + id: self.id, + ..Default::default() + }), }; self.sender.send((message, addr))?; Ok(()) @@ -208,34 +188,31 @@ impl DhtState { None }; let compact_node_info = generate_compact_nodes(req.info_hash); - let response = bprotocol::Response { - id: self.id, - nodes: Some(compact_node_info), - values: peers, - token, - }; let message = Message { transaction_id: msg.transaction_id, version: None, ip: None, - kind: MessageKind::Response(response), + kind: MessageKind::Response(bprotocol::Response { + id: self.id, + nodes: Some(compact_node_info), + values: peers, + token, + }), }; self.sender.send((message, addr))?; Ok(()) } MessageKind::FindNodeRequest(req) => { let compact_node_info = generate_compact_nodes(req.target); - let response = bprotocol::Response { - id: self.id, - nodes: Some(compact_node_info), - values: None, - token: None, - }; let message = Message { transaction_id: msg.transaction_id, version: None, ip: None, - kind: MessageKind::Response(response), + kind: MessageKind::Response(bprotocol::Response { + id: self.id, + nodes: Some(compact_node_info), + ..Default::default() + }), }; self.sender.send((message, addr))?; Ok(()) @@ -243,6 +220,20 @@ impl DhtState { } } + pub fn get_stats(&self) -> DhtStats { + DhtStats { + id: self.id, + outstanding_requests: self.outstanding_requests.len(), + seen_peers: self.seen_peers.values().map(|v| v.len()).sum(), + made_requests: self.made_requests.len(), + routing_table_size: self.routing_table.len(), + } + } + + pub fn get_routing_table(&self) -> &RoutingTable { + &self.routing_table + } + pub fn get_peers( &mut self, info_hash: Id20, @@ -261,24 +252,21 @@ impl DhtState { Ok((existing_peers, rx)) } Entry::Vacant(v) => { - let (tx, rx) = tokio::sync::broadcast::channel(100); + // DHT sends peers REALLY fast, so the consumer of this broadcast should not lag behind. + // That's why capacity is so high. + let (tx, rx) = tokio::sync::broadcast::channel(1000); v.insert(tx); - let mut addrs = Vec::new(); - for node in self + // We don't need to allocate/collect here, but the borrow checker is not happy otherwise. + let nodes_to_query = self .routing_table - .sorted_by_distance_from_mut(info_hash) - .into_iter() + .sorted_by_distance_from(info_hash) + .iter() + .map(|n| (n.id(), n.addr())) .take(8) - { - node.mark_outgoing_request(); - addrs.push(node.addr()); - } - for addr in addrs { - let request = self.create_request(Request::GetPeers(info_hash), addr); - self.sender - .send((request, addr)) - .context("DhtState: error sending to self.sender")?; + .collect::>(); + for (id, addr) in nodes_to_query { + self.send_find_peers_if_not_yet(info_hash, id, addr)?; } Ok((Vec::new(), rx)) @@ -286,11 +274,41 @@ impl DhtState { } } + fn send_find_peers_if_not_yet( + &mut self, + info_hash: Id20, + target_node: Id20, + addr: SocketAddr, + ) -> anyhow::Result<()> { + let request = Request::GetPeers(info_hash); + if self.made_requests.insert((request, addr)) { + self.routing_table.mark_outgoing_request(&target_node); + let msg = self.create_request(request, addr); + self.sender.send((msg, addr))?; + } + Ok(()) + } + + fn send_find_node_if_not_yet( + &mut self, + search_id: Id20, + target_node: Id20, + addr: SocketAddr, + ) -> anyhow::Result<()> { + let request = Request::FindNode(search_id); + if self.made_requests.insert((request, addr)) { + self.routing_table.mark_outgoing_request(&target_node); + let msg = self.create_request(request, addr); + self.sender.send((msg, addr))?; + } + Ok(()) + } + fn on_found_nodes( &mut self, source: Id20, source_addr: SocketAddr, - _target: Id20, + target: Id20, nodes: CompactNodeInfo, ) -> anyhow::Result<()> { // We don't need to allocate/collect here, but the borrow checker is not happy @@ -301,15 +319,11 @@ impl DhtState { .copied() .collect::>(); + // On newly discovered nodes, ask them for peers that we are interested in. match self.routing_table.add_node(source, source_addr) { InsertResult::ReplacedBad(_) | InsertResult::Added => { for info_hash in &searching_for_peers { - let request = Request::GetPeers(*info_hash); - if self.made_requests.insert((request, source_addr)) { - self.routing_table.mark_outgoing_request(&source); - let msg = self.create_request(request, source_addr); - self.sender.send((msg, source_addr))?; - } + self.send_find_peers_if_not_yet(*info_hash, source, source_addr)?; } } _ => {} @@ -318,13 +332,10 @@ impl DhtState { match self.routing_table.add_node(node.id, node.addr.into()) { InsertResult::ReplacedBad(_) | InsertResult::Added => { for info_hash in &searching_for_peers { - let request = Request::GetPeers(*info_hash); - if self.made_requests.insert((request, node.addr.into())) { - let msg = self.create_request(request, node.addr.into()); - self.routing_table.mark_outgoing_request(&node.id); - self.sender.send((msg, node.addr.into()))? - } + self.send_find_peers_if_not_yet(*info_hash, node.id, node.addr.into())?; } + // recursively find nodes closest to us until we can't find more. + self.send_find_node_if_not_yet(target, source, source_addr)?; } _ => {} }; @@ -366,12 +377,7 @@ impl DhtState { if let Some(nodes) = data.nodes { for node in nodes.nodes { self.routing_table.add_node(node.id, node.addr.into()); - let request = Request::GetPeers(target); - if self.made_requests.insert((request, node.addr.into())) { - let msg = self.create_request(Request::GetPeers(target), node.addr.into()); - self.routing_table.mark_outgoing_request(&node.id); - self.sender.send((msg, node.addr.into()))? - } + self.send_find_peers_if_not_yet(target, node.id, node.addr.into())?; } }; Ok(()) @@ -579,4 +585,16 @@ impl Dht { let rx = futures::stream::iter(initial_peers).map(Ok).chain(rx); Ok(rx) } + + pub fn stats(&self) -> DhtStats { + self.state.lock().get_stats() + } + + pub fn with_routing_table R>(&self, f: F) -> R { + f(&self.state.lock().routing_table) + } + + pub fn clone_routing_table(&self) -> RoutingTable { + self.state.lock().routing_table.clone() + } } diff --git a/crates/dht/src/main.rs b/crates/dht/src/main.rs index 7240a35..36dbaa4 100644 --- a/crates/dht/src/main.rs +++ b/crates/dht/src/main.rs @@ -1,7 +1,8 @@ -use std::{collections::HashSet, str::FromStr}; +use std::{collections::HashSet, str::FromStr, time::Duration}; use anyhow::Context; use dht::{Dht, Id20}; +use log::info; use tokio_stream::StreamExt; #[tokio::main] @@ -12,11 +13,28 @@ async fn main() -> anyhow::Result<()> { let dht = Dht::new().await.context("error initializing DHT")?; let mut stream = dht.get_peers(info_hash).await?; let mut seen = HashSet::new(); - while let Some(peer) = stream.next().await { - let peer = peer.context("error reading peer stream")?; - if seen.insert(peer) { - log::info!("peer found: {}", peer) + + let stats_printer = async move { + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + info!("DHT stats: {:?}", dht.stats()); } - } - Ok(()) + Ok::<_, anyhow::Error>(()) + }; + + let peer_printer = async move { + while let Some(peer) = stream.next().await { + let peer = peer.context("error reading peer stream")?; + if seen.insert(peer) { + log::info!("peer found: {}", peer) + } + } + Ok(()) + }; + + let res = tokio::select! { + res = stats_printer => res, + res = peer_printer => res, + }; + res } diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index cdeaf33..863be2a 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -6,13 +6,13 @@ use std::{ use librqbit_core::id20::Id20; use log::debug; -#[derive(Debug)] +#[derive(Debug, Clone)] enum BucketTreeNode { Leaf(Vec), LeftRight(Box, Box), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BucketTree { bits: u8, start: Id20, @@ -302,7 +302,7 @@ impl Default for BucketTree { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RoutingTableNode { id: Id20, addr: SocketAddr, @@ -356,7 +356,7 @@ impl RoutingTableNode { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RoutingTable { id: Id20, size: usize, @@ -371,6 +371,9 @@ impl RoutingTable { size: 0, } } + pub fn len(&self) -> usize { + self.size + } pub fn sorted_by_distance_from(&self, id: Id20) -> Vec<&RoutingTableNode> { let mut result = Vec::with_capacity(self.size); for node in self.buckets.iter() {