From 9ce76d317beb2cd08a028f92041d0d1f343df7de Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 13 Jul 2021 18:27:32 +0100 Subject: [PATCH] Actually send back peers --- crates/dht/src/dht.rs | 87 +++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 24 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 0636a8e..e9382a7 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -6,7 +6,8 @@ use std::{ use crate::{ bprotocol::{ - self, CompactNodeInfo, FindNodeRequest, GetPeersRequest, Message, MessageKind, Node, + self, CompactNodeInfo, CompactPeerInfo, FindNodeRequest, GetPeersRequest, Message, + MessageKind, Node, }, routing_table::{InsertResult, RoutingTable}, DHT_BOOTSTRAP, @@ -17,6 +18,7 @@ use futures::{stream::FuturesUnordered, Stream, StreamExt, TryStreamExt}; use librqbit_core::{id20::Id20, peer_id::generate_peer_id}; use log::{debug, info, trace, warn}; use parking_lot::Mutex; +use rand::Rng; use tokio::{ net::UdpSocket, sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, @@ -107,6 +109,25 @@ impl DhtState { msg: Message, addr: SocketAddr, ) -> anyhow::Result<()> { + let generate_compact_nodes = |target| { + let nodes = self + .routing_table + .sorted_by_distance_from(target) + .into_iter() + .filter_map(|r| { + Some(Node { + id: r.id(), + addr: match r.addr() { + SocketAddr::V4(v4) => v4, + SocketAddr::V6(_) => return None, + }, + }) + }) + .take(8) + .collect::>(); + CompactNodeInfo { nodes } + }; + match &msg.kind { MessageKind::Error(_) | MessageKind::Response(_) => {} MessageKind::PingRequest(_) => { @@ -125,30 +146,44 @@ impl DhtState { self.sender.send((message, addr))?; return Ok(()); } - MessageKind::FindNodeRequest(_) | MessageKind::GetPeersRequest(_) => { - let target = match &msg.kind { - MessageKind::FindNodeRequest(req) => req.target, - MessageKind::GetPeersRequest(req) => req.info_hash, - _ => unreachable!(), - }; - // we don't track who is downloading a torrent (we don't have a peer store), - // so send nodes all the time. - let nodes = self - .routing_table - .sorted_by_distance_from(target) - .into_iter() - .filter_map(|r| { - Some(Node { - id: r.id(), - addr: match r.addr() { - SocketAddr::V4(v4) => v4, - SocketAddr::V6(_) => return None, - }, + MessageKind::GetPeersRequest(req) => { + let peers = self.seen_peers.get(&req.info_hash).map(|peers| { + peers + .iter() + .copied() + .filter_map(|a| match a { + SocketAddr::V4(v4) => Some(CompactPeerInfo { addr: v4 }), + // this should never happen in practice + SocketAddr::V6(_) => None, }) - }) - .take(8) - .collect::>(); - let compact_node_info = CompactNodeInfo { nodes }; + .take(50) + .collect::>() + }); + let token = if peers.is_some() { + let mut token = [0u8; 20]; + rand::thread_rng().fill(&mut token); + Some(ByteString::from(token.as_ref())) + } else { + None + }; + let compact_node_info = generate_compact_nodes(req.info_hash); + let response = bprotocol::Response { + id: self.id, + nodes: Some(compact_node_info), + values: peers, + token, + }; + let message = Message { + transaction_id: msg.transaction_id, + version: None, + ip: None, + kind: MessageKind::Response(response), + }; + self.sender.send((message, addr))?; + return Ok(()); + } + MessageKind::FindNodeRequest(req) => { + let compact_node_info = generate_compact_nodes(req.target); let response = bprotocol::Response { id: self.id, nodes: Some(compact_node_info), @@ -303,6 +338,10 @@ impl DhtState { self.routing_table.mark_response(&source); if let Some(peers) = data.values { + let seen = self.seen_peers.entry(target).or_default(); + for peer in peers.iter() { + seen.insert(SocketAddr::V4(peer.addr)); + } let bsender = match self.get_peers_subscribers.get(&target) { Some(s) => s, None => {