diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 16e0592..ae25d70 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,6 +1,7 @@ use std::time::Duration; use std::{collections::HashSet, sync::Arc}; +use anyhow::Context; use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; @@ -27,7 +28,20 @@ impl From<&ChunkInfo> for InflightRequest { // TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak. pub type PeerRx = UnboundedReceiver; -pub type PeerTx = Arc>; +pub type PeerTx = UnboundedSender; + +pub trait SendMany { + fn send_many(&self, requests: impl IntoIterator) -> anyhow::Result<()>; +} + +impl SendMany for PeerTx { + fn send_many(&self, requests: impl IntoIterator) -> anyhow::Result<()> { + requests + .into_iter() + .try_for_each(|r| self.send(r)) + .context("peer dropped") + } +} #[derive(Debug)] pub struct PeerStats { @@ -64,10 +78,20 @@ pub enum PeerState { Dead, // The peer has the full torrent, and we have the full torrent, so no need // to keep talking to it. - FullyHaveNoLongerNeeded, + NotNeeded, } impl PeerState { + pub fn name(&self) -> &'static str { + match self { + PeerState::Queued => "queued", + PeerState::Connecting(_) => "connecting", + PeerState::Live(_) => "live", + PeerState::Dead => "dead", + PeerState::NotNeeded => "not needed", + } + } + fn take_connecting(&mut self) -> Option { if let PeerState::Connecting(_) = self { match std::mem::take(self) { @@ -100,7 +124,7 @@ impl PeerState { pub fn queued_to_connecting(&mut self) -> Option { if let PeerState::Queued = self { let (tx, rx) = unbounded_channel(); - *self = PeerState::Connecting(Arc::new(tx)); + *self = PeerState::Connecting(tx); Some(rx) } else { None @@ -120,9 +144,10 @@ impl PeerState { false } - pub fn to_dead(&mut self) -> Option { + pub fn to_dead(&mut self) -> Option> { match std::mem::replace(self, PeerState::Dead) { - PeerState::Live(l) => Some(l), + PeerState::Live(l) => Some(Some(l)), + PeerState::Connecting(_) => Some(None), _ => None, } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 73287b5..79885ac 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -1,3 +1,6 @@ +// The main logic of rqbit is here - connecting to peers, reading and writing messages +// to them, tracking peer state etc. + use std::{ collections::{HashMap, HashSet}, fs::File, @@ -41,7 +44,7 @@ use crate::{ peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, - peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx}, + peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx, SendMany}, spawn_utils::{spawn, BlockingSpawner}, type_aliases::{PeerHandle, BF}, }; @@ -79,7 +82,7 @@ impl PeerStates { PeerState::Live(_) => s.live += 1, PeerState::Queued => s.queued += 1, PeerState::Dead => s.dead += 1, - PeerState::FullyHaveNoLongerNeeded => s.fully_have_and_we_are_finished += 1, + PeerState::NotNeeded => s.fully_have_and_we_are_finished += 1, }; s }); @@ -121,7 +124,7 @@ impl PeerStates { self.states.insert(handle, Default::default()); Some(handle) } - pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option { + pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option> { let peer = self.states.get_mut(&handle)?; peer.state.to_dead() } @@ -174,7 +177,7 @@ impl PeerStates { self.inflight_pieces.remove(&piece) } - fn mark_peer_trustworthy(&mut self, handle: SocketAddr) { + fn reset_peer_backoff(&mut self, handle: PeerHandle) { let p = match self.states.get_mut(&handle) { Some(p) => p, None => return, @@ -340,12 +343,20 @@ impl TorrentState { spawner, ); - if let Err(e) = peer_connection.manage_peer(rx).await { - debug!("error managing peer {}: {:#}", addr, e) - }; + let res = peer_connection.manage_peer(rx).await; let state = peer_connection.into_handler().state; - state.on_peer_died(addr); state.peer_semaphore.add_permits(1); + + match res { + // We disconnected the peer ourselves as we don't need it + Ok(()) => { + state.on_peer_died(addr, None); + } + Err(e) => { + debug!("error managing peer {}: {:#}", addr, e); + state.on_peer_died(addr, Some(e)); + } + } Ok::<_, anyhow::Error>(()) } }); @@ -483,12 +494,22 @@ impl TorrentState { peer.state.connecting_to_live(Id20(h.peer_id)); } - fn on_peer_died(self: &Arc, handle: PeerHandle) { + fn on_peer_died(self: &Arc, handle: PeerHandle, error: Option) { let mut g = self.locked.write(); - if let Some(live) = g.peers.mark_peer_dead(handle) { - for req in live.inflight_requests { - g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); + match g.peers.mark_peer_dead(handle) { + Some(Some(live)) => { + for req in live.inflight_requests { + g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); + } } + // Other valid state to transition to dead. + Some(None) => {} + // Peer was in an unexpected state. + None => return, + } + + if error.is_none() { + return; } let backoff = g @@ -502,17 +523,21 @@ impl TorrentState { if let Some(dur) = backoff { let state = self.clone(); - spawn("wait for peer", async move { + spawn(format!("wait_for_peer({handle}, {dur:?})"), async move { tokio::time::sleep(dur).await; { let mut g = state.locked.write(); let peer = match g.peers.states.get_mut(&handle) { Some(p) => p, - None => bail!("bug: peer disappeared"), + None => bail!("bug: peer {} disappeared", handle), }; match &peer.state { PeerState::Dead => peer.state = PeerState::Queued, - _ => bail!("peer in unexpected state"), + other => bail!( + "peer {} in unexpected state: {}. Expected dead", + handle, + other.name() + ), } } state.peer_queue_tx.send(handle)?; @@ -554,7 +579,7 @@ impl TorrentState { continue; } - let tx = Arc::downgrade(&live.tx); + let tx = live.tx.downgrade(); futures.push(async move { if let Some(tx) = tx.upgrade() { if tx @@ -586,18 +611,12 @@ impl TorrentState { } pub fn add_peer_if_not_seen(self: &Arc, addr: SocketAddr) -> bool { - // let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::(); match self.locked.write().peers.add_if_not_seen(addr) { Some(handle) => handle, None => return false, }; - match self.peer_queue_tx.send(addr) { - Ok(_) => {} - Err(_) => { - warn!("peer adder died, can't add peer") - } - } + let _ = self.peer_queue_tx.send(addr); true } @@ -818,11 +837,10 @@ impl PeerHandler { Some(tx) => tx, None => return Ok(()), }; - tx.send(WriterRequest::Message(MessageOwned::Unchoke)) - .context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::Interested)) - .context("peer dropped")?; - + tx.send_many([ + WriterRequest::Message(MessageOwned::Unchoke), + WriterRequest::Message(MessageOwned::Interested), + ])?; self.requester(handle).await?; Ok::<_, anyhow::Error>(()) } @@ -1095,7 +1113,7 @@ impl PeerHandler { let mut g = self.state.locked.write(); g.chunks.mark_piece_downloaded(chunk_info.piece_index); - g.peers.mark_peer_trustworthy(handle); + g.peers.reset_peer_backoff(handle); } debug!( @@ -1134,10 +1152,7 @@ impl PeerHandler { for (_, peer) in g.peers.states.iter_mut() { if let PeerState::Live(l) = &peer.state { if l.has_full_torrent(self.state.lengths.total_pieces() as usize) { - let live = peer - .state - .live_to(PeerState::FullyHaveNoLongerNeeded) - .unwrap(); + let live = peer.state.live_to(PeerState::NotNeeded).unwrap(); let _ = live.tx.send(WriterRequest::Disconnect); } }