From a0091235a4d7e79d663a49e7e3e9eb01a7a0c8c6 Mon Sep 17 00:00:00 2001 From: Lucy Date: Wed, 6 Jul 2022 15:32:21 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=97=91=EF=B8=8F=20Code=20cleanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/comp.rs | 69 ++++++++++------------------------------------------- src/main.rs | 8 ++----- 2 files changed, 15 insertions(+), 62 deletions(-) diff --git a/src/comp.rs b/src/comp.rs index 200b890..7448f5b 100644 --- a/src/comp.rs +++ b/src/comp.rs @@ -4,31 +4,21 @@ use color_eyre::eyre::{ContextCompat, Result, WrapErr}; use nix::fcntl; use sendfd::SendWithFd; use serde::{Deserialize, Serialize}; -use std::{ - collections::HashMap, - os::unix::prelude::{AsRawFd, IntoRawFd}, -}; +use std::os::unix::prelude::{AsRawFd, IntoRawFd}; use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}, - net::{ - unix::{ReadHalf, WriteHalf}, - UnixStream, - }, - sync::{ - mpsc::{self, unbounded_channel}, - oneshot, - }, + io::AsyncWriteExt, + net::UnixStream, + sync::mpsc::{self, unbounded_channel}, }; use tokio_util::sync::CancellationToken; #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "message")] pub enum Message { - SetEnv { variables: HashMap }, NewPrivilegedClient { count: usize }, } -fn mark_as_cloexec(stream: &UnixStream) -> Result<()> { +fn mark_as_not_cloexec(stream: &UnixStream) -> Result<()> { let raw_fd = stream.as_raw_fd(); let fd_flags = fcntl::FdFlag::from_bits( fcntl::fcntl(raw_fd, fcntl::FcntlArg::F_GETFD) @@ -51,7 +41,7 @@ pub fn create_privileged_socket( UnixStream::pair().wrap_err("failed to create socket pair")?; sockets.push(comp_socket); let client_fd = { - mark_as_cloexec(&client_socket).wrap_err("failed to mark client stream as CLOEXEC")?; + mark_as_not_cloexec(&client_socket).wrap_err("failed to mark client stream as CLOEXEC")?; let std_stream = client_socket .into_std() .wrap_err("failed to convert client socket to std socket")?; @@ -87,38 +77,11 @@ async fn receive_event(rx: &mut mpsc::UnboundedReceiver) -> Option } } -async fn receive_ipc( - rx: &mut Lines>>, - env_tx: &mut Option>>, -) -> Result> { - let line = match rx - .next_line() - .await - .wrap_err("failed to get next line of ipc")? - { - Some(line) => line, - None => return Ok(None), - }; - match serde_json::from_str::(&line).wrap_err("invalid message from cosmic-comp")? { - Message::SetEnv { variables } => { - if let Some(env_tx) = env_tx.take() { - env_tx - .send(variables.into_iter().collect()) - .expect("failed to send environmental variables"); - } - } - Message::NewPrivilegedClient { .. } => { - unreachable!("compositor should not send NewPrivilegedClient") - } - } - Ok(Some(())) -} - -async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: Vec) -> Result<()> { +async fn send_fd(session_tx: &mut UnixStream, stream: Vec) -> Result<()> { let fds = stream .into_iter() .map(|stream| { - mark_as_cloexec(&stream).wrap_err("failed to mark stream as CLOEXEC")?; + mark_as_not_cloexec(&stream).wrap_err("failed to mark stream as CLOEXEC")?; let std_stream = stream .into_std() .wrap_err("failed to convert stream to std stream")?; @@ -140,8 +103,9 @@ async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: Vec) -> Res .await .wrap_err("failed to write json")?; tokio::time::sleep(std::time::Duration::from_micros(100)).await; - let tx: &UnixStream = session_tx.as_ref(); - tx.send_with_fd(&[0], &fds).wrap_err("failed to send fd")?; + session_tx + .send_with_fd(&[0], &fds) + .wrap_err("failed to send fd")?; info!("sent {} fds", fds.len()); Ok(()) } @@ -149,16 +113,12 @@ async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: Vec) -> Res pub async fn run_compositor( token: CancellationToken, mut socket_rx: mpsc::UnboundedReceiver>, - env_tx: oneshot::Sender>, ) -> Result<()> { - let mut env_tx = Some(env_tx); let (tx, mut rx) = unbounded_channel::(); let (mut session, comp) = UnixStream::pair().wrap_err("failed to create pair of unix sockets")?; - let (session_rx, mut session_tx) = session.split(); - let mut session = BufReader::new(session_rx).lines(); let comp = { - mark_as_cloexec(&comp).wrap_err("failed to mark compositor stream as CLOEXEC")?; + mark_as_not_cloexec(&comp).wrap_err("failed to mark compositor stream as CLOEXEC")?; let std_stream = comp .into_std() .wrap_err("failed to convert compositor unix stream to a standard unix stream")?; @@ -176,11 +136,8 @@ pub async fn run_compositor( exit = receive_event(&mut rx) => if exit.is_none() { break; }, - exit = receive_ipc(&mut session, &mut env_tx) => if exit.wrap_err("failed to receive ipc message")?.is_none() { - break; - }, Some(socket) = socket_rx.recv() => { - send_fd(&mut session_tx, socket) + send_fd(&mut session, socket) .await .wrap_err("failed to send file descriptor to compositor")?; } diff --git a/src/main.rs b/src/main.rs index 90e3ad1..5a05275 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ mod process; use async_signals::Signals; use color_eyre::{eyre::WrapErr, Result}; use futures_util::StreamExt; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::metadata::LevelFilter; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -31,19 +31,15 @@ async fn main() -> Result<()> { info!("Starting cosmic-session"); let token = CancellationToken::new(); - let (env_tx, env_rx) = oneshot::channel(); let (socket_tx, socket_rx) = mpsc::unbounded_channel(); tokio::spawn({ let token = token.child_token(); async move { - if let Err(err) = comp::run_compositor(token, socket_rx, env_tx).await { + if let Err(err) = comp::run_compositor(token, socket_rx).await { error!("compositor errored: {:?}", err); } } }); - // let env_vars = env_rx - // .await - // .expect("failed to receive environmental variables"); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let env_vars = Vec::new(); info!("got environmental variables: {:?}", env_vars);