diff --git a/Cargo.lock b/Cargo.lock index b6b4159..e1ed135 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,6 +146,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom", + "instant", + "rand", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -755,6 +766,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -825,6 +845,7 @@ version = "2.2.2" dependencies = [ "anyhow", "axum", + "backoff", "bincode", "bitvec", "byteorder", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index e5e882c..8ce32cc 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -55,7 +55,8 @@ uuid = {version = "1.2", features = ["v4"]} futures = "0.3" url = "2" hex = "0.4" +backoff = "0.4.0" [dev-dependencies] futures = {version = "0.3"} -pretty_env_logger = "0.5" \ No newline at end of file +pretty_env_logger = "0.5" diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 94c5f5a..bcfa19d 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,5 +1,7 @@ +use std::time::Duration; use std::{collections::HashSet, sync::Arc}; +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -27,9 +29,22 @@ impl From<&ChunkInfo> for InflightRequest { pub type PeerRx = UnboundedReceiver; pub type PeerTx = Arc>; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct PeerStats { - pub unsuccessful_connection_attempts: usize, + pub backoff: ExponentialBackoff, +} + +impl Default for PeerStats { + fn default() -> Self { + Self { + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_secs(10)) + .with_multiplier(6.) + .with_max_interval(Duration::from_secs(3600)) + .with_max_elapsed_time(Some(Duration::from_secs(86400))) + .build(), + } + } } #[derive(Debug, Default)] @@ -44,6 +59,7 @@ pub enum PeerState { Queued, Connecting(PeerTx), Live(LivePeerState), + Dead, } #[derive(Debug)] diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 682220a..86e7e53 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -11,6 +11,7 @@ use std::{ }; use anyhow::{bail, Context}; +use backoff::backoff::Backoff; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; @@ -28,7 +29,7 @@ use serde::Serialize; use sha1w::Sha1; use tokio::{ sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + mpsc::{unbounded_channel, UnboundedSender}, Notify, Semaphore, }, time::timeout, @@ -63,6 +64,7 @@ pub struct AggregatePeerStats { pub connecting: usize, pub live: usize, pub seen: usize, + pub dead: usize, } impl PeerStates { @@ -75,6 +77,7 @@ impl PeerStates { PeerState::Connecting(_) => s.connecting += 1, PeerState::Live(_) => s.live += 1, PeerState::Queued => s.queued += 1, + PeerState::Dead => s.dead += 1, }; s }); @@ -116,6 +119,13 @@ impl PeerStates { self.states.insert(handle, Default::default()); Some(handle) } + pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option { + let peer = self.states.get_mut(&handle)?; + match std::mem::replace(&mut peer.state, PeerState::Dead) { + PeerState::Live(l) => Some(l), + _ => None, + } + } pub fn drop_peer(&mut self, handle: PeerHandle) -> Option { self.states.remove(&handle) } @@ -169,6 +179,14 @@ impl PeerStates { pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option { self.inflight_pieces.remove(&piece) } + + fn mark_peer_trustworthy(&mut self, handle: SocketAddr) { + let p = match self.states.get_mut(&handle) { + Some(p) => p, + None => return, + }; + p.stats.backoff.reset(); + } } pub struct TorrentStateLocked { @@ -332,7 +350,7 @@ impl TorrentState { debug!("error managing peer {}: {:#}", addr, e) }; let state = peer_connection.into_handler().state; - state.drop_peer(addr); + state.on_peer_died(addr); state.peer_semaphore.add_permits(1); Ok::<_, anyhow::Error>(()) } @@ -482,18 +500,45 @@ impl TorrentState { *s = PeerState::Live(LivePeerState::new(Id20(h.peer_id), tx)); } - fn drop_peer(&self, handle: PeerHandle) -> bool { + fn on_peer_died(self: &Arc, handle: PeerHandle) { let mut g = self.locked.write(); - let peer = match g.peers.drop_peer(handle) { + let live = match g.peers.mark_peer_dead(handle) { Some(peer) => peer, - None => return false, + None => return, }; - if let PeerState::Live(l) = peer.state { - for req in l.inflight_requests { - g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); - } + for req in live.inflight_requests { + g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); + } + let backoff = g + .peers + .states + .get_mut(&handle) + .unwrap() + .stats + .backoff + .next_backoff(); + + if let Some(dur) = backoff { + let state = self.clone(); + spawn("wait for peer", async move { + tokio::time::sleep(dur).await; + { + let mut g = state.locked.write(); + let peer = match g.peers.states.get_mut(&handle) { + Some(p) => p, + None => bail!("bug: peer disappeared"), + }; + match &peer.state { + PeerState::Dead => peer.state = PeerState::Queued, + _ => bail!("peer in unexpected state"), + } + } + state.peer_queue_tx.send(handle)?; + Ok::<_, anyhow::Error>(()) + }); + } else { + g.peers.drop_peer(handle); } - true } pub fn get_uploaded(&self) -> u64 { @@ -511,7 +556,7 @@ impl TorrentState { let mut futures = Vec::new(); let g = self.locked.read(); - for (handle, peer) in g.peers.states.iter() { + for (_, peer) in g.peers.states.iter() { match &peer.state { PeerState::Live(live) => { if !live.peer_interested { @@ -1064,11 +1109,12 @@ impl PeerHandler { full_piece_download_time.as_millis() as u64, Ordering::Relaxed, ); - self.state - .locked - .write() - .chunks - .mark_piece_downloaded(chunk_info.piece_index); + { + let mut g = self.state.locked.write(); + + g.chunks.mark_piece_downloaded(chunk_info.piece_index); + g.peers.mark_peer_trustworthy(handle); + } debug!( "piece={} successfully downloaded and verified from {}", diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4ce72e7..344af16 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -223,7 +223,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> (progress as f64 / total as f64) * 100f64 }; info!( - "[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", + "[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}", idx, downloaded_pct, SF::new(progress), @@ -236,6 +236,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> peer_stats.connecting, peer_stats.queued, peer_stats.seen, + peer_stats.dead ); }, }