From efaa36a16150f47cdcce080886eb037665774c34 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 5 Dec 2023 20:02:54 +0000 Subject: [PATCH] SAving, its broken --- crates/librqbit/src/peer_connection.rs | 2 + crates/librqbit/src/torrent_state/live/mod.rs | 86 ++++++++++--------- .../src/torrent_state/live/peer/mod.rs | 17 +++- .../src/torrent_state/live/peers/mod.rs | 5 ++ 4 files changed, 67 insertions(+), 43 deletions(-) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index c2e2841..33c3f2a 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -157,6 +157,8 @@ impl PeerConnection { let h_supports_extended = handshake.supports_extended(); + self.handler.on_handshake(handshake)?; + self.manage_peer( h_supports_extended, read_so_far, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index a39f035..c10ab16 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -376,6 +376,7 @@ impl TorrentStateLive { let peer = Peer::new_live_for_incoming_connection( Id20(checked_peer.handshake.peer_id), tx.clone(), + &self.peers.stats, ); let counters = peer.stats.counters.clone(); vac.insert(peer); @@ -400,7 +401,6 @@ impl TorrentStateLive { rx: PeerRx, ) -> anyhow::Result<()> { // TODO: bump counters for incoming - let handler = PeerHandler { addr: checked_peer.addr, on_bitfield_notify: Default::default(), @@ -427,7 +427,7 @@ impl TorrentStateLive { Some(options), self.meta.spawner, ); - let requester = handler.task_peer_chunk_requester(checked_peer.addr); + let requester = handler.task_peer_chunk_requester(); let res = tokio::select! { r = requester => {r} @@ -458,7 +458,6 @@ impl TorrentStateLive { async fn task_manage_outgoing_peer(self: Arc, addr: SocketAddr) -> anyhow::Result<()> { let state = self; let (rx, tx) = state.peers.mark_peer_connecting(addr)?; - let counters = state .peers .with_peer(addr, |p| p.stats.counters.clone()) @@ -490,7 +489,7 @@ impl TorrentStateLive { Some(options), state.meta.spawner, ); - let requester = handler.task_peer_chunk_requester(addr); + let requester = handler.task_peer_chunk_requester(); handler .counters @@ -597,18 +596,10 @@ impl TorrentStateLive { } fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { - let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| { + self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state - .connecting_to_live(Id20(h.peer_id), &self.peers.stats) - .is_some() + .connecting_to_live(Id20(h.peer_id), &self.peers.stats); }); - match result { - Some(true) => { - trace!("set peer to live") - } - Some(false) => debug!("can't set peer live, it was in wrong state"), - None => debug!("can't set peer live, it disappeared"), - } } pub fn get_uploaded_bytes(&self) -> u64 { @@ -867,6 +858,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { self.state.set_peer_live(self.addr, handshake); + self.tx + .send(WriterRequest::Message(MessageOwned::Unchoke))?; Ok(()) } @@ -1116,7 +1109,8 @@ impl PeerHandler { self.state .peers .with_live_mut(self.addr, "on_have", |live| { - // If bitfield wasn't allocated yet, let's do it. Some clients send haves before bitfield. + // If bitfield wasn't allocated yet, let's do it. Some clients start empty so they never + // send bitfields. if live.bitfield.is_empty() { live.bitfield = BF::from_vec(vec![0; self.state.lengths.piece_bitfield_bytes()]); @@ -1129,6 +1123,7 @@ impl PeerHandler { } }; trace!("updated bitfield with have={}", have); + self.on_bitfield_notify.notify_waiters(); }); } @@ -1144,10 +1139,40 @@ impl PeerHandler { self.state .peers .update_bitfield_from_vec(self.addr, bitfield.0); + self.on_bitfield_notify.notify_waiters(); + Ok(()) + } + + async fn wait_for_any_notify(&self, notify: &Notify, check: impl Fn() -> bool) { + // To remove possibility of races, we first grab a token, then check + // if we need it, and only if so, await. + let notified = notify.notified(); + if check() { + return; + } + notified.await; + } + + async fn wait_for_bitfield(&self) { + self.wait_for_any_notify(&self.on_bitfield_notify, || { + self.state + .peers + .with_live(self.addr, |live| !live.bitfield.is_empty()) + .unwrap_or_default() + }) + .await; + } + + async fn wait_for_unchoke(&self) { + self.wait_for_any_notify(&self.unchoke_notify, || !self.locked.read().i_am_choked) + .await; + } + + async fn task_peer_chunk_requester(&self) -> anyhow::Result<()> { + let handle = self.addr; + self.wait_for_bitfield().await; if !self.state.am_i_interested_in_peer(self.addr) { - self.tx - .send(WriterRequest::Message(MessageOwned::Unchoke))?; self.tx .send(WriterRequest::Message(MessageOwned::NotInterested))?; if self.state.is_finished() { @@ -1155,32 +1180,11 @@ impl PeerHandler { } return Ok(()); } - - self.on_bitfield_notify.notify_waiters(); - Ok(()) - } - - async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> { - self.on_bitfield_notify.notified().await; - self.tx.send_many([ - WriterRequest::Message(MessageOwned::Unchoke), - WriterRequest::Message(MessageOwned::Interested), - ])?; - - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await; - } + self.tx + .send_many([WriterRequest::Message(MessageOwned::Interested)])?; loop { - if self.locked.read().i_am_choked { - debug!("we are choked, can't reserve next piece"); - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await; - } - continue; - } + self.wait_for_unchoke().await; if self.state.is_finished() { debug!("nothing left to download, looping forever until manage_peer quits"); diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index b0eee03..75d182b 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -53,9 +53,15 @@ pub(crate) struct Peer { } impl Peer { - pub fn new_live_for_incoming_connection(peer_id: Id20, tx: PeerTx) -> Self { + pub fn new_live_for_incoming_connection( + peer_id: Id20, + tx: PeerTx, + counters: &AggregatePeerStatsAtomic, + ) -> Self { + let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx))); + counters.inc(&state.0); Self { - state: PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx))), + state, stats: Default::default(), } } @@ -118,6 +124,13 @@ impl PeerStateNoMut { std::mem::replace(&mut self.0, new) } + pub fn get_live(&self) -> Option<&LivePeerState> { + match &self.0 { + PeerState::Live(l) => Some(l), + _ => None, + } + } + pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { match &mut self.0 { PeerState::Live(l) => Some(l), diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index df359b8..2e4b4af 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -53,6 +53,11 @@ impl PeerStates { .map(|e| f(TimedExistence::new(e, reason).value_mut())) } + pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { + self.with_peer(addr, |peer| peer.state.get_live().map(f)) + .flatten() + } + pub fn with_live_mut( &self, addr: PeerHandle,