Fix a bug in pinger, attribute log messages better
This commit is contained in:
parent
c8967f2469
commit
ebd0d818eb
1 changed files with 17 additions and 11 deletions
|
|
@ -355,7 +355,6 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
trace!("received {response:?}");
|
|
||||||
|
|
||||||
if let Some(peers) = response.values {
|
if let Some(peers) = response.values {
|
||||||
for peer in peers {
|
for peer in peers {
|
||||||
|
|
@ -520,7 +519,10 @@ impl DhtState {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match tokio::time::timeout(RESPONSE_TIMEOUT, rx).await {
|
match tokio::time::timeout(RESPONSE_TIMEOUT, rx).await {
|
||||||
Ok(Ok(r)) => r,
|
Ok(Ok(r)) => r.map(|r| {
|
||||||
|
trace!("received {r:?}");
|
||||||
|
r
|
||||||
|
}),
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
self.inflight_by_transaction_id.remove(&key);
|
self.inflight_by_transaction_id.remove(&key);
|
||||||
warn!("recv error, did not expect this: {:?}", e);
|
warn!("recv error, did not expect this: {:?}", e);
|
||||||
|
|
@ -621,8 +623,14 @@ impl DhtState {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
_ => {}
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!("received query: {:?}", msg);
|
||||||
|
|
||||||
|
match &msg.kind {
|
||||||
// Otherwise, respond to a query.
|
// Otherwise, respond to a query.
|
||||||
MessageKind::PingRequest(req) => {
|
MessageKind::PingRequest(req) => {
|
||||||
let message = Message {
|
let message = Message {
|
||||||
|
|
@ -681,6 +689,7 @@ impl DhtState {
|
||||||
})?;
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -848,7 +857,7 @@ impl DhtWorker {
|
||||||
}
|
}
|
||||||
}.instrument(error_span!("ping", addr=addr.to_string())))
|
}.instrument(error_span!("ping", addr=addr.to_string())))
|
||||||
},
|
},
|
||||||
_ = futs.next() => {},
|
_ = futs.next(), if !futs.is_empty() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -898,13 +907,10 @@ impl DhtWorker {
|
||||||
.await
|
.await
|
||||||
.context("error reading from UDP socket")?;
|
.context("error reading from UDP socket")?;
|
||||||
match bprotocol::deserialize_message::<ByteString>(&buf[..size]) {
|
match bprotocol::deserialize_message::<ByteString>(&buf[..size]) {
|
||||||
Ok(msg) => {
|
Ok(msg) => match output_tx.send((msg, addr)).await {
|
||||||
trace!("{}: received {:?}", addr, &msg);
|
Ok(_) => {}
|
||||||
match output_tx.send((msg, addr)).await {
|
Err(_) => break,
|
||||||
Ok(_) => {}
|
},
|
||||||
Err(_) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => debug!("{}: error deserializing incoming message: {}", addr, e),
|
Err(e) => debug!("{}: error deserializing incoming message: {}", addr, e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue