diff --git a/src/comp.rs b/src/comp.rs index 260f1f5..7b48fce 100644 --- a/src/comp.rs +++ b/src/comp.rs @@ -14,7 +14,10 @@ use tokio::{ unix::{OwnedReadHalf, OwnedWriteHalf}, UnixStream, }, - sync::mpsc::{self, unbounded_channel}, + sync::{ + mpsc::{self, unbounded_channel}, + oneshot, + }, }; use tokio_util::sync::CancellationToken; use tracing::Instrument; @@ -88,15 +91,18 @@ async fn receive_event(rx: &mut mpsc::UnboundedReceiver) -> Option // Cancellation safe! #[derive(Default)] struct IpcState { + env_tx: Option>>, length: Option, bytes_read: usize, buf: Vec, } -fn parse_and_handle_ipc(bytes: &[u8]) { - match serde_json::from_slice::(bytes) { +fn parse_and_handle_ipc(state: &mut IpcState) { + match serde_json::from_slice::(&state.buf) { Ok(Message::SetEnv { variables }) => { - debug!(?variables); + if let Some(env_tx) = state.env_tx.take() { + env_tx.send(variables).unwrap(); + } } Ok(Message::NewPrivilegedClient { .. }) => { unreachable!("NewPrivilegedClient should not be sent TO the session!"); @@ -119,7 +125,7 @@ async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()> .await .wrap_err("failed to read IPC length")?; if state.bytes_read >= length as usize { - parse_and_handle_ipc(&state.buf); + parse_and_handle_ipc(state); state.length = None; state.bytes_read = 0; state.buf.clear(); @@ -183,6 +189,7 @@ async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec) -> Re pub fn run_compositor( token: CancellationToken, mut socket_rx: mpsc::UnboundedReceiver>, + env_tx: oneshot::Sender>, ) -> Result<()> { let (tx, mut rx) = unbounded_channel::(); let (session, comp) = UnixStream::pair().wrap_err("failed to create pair of unix sockets")?; @@ -207,7 +214,10 @@ pub fn run_compositor( vec![("COSMIC_SESSION_SOCK".into(), comp.to_string())], &span, ); - let mut ipc_state = IpcState::default(); + let mut ipc_state = IpcState { + env_tx: Some(env_tx), + ..IpcState::default() + }; loop { tokio::select! { exit = receive_event(&mut rx) => if exit.is_none() { diff --git a/src/main.rs b/src/main.rs index ae7e4a5..b5733fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ mod process; use async_signals::Signals; use color_eyre::{eyre::WrapErr, Result}; use futures_util::StreamExt; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::metadata::LevelFilter; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -32,11 +32,15 @@ async fn main() -> Result<()> { let token = CancellationToken::new(); let (socket_tx, socket_rx) = mpsc::unbounded_channel(); - if let Err(err) = comp::run_compositor(token.child_token(), socket_rx) { + let (env_tx, env_rx) = oneshot::channel(); + if let Err(err) = comp::run_compositor(token.child_token(), socket_rx, env_tx) { error!("compositor errored: {:?}", err); } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - let env_vars = Vec::new(); + let env_vars = env_rx + .await + .expect("failed to receive environmental variables") + .into_iter() + .collect::>(); info!("got environmental variables: {:?}", env_vars); let mut sockets = Vec::with_capacity(2);