Refactor for readability
This commit is contained in:
parent
c8cd17ce8e
commit
7a52721af9
1 changed files with 55 additions and 62 deletions
|
|
@ -841,91 +841,84 @@ impl TorrentStateLive {
|
|||
|
||||
async fn task_send_pex_to_peer(
|
||||
self: Arc<Self>,
|
||||
peer_addr: SocketAddr,
|
||||
_peer_addr: SocketAddr,
|
||||
tx: PeerTx,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut sent_peers_live: HashSet<SocketAddr> = HashSet::new();
|
||||
// As per BEP 11 we should not send more than 50 peers at once
|
||||
// (here it also applies to fist message, should be OK as we anyhow really have more)
|
||||
const MAX_SENT_PEERS: usize = 50;
|
||||
// As per BEP 11 recommended interval is min 60 seconds
|
||||
const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60);
|
||||
|
||||
let mut live_peers = HashSet::new();
|
||||
let mut connected = Vec::with_capacity(MAX_SENT_PEERS);
|
||||
let mut dropped = Vec::with_capacity(MAX_SENT_PEERS);
|
||||
let mut peer_view_of_live_peers = HashSet::new();
|
||||
|
||||
// Wait 10 seconds before sending the first message to assure that peer will stay with us
|
||||
let mut delay = Duration::from_secs(10);
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
let mut interval = tokio::time::interval(PEX_MESSAGE_INTERVAL);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tx.closed() => return Ok(()),
|
||||
_ = tokio::time::sleep(delay) => {},
|
||||
interval.tick().await;
|
||||
|
||||
// TODO: store them in a shared place
|
||||
// Fill in live_peers
|
||||
for ps in self.peers.states.iter() {
|
||||
let peer = ps.value();
|
||||
let addr = *peer.outgoing_address.as_ref().unwrap_or_else(|| ps.key());
|
||||
|
||||
// As per BEP 11 share only those we were able to connect
|
||||
let has_outgoing_connections = peer
|
||||
.stats
|
||||
.counters
|
||||
.outgoing_connections
|
||||
.load(Ordering::Relaxed)
|
||||
> 0;
|
||||
|
||||
let is_live = has_outgoing_connections && ps.value().state.is_live();
|
||||
if is_live {
|
||||
live_peers.insert(addr);
|
||||
} else {
|
||||
live_peers.remove(&addr);
|
||||
}
|
||||
}
|
||||
delay = PEX_MESSAGE_INTERVAL;
|
||||
|
||||
let addrs_live_to_sent = self
|
||||
.peers
|
||||
.states
|
||||
.iter()
|
||||
.filter_map(|e| {
|
||||
let peer = e.value();
|
||||
let addr = peer.outgoing_address.as_ref().unwrap_or_else(|| e.key());
|
||||
connected.clear();
|
||||
dropped.clear();
|
||||
|
||||
if *addr != peer_addr {
|
||||
let has_outgoing_connections = peer
|
||||
.stats
|
||||
.counters
|
||||
.outgoing_connections
|
||||
.load(Ordering::Relaxed)
|
||||
> 0; // As per BEP 11 share only those we were able to connect
|
||||
if peer.state.is_live()
|
||||
&& has_outgoing_connections
|
||||
&& !sent_peers_live.contains(addr)
|
||||
{
|
||||
Some(*addr)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.take(MAX_SENT_PEERS)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let addrs_closed_to_sent = sent_peers_live
|
||||
.iter()
|
||||
.filter(|addr| {
|
||||
self.peers
|
||||
.states
|
||||
.get(addr)
|
||||
.map(|p| !p.value().state.is_live())
|
||||
.unwrap_or(true)
|
||||
})
|
||||
.copied()
|
||||
.take(MAX_SENT_PEERS)
|
||||
.collect::<HashSet<_>>();
|
||||
connected.extend(
|
||||
live_peers
|
||||
.difference(&peer_view_of_live_peers)
|
||||
.take(MAX_SENT_PEERS)
|
||||
.copied(),
|
||||
);
|
||||
dropped.extend(
|
||||
peer_view_of_live_peers
|
||||
.difference(&live_peers)
|
||||
.take(MAX_SENT_PEERS)
|
||||
.copied(),
|
||||
);
|
||||
|
||||
// BEP 11 - Dont send closed if they are now in live
|
||||
// it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent,
|
||||
// and addrs_closed_to_sent are only filtered addresses from sent_peers_live
|
||||
|
||||
if !addrs_live_to_sent.is_empty() || !addrs_closed_to_sent.is_empty() {
|
||||
debug!(
|
||||
"sending PEX with {} live ({:?})and {} closed peers",
|
||||
addrs_live_to_sent.len(),
|
||||
addrs_live_to_sent,
|
||||
addrs_closed_to_sent.len()
|
||||
);
|
||||
let pex_msg =
|
||||
extended::ut_pex::UtPex::from_addrs(&addrs_live_to_sent, &addrs_closed_to_sent);
|
||||
if !connected.is_empty() || !dropped.is_empty() {
|
||||
let pex_msg = extended::ut_pex::UtPex::from_addrs(&connected, &dropped);
|
||||
let ext_msg = extended::ExtendedMessage::UtPex(pex_msg);
|
||||
let msg = Message::Extended(ext_msg);
|
||||
|
||||
if tx.send(WriterRequest::Message(msg)).is_err() {
|
||||
if tx
|
||||
.send(WriterRequest::Message(Message::Extended(ext_msg)))
|
||||
.is_err()
|
||||
{
|
||||
return Ok(()); // Peer disconnected
|
||||
}
|
||||
|
||||
sent_peers_live.extend(&addrs_live_to_sent);
|
||||
sent_peers_live.retain(|addr| !addrs_closed_to_sent.contains(addr));
|
||||
for addr in &dropped {
|
||||
peer_view_of_live_peers.remove(addr);
|
||||
}
|
||||
peer_view_of_live_peers.extend(connected.iter().copied());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue