refactor: Favor futures crate over futures-lite to reduce dependencies

This commit is contained in:
Michael Aaron Murphy 2022-03-27 16:54:26 +02:00 committed by Michael Murphy
parent b770b59e7b
commit 4153f9f060
18 changed files with 116 additions and 107 deletions

View file

@ -8,9 +8,7 @@ pub use client::*;
use crate::plugins::*;
use flume::{Receiver, Sender};
use futures::SinkExt;
use futures_core::Stream;
use futures_lite::{future, StreamExt};
use futures::{future, SinkExt, Stream, StreamExt};
use pop_launcher::*;
use regex::Regex;
use slab::Slab;
@ -36,12 +34,14 @@ pub struct PluginHelp {
pub async fn main() {
// Listens for a stream of requests from stdin.
let input_stream = json_input_stream(async_stdin()).filter_map(|result| match result {
Ok(request) => Some(request),
Err(why) => {
tracing::error!("malformed JSON input: {}", why);
None
}
let input_stream = json_input_stream(async_stdin()).filter_map(|result| {
future::ready(match result {
Ok(request) => Some(request),
Err(why) => {
tracing::error!("malformed JSON input: {}", why);
None
}
})
});
let (output_tx, output_rx) = flume::bounded(16);
@ -59,7 +59,7 @@ pub async fn main() {
}
};
futures_lite::future::zip(service, responder).await;
futures::future::join(service, responder).await;
}
pub struct Service<O> {
@ -91,7 +91,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
let (service_tx, service_rx) = flume::bounded(1);
let stream = plugins::external::load::from_paths();
futures_lite::pin!(stream);
futures::pin_mut!(stream);
while let Some((exec, config, regex)) = stream.next().await {
tracing::info!("found plugin \"{}\"", exec.display());
@ -121,7 +121,10 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
let f1 = request_handler(input, service_tx);
let f2 = self.response_handler(service_rx);
future::or(f1, f2).await;
futures::pin_mut!(f1);
futures::pin_mut!(f2);
futures::future::select(f1, f2).await.factor_first().0;
}
async fn response_handler(&mut self, service_rx: Receiver<Event>) {
@ -574,7 +577,7 @@ impl<O: futures::Sink<Response> + Unpin> Service<O> {
async fn request_handler(input: impl Stream<Item = Request>, tx: Sender<Event>) {
let mut requested_to_exit = false;
futures_lite::pin!(input);
futures::pin_mut!(input);
while let Some(request) = input.next().await {
if let Request::Exit = request {

View file

@ -15,7 +15,7 @@ use std::{
use crate::{Event, Indice, Plugin, PluginResponse, Request};
use async_oneshot::oneshot;
use flume::Sender;
use futures_lite::{AsyncWriteExt, FutureExt, StreamExt};
use futures::{AsyncWriteExt, StreamExt};
use smol::{
process::{Child, Command, Stdio},
Task,
@ -105,7 +105,7 @@ impl ExternalPlugin {
let _ = trip_rx.await;
};
let _ = responder.or(trip).await;
let _ = crate::or(responder, trip).await;
// Ensure that a task that was searching sends a finished signal if it dies.
if searching.swap(false, Ordering::SeqCst) {