diff --git a/TODO.md b/TODO.md index fed1d9b..0f929a8 100644 --- a/TODO.md +++ b/TODO.md @@ -18,7 +18,11 @@ - [x] many nodes in "Unknown" status, do smth about it - [x] for torrents with a few seeds might be cool to re-query DHT once in a while. - [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted) - - [ ] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC) + - [ ] Routing table - is it balanced properly? + - [ ] Don't query Bad nodes + - [-] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC) + - [ ] Did it, but it's flawed: starts repeating the same queries again as neighboring refreshes + don't know about the other ones, and DHT returns the same nodes again and again. - [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly - [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent. - [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was. diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 51fa54c..77601d7 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -137,12 +137,14 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksFindNodes { } struct RecursiveRequest { + max_depth: usize, + useful_nodes_limit: usize, info_hash: Id20, request: Request, dht: Arc, useful_nodes: RwLock>, peer_tx: tokio::sync::mpsc::UnboundedSender, - node_tx: tokio::sync::mpsc::UnboundedSender<(Option, SocketAddr)>, + node_tx: tokio::sync::mpsc::UnboundedSender<(Option, SocketAddr, usize)>, callbacks: C, } @@ -156,7 +158,9 @@ impl RequestPeersStream { let (peer_tx, peer_rx) = unbounded_channel(); let (node_tx, node_rx) = unbounded_channel(); let rp = Arc::new(RecursiveRequest { + max_depth: 4, info_hash, + useful_nodes_limit: 256, request: Request::GetPeers(info_hash), dht, useful_nodes: RwLock::new(Vec::new()), @@ -197,17 +201,19 @@ impl RecursiveRequest { ) -> anyhow::Result<()> { let (node_tx, mut node_rx) = unbounded_channel(); let req = RecursiveRequest { + max_depth: 4, info_hash: target, request: Request::FindNode(target), dht, + useful_nodes_limit: 32, useful_nodes: RwLock::new(Vec::new()), peer_tx: unbounded_channel().0, node_tx, callbacks: RecursiveRequestCallbacksFindNodes {}, }; - let request_one = |id, addr| { - req.request_one(id, addr) + let request_one = |id, addr, depth| { + req.request_one(id, addr, depth) .map_err(|e| { debug!("error: {e:?}"); e @@ -223,7 +229,7 @@ impl RecursiveRequest { let mut initial_addrs = 0; for addr in addrs { - futs.push(request_one(None, addr)); + futs.push(request_one(None, addr, 0)); initial_addrs += 1; } @@ -235,8 +241,8 @@ impl RecursiveRequest { biased; r = node_rx.recv() => { - let (id, addr) = r.unwrap(); - futs.push(request_one(id, addr)) + let (id, addr, depth) = r.unwrap(); + futs.push(request_one(id, addr, depth)) }, f = futs.next() => { let f = match f { @@ -267,7 +273,7 @@ impl RecursiveRequest { impl RecursiveRequest { fn request_peers_forever( self: &Arc, - mut node_rx: tokio::sync::mpsc::UnboundedReceiver<(Option, SocketAddr)>, + mut node_rx: tokio::sync::mpsc::UnboundedReceiver<(Option, SocketAddr, usize)>, ) -> tokio::task::JoinHandle<()> { let this = self.clone(); spawn( @@ -300,9 +306,9 @@ impl RecursiveRequest { loop { tokio::select! { addr = node_rx.recv() => { - let (id, addr) = addr.unwrap(); + let (id, addr, depth) = addr.unwrap(); futs.push( - this.request_one(id, addr) + this.request_one(id, addr, depth) .map_err(|e| debug!("error: {e:?}")) .instrument(error_span!("addr", addr=addr.to_string())) ); @@ -327,14 +333,19 @@ impl RecursiveRequest { .take(8) { count += 1; - self.node_tx.send((Some(id), addr))?; + self.node_tx.send((Some(id), addr, 0))?; } Ok(count) } } impl RecursiveRequest { - async fn request_one(&self, id: Option, addr: SocketAddr) -> anyhow::Result<()> { + async fn request_one( + &self, + id: Option, + addr: SocketAddr, + depth: usize, + ) -> anyhow::Result<()> { if let Some(id) = id { self.callbacks.on_request_start(self, id, addr); } @@ -365,15 +376,17 @@ impl RecursiveRequest { if let Some(nodes) = response.nodes { for node in nodes.nodes { let addr = SocketAddr::V4(node.addr); - let should_request = self.should_request_node(node.id, addr); + let should_request = self.should_request_node(node.id, addr, depth); trace!( - "should_request={}, id={:?}, addr={}", + "should_request={}, id={:?}, addr={}, depth={}/{}", should_request, node.id, - addr + addr, + depth, + self.max_depth ); if should_request { - self.node_tx.send((Some(node.id), addr))?; + self.node_tx.send((Some(node.id), addr, depth + 1))?; } } } @@ -412,7 +425,11 @@ impl RecursiveRequest { .is_some() } - fn should_request_node(&self, node_id: Id20, addr: SocketAddr) -> bool { + fn should_request_node(&self, node_id: Id20, addr: SocketAddr, depth: usize) -> bool { + if depth >= self.max_depth { + return false; + } + let mut closest_nodes = self.useful_nodes.write(); // If recently requested, ignore @@ -433,7 +450,6 @@ impl RecursiveRequest { errors_in_a_row: 0, }); - const LIMIT: usize = 256; closest_nodes.sort_by_key(|n| { let has_returned_peers_desc = Reverse(n.returned_peers); let has_responded_desc = Reverse(n.last_response.is_some() as u8); @@ -449,7 +465,7 @@ impl RecursiveRequest { freshest_response, ) }); - if closest_nodes.len() > LIMIT { + if closest_nodes.len() > self.useful_nodes_limit { let popped = closest_nodes.pop().unwrap(); if popped.id == node_id { return false; @@ -506,7 +522,7 @@ impl DhtState { let (tx, rx) = tokio::sync::oneshot::channel(); self.inflight_by_transaction_id .insert(key, OutstandingRequest { done: tx }); - trace!("sending to {addr}, {message:?}"); + trace!("sending {message:?}"); match self.worker_sender.send(WorkerSendRequest { our_tid: Some(tid), message, @@ -827,7 +843,7 @@ impl DhtWorker { futs.push( RecursiveRequest::find_node_for_routing_table( self.dht.clone(), random_id, addrs.into_iter() - ).instrument(error_span!("refresh_bucket", random_id=format!("{:?}", random_id))) + ).instrument(error_span!("refresh_bucket")) ); }, _ = futs.next(), if !futs.is_empty() => {}, diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index 30bb171..62c161d 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -15,11 +15,11 @@ pub use persistence::{PersistentDht, PersistentDhtConfig}; pub type Dht = Arc; // How long do we wait for a response from a DHT node. -pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(60); +pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); // TODO: Not sure if we should re-query tbh. -pub(crate) const REQUERY_INTERVAL: Duration = Duration::from_secs(60); +pub(crate) const REQUERY_INTERVAL: Duration = Duration::from_secs(300); // After how long should we ping the node again. -pub(crate) const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60); +pub(crate) const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(2 * 60); pub struct DhtBuilder {} diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 4223757..1da4808 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -541,26 +541,29 @@ impl RoutingTableNode { } pub fn status(&self) -> NodeStatus { match (self.last_request, self.last_response, self.last_query) { - (None, _, _) => NodeStatus::Unknown, // Nodes become bad when they fail to respond to multiple queries in a row. (Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad, // A good node is a node has responded to one of our queries within the last 15 minutes. // A node is also good if it has ever responded to one of our queries and has sent // us a query within the last 15 minutes. - (Some(_), Some(last_activity), _) | (Some(_), Some(_), Some(last_activity)) - if last_activity.elapsed() < INACTIVITY_TIMEOUT => + (Some(_), Some(last_incoming), _) | (Some(_), Some(_), Some(last_incoming)) + if last_incoming.elapsed() < INACTIVITY_TIMEOUT => { NodeStatus::Good } - // After 15 minutes of inactivity, a node becomes questionable - (_, _, Some(last_activity)) | (_, Some(last_activity), _) - if last_activity.elapsed() > INACTIVITY_TIMEOUT => + // After 15 minutes of inactivity, a node becomes questionable. + // The moment we send a request to it, it stops becoming questionable and becomes Unknown / Bad. + (last_outgoing, _, Some(last_incoming)) | (last_outgoing, Some(last_incoming), _) + if last_incoming.elapsed() > INACTIVITY_TIMEOUT + && last_outgoing + .map(|e| e.elapsed() > INACTIVITY_TIMEOUT) + .unwrap_or(true) => { NodeStatus::Questionable } - (Some(_), _, _) => NodeStatus::Unknown, + _ => NodeStatus::Unknown, } } @@ -613,7 +616,16 @@ impl RoutingTable { for node in self.buckets.iter() { result.push(node); } - result.sort_by_key(|n| id.distance(&n.id)); + result.sort_by_key(|n| { + // Query decent nodes first. + let status = match n.status() { + NodeStatus::Good => 0, + NodeStatus::Questionable => 0, + NodeStatus::Unknown => 2, + NodeStatus::Bad => 3, + }; + (status, id.distance(&n.id)) + }); result }