🚧 Add process handling framework

This commit is contained in:
Lucy 2022-06-22 14:09:59 -04:00
parent 5b9527461f
commit affb81a7a0
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1
3 changed files with 131 additions and 2 deletions

View file

@ -8,7 +8,9 @@ authors = ["Lucy <lucy@system76.com>"]
publish = false
[dependencies]
color-eyre = "0.6.1"
async-signals = "0.4"
color-eyre = "0.6"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View file

@ -1,4 +1,9 @@
// SPDX-License-Identifier: GPL-3.0-only
#[macro_use]
extern crate tracing;
mod process;
use color_eyre::{eyre::WrapErr, Result};
use tracing::metadata::LevelFilter;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@ -15,7 +20,9 @@ async fn main() -> Result<()> {
.from_env_lossy(),
)
.try_init()
.wrap_err("failed to initialize logger")?;
.wrap_err("failed to ianitialize logger")?;
info!("Starting cosmic-session");
Ok(())
}

120
src/process.rs Normal file
View file

@ -0,0 +1,120 @@
// SPDX-License-Identifier: GPL-3.0-only
use std::{
future::Future,
process::{ExitStatus, Stdio},
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
};
use tokio_util::sync::CancellationToken;
pub enum ProcessEvent {
Started,
Stdout(String),
Stderr(String),
Ended { status: ExitStatus },
}
pub struct ProcessHandler<AsyncTask, Task>
where
AsyncTask: Future<Output = ()> + Send,
Task: Fn(ProcessEvent) -> AsyncTask + Send + 'static,
{
task: Task,
cancellation_token: CancellationToken,
}
impl<AsyncTask, Task> ProcessHandler<AsyncTask, Task>
where
AsyncTask: Future<Output = ()> + Send,
Task: Fn(ProcessEvent) -> AsyncTask + Send + 'static,
{
pub fn new(task: Task, cancellation_token: &CancellationToken) -> Self {
Self {
task,
cancellation_token: cancellation_token.child_token(),
}
}
pub fn run(self, executable: impl ToString, args: Vec<String>) {
let executable = executable.to_string();
tokio::spawn(async move {
let mut child = match Command::new(&executable)
.args(&args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
{
Ok(child) => child,
Err(error) => {
error!(
"failed to launch '{} {}': {}",
executable,
args.join(" "),
error
);
return;
}
};
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines();
let task = (self.task)(ProcessEvent::Started);
task.await;
loop {
tokio::select! {
status = child.wait() => match status {
Ok(status) => {
info!("'{}' exited with status {}", executable, status);
let task = (self.task)(ProcessEvent::Ended { status });
task.await;
return;
}
Err(error) => {
error!(
"failed to wait for '{}' to end: {}",
executable,
error
);
return;
}
},
line = stdout.next_line() => match line {
Ok(Some(line)) => {
let task = (self.task)(ProcessEvent::Stdout(line));
task.await;
},
Ok(None) => (),
Err(error) => {
warn!(
"failed to get stdout line from '{}': {}",
executable,
error
);
}
},
line = stderr.next_line() => match line {
Ok(Some(line)) => {
let task = (self.task)(ProcessEvent::Stderr(line));
task.await;
},
Ok(None) => (),
Err(error) => {
warn!(
"failed to get stderr line from '{}': {}",
executable,
error
);
}
},
_ = self.cancellation_token.cancelled() => {
warn!("exiting '{}': cancelled", executable);
return;
}
}
}
});
}
}