diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index bcfb120..ac37de2 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -24,7 +24,7 @@ use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryFutureExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, TryFutureExt}; use itertools::Itertools; use librqbit_core::{ directories::get_configuration_directory, @@ -246,6 +246,18 @@ fn compute_only_files( } } +fn merge_two_optional_streams( + s1: Option + Send + Unpin + 'static>, + s2: Option + Send + Unpin + 'static>, +) -> Option + Send + Unpin + 'static>> { + match (s1, s2) { + (Some(s1), None) => Some(Box::new(s1)), + (None, Some(s2)) => Some(Box::new(s2)), + (Some(s1), Some(s2)) => Some(Box::new(s1.chain(s2))), + (None, None) => None, + } +} + /// Options for adding new torrents to the session. #[serde_as] #[derive(Default, Clone, Serialize, Deserialize)] @@ -1017,18 +1029,14 @@ impl Session { }; // Merge "initial_peers" and "peer_rx" into one stream. - let peer_rx: Option = if !initial_peers.is_empty() || peer_rx.is_some() { - use futures::future::Either; - Some(Box::new( - futures::stream::iter(initial_peers).chain( - peer_rx - .map(Either::Left) - .unwrap_or(Either::Right(futures::stream::empty())), - ), - )) - } else { - peer_rx - }; + let peer_rx = merge_two_optional_streams( + if !initial_peers.is_empty() { + Some(futures::stream::iter(initial_peers.into_iter())) + } else { + None + }, + peer_rx, + ); { let span = managed_torrent.info.span.clone(); @@ -1111,13 +1119,7 @@ impl Session { announce_port, ); - // Merge DHT rx and tracker comms peer rx. - match (dht_rx, peer_rx) { - (Some(dht_rx), None) => Ok(Some(Box::new(dht_rx))), - (None, Some(peer_rx)) => Ok(Some(Box::new(peer_rx))), - (None, None) => Ok(None), - (Some(dht_rx), Some(peer_rx)) => Ok(Some(Box::new(dht_rx.merge(peer_rx)))), - } + Ok(merge_two_optional_streams(dht_rx, peer_rx)) } pub fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {