diff --git a/crates/librqbit/src/merge_streams.rs b/crates/librqbit/src/merge_streams.rs index 35205d4..d2ada2c 100644 --- a/crates/librqbit/src/merge_streams.rs +++ b/crates/librqbit/src/merge_streams.rs @@ -1,15 +1,4 @@ -use std::{ - sync::atomic::{AtomicU64, Ordering}, - task::Poll, -}; - -use futures::Stream; - -struct MergedStreams { - poll_count: AtomicU64, - s1: futures::stream::Fuse, - s2: futures::stream::Fuse, -} +use futures::stream::Stream; pub fn merge_streams< I, @@ -19,52 +8,6 @@ pub fn merge_streams< s1: S1, s2: S2, ) -> impl Stream + Unpin + 'static { - use futures::stream::StreamExt; - MergedStreams { - poll_count: AtomicU64::new(0), - s1: s1.fuse(), - s2: s2.fuse(), - } -} - -fn poll_two + Unpin, S2: Stream + Unpin>( - s1: &mut S1, - s2: &mut S2, - cx: &mut std::task::Context<'_>, -) -> Poll> { - use futures::StreamExt; - let s1p = s1.poll_next_unpin(cx); - match s1p { - Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) => match s2.poll_next_unpin(cx) { - Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - }, - Poll::Pending => match s2.poll_next_unpin(cx) { - Poll::Ready(Some(v)) => Poll::Ready(Some(v)), - Poll::Ready(None) | Poll::Pending => Poll::Pending, - }, - } -} - -impl Stream for MergedStreams -where - S1: Stream + Unpin, - S2: Stream + Unpin, -{ - type Item = I; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let this = self.get_mut(); - let s1_first = this.poll_count.fetch_add(1, Ordering::Relaxed) % 2 == 0; - if s1_first { - poll_two(&mut this.s1, &mut this.s2, cx) - } else { - poll_two(&mut this.s2, &mut this.s1, cx) - } - } + use tokio_stream::StreamExt; + s1.merge(s2) }