improv(feat): Load configs in parallel based on num of CPUs

This commit is contained in:
Michael Aaron Murphy 2021-08-19 18:38:25 +02:00
parent 56c112ebdc
commit 810cf39580
4 changed files with 60 additions and 64 deletions

View file

@ -60,30 +60,26 @@ impl<O: Write> Service<O> {
pub async fn exec(mut self) {
let (service_tx, service_rx) = mpsc::channel(1);
{
let (plugins_tx, mut plugins_rx) = mpsc::channel(8);
let plugin_loader = plugins::external::load::from_paths(plugins_tx);
let plugin_receiver = async {
while let Some((exec, config, regex)) = plugins_rx.recv().await {
tracing::info!("found plugin \"{}\"", exec.display());
if self
.plugins
.iter()
.any(|(_, p)| p.config.name == config.name)
{
tracing::info!("ignoring plugin");
continue;
}
let stream = plugins::external::load::from_paths();
let name = String::from(config.name.as_ref());
futures_lite::pin!(stream);
self.register_plugin(service_tx.clone(), config, regex, move |id, tx| {
ExternalPlugin::new(id, name.clone(), exec.clone(), Vec::new(), tx)
});
}
};
while let Some((exec, config, regex)) = stream.next().await {
tracing::info!("found plugin \"{}\"", exec.display());
if self
.plugins
.iter()
.any(|(_, p)| p.config.name == config.name)
{
tracing::info!("ignoring plugin");
continue;
}
future::zip(plugin_loader, plugin_receiver).await;
let name = String::from(config.name.as_ref());
self.register_plugin(service_tx.clone(), config, regex, move |id, tx| {
ExternalPlugin::new(id, name.clone(), exec.clone(), Vec::new(), tx)
});
}
self.register_plugin(

View file

@ -1,50 +1,25 @@
use crate::PluginConfig;
use futures_lite::{future::zip, Stream, StreamExt};
use postage::mpsc::Sender;
use postage::prelude::Stream as PostageStream;
use postage::prelude::*;
use futures::{stream, Stream, StreamExt};
use regex::Regex;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
/// Fetches plugins installed on the system in parallel.
///
/// Searches plugin paths from highest to least priority. User plugins will override
/// distribution plugins. Plugins are loaded in the order they are found.
pub async fn from_paths(mut tx: Sender<(PathBuf, PluginConfig, Option<Regex>)>) {
let (mut tasks_tx, mut tasks_rx) = postage::mpsc::channel(8);
// Spawns a background task to run in parallel for each plugin found
let task_spawner = async move {
for path in crate::plugin_paths() {
let loadable_plugins = from_path(&path);
futures_lite::pin!(loadable_plugins);
while let Some((source, config)) = loadable_plugins.next().await {
let future = smol::unblock(move || crate::plugins::config::load(&source, &config));
if tasks_tx.send(smol::spawn(future)).await.is_err() {
break;
}
}
}
};
// This future ensures that plugins are returned in the order they were spawned.
let task_listener = async move {
while let Some(task) = tasks_rx.recv().await {
if let Some(plugin) = task.await {
if tx.send(plugin).await.is_err() {
break;
}
}
}
};
zip(task_spawner, task_listener).await;
pub fn from_paths() -> impl Stream<Item = (PathBuf, PluginConfig, Option<Regex>)> {
stream::iter(crate::plugin_paths())
.flat_map(|path| from_path(path.to_path_buf()))
.map(|(source, config)| {
smol::unblock(move || crate::plugins::config::load(&source, &config))
})
.buffered(num_cpus::get())
.filter_map(|x| async move { x })
}
/// Loads all plugin information found in the given path.
pub fn from_path(path: &Path) -> impl Stream<Item = (PathBuf, PathBuf)> + '_ {
pub fn from_path(path: PathBuf) -> impl Stream<Item = (PathBuf, PathBuf)> {
gen_z::gen_z(move |mut z| async move {
if let Ok(readdir) = path.read_dir() {
for entry in readdir.filter_map(Result::ok) {