diff --git a/runtime/src/task.rs b/runtime/src/task.rs index 7ac9befe..39166593 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -8,7 +8,9 @@ use crate::futures::futures::stream::{self, Stream, StreamExt}; use crate::futures::{BoxStream, MaybeSend, boxed_stream}; use std::convert::Infallible; +use std::pin::Pin; use std::sync::Arc; +use std::task; use std::thread; #[cfg(feature = "sipper")] @@ -257,7 +259,10 @@ impl Task { where T: 'static, { - Self::stream(stream::once(future)) + Self::stream(stream::once(async { + yield_now().await; + future.await + })) } /// Creates a new [`Task`] that runs the given [`Stream`] and produces @@ -267,7 +272,11 @@ impl Task { T: 'static, { Self { - stream: Some(boxed_stream(stream.map(Action::Output))), + stream: Some(boxed_stream( + stream::once(yield_now()) + .filter_map(|_| async { None }) + .chain(stream.map(Action::Output)), + )), units: 1, } } @@ -511,3 +520,30 @@ where stream::once(error_receiver).filter_map(async |result| result.ok()), )) } + +async fn yield_now() { + struct YieldNow { + yielded: bool, + } + + impl Future for YieldNow { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<()> { + if self.yielded { + return task::Poll::Ready(()); + } + + self.yielded = true; + + cx.waker().wake_by_ref(); + + task::Poll::Pending + } + } + + YieldNow { yielded: false }.await; +}