Make a stream merging function

This commit is contained in:
Igor Katson 2024-03-02 11:12:08 +00:00
parent 3a1d0c3ac9
commit 466cad06e0
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -24,7 +24,7 @@ use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBuf, ByteBufT, ByteString};
use clone_to_owned::CloneToOwned;
use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryFutureExt};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, TryFutureExt};
use itertools::Itertools;
use librqbit_core::{
directories::get_configuration_directory,
@ -246,6 +246,18 @@ fn compute_only_files(
}
}
fn merge_two_optional_streams<T>(
s1: Option<impl Stream<Item = T> + Send + Unpin + 'static>,
s2: Option<impl Stream<Item = T> + Send + Unpin + 'static>,
) -> Option<Box<dyn Stream<Item = T> + Send + Unpin + 'static>> {
match (s1, s2) {
(Some(s1), None) => Some(Box::new(s1)),
(None, Some(s2)) => Some(Box::new(s2)),
(Some(s1), Some(s2)) => Some(Box::new(s1.chain(s2))),
(None, None) => None,
}
}
/// Options for adding new torrents to the session.
#[serde_as]
#[derive(Default, Clone, Serialize, Deserialize)]
@ -1017,18 +1029,14 @@ impl Session {
};
// Merge "initial_peers" and "peer_rx" into one stream.
let peer_rx: Option<PeerStream> = if !initial_peers.is_empty() || peer_rx.is_some() {
use futures::future::Either;
Some(Box::new(
futures::stream::iter(initial_peers).chain(
peer_rx
.map(Either::Left)
.unwrap_or(Either::Right(futures::stream::empty())),
),
))
} else {
peer_rx
};
let peer_rx = merge_two_optional_streams(
if !initial_peers.is_empty() {
Some(futures::stream::iter(initial_peers.into_iter()))
} else {
None
},
peer_rx,
);
{
let span = managed_torrent.info.span.clone();
@ -1111,13 +1119,7 @@ impl Session {
announce_port,
);
// Merge DHT rx and tracker comms peer rx.
match (dht_rx, peer_rx) {
(Some(dht_rx), None) => Ok(Some(Box::new(dht_rx))),
(None, Some(peer_rx)) => Ok(Some(Box::new(peer_rx))),
(None, None) => Ok(None),
(Some(dht_rx), Some(peer_rx)) => Ok(Some(Box::new(dht_rx.merge(peer_rx)))),
}
Ok(merge_two_optional_streams(dht_rx, peer_rx))
}
pub fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {