From 162afe30560f291a9605163ae1aa521477025d4e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 5 Dec 2023 21:13:31 +0000 Subject: [PATCH] DHT announce compiles --- crates/dht/src/dht.rs | 96 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 84 insertions(+), 12 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 3581bfe..8a8863d 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -1,6 +1,7 @@ use std::{ cmp::Reverse, - net::SocketAddr, + net::{SocketAddr, SocketAddrV4}, + str::FromStr, sync::{ atomic::{AtomicU16, Ordering}, Arc, @@ -11,8 +12,8 @@ use std::{ use crate::{ bprotocol::{ - self, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, Message, - MessageKind, Node, PingRequest, Response, + self, AnnouncePeer, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, + Message, MessageKind, Node, PingRequest, Response, }, peer_store::PeerStore, routing_table::{InsertResult, NodeStatus, RoutingTable}, @@ -93,17 +94,52 @@ trait RecursiveRequestCallbacks: Sized + Send + Sync + 'static { ); } -struct RecursiveRequestCallbacksGetPeers {} +struct RecursiveRequestCallbacksGetPeers { + // Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap() + min_distance_to_announce: Id20, +} + impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers { fn on_request_start(&self, _: &RecursiveRequest, _: Id20, _: SocketAddr) {} fn on_request_end( &self, - _: &RecursiveRequest, - _: Id20, - _: SocketAddr, - _: &anyhow::Result, + req: &RecursiveRequest, + target_node: Id20, + addr: SocketAddr, + resp: &anyhow::Result, ) { + let announce_addr = match req.dht.announce_addr { + Some(a) => a, + None => return, + }; + let resp = match resp { + Ok(ResponseOrError::Response(resp)) => resp, + _ => return, + }; + let token = match &resp.token { + Some(token) => token, + None => return, + }; + if req.info_hash.distance(&target_node) > self.min_distance_to_announce { + trace!( + "not announcing, {:?} is too far from {:?}", + target_node, + req.info_hash + ); + return; + } + let (tid, message) = req.dht.create_request(Request::Announce { + info_hash: req.info_hash, + token: token.clone(), + addr: announce_addr, + }); + + let _ = req.dht.worker_sender.send(WorkerSendRequest { + our_tid: Some(tid), + message, + addr, + }); } } @@ -165,7 +201,12 @@ impl RequestPeersStream { useful_nodes: RwLock::new(Vec::new()), peer_tx, node_tx, - callbacks: RecursiveRequestCallbacksGetPeers {}, + callbacks: RecursiveRequestCallbacksGetPeers { + min_distance_to_announce: Id20::from_str( + "000000ffffffffffffffffffffffffffffffffff", + ) + .unwrap(), + }, }); let join_handle = rp.request_peers_forever(node_rx); Self { @@ -351,7 +392,7 @@ impl RecursiveRequest { self.callbacks.on_request_start(self, id, addr); } - let response = self.dht.request(self.request, addr).await.map(|r| { + let response = self.dht.request(self.request.clone(), addr).await.map(|r| { self.mark_node_responded(addr, &r); r }); @@ -359,7 +400,7 @@ impl RecursiveRequest { self.callbacks.on_request_end(self, id, addr, &response); } - let response = match self.dht.request(self.request, addr).await { + let response = match self.dht.request(self.request.clone(), addr).await { Ok(ResponseOrError::Response(r)) => r, Ok(ResponseOrError::Error(e)) => bail!("error response: {:?}", e), Err(e) => { @@ -493,6 +534,7 @@ pub struct DhtState { worker_sender: UnboundedSender, pub(crate) peer_store: PeerStore, + announce_addr: Option, } impl DhtState { @@ -502,6 +544,7 @@ impl DhtState { routing_table: Option, listen_addr: SocketAddr, peer_store: PeerStore, + announce_addr: Option, ) -> Self { let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None)); Self { @@ -513,6 +556,7 @@ impl DhtState { listen_addr, rate_limiter: make_rate_limiter(), peer_store, + announce_addr, } } @@ -581,6 +625,22 @@ impl DhtState { ip: None, kind: MessageKind::PingRequest(PingRequest { id: self.id }), }, + Request::Announce { + info_hash, + token, + addr, + } => Message { + kind: MessageKind::AnnouncePeer(AnnouncePeer { + id: self.id, + implied_port: 0, + info_hash, + port: addr.port(), + token, + }), + transaction_id: ByteString::from(transaction_id_buf.as_ref()), + version: None, + ip: None, + }, }; (transaction_id, message) } @@ -744,10 +804,15 @@ impl DhtState { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] enum Request { GetPeers(Id20), FindNode(Id20), + Announce { + info_hash: Id20, + token: ByteString, + addr: SocketAddrV4, + }, Ping, } @@ -1095,6 +1160,13 @@ impl DhtState { config.routing_table, listen_addr, config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), + config.announce_addr.and_then(|a| match a { + SocketAddr::V4(v4) => Some(v4), + SocketAddr::V6(_) => { + warn!("libqrqbit-dht doesn't support announcing IPv6 addresses"); + None + } + }), )); spawn(error_span!("dht"), {