2/n Wrap PeerState into peer

This commit is contained in:
Igor Katson 2023-11-17 22:51:05 +00:00
parent 8e50829586
commit 55e692d476
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 50 additions and 23 deletions

View file

@ -2,7 +2,7 @@ use std::{collections::HashSet, sync::Arc};
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{Notify, Semaphore}; use tokio::sync::{Notify, Semaphore};
use crate::peer_connection::WriterRequest; use crate::peer_connection::WriterRequest;
@ -24,8 +24,20 @@ impl From<&ChunkInfo> for InflightRequest {
} }
// TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak. // TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak.
pub type PeerRx = UnboundedReceiver<WriterRequest>;
pub type PeerTx = Arc<UnboundedSender<WriterRequest>>; pub type PeerTx = Arc<UnboundedSender<WriterRequest>>;
#[derive(Debug, Default)]
pub struct PeerStats {
pub unsuccessful_connection_attempts: usize,
}
#[derive(Debug, Default)]
pub struct Peer {
pub state: PeerState,
pub stats: PeerStats,
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub enum PeerState { pub enum PeerState {
#[default] #[default]

View file

@ -40,7 +40,7 @@ use crate::{
peer_connection::{ peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
}, },
peer_state::{InflightRequest, LivePeerState, PeerState, PeerTx}, peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx},
spawn_utils::{spawn, BlockingSpawner}, spawn_utils::{spawn, BlockingSpawner},
type_aliases::{PeerHandle, BF}, type_aliases::{PeerHandle, BF},
}; };
@ -52,7 +52,7 @@ pub struct InflightPiece {
#[derive(Default)] #[derive(Default)]
pub struct PeerStates { pub struct PeerStates {
states: HashMap<PeerHandle, PeerState>, states: HashMap<PeerHandle, Peer>,
seen: HashSet<SocketAddr>, seen: HashSet<SocketAddr>,
inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>, inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
} }
@ -71,7 +71,7 @@ impl PeerStates {
.states .states
.values() .values()
.fold(AggregatePeerStats::default(), |mut s, p| { .fold(AggregatePeerStats::default(), |mut s, p| {
match p { match &p.state {
PeerState::Connecting(_) => s.connecting += 1, PeerState::Connecting(_) => s.connecting += 1,
PeerState::Live(_) => s.live += 1, PeerState::Live(_) => s.live += 1,
PeerState::Queued => s.queued += 1, PeerState::Queued => s.queued += 1,
@ -93,13 +93,13 @@ impl PeerStates {
&self.seen &self.seen
} }
pub fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> { pub fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> {
if let PeerState::Live(ref l) = self.states.get(&handle)? { if let PeerState::Live(ref l) = &self.states.get(&handle)?.state {
return Some(l); return Some(l);
} }
None None
} }
pub fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> { pub fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> {
if let PeerState::Live(ref mut l) = self.states.get_mut(&handle)? { if let PeerState::Live(ref mut l) = &mut self.states.get_mut(&handle)?.state {
return Some(l); return Some(l);
} }
None None
@ -113,10 +113,10 @@ impl PeerStates {
if self.states.contains_key(&addr) { if self.states.contains_key(&addr) {
return None; return None;
} }
self.states.insert(handle, PeerState::Queued); self.states.insert(handle, Default::default());
Some(handle) Some(handle)
} }
pub fn drop_peer(&mut self, handle: PeerHandle) -> Option<PeerState> { pub fn drop_peer(&mut self, handle: PeerHandle) -> Option<Peer> {
self.states.remove(&handle) self.states.remove(&handle)
} }
pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option<bool> { pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
@ -146,6 +146,23 @@ impl PeerStates {
live.bitfield = Some(bitfield); live.bitfield = Some(bitfield);
Some(prev) Some(prev)
} }
pub fn mark_peer_connecting(&mut self, h: PeerHandle) -> anyhow::Result<PeerRx> {
let peer = self
.states
.get_mut(&h)
.context("peer not found in states")?;
match &mut peer.state {
s @ PeerState::Queued => {
let (tx, rx) = unbounded_channel();
*s = PeerState::Connecting(Arc::new(tx));
Ok(rx)
}
s => {
bail!("did not expect to see the peer in state {:?}", s);
}
}
}
pub fn clone_tx(&self, handle: PeerHandle) -> Option<PeerTx> { pub fn clone_tx(&self, handle: PeerHandle) -> Option<PeerTx> {
Some(self.get_live(handle)?.tx.clone()) Some(self.get_live(handle)?.tx.clone())
} }
@ -290,16 +307,7 @@ impl TorrentState {
spawn(format!("manage_peer({addr})"), { spawn(format!("manage_peer({addr})"), {
let state = state.clone(); let state = state.clone();
async move { async move {
let rx = match state.locked.write().peers.states.get_mut(&addr) { let rx = state.locked.write().peers.mark_peer_connecting(addr)?;
Some(s @ PeerState::Queued) => {
let (tx, rx) = unbounded_channel();
*s = PeerState::Connecting(Arc::new(tx));
rx
}
s => {
bail!("did not expect to see the peer in state {:?}", s);
}
};
let handler = PeerHandler { let handler = PeerHandler {
addr, addr,
@ -453,8 +461,15 @@ impl TorrentState {
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let mut g = self.locked.write(); let mut g = self.locked.write();
let s = match g.peers.states.get_mut(&handle) { let peer = match g.peers.states.get_mut(&handle) {
Some(s @ PeerState::Connecting(_)) => s, Some(peer) => peer,
None => {
warn!("peer {} was in a wrong state", handle);
return;
}
};
let s = match &mut peer.state {
s @ PeerState::Connecting(_) => s,
_ => { _ => {
warn!("peer {} was in a wrong state", handle); warn!("peer {} was in a wrong state", handle);
return; return;
@ -473,7 +488,7 @@ impl TorrentState {
Some(peer) => peer, Some(peer) => peer,
None => return false, None => return false,
}; };
if let PeerState::Live(l) = peer { if let PeerState::Live(l) = peer.state {
for req in l.inflight_requests { for req in l.inflight_requests {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
} }
@ -496,8 +511,8 @@ impl TorrentState {
let mut futures = Vec::new(); let mut futures = Vec::new();
let g = self.locked.read(); let g = self.locked.read();
for (handle, peer_state) in g.peers.states.iter() { for (handle, peer) in g.peers.states.iter() {
match peer_state { match &peer.state {
PeerState::Live(live) => { PeerState::Live(live) => {
if !live.peer_interested { if !live.peer_interested {
continue; continue;