🚧 Work on session<->comp IPC

This commit is contained in:
Lucy 2022-06-27 14:17:31 -04:00
parent a732dafc36
commit 09b03fda1a
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1
2 changed files with 56 additions and 30 deletions

View file

@ -10,8 +10,10 @@ publish = false
[dependencies]
async-signals = "0.4"
color-eyre = "0.6"
futures-util = "0.3.21"
futures-util = "0.3"
libc = "0.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1"

View file

@ -3,47 +3,71 @@ use std::os::unix::prelude::IntoRawFd;
// SPDX-License-Identifier: GPL-3.0-only
use crate::process::{ProcessEvent, ProcessHandler};
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
net::UnixStream,
sync::{mpsc::unbounded_channel, oneshot},
sync::{
mpsc::{self, unbounded_channel},
oneshot,
},
};
use tokio_util::sync::CancellationToken;
async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option<()> {
match rx.recv().await? {
ProcessEvent::Started => {
info!("started");
Some(())
}
// cosmic-comp outputs everything to stderr because slog
ProcessEvent::Stdout(line) | ProcessEvent::Stderr(line) => {
info!("{}", line);
Some(())
}
ProcessEvent::Ended(Some(status)) => {
error!("exited with status {}", status);
None
}
ProcessEvent::Ended(None) => {
error!("exited");
None
}
}
}
async fn receive_ipc(rx: &mut Lines<BufReader<UnixStream>>) -> Option<()> {
let line = rx
.next_line()
.await
.expect("failed to get next line of ipc")?;
let message = serde_json::from_str::<()>(&line).expect("invalid message from cosmic-comp");
Some(())
}
pub async fn run_compositor(token: CancellationToken, wayland_display_tx: oneshot::Sender<String>) {
let mut wayland_display_tx = Some(wayland_display_tx);
let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
let (session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets");
let comp = comp.into_std().unwrap().into_raw_fd();
let mut session = BufReader::new(session).lines();
let comp = {
let std_stream = comp
.into_std()
.expect("failed to convert compositor unix stream to a standard unix stream");
std_stream
.set_nonblocking(false)
.expect("failed to mark compositor unix stream as non-blocking");
std_stream.into_raw_fd()
};
ProcessHandler::new(tx, &token).run("cosmic-comp", vec![], vec![(
"COSMIC_SESSION_SOCK".into(),
comp.to_string(),
)]);
while let Some(event) = rx.recv().await {
match event {
ProcessEvent::Started => {
info!("started");
}
// cosmic-comp outputs everything to stderr because slog
ProcessEvent::Stdout(line) | ProcessEvent::Stderr(line) => {
if line.contains("Listening on \"") {
// Message format: Listening on "wayland-0"
if let Some(tx) = wayland_display_tx.take() {
let socket_name = line
.split('"')
.nth(1)
.expect("failed to get WAYLAND_DISPLAY");
tx.send(socket_name.to_string())
.expect("failed to send WAYLAND_DISPLAY back to main app");
}
}
info!("{}", line);
}
ProcessEvent::Ended(Some(status)) => {
error!("exited with status {}", status);
return;
}
ProcessEvent::Ended(None) => {
error!("exited");
return;
loop {
tokio::select! {
exit = receive_event(&mut rx) => if exit.is_none() {
break;
},
exit = receive_ipc(&mut session) => if exit.is_none() {
break;
}
}
}