♻️ Use a channel rather than a closure for ProcessHandler

This commit is contained in:
Lucy 2022-06-23 10:42:50 -04:00
parent affb81a7a0
commit bf77beb91e
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1

View file

@ -1,11 +1,9 @@
// SPDX-License-Identifier: GPL-3.0-only // SPDX-License-Identifier: GPL-3.0-only
use std::{ use std::process::{ExitStatus, Stdio};
future::Future,
process::{ExitStatus, Stdio},
};
use tokio::{ use tokio::{
io::{AsyncBufReadExt, BufReader}, io::{AsyncBufReadExt, BufReader},
process::Command, process::Command,
sync::mpsc::UnboundedSender,
}; };
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -13,26 +11,18 @@ pub enum ProcessEvent {
Started, Started,
Stdout(String), Stdout(String),
Stderr(String), Stderr(String),
Ended { status: ExitStatus }, Ended(Option<ExitStatus>),
} }
pub struct ProcessHandler<AsyncTask, Task> pub struct ProcessHandler {
where tx: UnboundedSender<ProcessEvent>,
AsyncTask: Future<Output = ()> + Send,
Task: Fn(ProcessEvent) -> AsyncTask + Send + 'static,
{
task: Task,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
} }
impl<AsyncTask, Task> ProcessHandler<AsyncTask, Task> impl ProcessHandler {
where pub fn new(tx: UnboundedSender<ProcessEvent>, cancellation_token: &CancellationToken) -> Self {
AsyncTask: Future<Output = ()> + Send,
Task: Fn(ProcessEvent) -> AsyncTask + Send + 'static,
{
pub fn new(task: Task, cancellation_token: &CancellationToken) -> Self {
Self { Self {
task, tx,
cancellation_token: cancellation_token.child_token(), cancellation_token: cancellation_token.child_token(),
} }
} }
@ -61,15 +51,13 @@ where
}; };
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines(); let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines();
let task = (self.task)(ProcessEvent::Started); std::mem::drop(self.tx.send(ProcessEvent::Started));
task.await;
loop { loop {
tokio::select! { tokio::select! {
status = child.wait() => match status { status = child.wait() => match status {
Ok(status) => { Ok(status) => {
info!("'{}' exited with status {}", executable, status); info!("'{}' exited with status {}", executable, status);
let task = (self.task)(ProcessEvent::Ended { status }); std::mem::drop(self.tx.send(ProcessEvent::Ended(Some(status))));
task.await;
return; return;
} }
Err(error) => { Err(error) => {
@ -83,8 +71,7 @@ where
}, },
line = stdout.next_line() => match line { line = stdout.next_line() => match line {
Ok(Some(line)) => { Ok(Some(line)) => {
let task = (self.task)(ProcessEvent::Stdout(line)); std::mem::drop(self.tx.send(ProcessEvent::Stdout(line)));
task.await;
}, },
Ok(None) => (), Ok(None) => (),
Err(error) => { Err(error) => {
@ -97,8 +84,7 @@ where
}, },
line = stderr.next_line() => match line { line = stderr.next_line() => match line {
Ok(Some(line)) => { Ok(Some(line)) => {
let task = (self.task)(ProcessEvent::Stderr(line)); std::mem::drop(self.tx.send(ProcessEvent::Stderr(line)));
task.await;
}, },
Ok(None) => (), Ok(None) => (),
Err(error) => { Err(error) => {
@ -111,6 +97,7 @@ where
}, },
_ = self.cancellation_token.cancelled() => { _ = self.cancellation_token.cancelled() => {
warn!("exiting '{}': cancelled", executable); warn!("exiting '{}': cancelled", executable);
std::mem::drop(self.tx.send(ProcessEvent::Ended(None)));
return; return;
} }
} }