Poor mans algo to resend requests

This commit is contained in:
Igor Katson 2023-11-27 19:03:39 +00:00
parent ab5ae527aa
commit 692fef1394
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -1,9 +1,9 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{hash_map::Entry, HashMap},
net::SocketAddr,
sync::Arc,
task::Poll,
time::Duration,
time::{Duration, Instant},
};
use crate::{
@ -42,7 +42,16 @@ pub struct DhtStats {
struct DhtState {
id: Id20,
next_transaction_id: u16,
outstanding_requests: HashMap<(u16, SocketAddr), Request>,
// Created requests: (transaction_id, addr) => Requests.
// If we get a response, it gets removed from here.
//
// TODO: clean up old entries
outstanding_requests_by_transaction_id: HashMap<(u16, SocketAddr), Request>,
// TODO: clean up old entries
made_requests_by_addr: HashMap<(Request, SocketAddr), Instant>,
routing_table: RoutingTable,
listen_addr: SocketAddr,
@ -55,8 +64,6 @@ struct DhtState {
seen_peers: HashMap<Id20, IndexSet<SocketAddr>>,
get_peers_subscribers: HashMap<Id20, tokio::sync::broadcast::Sender<SocketAddr>>,
made_requests: HashSet<(Request, SocketAddr)>,
}
impl DhtState {
@ -70,13 +77,13 @@ impl DhtState {
Self {
id,
next_transaction_id: 0,
outstanding_requests: Default::default(),
outstanding_requests_by_transaction_id: Default::default(),
routing_table,
sender,
listen_addr,
seen_peers: Default::default(),
get_peers_subscribers: Default::default(),
made_requests: Default::default(),
made_requests_by_addr: Default::default(),
}
}
@ -115,10 +122,11 @@ impl DhtState {
kind: MessageKind::PingRequest(PingRequest { id: self.id }),
},
};
self.outstanding_requests
self.outstanding_requests_by_transaction_id
.insert((transaction_id, addr), request);
message
}
fn on_incoming_from_remote(
&mut self,
msg: Message<ByteString>,
@ -153,7 +161,10 @@ impl DhtState {
)
}
let tid = ((msg.transaction_id[0] as u16) << 8) + (msg.transaction_id[1] as u16);
let request = match self.outstanding_requests.remove(&(tid, addr)) {
let request = match self
.outstanding_requests_by_transaction_id
.remove(&(tid, addr))
{
Some(req) => req,
None => anyhow::bail!("outstanding request not found. Message: {:?}", msg),
};
@ -249,9 +260,9 @@ impl DhtState {
pub fn get_stats(&self) -> DhtStats {
DhtStats {
id: self.id,
outstanding_requests: self.outstanding_requests.len(),
outstanding_requests: self.outstanding_requests_by_transaction_id.len(),
seen_peers: self.seen_peers.values().map(|v| v.len()).sum(),
made_requests: self.made_requests.len(),
made_requests: self.made_requests_by_addr.len(),
routing_table_size: self.routing_table.len(),
}
}
@ -299,6 +310,24 @@ impl DhtState {
}
}
fn should_request(&mut self, request: Request, addr: SocketAddr) -> bool {
const RE_REQUEST_TIME: Duration = Duration::from_secs(10 * 60);
match self.made_requests_by_addr.entry((request, addr)) {
Entry::Occupied(mut o) => {
if o.get().elapsed() > RE_REQUEST_TIME {
o.insert(Instant::now());
true
} else {
false
}
}
Entry::Vacant(v) => {
v.insert(Instant::now());
true
}
}
}
fn send_find_peers_if_not_yet(
&mut self,
info_hash: Id20,
@ -306,7 +335,7 @@ impl DhtState {
addr: SocketAddr,
) -> anyhow::Result<()> {
let request = Request::GetPeers(info_hash);
if self.made_requests.insert((request, addr)) {
if self.should_request(request, addr) {
self.routing_table.mark_outgoing_request(&target_node);
let msg = self.create_request(request, addr);
self.sender.send((msg, addr))?;
@ -321,7 +350,7 @@ impl DhtState {
addr: SocketAddr,
) -> anyhow::Result<()> {
let request = Request::FindNode(search_id);
if self.made_requests.insert((request, addr)) {
if self.should_request(request, addr) {
self.routing_table.mark_outgoing_request(&target_node);
let msg = self.create_request(request, addr);
self.sender.send((msg, addr))?;