diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index da9037b..159bb60 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -69,11 +69,10 @@ use librqbit_core::{ use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ extended::{ - handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, + self, handshake::ExtendedHandshake, ut_metadata::UtMetadata, ut_pex::UtPex, ExtendedMessage, }, Handshake, Message, MessageOwned, Piece, Request, }; -use peers::stats::atomic::AggregatePeerStatsAtomic; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, Notify, OwnedSemaphorePermit, Semaphore, @@ -258,6 +257,7 @@ impl TorrentStateLive { session_stats: session_stats.clone(), stats: Default::default(), states: Default::default(), + live_outgoing_peers: Default::default(), }, locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), @@ -337,10 +337,6 @@ impl TorrentStateLive { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } - fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] { - [&self.peers.stats, &self.peers.session_stats.peers] - } - pub fn down_speed_estimator(&self) -> &SpeedEstimator { &self.down_speed_estimator } @@ -373,21 +369,21 @@ impl TorrentStateLive { let counters = match self.peers.states.entry(checked_peer.addr) { Entry::Occupied(mut occ) => { let peer = occ.get_mut(); - peer.state - .incoming_connection( - Id20::new(checked_peer.handshake.peer_id), - tx.clone(), - &self.peer_stats(), - ) - .context("peer already existed")?; + peer.incoming_connection( + Id20::new(checked_peer.handshake.peer_id), + tx.clone(), + &self.peers, + ) + .context("peer already existed")?; peer.stats.counters.clone() } Entry::Vacant(vac) => { atomic_inc(&self.peers.stats.seen); let peer = Peer::new_live_for_incoming_connection( + *vac.key(), Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peer_stats(), + &self.peers, ); let counters = peer.stats.counters.clone(); vac.insert(peer); @@ -619,8 +615,7 @@ impl TorrentStateLive { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { - p.state - .connecting_to_live(Id20::new(h.peer_id), &self.peer_stats()); + p.connecting_to_live(Id20::new(h.peer_id), &self.peers); }); } @@ -677,7 +672,7 @@ impl TorrentStateLive { .peers .states .iter() - .filter(|e| filter.state.matches(e.value().state.get())) + .filter(|e| filter.state.matches(e.value().get_state())) .map(|e| (e.key().to_string(), e.value().into())) .collect(), } @@ -812,9 +807,9 @@ impl TorrentStateLive { fn disconnect_all_peers_that_have_full_torrent(&self) { for mut pe in self.peers.states.iter_mut() { - if let PeerState::Live(l) = pe.value().state.get() { + if let PeerState::Live(l) = pe.value().get_state() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { - let prev = pe.value_mut().state.set_not_needed(&self.peer_stats()); + let prev = pe.value_mut().set_not_needed(&self.peers); let _ = prev .take_live_no_counters() .unwrap() @@ -829,15 +824,75 @@ impl TorrentStateLive { self.peers .states .iter_mut() - .filter_map(|mut p| { - let known_addr = *p.key(); - p.value_mut() - .reconnect_not_needed_peer(known_addr, &self.peer_stats()) - }) + .filter_map(|mut p| p.value_mut().reconnect_not_needed_peer(&self.peers)) .map(|socket_addr| self.peer_queue_tx.send(socket_addr)) .take_while(|r| r.is_ok()) .last(); } + + async fn task_send_pex_to_peer( + self: Arc, + _peer_addr: SocketAddr, + tx: PeerTx, + ) -> anyhow::Result<()> { + // As per BEP 11 we should not send more than 50 peers at once + // (here it also applies to fist message, should be OK as we anyhow really have more) + const MAX_SENT_PEERS: usize = 50; + // As per BEP 11 recommended interval is min 60 seconds + const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60); + + let mut connected = Vec::with_capacity(MAX_SENT_PEERS); + let mut dropped = Vec::with_capacity(MAX_SENT_PEERS); + let mut peer_view_of_live_peers = HashSet::new(); + + // Wait 10 seconds before sending the first message to assure that peer will stay with us + tokio::time::sleep(Duration::from_secs(10)).await; + + let mut interval = tokio::time::interval(PEX_MESSAGE_INTERVAL); + + loop { + interval.tick().await; + + { + let live_peers = self.peers.live_outgoing_peers.read(); + connected.clear(); + dropped.clear(); + + connected.extend( + live_peers + .difference(&peer_view_of_live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); + dropped.extend( + peer_view_of_live_peers + .difference(&live_peers) + .take(MAX_SENT_PEERS) + .copied(), + ); + } + + // BEP 11 - Dont send closed if they are now in live + // it's assured by mutual exclusion of two above sets if in sent_peers_live, it cannot be in addrs_live_to_sent, + // and addrs_closed_to_sent are only filtered addresses from sent_peers_live + + if !connected.is_empty() || !dropped.is_empty() { + let pex_msg = extended::ut_pex::UtPex::from_addrs(&connected, &dropped); + let ext_msg = extended::ExtendedMessage::UtPex(pex_msg); + if tx + .send(WriterRequest::Message(Message::Extended(ext_msg))) + .is_err() + { + return Ok(()); // Peer disconnected + } + + for addr in &dropped { + peer_view_of_live_peers.remove(addr); + } + peer_view_of_live_peers.extend(connected.iter().copied()); + } + } + } } struct PeerHandlerLocked { @@ -963,8 +1018,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } fn on_extended_handshake(&self, hs: &ExtendedHandshake) -> anyhow::Result<()> { - if let Some(peer_pex_msg_id) = hs.ut_pex() { - trace!("peer supports pex at {peer_pex_msg_id}"); + if let Some(_peer_pex_msg_id) = hs.ut_pex() { + self.state.clone().spawn( + error_span!( + parent: self.state.torrent.span.clone(), + "sending_pex_to_peer", + peer = self.addr.to_string() + ), + self.state + .clone() + .task_send_pex_to_peer(self.addr, self.tx.clone()), + ); } // Lets update outgoing Socket address for incoming connection if self.incoming { @@ -1020,7 +1084,6 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { impl PeerHandler { fn on_peer_died(self, error: Option) -> anyhow::Result<()> { let peers = &self.state.peers; - let pstats = self.state.peer_stats(); let handle = self.addr; let mut pe = match peers.states.get_mut(&handle) { Some(peer) => TimedExistence::new(peer, "on_peer_died"), @@ -1029,7 +1092,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take(&pstats); + let prev = pe.value_mut().take_state(peers); match prev { PeerState::Connecting(_) => {} @@ -1048,7 +1111,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().set_state(PeerState::NotNeeded, peers); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -1064,7 +1127,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().set_state(PeerState::NotNeeded, peers); return Ok(()); } }; @@ -1073,11 +1136,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, &pstats); + pe.value_mut().set_state(PeerState::NotNeeded, peers); return Ok(()); } - pe.value_mut().state.set(PeerState::Dead, &pstats); + pe.value_mut().set_state(PeerState::Dead, peers); if self.incoming { // do not retry incoming peers @@ -1108,9 +1171,9 @@ impl PeerHandler { self.state .peers .with_peer_mut(handle, "dead_to_queued", |peer| { - match peer.state.get() { + match peer.get_state() { PeerState::Dead => { - peer.state.set(PeerState::Queued, &self.state.peer_stats()) + peer.set_state(PeerState::Queued, &self.state.peers) } other => bail!( "peer is in unexpected state: {}. Expected dead", diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index b0895f8..fc1d04f 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -2,6 +2,7 @@ pub mod stats; use std::collections::HashSet; use std::net::SocketAddr; +use std::sync::atomic::Ordering; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::ChunkInfo; @@ -12,30 +13,33 @@ use tracing::debug; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; -use super::peers::stats::atomic::AggregatePeerStatsAtomic; +use super::PeerStates; pub(crate) type InflightRequest = ChunkInfo; pub(crate) type PeerRx = UnboundedReceiver; pub(crate) type PeerTx = UnboundedSender; -#[derive(Debug, Default)] +#[derive(Debug)] pub(crate) struct Peer { - pub state: PeerStateNoMut, + pub addr: SocketAddr, + state: PeerState, pub stats: stats::atomic::PeerStats, pub outgoing_address: Option, } impl Peer { pub fn new_live_for_incoming_connection( + addr: SocketAddr, peer_id: Id20, tx: PeerTx, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Self { - let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); - for counter in counters { - counter.inc(&state.0); + let state = PeerState::Live(LivePeerState::new(peer_id, tx, true)); + for counter in [&counters.session_stats.peers, &counters.stats] { + counter.inc(&state); } Self { + addr, state, stats: Default::default(), outgoing_address: None, @@ -44,29 +48,30 @@ impl Peer { pub fn new_with_outgoing_address(addr: SocketAddr) -> Self { Self { + addr, outgoing_address: Some(addr), - ..Default::default() + stats: Default::default(), + state: Default::default(), } } pub(crate) fn reconnect_not_needed_peer( &mut self, - known_address: SocketAddr, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Option { - if let PeerState::NotNeeded = self.state.get() { + if let PeerState::NotNeeded = self.get_state() { match self.outgoing_address { None => None, + Some(socket_addr) if self.addr == socket_addr => { + self.set_state(PeerState::Queued, counters); + Some(socket_addr) + } Some(socket_addr) => { - if known_address == socket_addr { - self.state.set(PeerState::Queued, counters); - } else { - debug!( - peer = known_address.to_string(), - outgoing_addr = socket_addr.to_string(), - "peer will by retried on different address", - ); - } + debug!( + peer = %self.addr, + outgoing_addr = %socket_addr, + "peer will by retried on different address", + ); Some(socket_addr) } } @@ -116,54 +121,67 @@ impl PeerState { } } -#[derive(Debug, Default)] -pub(crate) struct PeerStateNoMut(PeerState); - -impl PeerStateNoMut { - pub fn get(&self) -> &PeerState { - &self.0 +impl Peer { + pub fn get_state(&self) -> &PeerState { + &self.state } - pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { - self.set(Default::default(), counters) + pub fn take_state(&mut self, counters: &PeerStates) -> PeerState { + self.set_state(Default::default(), counters) } - pub fn destroy(self, counters: &[&AggregatePeerStatsAtomic]) { - for counter in counters { - counter.dec(&self.0); + pub fn destroy(self, counters: &PeerStates) { + for counter in [&counters.session_stats.peers, &counters.stats] { + counter.dec(&self.state); + } + if let (Some(addr), PeerState::Live(..)) = (self.outgoing_address, &self.state) { + counters.live_outgoing_peers.write().remove(&addr); } } - pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { - for counter in counters { - counter.incdec(&self.0, &new); + pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState { + for counter in [&counters.session_stats.peers, &counters.stats] { + counter.incdec(&self.state, &new); } - std::mem::replace(&mut self.0, new) + if let Some(addr) = self.outgoing_address { + if matches!(&self.state, PeerState::Live(..)) { + counters.live_outgoing_peers.write().remove(&addr); + } + if matches!(&new, PeerState::Live(..)) + && self + .stats + .counters + .outgoing_connections + .load(Ordering::Relaxed) + > 0 + { + counters.live_outgoing_peers.write().insert(addr); + } + } + + std::mem::replace(&mut self.state, new) } pub fn get_live(&self) -> Option<&LivePeerState> { - match &self.0 { + match &self.state { PeerState::Live(l) => Some(l), _ => None, } } pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { - match &mut self.0 { + match &mut self.state { PeerState::Live(l) => Some(l), _ => None, } } - pub fn idle_to_connecting( - &mut self, - counters: &[&AggregatePeerStatsAtomic], - ) -> Option<(PeerRx, PeerTx)> { - match &self.0 { + pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> { + match &self.state { PeerState::Queued | PeerState::NotNeeded => { let (tx, rx) = unbounded_channel(); let tx_2 = tx.clone(); - self.set(PeerState::Connecting(tx), counters); + self.set_state(PeerState::Connecting(tx), counters); Some((rx, tx_2)) } _ => None, @@ -174,14 +192,14 @@ impl PeerStateNoMut { &mut self, peer_id: Id20, tx: PeerTx, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> anyhow::Result<()> { - if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { + if matches!(&self.state, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); } - match self.take(counters) { + match self.take_state(counters) { PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { - self.set( + self.set_state( PeerState::Live(LivePeerState::new(peer_id, tx, true)), counters, ); @@ -194,14 +212,14 @@ impl PeerStateNoMut { pub fn connecting_to_live( &mut self, peer_id: Id20, - counters: &[&AggregatePeerStatsAtomic], + counters: &PeerStates, ) -> Option<&mut LivePeerState> { - if let PeerState::Connecting(_) = &self.0 { - let tx = match self.take(counters) { + if let PeerState::Connecting(_) = &self.state { + let tx = match self.take_state(counters) { PeerState::Connecting(tx) => tx, _ => unreachable!(), }; - self.set( + self.set_state( PeerState::Live(LivePeerState::new(peer_id, tx, false)), counters, ); @@ -211,8 +229,8 @@ impl PeerStateNoMut { } } - pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { - self.set(PeerState::NotNeeded, counters) + pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState { + self.set_state(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs index 471e3f7..4d6e2b3 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -51,7 +51,7 @@ impl From<&Peer> for PeerStats { fn from(peer: &Peer) -> Self { Self { counters: peer.stats.counters.as_ref().into(), - state: peer.state.get().name(), + state: peer.get_state().name(), } } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 110df73..17c0758 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -1,9 +1,10 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use anyhow::Context; use backoff::backoff::Backoff; use dashmap::DashMap; use librqbit_core::lengths::ValidPieceIndex; +use parking_lot::RwLock; use peer_binary_protocol::{Message, Request}; use crate::{ @@ -21,6 +22,9 @@ pub mod stats; pub(crate) struct PeerStates { pub session_stats: Arc, + + // This keeps track of live addresses we connected to, for PEX. + pub live_outgoing_peers: RwLock>, pub stats: AggregatePeerStatsAtomic, pub states: DashMap, } @@ -28,7 +32,7 @@ pub(crate) struct PeerStates { impl Drop for PeerStates { fn drop(&mut self) { for (_, p) in std::mem::take(&mut self.states).into_iter() { - p.state.destroy(&[&self.session_stats.peers]); + p.destroy(self); } } } @@ -69,7 +73,7 @@ impl PeerStates { } pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { - self.with_peer(addr, |peer| peer.state.get_live().map(f)) + self.with_peer(addr, |peer| peer.get_live().map(f)) .flatten() } @@ -79,13 +83,13 @@ impl PeerStates { reason: &'static str, f: impl FnOnce(&mut LivePeerState) -> R, ) -> Option { - self.with_peer_mut(addr, reason, |peer| peer.state.get_live_mut().map(f)) + self.with_peer_mut(addr, reason, |peer| peer.get_live_mut().map(f)) .flatten() } pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; - let s = p.state.get(); + let s = p.get_state(); self.stats.dec(s); self.session_stats.peers.dec(s); @@ -114,9 +118,7 @@ impl PeerStates { pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { - peer.state - .idle_to_connecting(&[&self.stats, &self.session_stats.peers]) - .context("invalid peer state") + peer.idle_to_connecting(self).context("invalid peer state") }) .context("peer not found in states")??; Ok(rx) @@ -130,8 +132,7 @@ impl PeerStates { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state - .set_not_needed(&[&self.stats, &self.session_stats.peers]) + peer.set_not_needed(self) })?; Some(prev) } diff --git a/crates/peer_binary_protocol/src/extended/ut_pex.rs b/crates/peer_binary_protocol/src/extended/ut_pex.rs index 8e1b0a8..a7c24b9 100644 --- a/crates/peer_binary_protocol/src/extended/ut_pex.rs +++ b/crates/peer_binary_protocol/src/extended/ut_pex.rs @@ -1,7 +1,8 @@ use std::net::{IpAddr, SocketAddr}; +use buffers::ByteBufOwned; use byteorder::{ByteOrder, BE}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Serialize}; @@ -121,9 +122,58 @@ where } } +impl UtPex { + pub fn from_addrs<'a, I, J>(addrs_live: I, addrs_closed: J) -> Self + where + I: IntoIterator, + J: IntoIterator, + { + fn addrs_to_bytes<'a, I>(addrs: I) -> (Option, Option) + where + I: IntoIterator, + { + let mut ipv4_addrs = BytesMut::new(); + let mut ipv6_addrs = BytesMut::new(); + for addr in addrs { + match addr { + SocketAddr::V4(v4) => { + ipv4_addrs.extend_from_slice(&v4.ip().octets()); + ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes()); + } + SocketAddr::V6(v6) => { + ipv6_addrs.extend_from_slice(&v6.ip().octets()); + ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes()); + } + } + } + + let freeze = |buf: BytesMut| -> Option { + if !buf.is_empty() { + Some(buf.freeze().into()) + } else { + None + } + }; + + (freeze(ipv4_addrs), freeze(ipv6_addrs)) + } + + let (added, added6) = addrs_to_bytes(addrs_live); + let (dropped, dropped6) = addrs_to_bytes(addrs_closed); + + Self { + added, + added6, + dropped, + dropped6, + ..Default::default() + } + } +} + #[cfg(test)] mod tests { - use bencode::from_bytes; + use bencode::{bencode_serialize_to_writer, from_bytes}; use buffers::ByteBuf; use super::*; @@ -154,4 +204,35 @@ mod tests { ); assert_eq!(0, addrs[1].flags); } + + #[test] + fn test_pex_roundtrip() { + let a1 = "185.159.157.20:46439".parse::().unwrap(); + let a2 = "151.249.105.134:4240".parse::().unwrap(); + //IPV6 + let aa1 = "[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439" + .parse::() + .unwrap(); + let aa2 = "[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240" + .parse::() + .unwrap(); + + let addrs = vec![a1, aa1, a2, aa2]; + let pex = UtPex::from_addrs(&addrs, &addrs); + let mut bytes = Vec::new(); + bencode_serialize_to_writer(&pex, &mut bytes).unwrap(); + let pex2 = from_bytes::>(&bytes).unwrap(); + assert_eq!(4, pex2.added_peers().count()); + assert_eq!(pex.added_peers().count(), pex2.added_peers().count()); + let addrs2: Vec<_> = pex2.added_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + let addrs2: Vec<_> = pex2.dropped_peers().collect(); + assert_eq!(a1, addrs2[0].addr); + assert_eq!(a2, addrs2[1].addr); + assert_eq!(aa1, addrs2[2].addr); + assert_eq!(aa2, addrs2[3].addr); + } }