Doesnt refresh properly

This commit is contained in:
Igor Katson 2023-11-30 13:30:11 +00:00
parent ebd0d818eb
commit 8d58a9f419
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 64 additions and 32 deletions

View file

@ -18,7 +18,11 @@
- [x] many nodes in "Unknown" status, do smth about it - [x] many nodes in "Unknown" status, do smth about it
- [x] for torrents with a few seeds might be cool to re-query DHT once in a while. - [x] for torrents with a few seeds might be cool to re-query DHT once in a while.
- [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted) - [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted)
- [ ] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC) - [ ] Routing table - is it balanced properly?
- [ ] Don't query Bad nodes
- [-] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC)
- [ ] Did it, but it's flawed: starts repeating the same queries again as neighboring refreshes
don't know about the other ones, and DHT returns the same nodes again and again.
- [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly - [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly
- [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent. - [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent.
- [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was. - [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was.

View file

@ -137,12 +137,14 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksFindNodes {
} }
struct RecursiveRequest<C: RecursiveRequestCallbacks> { struct RecursiveRequest<C: RecursiveRequestCallbacks> {
max_depth: usize,
useful_nodes_limit: usize,
info_hash: Id20, info_hash: Id20,
request: Request, request: Request,
dht: Arc<DhtState>, dht: Arc<DhtState>,
useful_nodes: RwLock<Vec<MaybeUsefulNode>>, useful_nodes: RwLock<Vec<MaybeUsefulNode>>,
peer_tx: tokio::sync::mpsc::UnboundedSender<SocketAddr>, peer_tx: tokio::sync::mpsc::UnboundedSender<SocketAddr>,
node_tx: tokio::sync::mpsc::UnboundedSender<(Option<Id20>, SocketAddr)>, node_tx: tokio::sync::mpsc::UnboundedSender<(Option<Id20>, SocketAddr, usize)>,
callbacks: C, callbacks: C,
} }
@ -156,7 +158,9 @@ impl RequestPeersStream {
let (peer_tx, peer_rx) = unbounded_channel(); let (peer_tx, peer_rx) = unbounded_channel();
let (node_tx, node_rx) = unbounded_channel(); let (node_tx, node_rx) = unbounded_channel();
let rp = Arc::new(RecursiveRequest { let rp = Arc::new(RecursiveRequest {
max_depth: 4,
info_hash, info_hash,
useful_nodes_limit: 256,
request: Request::GetPeers(info_hash), request: Request::GetPeers(info_hash),
dht, dht,
useful_nodes: RwLock::new(Vec::new()), useful_nodes: RwLock::new(Vec::new()),
@ -197,17 +201,19 @@ impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let (node_tx, mut node_rx) = unbounded_channel(); let (node_tx, mut node_rx) = unbounded_channel();
let req = RecursiveRequest { let req = RecursiveRequest {
max_depth: 4,
info_hash: target, info_hash: target,
request: Request::FindNode(target), request: Request::FindNode(target),
dht, dht,
useful_nodes_limit: 32,
useful_nodes: RwLock::new(Vec::new()), useful_nodes: RwLock::new(Vec::new()),
peer_tx: unbounded_channel().0, peer_tx: unbounded_channel().0,
node_tx, node_tx,
callbacks: RecursiveRequestCallbacksFindNodes {}, callbacks: RecursiveRequestCallbacksFindNodes {},
}; };
let request_one = |id, addr| { let request_one = |id, addr, depth| {
req.request_one(id, addr) req.request_one(id, addr, depth)
.map_err(|e| { .map_err(|e| {
debug!("error: {e:?}"); debug!("error: {e:?}");
e e
@ -223,7 +229,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
let mut initial_addrs = 0; let mut initial_addrs = 0;
for addr in addrs { for addr in addrs {
futs.push(request_one(None, addr)); futs.push(request_one(None, addr, 0));
initial_addrs += 1; initial_addrs += 1;
} }
@ -235,8 +241,8 @@ impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
biased; biased;
r = node_rx.recv() => { r = node_rx.recv() => {
let (id, addr) = r.unwrap(); let (id, addr, depth) = r.unwrap();
futs.push(request_one(id, addr)) futs.push(request_one(id, addr, depth))
}, },
f = futs.next() => { f = futs.next() => {
let f = match f { let f = match f {
@ -267,7 +273,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> { impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
fn request_peers_forever( fn request_peers_forever(
self: &Arc<Self>, self: &Arc<Self>,
mut node_rx: tokio::sync::mpsc::UnboundedReceiver<(Option<Id20>, SocketAddr)>, mut node_rx: tokio::sync::mpsc::UnboundedReceiver<(Option<Id20>, SocketAddr, usize)>,
) -> tokio::task::JoinHandle<()> { ) -> tokio::task::JoinHandle<()> {
let this = self.clone(); let this = self.clone();
spawn( spawn(
@ -300,9 +306,9 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
loop { loop {
tokio::select! { tokio::select! {
addr = node_rx.recv() => { addr = node_rx.recv() => {
let (id, addr) = addr.unwrap(); let (id, addr, depth) = addr.unwrap();
futs.push( futs.push(
this.request_one(id, addr) this.request_one(id, addr, depth)
.map_err(|e| debug!("error: {e:?}")) .map_err(|e| debug!("error: {e:?}"))
.instrument(error_span!("addr", addr=addr.to_string())) .instrument(error_span!("addr", addr=addr.to_string()))
); );
@ -327,14 +333,19 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
.take(8) .take(8)
{ {
count += 1; count += 1;
self.node_tx.send((Some(id), addr))?; self.node_tx.send((Some(id), addr, 0))?;
} }
Ok(count) Ok(count)
} }
} }
impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> { impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
async fn request_one(&self, id: Option<Id20>, addr: SocketAddr) -> anyhow::Result<()> { async fn request_one(
&self,
id: Option<Id20>,
addr: SocketAddr,
depth: usize,
) -> anyhow::Result<()> {
if let Some(id) = id { if let Some(id) = id {
self.callbacks.on_request_start(self, id, addr); self.callbacks.on_request_start(self, id, addr);
} }
@ -365,15 +376,17 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
if let Some(nodes) = response.nodes { if let Some(nodes) = response.nodes {
for node in nodes.nodes { for node in nodes.nodes {
let addr = SocketAddr::V4(node.addr); let addr = SocketAddr::V4(node.addr);
let should_request = self.should_request_node(node.id, addr); let should_request = self.should_request_node(node.id, addr, depth);
trace!( trace!(
"should_request={}, id={:?}, addr={}", "should_request={}, id={:?}, addr={}, depth={}/{}",
should_request, should_request,
node.id, node.id,
addr addr,
depth,
self.max_depth
); );
if should_request { if should_request {
self.node_tx.send((Some(node.id), addr))?; self.node_tx.send((Some(node.id), addr, depth + 1))?;
} }
} }
} }
@ -412,7 +425,11 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
.is_some() .is_some()
} }
fn should_request_node(&self, node_id: Id20, addr: SocketAddr) -> bool { fn should_request_node(&self, node_id: Id20, addr: SocketAddr, depth: usize) -> bool {
if depth >= self.max_depth {
return false;
}
let mut closest_nodes = self.useful_nodes.write(); let mut closest_nodes = self.useful_nodes.write();
// If recently requested, ignore // If recently requested, ignore
@ -433,7 +450,6 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
errors_in_a_row: 0, errors_in_a_row: 0,
}); });
const LIMIT: usize = 256;
closest_nodes.sort_by_key(|n| { closest_nodes.sort_by_key(|n| {
let has_returned_peers_desc = Reverse(n.returned_peers); let has_returned_peers_desc = Reverse(n.returned_peers);
let has_responded_desc = Reverse(n.last_response.is_some() as u8); let has_responded_desc = Reverse(n.last_response.is_some() as u8);
@ -449,7 +465,7 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
freshest_response, freshest_response,
) )
}); });
if closest_nodes.len() > LIMIT { if closest_nodes.len() > self.useful_nodes_limit {
let popped = closest_nodes.pop().unwrap(); let popped = closest_nodes.pop().unwrap();
if popped.id == node_id { if popped.id == node_id {
return false; return false;
@ -506,7 +522,7 @@ impl DhtState {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
self.inflight_by_transaction_id self.inflight_by_transaction_id
.insert(key, OutstandingRequest { done: tx }); .insert(key, OutstandingRequest { done: tx });
trace!("sending to {addr}, {message:?}"); trace!("sending {message:?}");
match self.worker_sender.send(WorkerSendRequest { match self.worker_sender.send(WorkerSendRequest {
our_tid: Some(tid), our_tid: Some(tid),
message, message,
@ -827,7 +843,7 @@ impl DhtWorker {
futs.push( futs.push(
RecursiveRequest::find_node_for_routing_table( RecursiveRequest::find_node_for_routing_table(
self.dht.clone(), random_id, addrs.into_iter() self.dht.clone(), random_id, addrs.into_iter()
).instrument(error_span!("refresh_bucket", random_id=format!("{:?}", random_id))) ).instrument(error_span!("refresh_bucket"))
); );
}, },
_ = futs.next(), if !futs.is_empty() => {}, _ = futs.next(), if !futs.is_empty() => {},

View file

@ -15,11 +15,11 @@ pub use persistence::{PersistentDht, PersistentDhtConfig};
pub type Dht = Arc<DhtState>; pub type Dht = Arc<DhtState>;
// How long do we wait for a response from a DHT node. // How long do we wait for a response from a DHT node.
pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(60); pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
// TODO: Not sure if we should re-query tbh. // TODO: Not sure if we should re-query tbh.
pub(crate) const REQUERY_INTERVAL: Duration = Duration::from_secs(60); pub(crate) const REQUERY_INTERVAL: Duration = Duration::from_secs(300);
// After how long should we ping the node again. // After how long should we ping the node again.
pub(crate) const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60); pub(crate) const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(2 * 60);
pub struct DhtBuilder {} pub struct DhtBuilder {}

View file

@ -541,26 +541,29 @@ impl RoutingTableNode {
} }
pub fn status(&self) -> NodeStatus { pub fn status(&self) -> NodeStatus {
match (self.last_request, self.last_response, self.last_query) { match (self.last_request, self.last_response, self.last_query) {
(None, _, _) => NodeStatus::Unknown,
// Nodes become bad when they fail to respond to multiple queries in a row. // Nodes become bad when they fail to respond to multiple queries in a row.
(Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad, (Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad,
// A good node is a node has responded to one of our queries within the last 15 minutes. // A good node is a node has responded to one of our queries within the last 15 minutes.
// A node is also good if it has ever responded to one of our queries and has sent // A node is also good if it has ever responded to one of our queries and has sent
// us a query within the last 15 minutes. // us a query within the last 15 minutes.
(Some(_), Some(last_activity), _) | (Some(_), Some(_), Some(last_activity)) (Some(_), Some(last_incoming), _) | (Some(_), Some(_), Some(last_incoming))
if last_activity.elapsed() < INACTIVITY_TIMEOUT => if last_incoming.elapsed() < INACTIVITY_TIMEOUT =>
{ {
NodeStatus::Good NodeStatus::Good
} }
// After 15 minutes of inactivity, a node becomes questionable // After 15 minutes of inactivity, a node becomes questionable.
(_, _, Some(last_activity)) | (_, Some(last_activity), _) // The moment we send a request to it, it stops becoming questionable and becomes Unknown / Bad.
if last_activity.elapsed() > INACTIVITY_TIMEOUT => (last_outgoing, _, Some(last_incoming)) | (last_outgoing, Some(last_incoming), _)
if last_incoming.elapsed() > INACTIVITY_TIMEOUT
&& last_outgoing
.map(|e| e.elapsed() > INACTIVITY_TIMEOUT)
.unwrap_or(true) =>
{ {
NodeStatus::Questionable NodeStatus::Questionable
} }
(Some(_), _, _) => NodeStatus::Unknown, _ => NodeStatus::Unknown,
} }
} }
@ -613,7 +616,16 @@ impl RoutingTable {
for node in self.buckets.iter() { for node in self.buckets.iter() {
result.push(node); result.push(node);
} }
result.sort_by_key(|n| id.distance(&n.id)); result.sort_by_key(|n| {
// Query decent nodes first.
let status = match n.status() {
NodeStatus::Good => 0,
NodeStatus::Questionable => 0,
NodeStatus::Unknown => 2,
NodeStatus::Bad => 3,
};
(status, id.distance(&n.id))
});
result result
} }