From b4b22ea9a41a2471972a35fa9e784f6ec43c736a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 28 Jun 2021 16:55:50 +0100 Subject: [PATCH] Optimize on transmit haves --- crates/librqbit/src/peer_connection.rs | 22 +++---- crates/librqbit/src/torrent_state.rs | 81 +++++++++++++++++++------- 2 files changed, 70 insertions(+), 33 deletions(-) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index dffc770..0f59713 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -202,12 +202,7 @@ impl PeerConnection { Message::Bitfield(b) => self.on_bitfield(handle, b.clone_to_owned()).await?, Message::Choke => self.on_i_am_choked(handle), Message::Unchoke => self.on_i_am_unchoked(handle), - Message::Interested => { - warn!( - "{} is interested, but support for interested messages not implemented", - handle - ) - } + Message::Interested => self.on_peer_interested(handle), Message::Piece(piece) => { self.on_received_piece(handle, piece) .context("error in on_received_piece()")?; @@ -368,6 +363,15 @@ impl PeerConnection { .mark_i_am_choked(handle, true); } + fn on_peer_interested(&self, handle: PeerHandle) { + debug!("peer {} is interested", handle); + self.state + .locked + .write() + .peers + .mark_peer_interested(handle, true); + } + async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { let notify = match self.state.locked.read().peers.get_live(handle) { Some(l) => l.have_notify.clone(), @@ -573,11 +577,7 @@ impl PeerConnection { index, handle ); - let state_clone = self.state.clone(); - let index = piece.index; - spawn("transmit haves", async move { - state_clone.task_transmit_haves(index).await - }); + self.state.maybe_transmit_haves(chunk_info.piece_index); } false => { warn!( diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index faa25f0..5782777 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -9,7 +9,7 @@ use std::{ }; use futures::{stream::FuturesUnordered, StreamExt}; -use log::warn; +use log::{trace, warn}; use parking_lot::{Mutex, RwLock}; use tokio::sync::mpsc::Sender; @@ -20,6 +20,7 @@ use crate::{ peer_binary_protocol::{Handshake, Message}, peer_connection::WriterRequest, peer_state::{LivePeerState, PeerState}, + spawn_utils::spawn, torrent_metainfo::TorrentMetaV1Owned, type_aliases::{PeerHandle, BF}, }; @@ -117,6 +118,16 @@ impl PeerStates { live.i_am_choked = is_choked; Some(prev) } + pub fn mark_peer_interested( + &mut self, + handle: PeerHandle, + is_interested: bool, + ) -> Option { + let live = self.get_live_mut(handle)?; + let prev = live.peer_interested; + live.peer_interested = is_interested; + Some(prev) + } pub fn update_bitfield_from_vec( &mut self, handle: PeerHandle, @@ -264,32 +275,58 @@ impl TorrentState { self.needed - self.get_downloaded() } - // TODO: this is a task per chunk, not good - pub async fn task_transmit_haves(&self, index: u32) -> anyhow::Result<()> { + pub fn maybe_transmit_haves(&self, index: ValidPieceIndex) { let mut unordered = FuturesUnordered::new(); - for weak in self - .locked - .read() - .peers - .tx - .values() - .map(|v| Arc::downgrade(v)) - { - unordered.push(async move { - if let Some(tx) = weak.upgrade() { - if tx - .send(WriterRequest::Message(Message::Have(index))) - .await - .is_err() - { - // whatever + let g = self.locked.read(); + for (handle, peer_state) in g.peers.states.iter() { + match peer_state { + PeerState::Live(live) => { + if !live.peer_interested { + continue; } + + if live + .bitfield + .as_ref() + .and_then(|b| b.get(index.get() as usize).map(|v| *v)) + .unwrap_or(false) + { + continue; + } + + let tx = match g.peers.tx.get(handle) { + Some(tx) => tx, + None => continue, + }; + let tx = Arc::downgrade(tx); + unordered.push(async move { + if let Some(tx) = tx.upgrade() { + if tx + .send(WriterRequest::Message(Message::Have(index.get()))) + .await + .is_err() + { + // whatever + } + } + }); } - }); + _ => continue, + } } - while unordered.next().await.is_some() {} - Ok(()) + if unordered.is_empty() { + trace!("no peers to transmit Have={} to, saving some work", index); + return; + } + + spawn( + format!("transmit_haves(piece={}, count={})", index, unordered.len()), + async move { + while unordered.next().await.is_some() {} + Ok(()) + }, + ); } }