Allow peers to reconnect

This commit is contained in:
Igor Katson 2023-12-06 00:16:59 +00:00
parent 124be19e43
commit fd17ddc46b
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 47 additions and 8 deletions

View file

@ -382,7 +382,17 @@ impl TorrentStateLive {
let (tx, rx) = unbounded_channel();
let counters = match self.peers.states.entry(checked_peer.addr) {
Entry::Occupied(_) => bail!("we are already managing peer {}", checked_peer.addr),
Entry::Occupied(mut occ) => {
let peer = occ.get_mut();
peer.state
.incoming_connection(
Id20(checked_peer.handshake.peer_id),
tx.clone(),
&self.peers.stats,
)
.context("peer already existed")?;
peer.stats.counters.clone()
}
Entry::Vacant(vac) => {
let peer = Peer::new_live_for_incoming_connection(
Id20(checked_peer.handshake.peer_id),
@ -394,6 +404,9 @@ impl TorrentStateLive {
counters
}
};
counters
.incoming_connections
.fetch_add(1, Ordering::Relaxed);
self.spawn(
"incoming peer",
@ -504,7 +517,7 @@ impl TorrentStateLive {
handler
.counters
.connection_attempts
.outgoing_connection_attempts
.fetch_add(1, Ordering::Relaxed);
let res = tokio::select! {
r = requester => {r}
@ -803,7 +816,9 @@ struct PeerHandler {
impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_connected(&self, connection_time: Duration) {
self.counters.connections.fetch_add(1, Ordering::Relaxed);
self.counters
.outgoing_connections
.fetch_add(1, Ordering::Relaxed);
self.counters
.total_time_connecting_ms
.fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed);

View file

@ -135,6 +135,25 @@ impl PeerStateNoMut {
None
}
}
pub fn incoming_connection(
&mut self,
peer_id: Id20,
tx: PeerTx,
counters: &AggregatePeerStatsAtomic,
) -> anyhow::Result<()> {
if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) {
anyhow::bail!("peer already active");
}
match self.take(counters) {
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
}
PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(),
}
Ok(())
}
pub fn connecting_to_live(
&mut self,
peer_id: Id20,

View file

@ -12,8 +12,9 @@ use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
pub(crate) struct PeerCountersAtomic {
pub fetched_bytes: AtomicU64,
pub total_time_connecting_ms: AtomicU64,
pub connection_attempts: AtomicU32,
pub connections: AtomicU32,
pub incoming_connections: AtomicU32,
pub outgoing_connection_attempts: AtomicU32,
pub outgoing_connections: AtomicU32,
pub errors: AtomicU32,
pub fetched_chunks: AtomicU32,
pub downloaded_and_checked_pieces: AtomicU32,

View file

@ -6,6 +6,7 @@ use crate::torrent_state::live::peer::{Peer, PeerState};
#[derive(Serialize, Deserialize)]
pub struct PeerCounters {
pub incoming_connections: u32,
pub fetched_bytes: u64,
pub total_time_connecting_ms: u64,
pub connection_attempts: u32,
@ -24,10 +25,13 @@ pub struct PeerStats {
impl From<&super::atomic::PeerCountersAtomic> for PeerCounters {
fn from(counters: &super::atomic::PeerCountersAtomic) -> Self {
Self {
incoming_connections: counters.incoming_connections.load(Ordering::Relaxed),
fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed),
total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed),
connection_attempts: counters.connection_attempts.load(Ordering::Relaxed),
connections: counters.connections.load(Ordering::Relaxed),
connection_attempts: counters
.outgoing_connection_attempts
.load(Ordering::Relaxed),
connections: counters.outgoing_connections.load(Ordering::Relaxed),
errors: counters.errors.load(Ordering::Relaxed),
fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed),
downloaded_and_checked_pieces: counters

View file

@ -428,7 +428,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
http_api
.make_http_api_and_run(http_api_listen_addr, false)
.await
.context("error starting HTTP API")
.context("error running HTTP API")
}
},
SubCommand::Download(download_opts) => {