A bunch of initial changes to DHT
This commit is contained in:
parent
7d46318e98
commit
ab5ae527aa
3 changed files with 103 additions and 23 deletions
|
|
@ -309,7 +309,7 @@ pub struct GetPeersRequest {
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct PingRequest {
|
pub struct PingRequest {
|
||||||
id: Id20,
|
pub id: Id20,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ use std::{
|
||||||
use crate::{
|
use crate::{
|
||||||
bprotocol::{
|
bprotocol::{
|
||||||
self, CompactNodeInfo, CompactPeerInfo, FindNodeRequest, GetPeersRequest, Message,
|
self, CompactNodeInfo, CompactPeerInfo, FindNodeRequest, GetPeersRequest, Message,
|
||||||
MessageKind, Node,
|
MessageKind, Node, PingRequest,
|
||||||
},
|
},
|
||||||
routing_table::{InsertResult, RoutingTable},
|
routing_table::{InsertResult, RoutingTable},
|
||||||
};
|
};
|
||||||
|
|
@ -108,6 +108,12 @@ impl DhtState {
|
||||||
target,
|
target,
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
|
Request::Ping => Message {
|
||||||
|
transaction_id: ByteString::from(transaction_id_buf.as_ref()),
|
||||||
|
version: None,
|
||||||
|
ip: None,
|
||||||
|
kind: MessageKind::PingRequest(PingRequest { id: self.id }),
|
||||||
|
},
|
||||||
};
|
};
|
||||||
self.outstanding_requests
|
self.outstanding_requests
|
||||||
.insert((transaction_id, addr), request);
|
.insert((transaction_id, addr), request);
|
||||||
|
|
@ -169,6 +175,7 @@ impl DhtState {
|
||||||
Request::GetPeers(id) => {
|
Request::GetPeers(id) => {
|
||||||
self.on_found_peers_or_nodes(response.id, addr, id, response)
|
self.on_found_peers_or_nodes(response.id, addr, id, response)
|
||||||
}
|
}
|
||||||
|
Request::Ping => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MessageKind::PingRequest(_) => {
|
MessageKind::PingRequest(_) => {
|
||||||
|
|
@ -205,6 +212,7 @@ impl DhtState {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
let compact_node_info = generate_compact_nodes(req.info_hash);
|
let compact_node_info = generate_compact_nodes(req.info_hash);
|
||||||
|
self.routing_table.mark_last_query(&req.id);
|
||||||
let message = Message {
|
let message = Message {
|
||||||
transaction_id: msg.transaction_id,
|
transaction_id: msg.transaction_id,
|
||||||
version: None,
|
version: None,
|
||||||
|
|
@ -221,6 +229,7 @@ impl DhtState {
|
||||||
}
|
}
|
||||||
MessageKind::FindNodeRequest(req) => {
|
MessageKind::FindNodeRequest(req) => {
|
||||||
let compact_node_info = generate_compact_nodes(req.target);
|
let compact_node_info = generate_compact_nodes(req.target);
|
||||||
|
self.routing_table.mark_last_query(&req.id);
|
||||||
let message = Message {
|
let message = Message {
|
||||||
transaction_id: msg.transaction_id,
|
transaction_id: msg.transaction_id,
|
||||||
version: None,
|
version: None,
|
||||||
|
|
@ -320,6 +329,19 @@ impl DhtState {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn routing_table_add_node(&mut self, id: Id20, addr: SocketAddr) -> InsertResult {
|
||||||
|
let mut questionable_nodes = Vec::new();
|
||||||
|
let res = self.routing_table.add_node(id, addr, |addr| {
|
||||||
|
questionable_nodes.push(addr);
|
||||||
|
true
|
||||||
|
});
|
||||||
|
for addr in questionable_nodes {
|
||||||
|
let req = self.create_request(Request::Ping, addr);
|
||||||
|
let _ = self.sender.send((req, addr));
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
fn on_found_nodes(
|
fn on_found_nodes(
|
||||||
&mut self,
|
&mut self,
|
||||||
source: Id20,
|
source: Id20,
|
||||||
|
|
@ -336,7 +358,7 @@ impl DhtState {
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// On newly discovered nodes, ask them for peers that we are interested in.
|
// On newly discovered nodes, ask them for peers that we are interested in.
|
||||||
match self.routing_table.add_node(source, source_addr) {
|
match self.routing_table_add_node(source, source_addr) {
|
||||||
InsertResult::ReplacedBad(_) | InsertResult::Added => {
|
InsertResult::ReplacedBad(_) | InsertResult::Added => {
|
||||||
for info_hash in &searching_for_peers {
|
for info_hash in &searching_for_peers {
|
||||||
self.send_find_peers_if_not_yet(*info_hash, source, source_addr)?;
|
self.send_find_peers_if_not_yet(*info_hash, source, source_addr)?;
|
||||||
|
|
@ -345,7 +367,7 @@ impl DhtState {
|
||||||
_ => {}
|
_ => {}
|
||||||
};
|
};
|
||||||
for node in nodes.nodes {
|
for node in nodes.nodes {
|
||||||
match self.routing_table.add_node(node.id, node.addr.into()) {
|
match self.routing_table_add_node(node.id, node.addr.into()) {
|
||||||
InsertResult::ReplacedBad(_) | InsertResult::Added => {
|
InsertResult::ReplacedBad(_) | InsertResult::Added => {
|
||||||
for info_hash in &searching_for_peers {
|
for info_hash in &searching_for_peers {
|
||||||
self.send_find_peers_if_not_yet(*info_hash, node.id, node.addr.into())?;
|
self.send_find_peers_if_not_yet(*info_hash, node.id, node.addr.into())?;
|
||||||
|
|
@ -366,7 +388,7 @@ impl DhtState {
|
||||||
target: Id20,
|
target: Id20,
|
||||||
data: bprotocol::Response<ByteString>,
|
data: bprotocol::Response<ByteString>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
self.routing_table.add_node(source, source_addr);
|
self.routing_table_add_node(source, source_addr);
|
||||||
self.routing_table.mark_response(&source);
|
self.routing_table.mark_response(&source);
|
||||||
|
|
||||||
let bsender = match self.get_peers_subscribers.get(&target) {
|
let bsender = match self.get_peers_subscribers.get(&target) {
|
||||||
|
|
@ -398,7 +420,7 @@ impl DhtState {
|
||||||
};
|
};
|
||||||
if let Some(nodes) = data.nodes {
|
if let Some(nodes) = data.nodes {
|
||||||
for node in nodes.nodes {
|
for node in nodes.nodes {
|
||||||
self.routing_table.add_node(node.id, node.addr.into());
|
self.routing_table_add_node(node.id, node.addr.into());
|
||||||
self.send_find_peers_if_not_yet(target, node.id, node.addr.into())?;
|
self.send_find_peers_if_not_yet(target, node.id, node.addr.into())?;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -488,6 +510,7 @@ async fn run_framer(
|
||||||
enum Request {
|
enum Request {
|
||||||
GetPeers(Id20),
|
GetPeers(Id20),
|
||||||
FindNode(Id20),
|
FindNode(Id20),
|
||||||
|
Ping,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
|
||||||
|
|
@ -280,9 +280,15 @@ impl BucketTree {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_node(&mut self, self_id: &Id20, id: Id20, addr: SocketAddr) -> InsertResult {
|
pub fn add_node(
|
||||||
|
&mut self,
|
||||||
|
self_id: &Id20,
|
||||||
|
id: Id20,
|
||||||
|
addr: SocketAddr,
|
||||||
|
on_questionable_node: impl FnMut(SocketAddr) -> bool,
|
||||||
|
) -> InsertResult {
|
||||||
let idx = self.get_leaf(&id);
|
let idx = self.get_leaf(&id);
|
||||||
self.insert_into_leaf(idx, self_id, id, addr)
|
self.insert_into_leaf(idx, self_id, id, addr, on_questionable_node)
|
||||||
}
|
}
|
||||||
fn insert_into_leaf(
|
fn insert_into_leaf(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
|
@ -290,6 +296,7 @@ impl BucketTree {
|
||||||
self_id: &Id20,
|
self_id: &Id20,
|
||||||
id: Id20,
|
id: Id20,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
|
mut on_questionable_node: impl FnMut(SocketAddr) -> bool,
|
||||||
) -> InsertResult {
|
) -> InsertResult {
|
||||||
// The loop here is for this case:
|
// The loop here is for this case:
|
||||||
// in case we split a node into two, and it degenerates into all the leaves
|
// in case we split a node into two, and it degenerates into all the leaves
|
||||||
|
|
@ -313,6 +320,7 @@ impl BucketTree {
|
||||||
addr,
|
addr,
|
||||||
last_request: None,
|
last_request: None,
|
||||||
last_response: None,
|
last_response: None,
|
||||||
|
last_query: None,
|
||||||
outstanding_queries_in_a_row: 0,
|
outstanding_queries_in_a_row: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -322,6 +330,16 @@ impl BucketTree {
|
||||||
return InsertResult::Added;
|
return InsertResult::Added;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping first questionable node
|
||||||
|
if let Some(questionable_node) = nodes
|
||||||
|
.iter_mut()
|
||||||
|
.find(|r| matches!(r.status(), NodeStatus::Questionable))
|
||||||
|
{
|
||||||
|
if on_questionable_node(questionable_node.addr) {
|
||||||
|
questionable_node.mark_outgoing_request();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Try replace a bad node
|
// Try replace a bad node
|
||||||
if let Some(bad_node) = nodes
|
if let Some(bad_node) = nodes
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
|
|
@ -400,6 +418,8 @@ pub struct RoutingTableNode {
|
||||||
#[serde(skip)]
|
#[serde(skip)]
|
||||||
last_response: Option<Instant>,
|
last_response: Option<Instant>,
|
||||||
#[serde(skip)]
|
#[serde(skip)]
|
||||||
|
last_query: Option<Instant>,
|
||||||
|
#[serde(skip)]
|
||||||
outstanding_queries_in_a_row: usize,
|
outstanding_queries_in_a_row: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -418,19 +438,36 @@ impl RoutingTableNode {
|
||||||
self.addr
|
self.addr
|
||||||
}
|
}
|
||||||
pub fn status(&self) -> NodeStatus {
|
pub fn status(&self) -> NodeStatus {
|
||||||
// TODO: this is just a stub with simpler logic
|
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
let last_request = match self.last_request {
|
const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60);
|
||||||
Some(v) => v,
|
|
||||||
None => return NodeStatus::Unknown,
|
match (self.last_request, self.last_response, self.last_query) {
|
||||||
};
|
(None, _, _) => NodeStatus::Unknown,
|
||||||
if self.outstanding_queries_in_a_row > 0 && last_request.elapsed() > Duration::from_secs(10)
|
// Nodes become bad when they fail to respond to multiple queries in a row.
|
||||||
{
|
(Some(last_request), _, _)
|
||||||
return NodeStatus::Bad;
|
if last_request.elapsed() > RESPONSE_TIMEOUT
|
||||||
|
&& self.outstanding_queries_in_a_row >= 2 =>
|
||||||
|
{
|
||||||
|
NodeStatus::Bad
|
||||||
|
}
|
||||||
|
|
||||||
|
// A good node is a node has responded to one of our queries within the last 15 minutes.
|
||||||
|
// A node is also good if it has ever responded to one of our queries and has sent
|
||||||
|
// us a query within the last 15 minutes.
|
||||||
|
(Some(_), Some(last_activity), _) | (Some(_), Some(_), Some(last_activity))
|
||||||
|
if last_activity.elapsed() < INACTIVITY_TIMEOUT =>
|
||||||
|
{
|
||||||
|
NodeStatus::Good
|
||||||
|
}
|
||||||
|
|
||||||
|
// After 15 minutes of inactivity, a node becomes questionable
|
||||||
|
(_, _, Some(last_activity)) | (_, Some(last_activity), _)
|
||||||
|
if last_activity.elapsed() > INACTIVITY_TIMEOUT =>
|
||||||
|
{
|
||||||
|
NodeStatus::Questionable
|
||||||
|
}
|
||||||
|
(Some(_), _, _) => NodeStatus::Unknown,
|
||||||
}
|
}
|
||||||
if self.last_response.is_some() {
|
|
||||||
return NodeStatus::Good;
|
|
||||||
}
|
|
||||||
NodeStatus::Questionable
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mark_outgoing_request(&mut self) {
|
pub fn mark_outgoing_request(&mut self) {
|
||||||
|
|
@ -438,6 +475,10 @@ impl RoutingTableNode {
|
||||||
self.outstanding_queries_in_a_row += 1;
|
self.outstanding_queries_in_a_row += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn mark_last_query(&mut self) {
|
||||||
|
self.last_query = Some(Instant::now());
|
||||||
|
}
|
||||||
|
|
||||||
pub fn mark_response(&mut self) {
|
pub fn mark_response(&mut self) {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
self.last_response = Some(now);
|
self.last_response = Some(now);
|
||||||
|
|
@ -479,8 +520,15 @@ impl RoutingTable {
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_node(&mut self, id: Id20, addr: SocketAddr) -> InsertResult {
|
pub fn add_node(
|
||||||
let res = self.buckets.add_node(&self.id, id, addr);
|
&mut self,
|
||||||
|
id: Id20,
|
||||||
|
addr: SocketAddr,
|
||||||
|
on_questionable_node: impl FnMut(SocketAddr) -> bool,
|
||||||
|
) -> InsertResult {
|
||||||
|
let res = self
|
||||||
|
.buckets
|
||||||
|
.add_node(&self.id, id, addr, on_questionable_node);
|
||||||
let replaced = match &res {
|
let replaced = match &res {
|
||||||
InsertResult::WasExisting => false,
|
InsertResult::WasExisting => false,
|
||||||
InsertResult::ReplacedBad(..) => true,
|
InsertResult::ReplacedBad(..) => true,
|
||||||
|
|
@ -509,6 +557,15 @@ impl RoutingTable {
|
||||||
r.mark_response();
|
r.mark_response();
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn mark_last_query(&mut self, id: &Id20) -> bool {
|
||||||
|
let r = match self.buckets.get_mut(id) {
|
||||||
|
Some(r) => r,
|
||||||
|
None => return false,
|
||||||
|
};
|
||||||
|
r.mark_last_query();
|
||||||
|
true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
@ -603,7 +660,7 @@ mod tests {
|
||||||
for _ in 0..length.unwrap_or(16536) {
|
for _ in 0..length.unwrap_or(16536) {
|
||||||
let other_id = random_id_20();
|
let other_id = random_id_20();
|
||||||
let addr = generate_socket_addr();
|
let addr = generate_socket_addr();
|
||||||
rtable.add_node(other_id, addr);
|
rtable.add_node(other_id, addr, |_| false);
|
||||||
}
|
}
|
||||||
rtable
|
rtable
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue