From 7a52721af9e27358b3f5dc1f456e0631cf3b7cb7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 28 Nov 2024 10:32:58 +0100 Subject: [PATCH] Refactor for readability --- crates/librqbit/src/torrent_state/live/mod.rs | 117 ++++++++---------- 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index b98d3e7..ca2922a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -841,91 +841,84 @@ impl TorrentStateLive { async fn task_send_pex_to_peer( self: Arc, - peer_addr: SocketAddr, + _peer_addr: SocketAddr, tx: PeerTx, ) -> anyhow::Result<()> { - let mut sent_peers_live: HashSet = HashSet::new(); // As per BEP 11 we should not send more than 50 peers at once // (here it also applies to fist message, should be OK as we anyhow really have more) const MAX_SENT_PEERS: usize = 50; // As per BEP 11 recommended interval is min 60 seconds const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); + + let mut live_peers = HashSet::new(); + let mut connected = Vec::with_capacity(MAX_SENT_PEERS); + let mut dropped = Vec::with_capacity(MAX_SENT_PEERS); + let mut peer_view_of_live_peers = HashSet::new(); + // Wait 10 seconds before sending the first message to assure that peer will stay with us - let mut delay = Duration::from_secs(10); + tokio::time::sleep(Duration::from_secs(10)).await; + + let mut interval = tokio::time::interval(PEX_MESSAGE_INTERVAL); loop { - tokio::select! { - _ = tx.closed() => return Ok(()), - _ = tokio::time::sleep(delay) => {}, + interval.tick().await; + // TODO: store them in a shared place + // Fill in live_peers + for ps in self.peers.states.iter() { + let peer = ps.value(); + let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key()); + + // As per BEP 11 share only those we were able to connect + let has_outgoing_connections = peer + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0; + + let is_live = has_outgoing_connections && ps.value().state.is_live(); + if is_live { + live_peers.insert(addr); + } else { + live_peers.remove(&addr); + } } - delay = PEX_MESSAGE_INTERVAL; - let addrs_live_to_sent = self - .peers - .states - .iter() - .filter_map(|e| { - let peer = e.value(); - let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key()); + connected.clear(); + dropped.clear(); - if *addr != peer_addr { - let has_outgoing_connections = peer - .stats - .counters - .outgoing_connections - .load(Ordering::Relaxed) - > 0; // As per BEP 11 share only those we were able to connect - if peer.state.is_live() - && has_outgoing_connections - && !sent_peers_live.contains(addr) - { - Some(*addr) - } else { - None - } - } else { - None - } - }) - .take(MAX_SENT_PEERS) - .collect::>(); - - let addrs_closed_to_sent = sent_peers_live - .iter() - .filter(|addr| { - self.peers - .states - .get(addr) - .map(|p| !p.value().state.is_live()) - .unwrap_or(true) - }) - .copied() - .take(MAX_SENT_PEERS) - .collect::>(); + connected.extend( + live_peers + .difference(&peer_view_of_live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); + dropped.extend( + peer_view_of_live_peers + .difference(&live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); // BEP 11 - Dont send closed if they are now in live // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, // and addrs_closed_to_sent are only filtered addresses from sent_peers_live - if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() { - debug!( - "sending PEX with {} live ({:?})and {} closed peers", - addrs_live_to_sent.len(), - addrs_live_to_sent, - addrs_closed_to_sent.len() - ); - let pex_msg = - extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent); + if !connected.is_empty() || !dropped.is_empty() { + let pex_msg = extended::ut_pex::UtPex::from_addrs(&connected, &dropped); let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); - let msg = Message::Extended(ext_msg); - - if tx.send(WriterRequest::Message(msg)).is_err() { + if tx + .send(WriterRequest::Message(Message::Extended(ext_msg))) + .is_err() + { return Ok(()); // Peer disconnected } - sent_peers_live.extend(&addrs_live_to_sent); - sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr)); + for addr in &dropped { + peer_view_of_live_peers.remove(addr); + } + peer_view_of_live_peers.extend(connected.iter().copied()); } } }