improv: Separate components & merge plugins binary with launcher service

This commit is contained in:
Michael Aaron Murphy 2021-08-14 14:19:42 +02:00
parent 43a4229ba7
commit 88acf0a74e
41 changed files with 219 additions and 152 deletions

View file

@ -0,0 +1,99 @@
use regex::Regex;
use serde::Deserialize;
use std::{
borrow::Cow,
path::{Path, PathBuf},
};
#[derive(Debug, Default, Deserialize)]
pub struct PluginConfig {
pub name: Cow<'static, str>,
pub description: Cow<'static, str>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "::serde_with::rust::unwrap_or_skip"
)]
pub bin: Option<PluginBinary>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "::serde_with::rust::unwrap_or_skip"
)]
pub icon: Option<crate::IconSource>,
#[serde(default)]
pub query: PluginQuery,
}
#[derive(Debug, Default, Deserialize)]
pub struct PluginBinary {
path: Cow<'static, str>,
#[serde(default)]
args: Vec<Cow<'static, str>>,
}
#[derive(Debug, Default, Deserialize)]
pub struct PluginQuery {
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "::serde_with::rust::unwrap_or_skip"
)]
pub help: Option<Cow<'static, str>>,
#[serde(default)]
pub isolate: bool,
#[serde(default)]
pub no_sort: bool,
#[serde(default)]
pub persistent: bool,
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "::serde_with::rust::unwrap_or_skip"
)]
pub regex: Option<Cow<'static, str>>,
}
pub fn load(source: &Path, config_path: &Path) -> Option<(PathBuf, PluginConfig, Option<Regex>)> {
if let Ok(config_bytes) = std::fs::read_to_string(&config_path) {
let config = match ron::from_str::<PluginConfig>(&config_bytes) {
Ok(config) => config,
Err(why) => {
tracing::error!("malformed config at {}: {}", config_path.display(), why);
return None;
}
};
let exec = if let Some(bin) = config.bin.as_ref() {
if bin.path.starts_with('/') {
PathBuf::from((*bin.path).to_owned())
} else {
source.join(bin.path.as_ref())
}
} else {
tracing::error!(
"bin field is missing from config at {}",
config_path.display()
);
return None;
};
let regex = config
.query
.regex
.as_ref()
.and_then(|p| Regex::new(&*p).ok());
return Some((exec, config, regex));
}
tracing::error!("I/O error reading config at {}", config_path.display());
None
}

76
service/src/plugins/external/load.rs vendored Normal file
View file

@ -0,0 +1,76 @@
use crate::PluginConfig;
use flume::Sender;
use futures_lite::{Stream, StreamExt};
use regex::Regex;
use std::path::{Path, PathBuf};
/// Fetches plugins installed on the system in parallel.
pub async fn from_paths(tx: Sender<(PathBuf, PluginConfig, Option<Regex>)>) {
const PLUGIN_PATHS: &[&str] = &[
// User plugins
".local/share/pop-launcher/plugins/",
// System plugins configured by admin
"/etc/pop-launcher/plugins/",
// Distribution plugins
"/usr/lib/pop-launcher/plugins/",
];
let mut futures = Vec::new();
// Searches plugin paths from highest to least priority.
// User plugins will override distribution plugins.
for path in PLUGIN_PATHS {
let path_buf;
#[allow(deprecated)]
let path = if !path.starts_with('/') {
path_buf = std::env::home_dir()
.expect("user does not have home dir")
.join(path);
path_buf.as_path()
} else {
Path::new(&path)
};
let loadable_plugins = from_path(path);
futures_lite::pin!(loadable_plugins);
// Spawn a background task to parse the config for each plugin found.
while let Some((source, config)) = loadable_plugins.next().await {
let tx = tx.clone();
let future = smol::unblock(move || {
if let Some(plugin) = crate::plugins::config::load(&source, &config) {
let _ = tx.send(plugin);
}
});
futures.push(smol::spawn(future))
}
// Ensures that plugins are loaded in the order that they were spawned.
for future in futures.drain(..) {
future.await;
}
}
}
/// Loads all plugin information found in the given path.
pub fn from_path(path: &Path) -> impl Stream<Item = (PathBuf, PathBuf)> + '_ {
gen_z::gen_z(move |mut z| async move {
if let Ok(readdir) = path.read_dir() {
for entry in readdir.filter_map(Result::ok) {
let source = entry.path();
if !source.is_dir() {
continue;
}
let config = source.join("plugin.ron");
if !config.exists() {
continue;
}
z.send((source, config)).await;
}
}
})
}

192
service/src/plugins/external/mod.rs vendored Normal file
View file

@ -0,0 +1,192 @@
pub mod load;
use std::{
io,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use crate::{Plugin, PluginResponse, Request};
use async_oneshot::oneshot;
use flume::Sender;
use futures_lite::{AsyncWriteExt, FutureExt, StreamExt};
use smol::{
process::{Child, Command, Stdio},
Task,
};
use tracing::{event, Level};
pub struct ExternalPlugin {
tx: Sender<PluginResponse>,
name: String,
pub cmd: PathBuf,
pub args: Vec<String>,
process: Option<(Task<()>, Child, async_oneshot::Sender<()>)>,
detached: Arc<AtomicBool>,
searching: Arc<AtomicBool>,
}
impl ExternalPlugin {
pub fn new(name: String, cmd: PathBuf, args: Vec<String>, tx: Sender<PluginResponse>) -> Self {
Self {
name,
tx,
cmd,
args,
process: None,
detached: Arc::default(),
searching: Arc::default(),
}
}
pub fn launch(&mut self) -> Option<&mut (Task<()>, Child, async_oneshot::Sender<()>)> {
event!(Level::DEBUG, "{}: launching plugin", self.name());
let child = Command::new(&self.cmd)
.args(&self.args)
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.ok();
if let Some(mut child) = child {
if let Some(stdout) = child.stdout.take() {
let detached = self.detached.clone();
let searching = self.searching.clone();
let (trip_tx, trip_rx) = oneshot::<()>();
let tx = self.tx.clone();
let name = self.name().to_owned();
// Spawn a background task to forward JSON responses from the child process.
let task = smol::spawn(async move {
let tx_ = tx.clone();
let searching_ = searching.clone();
let name_ = name.clone();
// Future for directly handling the JSON output from the process.
let responder = async move {
let mut requests = crate::json_input_stream(stdout);
while let Some(result) = requests.next().await {
match result {
Ok(response) => {
if let PluginResponse::Finished = response {
searching_.store(false, Ordering::SeqCst);
}
tracing::debug!("{}: responding with {:?}", name_, response);
let _ = tx_.send(response);
}
Err(why) => {
event!(Level::ERROR, "{}: serde error: {:?}", name_, why);
}
}
}
tracing::debug!("{}: exiting from responder", name_);
};
let trip = async move {
let _ = trip_rx.await;
};
let _ = responder.or(trip).await;
// Ensure that a task that was searching sends a finished signal if it dies.
if searching.swap(false, Ordering::SeqCst) {
let _ = tx.send(PluginResponse::Finished);
}
detached.store(true, Ordering::SeqCst);
event!(Level::DEBUG, "{}: detached plugin", name);
});
self.process = Some((task, child, trip_tx));
}
}
self.process.as_mut()
}
pub async fn process_check(&mut self) {
if let Some(mut child) = self.process.take() {
match child.1.try_status() {
Err(_) | Ok(Some(_)) => {
child.0.cancel().await;
}
Ok(None) => self.process = Some(child),
}
if self.detached.swap(false, Ordering::SeqCst) {
self.process = None;
}
}
}
pub async fn query(&mut self, event: &Request) -> io::Result<()> {
self.process_check().await;
if self.process.is_none() {
tracing::debug!("{}: relaunching process", self.name());
self.launch();
}
if let Some((_, child, _)) = self.process.as_mut() {
if let Some(stdin) = child.stdin.as_mut() {
if let Ok(mut serialized) = serde_json::to_vec(event) {
serialized.push(b'\n');
let _ = stdin.write_all(&serialized).await?;
tracing::debug!("{}: sent message to external process", self.name());
}
return Ok(());
}
}
Err(io::Error::new(
io::ErrorKind::NotConnected,
"child process could not be reached",
))
}
}
#[async_trait::async_trait]
impl Plugin for ExternalPlugin {
async fn activate(&mut self, id: u32) {
let _ = self.query(&Request::Activate(id)).await;
}
async fn complete(&mut self, id: u32) {
let _ = self.query(&Request::Complete(id)).await;
}
fn exit(&mut self) {
if let Some((_, _, mut trigger)) = self.process.take() {
let _ = trigger.send(());
}
}
async fn interrupt(&mut self) {
let _ = self.query(&Request::Interrupt).await;
}
fn name(&self) -> &str {
&self.name
}
async fn search(&mut self, query: &str) {
if self.query(&Request::Search(query.to_owned())).await.is_ok() {
self.searching.store(true, Ordering::SeqCst);
} else {
let _ = self.tx.send_async(PluginResponse::Finished).await;
}
}
async fn quit(&mut self, id: u32) {
let _ = self.query(&Request::Quit(id)).await;
}
}

View file

@ -0,0 +1,88 @@
use crate::*;
use pop_launcher::*;
use flume::Sender;
use slab::Slab;
use std::borrow::Cow;
pub const REGEX: Cow<'static, str> = Cow::Borrowed("^(\\?).*");
pub const CONFIG: PluginConfig = PluginConfig {
name: Cow::Borrowed("Help"),
description: Cow::Borrowed("Show available plugin prefixes"),
bin: None,
query: PluginQuery {
help: None,
isolate: true,
no_sort: true,
persistent: false,
regex: None,
},
icon: Some(IconSource::Name(Cow::Borrowed("system-help-symbolic"))),
};
pub struct HelpPlugin {
pub details: Slab<PluginHelp>,
pub internal: Sender<Event>,
pub tx: Sender<PluginResponse>,
}
impl HelpPlugin {
pub fn new(internal: Sender<Event>, tx: Sender<PluginResponse>) -> Self {
Self {
details: Slab::new(),
internal,
tx,
}
}
async fn reload(&mut self) {
let (tx, rx) = async_oneshot::oneshot();
let _ = self.internal.send_async(Event::Help(tx)).await;
self.details = rx.await.expect("internal error fetching help info");
}
}
#[async_trait::async_trait]
impl Plugin for HelpPlugin {
async fn activate(&mut self, id: u32) {
if let Some(detail) = self.details.get(id as usize) {
if let Some(help) = detail.help.as_ref() {
let _ = self.tx.send_async(PluginResponse::Fill(help.clone())).await;
}
}
}
async fn complete(&mut self, id: u32) {
self.activate(id).await
}
fn exit(&mut self) {}
async fn interrupt(&mut self) {}
fn name(&self) -> &str {
"help"
}
async fn search(&mut self, _query: &str) {
if self.details.is_empty() {
self.reload().await;
}
for (id, detail) in self.details.iter() {
if detail.help.is_some() {
let _ = self
.tx
.send_async(PluginResponse::Append(PluginSearchResult {
id: id as u32,
name: detail.name.clone(),
description: detail.description.clone(),
..Default::default()
}))
.await;
}
}
let _ = self.tx.send_async(PluginResponse::Finished).await;
}
async fn quit(&mut self, _id: u32) {}
}

124
service/src/plugins/mod.rs Normal file
View file

@ -0,0 +1,124 @@
mod config;
pub(crate) mod external;
pub mod help;
pub use self::config::{PluginBinary, PluginConfig, PluginQuery};
pub use self::external::ExternalPlugin;
pub use self::help::HelpPlugin;
use crate::{PluginHelp, Request};
use async_trait::async_trait;
use flume::{Receiver, Sender};
use regex::Regex;
#[async_trait]
pub trait Plugin
where
Self: Sized + Send,
{
/// Activate the selected ID from this plugin
async fn activate(&mut self, id: u32);
async fn complete(&mut self, id: u32);
fn exit(&mut self);
async fn interrupt(&mut self);
fn name(&self) -> &str;
async fn search(&mut self, query: &str);
async fn quit(&mut self, id: u32);
async fn run(&mut self, rx: Receiver<Request>) {
while let Ok(request) = rx.recv_async().await {
tracing::event!(
tracing::Level::DEBUG,
"{}: received {:?}",
self.name(),
request
);
match request {
Request::Search(query) => self.search(&query).await,
Request::Interrupt => self.interrupt().await,
Request::Activate(id) => self.activate(id).await,
Request::Complete(id) => self.complete(id).await,
Request::Quit(id) => self.quit(id).await,
Request::Exit => {
self.exit();
break;
}
}
}
tracing::event!(tracing::Level::DEBUG, "{}: exiting plugin", self.name());
}
}
/// Stores all information relevant for communicating with a plugin
///
/// Plugins may be requested to exit, and relaunched at any point in the future.
pub struct PluginConnector {
/// The deserialized configuration file for this plugin
pub config: PluginConfig,
/// Code that is executed to prepare a new instance of
/// this plugin to spawn as a background service
pub init: Box<dyn Fn() -> Sender<Request>>,
/// A compiled regular expression that a query must match
/// for the launcher service to justify spawning and sending
/// queries to this plugin
pub regex: Option<Regex>,
/// The sender of the spawned background service that will be
/// forwarded to the launncher service
pub sender: Option<Sender<Request>>,
}
impl PluginConnector {
pub fn new(
config: PluginConfig,
regex: Option<Regex>,
init: Box<dyn Fn() -> Sender<Request> + Send>,
) -> Self {
Self {
config,
init,
regex,
sender: None,
}
}
pub fn details(&self) -> PluginHelp {
PluginHelp {
name: self.config.name.as_ref().to_owned(),
description: self.config.description.as_ref().to_owned(),
help: self
.config
.query
.help
.as_ref()
.map(|x| x.as_ref().to_owned()),
}
}
/// Obtains the sender for sending messages to this plugin.
///
/// If the sender is absent, the plugin is relaunched with a new one.
pub fn sender_exec(&mut self) -> &mut Sender<Request> {
let &mut Self {
ref mut sender,
ref init,
..
} = self;
sender.get_or_insert_with(|| init())
}
/// Drops the sender, which will subsequently drop the plugin forwarder attached to it
pub fn sender_drop(&mut self) {
self.sender = None;
}
}