From f04277cc11cee756d9c7da185bb55df5f58bd9d3 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 30 Nov 2023 09:38:35 +0000 Subject: [PATCH] Make questionable node pings better --- crates/dht/src/dht.rs | 103 +++++++++++++++++++------------- crates/dht/src/routing_table.rs | 10 ++-- 2 files changed, 65 insertions(+), 48 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 183b80a..a0316c4 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -48,6 +48,7 @@ struct OutstandingRequest { } pub struct WorkerSendRequest { + // If this is set, we are tracking the response in inflight_by_transaction_id our_tid: Option, message: Message, addr: SocketAddr, @@ -471,13 +472,17 @@ pub struct DhtState { // Sending requests to the worker. rate_limiter: RateLimiter, - sender: UnboundedSender, + // This is to send raw messages + worker_sender: UnboundedSender, + // This is to send pings. + ping_sender: UnboundedSender<(Id20, SocketAddr)>, } impl DhtState { fn new_internal( id: Id20, sender: UnboundedSender, + ping_sender: UnboundedSender<(Id20, SocketAddr)>, routing_table: Option, listen_addr: SocketAddr, ) -> Self { @@ -487,8 +492,9 @@ impl DhtState { next_transaction_id: AtomicU16::new(0), inflight_by_transaction_id: Default::default(), routing_table: RwLock::new(routing_table), - sender, + worker_sender: sender, listen_addr, + ping_sender, rate_limiter: make_rate_limiter(), } } @@ -500,7 +506,8 @@ impl DhtState { let (tx, rx) = tokio::sync::oneshot::channel(); self.inflight_by_transaction_id .insert(key, OutstandingRequest { done: tx }); - match self.sender.send(WorkerSendRequest { + trace!("sending to {addr}, {message:?}"); + match self.worker_sender.send(WorkerSendRequest { our_tid: Some(tid), message, addr, @@ -594,7 +601,9 @@ impl DhtState { .map(|(_, v)| v) { Some(req) => req, - None => bail!("outstanding request not found. Message: {:?}", msg), + None => { + bail!("outstanding request not found. Message: {:?}", msg) + } }; let response_or_error = match msg.kind { @@ -625,7 +634,7 @@ impl DhtState { }), }; self.routing_table.write().mark_last_query(&req.id); - self.sender.send(WorkerSendRequest { + self.worker_sender.send(WorkerSendRequest { our_tid: None, message, addr, @@ -633,26 +642,7 @@ impl DhtState { Ok(()) } MessageKind::GetPeersRequest(req) => { - // let peers = self.info_hash_meta.get(&req.info_hash).map(|meta| { - // meta.seen_peers - // .iter() - // .copied() - // .filter_map(|a| match a { - // SocketAddr::V4(v4) => Some(CompactPeerInfo { addr: v4 }), - // // this should never happen in practice - // SocketAddr::V6(_) => None, - // }) - // .take(50) - // .collect::>() - // }); - // let token = if peers.is_some() { - // let mut token = [0u8; 20]; - // rand::thread_rng().fill(&mut token); - // Some(ByteString::from(token.as_ref())) - // } else { - // None - // }; - // let compact_node_info = generate_compact_nodes(req.info_hash); + // TODO: respond with peer info, for now sending an empty response. self.routing_table.write().mark_last_query(&req.id); let message = Message { transaction_id: msg.transaction_id, @@ -660,12 +650,10 @@ impl DhtState { ip: None, kind: MessageKind::Response(bprotocol::Response { id: self.id, - nodes: None, - values: None, - token: None, + ..Default::default() }), }; - self.sender.send(WorkerSendRequest { + self.worker_sender.send(WorkerSendRequest { our_tid: None, message, addr, @@ -685,7 +673,7 @@ impl DhtState { ..Default::default() }), }; - self.sender.send(WorkerSendRequest { + self.worker_sender.send(WorkerSendRequest { our_tid: None, message, addr, @@ -704,19 +692,10 @@ impl DhtState { } fn routing_table_add_node(self: &Arc, id: Id20, addr: SocketAddr) -> InsertResult { - let mut questionable_nodes = Vec::new(); - let res = self.routing_table.write().add_node(id, addr, |addr| { - questionable_nodes.push(addr); + let res = self.routing_table.write().add_node(id, addr, |id, addr| { + let _ = self.ping_sender.send((id, addr)); true }); - for addr in questionable_nodes { - let (_, req) = self.create_request(Request::Ping); - let _ = self.sender.send(WorkerSendRequest { - our_tid: None, - message: req, - addr, - }); - } res } } @@ -796,6 +775,33 @@ impl DhtWorker { Ok(()) } + async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> { + let mut futs = FuturesUnordered::new(); + loop { + tokio::select! { + r = rx.recv() => { + let (id, addr) = match r { + Some(r) => r, + None => return Ok(()), + }; + futs.push(async move { + self.dht.routing_table.write().mark_outgoing_request(&id); + match self.dht.request(Request::Ping, addr).await { + Ok(_) => { + self.dht.routing_table.write().mark_response(&id); + }, + Err(e) => { + self.dht.routing_table.write().mark_error(&id); + debug!("error: {e:?}"); + } + } + }.instrument(error_span!("ping", addr=addr.to_string()))) + }, + _ = futs.next() => {}, + } + } + } + async fn framer( &self, socket: &UdpSocket, @@ -810,7 +816,9 @@ impl DhtWorker { addr, }) = input_rx.recv().await { - trace!("{}: sending {:?}", addr, &message); + if our_tid.is_none() { + trace!("{}: sending {:?}", addr, &message); + } buf.clear(); bprotocol::serialize_message( &mut buf, @@ -863,6 +871,7 @@ impl DhtWorker { async fn start( self, in_rx: UnboundedReceiver, + ping_rx: UnboundedReceiver<(Id20, SocketAddr)>, bootstrap_addrs: &[String], ) -> anyhow::Result<()> { let (out_tx, mut out_rx) = channel(1); @@ -888,9 +897,12 @@ impl DhtWorker { } .instrument(debug_span!("dht_responese_reader")); + let pinger = self.pinger(ping_rx); + tokio::pin!(framer); tokio::pin!(bootstrap); tokio::pin!(response_reader); + tokio::pin!(pinger); loop { tokio::select! { @@ -901,6 +913,9 @@ impl DhtWorker { bootstrap_done = true; result?; }, + err = &mut pinger => { + anyhow::bail!("pinger quit: {:?}", err) + }, err = &mut response_reader => {anyhow::bail!("response reader quit: {:?}", err)} } } @@ -941,9 +956,11 @@ impl DhtState { .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); let (in_tx, in_rx) = unbounded_channel(); + let (ping_tx, ping_rx) = unbounded_channel(); let state = Arc::new(Self::new_internal( peer_id, in_tx, + ping_tx, config.routing_table, listen_addr, )); @@ -952,7 +969,7 @@ impl DhtState { let state = state.clone(); async move { let worker = DhtWorker { socket, dht: state }; - worker.start(in_rx, &bootstrap_addrs).await?; + worker.start(in_rx, ping_rx, &bootstrap_addrs).await?; Ok(()) } }); diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 1e21512..44f2fdd 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -287,7 +287,7 @@ impl BucketTree { self_id: &Id20, id: Id20, addr: SocketAddr, - on_questionable_node: impl FnMut(SocketAddr) -> bool, + on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool, ) -> InsertResult { let idx = self.get_leaf(&id); self.insert_into_leaf(idx, self_id, id, addr, on_questionable_node) @@ -298,7 +298,7 @@ impl BucketTree { self_id: &Id20, id: Id20, addr: SocketAddr, - mut on_questionable_node: impl FnMut(SocketAddr) -> bool, + mut on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool, ) -> InsertResult { // The loop here is for this case: // in case we split a node into two, and it degenerates into all the leaves @@ -337,7 +337,7 @@ impl BucketTree { .iter_mut() .find(|r| matches!(r.status(), NodeStatus::Questionable)) { - if on_questionable_node(questionable_node.addr) { + if on_questionable_node(questionable_node.id, questionable_node.addr) { questionable_node.mark_outgoing_request(); } } @@ -545,7 +545,7 @@ impl RoutingTable { &mut self, id: Id20, addr: SocketAddr, - on_questionable_node: impl FnMut(SocketAddr) -> bool, + on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool, ) -> InsertResult { let res = self .buckets @@ -690,7 +690,7 @@ mod tests { for _ in 0..length.unwrap_or(16536) { let other_id = random_id_20(); let addr = generate_socket_addr(); - rtable.add_node(other_id, addr, |_| false); + rtable.add_node(other_id, addr, |_, _| false); } rtable }