fix buggy "merge_two_streams"

This commit is contained in:
Igor Katson 2024-08-18 17:37:03 +01:00
parent 675aecf44b
commit d028e2e3c2
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 41 additions and 11 deletions

View file

@ -7,8 +7,8 @@ use futures::Stream;
struct MergedStreams<S1, S2> { struct MergedStreams<S1, S2> {
poll_count: AtomicU64, poll_count: AtomicU64,
s1: S1, s1: futures::stream::Fuse<S1>,
s2: S2, s2: futures::stream::Fuse<S2>,
} }
pub fn merge_streams< pub fn merge_streams<
@ -19,10 +19,11 @@ pub fn merge_streams<
s1: S1, s1: S1,
s2: S2, s2: S2,
) -> impl Stream<Item = I> + Unpin + 'static { ) -> impl Stream<Item = I> + Unpin + 'static {
use futures::stream::StreamExt;
MergedStreams { MergedStreams {
poll_count: AtomicU64::new(0), poll_count: AtomicU64::new(0),
s1, s1: s1.fuse(),
s2, s2: s2.fuse(),
} }
} }
@ -34,8 +35,16 @@ fn poll_two<I, S1: Stream<Item = I> + Unpin, S2: Stream<Item = I> + Unpin>(
use futures::StreamExt; use futures::StreamExt;
let s1p = s1.poll_next_unpin(cx); let s1p = s1.poll_next_unpin(cx);
match s1p { match s1p {
Poll::Ready(r) => Poll::Ready(r), Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Pending => s2.poll_next_unpin(cx), Poll::Ready(None) => match s2.poll_next_unpin(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
Poll::Pending => match s2.poll_next_unpin(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
},
} }
} }

View file

@ -199,10 +199,22 @@ fn merge_two_optional_streams<T>(
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>, s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> { ) -> Option<BoxStream<'static, T>> {
match (s1, s2) { match (s1, s2) {
(Some(s1), None) => Some(Box::pin(s1)), (Some(s1), None) => {
(None, Some(s2)) => Some(Box::pin(s2)), debug!("merge_two_optional_streams: using first");
(Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))), Some(Box::pin(s1))
(None, None) => None, }
(None, Some(s2)) => {
debug!("merge_two_optional_streams: using second");
Some(Box::pin(s2))
}
(Some(s1), Some(s2)) => {
debug!("merge_two_optional_streams: using both");
Some(Box::pin(merge_streams(s1, s2)))
}
(None, None) => {
debug!("merge_two_optional_streams: using none");
None
}
} }
} }
@ -1096,6 +1108,10 @@ impl Session {
// Merge "initial_peers" and "peer_rx" into one stream. // Merge "initial_peers" and "peer_rx" into one stream.
let peer_rx = merge_two_optional_streams( let peer_rx = merge_two_optional_streams(
if !initial_peers.is_empty() { if !initial_peers.is_empty() {
debug!(
count = initial_peers.len(),
"merging initial peers into peer_rx"
);
Some(futures::stream::iter(initial_peers.into_iter())) Some(futures::stream::iter(initial_peers.into_iter()))
} else { } else {
None None

View file

@ -320,6 +320,8 @@ impl ManagedTorrent {
let live = let live =
TorrentStateLive::new(paused, tx, live_cancellation_token)?; TorrentStateLive::new(paused, tx, live_cancellation_token)?;
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
drop(g);
t.state_change_notify.notify_waiters(); t.state_change_notify.notify_waiters();
spawn_fatal_errors_receiver(&t, rx, token); spawn_fatal_errors_receiver(&t, rx, token);
@ -343,6 +345,8 @@ impl ManagedTorrent {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?; let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?;
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
drop(g);
spawn_fatal_errors_receiver(self, rx, live_cancellation_token); spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
spawn_peer_adder(&live, peer_rx); spawn_peer_adder(&live, peer_rx);
Ok(()) Ok(())
@ -354,9 +358,10 @@ impl ManagedTorrent {
self.storage_factory.create_and_init(self.info())?, self.storage_factory.create_and_init(self.info())?,
)); ));
g.state = ManagedTorrentState::Initializing(initializing.clone()); g.state = ManagedTorrentState::Initializing(initializing.clone());
self.state_change_notify.notify_waiters();
drop(g); drop(g);
self.state_change_notify.notify_waiters();
// Recurse. // Recurse.
self.start( self.start(
peer_rx, peer_rx,