Fix merging DHT and tracker streams

This commit is contained in:
Igor Katson 2024-04-29 13:57:29 +01:00
parent 9f806d7adb
commit 7c83240a1a
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 80 additions and 3 deletions

View file

@ -33,6 +33,7 @@ mod dht_utils;
mod file_ops;
pub mod http_api;
pub mod http_api_client;
mod merge_streams;
mod opened_file;
mod peer_connection;
mod peer_info_reader;

View file

@ -0,0 +1,61 @@
use std::{
sync::atomic::{AtomicU64, Ordering},
task::Poll,
};
use futures::Stream;
struct MergedStreams<S1, S2> {
poll_count: AtomicU64,
s1: S1,
s2: S2,
}
pub fn merge_streams<
I,
S1: Stream<Item = I> + 'static + Unpin,
S2: Stream<Item = I> + 'static + Unpin,
>(
s1: S1,
s2: S2,
) -> impl Stream<Item = I> + Unpin + 'static {
MergedStreams {
poll_count: AtomicU64::new(0),
s1,
s2,
}
}
fn poll_two<I, S1: Stream<Item = I> + Unpin, S2: Stream<Item = I> + Unpin>(
s1: &mut S1,
s2: &mut S2,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<I>> {
use futures::StreamExt;
let s1p = s1.poll_next_unpin(cx);
match s1p {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => s2.poll_next_unpin(cx),
}
}
impl<S1, S2, I> Stream for MergedStreams<S1, S2>
where
S1: Stream<Item = I> + Unpin,
S2: Stream<Item = I> + Unpin,
{
type Item = I;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let s1_first = this.poll_count.fetch_add(1, Ordering::Relaxed) % 2 == 0;
if s1_first {
poll_two(&mut this.s1, &mut this.s2, cx)
} else {
poll_two(&mut this.s2, &mut this.s1, cx)
}
}
}

View file

@ -11,6 +11,7 @@ use std::{
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
merge_streams::merge_streams,
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
spawn_utils::BlockingSpawner,
@ -254,13 +255,13 @@ fn compute_only_files(
}
fn merge_two_optional_streams<T>(
s1: Option<impl Stream<Item = T> + Send + 'static>,
s2: Option<impl Stream<Item = T> + Send + 'static>,
s1: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> {
match (s1, s2) {
(Some(s1), None) => Some(Box::pin(s1)),
(None, Some(s2)) => Some(Box::pin(s2)),
(Some(s1), Some(s2)) => Some(Box::pin(s1.chain(s2))),
(Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))),
(None, None) => None,
}
}