timed existence for lock time debugging

This commit is contained in:
Igor Katson 2023-11-19 19:40:45 +00:00
parent 1a55936346
commit ff71ade190
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 159 additions and 51 deletions

View file

@ -13,6 +13,7 @@ readme = "README.md"
[features]
default = ["sha1-system", "default-tls"]
timed_existence = []
sha1-system = ["sha1w/sha1-system"]
sha1-openssl = ["sha1w/sha1-openssl"]
sha1-rust = ["sha1w/sha1-rust"]

View file

@ -29,7 +29,7 @@ use librqbit_core::{
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
torrent_metainfo::TorrentMetaV1Info,
};
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request,
};
@ -105,8 +105,15 @@ impl PeerStates {
self.states.get(&addr).map(|e| f(e.value()))
}
pub fn with_peer_mut<R>(&self, addr: PeerHandle, f: impl FnOnce(&mut Peer) -> R) -> Option<R> {
self.states.get_mut(&addr).map(|mut e| f(e.value_mut()))
/// Runs `f` with mutable access to the peer stored under `addr`.
///
/// Returns `None` when `states` has no entry for `addr`; otherwise returns
/// `Some` of whatever `f` produced.
///
/// The map entry guard is wrapped in a [`TimedExistence`] tagged with
/// `reason`, so the label identifies this call site when the guard's
/// lifetime is being tracked.
pub fn with_peer_mut<R>(
    &self,
    addr: PeerHandle,
    reason: &'static str,
    f: impl FnOnce(&mut Peer) -> R,
) -> Option<R> {
    let entry = self.states.get_mut(&addr)?;
    // The guard outlives the call to `f` and is released when this function returns.
    let mut timed_entry = TimedExistence::new(entry, reason);
    Some(f(timed_entry.value_mut()))
}
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.states.get(&addr).and_then(|e| match &e.value().state {
@ -117,35 +124,35 @@ impl PeerStates {
pub fn with_live_mut<R>(
&self,
addr: PeerHandle,
reason: &'static str,
f: impl FnOnce(&mut LivePeerState) -> R,
) -> Option<R> {
self.states
.get_mut(&addr)
.and_then(|mut e| match &mut e.value_mut().state {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
self.with_peer_mut(addr, reason, |peer| match &mut peer.state {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
.flatten()
}
/// Adds a peer address by delegating to `add_if_not_seen` and returning its result unchanged.
// NOTE(review): presumably `None` means the address was already known —
// confirm against `add_if_not_seen`, which is not visible here.
pub fn add(&self, addr: SocketAddr) -> Option<PeerHandle> {
self.add_if_not_seen(addr)
}
pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option<Option<LivePeerState>> {
let mut peer = self.states.get_mut(&handle)?;
peer.state.to_dead()
self.with_peer_mut(handle, "mark_peer_dead", |peer| peer.state.to_dead())
.flatten()
}
/// Removes the entry for `handle` from `states`.
///
/// Returns the removed [`Peer`], or `None` if there was no entry for `handle`.
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
    // `remove` yields the (key, value) pair; only the peer itself is handed back.
    let (_, peer) = self.states.remove(&handle)?;
    Some(peer)
}
pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
self.with_live_mut(handle, |live| {
self.with_live_mut(handle, "mark_i_am_choked", |live| {
let prev = live.i_am_choked;
live.i_am_choked = is_choked;
prev
})
}
pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, |live| {
self.with_live_mut(handle, "mark_peer_interested", |live| {
let prev = live.peer_interested;
live.peer_interested = is_interested;
prev
@ -156,7 +163,7 @@ impl PeerStates {
handle: PeerHandle,
bitfield: Vec<u8>,
) -> Option<Option<BF>> {
self.with_live_mut(handle, |live| {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
let bitfield = BF::from_vec(bitfield);
let prev = live.bitfield.take();
live.bitfield = Some(bitfield);
@ -164,7 +171,7 @@ impl PeerStates {
})
}
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> {
self.with_peer_mut(h, |peer| {
self.with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state
.queued_to_connecting()
.context("invalid peer state")
@ -177,13 +184,16 @@ impl PeerStates {
}
fn reset_peer_backoff(&self, handle: PeerHandle) {
self.with_peer_mut(handle, |p| {
self.with_peer_mut(handle, "reset_peer_backoff", |p| {
p.stats.backoff.reset();
});
}
fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<LivePeerState> {
self.states.get_mut(&handle)?.state.to_not_needed()
self.with_peer_mut(handle, "mark_peer_not_needed", |peer| {
peer.state.to_not_needed()
})
.flatten()
}
}
@ -279,6 +289,87 @@ pub struct TorrentState {
finished_notify: Notify,
}
/// No-op flavour of `TimedExistence`, compiled when the `timed_existence`
/// feature is disabled.
#[cfg(not(feature = "timed_existence"))]
mod timed_existence {
    use std::ops::{Deref, DerefMut};

    /// Transparent wrapper around `T`.
    ///
    /// It stores nothing except the wrapped object and has no `Drop` impl, so
    /// it only exists to keep call sites identical between feature settings.
    pub struct TimedExistence<T>(T);

    impl<T> TimedExistence<T> {
        /// Wraps `object`. The `_reason` label is accepted for signature
        /// parity and is otherwise ignored.
        #[inline(always)]
        pub fn new(object: T, _reason: &'static str) -> Self {
            TimedExistence(object)
        }
    }

    impl<T> Deref for TimedExistence<T> {
        type Target = T;

        /// Borrows the wrapped object.
        #[inline(always)]
        fn deref(&self) -> &T {
            let TimedExistence(inner) = self;
            inner
        }
    }

    impl<T> DerefMut for TimedExistence<T> {
        /// Mutably borrows the wrapped object.
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut T {
            let TimedExistence(inner) = self;
            inner
        }
    }
}
/// Timing flavour of `TimedExistence`, compiled when the `timed_existence`
/// feature is enabled.
#[cfg(feature = "timed_existence")]
mod timed_existence {
    use std::ops::{Deref, DerefMut};
    use std::time::{Duration, Instant};
    use tracing::warn;

    /// Lifetimes longer than this are reported when the wrapper is dropped.
    const WARN_THRESHOLD: Duration = Duration::from_millis(1);

    /// Wrapper that logs a warning if it existed for too long.
    ///
    /// This is used to track long-lived locks for debugging: wrap a lock
    /// guard, and dropping the wrapper reports how long it was alive.
    pub struct TimedExistence<T> {
        object: T,
        reason: &'static str,
        started: Instant,
    }

    impl<T> TimedExistence<T> {
        /// Wraps `object`, labels it with `reason` and records the current
        /// instant as the start of its lifetime.
        pub fn new(object: T, reason: &'static str) -> Self {
            let started = Instant::now();
            Self {
                object,
                reason,
                started,
            }
        }
    }

    impl<T> Drop for TimedExistence<T> {
        /// Emits a warning when more than `WARN_THRESHOLD` elapsed since `new`.
        ///
        /// This runs before the wrapped object's own destructor, so the
        /// measurement (and the log call) happen while `object` is still alive.
        fn drop(&mut self) {
            let elapsed = self.started.elapsed();
            if elapsed <= WARN_THRESHOLD {
                return;
            }
            let reason = self.reason;
            warn!("elapsed on lock {reason:?}: {elapsed:?}");
        }
    }

    impl<T> Deref for TimedExistence<T> {
        type Target = T;

        /// Borrows the wrapped object.
        fn deref(&self) -> &T {
            &self.object
        }
    }

    impl<T> DerefMut for TimedExistence<T> {
        /// Mutably borrows the wrapped object.
        fn deref_mut(&mut self) -> &mut T {
            &mut self.object
        }
    }
}
pub use timed_existence::TimedExistence;
impl TorrentState {
#[allow(clippy::too_many_arguments)]
pub fn new(
@ -412,19 +503,26 @@ impl TorrentState {
/// Takes the read side of the `locked` RwLock and returns the guard.
///
/// Unlike `lock_write`, the returned guard is not wrapped in `TimedExistence`.
pub fn lock_read(&self) -> RwLockReadGuard<TorrentStateLocked> {
self.locked.read()
}
/// Takes the write side of the `locked` RwLock and returns the guard wrapped
/// in a [`TimedExistence`] labelled with `reason`.
///
/// The wrapper is created only after `write()` has returned, so its lifetime
/// covers how long the guard is held, not how long acquiring it took.
pub fn lock_write(
    &self,
    reason: &'static str,
) -> TimedExistence<RwLockWriteGuard<TorrentStateLocked>> {
    let write_guard = self.locked.write();
    TimedExistence::new(write_guard, reason)
}
fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
self.peers.with_live_mut(peer_handle, |live| {
let g = self.locked.read();
let bf = live.bitfield.as_ref()?;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever.
return self.lengths.validate_piece_index(n as u32);
self.peers
.with_live_mut(peer_handle, "get_next_needed_piece", |live| {
let g = self.locked.read();
let bf = live.bitfield.as_ref()?;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever.
return self.lengths.validate_piece_index(n as u32);
}
}
}
None
})?
None
})?
}
fn am_i_choked(&self, peer_handle: PeerHandle) -> Option<bool> {
@ -434,12 +532,12 @@ impl TorrentState {
fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
// TODO: locking one inside the other in different order results in deadlocks.
self.peers
.with_live_mut(peer_handle, |live| {
.with_live_mut(peer_handle, "reserve_next_needed_piece", |live| {
if live.i_am_choked {
debug!("we are choked, can't reserve next piece");
return None;
}
let mut g = self.locked.write();
let mut g = self.lock_write("reserve_next_needed_piece");
let n = {
let mut n_opt = None;
let bf = live.bitfield.as_ref()?;
@ -478,7 +576,7 @@ impl TorrentState {
}
let avg_time = self.stats.average_piece_download_time()?;
let mut g = self.locked.write();
let mut g = self.lock_write("try_steal_old_slow_piece");
let (idx, elapsed, piece_req) = g
.inflight_pieces
.iter_mut()
@ -516,7 +614,7 @@ impl TorrentState {
}
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let result = self.peers.with_peer_mut(handle, |p| {
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state.connecting_to_live(Id20(h.peer_id)).is_some()
});
match result {
@ -528,7 +626,7 @@ impl TorrentState {
fn on_peer_died(self: &Arc<Self>, handle: PeerHandle, error: Option<anyhow::Error>) {
let mut pe = match self.peers.states.get_mut(&handle) {
Some(peer) => peer,
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
None => {
warn!("bug: peer not found in table. Forgetting it forever");
return;
@ -537,7 +635,7 @@ impl TorrentState {
match std::mem::take(&mut pe.value_mut().state) {
PeerState::Connecting(_) => {}
PeerState::Live(live) => {
let mut g = self.locked.write();
let mut g = self.lock_write("mark_chunk_requests_canceled");
for req in live.inflight_requests {
debug!(
"peer dead, marking chunk request cancelled, index={}, chunk={}",
@ -588,7 +686,7 @@ impl TorrentState {
tokio::time::sleep(dur).await;
state
.peers
.with_peer_mut(handle, |peer| {
.with_peer_mut(handle, "dead_to_queued", |peer| {
match &peer.state {
PeerState::Dead => peer.state = PeerState::Queued,
other => bail!(
@ -822,8 +920,12 @@ impl PeerHandler {
};
let tx = {
let g = self.state.locked.read();
if !g.chunks.is_chunk_ready_to_upload(&chunk_info) {
if !self
.state
.lock_read()
.chunks
.is_chunk_ready_to_upload(&chunk_info)
{
anyhow::bail!(
"got request for a chunk that is not ready to upload. chunk {:?}",
&chunk_info
@ -846,7 +948,7 @@ impl PeerHandler {
#[inline(never)]
fn on_have(&self, handle: PeerHandle, have: u32) {
self.state.peers.with_live_mut(handle, |live| {
self.state.peers.with_live_mut(handle, "on_have", |live| {
if let Some(bitfield) = live.bitfield.as_mut() {
bitfield.set(have as usize, true);
debug!("updated bitfield with have={}", have);
@ -979,13 +1081,16 @@ impl PeerHandler {
};
for chunk in self.state.lengths.iter_chunk_infos(next) {
if self.state.locked.read().chunks.is_chunk_downloaded(&chunk) {
if self.state.lock_read().chunks.is_chunk_downloaded(&chunk) {
continue;
}
match self.state.peers.with_live_mut(handle, |l| {
l.inflight_requests.insert(InflightRequest::from(&chunk))
}) {
match self
.state
.peers
.with_live_mut(handle, "inflight_requests.insert", |l| {
l.inflight_requests.insert(InflightRequest::from(&chunk))
}) {
Some(true) => {}
Some(false) => {
warn!("probably a bug, we already requested {:?}", chunk);
@ -1039,11 +1144,13 @@ impl PeerHandler {
#[inline(never)]
fn on_i_am_unchoked(&self, handle: PeerHandle) {
debug!("we are unchoked");
self.state.peers.with_live_mut(handle, |live| {
live.i_am_choked = false;
live.have_notify.notify_waiters();
live.requests_sem.add_permits(16);
});
self.state
.peers
.with_live_mut(handle, "on_i_am_unchoked", |live| {
live.i_am_choked = false;
live.have_notify.notify_waiters();
live.requests_sem.add_permits(16);
});
}
#[inline(never)]
@ -1061,7 +1168,7 @@ impl PeerHandler {
self.state
.peers
.with_live_mut(handle, |h| {
.with_live_mut(handle, "inflight_requests.remove", |h| {
h.requests_sem.add_permits(1);
self.state
@ -1084,7 +1191,7 @@ impl PeerHandler {
.context("peer not found")??;
let full_piece_download_time = {
let mut g = self.state.locked.write();
let mut g = self.state.lock_write("mark_chunk_downloaded");
match g.chunks.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => {
@ -1156,7 +1263,7 @@ impl PeerHandler {
Ordering::Relaxed,
);
{
let mut g = self.state.locked.write();
let mut g = self.state.lock_write("mark_piece_downloaded");
g.chunks.mark_piece_downloaded(chunk_info.piece_index);
self.state.peers.reset_peer_backoff(handle);
@ -1175,8 +1282,7 @@ impl PeerHandler {
false => {
warn!("checksum for piece={} did not validate", index,);
self.state
.locked
.write()
.lock_write("mark_piece_broken")
.chunks
.mark_piece_broken(chunk_info.piece_index);
}

View file

@ -13,6 +13,7 @@ readme = "README.md"
[features]
default = ["sha1-system", "default-tls"]
timed_existence = ["librqbit/timed_existence"]
sha1-system = ["librqbit/sha1-system"]
sha1-openssl = ["librqbit/sha1-openssl"]
sha1-rust = ["librqbit/sha1-rust"]