From 2693c0eb71ffdefdb04053abc14e1ff2351b3ff6 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 11:42:37 +0000 Subject: [PATCH] Move tx into peer --- crates/librqbit/src/peer_state.rs | 8 +++- crates/librqbit/src/torrent_state.rs | 57 +++++++++++----------------- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 1f883ed..949736e 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -183,11 +183,15 @@ impl PeerStateNoMut { } } - pub fn queued_to_connecting(&mut self, counters: &AggregatePeerStatsAtomic) -> Option { + pub fn queued_to_connecting( + &mut self, + counters: &AggregatePeerStatsAtomic, + ) -> Option<(PeerRx, PeerTx)> { if let PeerState::Queued = &self.0 { let (tx, rx) = unbounded_channel(); + let tx_2 = tx.clone(); self.set(PeerState::Connecting(tx), counters); - Some(rx) + Some((rx, tx_2)) } else { None } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 21ea526..f5dcd0c 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -205,7 +205,7 @@ impl PeerStates { live.bitfield = BF::from_vec(bitfield); }) } - pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result { + pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { peer.state @@ -216,10 +216,6 @@ impl PeerStates { Ok(rx) } - pub fn clone_tx(&self, handle: PeerHandle) -> Option { - self.with_live(handle, |live| live.tx.clone()) - } - fn reset_peer_backoff(&self, handle: PeerHandle) { self.with_peer_mut(handle, "reset_peer_backoff", |p| { p.stats.backoff.reset(); @@ -474,7 +470,7 @@ impl TorrentState { spawner: BlockingSpawner, ) -> anyhow::Result<()> { let state = self; - let rx = state.peers.mark_peer_connecting(addr)?; + let (rx, tx) = state.peers.mark_peer_connecting(addr)?; let handler = PeerHandler { addr, @@ -482,6 +478,7 @@ impl TorrentState { have_notify: Default::default(), requests_sem: Semaphore::new(0), state: state.clone(), + tx, spawner, }; let options = PeerConnectionOptions { @@ -918,6 +915,8 @@ struct PeerHandler { addr: SocketAddr, spawner: BlockingSpawner, + + tx: PeerTx, } impl<'a> PeerConnectionHandler for &'a PeerHandler { @@ -1008,31 +1007,24 @@ impl PeerHandler { } }; - let tx = { - if !self - .state - .lock_read("is_chunk_ready_to_upload") - .chunks - .is_chunk_ready_to_upload(&chunk_info) - { - anyhow::bail!( - "got request for a chunk that is not ready to upload. chunk {:?}", - &chunk_info - ); - } - - self.state - .peers - .clone_tx(peer_handle) - .context("peer died, dropping chunk that it requested")? - }; + if !self + .state + .lock_read("is_chunk_ready_to_upload") + .chunks + .is_chunk_ready_to_upload(&chunk_info) + { + anyhow::bail!( + "got request for a chunk that is not ready to upload. chunk {:?}", + &chunk_info + ); + } // TODO: this is not super efficient as it does copying multiple times. // Theoretically, this could be done in the sending code, so that it reads straight into // the send buffer. let request = WriterRequest::ReadChunkRequest(chunk_info); debug!("sending {:?}", &request); - Ok::<_, anyhow::Error>(tx.send(request)?) + Ok::<_, anyhow::Error>(self.tx.send(request)?) } fn on_have(&self, handle: PeerHandle, have: u32) { @@ -1055,11 +1047,12 @@ impl PeerHandler { .update_bitfield_from_vec(handle, bitfield.0); if !self.state.am_i_interested_in_peer(handle) { - let tx = self.state.peers.clone_tx(handle).context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::Unchoke))?; - tx.send(WriterRequest::Message(MessageOwned::NotInterested))?; + self.tx + .send(WriterRequest::Message(MessageOwned::Unchoke))?; + self.tx + .send(WriterRequest::Message(MessageOwned::NotInterested))?; if self.state.is_finished() { - tx.send(WriterRequest::Disconnect)?; + self.tx.send(WriterRequest::Disconnect)?; } return Ok(()); } @@ -1070,11 +1063,7 @@ impl PeerHandler { async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> { self.on_bitfield_notify.notified().await; - let tx = match self.state.peers.clone_tx(handle) { - Some(tx) => tx, - None => return Ok(()), - }; - tx.send_many([ + self.tx.send_many([ WriterRequest::Message(MessageOwned::Unchoke), WriterRequest::Message(MessageOwned::Interested), ])?;