diff --git a/Cargo.lock b/Cargo.lock index 901ef9f..ad983da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -566,6 +566,15 @@ dependencies = [ "wasi", ] +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "human-sort" version = "0.2.2" @@ -631,9 +640,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" [[package]] name = "mime" @@ -693,6 +702,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.8.0" @@ -835,9 +854,11 @@ dependencies = [ "async-io", "async-oneshot", "async-trait", + "futures", "futures-lite", "futures_codec", "gen-z", + "num_cpus", "pollster", "pop-launcher", "postage", @@ -1231,9 +1252,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab69019741fca4d98be3c62d2b75254528b5432233fd8a4d2739fec20278de48" +checksum = "b9cbe87a2fa7e35900ce5de20220a582a9483a7063811defce79d7cbd59d4cfe" dependencies = [ "ansi_term", "chrono", diff --git a/service/Cargo.toml b/service/Cargo.toml index e544cf3..4c5b55f 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -9,8 +9,13 @@ anyhow = "1" async-io = "1" async-oneshot = "0.5" async-trait = "0.1" +futures = "0.3" futures_codec = "0.4" futures-lite = "1" +gen-z = "0.1" +num_cpus = "1" +pop-launcher = { path = "../" } +postage = "0.4" regex = "1.5" ron = "0.6" serde = { version = "1", features = ["derive"] } @@ -22,10 +27,9 @@ strsim = "0.10" toml = "0.5" tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } -gen-z = "0.1" -pop-launcher = { path = "../" } -postage = "0.4.1" # Required for rustc 1.47 pollster = "=0.2.3" + + diff --git a/service/src/lib.rs b/service/src/lib.rs index 49464a2..3e8330b 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -60,30 +60,26 @@ impl Service { pub async fn exec(mut self) { let (service_tx, service_rx) = mpsc::channel(1); - { - let (plugins_tx, mut plugins_rx) = mpsc::channel(8); - let plugin_loader = plugins::external::load::from_paths(plugins_tx); - let plugin_receiver = async { - while let Some((exec, config, regex)) = plugins_rx.recv().await { - tracing::info!("found plugin \"{}\"", exec.display()); - if self - .plugins - .iter() - .any(|(_, p)| p.config.name == config.name) - { - tracing::info!("ignoring plugin"); - continue; - } + let stream = plugins::external::load::from_paths(); - let name = String::from(config.name.as_ref()); + futures_lite::pin!(stream); - self.register_plugin(service_tx.clone(), config, regex, move |id, tx| { - ExternalPlugin::new(id, name.clone(), exec.clone(), Vec::new(), tx) - }); - } - }; + while let Some((exec, config, regex)) = stream.next().await { + tracing::info!("found plugin \"{}\"", exec.display()); + if self + .plugins + .iter() + .any(|(_, p)| p.config.name == config.name) + { + tracing::info!("ignoring plugin"); + continue; + } - future::zip(plugin_loader, plugin_receiver).await; + let name = String::from(config.name.as_ref()); + + self.register_plugin(service_tx.clone(), config, regex, move |id, tx| { + ExternalPlugin::new(id, name.clone(), exec.clone(), Vec::new(), tx) + }); } self.register_plugin( diff --git a/service/src/plugins/external/load.rs b/service/src/plugins/external/load.rs index 403c562..7c5e47a 100644 --- a/service/src/plugins/external/load.rs +++ b/service/src/plugins/external/load.rs @@ -1,50 +1,25 @@ use crate::PluginConfig; -use futures_lite::{future::zip, Stream, StreamExt}; -use postage::mpsc::Sender; -use postage::prelude::Stream as PostageStream; -use postage::prelude::*; +use futures::{stream, Stream, StreamExt}; use regex::Regex; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; /// Fetches plugins installed on the system in parallel. /// /// Searches plugin paths from highest to least priority. User plugins will override /// distribution plugins. Plugins are loaded in the order they are found. -pub async fn from_paths(mut tx: Sender<(PathBuf, PluginConfig, Option)>) { - let (mut tasks_tx, mut tasks_rx) = postage::mpsc::channel(8); - - // Spawns a background task to run in parallel for each plugin found - let task_spawner = async move { - for path in crate::plugin_paths() { - let loadable_plugins = from_path(&path); - futures_lite::pin!(loadable_plugins); - - while let Some((source, config)) = loadable_plugins.next().await { - let future = smol::unblock(move || crate::plugins::config::load(&source, &config)); - if tasks_tx.send(smol::spawn(future)).await.is_err() { - break; - } - } - } - }; - - // This future ensures that plugins are returned in the order they were spawned. - let task_listener = async move { - while let Some(task) = tasks_rx.recv().await { - if let Some(plugin) = task.await { - if tx.send(plugin).await.is_err() { - break; - } - } - } - }; - - zip(task_spawner, task_listener).await; +pub fn from_paths() -> impl Stream)> { + stream::iter(crate::plugin_paths()) + .flat_map(|path| from_path(path.to_path_buf())) + .map(|(source, config)| { + smol::unblock(move || crate::plugins::config::load(&source, &config)) + }) + .buffered(num_cpus::get()) + .filter_map(|x| async move { x }) } /// Loads all plugin information found in the given path. -pub fn from_path(path: &Path) -> impl Stream + '_ { +pub fn from_path(path: PathBuf) -> impl Stream { gen_z::gen_z(move |mut z| async move { if let Ok(readdir) = path.read_dir() { for entry in readdir.filter_map(Result::ok) {