From 09b03fda1ac02ad384a0d41cf0f482cd17f66564 Mon Sep 17 00:00:00 2001 From: Lucy Date: Mon, 27 Jun 2022 14:17:31 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20Work=20on=20session<->comp=20IPC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 4 ++- src/comp.rs | 82 ++++++++++++++++++++++++++++++++++------------------- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fe9130a..4d71f40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,10 @@ publish = false [dependencies] async-signals = "0.4" color-eyre = "0.6" -futures-util = "0.3.21" +futures-util = "0.3" libc = "0.2" +serde = { version = "1", features = ["derive"] } +serde_json = "1" tokio = { version = "1", features = ["full"] } tokio-util = "0.7" tracing = "0.1" diff --git a/src/comp.rs b/src/comp.rs index 211e821..759bcdf 100644 --- a/src/comp.rs +++ b/src/comp.rs @@ -3,47 +3,71 @@ use std::os::unix::prelude::IntoRawFd; // SPDX-License-Identifier: GPL-3.0-only use crate::process::{ProcessEvent, ProcessHandler}; use tokio::{ + io::{AsyncBufReadExt, BufReader, Lines}, net::UnixStream, - sync::{mpsc::unbounded_channel, oneshot}, + sync::{ + mpsc::{self, unbounded_channel}, + oneshot, + }, }; use tokio_util::sync::CancellationToken; +async fn receive_event(rx: &mut mpsc::UnboundedReceiver) -> Option<()> { + match rx.recv().await? { + ProcessEvent::Started => { + info!("started"); + Some(()) + } + // cosmic-comp outputs everything to stderr because slog + ProcessEvent::Stdout(line) | ProcessEvent::Stderr(line) => { + info!("{}", line); + Some(()) + } + ProcessEvent::Ended(Some(status)) => { + error!("exited with status {}", status); + None + } + ProcessEvent::Ended(None) => { + error!("exited"); + None + } + } +} + +async fn receive_ipc(rx: &mut Lines>) -> Option<()> { + let line = rx + .next_line() + .await + .expect("failed to get next line of ipc")?; + let message = serde_json::from_str::<()>(&line).expect("invalid message from cosmic-comp"); + Some(()) +} + pub async fn run_compositor(token: CancellationToken, wayland_display_tx: oneshot::Sender) { let mut wayland_display_tx = Some(wayland_display_tx); let (tx, mut rx) = unbounded_channel::(); let (session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets"); - let comp = comp.into_std().unwrap().into_raw_fd(); + let mut session = BufReader::new(session).lines(); + let comp = { + let std_stream = comp + .into_std() + .expect("failed to convert compositor unix stream to a standard unix stream"); + std_stream + .set_nonblocking(false) + .expect("failed to mark compositor unix stream as non-blocking"); + std_stream.into_raw_fd() + }; ProcessHandler::new(tx, &token).run("cosmic-comp", vec![], vec![( "COSMIC_SESSION_SOCK".into(), comp.to_string(), )]); - while let Some(event) = rx.recv().await { - match event { - ProcessEvent::Started => { - info!("started"); - } - // cosmic-comp outputs everything to stderr because slog - ProcessEvent::Stdout(line) | ProcessEvent::Stderr(line) => { - if line.contains("Listening on \"") { - // Message format: Listening on "wayland-0" - if let Some(tx) = wayland_display_tx.take() { - let socket_name = line - .split('"') - .nth(1) - .expect("failed to get WAYLAND_DISPLAY"); - tx.send(socket_name.to_string()) - .expect("failed to send WAYLAND_DISPLAY back to main app"); - } - } - info!("{}", line); - } - ProcessEvent::Ended(Some(status)) => { - error!("exited with status {}", status); - return; - } - ProcessEvent::Ended(None) => { - error!("exited"); - return; + loop { + tokio::select! { + exit = receive_event(&mut rx) => if exit.is_none() { + break; + }, + exit = receive_ipc(&mut session) => if exit.is_none() { + break; } } }