diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 763f7f2..0b4d578 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -21,6 +21,7 @@ impl From<&ChunkInfo> for InflightRequest { } pub enum PeerState { + Queued(SocketAddr), Connecting(SocketAddr), Live(LivePeerState), } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index f1a3797..2e4f1fb 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -266,11 +266,12 @@ impl TorrentManager { (stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64 }; info!( - "Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}", + "Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, queued: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}", downloaded_pct, SF::new(stats.downloaded_and_checked_bytes), live_peer_stats.live, live_peer_stats.connecting, + live_peer_stats.queued, seen_peers_count, SF::new(stats.fetched_bytes), SF::new(stats.remaining_bytes), diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 55d8aef..dc5e7e6 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -56,6 +56,7 @@ pub struct PeerStates { #[derive(Debug, Default)] pub struct AggregatePeerStats { + pub queued: usize, pub connecting: usize, pub live: usize, } @@ -68,6 +69,7 @@ impl PeerStates { match p { PeerState::Connecting(_) => s.connecting += 1, PeerState::Live(_) => s.live += 1, + PeerState::Queued(_) => s.queued += 1, }; s }) @@ -112,7 +114,7 @@ impl PeerStates { if self.states.contains_key(&addr) { return None; } - self.states.insert(handle, PeerState::Connecting(addr)); + self.states.insert(handle, PeerState::Queued(addr)); self.tx.insert(handle, Arc::new(tx)); Some(handle) } @@ -162,20 +164,18 @@ pub struct TorrentStateLocked { } #[derive(Default, Debug)] -pub struct AtomicStats { - pub have: AtomicU64, - pub downloaded_and_checked: AtomicU64, - pub uploaded: AtomicU64, - pub fetched_bytes: AtomicU64, +struct AtomicStats { + have: AtomicU64, + downloaded_and_checked: AtomicU64, + uploaded: AtomicU64, + fetched_bytes: AtomicU64, - pub downloaded_pieces: AtomicU64, - pub total_piece_download_ms: AtomicU64, - - pub queued_peers: AtomicU64, + downloaded_pieces: AtomicU64, + total_piece_download_ms: AtomicU64, } impl AtomicStats { - pub fn average_piece_download_time(&self) -> Option { + fn average_piece_download_time(&self) -> Option { let d = self.downloaded_pieces.load(Ordering::Relaxed); let t = self.total_piece_download_ms.load(Ordering::Relaxed); if d == 0 { @@ -224,6 +224,7 @@ pub struct TorrentState { stats: AtomicStats, spawner: BlockingSpawner, + peer_semaphore: Semaphore, peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver)>, } @@ -241,7 +242,6 @@ impl TorrentState { spawner: BlockingSpawner, ) -> Arc { let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel(); - let peer_semaphore = Arc::new(Semaphore::new(128)); let state = Arc::new(TorrentState { info_hash, info, @@ -259,6 +259,7 @@ impl TorrentState { lengths, spawner, + peer_semaphore: Semaphore::new(128), peer_queue_tx, }); spawn("peer adder", { @@ -266,8 +267,29 @@ impl TorrentState { async move { loop { let (addr, out_rx) = peer_queue_rx.recv().await.unwrap(); - state.stats.queued_peers.fetch_sub(1, Ordering::Relaxed); - let permit = peer_semaphore.clone().acquire_owned().await.unwrap(); + + { + // Update state to connecting. + match state.locked.write().peers.states.get_mut(&addr) { + Some(s) => match s { + PeerState::Queued(_) => *s = PeerState::Connecting(addr), + PeerState::Connecting(_) => { + warn!("did not expect to see the peer {} connecting", addr); + continue; + } + PeerState::Live(_) => { + warn!("did not expect to see the peer {} live", addr); + continue; + } + }, + None => { + warn!("did not find peer state for {}", addr); + continue; + } + }; + } + + state.peer_semaphore.acquire().await.unwrap().forget(); let handler = PeerHandler { addr, @@ -282,7 +304,7 @@ impl TorrentState { }; let state = peer_connection.into_handler().state; state.drop_peer(addr); - drop(permit); + state.peer_semaphore.add_permits(1); Ok::<_, anyhow::Error>(()) }); } @@ -305,11 +327,8 @@ impl TorrentState { pub fn initially_needed(&self) -> u64 { self.needed } - fn stats(&self) -> &AtomicStats { - &self.stats - } - pub fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { + fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { let g = self.locked.read(); let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; for n in g.chunks.get_needed_pieces().iter_ones() { @@ -321,7 +340,7 @@ impl TorrentState { None } - pub fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { + fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { self.locked .read() .peers @@ -329,7 +348,7 @@ impl TorrentState { .map(|l| l.i_am_choked) } - pub fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { + fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { if self.am_i_choked(peer_handle)? { warn!("we are choked by {}, can't reserve next piece", peer_handle); return None; @@ -358,11 +377,11 @@ impl TorrentState { Some(n) } - pub fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { + fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { self.get_next_needed_piece(handle).is_some() } - pub fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option { + fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option { let total = self.stats.downloaded_pieces.load(Ordering::Relaxed); // heuristic for not enough precision in average time @@ -394,7 +413,7 @@ impl TorrentState { None } - pub fn try_steal_piece(&self, handle: PeerHandle) -> Option { + fn try_steal_piece(&self, handle: PeerHandle) -> Option { let mut rng = rand::thread_rng(); use rand::seq::IteratorRandom; let g = self.locked.read(); @@ -407,7 +426,7 @@ impl TorrentState { .copied() } - pub fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { + fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { let mut g = self.locked.write(); match g.peers.states.get_mut(&handle) { Some(s @ &mut PeerState::Connecting(_)) => { @@ -419,7 +438,7 @@ impl TorrentState { } } - pub fn drop_peer(&self, handle: PeerHandle) -> bool { + fn drop_peer(&self, handle: PeerHandle) -> bool { let mut g = self.locked.write(); let peer = match g.peers.drop_peer(handle) { Some(peer) => peer, @@ -432,6 +451,7 @@ impl TorrentState { g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); } } + PeerState::Queued(_) => {} } true } @@ -447,7 +467,7 @@ impl TorrentState { self.needed - self.get_downloaded() } - pub fn maybe_transmit_haves(&self, index: ValidPieceIndex) { + fn maybe_transmit_haves(&self, index: ValidPieceIndex) { let mut futures = Vec::new(); let g = self.locked.read(); @@ -509,7 +529,6 @@ impl TorrentState { None => return false, }; - self.stats.queued_peers.fetch_add(1, Ordering::Relaxed); match self.peer_queue_tx.send((addr, out_rx)) { Ok(_) => {} Err(_) => { @@ -522,14 +541,14 @@ impl TorrentState { pub fn stats_snapshot(&self) -> StatsSnapshot { let g = self.locked.read(); use Ordering::*; - let (live, connecting) = - g.peers - .states - .values() - .fold((0u32, 0u32), |(live, connecting), p| match p { - PeerState::Connecting(_) => (live, connecting + 1), - PeerState::Live(_) => (live + 1, connecting), - }); + let (live, connecting, queued) = g.peers.states.values().fold( + (0u32, 0u32, 0u32), + |(live, connecting, queued), p| match p { + PeerState::Connecting(_) => (live, connecting + 1, queued), + PeerState::Live(_) => (live + 1, connecting, queued), + PeerState::Queued(_) => (live, connecting, queued + 1), + }, + ); let downloaded = self.stats.downloaded_and_checked.load(Relaxed); let remaining = self.needed - downloaded; StatsSnapshot { @@ -544,7 +563,7 @@ impl TorrentState { time: Instant::now(), initially_needed_bytes: self.needed, remaining_bytes: remaining, - queued_peers: self.stats.queued_peers.load(Relaxed) as u32, + queued_peers: queued, total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), } }