Reimplement maybe_transmit_haves with a broadcast channel
This commit is contained in:
parent
7ed7e277c0
commit
44aa75f34b
3 changed files with 35 additions and 37 deletions
|
|
@ -6,7 +6,11 @@ use std::{
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use buffers::{ByteBuf, ByteBufOwned};
|
use buffers::{ByteBuf, ByteBufOwned};
|
||||||
use clone_to_owned::CloneToOwned;
|
use clone_to_owned::CloneToOwned;
|
||||||
use librqbit_core::{hash_id::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id};
|
use librqbit_core::{
|
||||||
|
hash_id::Id20,
|
||||||
|
lengths::{ChunkInfo, ValidPieceIndex},
|
||||||
|
peer_id::try_decode_peer_id,
|
||||||
|
};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use peer_binary_protocol::{
|
use peer_binary_protocol::{
|
||||||
extended::{handshake::ExtendedHandshake, ExtendedMessage},
|
extended::{handshake::ExtendedHandshake, ExtendedMessage},
|
||||||
|
|
@ -102,6 +106,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
read_buf: ReadBuf,
|
read_buf: ReadBuf,
|
||||||
handshake: Handshake<ByteBufOwned>,
|
handshake: Handshake<ByteBufOwned>,
|
||||||
mut conn: tokio::net::TcpStream,
|
mut conn: tokio::net::TcpStream,
|
||||||
|
have_broadcast: tokio::sync::broadcast::Receiver<ValidPieceIndex>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
|
@ -141,6 +146,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
write_buf,
|
write_buf,
|
||||||
conn,
|
conn,
|
||||||
outgoing_chan,
|
outgoing_chan,
|
||||||
|
have_broadcast,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
@ -148,6 +154,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
pub async fn manage_peer_outgoing(
|
pub async fn manage_peer_outgoing(
|
||||||
&self,
|
&self,
|
||||||
outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
|
outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
|
||||||
|
have_broadcast: tokio::sync::broadcast::Receiver<ValidPieceIndex>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
let rwtimeout = self
|
let rwtimeout = self
|
||||||
|
|
@ -200,6 +207,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
write_buf,
|
write_buf,
|
||||||
conn,
|
conn,
|
||||||
outgoing_chan,
|
outgoing_chan,
|
||||||
|
have_broadcast,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
@ -211,6 +219,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
mut write_buf: Vec<u8>,
|
mut write_buf: Vec<u8>,
|
||||||
mut conn: tokio::net::TcpStream,
|
mut conn: tokio::net::TcpStream,
|
||||||
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
|
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
|
||||||
|
mut have_broadcast: tokio::sync::broadcast::Receiver<ValidPieceIndex>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
|
@ -253,12 +262,21 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let req = match timeout(keep_alive_interval, outgoing_chan.recv()).await {
|
let req = loop {
|
||||||
Ok(Some(msg)) => msg,
|
break tokio::select! {
|
||||||
Ok(None) => {
|
r = have_broadcast.recv() => match r {
|
||||||
anyhow::bail!("closing writer, channel closed")
|
Ok(id) => WriterRequest::Message(MessageOwned::Have(id.get())),
|
||||||
}
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => anyhow::bail!("closing writer, broadcast channel closed"),
|
||||||
Err(_) => WriterRequest::Message(MessageOwned::KeepAlive),
|
_ => continue
|
||||||
|
},
|
||||||
|
r = timeout(keep_alive_interval, outgoing_chan.recv()) => match r {
|
||||||
|
Ok(Some(msg)) => msg,
|
||||||
|
Ok(None) => {
|
||||||
|
anyhow::bail!("closing writer, channel closed");
|
||||||
|
}
|
||||||
|
Err(_) => WriterRequest::Message(MessageOwned::KeepAlive),
|
||||||
|
}
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut uploaded_add = None;
|
let mut uploaded_add = None;
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,8 @@ pub(crate) async fn read_metainfo_from_peer(
|
||||||
);
|
);
|
||||||
|
|
||||||
let result_reader = async move { result_rx.await? };
|
let result_reader = async move { result_rx.await? };
|
||||||
let connection_runner = async move { connection.manage_peer_outgoing(writer_rx).await };
|
let (_, brx) = tokio::sync::broadcast::channel(1);
|
||||||
|
let connection_runner = async move { connection.manage_peer_outgoing(writer_rx, brx).await };
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = result_reader => result,
|
result = result_reader => result,
|
||||||
|
|
|
||||||
|
|
@ -179,6 +179,7 @@ pub struct TorrentStateLive {
|
||||||
cancellation_token: CancellationToken,
|
cancellation_token: CancellationToken,
|
||||||
|
|
||||||
pub(crate) streams: Arc<TorrentStreams>,
|
pub(crate) streams: Arc<TorrentStreams>,
|
||||||
|
have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TorrentStateLive {
|
impl TorrentStateLive {
|
||||||
|
|
@ -211,6 +212,8 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
let defer_writes = paused.info.options.disk_write_queue.is_some();
|
let defer_writes = paused.info.options.disk_write_queue.is_some();
|
||||||
|
|
||||||
|
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
|
||||||
|
|
||||||
let state = Arc::new(TorrentStateLive {
|
let state = Arc::new(TorrentStateLive {
|
||||||
meta: paused.info.clone(),
|
meta: paused.info.clone(),
|
||||||
peers: Default::default(),
|
peers: Default::default(),
|
||||||
|
|
@ -233,6 +236,7 @@ impl TorrentStateLive {
|
||||||
down_speed_estimator,
|
down_speed_estimator,
|
||||||
up_speed_estimator,
|
up_speed_estimator,
|
||||||
cancellation_token,
|
cancellation_token,
|
||||||
|
have_broadcast_tx,
|
||||||
streams: paused.streams,
|
streams: paused.streams,
|
||||||
per_piece_locks: if defer_writes {
|
per_piece_locks: if defer_writes {
|
||||||
(0..lengths.total_pieces())
|
(0..lengths.total_pieces())
|
||||||
|
|
@ -391,7 +395,8 @@ impl TorrentStateLive {
|
||||||
rx,
|
rx,
|
||||||
checked_peer.read_buf,
|
checked_peer.read_buf,
|
||||||
checked_peer.handshake,
|
checked_peer.handshake,
|
||||||
checked_peer.stream
|
checked_peer.stream,
|
||||||
|
self.have_broadcast_tx.subscribe()
|
||||||
) => {r}
|
) => {r}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -452,7 +457,7 @@ impl TorrentStateLive {
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
let res = tokio::select! {
|
let res = tokio::select! {
|
||||||
r = requester => {r}
|
r = requester => {r}
|
||||||
r = peer_connection.manage_peer_outgoing(rx) => {r}
|
r = peer_connection.manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) => {r}
|
||||||
};
|
};
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
|
|
@ -553,33 +558,7 @@ impl TorrentStateLive {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
|
fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
|
||||||
for pe in self.peers.states.iter() {
|
let _ = self.have_broadcast_tx.send(index);
|
||||||
match &pe.value().state.get() {
|
|
||||||
PeerState::Live(live) => {
|
|
||||||
if !live.peer_interested {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if live
|
|
||||||
.bitfield
|
|
||||||
.get(index.get() as usize)
|
|
||||||
.map(|v| *v)
|
|
||||||
.unwrap_or(false)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if live
|
|
||||||
.tx
|
|
||||||
.send(WriterRequest::Message(Message::Have(index.get())))
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
// whatever
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => continue,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> anyhow::Result<bool> {
|
pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> anyhow::Result<bool> {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue