From 24218c51064675d17ea8a05549be28c25aa7e458 Mon Sep 17 00:00:00 2001 From: Craig Watson Date: Fri, 6 Jun 2025 12:59:24 -0700 Subject: [PATCH] Add `filter_map` method to `Subscription` --- futures/src/subscription.rs | 76 +++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 4 deletions(-) diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 58317a73..746fa415 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -7,7 +7,7 @@ use crate::core::event; use crate::core::theme; use crate::core::window; use crate::futures::Stream; -use crate::{BoxStream, MaybeSend}; +use crate::{BoxStream, MaybeSend, MaybeSync}; use std::any::TypeId; use std::hash::Hash; @@ -341,9 +341,7 @@ impl Subscription { ) -> BoxStream { use futures::StreamExt; - let mapper = self.mapper; - - Box::pin(self.recipe.stream(input).map(mapper)) + Box::pin(self.recipe.stream(input).map(self.mapper)) } } @@ -361,6 +359,76 @@ impl Subscription { } } + /// Transforms the [`Subscription`] output with the given function, yielding only + /// values only when the function returns `Some(A)`. + /// + /// # Panics + /// The closure provided must be a non-capturing closure. The method + /// will panic in debug mode otherwise. + pub fn filter_map(mut self, f: F) -> Subscription + where + T: MaybeSend + 'static, + F: Fn(T) -> Option + MaybeSend + MaybeSync + Clone + 'static, + A: MaybeSend + 'static, + { + debug_assert!( + std::mem::size_of::() == 0, + "the closure {} provided in `Subscription::filter_map` is capturing", + std::any::type_name::(), + ); + + struct FilterMap + where + F: Fn(A) -> Option + 'static, + { + recipe: Box>, + mapper: F, + } + + impl Recipe for FilterMap + where + A: 'static, + B: 'static + MaybeSend, + F: Fn(A) -> Option + MaybeSend, + { + type Output = B; + + fn hash(&self, state: &mut Hasher) { + TypeId::of::().hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box, + input: EventStream, + ) -> BoxStream { + use futures::StreamExt; + use futures::future; + + let mapper = self.mapper; + + Box::pin( + self.recipe + .stream(input) + .filter_map(move |a| future::ready(mapper(a))), + ) + } + } + + Subscription { + recipes: self + .recipes + .drain(..) + .map(|recipe| { + Box::new(FilterMap { + recipe, + mapper: f.clone(), + }) as Box> + }) + .collect(), + } + } + /// Returns the amount of recipe units in this [`Subscription`]. pub fn units(&self) -> usize { self.recipes.len()