diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 10981ee..3c98768 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use librqbit_core::speed_estimator::SpeedEstimator; use std::io::Write; -use std::sync::atomic::Ordering; use std::time::Instant; use warp::Filter; @@ -21,22 +20,22 @@ pub async fn make_and_run_http_api( let dump_stats = warp::path("stats").map({ let state = state.clone(); let start_time = Instant::now(); - let initial_downloaded_and_checked = - state.stats().downloaded_and_checked.load(Ordering::Relaxed); + let initial_downloaded_and_checked = state.stats_snapshot().downloaded_and_checked_bytes; move || { + let snapshot = state.stats_snapshot(); let mut buf = Vec::new(); writeln!(buf, "{:#?}", state.stats_snapshot()).unwrap(); writeln!( buf, "Average download time: {:?}", - state.stats().average_piece_download_time() + snapshot.average_piece_download_time() ) .unwrap(); // Poor mans download speed computation let elapsed = start_time.elapsed(); - let downloaded_bytes = state.stats().downloaded_and_checked.load(Ordering::Relaxed) - - initial_downloaded_and_checked; + let downloaded_bytes = + snapshot.downloaded_and_checked_bytes - initial_downloaded_and_checked; let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; writeln!( buf, diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index d8d9181..c9fc308 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -111,9 +111,16 @@ impl PeerConnection { ) -> anyhow::Result<()> { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut conn = tokio::net::TcpStream::connect(self.addr) - .await - .context("error connecting")?; + let mut conn = match timeout( + Duration::from_secs(10), + tokio::net::TcpStream::connect(self.addr), + ) + .await + { + Ok(conn) => conn.context("error connecting")?, + Err(_) => anyhow::bail!("timeout connecting to {}", self.addr), + }; + let mut write_buf = Vec::::with_capacity(PIECE_MESSAGE_DEFAULT_LEN); let handshake = Handshake::new(self.info_hash, self.peer_id); handshake.serialize(&mut write_buf); diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 696f02b..f1a3797 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -209,7 +209,7 @@ impl TorrentManager { lengths, ); - let state = Arc::new(TorrentState::new( + let state = TorrentState::new( info, info_hash, peer_id, @@ -219,7 +219,7 @@ impl TorrentManager { initial_check_results.have_bytes, initial_check_results.needed_bytes, spawner, - )); + ); let estimator = Arc::new(SpeedEstimator::new(5)); @@ -242,7 +242,7 @@ impl TorrentManager { let state = mgr.state.clone(); async move { loop { - let downloaded = state.stats().downloaded_and_checked.load(Ordering::Relaxed); + let downloaded = state.stats_snapshot().downloaded_and_checked_bytes; let needed = state.initially_needed(); let remaining = needed - downloaded; estimator.add_snapshot(downloaded, remaining, Instant::now()); @@ -258,33 +258,25 @@ impl TorrentManager { loop { let live_peer_stats = self.state.locked.read().peers.stats(); let seen_peers_count = self.state.locked.read().peers.seen().len(); - let have = self.state.stats().have.load(Ordering::Relaxed); - let fetched = self.state.stats().fetched_bytes.load(Ordering::Relaxed); + let stats = self.state.stats_snapshot(); let needed = self.state.initially_needed(); - let downloaded = self - .state - .stats() - .downloaded_and_checked - .load(Ordering::Relaxed); - let remaining = needed - downloaded; - let uploaded = self.state.stats().uploaded.load(Ordering::Relaxed); - let downloaded_pct = if downloaded == needed { + let downloaded_pct = if stats.remaining_bytes == 0 { 100f64 } else { - (downloaded as f64 / needed as f64) * 100f64 + (stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64 }; info!( "Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}", downloaded_pct, - SF::new(downloaded), + SF::new(stats.downloaded_and_checked_bytes), live_peer_stats.live, live_peer_stats.connecting, seen_peers_count, - SF::new(fetched), - SF::new(remaining), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), SF::new(needed), - SF::new(uploaded), - SF::new(have) + SF::new(stats.uploaded_bytes), + SF::new(stats.have_bytes) ); tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index fbd38c1..55d8aef 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -24,7 +24,13 @@ use peer_binary_protocol::{ extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, }; use sha1w::Sha1; -use tokio::{sync::mpsc::UnboundedSender, time::timeout}; +use tokio::{ + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Semaphore, + }, + time::timeout, +}; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker}, @@ -164,6 +170,8 @@ pub struct AtomicStats { pub downloaded_pieces: AtomicU64, pub total_piece_download_ms: AtomicU64, + + pub queued_peers: AtomicU64, } impl AtomicStats { @@ -190,6 +198,19 @@ pub struct StatsSnapshot { pub seen_peers: u32, pub connecting_peers: u32, pub time: Instant, + pub queued_peers: u32, + total_piece_download_ms: u64, +} + +impl StatsSnapshot { + pub fn average_piece_download_time(&self) -> Option { + let d = self.downloaded_and_checked_pieces; + let t = self.total_piece_download_ms; + if d == 0 { + return None; + } + Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64)) + } } pub struct TorrentState { @@ -201,8 +222,9 @@ pub struct TorrentState { lengths: Lengths, needed: u64, stats: AtomicStats, - spawner: BlockingSpawner, + + peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver)>, } impl TorrentState { @@ -217,8 +239,10 @@ impl TorrentState { have_bytes: u64, needed_bytes: u64, spawner: BlockingSpawner, - ) -> Self { - TorrentState { + ) -> Arc { + let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel(); + let peer_semaphore = Arc::new(Semaphore::new(128)); + let state = Arc::new(TorrentState { info_hash, info, peer_id, @@ -234,7 +258,37 @@ impl TorrentState { needed: needed_bytes, lengths, spawner, - } + + peer_queue_tx, + }); + spawn("peer adder", { + let state = state.clone(); + async move { + loop { + let (addr, out_rx) = peer_queue_rx.recv().await.unwrap(); + state.stats.queued_peers.fetch_sub(1, Ordering::Relaxed); + let permit = peer_semaphore.clone().acquire_owned().await.unwrap(); + + let handler = PeerHandler { + addr, + state: state.clone(), + spawner: state.spawner, + }; + let peer_connection = + PeerConnection::new(addr, state.info_hash, state.peer_id, handler); + spawn(format!("manage_peer({})", addr), async move { + if let Err(e) = peer_connection.manage_peer(out_rx).await { + debug!("error managing peer {}: {:#}", addr, e) + }; + let state = peer_connection.into_handler().state; + state.drop_peer(addr); + drop(permit); + Ok::<_, anyhow::Error>(()) + }); + } + } + }); + state } pub fn info(&self) -> &TorrentMetaV1Info { &self.info @@ -251,7 +305,7 @@ impl TorrentState { pub fn initially_needed(&self) -> u64 { self.needed } - pub fn stats(&self) -> &AtomicStats { + fn stats(&self) -> &AtomicStats { &self.stats } @@ -450,24 +504,18 @@ impl TorrentState { pub fn add_peer_if_not_seen(self: &Arc, addr: SocketAddr) -> bool { let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::(); - let handle = match self.locked.write().peers.add_if_not_seen(addr, out_tx) { + match self.locked.write().peers.add_if_not_seen(addr, out_tx) { Some(handle) => handle, None => return false, }; - let handler = PeerHandler { - addr, - state: self.clone(), - spawner: self.spawner, - }; - let peer_connection = PeerConnection::new(addr, self.info_hash, self.peer_id, handler); - spawn(format!("manage_peer({})", handle), async move { - if let Err(e) = peer_connection.manage_peer(out_rx).await { - debug!("error managing peer {}: {:#}", handle, e) - }; - peer_connection.into_handler().state.drop_peer(handle); - Ok::<_, anyhow::Error>(()) - }); + self.stats.queued_peers.fetch_add(1, Ordering::Relaxed); + match self.peer_queue_tx.send((addr, out_rx)) { + Ok(_) => {} + Err(_) => { + warn!("peer adder died, can't add peer") + } + } true } @@ -496,6 +544,8 @@ impl TorrentState { time: Instant::now(), initially_needed_bytes: self.needed, remaining_bytes: remaining, + queued_peers: self.stats.queued_peers.load(Relaxed) as u32, + total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), } } }