Nest Map and With inside Subscription methods
This commit is contained in:
parent
4ed6d18339
commit
7c3bdcce4e
1 changed files with 76 additions and 81 deletions
|
|
@ -249,13 +249,49 @@ impl<T> Subscription<T> {
|
|||
T: 'static,
|
||||
A: std::hash::Hash + Clone + Send + Sync + 'static,
|
||||
{
|
||||
struct With<A, B> {
|
||||
recipe: Box<dyn Recipe<Output = A>>,
|
||||
value: B,
|
||||
}
|
||||
|
||||
impl<A, B> Recipe for With<A, B>
|
||||
where
|
||||
A: 'static,
|
||||
B: 'static + std::hash::Hash + Clone + Send + Sync,
|
||||
{
|
||||
type Output = (B, A);
|
||||
|
||||
fn hash(&self, state: &mut Hasher) {
|
||||
std::any::TypeId::of::<B>().hash(state);
|
||||
self.value.hash(state);
|
||||
self.recipe.hash(state);
|
||||
}
|
||||
|
||||
fn stream(
|
||||
self: Box<Self>,
|
||||
input: EventStream,
|
||||
) -> BoxStream<Self::Output> {
|
||||
use futures::StreamExt;
|
||||
|
||||
let value = self.value;
|
||||
|
||||
Box::pin(
|
||||
self.recipe
|
||||
.stream(input)
|
||||
.map(move |element| (value.clone(), element)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Subscription {
|
||||
recipes: self
|
||||
.recipes
|
||||
.into_iter()
|
||||
.map(|recipe| {
|
||||
Box::new(With::new(recipe, value.clone()))
|
||||
as Box<dyn Recipe<Output = (A, T)>>
|
||||
Box::new(With {
|
||||
recipe,
|
||||
value: value.clone(),
|
||||
}) as Box<dyn Recipe<Output = (A, T)>>
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
|
|
@ -278,13 +314,48 @@ impl<T> Subscription<T> {
|
|||
std::any::type_name::<F>(),
|
||||
);
|
||||
|
||||
struct Map<A, B, F>
|
||||
where
|
||||
F: Fn(A) -> B + 'static,
|
||||
{
|
||||
recipe: Box<dyn Recipe<Output = A>>,
|
||||
mapper: F,
|
||||
}
|
||||
|
||||
impl<A, B, F> Recipe for Map<A, B, F>
|
||||
where
|
||||
A: 'static,
|
||||
B: 'static,
|
||||
F: Fn(A) -> B + 'static + MaybeSend,
|
||||
{
|
||||
type Output = B;
|
||||
|
||||
fn hash(&self, state: &mut Hasher) {
|
||||
TypeId::of::<F>().hash(state);
|
||||
self.recipe.hash(state);
|
||||
}
|
||||
|
||||
fn stream(
|
||||
self: Box<Self>,
|
||||
input: EventStream,
|
||||
) -> BoxStream<Self::Output> {
|
||||
use futures::StreamExt;
|
||||
|
||||
let mapper = self.mapper;
|
||||
|
||||
Box::pin(self.recipe.stream(input).map(mapper))
|
||||
}
|
||||
}
|
||||
|
||||
Subscription {
|
||||
recipes: self
|
||||
.recipes
|
||||
.into_iter()
|
||||
.map(move |recipe| {
|
||||
Box::new(Map::new(recipe, f.clone()))
|
||||
as Box<dyn Recipe<Output = A>>
|
||||
.map(|recipe| {
|
||||
Box::new(Map {
|
||||
recipe,
|
||||
mapper: f.clone(),
|
||||
}) as Box<dyn Recipe<Output = A>>
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
|
|
@ -350,82 +421,6 @@ pub trait Recipe {
|
|||
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
|
||||
}
|
||||
|
||||
struct Map<A, B, F>
|
||||
where
|
||||
F: Fn(A) -> B + 'static,
|
||||
{
|
||||
recipe: Box<dyn Recipe<Output = A>>,
|
||||
mapper: F,
|
||||
}
|
||||
|
||||
impl<A, B, F> Map<A, B, F>
|
||||
where
|
||||
F: Fn(A) -> B + 'static,
|
||||
{
|
||||
fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
|
||||
Map { recipe, mapper }
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B, F> Recipe for Map<A, B, F>
|
||||
where
|
||||
A: 'static,
|
||||
B: 'static,
|
||||
F: Fn(A) -> B + 'static + MaybeSend,
|
||||
{
|
||||
type Output = B;
|
||||
|
||||
fn hash(&self, state: &mut Hasher) {
|
||||
TypeId::of::<F>().hash(state);
|
||||
self.recipe.hash(state);
|
||||
}
|
||||
|
||||
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
|
||||
use futures::StreamExt;
|
||||
|
||||
let mapper = self.mapper;
|
||||
|
||||
Box::pin(self.recipe.stream(input).map(mapper))
|
||||
}
|
||||
}
|
||||
|
||||
struct With<A, B> {
|
||||
recipe: Box<dyn Recipe<Output = A>>,
|
||||
value: B,
|
||||
}
|
||||
|
||||
impl<A, B> With<A, B> {
|
||||
fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self {
|
||||
With { recipe, value }
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Recipe for With<A, B>
|
||||
where
|
||||
A: 'static,
|
||||
B: 'static + std::hash::Hash + Clone + Send + Sync,
|
||||
{
|
||||
type Output = (B, A);
|
||||
|
||||
fn hash(&self, state: &mut Hasher) {
|
||||
std::any::TypeId::of::<B>().hash(state);
|
||||
self.value.hash(state);
|
||||
self.recipe.hash(state);
|
||||
}
|
||||
|
||||
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
|
||||
use futures::StreamExt;
|
||||
|
||||
let value = self.value;
|
||||
|
||||
Box::pin(
|
||||
self.recipe
|
||||
.stream(input)
|
||||
.map(move |element| (value.clone(), element)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a [`Subscription`] from a hashable id and a filter function.
|
||||
pub fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
|
||||
where
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue