diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index a7c77ca..d43429d 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -6,7 +6,11 @@ use std::{ use anyhow::{bail, Context}; use buffers::{ByteBuf, ByteBufOwned}; use clone_to_owned::CloneToOwned; -use librqbit_core::{hash_id::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id}; +use librqbit_core::{ + hash_id::Id20, + lengths::{ChunkInfo, ValidPieceIndex}, + peer_id::try_decode_peer_id, +}; use parking_lot::RwLock; use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ExtendedMessage}, @@ -102,6 +106,7 @@ impl PeerConnection { read_buf: ReadBuf, handshake: Handshake, mut conn: tokio::net::TcpStream, + have_broadcast: tokio::sync::broadcast::Receiver, ) -> anyhow::Result<()> { use tokio::io::AsyncWriteExt; @@ -141,6 +146,7 @@ impl PeerConnection { write_buf, conn, outgoing_chan, + have_broadcast, ) .await } @@ -148,6 +154,7 @@ impl PeerConnection { pub async fn manage_peer_outgoing( &self, outgoing_chan: tokio::sync::mpsc::UnboundedReceiver, + have_broadcast: tokio::sync::broadcast::Receiver, ) -> anyhow::Result<()> { use tokio::io::AsyncWriteExt; let rwtimeout = self @@ -200,6 +207,7 @@ impl PeerConnection { write_buf, conn, outgoing_chan, + have_broadcast, ) .await } @@ -211,6 +219,7 @@ impl PeerConnection { mut write_buf: Vec, mut conn: tokio::net::TcpStream, mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver, + mut have_broadcast: tokio::sync::broadcast::Receiver, ) -> anyhow::Result<()> { use tokio::io::AsyncWriteExt; @@ -253,12 +262,21 @@ impl PeerConnection { } loop { - let req = match timeout(keep_alive_interval, outgoing_chan.recv()).await { - Ok(Some(msg)) => msg, - Ok(None) => { - anyhow::bail!("closing writer, channel closed") - } - Err(_) => WriterRequest::Message(MessageOwned::KeepAlive), + let req = loop { + break tokio::select! { + r = have_broadcast.recv() => match r { + Ok(id) => WriterRequest::Message(MessageOwned::Have(id.get())), + Err(tokio::sync::broadcast::error::RecvError::Closed) => anyhow::bail!("closing writer, broadcast channel closed"), + _ => continue + }, + r = timeout(keep_alive_interval, outgoing_chan.recv()) => match r { + Ok(Some(msg)) => msg, + Ok(None) => { + anyhow::bail!("closing writer, channel closed"); + } + Err(_) => WriterRequest::Message(MessageOwned::KeepAlive), + } + }; }; let mut uploaded_add = None; diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index 2c752cb..f7590eb 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -51,7 +51,8 @@ pub(crate) async fn read_metainfo_from_peer( ); let result_reader = async move { result_rx.await? }; - let connection_runner = async move { connection.manage_peer_outgoing(writer_rx).await }; + let (_, brx) = tokio::sync::broadcast::channel(1); + let connection_runner = async move { connection.manage_peer_outgoing(writer_rx, brx).await }; tokio::select! { result = result_reader => result, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 5083106..ed72000 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -179,6 +179,7 @@ pub struct TorrentStateLive { cancellation_token: CancellationToken, pub(crate) streams: Arc, + have_broadcast_tx: tokio::sync::broadcast::Sender, } impl TorrentStateLive { @@ -211,6 +212,8 @@ impl TorrentStateLive { let defer_writes = paused.info.options.disk_write_queue.is_some(); + let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128); + let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), peers: Default::default(), @@ -233,6 +236,7 @@ impl TorrentStateLive { down_speed_estimator, up_speed_estimator, cancellation_token, + have_broadcast_tx, streams: paused.streams, per_piece_locks: if defer_writes { (0..lengths.total_pieces()) @@ -391,7 +395,8 @@ impl TorrentStateLive { rx, checked_peer.read_buf, checked_peer.handshake, - checked_peer.stream + checked_peer.stream, + self.have_broadcast_tx.subscribe() ) => {r} }; @@ -452,7 +457,7 @@ impl TorrentStateLive { .fetch_add(1, Ordering::Relaxed); let res = tokio::select! { r = requester => {r} - r = peer_connection.manage_peer_outgoing(rx) => {r} + r = peer_connection.manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) => {r} }; match res { @@ -553,33 +558,7 @@ impl TorrentStateLive { } fn maybe_transmit_haves(&self, index: ValidPieceIndex) { - for pe in self.peers.states.iter() { - match &pe.value().state.get() { - PeerState::Live(live) => { - if !live.peer_interested { - continue; - } - - if live - .bitfield - .get(index.get() as usize) - .map(|v| *v) - .unwrap_or(false) - { - continue; - } - - if live - .tx - .send(WriterRequest::Message(Message::Have(index.get()))) - .is_err() - { - // whatever - } - } - _ => continue, - } - } + let _ = self.have_broadcast_tx.send(index); } pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> anyhow::Result {