Get back old behavior

This commit is contained in:
Igor Katson 2023-11-28 16:14:49 +00:00
parent 81428e30a2
commit 242f064328
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 36 additions and 49 deletions

View file

@ -15,6 +15,7 @@
- [x] remove including from disk
- [ ] DHT
- [ ] for torrents with a few seeds might be cool to re-query DHT once in a while.
- [ ] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC)
- [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly
- [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent.
- [ ] Bad actors:

View file

@ -5,7 +5,7 @@ use std::{
Arc,
},
task::Poll,
time::Duration,
time::{Duration, Instant},
};
use crate::{
@ -40,7 +40,7 @@ pub struct DhtStats {
pub id: Id20,
pub outstanding_requests: usize,
pub seen_peers: usize,
pub outstanding_backoff_tasks: usize,
pub recent_requests: usize,
pub routing_table_size: usize,
}
@ -63,7 +63,7 @@ pub struct DhtState {
inflight_by_transaction_id: DashMap<(u16, SocketAddr), OutstandingRequest>,
// Current requests to addr being re-sent with backoff.
inflight_by_request: DashSet<(Request, SocketAddr)>,
recent_requests: DashMap<(Request, SocketAddr), Instant>,
routing_table: RwLock<RoutingTable>,
listen_addr: SocketAddr,
@ -92,7 +92,7 @@ impl DhtState {
listen_addr,
seen_peers: Default::default(),
get_peers_subscribers: Default::default(),
inflight_by_request: Default::default(),
recent_requests: Default::default(),
}
}
@ -355,7 +355,7 @@ impl DhtState {
id: self.id,
outstanding_requests: self.inflight_by_transaction_id.len(),
seen_peers: self.seen_peers.iter().map(|e| e.value().len()).sum(),
outstanding_backoff_tasks: self.inflight_by_request.len(),
recent_requests: self.recent_requests.len(),
routing_table_size: self.routing_table.read().len(),
}
}
@ -421,59 +421,45 @@ impl DhtState {
addr: SocketAddr,
) -> anyhow::Result<()> {
let key = (request, addr);
if !self.inflight_by_request.insert(key) {
return Ok(());
use dashmap::mapref::entry::Entry;
match self.recent_requests.entry(key) {
Entry::Occupied(mut o) => {
if o.get().elapsed() < REQUERY_INTERVAL {
return Ok(());
}
o.insert(Instant::now());
}
Entry::Vacant(v) => {
v.insert(Instant::now());
}
}
let this = self.clone();
let fut = async move {
let mut backoff = ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(60))
.with_multiplier(1.5)
.with_max_interval(Duration::from_secs(10 * 60))
.with_max_elapsed_time(Some(Duration::from_secs(15 * 60)))
.build();
this.routing_table
.write()
.mark_outgoing_request(&target_node);
loop {
this.routing_table
.write()
.mark_outgoing_request(&target_node);
let resp = this.request(request, addr).await;
let sleep = match resp {
Ok(ResponseOrError::Response(response)) => {
match this.on_response(addr, request, response) {
Ok(()) => {
backoff.reset();
Some(REQUERY_INTERVAL)
}
Err(e) => {
warn!("error in on_response: {:?}", e);
backoff.next_backoff()
}
let resp = this.request(request, addr).await;
match resp {
Ok(ResponseOrError::Response(response)) => {
match this.on_response(addr, request, response) {
Ok(()) => {}
Err(e) => {
warn!("error in on_response: {:?}", e);
}
}
Ok(ResponseOrError::Error(e)) => {
debug!("error response: {:?}", e);
backoff.next_backoff()
}
Err(e) => {
debug!("error: {:?}", e);
backoff.next_backoff()
}
};
if let Some(sleep) = sleep {
tokio::time::sleep(sleep).await;
continue;
}
tokio::task::spawn(async move {
this.inflight_by_request.remove(&key);
});
return Ok(());
}
Ok(ResponseOrError::Error(e)) => {
debug!("error response: {:?}", e);
}
Err(e) => {
debug!("error: {:?}", e);
}
};
Ok(())
};
spawn(