🚧 Implement compositor IPC further

This commit is contained in:
Lucy 2022-06-28 13:40:16 -04:00
parent 09b03fda1a
commit a0c05bcb66
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1
3 changed files with 44 additions and 25 deletions

View file

@ -1,10 +1,10 @@
use std::os::unix::prelude::IntoRawFd;
// SPDX-License-Identifier: GPL-3.0-only // SPDX-License-Identifier: GPL-3.0-only
use crate::process::{ProcessEvent, ProcessHandler}; use crate::process::{ProcessEvent, ProcessHandler};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, os::unix::prelude::IntoRawFd};
use tokio::{ use tokio::{
io::{AsyncBufReadExt, BufReader, Lines}, io::{AsyncBufReadExt, BufReader, Lines},
net::UnixStream, net::{unix::ReadHalf, UnixStream},
sync::{ sync::{
mpsc::{self, unbounded_channel}, mpsc::{self, unbounded_channel},
oneshot, oneshot,
@ -12,6 +12,13 @@ use tokio::{
}; };
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "message")]
pub enum Message {
SetEnv { variables: HashMap<String, String> },
NewPrivilegedClient { fd: u32 },
}
async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option<()> { async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option<()> {
match rx.recv().await? { match rx.recv().await? {
ProcessEvent::Started => { ProcessEvent::Started => {
@ -34,20 +41,38 @@ async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option
} }
} }
async fn receive_ipc(rx: &mut Lines<BufReader<UnixStream>>) -> Option<()> { async fn receive_ipc(
rx: &mut Lines<BufReader<ReadHalf<'_>>>,
env_tx: &mut Option<oneshot::Sender<Vec<(String, String)>>>,
) -> Option<()> {
let line = rx let line = rx
.next_line() .next_line()
.await .await
.expect("failed to get next line of ipc")?; .expect("failed to get next line of ipc")?;
let message = serde_json::from_str::<()>(&line).expect("invalid message from cosmic-comp"); match serde_json::from_str::<Message>(&line).expect("invalid message from cosmic-comp") {
Message::SetEnv { variables } => {
if let Some(env_tx) = env_tx.take() {
env_tx
.send(variables.into_iter().collect())
.expect("failed to send environmental variables");
}
}
Message::NewPrivilegedClient { fd } => {
unreachable!("compositor should not send NewPrivilegedClient")
}
}
Some(()) Some(())
} }
pub async fn run_compositor(token: CancellationToken, wayland_display_tx: oneshot::Sender<String>) { pub async fn run_compositor(
let mut wayland_display_tx = Some(wayland_display_tx); token: CancellationToken,
env_tx: oneshot::Sender<Vec<(String, String)>>,
) {
let mut env_tx = Some(env_tx);
let (tx, mut rx) = unbounded_channel::<ProcessEvent>(); let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
let (session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets"); let (mut session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets");
let mut session = BufReader::new(session).lines(); let (session_rx, session_tx) = session.split();
let mut session = BufReader::new(session_rx).lines();
let comp = { let comp = {
let std_stream = comp let std_stream = comp
.into_std() .into_std()
@ -66,7 +91,7 @@ pub async fn run_compositor(token: CancellationToken, wayland_display_tx: onesho
exit = receive_event(&mut rx) => if exit.is_none() { exit = receive_event(&mut rx) => if exit.is_none() {
break; break;
}, },
exit = receive_ipc(&mut session) => if exit.is_none() { exit = receive_ipc(&mut session, &mut env_tx) => if exit.is_none() {
break; break;
} }
} }

View file

@ -31,25 +31,22 @@ async fn main() -> Result<()> {
info!("Starting cosmic-session"); info!("Starting cosmic-session");
let token = CancellationToken::new(); let token = CancellationToken::new();
let (wayland_display_tx, wayland_display_rx) = oneshot::channel(); let (env_tx, env_rx) = oneshot::channel();
tokio::spawn(comp::run_compositor( tokio::spawn(comp::run_compositor(token.child_token(), env_tx));
token.child_token(), let env_vars = env_rx
wayland_display_tx,
));
let wayland_display = wayland_display_rx
.await .await
.expect("failed to get WAYLAND_DISPLAY"); .expect("failed to receive environmental variables");
info!("got WAYLAND_DISPLAY: {}", wayland_display); info!("got environmental variables: {:?}", env_vars);
tokio::spawn(panel::run_panel( tokio::spawn(panel::run_panel(
token.child_token(), token.child_token(),
"testing-panel", "testing-panel",
wayland_display.clone(), env_vars.clone(),
)); ));
tokio::spawn(panel::run_panel( tokio::spawn(panel::run_panel(
token.child_token(), token.child_token(),
"testing-dock", "testing-dock",
wayland_display.clone(), env_vars.clone(),
)); ));
let mut signals = Signals::new(vec![libc::SIGTERM, libc::SIGINT]).unwrap(); let mut signals = Signals::new(vec![libc::SIGTERM, libc::SIGINT]).unwrap();

View file

@ -3,12 +3,9 @@ use crate::process::{ProcessEvent, ProcessHandler};
use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::unbounded_channel;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
pub async fn run_panel(token: CancellationToken, config: &str, wayland_display: String) { pub async fn run_panel(token: CancellationToken, config: &str, env_vars: Vec<(String, String)>) {
let (tx, mut rx) = unbounded_channel::<ProcessEvent>(); let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
ProcessHandler::new(tx, &token).run("cosmic-panel", vec![config.to_string()], vec![( ProcessHandler::new(tx, &token).run("cosmic-panel", vec![config.to_string()], env_vars);
"WAYLAND_DISPLAY".into(),
wayland_display,
)]);
while let Some(event) = rx.recv().await { while let Some(event) = rx.recv().await {
match event { match event {
ProcessEvent::Started => { ProcessEvent::Started => {