refactor: Replace postage with flume

This commit is contained in:
Michael Aaron Murphy 2022-03-27 16:02:25 +02:00 committed by Michael Murphy
parent 805bf6333e
commit e578f2d19c
11 changed files with 192 additions and 112 deletions

View file

@ -7,11 +7,11 @@ mod plugins;
pub use client::*;
use crate::plugins::*;
use flume::{Receiver, Sender};
use futures::SinkExt;
use futures_core::Stream;
use futures_lite::{future, StreamExt};
use pop_launcher::*;
use postage::mpsc;
use regex::Regex;
use slab::Slab;
use std::{
@ -44,17 +44,17 @@ pub async fn main() {
}
});
let (output_tx, mut output_rx) = mpsc::channel(16);
let (output_tx, output_rx) = flume::bounded(16);
// Service will operate for as long as it is being awaited
let service = Service::new(output_tx).exec(input_stream);
let service = Service::new(output_tx.into_sink()).exec(input_stream);
// Responses from the service will be streamed to stdout
let responder = async move {
let stdout = io::stdout();
let stdout = &mut stdout.lock();
while let Some(response) = output_rx.next().await {
while let Ok(response) = output_rx.recv_async().await {
serialize_out(stdout, &response);
}
};
@ -88,7 +88,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
}
pub async fn exec(mut self, input: impl Stream<Item = Request>) {
let (service_tx, service_rx) = mpsc::channel(1);
let (service_tx, service_rx) = flume::bounded(1);
let stream = plugins::external::load::from_paths();
futures_lite::pin!(stream);
@ -124,8 +124,8 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
future::or(f1, f2).await;
}
async fn response_handler(&mut self, mut service_rx: mpsc::Receiver<Event>) {
while let Some(event) = service_rx.next().await {
async fn response_handler(&mut self, service_rx: Receiver<Event>) {
while let Ok(event) = service_rx.recv_async().await {
match event {
Event::Request(request) => {
match request {
@ -144,7 +144,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
Request::Exit => {
for (_key, plugin) in self.plugins.iter_mut() {
let tx = plugin.sender_exec();
let _ = tx.send(Request::Exit).await;
let _ = tx.send_async(Request::Exit).await;
}
break;
@ -199,12 +199,9 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
}
}
fn register_plugin<
P: Plugin,
I: Fn(usize, mpsc::Sender<Event>) -> P + Send + Sync + 'static,
>(
fn register_plugin<P: Plugin, I: Fn(usize, Sender<Event>) -> P + Send + Sync + 'static>(
&mut self,
service_tx: mpsc::Sender<Event>,
service_tx: Sender<Event>,
config: PluginConfig,
regex: Option<regex::Regex>,
init: I,
@ -225,7 +222,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
regex,
isolate_with,
Box::new(move || {
let (request_tx, request_rx) = mpsc::channel(8);
let (request_tx, request_rx) = flume::bounded(8);
let init = init.clone();
let service_tx = service_tx.clone();
@ -241,7 +238,10 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
async fn activate(&mut self, id: Indice) {
if let Some((plugin, meta)) = self.search_result(id as usize) {
let _ = plugin.sender_exec().send(Request::Activate(meta.id)).await;
let _ = plugin
.sender_exec()
.send_async(Request::Activate(meta.id))
.await;
}
}
@ -249,7 +249,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
if let Some((plugin, meta)) = self.search_result(id as usize) {
let _ = plugin
.sender_exec()
.send(Request::ActivateContext {
.send_async(Request::ActivateContext {
id: meta.id,
context,
})
@ -278,13 +278,19 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
async fn complete(&mut self, id: Indice) {
if let Some((plugin, meta)) = self.search_result(id as usize) {
let _ = plugin.sender_exec().send(Request::Complete(meta.id)).await;
let _ = plugin
.sender_exec()
.send_async(Request::Complete(meta.id))
.await;
}
}
async fn context(&mut self, id: Indice) {
if let Some((plugin, meta)) = self.search_result(id as usize) {
let _ = plugin.sender_exec().send(Request::Context(meta.id)).await;
let _ = plugin
.sender_exec()
.send_async(Request::Context(meta.id))
.await;
}
}
@ -311,14 +317,17 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
async fn interrupt(&mut self) {
for (_, plugin) in self.plugins.iter_mut() {
if let Some(sender) = plugin.sender.as_mut() {
let _ = sender.send(Request::Interrupt).await;
let _ = sender.send_async(Request::Interrupt).await;
}
}
}
async fn quit(&mut self, id: Indice) {
if let Some((plugin, meta)) = self.search_result(id as usize) {
let _ = plugin.sender_exec().send(Request::Quit(meta.id)).await;
let _ = plugin
.sender_exec()
.send_async(Request::Quit(meta.id))
.await;
}
}
@ -383,7 +392,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
if let Some(plugin) = self.plugins.get_mut(isolated) {
if plugin
.sender_exec()
.send(Request::Search(query.to_owned()))
.send_async(Request::Search(query.to_owned()))
.await
.is_ok()
{
@ -396,7 +405,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
if let Some(plugin) = self.plugins.get_mut(plugin_id) {
if plugin
.sender_exec()
.send(Request::Search(query.to_owned()))
.send_async(Request::Search(query.to_owned()))
.await
.is_ok()
{
@ -562,7 +571,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
}
/// Handles Requests received from a frontend
async fn request_handler(input: impl Stream<Item = Request>, mut tx: mpsc::Sender<Event>) {
async fn request_handler(input: impl Stream<Item = Request>, tx: Sender<Event>) {
let mut requested_to_exit = false;
futures_lite::pin!(input);
@ -572,7 +581,7 @@ async fn request_handler(input: impl Stream<Item = Request>, mut tx: mpsc::Sende
requested_to_exit = true
}
let _ = tx.send(Event::Request(request)).await;
let _ = tx.send_async(Event::Request(request)).await;
if requested_to_exit {
break;

View file

@ -14,9 +14,8 @@ use std::{
use crate::{Event, Indice, Plugin, PluginResponse, Request};
use async_oneshot::oneshot;
use flume::Sender;
use futures_lite::{AsyncWriteExt, FutureExt, StreamExt};
use postage::mpsc::Sender;
use postage::prelude::*;
use smol::{
process::{Child, Command, Stdio},
Task,
@ -70,13 +69,13 @@ impl ExternalPlugin {
let detached = self.detached.clone();
let searching = self.searching.clone();
let (trip_tx, trip_rx) = oneshot::<()>();
let mut tx = self.tx.clone();
let tx = self.tx.clone();
let name = self.name().to_owned();
let id = self.id;
// Spawn a background task to forward JSON responses from the child process.
let task = smol::spawn(async move {
let mut tx_ = tx.clone();
let tx_ = tx.clone();
let searching_ = searching.clone();
let name_ = name.clone();
@ -91,7 +90,7 @@ impl ExternalPlugin {
searching_.store(false, Ordering::SeqCst);
}
let _ = tx_.send(Event::Response((id, response))).await;
let _ = tx_.send_async(Event::Response((id, response))).await;
}
Err(why) => {
tracing::error!("{}: serde error: {:?}", name_, why);
@ -111,7 +110,7 @@ impl ExternalPlugin {
// Ensure that a task that was searching sends a finished signal if it dies.
if searching.swap(false, Ordering::SeqCst) {
let _ = tx
.send(Event::Response((id, PluginResponse::Finished)))
.send_async(Event::Response((id, PluginResponse::Finished)))
.await;
}
@ -207,7 +206,7 @@ impl Plugin for ExternalPlugin {
} else {
let _ = self
.tx
.send(Event::Response((self.id, PluginResponse::Finished)))
.send_async(Event::Response((self.id, PluginResponse::Finished)))
.await;
}
}

View file

@ -2,8 +2,8 @@
// SPDX-License-Identifier: MPL-2.0
use crate::*;
use flume::Sender;
use pop_launcher::*;
use postage::mpsc::Sender;
use slab::Slab;
use std::borrow::Cow;
@ -41,7 +41,7 @@ impl HelpPlugin {
async fn reload(&mut self) {
let (tx, rx) = async_oneshot::oneshot();
let _ = self.tx.send(Event::Help(tx)).await;
let _ = self.tx.send_async(Event::Help(tx)).await;
self.details = rx.await.expect("internal error fetching help info");
}
}
@ -53,7 +53,7 @@ impl Plugin for HelpPlugin {
if let Some(help) = detail.help.as_ref() {
let _ = self
.tx
.send(Event::Response((
.send_async(Event::Response((
self.id,
PluginResponse::Fill(help.clone()),
)))
@ -91,13 +91,16 @@ impl Plugin for HelpPlugin {
..Default::default()
});
let _ = self.tx.send(Event::Response((self.id, response))).await;
let _ = self
.tx
.send_async(Event::Response((self.id, response)))
.await;
}
}
let _ = self
.tx
.send(Event::Response((self.id, PluginResponse::Finished)))
.send_async(Event::Response((self.id, PluginResponse::Finished)))
.await;
}

View file

@ -11,8 +11,7 @@ pub use self::help::HelpPlugin;
use crate::{Indice, PluginHelp, Request};
use async_trait::async_trait;
use postage::mpsc::{Receiver, Sender};
use postage::prelude::*;
use flume::{Receiver, Sender};
use regex::Regex;
#[async_trait]
@ -39,8 +38,8 @@ where
async fn quit(&mut self, id: Indice);
async fn run(&mut self, mut rx: Receiver<Request>) {
while let Some(request) = rx.recv().await {
async fn run(&mut self, rx: Receiver<Request>) {
while let Ok(request) = rx.recv_async().await {
tracing::event!(
tracing::Level::DEBUG,
"{}: received {:?}",
@ -130,7 +129,7 @@ impl PluginConnector {
..
} = self;
sender.get_or_insert_with(|| init())
sender.get_or_insert_with(init)
}
/// Drops the sender, which will subsequently drop the plugin forwarder attached to it