Make questionable node pings better

This commit is contained in:
Igor Katson 2023-11-30 09:38:35 +00:00
parent 210a3d5d3e
commit f04277cc11
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 65 additions and 48 deletions

View file

@ -48,6 +48,7 @@ struct OutstandingRequest {
}
pub struct WorkerSendRequest {
// If this is set, we are tracking the response in inflight_by_transaction_id
our_tid: Option<u16>,
message: Message<ByteString>,
addr: SocketAddr,
@ -471,13 +472,17 @@ pub struct DhtState {
// Sending requests to the worker.
rate_limiter: RateLimiter,
sender: UnboundedSender<WorkerSendRequest>,
// This is to send raw messages
worker_sender: UnboundedSender<WorkerSendRequest>,
// This is to send pings.
ping_sender: UnboundedSender<(Id20, SocketAddr)>,
}
impl DhtState {
fn new_internal(
id: Id20,
sender: UnboundedSender<WorkerSendRequest>,
ping_sender: UnboundedSender<(Id20, SocketAddr)>,
routing_table: Option<RoutingTable>,
listen_addr: SocketAddr,
) -> Self {
@ -487,8 +492,9 @@ impl DhtState {
next_transaction_id: AtomicU16::new(0),
inflight_by_transaction_id: Default::default(),
routing_table: RwLock::new(routing_table),
sender,
worker_sender: sender,
listen_addr,
ping_sender,
rate_limiter: make_rate_limiter(),
}
}
@ -500,7 +506,8 @@ impl DhtState {
let (tx, rx) = tokio::sync::oneshot::channel();
self.inflight_by_transaction_id
.insert(key, OutstandingRequest { done: tx });
match self.sender.send(WorkerSendRequest {
trace!("sending to {addr}, {message:?}");
match self.worker_sender.send(WorkerSendRequest {
our_tid: Some(tid),
message,
addr,
@ -594,7 +601,9 @@ impl DhtState {
.map(|(_, v)| v)
{
Some(req) => req,
None => bail!("outstanding request not found. Message: {:?}", msg),
None => {
bail!("outstanding request not found. Message: {:?}", msg)
}
};
let response_or_error = match msg.kind {
@ -625,7 +634,7 @@ impl DhtState {
}),
};
self.routing_table.write().mark_last_query(&req.id);
self.sender.send(WorkerSendRequest {
self.worker_sender.send(WorkerSendRequest {
our_tid: None,
message,
addr,
@ -633,26 +642,7 @@ impl DhtState {
Ok(())
}
MessageKind::GetPeersRequest(req) => {
// let peers = self.info_hash_meta.get(&req.info_hash).map(|meta| {
// meta.seen_peers
// .iter()
// .copied()
// .filter_map(|a| match a {
// SocketAddr::V4(v4) => Some(CompactPeerInfo { addr: v4 }),
// // this should never happen in practice
// SocketAddr::V6(_) => None,
// })
// .take(50)
// .collect::<Vec<_>>()
// });
// let token = if peers.is_some() {
// let mut token = [0u8; 20];
// rand::thread_rng().fill(&mut token);
// Some(ByteString::from(token.as_ref()))
// } else {
// None
// };
// let compact_node_info = generate_compact_nodes(req.info_hash);
// TODO: respond with peer info, for now sending an empty response.
self.routing_table.write().mark_last_query(&req.id);
let message = Message {
transaction_id: msg.transaction_id,
@ -660,12 +650,10 @@ impl DhtState {
ip: None,
kind: MessageKind::Response(bprotocol::Response {
id: self.id,
nodes: None,
values: None,
token: None,
..Default::default()
}),
};
self.sender.send(WorkerSendRequest {
self.worker_sender.send(WorkerSendRequest {
our_tid: None,
message,
addr,
@ -685,7 +673,7 @@ impl DhtState {
..Default::default()
}),
};
self.sender.send(WorkerSendRequest {
self.worker_sender.send(WorkerSendRequest {
our_tid: None,
message,
addr,
@ -704,19 +692,10 @@ impl DhtState {
}
fn routing_table_add_node(self: &Arc<Self>, id: Id20, addr: SocketAddr) -> InsertResult {
let mut questionable_nodes = Vec::new();
let res = self.routing_table.write().add_node(id, addr, |addr| {
questionable_nodes.push(addr);
let res = self.routing_table.write().add_node(id, addr, |id, addr| {
let _ = self.ping_sender.send((id, addr));
true
});
for addr in questionable_nodes {
let (_, req) = self.create_request(Request::Ping);
let _ = self.sender.send(WorkerSendRequest {
our_tid: None,
message: req,
addr,
});
}
res
}
}
@ -796,6 +775,33 @@ impl DhtWorker {
Ok(())
}
async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> {
let mut futs = FuturesUnordered::new();
loop {
tokio::select! {
r = rx.recv() => {
let (id, addr) = match r {
Some(r) => r,
None => return Ok(()),
};
futs.push(async move {
self.dht.routing_table.write().mark_outgoing_request(&id);
match self.dht.request(Request::Ping, addr).await {
Ok(_) => {
self.dht.routing_table.write().mark_response(&id);
},
Err(e) => {
self.dht.routing_table.write().mark_error(&id);
debug!("error: {e:?}");
}
}
}.instrument(error_span!("ping", addr=addr.to_string())))
},
_ = futs.next() => {},
}
}
}
async fn framer(
&self,
socket: &UdpSocket,
@ -810,7 +816,9 @@ impl DhtWorker {
addr,
}) = input_rx.recv().await
{
trace!("{}: sending {:?}", addr, &message);
if our_tid.is_none() {
trace!("{}: sending {:?}", addr, &message);
}
buf.clear();
bprotocol::serialize_message(
&mut buf,
@ -863,6 +871,7 @@ impl DhtWorker {
async fn start(
self,
in_rx: UnboundedReceiver<WorkerSendRequest>,
ping_rx: UnboundedReceiver<(Id20, SocketAddr)>,
bootstrap_addrs: &[String],
) -> anyhow::Result<()> {
let (out_tx, mut out_rx) = channel(1);
@ -888,9 +897,12 @@ impl DhtWorker {
}
.instrument(debug_span!("dht_responese_reader"));
let pinger = self.pinger(ping_rx);
tokio::pin!(framer);
tokio::pin!(bootstrap);
tokio::pin!(response_reader);
tokio::pin!(pinger);
loop {
tokio::select! {
@ -901,6 +913,9 @@ impl DhtWorker {
bootstrap_done = true;
result?;
},
err = &mut pinger => {
anyhow::bail!("pinger quit: {:?}", err)
},
err = &mut response_reader => {anyhow::bail!("response reader quit: {:?}", err)}
}
}
@ -941,9 +956,11 @@ impl DhtState {
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());
let (in_tx, in_rx) = unbounded_channel();
let (ping_tx, ping_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal(
peer_id,
in_tx,
ping_tx,
config.routing_table,
listen_addr,
));
@ -952,7 +969,7 @@ impl DhtState {
let state = state.clone();
async move {
let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await?;
worker.start(in_rx, ping_rx, &bootstrap_addrs).await?;
Ok(())
}
});

View file

@ -287,7 +287,7 @@ impl BucketTree {
self_id: &Id20,
id: Id20,
addr: SocketAddr,
on_questionable_node: impl FnMut(SocketAddr) -> bool,
on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool,
) -> InsertResult {
let idx = self.get_leaf(&id);
self.insert_into_leaf(idx, self_id, id, addr, on_questionable_node)
@ -298,7 +298,7 @@ impl BucketTree {
self_id: &Id20,
id: Id20,
addr: SocketAddr,
mut on_questionable_node: impl FnMut(SocketAddr) -> bool,
mut on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool,
) -> InsertResult {
// The loop here is for this case:
// in case we split a node into two, and it degenerates into all the leaves
@ -337,7 +337,7 @@ impl BucketTree {
.iter_mut()
.find(|r| matches!(r.status(), NodeStatus::Questionable))
{
if on_questionable_node(questionable_node.addr) {
if on_questionable_node(questionable_node.id, questionable_node.addr) {
questionable_node.mark_outgoing_request();
}
}
@ -545,7 +545,7 @@ impl RoutingTable {
&mut self,
id: Id20,
addr: SocketAddr,
on_questionable_node: impl FnMut(SocketAddr) -> bool,
on_questionable_node: impl FnMut(Id20, SocketAddr) -> bool,
) -> InsertResult {
let res = self
.buckets
@ -690,7 +690,7 @@ mod tests {
for _ in 0..length.unwrap_or(16536) {
let other_id = random_id_20();
let addr = generate_socket_addr();
rtable.add_node(other_id, addr, |_| false);
rtable.add_node(other_id, addr, |_, _| false);
}
rtable
}