Merge pull request #127 from ikatson/fix-merge-streams

Fix merging DHT and tracker streams
This commit is contained in:
Igor Katson 2024-04-29 19:42:13 +04:00 committed by GitHub
commit d664b71625
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 80 additions and 3 deletions

View file

@ -33,6 +33,7 @@ mod dht_utils;
mod file_ops;
pub mod http_api;
pub mod http_api_client;
mod merge_streams;
mod opened_file;
mod peer_connection;
mod peer_info_reader;

View file

@ -0,0 +1,61 @@
use std::{
sync::atomic::{AtomicU64, Ordering},
task::Poll,
};
use futures::Stream;
struct MergedStreams<S1, S2> {
poll_count: AtomicU64,
s1: S1,
s2: S2,
}
pub fn merge_streams<
I,
S1: Stream<Item = I> + 'static + Unpin,
S2: Stream<Item = I> + 'static + Unpin,
>(
s1: S1,
s2: S2,
) -> impl Stream<Item = I> + Unpin + 'static {
MergedStreams {
poll_count: AtomicU64::new(0),
s1,
s2,
}
}
fn poll_two<I, S1: Stream<Item = I> + Unpin, S2: Stream<Item = I> + Unpin>(
s1: &mut S1,
s2: &mut S2,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<I>> {
use futures::StreamExt;
let s1p = s1.poll_next_unpin(cx);
match s1p {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => s2.poll_next_unpin(cx),
}
}
impl<S1, S2, I> Stream for MergedStreams<S1, S2>
where
S1: Stream<Item = I> + Unpin,
S2: Stream<Item = I> + Unpin,
{
type Item = I;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let s1_first = this.poll_count.fetch_add(1, Ordering::Relaxed) % 2 == 0;
if s1_first {
poll_two(&mut this.s1, &mut this.s2, cx)
} else {
poll_two(&mut this.s2, &mut this.s1, cx)
}
}
}

View file

@ -11,6 +11,7 @@ use std::{
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
merge_streams::merge_streams,
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
spawn_utils::BlockingSpawner,
@ -254,13 +255,13 @@ fn compute_only_files(
}
fn merge_two_optional_streams<T>(
s1: Option<impl Stream<Item = T> + Send + 'static>,
s2: Option<impl Stream<Item = T> + Send + 'static>,
s1: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> {
match (s1, s2) {
(Some(s1), None) => Some(Box::pin(s1)),
(None, Some(s2)) => Some(Box::pin(s2)),
(Some(s1), Some(s2)) => Some(Box::pin(s1.chain(s2))),
(Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))),
(None, None) => None,
}
}

View file

@ -77,6 +77,15 @@ enum SupportedTracker {
Http(Url),
}
impl std::fmt::Debug for SupportedTracker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SupportedTracker::Udp(u) => std::fmt::Display::fmt(u, f),
SupportedTracker::Http(u) => std::fmt::Display::fmt(u, f),
}
}
}
impl TrackerComms {
pub fn start(
info_hash: Id20,
@ -104,9 +113,12 @@ impl TrackerComms {
})
.collect::<Vec<_>>();
if trackers.is_empty() {
debug!(?info_hash, "trackers list is empty");
return None;
}
tracing::trace!(?trackers);
let (tx, mut rx) = tokio::sync::mpsc::channel::<SocketAddr>(16);
let s = async_stream::stream! {
@ -173,6 +185,7 @@ impl TrackerComms {
async fn task_single_tracker_monitor_http(&self, mut tracker_url: Url) -> anyhow::Result<()> {
let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started);
trace!(url=?tracker_url, "starting monitor");
loop {
let stats = self.stats.get();
let request = tracker_comms_http::TrackerRequest {
@ -216,6 +229,7 @@ impl TrackerComms {
}
async fn tracker_one_request_http(&self, tracker_url: Url) -> anyhow::Result<u64> {
debug!(url = ?tracker_url, "calling tracker over http");
let response: reqwest::Response = reqwest::get(tracker_url).await?;
if !response.status().is_success() {
anyhow::bail!("tracker responded with {:?}", response.status());