feat(service): Support use as a library

This commit is contained in:
Michael Aaron Murphy 2021-08-25 18:54:59 +02:00
parent 91718c7303
commit 9c491a4f9f
3 changed files with 84 additions and 54 deletions

1
Cargo.lock generated
View file

@ -1284,6 +1284,7 @@ dependencies = [
"async-oneshot",
"async-trait",
"futures",
"futures-core",
"futures-lite",
"futures_codec",
"gen-z",

View file

@ -2,7 +2,6 @@
name = "pop-launcher-service"
version = "1.0.0"
edition = "2018"
publish = false
[dependencies]
anyhow = "1"
@ -27,3 +26,4 @@ strsim = "0.10"
toml = "0.5"
tracing = "0.1"
tracing-subscriber = { version = "0.2", features = ["fmt"] }
futures-core = "0.3.16"

View file

@ -1,10 +1,10 @@
mod plugins;
use crate::plugins::*;
use futures_core::Stream;
use futures_lite::{future, StreamExt};
use pop_launcher::*;
use postage::mpsc;
use postage::prelude::*;
use postage::{mpsc, prelude::Sink as PostageSink};
use regex::Regex;
use slab::Slab;
use std::{
@ -28,36 +28,61 @@ pub struct PluginHelp {
}
pub async fn main() {
let stdout = io::stdout();
Service::new(stdout.lock()).exec().await
// Listens for a stream of requests from stdin.
let input_stream = json_input_stream(async_stdin()).filter_map(|result| match result {
Ok(request) => Some(request),
Err(why) => {
tracing::error!("malformed JSON input: {}", why);
None
}
});
let (output_tx, mut output_rx) = postage::mpsc::channel(16);
// Service will operate for as long as it is being awaited
let service = Service::new(output_tx).exec(input_stream);
// Responses from the service will be streamed to stdout
let responder = async move {
use postage::prelude::Stream;
let stdout = io::stdout();
let stdout = &mut stdout.lock();
while let Some(response) = output_rx.recv().await {
serialize_out(stdout, &response);
}
};
futures_lite::future::zip(service, responder).await;
}
pub struct Service<O> {
pub struct Service {
active_search: Vec<(PluginKey, PluginSearchResult)>,
associated_list: HashMap<Indice, Indice>,
awaiting_results: HashSet<PluginKey>,
last_query: String,
output: O,
plugins: Slab<PluginConnector>,
no_sort: bool,
output: postage::mpsc::Sender<Response>,
plugins: Slab<PluginConnector>,
search_scheduled: bool,
}
impl<O: Write> Service<O> {
pub fn new(output: O) -> Self {
impl Service {
pub fn new(output: postage::mpsc::Sender<Response>) -> Self {
Self {
active_search: Vec::new(),
associated_list: HashMap::new(),
awaiting_results: HashSet::new(),
last_query: String::new(),
output,
plugins: Slab::new(),
no_sort: false,
plugins: Slab::new(),
search_scheduled: false,
}
}
pub async fn exec(mut self) {
pub async fn exec(mut self, input: impl Stream<Item = Request>) {
let (service_tx, service_rx) = mpsc::channel(1);
let stream = plugins::external::load::from_paths();
@ -89,13 +114,14 @@ impl<O: Write> Service<O> {
move |id, tx| HelpPlugin::new(id, tx),
);
let f1 = request_handler(service_tx);
let f1 = request_handler(input, service_tx);
let f2 = self.response_handler(service_rx);
future::zip(f1, f2).await;
}
async fn response_handler(&mut self, mut service_rx: mpsc::Receiver<Event>) {
use postage::prelude::Stream;
while let Some(event) = service_rx.recv().await {
match event {
Event::Request(request) => {
@ -126,18 +152,21 @@ impl<O: Write> Service<O> {
Event::Response((plugin, response)) => match response {
PluginResponse::Append(item) => self.append(plugin, item),
PluginResponse::Clear => self.clear(),
PluginResponse::Close => self.close(),
PluginResponse::Context { id, options } => self.context_response(id, options),
PluginResponse::Fill(text) => self.fill(text),
PluginResponse::Close => self.close().await,
PluginResponse::Context { id, options } => {
self.context_response(id, options).await
}
PluginResponse::Fill(text) => self.fill(text).await,
PluginResponse::Finished => self.finished(plugin).await,
PluginResponse::DesktopEntry {
path,
gpu_preference,
} => {
self.respond(&Response::DesktopEntry {
self.respond(Response::DesktopEntry {
path,
gpu_preference,
});
})
.await;
}
// Report the plugin as finished and remove it from future polling
@ -226,14 +255,14 @@ impl<O: Write> Service<O> {
self.active_search.clear();
}
fn close(&mut self) {
self.respond(&Response::Close);
async fn close(&mut self) {
self.respond(Response::Close).await;
}
fn context_response(&mut self, id: Indice, options: Vec<ContextOption>) {
async fn context_response(&mut self, id: Indice, options: Vec<ContextOption>) {
if let Some(id) = self.associated_list.get(&id) {
let id = *id;
self.respond(&Response::Context { id, options });
self.respond(Response::Context { id, options }).await;
}
}
@ -249,21 +278,24 @@ impl<O: Write> Service<O> {
}
}
fn fill(&mut self, text: String) {
self.respond(&Response::Fill(text));
async fn fill(&mut self, text: String) {
self.respond(Response::Fill(text)).await;
}
async fn finished(&mut self, plugin: PluginKey) {
self.awaiting_results.remove(&plugin);
if self.awaiting_results.is_empty() {
if self.search_scheduled {
self.search(String::new()).await;
return;
}
let search_list = self.sort();
self.respond(&Response::Update(search_list));
if !self.awaiting_results.is_empty() {
return;
}
if self.search_scheduled {
self.search(String::new()).await;
return;
}
let search_list = self.sort();
self.respond(Response::Update(search_list)).await;
}
async fn interrupt(&mut self) {
@ -280,12 +312,8 @@ impl<O: Write> Service<O> {
}
}
/// Serializes the launcher's response to stdout
fn respond<E: serde::Serialize>(&mut self, event: &E) {
if let Ok(mut vec) = serde_json::to_vec(event) {
vec.push(b'\n');
let _ = self.output.write_all(&vec);
}
async fn respond(&mut self, event: Response) {
let _ = self.output.send(event).await;
}
async fn search(&mut self, query: String) {
@ -517,29 +545,30 @@ impl<O: Write> Service<O> {
}
/// Handles Requests received from a frontend
async fn request_handler(mut tx: mpsc::Sender<Event>) {
async fn request_handler(input: impl Stream<Item = Request>, mut tx: mpsc::Sender<Event>) {
let mut requested_to_exit = false;
let mut request_stream = json_input_stream(async_stdin());
while let Some(result) = request_stream.next().await {
match result {
Ok(request) => {
if let Request::Exit = request {
requested_to_exit = true
}
futures_lite::pin!(input);
let _ = tx.send(Event::Request(request)).await;
while let Some(request) = input.next().await {
if let Request::Exit = request {
requested_to_exit = true
}
if requested_to_exit {
break;
}
}
let _ = tx.send(Event::Request(request)).await;
Err(why) => {
tracing::error!("Request JSON is malformed: {}", why);
}
if requested_to_exit {
break;
}
}
tracing::debug!("no longer listening for requests")
}
/// Serializes the launcher's response to stdout
fn serialize_out<E: serde::Serialize>(output: &mut io::StdoutLock, event: &E) {
if let Ok(mut vec) = serde_json::to_vec(event) {
vec.push(b'\n');
let _ = output.write_all(&vec);
}
}