diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index e229b6df..58317a73 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -249,13 +249,49 @@ impl Subscription { T: 'static, A: std::hash::Hash + Clone + Send + Sync + 'static, { + struct With { + recipe: Box>, + value: B, + } + + impl Recipe for With + where + A: 'static, + B: 'static + std::hash::Hash + Clone + Send + Sync, + { + type Output = (B, A); + + fn hash(&self, state: &mut Hasher) { + std::any::TypeId::of::().hash(state); + self.value.hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box, + input: EventStream, + ) -> BoxStream { + use futures::StreamExt; + + let value = self.value; + + Box::pin( + self.recipe + .stream(input) + .map(move |element| (value.clone(), element)), + ) + } + } + Subscription { recipes: self .recipes .into_iter() .map(|recipe| { - Box::new(With::new(recipe, value.clone())) - as Box> + Box::new(With { + recipe, + value: value.clone(), + }) as Box> }) .collect(), } @@ -278,13 +314,48 @@ impl Subscription { std::any::type_name::(), ); + struct Map + where + F: Fn(A) -> B + 'static, + { + recipe: Box>, + mapper: F, + } + + impl Recipe for Map + where + A: 'static, + B: 'static, + F: Fn(A) -> B + 'static + MaybeSend, + { + type Output = B; + + fn hash(&self, state: &mut Hasher) { + TypeId::of::().hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box, + input: EventStream, + ) -> BoxStream { + use futures::StreamExt; + + let mapper = self.mapper; + + Box::pin(self.recipe.stream(input).map(mapper)) + } + } + Subscription { recipes: self .recipes .into_iter() - .map(move |recipe| { - Box::new(Map::new(recipe, f.clone())) - as Box> + .map(|recipe| { + Box::new(Map { + recipe, + mapper: f.clone(), + }) as Box> }) .collect(), } @@ -350,82 +421,6 @@ pub trait Recipe { fn stream(self: Box, input: EventStream) -> BoxStream; } -struct Map -where - F: Fn(A) -> B + 'static, -{ - recipe: Box>, - mapper: F, -} - -impl Map -where - F: Fn(A) -> B + 'static, -{ - fn new(recipe: Box>, mapper: F) -> Self { - Map { recipe, mapper } - } -} - -impl Recipe for Map -where - A: 'static, - B: 'static, - F: Fn(A) -> B + 'static + MaybeSend, -{ - type Output = B; - - fn hash(&self, state: &mut Hasher) { - TypeId::of::().hash(state); - self.recipe.hash(state); - } - - fn stream(self: Box, input: EventStream) -> BoxStream { - use futures::StreamExt; - - let mapper = self.mapper; - - Box::pin(self.recipe.stream(input).map(mapper)) - } -} - -struct With { - recipe: Box>, - value: B, -} - -impl With { - fn new(recipe: Box>, value: B) -> Self { - With { recipe, value } - } -} - -impl Recipe for With -where - A: 'static, - B: 'static + std::hash::Hash + Clone + Send + Sync, -{ - type Output = (B, A); - - fn hash(&self, state: &mut Hasher) { - std::any::TypeId::of::().hash(state); - self.value.hash(state); - self.recipe.hash(state); - } - - fn stream(self: Box, input: EventStream) -> BoxStream { - use futures::StreamExt; - - let value = self.value; - - Box::pin( - self.recipe - .stream(input) - .map(move |element| (value.clone(), element)), - ) - } -} - /// Creates a [`Subscription`] from a hashable id and a filter function. pub fn filter_map(id: I, f: F) -> Subscription where