From f39cdb37b5124514cc264bbc8eaea84f3b78dedb Mon Sep 17 00:00:00 2001 From: Lucy Date: Thu, 30 Jun 2022 10:35:02 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20Proper=20`WAYLAND=5FSOCKET`=20se?= =?UTF-8?q?tup,=20I=20think.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/comp.rs | 94 ++++++++++++++++++++++++++++++++++++++--------------- src/main.rs | 19 +++++++++-- 2 files changed, 84 insertions(+), 29 deletions(-) diff --git a/src/comp.rs b/src/comp.rs index f88a7f3..d31e090 100644 --- a/src/comp.rs +++ b/src/comp.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-only use crate::process::{ProcessEvent, ProcessHandler}; +use color_eyre::eyre::{ContextCompat, Result, WrapErr}; use sendfd::SendWithFd; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, os::unix::prelude::IntoRawFd}; @@ -23,6 +24,27 @@ pub enum Message { NewPrivilegedClient, } +pub fn create_privileged_socket( + sockets: &mut Vec, + env_vars: &[(String, String)], +) -> Result> { + let (comp_socket, client_socket) = + UnixStream::pair().wrap_err("failed to create socket pair")?; + sockets.push(comp_socket); + let client_fd = { + let std_stream = client_socket + .into_std() + .wrap_err("failed to convert client socket to std socket")?; + std_stream + .set_nonblocking(false) + .wrap_err("failed to mark client socket as blocking")?; + std_stream.into_raw_fd() + }; + let mut env_vars = env_vars.to_vec(); + env_vars.push(("WAYLAND_SOCKET".into(), client_fd.to_string())); + Ok(env_vars) +} + async fn receive_event(rx: &mut mpsc::UnboundedReceiver) -> Option<()> { match rx.recv().await? { ProcessEvent::Started => { @@ -48,12 +70,16 @@ async fn receive_event(rx: &mut mpsc::UnboundedReceiver) -> Option async fn receive_ipc( rx: &mut Lines>>, env_tx: &mut Option>>, -) -> Option<()> { - let line = rx +) -> Result> { + let line = match rx .next_line() .await - .expect("failed to get next line of ipc")?; - match serde_json::from_str::(&line).expect("invalid message from cosmic-comp") { + .wrap_err("failed to get next line of ipc")? + { + Some(line) => line, + None => return Ok(None), + }; + match serde_json::from_str::(&line).wrap_err("invalid message from cosmic-comp")? { Message::SetEnv { variables } => { if let Some(env_tx) = env_tx.take() { env_tx @@ -65,43 +91,56 @@ async fn receive_ipc( unreachable!("compositor should not send NewPrivilegedClient") } } - Some(()) + Ok(Some(())) } -async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: UnixStream) { - let fd = { - let std_stream = stream - .into_std() - .expect("failed to convert stream to std stream"); - std_stream - .set_nonblocking(false) - .expect("failed to set stream as nonblocking"); - std_stream.into_raw_fd() - }; - let json = serde_json::to_string(&Message::NewPrivilegedClient).unwrap(); - session_tx.write_all(json.as_bytes()).await.unwrap(); - let _ = session_tx.write(&['\n' as u8]).await.unwrap(); +async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: Vec) -> Result<()> { + let fds = stream + .into_iter() + .map(|stream| { + let std_stream = stream + .into_std() + .wrap_err("failed to convert stream to std stream")?; + std_stream + .set_nonblocking(false) + .wrap_err("failed to set stream as blocking")?; + Ok(std_stream.into_raw_fd()) + }) + .collect::>>() + .wrap_err("failed to convert streams to file descriptors")?; + let json = + serde_json::to_string(&Message::NewPrivilegedClient).wrap_err("failed to encode json")?; + session_tx + .write_all(json.as_bytes()) + .await + .wrap_err("failed to write json")?; + session_tx + .write_all(b"\n") + .await + .wrap_err("failed to write newline")?; let tx: &UnixStream = session_tx.as_ref(); - tx.send_with_fd(&[], &[fd]).expect("failed to send fd"); + tx.send_with_fd(&[], &fds).wrap_err("failed to send fd")?; + Ok(()) } pub async fn run_compositor( token: CancellationToken, - mut socket_rx: mpsc::UnboundedReceiver, + mut socket_rx: mpsc::UnboundedReceiver>, env_tx: oneshot::Sender>, -) { +) -> Result<()> { let mut env_tx = Some(env_tx); let (tx, mut rx) = unbounded_channel::(); - let (mut session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets"); + let (mut session, comp) = + UnixStream::pair().wrap_err("failed to create pair of unix sockets")?; let (session_rx, mut session_tx) = session.split(); let mut session = BufReader::new(session_rx).lines(); let comp = { let std_stream = comp .into_std() - .expect("failed to convert compositor unix stream to a standard unix stream"); + .wrap_err("failed to convert compositor unix stream to a standard unix stream")?; std_stream .set_nonblocking(false) - .expect("failed to mark compositor unix stream as non-blocking"); + .wrap_err("failed to mark compositor unix stream as blocking")?; std_stream.into_raw_fd() }; ProcessHandler::new(tx, &token).run("cosmic-comp", vec![], vec![( @@ -113,12 +152,15 @@ pub async fn run_compositor( exit = receive_event(&mut rx) => if exit.is_none() { break; }, - exit = receive_ipc(&mut session, &mut env_tx) => if exit.is_none() { + exit = receive_ipc(&mut session, &mut env_tx) => if exit.wrap_err("failed to receive ipc message")?.is_none() { break; }, Some(socket) = socket_rx.recv() => { - send_fd(&mut session_tx, socket).await; + send_fd(&mut session_tx, socket) + .await + .wrap_err("failed to send file descriptor to compositor")?; } } } + Ok(()) } diff --git a/src/main.rs b/src/main.rs index 56b8bfd..24539f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,23 +33,36 @@ async fn main() -> Result<()> { let token = CancellationToken::new(); let (env_tx, env_rx) = oneshot::channel(); let (socket_tx, socket_rx) = mpsc::unbounded_channel(); - tokio::spawn(comp::run_compositor(token.child_token(), socket_rx, env_tx)); + tokio::spawn({ + let token = token.child_token(); + async move { + if let Err(err) = comp::run_compositor(token, socket_rx, env_tx).await { + error!("compositor errored: {:?}", err); + } + } + }); let env_vars = env_rx .await .expect("failed to receive environmental variables"); info!("got environmental variables: {:?}", env_vars); + let mut sockets = Vec::with_capacity(2); + tokio::spawn(panel::run_panel( token.child_token(), "testing-panel", - env_vars.clone(), + comp::create_privileged_socket(&mut sockets, &env_vars) + .wrap_err("failed to create panel socket")?, )); tokio::spawn(panel::run_panel( token.child_token(), "testing-dock", - env_vars.clone(), + comp::create_privileged_socket(&mut sockets, &env_vars) + .wrap_err("failed to create dock socket")?, )); + socket_tx.send(sockets).unwrap(); + let mut signals = Signals::new(vec![libc::SIGTERM, libc::SIGINT]).unwrap(); while let Some(signal) = signals.next().await { match signal {