Add filter_map method to Subscription
This commit is contained in:
parent
7c3bdcce4e
commit
24218c5106
1 changed files with 72 additions and 4 deletions
|
|
@ -7,7 +7,7 @@ use crate::core::event;
|
|||
use crate::core::theme;
|
||||
use crate::core::window;
|
||||
use crate::futures::Stream;
|
||||
use crate::{BoxStream, MaybeSend};
|
||||
use crate::{BoxStream, MaybeSend, MaybeSync};
|
||||
|
||||
use std::any::TypeId;
|
||||
use std::hash::Hash;
|
||||
|
|
@ -341,9 +341,7 @@ impl<T> Subscription<T> {
|
|||
) -> BoxStream<Self::Output> {
|
||||
use futures::StreamExt;
|
||||
|
||||
let mapper = self.mapper;
|
||||
|
||||
Box::pin(self.recipe.stream(input).map(mapper))
|
||||
Box::pin(self.recipe.stream(input).map(self.mapper))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -361,6 +359,76 @@ impl<T> Subscription<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Transforms the [`Subscription`] output with the given function, yielding only
|
||||
/// values only when the function returns `Some(A)`.
|
||||
///
|
||||
/// # Panics
|
||||
/// The closure provided must be a non-capturing closure. The method
|
||||
/// will panic in debug mode otherwise.
|
||||
pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
|
||||
where
|
||||
T: MaybeSend + 'static,
|
||||
F: Fn(T) -> Option<A> + MaybeSend + MaybeSync + Clone + 'static,
|
||||
A: MaybeSend + 'static,
|
||||
{
|
||||
debug_assert!(
|
||||
std::mem::size_of::<F>() == 0,
|
||||
"the closure {} provided in `Subscription::filter_map` is capturing",
|
||||
std::any::type_name::<F>(),
|
||||
);
|
||||
|
||||
struct FilterMap<A, B, F>
|
||||
where
|
||||
F: Fn(A) -> Option<B> + 'static,
|
||||
{
|
||||
recipe: Box<dyn Recipe<Output = A>>,
|
||||
mapper: F,
|
||||
}
|
||||
|
||||
impl<A, B, F> Recipe for FilterMap<A, B, F>
|
||||
where
|
||||
A: 'static,
|
||||
B: 'static + MaybeSend,
|
||||
F: Fn(A) -> Option<B> + MaybeSend,
|
||||
{
|
||||
type Output = B;
|
||||
|
||||
fn hash(&self, state: &mut Hasher) {
|
||||
TypeId::of::<F>().hash(state);
|
||||
self.recipe.hash(state);
|
||||
}
|
||||
|
||||
fn stream(
|
||||
self: Box<Self>,
|
||||
input: EventStream,
|
||||
) -> BoxStream<Self::Output> {
|
||||
use futures::StreamExt;
|
||||
use futures::future;
|
||||
|
||||
let mapper = self.mapper;
|
||||
|
||||
Box::pin(
|
||||
self.recipe
|
||||
.stream(input)
|
||||
.filter_map(move |a| future::ready(mapper(a))),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Subscription {
|
||||
recipes: self
|
||||
.recipes
|
||||
.drain(..)
|
||||
.map(|recipe| {
|
||||
Box::new(FilterMap {
|
||||
recipe,
|
||||
mapper: f.clone(),
|
||||
}) as Box<dyn Recipe<Output = A>>
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the amount of recipe units in this [`Subscription`].
|
||||
pub fn units(&self) -> usize {
|
||||
self.recipes.len()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue