From 9472d66bf9fadc0450f0df33fbbb606850894ca6 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 4 Jul 2021 11:05:20 +0100 Subject: [PATCH] Update visiibilty --- crates/librqbit/src/http_api.rs | 6 +-- crates/librqbit/src/torrent_manager.rs | 46 ++++++++---------- crates/librqbit/src/torrent_state.rs | 64 ++++++++++++++++++++++---- 3 files changed, 77 insertions(+), 39 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index d983bcb..10981ee 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -22,20 +22,20 @@ pub async fn make_and_run_http_api( let state = state.clone(); let start_time = Instant::now(); let initial_downloaded_and_checked = - state.stats.downloaded_and_checked.load(Ordering::Relaxed); + state.stats().downloaded_and_checked.load(Ordering::Relaxed); move || { let mut buf = Vec::new(); writeln!(buf, "{:#?}", state.stats_snapshot()).unwrap(); writeln!( buf, "Average download time: {:?}", - state.stats.average_piece_download_time() + state.stats().average_piece_download_time() ) .unwrap(); // Poor mans download speed computation let elapsed = start_time.elapsed(); - let downloaded_bytes = state.stats.downloaded_and_checked.load(Ordering::Relaxed) + let downloaded_bytes = state.stats().downloaded_and_checked.load(Ordering::Relaxed) - initial_downloaded_and_checked; let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64; writeln!( diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index ef02cba..696f02b 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -4,10 +4,7 @@ use std::{ net::SocketAddr, ops::Deref, path::{Path, PathBuf}, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::{atomic::Ordering, Arc}, time::{Duration, Instant}, }; @@ -19,7 +16,7 @@ use librqbit_core::{ torrent_metainfo::TorrentMetaV1Info, }; use log::{debug, info}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::Mutex; use reqwest::Url; use sha1w::Sha1; use size_format::SizeFormatterBinary as SF; @@ -29,7 +26,7 @@ use crate::{ file_ops::FileOps, http_api::make_and_run_http_api, spawn_utils::{spawn, BlockingSpawner}, - torrent_state::{AtomicStats, TorrentState, TorrentStateLocked}, + torrent_state::TorrentState, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, }; pub struct TorrentManagerBuilder { @@ -212,23 +209,18 @@ impl TorrentManager { lengths, ); - let state = Arc::new(TorrentState { + let state = Arc::new(TorrentState::new( + info, info_hash, - torrent: info, peer_id, - locked: Arc::new(RwLock::new(TorrentStateLocked { - peers: Default::default(), - chunks: chunk_tracker, - })), files, - stats: AtomicStats { - have: AtomicU64::new(initial_check_results.have_bytes), - ..Default::default() - }, - needed: initial_check_results.needed_bytes, + chunk_tracker, lengths, + initial_check_results.have_bytes, + initial_check_results.needed_bytes, spawner, - }); + )); + let estimator = Arc::new(SpeedEstimator::new(5)); let mgr = Arc::new(Self { @@ -250,8 +242,8 @@ impl TorrentManager { let state = mgr.state.clone(); async move { loop { - let downloaded = state.stats.downloaded_and_checked.load(Ordering::Relaxed); - let needed = state.needed; + let downloaded = state.stats().downloaded_and_checked.load(Ordering::Relaxed); + let needed = state.initially_needed(); let remaining = needed - downloaded; estimator.add_snapshot(downloaded, remaining, Instant::now()); tokio::time::sleep(Duration::from_secs(1)).await; @@ -266,16 +258,16 @@ impl TorrentManager { loop { let live_peer_stats = self.state.locked.read().peers.stats(); let seen_peers_count = self.state.locked.read().peers.seen().len(); - let have = self.state.stats.have.load(Ordering::Relaxed); - let fetched = self.state.stats.fetched_bytes.load(Ordering::Relaxed); - let needed = self.state.needed; + let have = self.state.stats().have.load(Ordering::Relaxed); + let fetched = self.state.stats().fetched_bytes.load(Ordering::Relaxed); + let needed = self.state.initially_needed(); let downloaded = self .state - .stats + .stats() .downloaded_and_checked .load(Ordering::Relaxed); let remaining = needed - downloaded; - let uploaded = self.state.stats.uploaded.load(Ordering::Relaxed); + let uploaded = self.state.stats().uploaded.load(Ordering::Relaxed); let downloaded_pct = if downloaded == needed { 100f64 } else { @@ -326,8 +318,8 @@ impl TorrentManager { let mut event = Some(TrackerRequestEvent::Started); loop { let request = TrackerRequest { - info_hash: self.state.info_hash, - peer_id: self.state.peer_id, + info_hash: self.state.info_hash(), + peer_id: self.state.peer_id(), port: 6778, uploaded: self.state.get_uploaded(), downloaded: self.state.get_downloaded(), diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 9192d76..fbd38c1 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -14,6 +14,7 @@ use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ + info_hash::InfoHash, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, torrent_metainfo::TorrentMetaV1Info, }; @@ -192,21 +193,66 @@ pub struct StatsSnapshot { } pub struct TorrentState { - pub torrent: TorrentMetaV1Info, + info: TorrentMetaV1Info, pub locked: Arc>, - pub files: Vec>>, - pub info_hash: [u8; 20], - pub peer_id: [u8; 20], - pub lengths: Lengths, - pub needed: u64, - pub stats: AtomicStats, + files: Vec>>, + info_hash: [u8; 20], + peer_id: [u8; 20], + lengths: Lengths, + needed: u64, + stats: AtomicStats, - pub spawner: BlockingSpawner, + spawner: BlockingSpawner, } impl TorrentState { + #[allow(clippy::too_many_arguments)] + pub fn new( + info: TorrentMetaV1Info, + info_hash: [u8; 20], + peer_id: [u8; 20], + files: Vec>>, + chunk_tracker: ChunkTracker, + lengths: Lengths, + have_bytes: u64, + needed_bytes: u64, + spawner: BlockingSpawner, + ) -> Self { + TorrentState { + info_hash, + info, + peer_id, + locked: Arc::new(RwLock::new(TorrentStateLocked { + peers: Default::default(), + chunks: chunk_tracker, + })), + files, + stats: AtomicStats { + have: AtomicU64::new(have_bytes), + ..Default::default() + }, + needed: needed_bytes, + lengths, + spawner, + } + } + pub fn info(&self) -> &TorrentMetaV1Info { + &self.info + } + pub fn info_hash(&self) -> InfoHash { + self.info_hash + } + pub fn peer_id(&self) -> [u8; 20] { + self.peer_id + } pub fn file_ops(&self) -> FileOps<'_, Sha1> { - FileOps::new(&self.torrent, &self.files, &self.lengths) + FileOps::new(&self.info, &self.files, &self.lengths) + } + pub fn initially_needed(&self) -> u64 { + self.needed + } + pub fn stats(&self) -> &AtomicStats { + &self.stats } pub fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option {