From 8c67127a853f5d5613eacdcc9c838379a4c1762e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 11:30:16 +0000 Subject: [PATCH] Made peer chunk requester the same task as manage_peer --- crates/librqbit/src/torrent_state.rs | 56 +++++++++++++--------------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index bb46f9f..326a9a7 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -478,6 +478,7 @@ impl TorrentState { let handler = PeerHandler { addr, + on_bitfield_notify: Arc::new(Default::default()), state: state.clone(), spawner, }; @@ -486,17 +487,23 @@ impl TorrentState { read_write_timeout: state.options.peer_read_write_timeout, ..Default::default() }; + let peer_connection = PeerConnection::new( addr, state.info_hash, state.peer_id, - handler, + &handler, Some(options), spawner, ); + let requester = handler.task_peer_chunk_requester(addr); - let res = peer_connection.manage_peer(rx).await; - let state = peer_connection.into_handler().state; + let res = tokio::select! { + r = requester => {r} + r = peer_connection.manage_peer(rx) => {r} + }; + + let state = handler.state; state.peer_semaphore.add_permits(1); match res { @@ -898,11 +905,12 @@ impl TorrentState { #[derive(Clone)] struct PeerHandler { state: Arc, + on_bitfield_notify: Arc, addr: SocketAddr, spawner: BlockingSpawner, } -impl PeerConnectionHandler for PeerHandler { +impl<'a> PeerConnectionHandler for &'a PeerHandler { fn on_received_message(&self, message: Message>) -> anyhow::Result<()> { match message { Message::Request(request) => { @@ -1046,20 +1054,12 @@ impl PeerHandler { return Ok(()); } - // Additional spawn per peer, not good. - spawn( - span!( - parent: None, - Level::ERROR, - "peer_chunk_requester", - peer = handle.to_string() - ), - self.clone().task_peer_chunk_requester(handle), - ); + self.on_bitfield_notify.notify_waiters(); Ok(()) } - async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { + async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> { + self.on_bitfield_notify.notified().await; let tx = match self.state.peers.clone_tx(handle) { Some(tx) => tx, None => return Ok(()), @@ -1068,21 +1068,7 @@ impl PeerHandler { WriterRequest::Message(MessageOwned::Unchoke), WriterRequest::Message(MessageOwned::Interested), ])?; - self.requester(handle).await?; - Ok::<_, anyhow::Error>(()) - } - fn on_i_am_choked(&self, handle: PeerHandle) { - debug!("we are choked"); - self.state.peers.mark_i_am_choked(handle, true); - } - - fn on_peer_interested(&self, handle: PeerHandle) { - debug!("peer is interested"); - self.state.peers.mark_peer_interested(handle, true); - } - - async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { let notify = match self .state .peers @@ -1091,8 +1077,6 @@ impl PeerHandler { Some(notify) => notify, None => return Ok(()), }; - - // TODO: this might dangle, same below. #[allow(unused_must_use)] { timeout(Duration::from_secs(60), notify.notified()).await; @@ -1195,6 +1179,16 @@ impl PeerHandler { } } + fn on_i_am_choked(&self, handle: PeerHandle) { + debug!("we are choked"); + self.state.peers.mark_i_am_choked(handle, true); + } + + fn on_peer_interested(&self, handle: PeerHandle) { + debug!("peer is interested"); + self.state.peers.mark_peer_interested(handle, true); + } + fn reopen_read_only(&self) -> anyhow::Result<()> { fn dummy_file() -> anyhow::Result { #[cfg(target_os = "windows")]