From fd17ddc46b70837759f16981c0214748b1344ce6 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 6 Dec 2023 00:16:59 +0000 Subject: [PATCH] Allow peers to reconnect --- crates/librqbit/src/torrent_state/live/mod.rs | 21 ++++++++++++++++--- .../src/torrent_state/live/peer/mod.rs | 19 +++++++++++++++++ .../torrent_state/live/peer/stats/atomic.rs | 5 +++-- .../torrent_state/live/peer/stats/snapshot.rs | 8 +++++-- crates/rqbit/src/main.rs | 2 +- 5 files changed, 47 insertions(+), 8 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 306661c..c2b4ef1 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -382,7 +382,17 @@ impl TorrentStateLive { let (tx, rx) = unbounded_channel(); let counters = match self.peers.states.entry(checked_peer.addr) { - Entry::Occupied(_) => bail!("we are already managing peer {}", checked_peer.addr), + Entry::Occupied(mut occ) => { + let peer = occ.get_mut(); + peer.state + .incoming_connection( + Id20(checked_peer.handshake.peer_id), + tx.clone(), + &self.peers.stats, + ) + .context("peer already existed")?; + peer.stats.counters.clone() + } Entry::Vacant(vac) => { let peer = Peer::new_live_for_incoming_connection( Id20(checked_peer.handshake.peer_id), @@ -394,6 +404,9 @@ impl TorrentStateLive { counters } }; + counters + .incoming_connections + .fetch_add(1, Ordering::Relaxed); self.spawn( "incoming peer", @@ -504,7 +517,7 @@ impl TorrentStateLive { handler .counters - .connection_attempts + .outgoing_connection_attempts .fetch_add(1, Ordering::Relaxed); let res = tokio::select! { r = requester => {r} @@ -803,7 +816,9 @@ struct PeerHandler { impl<'a> PeerConnectionHandler for &'a PeerHandler { fn on_connected(&self, connection_time: Duration) { - self.counters.connections.fetch_add(1, Ordering::Relaxed); + self.counters + .outgoing_connections + .fetch_add(1, Ordering::Relaxed); self.counters .total_time_connecting_ms .fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed); diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 37cfce4..a915999 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -135,6 +135,25 @@ impl PeerStateNoMut { None } } + + pub fn incoming_connection( + &mut self, + peer_id: Id20, + tx: PeerTx, + counters: &AggregatePeerStatsAtomic, + ) -> anyhow::Result<()> { + if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { + anyhow::bail!("peer already active"); + } + match self.take(counters) { + PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { + self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters); + } + PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(), + } + Ok(()) + } + pub fn connecting_to_live( &mut self, peer_id: Id20, diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs index 6c9b80a..2933d07 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs @@ -12,8 +12,9 @@ use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; pub(crate) struct PeerCountersAtomic { pub fetched_bytes: AtomicU64, pub total_time_connecting_ms: AtomicU64, - pub connection_attempts: AtomicU32, - pub connections: AtomicU32, + pub incoming_connections: AtomicU32, + pub outgoing_connection_attempts: AtomicU32, + pub outgoing_connections: AtomicU32, pub errors: AtomicU32, pub fetched_chunks: AtomicU32, pub downloaded_and_checked_pieces: AtomicU32, diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs index 48db933..df18007 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/snapshot.rs @@ -6,6 +6,7 @@ use crate::torrent_state::live::peer::{Peer, PeerState}; #[derive(Serialize, Deserialize)] pub struct PeerCounters { + pub incoming_connections: u32, pub fetched_bytes: u64, pub total_time_connecting_ms: u64, pub connection_attempts: u32, @@ -24,10 +25,13 @@ pub struct PeerStats { impl From<&super::atomic::PeerCountersAtomic> for PeerCounters { fn from(counters: &super::atomic::PeerCountersAtomic) -> Self { Self { + incoming_connections: counters.incoming_connections.load(Ordering::Relaxed), fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed), total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed), - connection_attempts: counters.connection_attempts.load(Ordering::Relaxed), - connections: counters.connections.load(Ordering::Relaxed), + connection_attempts: counters + .outgoing_connection_attempts + .load(Ordering::Relaxed), + connections: counters.outgoing_connections.load(Ordering::Relaxed), errors: counters.errors.load(Ordering::Relaxed), fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed), downloaded_and_checked_pieces: counters diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 72168af..3049c73 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -428,7 +428,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { http_api .make_http_api_and_run(http_api_listen_addr, false) .await - .context("error starting HTTP API") + .context("error running HTTP API") } }, SubCommand::Download(download_opts) => {