wtf... its getting worse. Lets see if we can simplify it a lot

This commit is contained in:
Igor Katson 2023-11-29 14:48:22 +00:00
parent 6518dc6eff
commit 826d1b8f1d
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -80,6 +80,13 @@ fn make_rate_limiter() -> RateLimiter {
.build()
}
struct InfoHashMeta {
seen_peers: IndexSet<SocketAddr>,
subscriber: tokio::sync::broadcast::Sender<SocketAddr>,
closest_responding_nodes: Vec<MaybeUsefulNode>,
join_handle: tokio::task::JoinHandle<()>,
}
pub struct DhtState {
id: Id20,
next_transaction_id: AtomicU16,
@ -98,10 +105,8 @@ pub struct DhtState {
rate_limiter: RateLimiter,
sender: UnboundedSender<WorkerSendRequest>,
seen_peers: DashMap<Id20, IndexSet<SocketAddr>>,
closest_responding_nodes_for_info_hash: DashMap<Id20, Vec<MaybeUsefulNode>>,
get_peers_subscribers: DashMap<Id20, tokio::sync::broadcast::Sender<SocketAddr>>,
// Per-torrent stats.
info_hash_meta: DashMap<Id20, InfoHashMeta>,
}
impl DhtState {
@ -119,10 +124,8 @@ impl DhtState {
routing_table: RwLock::new(routing_table),
sender,
listen_addr,
seen_peers: Default::default(),
get_peers_subscribers: Default::default(),
rate_limiter: make_rate_limiter(),
closest_responding_nodes_for_info_hash: Default::default(),
info_hash_meta: Default::default(),
recent_requests: Default::default(),
}
}
@ -318,8 +321,8 @@ impl DhtState {
Ok(())
}
MessageKind::GetPeersRequest(req) => {
let peers = self.seen_peers.get(&req.info_hash).map(|peers| {
peers
let peers = self.info_hash_meta.get(&req.info_hash).map(|meta| {
meta.seen_peers
.iter()
.copied()
.filter_map(|a| match a {
@ -384,7 +387,11 @@ impl DhtState {
DhtStats {
id: self.id,
outstanding_requests: self.inflight_by_transaction_id.len(),
seen_peers: self.seen_peers.iter().map(|e| e.value().len()).sum(),
seen_peers: self
.info_hash_meta
.iter()
.map(|e| e.value().seen_peers.len())
.sum(),
recent_requests: self.recent_requests.len(),
routing_table_size: self.routing_table.read().len(),
}
@ -399,36 +406,35 @@ impl DhtState {
tokio::sync::broadcast::Receiver<SocketAddr>,
)> {
use dashmap::mapref::entry::Entry;
match self.get_peers_subscribers.entry(info_hash) {
match self.info_hash_meta.entry(info_hash) {
Entry::Occupied(o) => {
let pos = self.seen_peers.get(&info_hash).and_then(|p| {
if p.is_empty() {
None
} else {
Some((0, p.len()))
}
});
let rx = o.get().subscribe();
let seen_peers = &o.get().seen_peers;
let pos = if seen_peers.is_empty() {
None
} else {
Some((0, seen_peers.len()))
};
let rx = o.get().subscriber.subscribe();
Ok((pos, rx))
}
Entry::Vacant(v) => {
// DHT sends peers REALLY fast, so ideally the consumer of this broadcast should not lag behind.
// In case it does though we have PeerStream to replay.
let (tx, rx) = tokio::sync::broadcast::channel(100);
v.insert(tx);
let this = self.clone();
spawn(
let join_handle = spawn(
error_span!("peers_requester", info_hash = format!("{:?}", info_hash)),
async move {
let mut iteration = 0usize;
loop {
if !this.get_peers_subscribers.contains_key(&info_hash) {
debug!("no more subscribers, closing peers_requester");
return Ok(());
}
let meta = match this.info_hash_meta.get(&info_hash) {
Some(meta) => meta,
None => {
debug!("no more subscribers, closing peers_requester");
return Ok(());
}
};
trace!("iteration {iteration}");
// We don't need to allocate/collect here, but the borrow checker is not happy otherwise.
let nodes_to_query = this
.routing_table
.read()
@ -440,19 +446,26 @@ impl DhtState {
for (id, addr) in nodes_to_query {
this.send_find_peers_if_not_yet(info_hash, id, addr)?;
}
if let Some(e) =
this.closest_responding_nodes_for_info_hash.get(&info_hash)
for MaybeUsefulNode { id, addr, .. } in
meta.closest_responding_nodes.iter()
{
for MaybeUsefulNode { id, addr, .. } in e.value().iter() {
this.send_find_peers_if_not_yet(info_hash, *id, *addr)?;
}
this.send_find_peers_if_not_yet(info_hash, *id, *addr)?;
}
drop(meta);
tokio::time::sleep(REQUERY_INTERVAL).await;
iteration += 1;
}
},
);
let (tx, rx) = tokio::sync::broadcast::channel(100);
v.insert(InfoHashMeta {
seen_peers: Default::default(),
subscriber: tx,
closest_responding_nodes: Default::default(),
join_handle,
});
Ok((None, rx))
}
}
@ -559,10 +572,8 @@ impl DhtState {
target: Id20,
nodes: CompactNodeInfo,
) -> anyhow::Result<()> {
// We don't need to allocate/collect here, but the borrow checker is not happy
// otherwise when we iterate self.searching_for_peers and mutating self in the loop.
let searching_for_peers = self
.get_peers_subscribers
.info_hash_meta
.iter()
.map(|e| *e.key())
.collect::<Vec<_>>();
@ -596,41 +607,29 @@ impl DhtState {
info_hash: Id20,
node_id: Id20,
addr: SocketAddr,
closest_nodes: &mut Vec<MaybeUsefulNode>,
) -> bool {
use dashmap::mapref::entry::Entry;
let n = MaybeUsefulNode {
closest_nodes.push(MaybeUsefulNode {
id: node_id,
addr,
last_response: None,
returned_peers: false,
};
match self.closest_responding_nodes_for_info_hash.entry(info_hash) {
Entry::Occupied(mut occ) => {
// How many nodes to query per torrent.
const LIMIT: usize = 256;
let v = occ.get_mut();
v.push(n);
v.sort_by_key(|n| {
let has_returned_peers_desc = Reverse(n.returned_peers);
let has_responded_desc = Reverse(n.last_response.is_some() as u8);
let distance = n.id.distance(&info_hash);
(has_returned_peers_desc, has_responded_desc, distance)
});
if v.len() > LIMIT {
let popped = v.pop().unwrap();
if popped.id == node_id {
return false;
}
}
});
true
}
Entry::Vacant(v) => {
v.insert(vec![n]);
true
const LIMIT: usize = 256;
closest_nodes.sort_by_key(|n| {
let has_returned_peers_desc = Reverse(n.returned_peers);
let has_responded_desc = Reverse(n.last_response.is_some() as u8);
let distance = n.id.distance(&info_hash);
(has_returned_peers_desc, has_responded_desc, distance)
});
if closest_nodes.len() > LIMIT {
let popped = closest_nodes.pop().unwrap();
if popped.id == node_id {
return false;
}
}
true
}
fn on_found_peers_or_nodes(
@ -643,72 +642,54 @@ impl DhtState {
self.routing_table_add_node(source, source_addr);
use dashmap::mapref::entry::Entry;
let bsender = match self.get_peers_subscribers.entry(info_hash) {
let mut meta = match self.info_hash_meta.entry(info_hash) {
Entry::Occupied(o) => o,
Entry::Vacant(_) => {
warn!(
"ignoring get_peers response, no subscribers for {:?}",
"ignoring found_peers response, no subscribers for {:?}",
info_hash
);
return Ok(());
}
};
let meta_mut = meta.get_mut();
{
let n = MaybeUsefulNode {
id: source,
addr: source_addr,
last_response: Some(Instant::now()),
returned_peers: data.values.as_ref().map(|p| !p.is_empty()).unwrap_or(false),
};
match self.closest_responding_nodes_for_info_hash.entry(info_hash) {
Entry::Occupied(mut useful_nodes) => {
if let Some(useful_node) = useful_nodes
.get_mut()
.iter_mut()
.find(|n| n.id == source && n.addr == source_addr)
{
useful_node.last_response = Some(Instant::now());
} else {
useful_nodes.get_mut().push(n);
}
}
Entry::Vacant(v) => {
v.insert(vec![n]);
}
};
let now = Some(Instant::now());
let returned_peers = data.values.as_ref().map(|p| !p.is_empty()).unwrap_or(false);
if let Some(existing_useful_node) = meta_mut
.closest_responding_nodes
.iter_mut()
.find(|n| n.id == source && n.addr == source_addr)
{
existing_useful_node.last_response = now;
existing_useful_node.returned_peers |= returned_peers;
} else {
meta_mut.closest_responding_nodes.push(MaybeUsefulNode {
id: source,
addr: source_addr,
last_response: now,
returned_peers,
});
}
}
if let Some(peers) = data.values {
let mut seen = self.seen_peers.entry(info_hash).or_default();
for peer in peers.iter() {
if peer.addr.port() < 1024 {
debug!("bad peer port, ignoring: {}", peer.addr);
continue;
}
let addr = SocketAddr::V4(peer.addr);
if seen.insert(addr) {
match bsender.get().send(addr) {
if meta_mut.seen_peers.insert(addr) {
match meta_mut.subscriber.send(addr) {
Ok(_) => {}
Err(_) => {
debug!("no more subscribers for {:?}, cleaning up", info_hash);
// bsender.remove();
// let this = self.clone();
// spawn(
// error_span!("cleanup", info_hash = format!("{info_hash:?}")),
// async move {
// tokio::time::sleep(Duration::from_secs(10)).await;
// if !this.get_peers_subscribers.contains_key(&info_hash) {
// debug!("no more subscribers for {:?}, removed it from seen peers", info_hash);
// this.seen_peers.remove(&info_hash);
// this.closest_responding_nodes_for_info_hash
// .remove(&info_hash);
// }
// Ok(())
// },
// );
meta_mut.join_handle.abort();
meta.remove();
return Ok(());
}
}
@ -721,6 +702,7 @@ impl DhtState {
info_hash,
node.id,
node.addr.into(),
&mut meta_mut.closest_responding_nodes,
) {
self.routing_table_add_node(node.id, node.addr.into());
self.send_find_peers_if_not_yet(info_hash, node.id, node.addr.into())?;
@ -984,13 +966,15 @@ impl Stream for PeerStream {
) -> Poll<Option<Self::Item>> {
loop {
if let Some((pos, end)) = self.initial_peers_pos.take() {
let addr = *self
let addr = match self
.state
.seen_peers
.info_hash_meta
.get(&self.info_hash)
.unwrap()
.get_index(pos)
.unwrap();
.and_then(|meta| meta.seen_peers.get_index(pos).copied())
{
Some(addr) => addr,
None => return Poll::Ready(None),
};
if pos + 1 < end {
self.initial_peers_pos = Some((pos + 1, end));
}