Fixed a counter bug

This commit is contained in:
Igor Katson 2023-11-20 14:11:34 +00:00
parent a93a588ae9
commit 6f93fed360
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 48 additions and 36 deletions

View file

@ -54,6 +54,7 @@ pub struct PeerCounters {
pub errors: AtomicU32,
pub fetched_chunks: AtomicU32,
pub downloaded_and_checked_pieces: AtomicU32,
pub downloaded_and_checked_bytes: AtomicU64,
}
#[derive(Debug)]

View file

@ -341,9 +341,9 @@ impl TorrentManager {
info_hash: self.state.info_hash(),
peer_id: self.state.peer_id(),
port: 6778,
uploaded: self.state.get_uploaded(),
downloaded: self.state.get_downloaded(),
left: self.state.get_left_to_download(),
uploaded: self.state.get_uploaded_bytes(),
downloaded: self.state.get_downloaded_bytes(),
left: self.state.get_left_to_download_bytes(),
compact: true,
no_peer_id: false,
event,

View file

@ -228,17 +228,18 @@ pub struct TorrentStateLocked {
#[derive(Default, Debug)]
struct AtomicStats {
have: AtomicU64,
have_bytes: AtomicU64,
downloaded_and_checked_bytes: AtomicU64,
downloaded_and_checked_pieces: AtomicU64,
uploaded: AtomicU64,
uploaded_bytes: AtomicU64,
fetched_bytes: AtomicU64,
total_piece_download_ms: AtomicU64,
}
impl AtomicStats {
fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_and_checked_pieces.load(Ordering::Relaxed);
let t = self.total_piece_download_ms.load(Ordering::Relaxed);
let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire);
let t = self.total_piece_download_ms.load(Ordering::Acquire);
if d == 0 {
return None;
}
@ -288,8 +289,8 @@ pub struct TorrentState {
info_hash: Id20,
peer_id: Id20,
lengths: Lengths,
needed: u64,
have_plus_needed: u64,
needed_bytes: u64,
have_plus_needed_bytes: u64,
stats: AtomicStats,
options: TorrentStateOptions,
@ -430,11 +431,11 @@ impl TorrentState {
files,
filenames,
stats: AtomicStats {
have: AtomicU64::new(have_bytes),
have_bytes: AtomicU64::new(have_bytes),
..Default::default()
},
needed: needed_bytes,
have_plus_needed: needed_bytes + have_bytes,
needed_bytes,
have_plus_needed_bytes: needed_bytes + have_bytes,
lengths,
options,
@ -551,7 +552,7 @@ impl TorrentState {
FileOps::new(&self.info, &self.files, &self.lengths)
}
pub fn initially_needed(&self) -> u64 {
self.needed
self.needed_bytes
}
pub fn lock_read(
&self,
@ -600,21 +601,21 @@ impl TorrentState {
}
}
pub fn get_uploaded(&self) -> u64 {
self.stats.uploaded.load(Ordering::Relaxed)
pub fn get_uploaded_bytes(&self) -> u64 {
self.stats.uploaded_bytes.load(Ordering::Relaxed)
}
pub fn get_downloaded(&self) -> u64 {
pub fn get_downloaded_bytes(&self) -> u64 {
self.stats
.downloaded_and_checked_pieces
.downloaded_and_checked_bytes
.load(Ordering::Acquire)
}
pub fn is_finished(&self) -> bool {
self.get_left_to_download() == 0
self.get_left_to_download_bytes() == 0
}
pub fn get_left_to_download(&self) -> u64 {
self.needed - self.get_downloaded()
pub fn get_left_to_download_bytes(&self) -> u64 {
self.needed_bytes - self.get_downloaded_bytes()
}
fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
@ -684,17 +685,17 @@ impl TorrentState {
pub fn stats_snapshot(&self) -> StatsSnapshot {
use Ordering::*;
let downloaded = self.stats.downloaded_and_checked_pieces.load(Relaxed);
let remaining = self.needed - downloaded;
let downloaded_bytes = self.stats.downloaded_and_checked_bytes.load(Relaxed);
let remaining = self.needed_bytes - downloaded_bytes;
StatsSnapshot {
have_bytes: self.stats.have.load(Relaxed),
downloaded_and_checked_bytes: downloaded,
have_bytes: self.stats.have_bytes.load(Relaxed),
downloaded_and_checked_bytes: downloaded_bytes,
downloaded_and_checked_pieces: self.stats.downloaded_and_checked_pieces.load(Relaxed),
fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.uploaded.load(Relaxed),
total_bytes: self.have_plus_needed,
uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed),
total_bytes: self.have_plus_needed_bytes,
time: Instant::now(),
initially_needed_bytes: self.needed,
initially_needed_bytes: self.needed_bytes,
remaining_bytes: remaining,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
peer_stats: self.peers.stats(),
@ -792,7 +793,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
}
fn get_have_bytes(&self) -> u64 {
self.state.stats.have.load(Ordering::Relaxed)
self.state.stats.have_bytes.load(Ordering::Relaxed)
}
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize> {
@ -811,7 +812,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_uploaded_bytes(&self, bytes: u32) {
self.state
.stats
.uploaded
.uploaded_bytes
.fetch_add(bytes as u64, Ordering::Relaxed);
}
@ -1343,32 +1344,42 @@ impl PeerHandler {
.with_context(|| format!("error checking piece={index}"))?
{
true => {
{
let mut g = self.state.lock_write("mark_piece_downloaded");
g.chunks.mark_piece_downloaded(chunk_info.piece_index);
}
// Global piece counters.
let piece_len =
self.state.lengths.piece_length(chunk_info.piece_index) as u64;
self.state
.stats
.downloaded_and_checked_pieces
.downloaded_and_checked_bytes
// This counter is used to compute "is_finished", so using
// stronger ordering.
.fetch_add(piece_len, Ordering::Release);
self.state
.stats
.have
.downloaded_and_checked_pieces
// This counter is used to compute "is_finished", so using
// stronger ordering.
.fetch_add(1, Ordering::Release);
self.state
.stats
.have_bytes
.fetch_add(piece_len, Ordering::Relaxed);
self.state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
Ordering::Release,
);
// Per-peer piece counters.
self.counters
.downloaded_and_checked_pieces
.fetch_add(1, Ordering::Relaxed);
{
let mut g = self.state.lock_write("mark_piece_downloaded");
g.chunks.mark_piece_downloaded(chunk_info.piece_index);
}
self.counters
.downloaded_and_checked_bytes
.fetch_add(piece_len, Ordering::Relaxed);
self.state.peers.reset_peer_backoff(self.addr);