fix(service): Switch from flume to postage to fix out-of-order communication
This commit is contained in:
parent
490c6a6e8b
commit
8b4fbf441f
12 changed files with 156 additions and 208 deletions
|
|
@ -1,10 +1,10 @@
|
|||
mod plugins;
|
||||
|
||||
use crate::plugins::*;
|
||||
use pop_launcher::*;
|
||||
|
||||
use flume::{unbounded, Receiver, Sender};
|
||||
use futures_lite::{future, StreamExt};
|
||||
use pop_launcher::*;
|
||||
use postage::mpsc;
|
||||
use postage::prelude::*;
|
||||
use regex::Regex;
|
||||
use slab::Slab;
|
||||
use std::{
|
||||
|
|
@ -56,13 +56,13 @@ impl<O: Write> Service<O> {
|
|||
}
|
||||
|
||||
pub async fn exec(mut self) {
|
||||
let (service_tx, service_rx) = unbounded();
|
||||
let (service_tx, service_rx) = mpsc::channel(1);
|
||||
|
||||
{
|
||||
let (plugins_tx, plugins_rx) = unbounded();
|
||||
let (plugins_tx, mut plugins_rx) = mpsc::channel(8);
|
||||
let plugin_loader = plugins::external::load::from_paths(plugins_tx);
|
||||
let plugin_receiver = async {
|
||||
while let Ok((exec, config, regex)) = plugins_rx.recv_async().await {
|
||||
while let Some((exec, config, regex)) = plugins_rx.recv().await {
|
||||
tracing::info!("found plugin \"{}\"", exec.display());
|
||||
if self
|
||||
.plugins
|
||||
|
|
@ -84,13 +84,11 @@ impl<O: Write> Service<O> {
|
|||
future::zip(plugin_loader, plugin_receiver).await;
|
||||
}
|
||||
|
||||
let internal = service_tx.clone();
|
||||
|
||||
self.register_plugin(
|
||||
service_tx.clone(),
|
||||
plugins::help::CONFIG,
|
||||
Some(Regex::new(plugins::help::REGEX.as_ref()).expect("failed to compile help regex")),
|
||||
move |id, tx| HelpPlugin::new(id, internal.clone(), tx),
|
||||
move |id, tx| HelpPlugin::new(id, tx),
|
||||
);
|
||||
|
||||
let f1 = request_handler(service_tx);
|
||||
|
|
@ -99,23 +97,23 @@ impl<O: Write> Service<O> {
|
|||
future::zip(f1, f2).await;
|
||||
}
|
||||
|
||||
async fn response_handler(&mut self, service_rx: Receiver<Event>) {
|
||||
while let Ok(event) = service_rx.recv_async().await {
|
||||
async fn response_handler(&mut self, mut service_rx: mpsc::Receiver<Event>) {
|
||||
while let Some(event) = service_rx.recv().await {
|
||||
match event {
|
||||
Event::Request(request) => {
|
||||
match request {
|
||||
Request::Search(query) => self.search(query),
|
||||
Request::Interrupt => self.interrupt(),
|
||||
Request::Activate(id) => self.activate(id),
|
||||
Request::Complete(id) => self.complete(id),
|
||||
Request::Quit(id) => self.quit(id),
|
||||
Request::Search(query) => self.search(query).await,
|
||||
Request::Interrupt => self.interrupt().await,
|
||||
Request::Activate(id) => self.activate(id).await,
|
||||
Request::Complete(id) => self.complete(id).await,
|
||||
Request::Quit(id) => self.quit(id).await,
|
||||
|
||||
// When requested to exit, the service will forward that
|
||||
// request to all of its plugins before exiting itself
|
||||
Request::Exit => {
|
||||
for (_key, plugin) in self.plugins.iter_mut() {
|
||||
let tx = plugin.sender_exec();
|
||||
let _ = tx.send(Request::Exit);
|
||||
let _ = tx.send(Request::Exit).await;
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
@ -128,7 +126,7 @@ impl<O: Write> Service<O> {
|
|||
PluginResponse::Clear => self.clear(),
|
||||
PluginResponse::Close => self.close(),
|
||||
PluginResponse::Fill(text) => self.fill(text),
|
||||
PluginResponse::Finished => self.finished(plugin),
|
||||
PluginResponse::Finished => self.finished(plugin).await,
|
||||
PluginResponse::DesktopEntry(path) => {
|
||||
self.respond(&Response::DesktopEntry(path));
|
||||
}
|
||||
|
|
@ -154,9 +152,12 @@ impl<O: Write> Service<O> {
|
|||
}
|
||||
}
|
||||
|
||||
fn register_plugin<P: Plugin, I: Fn(usize, Sender<Event>) -> P + Send + Sync + 'static>(
|
||||
fn register_plugin<
|
||||
P: Plugin,
|
||||
I: Fn(usize, mpsc::Sender<Event>) -> P + Send + Sync + 'static,
|
||||
>(
|
||||
&mut self,
|
||||
service_tx: Sender<Event>,
|
||||
service_tx: mpsc::Sender<Event>,
|
||||
config: PluginConfig,
|
||||
regex: Option<regex::Regex>,
|
||||
init: I,
|
||||
|
|
@ -170,7 +171,7 @@ impl<O: Write> Service<O> {
|
|||
config,
|
||||
regex,
|
||||
Box::new(move || {
|
||||
let (request_tx, request_rx) = unbounded();
|
||||
let (request_tx, request_rx) = mpsc::channel(8);
|
||||
|
||||
let init = init.clone();
|
||||
let service_tx = service_tx.clone();
|
||||
|
|
@ -184,14 +185,13 @@ impl<O: Write> Service<O> {
|
|||
));
|
||||
}
|
||||
|
||||
fn activate(&mut self, id: u32) {
|
||||
async fn activate(&mut self, id: u32) {
|
||||
if let Some((plugin, meta)) = self.search_result(id as usize) {
|
||||
let _ = plugin.sender_exec().send(Request::Activate(meta.id));
|
||||
let _ = plugin.sender_exec().send(Request::Activate(meta.id)).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn append(&mut self, plugin: PluginKey, append: PluginSearchResult) {
|
||||
eprintln!("appending {:?}", append);
|
||||
self.active_search.push((plugin, append));
|
||||
}
|
||||
|
||||
|
|
@ -203,9 +203,9 @@ impl<O: Write> Service<O> {
|
|||
self.respond(&Response::Close);
|
||||
}
|
||||
|
||||
fn complete(&mut self, id: u32) {
|
||||
async fn complete(&mut self, id: u32) {
|
||||
if let Some((plugin, meta)) = self.search_result(id as usize) {
|
||||
let _ = plugin.sender_exec().send(Request::Complete(meta.id));
|
||||
let _ = plugin.sender_exec().send(Request::Complete(meta.id)).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -213,32 +213,30 @@ impl<O: Write> Service<O> {
|
|||
self.respond(&Response::Fill(text));
|
||||
}
|
||||
|
||||
fn finished(&mut self, plugin: PluginKey) {
|
||||
async fn finished(&mut self, plugin: PluginKey) {
|
||||
self.awaiting_results.remove(&plugin);
|
||||
if self.awaiting_results.is_empty() {
|
||||
if self.search_scheduled {
|
||||
self.search(String::new());
|
||||
self.search(String::new()).await;
|
||||
return;
|
||||
}
|
||||
|
||||
eprintln!("updating with {:?}", self.active_search);
|
||||
|
||||
let search_list = self.sort();
|
||||
self.respond(&Response::Update(search_list))
|
||||
}
|
||||
}
|
||||
|
||||
fn interrupt(&mut self) {
|
||||
async fn interrupt(&mut self) {
|
||||
for (_, plugin) in self.plugins.iter_mut() {
|
||||
if let Some(sender) = plugin.sender.as_mut() {
|
||||
let _ = sender.send(Request::Interrupt);
|
||||
let _ = sender.send(Request::Interrupt).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn quit(&mut self, id: u32) {
|
||||
async fn quit(&mut self, id: u32) {
|
||||
if let Some((plugin, meta)) = self.search_result(id as usize) {
|
||||
let _ = plugin.sender_exec().send(Request::Quit(meta.id));
|
||||
let _ = plugin.sender_exec().send(Request::Quit(meta.id)).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -250,11 +248,11 @@ impl<O: Write> Service<O> {
|
|||
}
|
||||
}
|
||||
|
||||
fn search(&mut self, query: String) {
|
||||
async fn search(&mut self, query: String) {
|
||||
if !self.awaiting_results.is_empty() {
|
||||
tracing::debug!("backing off from search until plugins are ready");
|
||||
if !self.search_scheduled {
|
||||
self.interrupt();
|
||||
self.interrupt().await;
|
||||
self.search_scheduled = true;
|
||||
}
|
||||
|
||||
|
|
@ -300,9 +298,9 @@ impl<O: Write> Service<O> {
|
|||
if plugin
|
||||
.sender_exec()
|
||||
.send(Request::Search(query.to_owned()))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
tracing::debug!("submitted query to {}", plugin.config.name);
|
||||
self.awaiting_results.insert(isolated);
|
||||
self.no_sort = plugin.config.query.no_sort;
|
||||
}
|
||||
|
|
@ -313,9 +311,9 @@ impl<O: Write> Service<O> {
|
|||
if plugin
|
||||
.sender_exec()
|
||||
.send(Request::Search(query.to_owned()))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
tracing::debug!("submitted query to {}", plugin.config.name);
|
||||
self.awaiting_results.insert(plugin_id);
|
||||
}
|
||||
}
|
||||
|
|
@ -456,7 +454,7 @@ impl<O: Write> Service<O> {
|
|||
}
|
||||
|
||||
/// Handles Requests received from a frontend
|
||||
async fn request_handler(tx: Sender<Event>) {
|
||||
async fn request_handler(mut tx: mpsc::Sender<Event>) {
|
||||
let mut requested_to_exit = false;
|
||||
let mut request_stream = json_input_stream(async_stdin());
|
||||
|
||||
|
|
@ -467,7 +465,7 @@ async fn request_handler(tx: Sender<Event>) {
|
|||
requested_to_exit = true
|
||||
}
|
||||
|
||||
let _ = tx.send(Event::Request(request));
|
||||
let _ = tx.send(Event::Request(request)).await;
|
||||
|
||||
if requested_to_exit {
|
||||
break;
|
||||
|
|
|
|||
14
service/src/plugins/external/load.rs
vendored
14
service/src/plugins/external/load.rs
vendored
|
|
@ -1,7 +1,9 @@
|
|||
use crate::PluginConfig;
|
||||
|
||||
use flume::Sender;
|
||||
use futures_lite::{future::zip, Stream, StreamExt};
|
||||
use postage::mpsc::Sender;
|
||||
use postage::prelude::Stream as PostageStream;
|
||||
use postage::prelude::*;
|
||||
use regex::Regex;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
|
|
@ -9,8 +11,8 @@ use std::path::{Path, PathBuf};
|
|||
///
|
||||
/// Searches plugin paths from highest to least priority. User plugins will override
|
||||
/// distribution plugins. Plugins are loaded in the order they are found.
|
||||
pub async fn from_paths(tx: Sender<(PathBuf, PluginConfig, Option<Regex>)>) {
|
||||
let (tasks_tx, tasks_rx) = flume::unbounded();
|
||||
pub async fn from_paths(mut tx: Sender<(PathBuf, PluginConfig, Option<Regex>)>) {
|
||||
let (mut tasks_tx, mut tasks_rx) = postage::mpsc::channel(8);
|
||||
|
||||
// Spawns a background task to run in parallel for each plugin found
|
||||
let task_spawner = async move {
|
||||
|
|
@ -20,7 +22,7 @@ pub async fn from_paths(tx: Sender<(PathBuf, PluginConfig, Option<Regex>)>) {
|
|||
|
||||
while let Some((source, config)) = loadable_plugins.next().await {
|
||||
let future = smol::unblock(move || crate::plugins::config::load(&source, &config));
|
||||
if tasks_tx.send_async(smol::spawn(future)).await.is_err() {
|
||||
if tasks_tx.send(smol::spawn(future)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -29,9 +31,9 @@ pub async fn from_paths(tx: Sender<(PathBuf, PluginConfig, Option<Regex>)>) {
|
|||
|
||||
// This future ensures that plugins are returned in the order they were spawned.
|
||||
let task_listener = async move {
|
||||
while let Ok(task) = tasks_rx.recv_async().await {
|
||||
while let Some(task) = tasks_rx.recv().await {
|
||||
if let Some(plugin) = task.await {
|
||||
if tx.send_async(plugin).await.is_err() {
|
||||
if tx.send(plugin).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
18
service/src/plugins/external/mod.rs
vendored
18
service/src/plugins/external/mod.rs
vendored
|
|
@ -11,8 +11,9 @@ use std::{
|
|||
|
||||
use crate::{Event, Plugin, PluginResponse, Request};
|
||||
use async_oneshot::oneshot;
|
||||
use flume::Sender;
|
||||
use futures_lite::{AsyncWriteExt, FutureExt, StreamExt};
|
||||
use postage::mpsc::Sender;
|
||||
use postage::prelude::*;
|
||||
use smol::{
|
||||
process::{Child, Command, Stdio},
|
||||
Task,
|
||||
|
|
@ -66,13 +67,13 @@ impl ExternalPlugin {
|
|||
let detached = self.detached.clone();
|
||||
let searching = self.searching.clone();
|
||||
let (trip_tx, trip_rx) = oneshot::<()>();
|
||||
let tx = self.tx.clone();
|
||||
let mut tx = self.tx.clone();
|
||||
let name = self.name().to_owned();
|
||||
let id = self.id;
|
||||
|
||||
// Spawn a background task to forward JSON responses from the child process.
|
||||
let task = smol::spawn(async move {
|
||||
let tx_ = tx.clone();
|
||||
let mut tx_ = tx.clone();
|
||||
let searching_ = searching.clone();
|
||||
let name_ = name.clone();
|
||||
|
||||
|
|
@ -87,11 +88,10 @@ impl ExternalPlugin {
|
|||
searching_.store(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
tracing::debug!("{}: responding with {:?}", name_, response);
|
||||
let _ = tx_.send(Event::Response((id, response)));
|
||||
let _ = tx_.send(Event::Response((id, response))).await;
|
||||
}
|
||||
Err(why) => {
|
||||
event!(Level::ERROR, "{}: serde error: {:?}", name_, why);
|
||||
tracing::error!("{}: serde error: {:?}", name_, why);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -107,7 +107,9 @@ impl ExternalPlugin {
|
|||
|
||||
// Ensure that a task that was searching sends a finished signal if it dies.
|
||||
if searching.swap(false, Ordering::SeqCst) {
|
||||
let _ = tx.send(Event::Response((id, PluginResponse::Finished)));
|
||||
let _ = tx
|
||||
.send(Event::Response((id, PluginResponse::Finished)))
|
||||
.await;
|
||||
}
|
||||
|
||||
detached.store(true, Ordering::SeqCst);
|
||||
|
|
@ -194,7 +196,7 @@ impl Plugin for ExternalPlugin {
|
|||
} else {
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(Event::Response((self.id, PluginResponse::Finished)))
|
||||
.send(Event::Response((self.id, PluginResponse::Finished)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use crate::*;
|
||||
use flume::Sender;
|
||||
use pop_launcher::*;
|
||||
use postage::mpsc::Sender;
|
||||
use postage::prelude::*;
|
||||
use slab::Slab;
|
||||
use std::borrow::Cow;
|
||||
|
||||
|
|
@ -22,23 +23,21 @@ pub const CONFIG: PluginConfig = PluginConfig {
|
|||
pub struct HelpPlugin {
|
||||
pub id: usize,
|
||||
pub details: Slab<PluginHelp>,
|
||||
pub internal: Sender<Event>,
|
||||
pub tx: Sender<Event>,
|
||||
}
|
||||
|
||||
impl HelpPlugin {
|
||||
pub fn new(id: usize, internal: Sender<Event>, tx: Sender<Event>) -> Self {
|
||||
pub fn new(id: usize, tx: Sender<Event>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
details: Slab::new(),
|
||||
internal,
|
||||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
async fn reload(&mut self) {
|
||||
let (tx, rx) = async_oneshot::oneshot();
|
||||
let _ = self.internal.send_async(Event::Help(tx)).await;
|
||||
let _ = self.tx.send(Event::Help(tx)).await;
|
||||
self.details = rx.await.expect("internal error fetching help info");
|
||||
}
|
||||
}
|
||||
|
|
@ -50,7 +49,7 @@ impl Plugin for HelpPlugin {
|
|||
if let Some(help) = detail.help.as_ref() {
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(Event::Response((
|
||||
.send(Event::Response((
|
||||
self.id,
|
||||
PluginResponse::Fill(help.clone()),
|
||||
)))
|
||||
|
|
@ -84,16 +83,13 @@ impl Plugin for HelpPlugin {
|
|||
..Default::default()
|
||||
});
|
||||
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(Event::Response((self.id, response)))
|
||||
.await;
|
||||
let _ = self.tx.send(Event::Response((self.id, response))).await;
|
||||
}
|
||||
}
|
||||
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(Event::Response((self.id, PluginResponse::Finished)))
|
||||
.send(Event::Response((self.id, PluginResponse::Finished)))
|
||||
.await;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,8 @@ pub use self::help::HelpPlugin;
|
|||
|
||||
use crate::{PluginHelp, Request};
|
||||
use async_trait::async_trait;
|
||||
use flume::{Receiver, Sender};
|
||||
use postage::mpsc::{Receiver, Sender};
|
||||
use postage::prelude::*;
|
||||
use regex::Regex;
|
||||
|
||||
#[async_trait]
|
||||
|
|
@ -31,8 +32,8 @@ where
|
|||
|
||||
async fn quit(&mut self, id: u32);
|
||||
|
||||
async fn run(&mut self, rx: Receiver<Request>) {
|
||||
while let Ok(request) = rx.recv_async().await {
|
||||
async fn run(&mut self, mut rx: Receiver<Request>) {
|
||||
while let Some(request) = rx.recv().await {
|
||||
tracing::event!(
|
||||
tracing::Level::DEBUG,
|
||||
"{}: received {:?}",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue