diff --git a/crates/dht/src/bprotocol.rs b/crates/dht/src/bprotocol.rs index 4488fec..d0f92a4 100644 --- a/crates/dht/src/bprotocol.rs +++ b/crates/dht/src/bprotocol.rs @@ -309,7 +309,7 @@ pub struct GetPeersRequest { #[derive(Debug, Serialize, Deserialize)] pub struct PingRequest { - id: Id20, + pub id: Id20, } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index f2331e2..779c214 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -9,7 +9,7 @@ use std::{ use crate::{ bprotocol::{ self, CompactNodeInfo, CompactPeerInfo, FindNodeRequest, GetPeersRequest, Message, - MessageKind, Node, + MessageKind, Node, PingRequest, }, routing_table::{InsertResult, RoutingTable}, }; @@ -108,6 +108,12 @@ impl DhtState { target, }), }, + Request::Ping => Message { + transaction_id: ByteString::from(transaction_id_buf.as_ref()), + version: None, + ip: None, + kind: MessageKind::PingRequest(PingRequest { id: self.id }), + }, }; self.outstanding_requests .insert((transaction_id, addr), request); @@ -169,6 +175,7 @@ impl DhtState { Request::GetPeers(id) => { self.on_found_peers_or_nodes(response.id, addr, id, response) } + Request::Ping => Ok(()), } } MessageKind::PingRequest(_) => { @@ -205,6 +212,7 @@ impl DhtState { None }; let compact_node_info = generate_compact_nodes(req.info_hash); + self.routing_table.mark_last_query(&req.id); let message = Message { transaction_id: msg.transaction_id, version: None, @@ -221,6 +229,7 @@ impl DhtState { } MessageKind::FindNodeRequest(req) => { let compact_node_info = generate_compact_nodes(req.target); + self.routing_table.mark_last_query(&req.id); let message = Message { transaction_id: msg.transaction_id, version: None, @@ -320,6 +329,19 @@ impl DhtState { Ok(()) } + fn routing_table_add_node(&mut self, id: Id20, addr: SocketAddr) -> InsertResult { + let mut questionable_nodes = Vec::new(); + let res = self.routing_table.add_node(id, addr, |addr| { + questionable_nodes.push(addr); + true + }); + for addr in questionable_nodes { + let req = self.create_request(Request::Ping, addr); + let _ = self.sender.send((req, addr)); + } + res + } + fn on_found_nodes( &mut self, source: Id20, @@ -336,7 +358,7 @@ impl DhtState { .collect::>(); // On newly discovered nodes, ask them for peers that we are interested in. - match self.routing_table.add_node(source, source_addr) { + match self.routing_table_add_node(source, source_addr) { InsertResult::ReplacedBad(_) | InsertResult::Added => { for info_hash in &searching_for_peers { self.send_find_peers_if_not_yet(*info_hash, source, source_addr)?; @@ -345,7 +367,7 @@ impl DhtState { _ => {} }; for node in nodes.nodes { - match self.routing_table.add_node(node.id, node.addr.into()) { + match self.routing_table_add_node(node.id, node.addr.into()) { InsertResult::ReplacedBad(_) | InsertResult::Added => { for info_hash in &searching_for_peers { self.send_find_peers_if_not_yet(*info_hash, node.id, node.addr.into())?; @@ -366,7 +388,7 @@ impl DhtState { target: Id20, data: bprotocol::Response, ) -> anyhow::Result<()> { - self.routing_table.add_node(source, source_addr); + self.routing_table_add_node(source, source_addr); self.routing_table.mark_response(&source); let bsender = match self.get_peers_subscribers.get(&target) { @@ -398,7 +420,7 @@ impl DhtState { }; if let Some(nodes) = data.nodes { for node in nodes.nodes { - self.routing_table.add_node(node.id, node.addr.into()); + self.routing_table_add_node(node.id, node.addr.into()); self.send_find_peers_if_not_yet(target, node.id, node.addr.into())?; } }; @@ -488,6 +510,7 @@ async fn run_framer( enum Request { GetPeers(Id20), FindNode(Id20), + Ping, } #[derive(Clone)] diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 5118c81..6e45ab9 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -280,9 +280,15 @@ impl BucketTree { } } - pub fn add_node(&mut self, self_id: &Id20, id: Id20, addr: SocketAddr) -> InsertResult { + pub fn add_node( + &mut self, + self_id: &Id20, + id: Id20, + addr: SocketAddr, + on_questionable_node: impl FnMut(SocketAddr) -> bool, + ) -> InsertResult { let idx = self.get_leaf(&id); - self.insert_into_leaf(idx, self_id, id, addr) + self.insert_into_leaf(idx, self_id, id, addr, on_questionable_node) } fn insert_into_leaf( &mut self, @@ -290,6 +296,7 @@ impl BucketTree { self_id: &Id20, id: Id20, addr: SocketAddr, + mut on_questionable_node: impl FnMut(SocketAddr) -> bool, ) -> InsertResult { // The loop here is for this case: // in case we split a node into two, and it degenerates into all the leaves @@ -313,6 +320,7 @@ impl BucketTree { addr, last_request: None, last_response: None, + last_query: None, outstanding_queries_in_a_row: 0, }; @@ -322,6 +330,16 @@ impl BucketTree { return InsertResult::Added; } + // Ping first questionable node + if let Some(questionable_node) = nodes + .iter_mut() + .find(|r| matches!(r.status(), NodeStatus::Questionable)) + { + if on_questionable_node(questionable_node.addr) { + questionable_node.mark_outgoing_request(); + } + } + // Try replace a bad node if let Some(bad_node) = nodes .iter_mut() @@ -400,6 +418,8 @@ pub struct RoutingTableNode { #[serde(skip)] last_response: Option, #[serde(skip)] + last_query: Option, + #[serde(skip)] outstanding_queries_in_a_row: usize, } @@ -418,19 +438,36 @@ impl RoutingTableNode { self.addr } pub fn status(&self) -> NodeStatus { - // TODO: this is just a stub with simpler logic - let last_request = match self.last_request { - Some(v) => v, - None => return NodeStatus::Unknown, - }; - if self.outstanding_queries_in_a_row > 0 && last_request.elapsed() > Duration::from_secs(10) - { - return NodeStatus::Bad; + const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); + const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60); + + match (self.last_request, self.last_response, self.last_query) { + (None, _, _) => NodeStatus::Unknown, + // Nodes become bad when they fail to respond to multiple queries in a row. + (Some(last_request), _, _) + if last_request.elapsed() > RESPONSE_TIMEOUT + && self.outstanding_queries_in_a_row >= 2 => + { + NodeStatus::Bad + } + + // A good node is a node has responded to one of our queries within the last 15 minutes. + // A node is also good if it has ever responded to one of our queries and has sent + // us a query within the last 15 minutes. + (Some(_), Some(last_activity), _) | (Some(_), Some(_), Some(last_activity)) + if last_activity.elapsed() < INACTIVITY_TIMEOUT => + { + NodeStatus::Good + } + + // After 15 minutes of inactivity, a node becomes questionable + (_, _, Some(last_activity)) | (_, Some(last_activity), _) + if last_activity.elapsed() > INACTIVITY_TIMEOUT => + { + NodeStatus::Questionable + } + (Some(_), _, _) => NodeStatus::Unknown, } - if self.last_response.is_some() { - return NodeStatus::Good; - } - NodeStatus::Questionable } pub fn mark_outgoing_request(&mut self) { @@ -438,6 +475,10 @@ impl RoutingTableNode { self.outstanding_queries_in_a_row += 1; } + pub fn mark_last_query(&mut self) { + self.last_query = Some(Instant::now()); + } + pub fn mark_response(&mut self) { let now = Instant::now(); self.last_response = Some(now); @@ -479,8 +520,15 @@ impl RoutingTable { result } - pub fn add_node(&mut self, id: Id20, addr: SocketAddr) -> InsertResult { - let res = self.buckets.add_node(&self.id, id, addr); + pub fn add_node( + &mut self, + id: Id20, + addr: SocketAddr, + on_questionable_node: impl FnMut(SocketAddr) -> bool, + ) -> InsertResult { + let res = self + .buckets + .add_node(&self.id, id, addr, on_questionable_node); let replaced = match &res { InsertResult::WasExisting => false, InsertResult::ReplacedBad(..) => true, @@ -509,6 +557,15 @@ impl RoutingTable { r.mark_response(); true } + + pub fn mark_last_query(&mut self, id: &Id20) -> bool { + let r = match self.buckets.get_mut(id) { + Some(r) => r, + None => return false, + }; + r.mark_last_query(); + true + } } #[cfg(test)] @@ -603,7 +660,7 @@ mod tests { for _ in 0..length.unwrap_or(16536) { let other_id = random_id_20(); let addr = generate_socket_addr(); - rtable.add_node(other_id, addr); + rtable.add_node(other_id, addr, |_| false); } rtable }