Wait for SetEnv

This commit is contained in:
Lucy 2022-07-11 10:47:00 -04:00
parent de3ab99743
commit 07c811ba97
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1
2 changed files with 24 additions and 10 deletions

View file

@ -14,7 +14,10 @@ use tokio::{
unix::{OwnedReadHalf, OwnedWriteHalf},
UnixStream,
},
sync::mpsc::{self, unbounded_channel},
sync::{
mpsc::{self, unbounded_channel},
oneshot,
},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@ -88,15 +91,18 @@ async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option
// Cancellation safe!
#[derive(Default)]
struct IpcState {
env_tx: Option<oneshot::Sender<HashMap<String, String>>>,
length: Option<u16>,
bytes_read: usize,
buf: Vec<u8>,
}
fn parse_and_handle_ipc(bytes: &[u8]) {
match serde_json::from_slice::<Message>(bytes) {
fn parse_and_handle_ipc(state: &mut IpcState) {
match serde_json::from_slice::<Message>(&state.buf) {
Ok(Message::SetEnv { variables }) => {
debug!(?variables);
if let Some(env_tx) = state.env_tx.take() {
env_tx.send(variables).unwrap();
}
}
Ok(Message::NewPrivilegedClient { .. }) => {
unreachable!("NewPrivilegedClient should not be sent TO the session!");
@ -119,7 +125,7 @@ async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()>
.await
.wrap_err("failed to read IPC length")?;
if state.bytes_read >= length as usize {
parse_and_handle_ipc(&state.buf);
parse_and_handle_ipc(state);
state.length = None;
state.bytes_read = 0;
state.buf.clear();
@ -183,6 +189,7 @@ async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec<UnixStream>) -> Re
pub fn run_compositor(
token: CancellationToken,
mut socket_rx: mpsc::UnboundedReceiver<Vec<UnixStream>>,
env_tx: oneshot::Sender<HashMap<String, String>>,
) -> Result<()> {
let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
let (session, comp) = UnixStream::pair().wrap_err("failed to create pair of unix sockets")?;
@ -207,7 +214,10 @@ pub fn run_compositor(
vec![("COSMIC_SESSION_SOCK".into(), comp.to_string())],
&span,
);
let mut ipc_state = IpcState::default();
let mut ipc_state = IpcState {
env_tx: Some(env_tx),
..IpcState::default()
};
loop {
tokio::select! {
exit = receive_event(&mut rx) => if exit.is_none() {

View file

@ -9,7 +9,7 @@ mod process;
use async_signals::Signals;
use color_eyre::{eyre::WrapErr, Result};
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::metadata::LevelFilter;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@ -32,11 +32,15 @@ async fn main() -> Result<()> {
let token = CancellationToken::new();
let (socket_tx, socket_rx) = mpsc::unbounded_channel();
if let Err(err) = comp::run_compositor(token.child_token(), socket_rx) {
let (env_tx, env_rx) = oneshot::channel();
if let Err(err) = comp::run_compositor(token.child_token(), socket_rx, env_tx) {
error!("compositor errored: {:?}", err);
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let env_vars = Vec::new();
let env_vars = env_rx
.await
.expect("failed to receive environmental variables")
.into_iter()
.collect::<Vec<_>>();
info!("got environmental variables: {:?}", env_vars);
let mut sockets = Vec::with_capacity(2);