Merge pull request #73 from ikatson/less-stealing

Less stealing
This commit is contained in:
Igor Katson 2024-01-02 18:35:49 +00:00 committed by GitHub
commit 396bacff0c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 41 deletions

View file

@ -1011,10 +1011,15 @@ impl PeerHandler {
} }
} }
let n_opt = match n_opt {
Some(n_opt) => n_opt,
None => return Ok(None),
};
self.state self.state
.lengths .lengths
.validate_piece_index(n_opt.context("invalid n_opt")? as u32) .validate_piece_index(n_opt as u32)
.context("invalid piece")? .context("bug: invalid piece")?
}; };
g.inflight_pieces.insert( g.inflight_pieces.insert(
n, n,
@ -1030,18 +1035,15 @@ impl PeerHandler {
.map(|r| r.flatten()) .map(|r| r.flatten())
} }
/// Try to steal a piece from a slower peer. Threshold is
/// "how many times is my average download speed faster to be able to steal".
///
/// If this returns, an existing in-flight piece was marked to be ours.
fn try_steal_old_slow_piece(&self, threshold: f64) -> Option<ValidPieceIndex> { fn try_steal_old_slow_piece(&self, threshold: f64) -> Option<ValidPieceIndex> {
let total = self let my_avg_time = match self.counters.average_piece_download_time() {
.state Some(t) => t,
.stats None => return None,
.downloaded_and_checked_pieces };
.load(Ordering::Acquire);
// heuristic for not enough precision in average time
if total < 20 {
return None;
}
let avg_time = self.state.stats.average_piece_download_time()?;
let mut g = self.state.lock_write("try_steal_old_slow_piece"); let mut g = self.state.lock_write("try_steal_old_slow_piece");
let (idx, elapsed, piece_req) = g let (idx, elapsed, piece_req) = g
@ -1053,10 +1055,10 @@ impl PeerHandler {
.max_by_key(|(_, e, _)| *e)?; .max_by_key(|(_, e, _)| *e)?;
// heuristic for "too slow peer" // heuristic for "too slow peer"
if elapsed.as_secs_f64() > avg_time.as_secs_f64() * threshold { if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold {
debug!( debug!(
"will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", "will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}",
idx, piece_req.peer, elapsed, avg_time idx, piece_req.peer, elapsed, my_avg_time
); );
piece_req.peer = self.addr; piece_req.peer = self.addr;
piece_req.started = Instant::now(); piece_req.started = Instant::now();
@ -1217,8 +1219,8 @@ impl PeerHandler {
// Afterwards means we are close to completion, try stealing more aggressively. // Afterwards means we are close to completion, try stealing more aggressively.
let next = match self let next = match self
.try_steal_old_slow_piece(10.) .try_steal_old_slow_piece(10.)
.or_else(|| self.reserve_next_needed_piece().ok().flatten()) .map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))?
.or_else(|| self.try_steal_old_slow_piece(2.)) .or_else(|| self.try_steal_old_slow_piece(3.))
{ {
Some(next) => next, Some(next) => next,
None => { None => {
@ -1469,17 +1471,12 @@ impl PeerHandler {
.fetch_add(piece_len, Ordering::Relaxed); .fetch_add(piece_len, Ordering::Relaxed);
self.state.stats.total_piece_download_ms.fetch_add( self.state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64, full_piece_download_time.as_millis() as u64,
Ordering::Release, Ordering::Relaxed,
); );
// Per-peer piece counters. // Per-peer piece counters.
self.counters self.counters
.downloaded_and_checked_pieces .on_piece_downloaded(piece_len, full_piece_download_time);
.fetch_add(1, Ordering::Relaxed);
self.counters
.downloaded_and_checked_bytes
.fetch_add(piece_len, Ordering::Relaxed);
self.state.peers.reset_peer_backoff(self.addr); self.state.peers.reset_peer_backoff(self.addr);
debug!("piece={} successfully downloaded and verified", index); debug!("piece={} successfully downloaded and verified", index);

View file

@ -1,6 +1,6 @@
use std::{ use std::{
sync::{ sync::{
atomic::{AtomicU32, AtomicU64}, atomic::{AtomicU32, AtomicU64, Ordering},
Arc, Arc,
}, },
time::Duration, time::Duration,
@ -19,6 +19,30 @@ pub(crate) struct PeerCountersAtomic {
pub fetched_chunks: AtomicU32, pub fetched_chunks: AtomicU32,
pub downloaded_and_checked_pieces: AtomicU32, pub downloaded_and_checked_pieces: AtomicU32,
pub downloaded_and_checked_bytes: AtomicU64, pub downloaded_and_checked_bytes: AtomicU64,
pub total_piece_download_ms: AtomicU64,
}
impl PeerCountersAtomic {
pub(crate) fn on_piece_downloaded(&self, piece_len: u64, elapsed: Duration) {
let elapsed = elapsed.as_millis() as u64;
self.total_piece_download_ms
.fetch_add(elapsed, Ordering::Release);
self.downloaded_and_checked_pieces
.fetch_add(1, Ordering::Release);
self.downloaded_and_checked_bytes
.fetch_add(piece_len, Ordering::Relaxed);
}
pub(crate) fn average_piece_download_time(&self) -> Option<Duration> {
let downloaded_pieces = self.downloaded_and_checked_pieces.load(Ordering::Acquire);
let total_download_time = self.total_piece_download_ms.load(Ordering::Acquire);
if total_download_time == 0 || downloaded_pieces == 0 {
return None;
}
Some(Duration::from_millis(
total_download_time / downloaded_pieces as u64,
))
}
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -14,6 +14,7 @@ pub struct PeerCounters {
pub errors: u32, pub errors: u32,
pub fetched_chunks: u32, pub fetched_chunks: u32,
pub downloaded_and_checked_pieces: u32, pub downloaded_and_checked_pieces: u32,
pub total_piece_download_ms: u64,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -37,6 +38,7 @@ impl From<&super::atomic::PeerCountersAtomic> for PeerCounters {
downloaded_and_checked_pieces: counters downloaded_and_checked_pieces: counters
.downloaded_and_checked_pieces .downloaded_and_checked_pieces
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
total_piece_download_ms: counters.total_piece_download_ms.load(Ordering::Relaxed),
} }
} }
} }

View file

@ -1,7 +1,4 @@
use std::{ use std::sync::atomic::AtomicU64;
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct AtomicStats { pub struct AtomicStats {
@ -12,14 +9,3 @@ pub struct AtomicStats {
pub fetched_bytes: AtomicU64, pub fetched_bytes: AtomicU64,
pub total_piece_download_ms: AtomicU64, pub total_piece_download_ms: AtomicU64,
} }
impl AtomicStats {
pub fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_and_checked_pieces.load(Ordering::Acquire);
let t = self.total_piece_download_ms.load(Ordering::Acquire);
if d == 0 {
return None;
}
Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64))
}
}