1/n [exponential backoff peers]: refactor "tx" to be once per connected peer

This commit is contained in:
Igor Katson 2023-11-17 22:32:04 +00:00
parent c10f4fcd1b
commit 8e50829586
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 80 additions and 75 deletions

View file

@ -2,8 +2,10 @@ use std::{collections::HashSet, sync::Arc};
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{Notify, Semaphore}; use tokio::sync::{Notify, Semaphore};
use crate::peer_connection::WriterRequest;
use crate::type_aliases::BF; use crate::type_aliases::BF;
#[derive(Debug, Hash, PartialEq, Eq)] #[derive(Debug, Hash, PartialEq, Eq)]
@ -21,10 +23,14 @@ impl From<&ChunkInfo> for InflightRequest {
} }
} }
#[derive(Debug)] // TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak.
pub type PeerTx = Arc<UnboundedSender<WriterRequest>>;
#[derive(Debug, Default)]
pub enum PeerState { pub enum PeerState {
#[default]
Queued, Queued,
Connecting, Connecting(PeerTx),
Live(LivePeerState), Live(LivePeerState),
} }
@ -37,10 +43,11 @@ pub struct LivePeerState {
pub have_notify: Arc<Notify>, pub have_notify: Arc<Notify>,
pub bitfield: Option<BF>, pub bitfield: Option<BF>,
pub inflight_requests: HashSet<InflightRequest>, pub inflight_requests: HashSet<InflightRequest>,
pub tx: PeerTx,
} }
impl LivePeerState { impl LivePeerState {
pub fn new(peer_id: Id20) -> Self { pub fn new(peer_id: Id20, tx: PeerTx) -> Self {
LivePeerState { LivePeerState {
peer_id, peer_id,
i_am_choked: true, i_am_choked: true,
@ -49,6 +56,7 @@ impl LivePeerState {
have_notify: Arc::new(Notify::new()), have_notify: Arc::new(Notify::new()),
requests_sem: Arc::new(Semaphore::new(0)), requests_sem: Arc::new(Semaphore::new(0)),
inflight_requests: Default::default(), inflight_requests: Default::default(),
tx,
} }
} }
} }

View file

@ -10,7 +10,7 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use anyhow::Context; use anyhow::{bail, Context};
use buffers::{ByteBuf, ByteString}; use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned; use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
@ -40,7 +40,7 @@ use crate::{
peer_connection::{ peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
}, },
peer_state::{InflightRequest, LivePeerState, PeerState}, peer_state::{InflightRequest, LivePeerState, PeerState, PeerTx},
spawn_utils::{spawn, BlockingSpawner}, spawn_utils::{spawn, BlockingSpawner},
type_aliases::{PeerHandle, BF}, type_aliases::{PeerHandle, BF},
}; };
@ -55,7 +55,6 @@ pub struct PeerStates {
states: HashMap<PeerHandle, PeerState>, states: HashMap<PeerHandle, PeerState>,
seen: HashSet<SocketAddr>, seen: HashSet<SocketAddr>,
inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>, inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
tx: HashMap<PeerHandle, Arc<tokio::sync::mpsc::UnboundedSender<WriterRequest>>>,
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -73,7 +72,7 @@ impl PeerStates {
.values() .values()
.fold(AggregatePeerStats::default(), |mut s, p| { .fold(AggregatePeerStats::default(), |mut s, p| {
match p { match p {
PeerState::Connecting => s.connecting += 1, PeerState::Connecting(_) => s.connecting += 1,
PeerState::Live(_) => s.live += 1, PeerState::Live(_) => s.live += 1,
PeerState::Queued => s.queued += 1, PeerState::Queued => s.queued += 1,
}; };
@ -82,15 +81,11 @@ impl PeerStates {
stats.seen = self.seen.len(); stats.seen = self.seen.len();
stats stats
} }
pub fn add_if_not_seen( pub fn add_if_not_seen(&mut self, addr: SocketAddr) -> Option<PeerHandle> {
&mut self,
addr: SocketAddr,
tx: UnboundedSender<WriterRequest>,
) -> Option<PeerHandle> {
if self.seen.contains(&addr) { if self.seen.contains(&addr) {
return None; return None;
} }
let handle = self.add(addr, tx)?; let handle = self.add(addr)?;
self.seen.insert(addr); self.seen.insert(addr);
Some(handle) Some(handle)
} }
@ -113,23 +108,16 @@ impl PeerStates {
self.get_live_mut(handle) self.get_live_mut(handle)
.ok_or_else(|| anyhow::anyhow!("peer dropped")) .ok_or_else(|| anyhow::anyhow!("peer dropped"))
} }
pub fn add( pub fn add(&mut self, addr: SocketAddr) -> Option<PeerHandle> {
&mut self,
addr: SocketAddr,
tx: UnboundedSender<WriterRequest>,
) -> Option<PeerHandle> {
let handle = addr; let handle = addr;
if self.states.contains_key(&addr) { if self.states.contains_key(&addr) {
return None; return None;
} }
self.states.insert(handle, PeerState::Queued); self.states.insert(handle, PeerState::Queued);
self.tx.insert(handle, Arc::new(tx));
Some(handle) Some(handle)
} }
pub fn drop_peer(&mut self, handle: PeerHandle) -> Option<PeerState> { pub fn drop_peer(&mut self, handle: PeerHandle) -> Option<PeerState> {
let result = self.states.remove(&handle); self.states.remove(&handle)
self.tx.remove(&handle);
result
} }
pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option<bool> { pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
let live = self.get_live_mut(handle)?; let live = self.get_live_mut(handle)?;
@ -158,8 +146,8 @@ impl PeerStates {
live.bitfield = Some(bitfield); live.bitfield = Some(bitfield);
Some(prev) Some(prev)
} }
pub fn clone_tx(&self, handle: PeerHandle) -> Option<Arc<UnboundedSender<WriterRequest>>> { pub fn clone_tx(&self, handle: PeerHandle) -> Option<PeerTx> {
Some(self.tx.get(&handle)?.clone()) Some(self.get_live(handle)?.tx.clone())
} }
pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option<InflightPiece> { pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option<InflightPiece> {
self.inflight_pieces.remove(&piece) self.inflight_pieces.remove(&piece)
@ -242,8 +230,11 @@ pub struct TorrentState {
stats: AtomicStats, stats: AtomicStats,
options: TorrentStateOptions, options: TorrentStateOptions,
// Limits how many active (occupying network resources) peers there are at a moment in time.
peer_semaphore: Semaphore, peer_semaphore: Semaphore,
peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver<WriterRequest>)>,
// The queue for peer manager to connect to them.
peer_queue_tx: UnboundedSender<SocketAddr>,
finished_notify: Notify, finished_notify: Notify,
} }
@ -292,45 +283,51 @@ impl TorrentState {
let state = state.clone(); let state = state.clone();
async move { async move {
loop { loop {
let (addr, out_rx) = peer_queue_rx.recv().await.unwrap(); let addr = peer_queue_rx.recv().await.unwrap();
let permit = state.peer_semaphore.acquire().await.unwrap(); let permit = state.peer_semaphore.acquire().await.unwrap();
match state.locked.write().peers.states.get_mut(&addr) {
Some(s @ PeerState::Queued) => *s = PeerState::Connecting,
s => {
warn!("did not expect to see the peer in state {:?}", s);
continue;
}
};
let handler = PeerHandler {
addr,
state: state.clone(),
spawner,
};
let options = PeerConnectionOptions {
connect_timeout: state.options.peer_connect_timeout,
read_write_timeout: state.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.info_hash,
state.peer_id,
handler,
Some(options),
spawner,
);
permit.forget(); permit.forget();
spawn(format!("manage_peer({addr})"), async move { spawn(format!("manage_peer({addr})"), {
if let Err(e) = peer_connection.manage_peer(out_rx).await { let state = state.clone();
debug!("error managing peer {}: {:#}", addr, e) async move {
}; let rx = match state.locked.write().peers.states.get_mut(&addr) {
let state = peer_connection.into_handler().state; Some(s @ PeerState::Queued) => {
state.drop_peer(addr); let (tx, rx) = unbounded_channel();
state.peer_semaphore.add_permits(1); *s = PeerState::Connecting(Arc::new(tx));
Ok::<_, anyhow::Error>(()) rx
}
s => {
bail!("did not expect to see the peer in state {:?}", s);
}
};
let handler = PeerHandler {
addr,
state: state.clone(),
spawner,
};
let options = PeerConnectionOptions {
connect_timeout: state.options.peer_connect_timeout,
read_write_timeout: state.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.info_hash,
state.peer_id,
handler,
Some(options),
spawner,
);
if let Err(e) = peer_connection.manage_peer(rx).await {
debug!("error managing peer {}: {:#}", addr, e)
};
let state = peer_connection.into_handler().state;
state.drop_peer(addr);
state.peer_semaphore.add_permits(1);
Ok::<_, anyhow::Error>(())
}
}); });
} }
} }
@ -456,14 +453,18 @@ impl TorrentState {
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let mut g = self.locked.write(); let mut g = self.locked.write();
match g.peers.states.get_mut(&handle) { let s = match g.peers.states.get_mut(&handle) {
Some(s @ &mut PeerState::Connecting) => { Some(s @ PeerState::Connecting(_)) => s,
*s = PeerState::Live(LivePeerState::new(Id20(h.peer_id)));
}
_ => { _ => {
warn!("peer {} was in wrong state", handle); warn!("peer {} was in a wrong state", handle);
return;
} }
} };
let tx = match std::mem::take(s) {
PeerState::Connecting(tx) => tx,
_ => unreachable!(),
};
*s = PeerState::Live(LivePeerState::new(Id20(h.peer_id), tx));
} }
fn drop_peer(&self, handle: PeerHandle) -> bool { fn drop_peer(&self, handle: PeerHandle) -> bool {
@ -511,11 +512,7 @@ impl TorrentState {
continue; continue;
} }
let tx = match g.peers.tx.get(handle) { let tx = Arc::downgrade(&live.tx);
Some(tx) => tx,
None => continue,
};
let tx = Arc::downgrade(tx);
futures.push(async move { futures.push(async move {
if let Some(tx) = tx.upgrade() { if let Some(tx) = tx.upgrade() {
if tx if tx
@ -547,13 +544,13 @@ impl TorrentState {
} }
pub fn add_peer_if_not_seen(self: &Arc<Self>, addr: SocketAddr) -> bool { pub fn add_peer_if_not_seen(self: &Arc<Self>, addr: SocketAddr) -> bool {
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<WriterRequest>(); // let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<WriterRequest>();
match self.locked.write().peers.add_if_not_seen(addr, out_tx) { match self.locked.write().peers.add_if_not_seen(addr) {
Some(handle) => handle, Some(handle) => handle,
None => return false, None => return false,
}; };
match self.peer_queue_tx.send((addr, out_rx)) { match self.peer_queue_tx.send(addr) {
Ok(_) => {} Ok(_) => {}
Err(_) => { Err(_) => {
warn!("peer adder died, can't add peer") warn!("peer adder died, can't add peer")