From bf77beb91ef9a0f5eb77406437603cf0c3580863 Mon Sep 17 00:00:00 2001 From: Lucy Date: Thu, 23 Jun 2022 10:42:50 -0400 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Use=20a=20channel=20rather?= =?UTF-8?q?=20than=20a=20closure=20for=20`ProcessHandler`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/process.rs | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/src/process.rs b/src/process.rs index bc4744f..1189259 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,11 +1,9 @@ // SPDX-License-Identifier: GPL-3.0-only -use std::{ - future::Future, - process::{ExitStatus, Stdio}, -}; +use std::process::{ExitStatus, Stdio}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, + sync::mpsc::UnboundedSender, }; use tokio_util::sync::CancellationToken; @@ -13,26 +11,18 @@ pub enum ProcessEvent { Started, Stdout(String), Stderr(String), - Ended { status: ExitStatus }, + Ended(Option), } -pub struct ProcessHandler -where - AsyncTask: Future + Send, - Task: Fn(ProcessEvent) -> AsyncTask + Send + 'static, -{ - task: Task, +pub struct ProcessHandler { + tx: UnboundedSender, cancellation_token: CancellationToken, } -impl ProcessHandler -where - AsyncTask: Future + Send, - Task: Fn(ProcessEvent) -> AsyncTask + Send + 'static, -{ - pub fn new(task: Task, cancellation_token: &CancellationToken) -> Self { +impl ProcessHandler { + pub fn new(tx: UnboundedSender, cancellation_token: &CancellationToken) -> Self { Self { - task, + tx, cancellation_token: cancellation_token.child_token(), } } @@ -61,15 +51,13 @@ where }; let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines(); - let task = (self.task)(ProcessEvent::Started); - task.await; + std::mem::drop(self.tx.send(ProcessEvent::Started)); loop { tokio::select! { status = child.wait() => match status { Ok(status) => { info!("'{}' exited with status {}", executable, status); - let task = (self.task)(ProcessEvent::Ended { status }); - task.await; + std::mem::drop(self.tx.send(ProcessEvent::Ended(Some(status)))); return; } Err(error) => { @@ -83,8 +71,7 @@ where }, line = stdout.next_line() => match line { Ok(Some(line)) => { - let task = (self.task)(ProcessEvent::Stdout(line)); - task.await; + std::mem::drop(self.tx.send(ProcessEvent::Stdout(line))); }, Ok(None) => (), Err(error) => { @@ -97,8 +84,7 @@ where }, line = stderr.next_line() => match line { Ok(Some(line)) => { - let task = (self.task)(ProcessEvent::Stderr(line)); - task.await; + std::mem::drop(self.tx.send(ProcessEvent::Stderr(line))); }, Ok(None) => (), Err(error) => { @@ -111,6 +97,7 @@ where }, _ = self.cancellation_token.cancelled() => { warn!("exiting '{}': cancelled", executable); + std::mem::drop(self.tx.send(ProcessEvent::Ended(None))); return; } }