diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index d43429d..2801d3c 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -33,6 +33,7 @@ pub trait PeerConnectionHandler { extended_handshake: &ExtendedHandshake, ) -> anyhow::Result<()>; async fn on_received_message(&self, msg: Message>) -> anyhow::Result<()>; + fn should_transmit_have(&self, id: ValidPieceIndex) -> bool; fn on_uploaded_bytes(&self, bytes: u32); fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>; } @@ -265,7 +266,13 @@ impl PeerConnection { let req = loop { break tokio::select! { r = have_broadcast.recv() => match r { - Ok(id) => WriterRequest::Message(MessageOwned::Have(id.get())), + Ok(id) => { + if self.handler.should_transmit_have(id) { + WriterRequest::Message(MessageOwned::Have(id.get())) + } else { + continue + } + }, Err(tokio::sync::broadcast::error::RecvError::Closed) => anyhow::bail!("closing writer, broadcast channel closed"), _ => continue }, diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index f7590eb..28e2232 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -226,6 +226,10 @@ impl PeerConnectionHandler for Handler { } Ok(()) } + + fn should_transmit_have(&self, _id: librqbit_core::lengths::ValidPieceIndex) -> bool { + false + } } #[cfg(test)] diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ed72000..a680c30 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -557,7 +557,7 @@ impl TorrentStateLive { .map(|c| *c.get_hns()) } - fn maybe_transmit_haves(&self, index: ValidPieceIndex) { + fn transmit_haves(&self, index: ValidPieceIndex) { let _ = self.have_broadcast_tx.send(index); } @@ -840,6 +840,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn get_have_bytes(&self) -> u64 { self.state.get_approx_have_bytes() } + + fn should_transmit_have(&self, id: ValidPieceIndex) -> bool { + let have = self + .state + .peers + .with_live(self.addr, |l| { + l.bitfield.get(id.get_usize()).map(|p| *p).unwrap_or(true) + }) + .unwrap_or(true); + !have + } } impl PeerHandler { @@ -1449,7 +1460,7 @@ impl PeerHandler { state.on_piece_completed(chunk_info.piece_index)?; - state.maybe_transmit_haves(chunk_info.piece_index); + state.transmit_haves(chunk_info.piece_index); } false => { warn!(