diff --git a/Cargo.lock b/Cargo.lock index c324f4a..712902b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1284,6 +1284,7 @@ dependencies = [ "async-oneshot", "async-trait", "futures", + "futures-core", "futures-lite", "futures_codec", "gen-z", diff --git a/service/Cargo.toml b/service/Cargo.toml index 3d53720..d7dd8b0 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -2,7 +2,6 @@ name = "pop-launcher-service" version = "1.0.0" edition = "2018" -publish = false [dependencies] anyhow = "1" @@ -27,3 +26,4 @@ strsim = "0.10" toml = "0.5" tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } +futures-core = "0.3.16" diff --git a/service/src/lib.rs b/service/src/lib.rs index 4504574..3fe492b 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -1,10 +1,10 @@ mod plugins; use crate::plugins::*; +use futures_core::Stream; use futures_lite::{future, StreamExt}; use pop_launcher::*; -use postage::mpsc; -use postage::prelude::*; +use postage::{mpsc, prelude::Sink as PostageSink}; use regex::Regex; use slab::Slab; use std::{ @@ -28,36 +28,61 @@ pub struct PluginHelp { } pub async fn main() { - let stdout = io::stdout(); - Service::new(stdout.lock()).exec().await + // Listens for a stream of requests from stdin. + let input_stream = json_input_stream(async_stdin()).filter_map(|result| match result { + Ok(request) => Some(request), + Err(why) => { + tracing::error!("malformed JSON input: {}", why); + None + } + }); + + let (output_tx, mut output_rx) = postage::mpsc::channel(16); + + // Service will operate for as long as it is being awaited + let service = Service::new(output_tx).exec(input_stream); + + // Responses from the service will be streamed to stdout + let responder = async move { + use postage::prelude::Stream; + + let stdout = io::stdout(); + let stdout = &mut stdout.lock(); + + while let Some(response) = output_rx.recv().await { + serialize_out(stdout, &response); + } + }; + + futures_lite::future::zip(service, responder).await; } -pub struct Service { +pub struct Service { active_search: Vec<(PluginKey, PluginSearchResult)>, associated_list: HashMap, awaiting_results: HashSet, last_query: String, - output: O, - plugins: Slab, no_sort: bool, + output: postage::mpsc::Sender, + plugins: Slab, search_scheduled: bool, } -impl Service { - pub fn new(output: O) -> Self { +impl Service { + pub fn new(output: postage::mpsc::Sender) -> Self { Self { active_search: Vec::new(), associated_list: HashMap::new(), awaiting_results: HashSet::new(), last_query: String::new(), output, - plugins: Slab::new(), no_sort: false, + plugins: Slab::new(), search_scheduled: false, } } - pub async fn exec(mut self) { + pub async fn exec(mut self, input: impl Stream) { let (service_tx, service_rx) = mpsc::channel(1); let stream = plugins::external::load::from_paths(); @@ -89,13 +114,14 @@ impl Service { move |id, tx| HelpPlugin::new(id, tx), ); - let f1 = request_handler(service_tx); + let f1 = request_handler(input, service_tx); let f2 = self.response_handler(service_rx); future::zip(f1, f2).await; } async fn response_handler(&mut self, mut service_rx: mpsc::Receiver) { + use postage::prelude::Stream; while let Some(event) = service_rx.recv().await { match event { Event::Request(request) => { @@ -126,18 +152,21 @@ impl Service { Event::Response((plugin, response)) => match response { PluginResponse::Append(item) => self.append(plugin, item), PluginResponse::Clear => self.clear(), - PluginResponse::Close => self.close(), - PluginResponse::Context { id, options } => self.context_response(id, options), - PluginResponse::Fill(text) => self.fill(text), + PluginResponse::Close => self.close().await, + PluginResponse::Context { id, options } => { + self.context_response(id, options).await + } + PluginResponse::Fill(text) => self.fill(text).await, PluginResponse::Finished => self.finished(plugin).await, PluginResponse::DesktopEntry { path, gpu_preference, } => { - self.respond(&Response::DesktopEntry { + self.respond(Response::DesktopEntry { path, gpu_preference, - }); + }) + .await; } // Report the plugin as finished and remove it from future polling @@ -226,14 +255,14 @@ impl Service { self.active_search.clear(); } - fn close(&mut self) { - self.respond(&Response::Close); + async fn close(&mut self) { + self.respond(Response::Close).await; } - fn context_response(&mut self, id: Indice, options: Vec) { + async fn context_response(&mut self, id: Indice, options: Vec) { if let Some(id) = self.associated_list.get(&id) { let id = *id; - self.respond(&Response::Context { id, options }); + self.respond(Response::Context { id, options }).await; } } @@ -249,21 +278,24 @@ impl Service { } } - fn fill(&mut self, text: String) { - self.respond(&Response::Fill(text)); + async fn fill(&mut self, text: String) { + self.respond(Response::Fill(text)).await; } async fn finished(&mut self, plugin: PluginKey) { self.awaiting_results.remove(&plugin); - if self.awaiting_results.is_empty() { - if self.search_scheduled { - self.search(String::new()).await; - return; - } - - let search_list = self.sort(); - self.respond(&Response::Update(search_list)); + if !self.awaiting_results.is_empty() { + return; } + + if self.search_scheduled { + self.search(String::new()).await; + return; + } + + let search_list = self.sort(); + + self.respond(Response::Update(search_list)).await; } async fn interrupt(&mut self) { @@ -280,12 +312,8 @@ impl Service { } } - /// Serializes the launcher's response to stdout - fn respond(&mut self, event: &E) { - if let Ok(mut vec) = serde_json::to_vec(event) { - vec.push(b'\n'); - let _ = self.output.write_all(&vec); - } + async fn respond(&mut self, event: Response) { + let _ = self.output.send(event).await; } async fn search(&mut self, query: String) { @@ -517,29 +545,30 @@ impl Service { } /// Handles Requests received from a frontend -async fn request_handler(mut tx: mpsc::Sender) { +async fn request_handler(input: impl Stream, mut tx: mpsc::Sender) { let mut requested_to_exit = false; - let mut request_stream = json_input_stream(async_stdin()); - while let Some(result) = request_stream.next().await { - match result { - Ok(request) => { - if let Request::Exit = request { - requested_to_exit = true - } + futures_lite::pin!(input); - let _ = tx.send(Event::Request(request)).await; + while let Some(request) = input.next().await { + if let Request::Exit = request { + requested_to_exit = true + } - if requested_to_exit { - break; - } - } + let _ = tx.send(Event::Request(request)).await; - Err(why) => { - tracing::error!("Request JSON is malformed: {}", why); - } + if requested_to_exit { + break; } } tracing::debug!("no longer listening for requests") } + +/// Serializes the launcher's response to stdout +fn serialize_out(output: &mut io::StdoutLock, event: &E) { + if let Ok(mut vec) = serde_json::to_vec(event) { + vec.push(b'\n'); + let _ = output.write_all(&vec); + } +}