diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index cc9d50d..94c5f5a 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -2,7 +2,7 @@ use std::{collections::HashSet, sync::Arc}; use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{Notify, Semaphore}; use crate::peer_connection::WriterRequest; @@ -24,8 +24,20 @@ impl From<&ChunkInfo> for InflightRequest { } // TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak. +pub type PeerRx = UnboundedReceiver; pub type PeerTx = Arc>; +#[derive(Debug, Default)] +pub struct PeerStats { + pub unsuccessful_connection_attempts: usize, +} + +#[derive(Debug, Default)] +pub struct Peer { + pub state: PeerState, + pub stats: PeerStats, +} + #[derive(Debug, Default)] pub enum PeerState { #[default] diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index b9c0a86..682220a 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -40,7 +40,7 @@ use crate::{ peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, - peer_state::{InflightRequest, LivePeerState, PeerState, PeerTx}, + peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx}, spawn_utils::{spawn, BlockingSpawner}, type_aliases::{PeerHandle, BF}, }; @@ -52,7 +52,7 @@ pub struct InflightPiece { #[derive(Default)] pub struct PeerStates { - states: HashMap, + states: HashMap, seen: HashSet, inflight_pieces: HashMap, } @@ -71,7 +71,7 @@ impl PeerStates { .states .values() .fold(AggregatePeerStats::default(), |mut s, p| { - match p { + match &p.state { PeerState::Connecting(_) => s.connecting += 1, PeerState::Live(_) => s.live += 1, PeerState::Queued => s.queued += 1, @@ -93,13 +93,13 @@ impl PeerStates { &self.seen } pub fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> { - if let PeerState::Live(ref l) = self.states.get(&handle)? { + if let PeerState::Live(ref l) = &self.states.get(&handle)?.state { return Some(l); } None } pub fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> { - if let PeerState::Live(ref mut l) = self.states.get_mut(&handle)? { + if let PeerState::Live(ref mut l) = &mut self.states.get_mut(&handle)?.state { return Some(l); } None @@ -113,10 +113,10 @@ impl PeerStates { if self.states.contains_key(&addr) { return None; } - self.states.insert(handle, PeerState::Queued); + self.states.insert(handle, Default::default()); Some(handle) } - pub fn drop_peer(&mut self, handle: PeerHandle) -> Option { + pub fn drop_peer(&mut self, handle: PeerHandle) -> Option { self.states.remove(&handle) } pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { @@ -146,6 +146,23 @@ impl PeerStates { live.bitfield = Some(bitfield); Some(prev) } + pub fn mark_peer_connecting(&mut self, h: PeerHandle) -> anyhow::Result { + let peer = self + .states + .get_mut(&h) + .context("peer not found in states")?; + match &mut peer.state { + s @ PeerState::Queued => { + let (tx, rx) = unbounded_channel(); + *s = PeerState::Connecting(Arc::new(tx)); + Ok(rx) + } + s => { + bail!("did not expect to see the peer in state {:?}", s); + } + } + } + pub fn clone_tx(&self, handle: PeerHandle) -> Option { Some(self.get_live(handle)?.tx.clone()) } @@ -290,16 +307,7 @@ impl TorrentState { spawn(format!("manage_peer({addr})"), { let state = state.clone(); async move { - let rx = match state.locked.write().peers.states.get_mut(&addr) { - Some(s @ PeerState::Queued) => { - let (tx, rx) = unbounded_channel(); - *s = PeerState::Connecting(Arc::new(tx)); - rx - } - s => { - bail!("did not expect to see the peer in state {:?}", s); - } - }; + let rx = state.locked.write().peers.mark_peer_connecting(addr)?; let handler = PeerHandler { addr, @@ -453,8 +461,15 @@ impl TorrentState { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { let mut g = self.locked.write(); - let s = match g.peers.states.get_mut(&handle) { - Some(s @ PeerState::Connecting(_)) => s, + let peer = match g.peers.states.get_mut(&handle) { + Some(peer) => peer, + None => { + warn!("peer {} was in a wrong state", handle); + return; + } + }; + let s = match &mut peer.state { + s @ PeerState::Connecting(_) => s, _ => { warn!("peer {} was in a wrong state", handle); return; @@ -473,7 +488,7 @@ impl TorrentState { Some(peer) => peer, None => return false, }; - if let PeerState::Live(l) = peer { + if let PeerState::Live(l) = peer.state { for req in l.inflight_requests { g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); } @@ -496,8 +511,8 @@ impl TorrentState { let mut futures = Vec::new(); let g = self.locked.read(); - for (handle, peer_state) in g.peers.states.iter() { - match peer_state { + for (handle, peer) in g.peers.states.iter() { + match &peer.state { PeerState::Live(live) => { if !live.peer_interested { continue;