diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index bb6df51..a34633b 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -54,6 +54,7 @@ pub struct PeerCounters { pub errors: AtomicU32, pub fetched_chunks: AtomicU32, pub downloaded_and_checked_pieces: AtomicU32, + pub downloaded_and_checked_bytes: AtomicU64, } #[derive(Debug)] diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index c8157d4..b953530 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -341,9 +341,9 @@ impl TorrentManager { info_hash: self.state.info_hash(), peer_id: self.state.peer_id(), port: 6778, - uploaded: self.state.get_uploaded(), - downloaded: self.state.get_downloaded(), - left: self.state.get_left_to_download(), + uploaded: self.state.get_uploaded_bytes(), + downloaded: self.state.get_downloaded_bytes(), + left: self.state.get_left_to_download_bytes(), compact: true, no_peer_id: false, event, diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 9472477..61ed206 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -228,17 +228,18 @@ pub struct TorrentStateLocked { #[derive(Default, Debug)] struct AtomicStats { - have: AtomicU64, + have_bytes: AtomicU64, + downloaded_and_checked_bytes: AtomicU64, downloaded_and_checked_pieces: AtomicU64, - uploaded: AtomicU64, + uploaded_bytes: AtomicU64, fetched_bytes: AtomicU64, total_piece_download_ms: AtomicU64, } impl AtomicStats { fn average_piece_download_time(&self) -> Option { - let d = self.downloaded_and_checked_pieces.load(Ordering::Relaxed); - let t = self.total_piece_download_ms.load(Ordering::Relaxed); + let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire); + let t = self.total_piece_download_ms.load(Ordering::Acquire); if d == 0 { return None; } @@ -288,8 +289,8 @@ pub struct TorrentState { info_hash: Id20, peer_id: Id20, lengths: Lengths, - needed: u64, - have_plus_needed: u64, + needed_bytes: u64, + have_plus_needed_bytes: u64, stats: AtomicStats, options: TorrentStateOptions, @@ -430,11 +431,11 @@ impl TorrentState { files, filenames, stats: AtomicStats { - have: AtomicU64::new(have_bytes), + have_bytes: AtomicU64::new(have_bytes), ..Default::default() }, - needed: needed_bytes, - have_plus_needed: needed_bytes + have_bytes, + needed_bytes, + have_plus_needed_bytes: needed_bytes + have_bytes, lengths, options, @@ -551,7 +552,7 @@ impl TorrentState { FileOps::new(&self.info, &self.files, &self.lengths) } pub fn initially_needed(&self) -> u64 { - self.needed + self.needed_bytes } pub fn lock_read( &self, @@ -600,21 +601,21 @@ impl TorrentState { } } - pub fn get_uploaded(&self) -> u64 { - self.stats.uploaded.load(Ordering::Relaxed) + pub fn get_uploaded_bytes(&self) -> u64 { + self.stats.uploaded_bytes.load(Ordering::Relaxed) } - pub fn get_downloaded(&self) -> u64 { + pub fn get_downloaded_bytes(&self) -> u64 { self.stats - .downloaded_and_checked_pieces + .downloaded_and_checked_bytes .load(Ordering::Acquire) } pub fn is_finished(&self) -> bool { - self.get_left_to_download() == 0 + self.get_left_to_download_bytes() == 0 } - pub fn get_left_to_download(&self) -> u64 { - self.needed - self.get_downloaded() + pub fn get_left_to_download_bytes(&self) -> u64 { + self.needed_bytes - self.get_downloaded_bytes() } fn maybe_transmit_haves(&self, index: ValidPieceIndex) { @@ -684,17 +685,17 @@ impl TorrentState { pub fn stats_snapshot(&self) -> StatsSnapshot { use Ordering::*; - let downloaded = self.stats.downloaded_and_checked_pieces.load(Relaxed); - let remaining = self.needed - downloaded; + let downloaded_bytes = self.stats.downloaded_and_checked_bytes.load(Relaxed); + let remaining = self.needed_bytes - downloaded_bytes; StatsSnapshot { - have_bytes: self.stats.have.load(Relaxed), - downloaded_and_checked_bytes: downloaded, + have_bytes: self.stats.have_bytes.load(Relaxed), + downloaded_and_checked_bytes: downloaded_bytes, downloaded_and_checked_pieces: self.stats.downloaded_and_checked_pieces.load(Relaxed), fetched_bytes: self.stats.fetched_bytes.load(Relaxed), - uploaded_bytes: self.stats.uploaded.load(Relaxed), - total_bytes: self.have_plus_needed, + uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed), + total_bytes: self.have_plus_needed_bytes, time: Instant::now(), - initially_needed_bytes: self.needed, + initially_needed_bytes: self.needed_bytes, remaining_bytes: remaining, total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), peer_stats: self.peers.stats(), @@ -792,7 +793,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } fn get_have_bytes(&self) -> u64 { - self.state.stats.have.load(Ordering::Relaxed) + self.state.stats.have_bytes.load(Ordering::Relaxed) } fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option { @@ -811,7 +812,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn on_uploaded_bytes(&self, bytes: u32) { self.state .stats - .uploaded + .uploaded_bytes .fetch_add(bytes as u64, Ordering::Relaxed); } @@ -1343,32 +1344,42 @@ impl PeerHandler { .with_context(|| format!("error checking piece={index}"))? { true => { + { + let mut g = self.state.lock_write("mark_piece_downloaded"); + g.chunks.mark_piece_downloaded(chunk_info.piece_index); + } + // Global piece counters. let piece_len = self.state.lengths.piece_length(chunk_info.piece_index) as u64; self.state .stats - .downloaded_and_checked_pieces + .downloaded_and_checked_bytes // This counter is used to compute "is_finished", so using // stronger ordering. .fetch_add(piece_len, Ordering::Release); self.state .stats - .have + .downloaded_and_checked_pieces + // This counter is used to compute "is_finished", so using + // stronger ordering. + .fetch_add(1, Ordering::Release); + self.state + .stats + .have_bytes .fetch_add(piece_len, Ordering::Relaxed); self.state.stats.total_piece_download_ms.fetch_add( full_piece_download_time.as_millis() as u64, - Ordering::Relaxed, + Ordering::Release, ); // Per-peer piece counters. self.counters .downloaded_and_checked_pieces .fetch_add(1, Ordering::Relaxed); - { - let mut g = self.state.lock_write("mark_piece_downloaded"); - g.chunks.mark_piece_downloaded(chunk_info.piece_index); - } + self.counters + .downloaded_and_checked_bytes + .fetch_add(piece_len, Ordering::Relaxed); self.state.peers.reset_peer_backoff(self.addr);