Dont spam haves
This commit is contained in:
parent
44aa75f34b
commit
8a86d04631
3 changed files with 25 additions and 3 deletions
|
|
@ -33,6 +33,7 @@ pub trait PeerConnectionHandler {
|
||||||
extended_handshake: &ExtendedHandshake<ByteBuf>,
|
extended_handshake: &ExtendedHandshake<ByteBuf>,
|
||||||
) -> anyhow::Result<()>;
|
) -> anyhow::Result<()>;
|
||||||
async fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()>;
|
async fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()>;
|
||||||
|
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool;
|
||||||
fn on_uploaded_bytes(&self, bytes: u32);
|
fn on_uploaded_bytes(&self, bytes: u32);
|
||||||
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>;
|
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>;
|
||||||
}
|
}
|
||||||
|
|
@ -265,7 +266,13 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
let req = loop {
|
let req = loop {
|
||||||
break tokio::select! {
|
break tokio::select! {
|
||||||
r = have_broadcast.recv() => match r {
|
r = have_broadcast.recv() => match r {
|
||||||
Ok(id) => WriterRequest::Message(MessageOwned::Have(id.get())),
|
Ok(id) => {
|
||||||
|
if self.handler.should_transmit_have(id) {
|
||||||
|
WriterRequest::Message(MessageOwned::Have(id.get()))
|
||||||
|
} else {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
},
|
||||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => anyhow::bail!("closing writer, broadcast channel closed"),
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => anyhow::bail!("closing writer, broadcast channel closed"),
|
||||||
_ => continue
|
_ => continue
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -226,6 +226,10 @@ impl PeerConnectionHandler for Handler {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn should_transmit_have(&self, _id: librqbit_core::lengths::ValidPieceIndex) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
||||||
|
|
@ -557,7 +557,7 @@ impl TorrentStateLive {
|
||||||
.map(|c| *c.get_hns())
|
.map(|c| *c.get_hns())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
|
fn transmit_haves(&self, index: ValidPieceIndex) {
|
||||||
let _ = self.have_broadcast_tx.send(index);
|
let _ = self.have_broadcast_tx.send(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -840,6 +840,17 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
fn get_have_bytes(&self) -> u64 {
|
fn get_have_bytes(&self) -> u64 {
|
||||||
self.state.get_approx_have_bytes()
|
self.state.get_approx_have_bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool {
|
||||||
|
let have = self
|
||||||
|
.state
|
||||||
|
.peers
|
||||||
|
.with_live(self.addr, |l| {
|
||||||
|
l.bitfield.get(id.get_usize()).map(|p| *p).unwrap_or(true)
|
||||||
|
})
|
||||||
|
.unwrap_or(true);
|
||||||
|
!have
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerHandler {
|
impl PeerHandler {
|
||||||
|
|
@ -1449,7 +1460,7 @@ impl PeerHandler {
|
||||||
|
|
||||||
state.on_piece_completed(chunk_info.piece_index)?;
|
state.on_piece_completed(chunk_info.piece_index)?;
|
||||||
|
|
||||||
state.maybe_transmit_haves(chunk_info.piece_index);
|
state.transmit_haves(chunk_info.piece_index);
|
||||||
}
|
}
|
||||||
false => {
|
false => {
|
||||||
warn!(
|
warn!(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue