diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 0456810..d458650 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -80,6 +80,13 @@ fn make_rate_limiter() -> RateLimiter { .build() } +struct InfoHashMeta { + seen_peers: IndexSet, + subscriber: tokio::sync::broadcast::Sender, + closest_responding_nodes: Vec, + join_handle: tokio::task::JoinHandle<()>, +} + pub struct DhtState { id: Id20, next_transaction_id: AtomicU16, @@ -98,10 +105,8 @@ pub struct DhtState { rate_limiter: RateLimiter, sender: UnboundedSender, - seen_peers: DashMap>, - - closest_responding_nodes_for_info_hash: DashMap>, - get_peers_subscribers: DashMap>, + // Per-torrent stats. + info_hash_meta: DashMap, } impl DhtState { @@ -119,10 +124,8 @@ impl DhtState { routing_table: RwLock::new(routing_table), sender, listen_addr, - seen_peers: Default::default(), - get_peers_subscribers: Default::default(), rate_limiter: make_rate_limiter(), - closest_responding_nodes_for_info_hash: Default::default(), + info_hash_meta: Default::default(), recent_requests: Default::default(), } } @@ -318,8 +321,8 @@ impl DhtState { Ok(()) } MessageKind::GetPeersRequest(req) => { - let peers = self.seen_peers.get(&req.info_hash).map(|peers| { - peers + let peers = self.info_hash_meta.get(&req.info_hash).map(|meta| { + meta.seen_peers .iter() .copied() .filter_map(|a| match a { @@ -384,7 +387,11 @@ impl DhtState { DhtStats { id: self.id, outstanding_requests: self.inflight_by_transaction_id.len(), - seen_peers: self.seen_peers.iter().map(|e| e.value().len()).sum(), + seen_peers: self + .info_hash_meta + .iter() + .map(|e| e.value().seen_peers.len()) + .sum(), recent_requests: self.recent_requests.len(), routing_table_size: self.routing_table.read().len(), } @@ -399,36 +406,35 @@ impl DhtState { tokio::sync::broadcast::Receiver, )> { use dashmap::mapref::entry::Entry; - match self.get_peers_subscribers.entry(info_hash) { + match self.info_hash_meta.entry(info_hash) { Entry::Occupied(o) => { - let pos = self.seen_peers.get(&info_hash).and_then(|p| { - if p.is_empty() { - None - } else { - Some((0, p.len())) - } - }); - let rx = o.get().subscribe(); + let seen_peers = &o.get().seen_peers; + let pos = if seen_peers.is_empty() { + None + } else { + Some((0, seen_peers.len())) + }; + let rx = o.get().subscriber.subscribe(); Ok((pos, rx)) } Entry::Vacant(v) => { // DHT sends peers REALLY fast, so ideally the consumer of this broadcast should not lag behind. // In case it does though we have PeerStream to replay. - let (tx, rx) = tokio::sync::broadcast::channel(100); - v.insert(tx); let this = self.clone(); - spawn( + let join_handle = spawn( error_span!("peers_requester", info_hash = format!("{:?}", info_hash)), async move { let mut iteration = 0usize; loop { - if !this.get_peers_subscribers.contains_key(&info_hash) { - debug!("no more subscribers, closing peers_requester"); - return Ok(()); - } + let meta = match this.info_hash_meta.get(&info_hash) { + Some(meta) => meta, + None => { + debug!("no more subscribers, closing peers_requester"); + return Ok(()); + } + }; trace!("iteration {iteration}"); - // We don't need to allocate/collect here, but the borrow checker is not happy otherwise. let nodes_to_query = this .routing_table .read() @@ -440,19 +446,26 @@ impl DhtState { for (id, addr) in nodes_to_query { this.send_find_peers_if_not_yet(info_hash, id, addr)?; } - if let Some(e) = - this.closest_responding_nodes_for_info_hash.get(&info_hash) + for MaybeUsefulNode { id, addr, .. } in + meta.closest_responding_nodes.iter() { - for MaybeUsefulNode { id, addr, .. } in e.value().iter() { - this.send_find_peers_if_not_yet(info_hash, *id, *addr)?; - } + this.send_find_peers_if_not_yet(info_hash, *id, *addr)?; } + drop(meta); tokio::time::sleep(REQUERY_INTERVAL).await; iteration += 1; } }, ); + let (tx, rx) = tokio::sync::broadcast::channel(100); + v.insert(InfoHashMeta { + seen_peers: Default::default(), + subscriber: tx, + closest_responding_nodes: Default::default(), + join_handle, + }); + Ok((None, rx)) } } @@ -559,10 +572,8 @@ impl DhtState { target: Id20, nodes: CompactNodeInfo, ) -> anyhow::Result<()> { - // We don't need to allocate/collect here, but the borrow checker is not happy - // otherwise when we iterate self.searching_for_peers and mutating self in the loop. let searching_for_peers = self - .get_peers_subscribers + .info_hash_meta .iter() .map(|e| *e.key()) .collect::>(); @@ -596,41 +607,29 @@ impl DhtState { info_hash: Id20, node_id: Id20, addr: SocketAddr, + closest_nodes: &mut Vec, ) -> bool { - use dashmap::mapref::entry::Entry; - let n = MaybeUsefulNode { + closest_nodes.push(MaybeUsefulNode { id: node_id, addr, last_response: None, - returned_peers: false, - }; - match self.closest_responding_nodes_for_info_hash.entry(info_hash) { - Entry::Occupied(mut occ) => { - // How many nodes to query per torrent. - const LIMIT: usize = 256; - let v = occ.get_mut(); - v.push(n); - v.sort_by_key(|n| { - let has_returned_peers_desc = Reverse(n.returned_peers); - let has_responded_desc = Reverse(n.last_response.is_some() as u8); - let distance = n.id.distance(&info_hash); - (has_returned_peers_desc, has_responded_desc, distance) - }); - if v.len() > LIMIT { - let popped = v.pop().unwrap(); - if popped.id == node_id { - return false; - } - } + }); - true - } - Entry::Vacant(v) => { - v.insert(vec![n]); - true + const LIMIT: usize = 256; + closest_nodes.sort_by_key(|n| { + let has_returned_peers_desc = Reverse(n.returned_peers); + let has_responded_desc = Reverse(n.last_response.is_some() as u8); + let distance = n.id.distance(&info_hash); + (has_returned_peers_desc, has_responded_desc, distance) + }); + if closest_nodes.len() > LIMIT { + let popped = closest_nodes.pop().unwrap(); + if popped.id == node_id { + return false; } } + true } fn on_found_peers_or_nodes( @@ -643,72 +642,54 @@ impl DhtState { self.routing_table_add_node(source, source_addr); use dashmap::mapref::entry::Entry; - let bsender = match self.get_peers_subscribers.entry(info_hash) { + let mut meta = match self.info_hash_meta.entry(info_hash) { Entry::Occupied(o) => o, Entry::Vacant(_) => { warn!( - "ignoring get_peers response, no subscribers for {:?}", + "ignoring found_peers response, no subscribers for {:?}", info_hash ); return Ok(()); } }; + let meta_mut = meta.get_mut(); + { - let n = MaybeUsefulNode { - id: source, - addr: source_addr, - last_response: Some(Instant::now()), - returned_peers: data.values.as_ref().map(|p| !p.is_empty()).unwrap_or(false), - }; - match self.closest_responding_nodes_for_info_hash.entry(info_hash) { - Entry::Occupied(mut useful_nodes) => { - if let Some(useful_node) = useful_nodes - .get_mut() - .iter_mut() - .find(|n| n.id == source && n.addr == source_addr) - { - useful_node.last_response = Some(Instant::now()); - } else { - useful_nodes.get_mut().push(n); - } - } - Entry::Vacant(v) => { - v.insert(vec![n]); - } - }; + let now = Some(Instant::now()); + let returned_peers = data.values.as_ref().map(|p| !p.is_empty()).unwrap_or(false); + + if let Some(existing_useful_node) = meta_mut + .closest_responding_nodes + .iter_mut() + .find(|n| n.id == source && n.addr == source_addr) + { + existing_useful_node.last_response = now; + existing_useful_node.returned_peers |= returned_peers; + } else { + meta_mut.closest_responding_nodes.push(MaybeUsefulNode { + id: source, + addr: source_addr, + last_response: now, + returned_peers, + }); + } } if let Some(peers) = data.values { - let mut seen = self.seen_peers.entry(info_hash).or_default(); - for peer in peers.iter() { if peer.addr.port() < 1024 { debug!("bad peer port, ignoring: {}", peer.addr); continue; } let addr = SocketAddr::V4(peer.addr); - if seen.insert(addr) { - match bsender.get().send(addr) { + if meta_mut.seen_peers.insert(addr) { + match meta_mut.subscriber.send(addr) { Ok(_) => {} Err(_) => { debug!("no more subscribers for {:?}, cleaning up", info_hash); - // bsender.remove(); - - // let this = self.clone(); - // spawn( - // error_span!("cleanup", info_hash = format!("{info_hash:?}")), - // async move { - // tokio::time::sleep(Duration::from_secs(10)).await; - // if !this.get_peers_subscribers.contains_key(&info_hash) { - // debug!("no more subscribers for {:?}, removed it from seen peers", info_hash); - // this.seen_peers.remove(&info_hash); - // this.closest_responding_nodes_for_info_hash - // .remove(&info_hash); - // } - // Ok(()) - // }, - // ); + meta_mut.join_handle.abort(); + meta.remove(); return Ok(()); } } @@ -721,6 +702,7 @@ impl DhtState { info_hash, node.id, node.addr.into(), + &mut meta_mut.closest_responding_nodes, ) { self.routing_table_add_node(node.id, node.addr.into()); self.send_find_peers_if_not_yet(info_hash, node.id, node.addr.into())?; @@ -984,13 +966,15 @@ impl Stream for PeerStream { ) -> Poll> { loop { if let Some((pos, end)) = self.initial_peers_pos.take() { - let addr = *self + let addr = match self .state - .seen_peers + .info_hash_meta .get(&self.info_hash) - .unwrap() - .get_index(pos) - .unwrap(); + .and_then(|meta| meta.seen_peers.get_index(pos).copied()) + { + Some(addr) => addr, + None => return Poll::Ready(None), + }; if pos + 1 < end { self.initial_peers_pos = Some((pos + 1, end)); }