Merge pull request #240 from ikatson/merge-streams-reuse
[Refactor] Re-use code in "merge_streams"
This commit is contained in:
commit
14cd2a4d9c
1 changed files with 3 additions and 60 deletions
|
|
@ -1,15 +1,4 @@
|
|||
use std::{
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use futures::Stream;
|
||||
|
||||
struct MergedStreams<S1, S2> {
|
||||
poll_count: AtomicU64,
|
||||
s1: futures::stream::Fuse<S1>,
|
||||
s2: futures::stream::Fuse<S2>,
|
||||
}
|
||||
use futures::stream::Stream;
|
||||
|
||||
pub fn merge_streams<
|
||||
I,
|
||||
|
|
@ -19,52 +8,6 @@ pub fn merge_streams<
|
|||
s1: S1,
|
||||
s2: S2,
|
||||
) -> impl Stream<Item = I> + Unpin + 'static {
|
||||
use futures::stream::StreamExt;
|
||||
MergedStreams {
|
||||
poll_count: AtomicU64::new(0),
|
||||
s1: s1.fuse(),
|
||||
s2: s2.fuse(),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_two<I, S1: Stream<Item = I> + Unpin, S2: Stream<Item = I> + Unpin>(
|
||||
s1: &mut S1,
|
||||
s2: &mut S2,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<I>> {
|
||||
use futures::StreamExt;
|
||||
let s1p = s1.poll_next_unpin(cx);
|
||||
match s1p {
|
||||
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
|
||||
Poll::Ready(None) => match s2.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
Poll::Pending => match s2.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
|
||||
Poll::Ready(None) | Poll::Pending => Poll::Pending,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
impl<S1, S2, I> Stream for MergedStreams<S1, S2>
|
||||
where
|
||||
S1: Stream<Item = I> + Unpin,
|
||||
S2: Stream<Item = I> + Unpin,
|
||||
{
|
||||
type Item = I;
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
let s1_first = this.poll_count.fetch_add(1, Ordering::Relaxed) % 2 == 0;
|
||||
if s1_first {
|
||||
poll_two(&mut this.s1, &mut this.s2, cx)
|
||||
} else {
|
||||
poll_two(&mut this.s2, &mut this.s1, cx)
|
||||
}
|
||||
}
|
||||
use tokio_stream::StreamExt;
|
||||
s1.merge(s2)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue