diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index f5706a7..bc7ca03 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" [features] default = ["sha1-system", "default-tls"] +timed_existence = [] sha1-system = ["sha1w/sha1-system"] sha1-openssl = ["sha1w/sha1-openssl"] sha1-rust = ["sha1w/sha1-rust"] diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 5d92f2b..65aae19 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -29,7 +29,7 @@ use librqbit_core::{ lengths::{ChunkInfo, Lengths, ValidPieceIndex}, torrent_metainfo::TorrentMetaV1Info, }; -use parking_lot::{Mutex, RwLock, RwLockReadGuard}; +use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use peer_binary_protocol::{ extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, }; @@ -105,8 +105,15 @@ impl PeerStates { self.states.get(&addr).map(|e| f(e.value())) } - pub fn with_peer_mut(&self, addr: PeerHandle, f: impl FnOnce(&mut Peer) -> R) -> Option { - self.states.get_mut(&addr).map(|mut e| f(e.value_mut())) + pub fn with_peer_mut( + &self, + addr: PeerHandle, + reason: &'static str, + f: impl FnOnce(&mut Peer) -> R, + ) -> Option { + self.states + .get_mut(&addr) + .map(|e| f(TimedExistence::new(e, reason).value_mut())) } pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { self.states.get(&addr).and_then(|e| match &e.value().state { @@ -117,35 +124,35 @@ impl PeerStates { pub fn with_live_mut( &self, addr: PeerHandle, + reason: &'static str, f: impl FnOnce(&mut LivePeerState) -> R, ) -> Option { - self.states - .get_mut(&addr) - .and_then(|mut e| match &mut e.value_mut().state { - PeerState::Live(l) => Some(f(l)), - _ => None, - }) + self.with_peer_mut(addr, reason, |peer| match &mut peer.state { + PeerState::Live(l) => Some(f(l)), + _ => None, + }) + .flatten() } pub fn add(&self, addr: SocketAddr) -> Option { self.add_if_not_seen(addr) } pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option> { - let mut peer = self.states.get_mut(&handle)?; - peer.state.to_dead() + self.with_peer_mut(handle, "mark_peer_dead", |peer| peer.state.to_dead()) + .flatten() } pub fn drop_peer(&self, handle: PeerHandle) -> Option { self.states.remove(&handle).map(|r| r.1) } pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option { - self.with_live_mut(handle, |live| { + self.with_live_mut(handle, "mark_i_am_choked", |live| { let prev = live.i_am_choked; live.i_am_choked = is_choked; prev }) } pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option { - self.with_live_mut(handle, |live| { + self.with_live_mut(handle, "mark_peer_interested", |live| { let prev = live.peer_interested; live.peer_interested = is_interested; prev @@ -156,7 +163,7 @@ impl PeerStates { handle: PeerHandle, bitfield: Vec, ) -> Option> { - self.with_live_mut(handle, |live| { + self.with_live_mut(handle, "update_bitfield_from_vec", |live| { let bitfield = BF::from_vec(bitfield); let prev = live.bitfield.take(); live.bitfield = Some(bitfield); @@ -164,7 +171,7 @@ impl PeerStates { }) } pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result { - self.with_peer_mut(h, |peer| { + self.with_peer_mut(h, "mark_peer_connecting", |peer| { peer.state .queued_to_connecting() .context("invalid peer state") @@ -177,13 +184,16 @@ impl PeerStates { } fn reset_peer_backoff(&self, handle: PeerHandle) { - self.with_peer_mut(handle, |p| { + self.with_peer_mut(handle, "reset_peer_backoff", |p| { p.stats.backoff.reset(); }); } fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { - self.states.get_mut(&handle)?.state.to_not_needed() + self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { + peer.state.to_not_needed() + }) + .flatten() } } @@ -279,6 +289,87 @@ pub struct TorrentState { finished_notify: Notify, } +#[cfg(not(feature = "timed_existence"))] +mod timed_existence { + use std::ops::{Deref, DerefMut}; + + impl TimedExistence { + #[inline(always)] + pub fn new(object: T, _reason: &'static str) -> Self { + Self(object) + } + } + + pub struct TimedExistence(T); + impl Deref for TimedExistence { + type Target = T; + + #[inline(always)] + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl DerefMut for TimedExistence { + #[inline(always)] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } +} + +#[cfg(feature = "timed_existence")] +mod timed_existence { + use std::ops::{Deref, DerefMut}; + use std::time::{Duration, Instant}; + use tracing::warn; + + // Prints if the object exists for too long. + // This is used to track long-lived locks for debugging. + pub struct TimedExistence { + object: T, + reason: &'static str, + started: Instant, + } + + impl TimedExistence { + pub fn new(object: T, reason: &'static str) -> Self { + Self { + object, + reason, + started: Instant::now(), + } + } + } + + impl Drop for TimedExistence { + fn drop(&mut self) { + const MAX: Duration = Duration::from_millis(1); + let elapsed = self.started.elapsed(); + let reason = self.reason; + if elapsed > MAX { + warn!("elapsed on lock {reason:?}: {elapsed:?}") + } + } + } + + impl Deref for TimedExistence { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.object + } + } + + impl DerefMut for TimedExistence { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.object + } + } +} + +pub use timed_existence::TimedExistence; + impl TorrentState { #[allow(clippy::too_many_arguments)] pub fn new( @@ -412,19 +503,26 @@ impl TorrentState { pub fn lock_read(&self) -> RwLockReadGuard { self.locked.read() } + pub fn lock_write( + &self, + reason: &'static str, + ) -> TimedExistence> { + TimedExistence::new(self.locked.write(), reason) + } fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { - self.peers.with_live_mut(peer_handle, |live| { - let g = self.locked.read(); - let bf = live.bitfield.as_ref()?; - for n in g.chunks.iter_needed_pieces() { - if bf.get(n).map(|v| *v) == Some(true) { - // in theory it should be safe without validation, but whatever. - return self.lengths.validate_piece_index(n as u32); + self.peers + .with_live_mut(peer_handle, "get_next_needed_piece", |live| { + let g = self.locked.read(); + let bf = live.bitfield.as_ref()?; + for n in g.chunks.iter_needed_pieces() { + if bf.get(n).map(|v| *v) == Some(true) { + // in theory it should be safe without validation, but whatever. + return self.lengths.validate_piece_index(n as u32); + } } - } - None - })? + None + })? } fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { @@ -434,12 +532,12 @@ impl TorrentState { fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { // TODO: locking one inside the other in different order results in deadlocks. self.peers - .with_live_mut(peer_handle, |live| { + .with_live_mut(peer_handle, "reserve_next_needed_piece", |live| { if live.i_am_choked { debug!("we are choked, can't reserve next piece"); return None; } - let mut g = self.locked.write(); + let mut g = self.lock_write("reserve_next_needed_piece"); let n = { let mut n_opt = None; let bf = live.bitfield.as_ref()?; @@ -478,7 +576,7 @@ impl TorrentState { } let avg_time = self.stats.average_piece_download_time()?; - let mut g = self.locked.write(); + let mut g = self.lock_write("try_steal_old_slow_piece"); let (idx, elapsed, piece_req) = g .inflight_pieces .iter_mut() @@ -516,7 +614,7 @@ impl TorrentState { } fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { - let result = self.peers.with_peer_mut(handle, |p| { + let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state.connecting_to_live(Id20(h.peer_id)).is_some() }); match result { @@ -528,7 +626,7 @@ impl TorrentState { fn on_peer_died(self: &Arc, handle: PeerHandle, error: Option) { let mut pe = match self.peers.states.get_mut(&handle) { - Some(peer) => peer, + Some(peer) => TimedExistence::new(peer, "on_peer_died"), None => { warn!("bug: peer not found in table. Forgetting it forever"); return; @@ -537,7 +635,7 @@ impl TorrentState { match std::mem::take(&mut pe.value_mut().state) { PeerState::Connecting(_) => {} PeerState::Live(live) => { - let mut g = self.locked.write(); + let mut g = self.lock_write("mark_chunk_requests_canceled"); for req in live.inflight_requests { debug!( "peer dead, marking chunk request cancelled, index={}, chunk={}", @@ -588,7 +686,7 @@ impl TorrentState { tokio::time::sleep(dur).await; state .peers - .with_peer_mut(handle, |peer| { + .with_peer_mut(handle, "dead_to_queued", |peer| { match &peer.state { PeerState::Dead => peer.state = PeerState::Queued, other => bail!( @@ -822,8 +920,12 @@ impl PeerHandler { }; let tx = { - let g = self.state.locked.read(); - if !g.chunks.is_chunk_ready_to_upload(&chunk_info) { + if !self + .state + .lock_read() + .chunks + .is_chunk_ready_to_upload(&chunk_info) + { anyhow::bail!( "got request for a chunk that is not ready to upload. chunk {:?}", &chunk_info @@ -846,7 +948,7 @@ impl PeerHandler { #[inline(never)] fn on_have(&self, handle: PeerHandle, have: u32) { - self.state.peers.with_live_mut(handle, |live| { + self.state.peers.with_live_mut(handle, "on_have", |live| { if let Some(bitfield) = live.bitfield.as_mut() { bitfield.set(have as usize, true); debug!("updated bitfield with have={}", have); @@ -979,13 +1081,16 @@ impl PeerHandler { }; for chunk in self.state.lengths.iter_chunk_infos(next) { - if self.state.locked.read().chunks.is_chunk_downloaded(&chunk) { + if self.state.lock_read().chunks.is_chunk_downloaded(&chunk) { continue; } - match self.state.peers.with_live_mut(handle, |l| { - l.inflight_requests.insert(InflightRequest::from(&chunk)) - }) { + match self + .state + .peers + .with_live_mut(handle, "inflight_requests.insert", |l| { + l.inflight_requests.insert(InflightRequest::from(&chunk)) + }) { Some(true) => {} Some(false) => { warn!("probably a bug, we already requested {:?}", chunk); @@ -1039,11 +1144,13 @@ impl PeerHandler { #[inline(never)] fn on_i_am_unchoked(&self, handle: PeerHandle) { debug!("we are unchoked"); - self.state.peers.with_live_mut(handle, |live| { - live.i_am_choked = false; - live.have_notify.notify_waiters(); - live.requests_sem.add_permits(16); - }); + self.state + .peers + .with_live_mut(handle, "on_i_am_unchoked", |live| { + live.i_am_choked = false; + live.have_notify.notify_waiters(); + live.requests_sem.add_permits(16); + }); } #[inline(never)] @@ -1061,7 +1168,7 @@ impl PeerHandler { self.state .peers - .with_live_mut(handle, |h| { + .with_live_mut(handle, "inflight_requests.remove", |h| { h.requests_sem.add_permits(1); self.state @@ -1084,7 +1191,7 @@ impl PeerHandler { .context("peer not found")??; let full_piece_download_time = { - let mut g = self.state.locked.write(); + let mut g = self.state.lock_write("mark_chunk_downloaded"); match g.chunks.mark_chunk_downloaded(&piece) { Some(ChunkMarkingResult::Completed) => { @@ -1156,7 +1263,7 @@ impl PeerHandler { Ordering::Relaxed, ); { - let mut g = self.state.locked.write(); + let mut g = self.state.lock_write("mark_piece_downloaded"); g.chunks.mark_piece_downloaded(chunk_info.piece_index); self.state.peers.reset_peer_backoff(handle); @@ -1175,8 +1282,7 @@ impl PeerHandler { false => { warn!("checksum for piece={} did not validate", index,); self.state - .locked - .write() + .lock_write("mark_piece_broken") .chunks .mark_piece_broken(chunk_info.piece_index); } diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 8ea9f1b..eaf40ba 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" [features] default = ["sha1-system", "default-tls"] +timed_existence = ["librqbit/timed_existence"] sha1-system = ["librqbit/sha1-system"] sha1-openssl = ["librqbit/sha1-openssl"] sha1-rust = ["librqbit/sha1-rust"]