Optimize on transmit haves

This commit is contained in:
Igor Katson 2021-06-28 16:55:50 +01:00
parent 14b62b45c5
commit b4b22ea9a4
2 changed files with 70 additions and 33 deletions

View file

@ -202,12 +202,7 @@ impl PeerConnection {
Message::Bitfield(b) => self.on_bitfield(handle, b.clone_to_owned()).await?, Message::Bitfield(b) => self.on_bitfield(handle, b.clone_to_owned()).await?,
Message::Choke => self.on_i_am_choked(handle), Message::Choke => self.on_i_am_choked(handle),
Message::Unchoke => self.on_i_am_unchoked(handle), Message::Unchoke => self.on_i_am_unchoked(handle),
Message::Interested => { Message::Interested => self.on_peer_interested(handle),
warn!(
"{} is interested, but support for interested messages not implemented",
handle
)
}
Message::Piece(piece) => { Message::Piece(piece) => {
self.on_received_piece(handle, piece) self.on_received_piece(handle, piece)
.context("error in on_received_piece()")?; .context("error in on_received_piece()")?;
@ -368,6 +363,15 @@ impl PeerConnection {
.mark_i_am_choked(handle, true); .mark_i_am_choked(handle, true);
} }
fn on_peer_interested(&self, handle: PeerHandle) {
debug!("peer {} is interested", handle);
self.state
.locked
.write()
.peers
.mark_peer_interested(handle, true);
}
async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> {
let notify = match self.state.locked.read().peers.get_live(handle) { let notify = match self.state.locked.read().peers.get_live(handle) {
Some(l) => l.have_notify.clone(), Some(l) => l.have_notify.clone(),
@ -573,11 +577,7 @@ impl PeerConnection {
index, handle index, handle
); );
let state_clone = self.state.clone(); self.state.maybe_transmit_haves(chunk_info.piece_index);
let index = piece.index;
spawn("transmit haves", async move {
state_clone.task_transmit_haves(index).await
});
} }
false => { false => {
warn!( warn!(

View file

@ -9,7 +9,7 @@ use std::{
}; };
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use log::warn; use log::{trace, warn};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
@ -20,6 +20,7 @@ use crate::{
peer_binary_protocol::{Handshake, Message}, peer_binary_protocol::{Handshake, Message},
peer_connection::WriterRequest, peer_connection::WriterRequest,
peer_state::{LivePeerState, PeerState}, peer_state::{LivePeerState, PeerState},
spawn_utils::spawn,
torrent_metainfo::TorrentMetaV1Owned, torrent_metainfo::TorrentMetaV1Owned,
type_aliases::{PeerHandle, BF}, type_aliases::{PeerHandle, BF},
}; };
@ -117,6 +118,16 @@ impl PeerStates {
live.i_am_choked = is_choked; live.i_am_choked = is_choked;
Some(prev) Some(prev)
} }
pub fn mark_peer_interested(
&mut self,
handle: PeerHandle,
is_interested: bool,
) -> Option<bool> {
let live = self.get_live_mut(handle)?;
let prev = live.peer_interested;
live.peer_interested = is_interested;
Some(prev)
}
pub fn update_bitfield_from_vec( pub fn update_bitfield_from_vec(
&mut self, &mut self,
handle: PeerHandle, handle: PeerHandle,
@ -264,22 +275,35 @@ impl TorrentState {
self.needed - self.get_downloaded() self.needed - self.get_downloaded()
} }
// TODO: this is a task per chunk, not good pub fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
pub async fn task_transmit_haves(&self, index: u32) -> anyhow::Result<()> {
let mut unordered = FuturesUnordered::new(); let mut unordered = FuturesUnordered::new();
for weak in self let g = self.locked.read();
.locked for (handle, peer_state) in g.peers.states.iter() {
.read() match peer_state {
.peers PeerState::Live(live) => {
.tx if !live.peer_interested {
.values() continue;
.map(|v| Arc::downgrade(v)) }
if live
.bitfield
.as_ref()
.and_then(|b| b.get(index.get() as usize).map(|v| *v))
.unwrap_or(false)
{ {
continue;
}
let tx = match g.peers.tx.get(handle) {
Some(tx) => tx,
None => continue,
};
let tx = Arc::downgrade(tx);
unordered.push(async move { unordered.push(async move {
if let Some(tx) = weak.upgrade() { if let Some(tx) = tx.upgrade() {
if tx if tx
.send(WriterRequest::Message(Message::Have(index))) .send(WriterRequest::Message(Message::Have(index.get())))
.await .await
.is_err() .is_err()
{ {
@ -288,8 +312,21 @@ impl TorrentState {
} }
}); });
} }
_ => continue,
}
}
if unordered.is_empty() {
trace!("no peers to transmit Have={} to, saving some work", index);
return;
}
spawn(
format!("transmit_haves(piece={}, count={})", index, unordered.len()),
async move {
while unordered.next().await.is_some() {} while unordered.next().await.is_some() {}
Ok(()) Ok(())
},
);
} }
} }