From 05c80aa312729c602bf8c895680cf6607df46fef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A9ctor=20Ram=C3=B3n=20Jim=C3=A9nez?= Date: Fri, 24 Oct 2025 09:05:51 +0200 Subject: [PATCH] Force `yield_now` in `Task::future` and `stream` --- runtime/src/task.rs | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/runtime/src/task.rs b/runtime/src/task.rs index 7ac9befe..39166593 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -8,7 +8,9 @@ use crate::futures::futures::stream::{self, Stream, StreamExt}; use crate::futures::{BoxStream, MaybeSend, boxed_stream}; use std::convert::Infallible; +use std::pin::Pin; use std::sync::Arc; +use std::task; use std::thread; #[cfg(feature = "sipper")] @@ -257,7 +259,10 @@ impl Task { where T: 'static, { - Self::stream(stream::once(future)) + Self::stream(stream::once(async { + yield_now().await; + future.await + })) } /// Creates a new [`Task`] that runs the given [`Stream`] and produces @@ -267,7 +272,11 @@ impl Task { T: 'static, { Self { - stream: Some(boxed_stream(stream.map(Action::Output))), + stream: Some(boxed_stream( + stream::once(yield_now()) + .filter_map(|_| async { None }) + .chain(stream.map(Action::Output)), + )), units: 1, } } @@ -511,3 +520,30 @@ where stream::once(error_receiver).filter_map(async |result| result.ok()), )) } + +async fn yield_now() { + struct YieldNow { + yielded: bool, + } + + impl Future for YieldNow { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<()> { + if self.yielded { + return task::Poll::Ready(()); + } + + self.yielded = true; + + cx.waker().wake_by_ref(); + + task::Poll::Pending + } + } + + YieldNow { yielded: false }.await; +}