diff --git a/TODO.md b/TODO.md index 7c24028..ac09aa6 100644 --- a/TODO.md +++ b/TODO.md @@ -15,6 +15,7 @@ - [x] remove including from disk - [ ] DHT - [ ] for torrents with a few seeds might be cool to re-query DHT once in a while. + - [ ] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC) - [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly - [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent. - [ ] Bad actors: diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index bfa5018..f6d8143 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -5,7 +5,7 @@ use std::{ Arc, }, task::Poll, - time::Duration, + time::{Duration, Instant}, }; use crate::{ @@ -40,7 +40,7 @@ pub struct DhtStats { pub id: Id20, pub outstanding_requests: usize, pub seen_peers: usize, - pub outstanding_backoff_tasks: usize, + pub recent_requests: usize, pub routing_table_size: usize, } @@ -63,7 +63,7 @@ pub struct DhtState { inflight_by_transaction_id: DashMap<(u16, SocketAddr), OutstandingRequest>, // Current requests to addr being re-sent with backoff. - inflight_by_request: DashSet<(Request, SocketAddr)>, + recent_requests: DashMap<(Request, SocketAddr), Instant>, routing_table: RwLock, listen_addr: SocketAddr, @@ -92,7 +92,7 @@ impl DhtState { listen_addr, seen_peers: Default::default(), get_peers_subscribers: Default::default(), - inflight_by_request: Default::default(), + recent_requests: Default::default(), } } @@ -355,7 +355,7 @@ impl DhtState { id: self.id, outstanding_requests: self.inflight_by_transaction_id.len(), seen_peers: self.seen_peers.iter().map(|e| e.value().len()).sum(), - outstanding_backoff_tasks: self.inflight_by_request.len(), + recent_requests: self.recent_requests.len(), routing_table_size: self.routing_table.read().len(), } } @@ -421,59 +421,45 @@ impl DhtState { addr: SocketAddr, ) -> anyhow::Result<()> { let key = (request, addr); - if !self.inflight_by_request.insert(key) { - return Ok(()); + + use dashmap::mapref::entry::Entry; + match self.recent_requests.entry(key) { + Entry::Occupied(mut o) => { + if o.get().elapsed() < REQUERY_INTERVAL { + return Ok(()); + } + o.insert(Instant::now()); + } + Entry::Vacant(v) => { + v.insert(Instant::now()); + } } let this = self.clone(); let fut = async move { - let mut backoff = ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_secs(60)) - .with_multiplier(1.5) - .with_max_interval(Duration::from_secs(10 * 60)) - .with_max_elapsed_time(Some(Duration::from_secs(15 * 60))) - .build(); + this.routing_table + .write() + .mark_outgoing_request(&target_node); - loop { - this.routing_table - .write() - .mark_outgoing_request(&target_node); - - let resp = this.request(request, addr).await; - let sleep = match resp { - Ok(ResponseOrError::Response(response)) => { - match this.on_response(addr, request, response) { - Ok(()) => { - backoff.reset(); - Some(REQUERY_INTERVAL) - } - Err(e) => { - warn!("error in on_response: {:?}", e); - backoff.next_backoff() - } + let resp = this.request(request, addr).await; + match resp { + Ok(ResponseOrError::Response(response)) => { + match this.on_response(addr, request, response) { + Ok(()) => {} + Err(e) => { + warn!("error in on_response: {:?}", e); } } - Ok(ResponseOrError::Error(e)) => { - debug!("error response: {:?}", e); - backoff.next_backoff() - } - Err(e) => { - debug!("error: {:?}", e); - backoff.next_backoff() - } - }; - if let Some(sleep) = sleep { - tokio::time::sleep(sleep).await; - continue; } - - tokio::task::spawn(async move { - this.inflight_by_request.remove(&key); - }); - - return Ok(()); - } + Ok(ResponseOrError::Error(e)) => { + debug!("error response: {:?}", e); + } + Err(e) => { + debug!("error: {:?}", e); + } + }; + Ok(()) }; spawn(