Trying to see why it hangs for a bit sometimes

This commit is contained in:
Igor Katson 2023-11-19 20:38:41 +00:00
parent 17d40824b4
commit 4b3da0bd69
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 40 additions and 42 deletions

View file

@ -249,6 +249,7 @@ pub struct StatsSnapshot {
#[serde(skip)] #[serde(skip)]
pub time: Instant, pub time: Instant,
pub queued_peers: u32, pub queued_peers: u32,
pub dead_peers: u32,
total_piece_download_ms: u64, total_piece_download_ms: u64,
} }
@ -322,7 +323,7 @@ mod timed_existence {
} }
#[inline(always)] #[inline(always)]
pub fn timeit<R>(_name: &'static str, f: impl FnOnce() -> R) -> R { pub fn timeit<R>(_n: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
f() f()
} }
} }
@ -377,12 +378,12 @@ mod timed_existence {
} }
} }
pub fn timeit<R>(name: &'static str, f: impl FnOnce() -> R) -> R { pub fn timeit<R>(name: impl std::fmt::Display, f: impl FnOnce() -> R) -> R {
let now = Instant::now(); let now = Instant::now();
let r = f(); let r = f();
let elapsed = now.elapsed(); let elapsed = now.elapsed();
if elapsed > MAX { if elapsed > MAX {
warn!("elapsed on {name:?}: {elapsed:?}") warn!("elapsed on \"{name:}\": {elapsed:?}")
} }
r r
} }
@ -813,10 +814,6 @@ impl TorrentState {
true true
} }
pub fn peer_stats_snapshot(&self) -> AggregatePeerStats {
self.peers.stats()
}
pub fn stats_snapshot(&self) -> StatsSnapshot { pub fn stats_snapshot(&self) -> StatsSnapshot {
use Ordering::*; use Ordering::*;
let peer_stats = self.peers.stats(); let peer_stats = self.peers.stats();
@ -836,6 +833,7 @@ impl TorrentState {
initially_needed_bytes: self.needed, initially_needed_bytes: self.needed,
remaining_bytes: remaining, remaining_bytes: remaining,
queued_peers: peer_stats.queued as u32, queued_peers: peer_stats.queued as u32,
dead_peers: peer_stats.dead as u32,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
} }
} }
@ -857,35 +855,35 @@ struct PeerHandler {
impl PeerConnectionHandler for PeerHandler { impl PeerConnectionHandler for PeerHandler {
fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> { fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
timeit("on_received_message", || { match message {
match message { Message::Request(request) => {
Message::Request(request) => { self.on_download_request(self.addr, request)
self.on_download_request(self.addr, request) .context("on_download_request")?;
.context("on_download_request")?; }
} Message::Bitfield(b) => self
Message::Bitfield(b) => self .on_bitfield(self.addr, b.clone_to_owned())
.on_bitfield(self.addr, b.clone_to_owned()) .context("on_bitfield")?,
.context("on_bitfield")?, Message::Choke => self.on_i_am_choked(self.addr),
Message::Choke => self.on_i_am_choked(self.addr), Message::Unchoke => self.on_i_am_unchoked(self.addr),
Message::Unchoke => self.on_i_am_unchoked(self.addr), Message::Interested => self.on_peer_interested(self.addr),
Message::Interested => self.on_peer_interested(self.addr), Message::Piece(piece) => {
Message::Piece(piece) => { timeit("on_received_piece", || {
self.on_received_piece(self.addr, piece) self.on_received_piece(self.addr, piece)
.context("on_received_piece")?; .context("on_received_piece")
} })?;
Message::KeepAlive => { }
debug!("keepalive received"); Message::KeepAlive => {
} debug!("keepalive received");
Message::Have(h) => self.on_have(self.addr, h), }
Message::NotInterested => { Message::Have(h) => self.on_have(self.addr, h),
info!("received \"not interested\", but we don't care yet") Message::NotInterested => {
} info!("received \"not interested\", but we don't care yet")
message => { }
warn!("received unsupported message {:?}, ignoring", message); message => {
} warn!("received unsupported message {:?}, ignoring", message);
}; }
Ok(()) };
}) Ok(())
} }
fn get_have_bytes(&self) -> u64 { fn get_have_bytes(&self) -> u64 {

View file

@ -11,6 +11,7 @@ use librqbit::{
SessionOptions, SessionOptions,
}, },
spawn_utils::{spawn, BlockingSpawner}, spawn_utils::{spawn, BlockingSpawner},
torrent_state::timeit,
}; };
use size_format::SizeFormatterBinary as SF; use size_format::SizeFormatterBinary as SF;
use tracing::{error, info, span, warn, Level}; use tracing::{error, info, span, warn, Level};
@ -243,8 +244,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
info!("[{}] initializing", idx); info!("[{}] initializing", idx);
}, },
ManagedTorrentState::Running(handle) => { ManagedTorrentState::Running(handle) => {
let peer_stats = handle.torrent_state().peer_stats_snapshot(); let stats = timeit("stats_snapshot", || handle.torrent_state().stats_snapshot());
let stats = handle.torrent_state().stats_snapshot();
let speed = handle.speed_estimator(); let speed = handle.speed_estimator();
let total = stats.total_bytes; let total = stats.total_bytes;
let progress = stats.total_bytes - stats.remaining_bytes; let progress = stats.total_bytes - stats.remaining_bytes;
@ -263,11 +263,11 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
SF::new(stats.remaining_bytes), SF::new(stats.remaining_bytes),
SF::new(total), SF::new(total),
SF::new(stats.uploaded_bytes), SF::new(stats.uploaded_bytes),
peer_stats.live, stats.live_peers,
peer_stats.connecting, stats.connecting_peers,
peer_stats.queued, stats.queued_peers,
peer_stats.seen, stats.seen_peers,
peer_stats.dead stats.dead_peers
); );
}, },
} }