Saving before slight refactor

This commit is contained in:
Igor Katson 2023-11-29 13:48:27 +00:00
parent dc3da89b59
commit 6518dc6eff
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 36 additions and 9 deletions

View file

@ -130,7 +130,7 @@ impl DhtState {
fn spawn_request(self: &Arc<Self>, request: Request, addr: SocketAddr) {
let this = self.clone();
spawn(
error_span!(parent: None, "dht_request", addr=addr.to_string(), request=format!("{:?}", request)),
error_span!(parent: None, "dht_spawn_request", addr=addr.to_string(), request=format!("{:?}", request)),
async move {
match this.send_request_and_handle_response(request, addr).await {
Ok(_) => {}
@ -421,7 +421,13 @@ impl DhtState {
spawn(
error_span!("peers_requester", info_hash = format!("{:?}", info_hash)),
async move {
let mut iteration = 0usize;
loop {
if !this.get_peers_subscribers.contains_key(&info_hash) {
debug!("no more subscribers, closing peers_requester");
return Ok(());
}
trace!("iteration {iteration}");
// We don't need to allocate/collect here, but the borrow checker is not happy otherwise.
let nodes_to_query = this
.routing_table
@ -442,6 +448,7 @@ impl DhtState {
}
}
tokio::time::sleep(REQUERY_INTERVAL).await;
iteration += 1;
}
},
);
@ -635,9 +642,10 @@ impl DhtState {
) -> anyhow::Result<()> {
self.routing_table_add_node(source, source_addr);
let bsender = match self.get_peers_subscribers.get(&info_hash) {
Some(s) => s,
None => {
use dashmap::mapref::entry::Entry;
let bsender = match self.get_peers_subscribers.entry(info_hash) {
Entry::Occupied(o) => o,
Entry::Vacant(_) => {
warn!(
"ignoring get_peers response, no subscribers for {:?}",
info_hash
@ -647,7 +655,6 @@ impl DhtState {
};
{
use dashmap::mapref::entry::Entry;
let n = MaybeUsefulNode {
id: source,
addr: source_addr,
@ -682,9 +689,29 @@ impl DhtState {
}
let addr = SocketAddr::V4(peer.addr);
if seen.insert(addr) {
bsender
.send(addr)
.context("error sending peers to subscribers")?;
match bsender.get().send(addr) {
Ok(_) => {}
Err(_) => {
debug!("no more subscribers for {:?}, cleaning up", info_hash);
// bsender.remove();
// let this = self.clone();
// spawn(
// error_span!("cleanup", info_hash = format!("{info_hash:?}")),
// async move {
// tokio::time::sleep(Duration::from_secs(10)).await;
// if !this.get_peers_subscribers.contains_key(&info_hash) {
// debug!("no more subscribers for {:?}, removed it from seen peers", info_hash);
// this.seen_peers.remove(&info_hash);
// this.closest_responding_nodes_for_info_hash
// .remove(&info_hash);
// }
// Ok(())
// },
// );
return Ok(());
}
}
}
}
};