improv(service): Plugins shall respond directly, without forwarding middleman
This commit is contained in:
parent
2805d43b2b
commit
2287c40a38
4 changed files with 51 additions and 42 deletions
|
|
@ -75,8 +75,8 @@ impl<O: Write> Service<O> {
|
|||
|
||||
let name = String::from(config.name.as_ref());
|
||||
|
||||
self.register_plugin(service_tx.clone(), config, regex, move |tx| {
|
||||
ExternalPlugin::new(name.clone(), exec.clone(), Vec::new(), tx)
|
||||
self.register_plugin(service_tx.clone(), config, regex, move |id, tx| {
|
||||
ExternalPlugin::new(id, name.clone(), exec.clone(), Vec::new(), tx)
|
||||
});
|
||||
}
|
||||
};
|
||||
|
|
@ -90,7 +90,7 @@ impl<O: Write> Service<O> {
|
|||
service_tx.clone(),
|
||||
plugins::help::CONFIG,
|
||||
Some(Regex::new(plugins::help::REGEX.as_ref()).expect("failed to compile help regex")),
|
||||
move |tx| HelpPlugin::new(internal.clone(), tx),
|
||||
move |id, tx| HelpPlugin::new(id, internal.clone(), tx),
|
||||
);
|
||||
|
||||
let f1 = request_handler(service_tx);
|
||||
|
|
@ -154,15 +154,13 @@ impl<O: Write> Service<O> {
|
|||
}
|
||||
}
|
||||
|
||||
fn register_plugin<P: Plugin, I: Fn(Sender<PluginResponse>) -> P + Send + Sync + 'static>(
|
||||
fn register_plugin<P: Plugin, I: Fn(usize, Sender<Event>) -> P + Send + Sync + 'static>(
|
||||
&mut self,
|
||||
service_tx: Sender<Event>,
|
||||
config: PluginConfig,
|
||||
regex: Option<regex::Regex>,
|
||||
init: I,
|
||||
) {
|
||||
let (plugin_tx, plugin_rx) = unbounded();
|
||||
|
||||
let entry = self.plugins.vacant_entry();
|
||||
let id = entry.key();
|
||||
|
||||
|
|
@ -175,16 +173,9 @@ impl<O: Write> Service<O> {
|
|||
let (request_tx, request_rx) = unbounded();
|
||||
|
||||
let init = init.clone();
|
||||
let plugin_tx = plugin_tx.clone();
|
||||
let plugin_rx = plugin_rx.clone();
|
||||
let service_tx = service_tx.clone();
|
||||
smol::spawn(async move {
|
||||
let mut plugin = init(plugin_tx);
|
||||
|
||||
let f1 = plugin.run(request_rx);
|
||||
let f2 = plugin_forwarder(id, plugin_rx, service_tx);
|
||||
|
||||
future::zip(f1, f2).await;
|
||||
init(id, service_tx).run(request_rx).await;
|
||||
})
|
||||
.detach();
|
||||
|
||||
|
|
@ -200,6 +191,7 @@ impl<O: Write> Service<O> {
|
|||
}
|
||||
|
||||
fn append(&mut self, plugin: PluginKey, append: PluginSearchResult) {
|
||||
eprintln!("appending {:?}", append);
|
||||
self.active_search.push((plugin, append));
|
||||
}
|
||||
|
||||
|
|
@ -229,6 +221,8 @@ impl<O: Write> Service<O> {
|
|||
return;
|
||||
}
|
||||
|
||||
eprintln!("updating with {:?}", self.active_search);
|
||||
|
||||
let search_list = self.sort();
|
||||
self.respond(&Response::Update(search_list))
|
||||
}
|
||||
|
|
@ -461,18 +455,6 @@ impl<O: Write> Service<O> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn plugin_forwarder(
|
||||
plugin_id: PluginKey,
|
||||
receiver: Receiver<PluginResponse>,
|
||||
forwarder: Sender<Event>,
|
||||
) {
|
||||
while let Ok(response) = receiver.recv_async().await {
|
||||
let _ = forwarder.send(Event::Response((plugin_id, response)));
|
||||
}
|
||||
|
||||
let _ = forwarder.send(Event::PluginExit(plugin_id));
|
||||
}
|
||||
|
||||
/// Handles Requests received from a frontend
|
||||
async fn request_handler(tx: Sender<Event>) {
|
||||
let mut requested_to_exit = false;
|
||||
|
|
|
|||
24
service/src/plugins/external/mod.rs
vendored
24
service/src/plugins/external/mod.rs
vendored
|
|
@ -9,7 +9,7 @@ use std::{
|
|||
},
|
||||
};
|
||||
|
||||
use crate::{Plugin, PluginResponse, Request};
|
||||
use crate::{Event, Plugin, PluginResponse, Request};
|
||||
use async_oneshot::oneshot;
|
||||
use flume::Sender;
|
||||
use futures_lite::{AsyncWriteExt, FutureExt, StreamExt};
|
||||
|
|
@ -20,7 +20,8 @@ use smol::{
|
|||
use tracing::{event, Level};
|
||||
|
||||
pub struct ExternalPlugin {
|
||||
tx: Sender<PluginResponse>,
|
||||
id: usize,
|
||||
tx: Sender<Event>,
|
||||
name: String,
|
||||
pub cmd: PathBuf,
|
||||
pub args: Vec<String>,
|
||||
|
|
@ -30,8 +31,15 @@ pub struct ExternalPlugin {
|
|||
}
|
||||
|
||||
impl ExternalPlugin {
|
||||
pub fn new(name: String, cmd: PathBuf, args: Vec<String>, tx: Sender<PluginResponse>) -> Self {
|
||||
pub fn new(
|
||||
id: usize,
|
||||
name: String,
|
||||
cmd: PathBuf,
|
||||
args: Vec<String>,
|
||||
tx: Sender<Event>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
name,
|
||||
tx,
|
||||
cmd,
|
||||
|
|
@ -60,6 +68,7 @@ impl ExternalPlugin {
|
|||
let (trip_tx, trip_rx) = oneshot::<()>();
|
||||
let tx = self.tx.clone();
|
||||
let name = self.name().to_owned();
|
||||
let id = self.id;
|
||||
|
||||
// Spawn a background task to forward JSON responses from the child process.
|
||||
let task = smol::spawn(async move {
|
||||
|
|
@ -79,7 +88,7 @@ impl ExternalPlugin {
|
|||
}
|
||||
|
||||
tracing::debug!("{}: responding with {:?}", name_, response);
|
||||
let _ = tx_.send(response);
|
||||
let _ = tx_.send(Event::Response((id, response)));
|
||||
}
|
||||
Err(why) => {
|
||||
event!(Level::ERROR, "{}: serde error: {:?}", name_, why);
|
||||
|
|
@ -98,7 +107,7 @@ impl ExternalPlugin {
|
|||
|
||||
// Ensure that a task that was searching sends a finished signal if it dies.
|
||||
if searching.swap(false, Ordering::SeqCst) {
|
||||
let _ = tx.send(PluginResponse::Finished);
|
||||
let _ = tx.send(Event::Response((id, PluginResponse::Finished)));
|
||||
}
|
||||
|
||||
detached.store(true, Ordering::SeqCst);
|
||||
|
|
@ -183,7 +192,10 @@ impl Plugin for ExternalPlugin {
|
|||
if self.query(&Request::Search(query.to_owned())).await.is_ok() {
|
||||
self.searching.store(true, Ordering::SeqCst);
|
||||
} else {
|
||||
let _ = self.tx.send_async(PluginResponse::Finished).await;
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(Event::Response((self.id, PluginResponse::Finished)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
async fn quit(&mut self, id: u32) {
|
||||
|
|
|
|||
|
|
@ -20,14 +20,16 @@ pub const CONFIG: PluginConfig = PluginConfig {
|
|||
icon: Some(IconSource::Name(Cow::Borrowed("system-help-symbolic"))),
|
||||
};
|
||||
pub struct HelpPlugin {
|
||||
pub id: usize,
|
||||
pub details: Slab<PluginHelp>,
|
||||
pub internal: Sender<Event>,
|
||||
pub tx: Sender<PluginResponse>,
|
||||
pub tx: Sender<Event>,
|
||||
}
|
||||
|
||||
impl HelpPlugin {
|
||||
pub fn new(internal: Sender<Event>, tx: Sender<PluginResponse>) -> Self {
|
||||
pub fn new(id: usize, internal: Sender<Event>, tx: Sender<Event>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
details: Slab::new(),
|
||||
internal,
|
||||
tx,
|
||||
|
|
@ -46,7 +48,13 @@ impl Plugin for HelpPlugin {
|
|||
async fn activate(&mut self, id: u32) {
|
||||
if let Some(detail) = self.details.get(id as usize) {
|
||||
if let Some(help) = detail.help.as_ref() {
|
||||
let _ = self.tx.send_async(PluginResponse::Fill(help.clone())).await;
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(Event::Response((
|
||||
self.id,
|
||||
PluginResponse::Fill(help.clone()),
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -69,19 +77,24 @@ impl Plugin for HelpPlugin {
|
|||
}
|
||||
for (id, detail) in self.details.iter() {
|
||||
if detail.help.is_some() {
|
||||
let response = PluginResponse::Append(PluginSearchResult {
|
||||
id: id as u32,
|
||||
name: detail.name.clone(),
|
||||
description: detail.description.clone(),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(PluginResponse::Append(PluginSearchResult {
|
||||
id: id as u32,
|
||||
name: detail.name.clone(),
|
||||
description: detail.description.clone(),
|
||||
..Default::default()
|
||||
}))
|
||||
.send_async(Event::Response((self.id, response)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
let _ = self.tx.send_async(PluginResponse::Finished).await;
|
||||
let _ = self
|
||||
.tx
|
||||
.send_async(Event::Response((self.id, PluginResponse::Finished)))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn quit(&mut self, _id: u32) {}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue