feat: Switch from smol runtime to tokio

This commit is contained in:
Michael Aaron Murphy 2022-03-27 17:38:37 +02:00 committed by Michael Murphy
parent 4153f9f060
commit dbfb3921ae
23 changed files with 242 additions and 235 deletions

View file

@ -1,10 +1,12 @@
// Copyright 2021 System76 <info@system76.com>
// SPDX-License-Identifier: MPL-2.0
use async_process as process;
use futures::{AsyncBufReadExt, AsyncWriteExt, Stream, StreamExt};
use futures::{Stream, StreamExt};
use pop_launcher::{Request, Response};
use std::io;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::process;
use tokio_stream::wrappers::LinesStream;
pub struct IpcClient {
pub child: process::Child,
@ -14,8 +16,8 @@ pub struct IpcClient {
impl IpcClient {
pub fn new() -> io::Result<(Self, impl Stream<Item = Response>)> {
let mut child = process::Command::new("pop-launcher")
.stdin(process::Stdio::piped())
.stdout(process::Stdio::piped())
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()?;
let stdin = child
@ -28,18 +30,17 @@ impl IpcClient {
.take()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "failed to find child stdout"))?;
let responses =
futures::io::BufReader::new(stdout)
.lines()
.filter_map(|result| async move {
if let Ok(line) = result {
if let Ok(event) = serde_json::from_str::<Response>(&line) {
return Some(event);
}
let responses = LinesStream::new(tokio::io::BufReader::new(stdout).lines()).filter_map(
|result| async move {
if let Ok(line) = result {
if let Ok(event) = serde_json::from_str::<Response>(&line) {
return Some(event);
}
}
None
});
None
},
);
let client = Self { child, stdin };
@ -57,6 +58,6 @@ impl IpcClient {
pub async fn exit(mut self) {
let _ = self.send(Request::Exit).await;
let _ = self.child.status().await;
let _ = self.child.wait().await;
}
}

View file

@ -34,7 +34,7 @@ pub struct PluginHelp {
pub async fn main() {
// Listens for a stream of requests from stdin.
let input_stream = json_input_stream(async_stdin()).filter_map(|result| {
let input_stream = json_input_stream(tokio::io::stdin()).filter_map(|result| {
future::ready(match result {
Ok(request) => Some(request),
Err(why) => {
@ -124,7 +124,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
futures::pin_mut!(f1);
futures::pin_mut!(f2);
futures::future::select(f1, f2).await.factor_first().0;
let _ = futures::future::select(f1, f2).await.factor_first().0;
}
async fn response_handler(&mut self, service_rx: Receiver<Event>) {
@ -229,10 +229,9 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
let init = init.clone();
let service_tx = service_tx.clone();
smol::spawn(async move {
tokio::spawn(async move {
init(id, service_tx).run(request_rx).await;
})
.detach();
});
request_tx
}),

View file

@ -15,10 +15,10 @@ pub fn from_paths() -> impl Stream<Item = (PathBuf, PluginConfig, Option<Regex>)
stream::iter(crate::plugin_paths())
.flat_map(|path| from_path(path.to_path_buf()))
.map(|(source, config)| {
smol::unblock(move || crate::plugins::config::load(&source, &config))
tokio::task::spawn_blocking(move || crate::plugins::config::load(&source, &config))
})
.buffered(num_cpus::get())
.filter_map(|x| async move { x })
.filter_map(|x| async move { x.ok().flatten() })
}
/// Loads all plugin information found in the given path.

View file

@ -6,6 +6,7 @@ pub mod load;
use std::{
io,
path::PathBuf,
process::Stdio,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -15,10 +16,11 @@ use std::{
use crate::{Event, Indice, Plugin, PluginResponse, Request};
use async_oneshot::oneshot;
use flume::Sender;
use futures::{AsyncWriteExt, StreamExt};
use smol::{
process::{Child, Command, Stdio},
Task,
use futures::StreamExt;
use tokio::{
io::AsyncWriteExt,
process::{Child, Command},
task::JoinHandle,
};
use tracing::{event, Level};
@ -28,7 +30,7 @@ pub struct ExternalPlugin {
name: String,
pub cmd: PathBuf,
pub args: Vec<String>,
process: Option<(Task<()>, Child, async_oneshot::Sender<()>)>,
process: Option<(JoinHandle<()>, Child, async_oneshot::Sender<()>)>,
detached: Arc<AtomicBool>,
searching: Arc<AtomicBool>,
}
@ -53,7 +55,7 @@ impl ExternalPlugin {
}
}
pub fn launch(&mut self) -> Option<&mut (Task<()>, Child, async_oneshot::Sender<()>)> {
pub fn launch(&mut self) -> Option<&mut (JoinHandle<()>, Child, async_oneshot::Sender<()>)> {
event!(Level::DEBUG, "{}: launching plugin", self.name());
let child = Command::new(&self.cmd)
@ -74,7 +76,7 @@ impl ExternalPlugin {
let id = self.id;
// Spawn a background task to forward JSON responses from the child process.
let task = smol::spawn(async move {
let task = tokio::spawn(async move {
let tx_ = tx.clone();
let searching_ = searching.clone();
let name_ = name.clone();
@ -105,7 +107,13 @@ impl ExternalPlugin {
let _ = trip_rx.await;
};
let _ = crate::or(responder, trip).await;
futures::pin_mut!(responder);
futures::pin_mut!(trip);
let _ = futures::future::select(responder, trip)
.await
.factor_first()
.0;
// Ensure that a task that was searching sends a finished signal if it dies.
if searching.swap(false, Ordering::SeqCst) {
@ -128,9 +136,9 @@ impl ExternalPlugin {
pub async fn process_check(&mut self) {
if let Some(mut child) = self.process.take() {
match child.1.try_status() {
match child.1.try_wait() {
Err(_) | Ok(Some(_)) => {
child.0.cancel().await;
child.0.abort();
}
Ok(None) => self.process = Some(child),
}