🚧 More IPC stuff

This commit is contained in:
Lucy 2022-06-28 16:17:43 -04:00
parent a0c05bcb66
commit 97fa3df483
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1
3 changed files with 34 additions and 7 deletions

View file

@ -12,6 +12,7 @@ async-signals = "0.4"
color-eyre = "0.6" color-eyre = "0.6"
futures-util = "0.3" futures-util = "0.3"
libc = "0.2" libc = "0.2"
sendfd = { version = "0.4.1", features = ["tokio"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View file

@ -1,10 +1,14 @@
// SPDX-License-Identifier: GPL-3.0-only // SPDX-License-Identifier: GPL-3.0-only
use crate::process::{ProcessEvent, ProcessHandler}; use crate::process::{ProcessEvent, ProcessHandler};
use sendfd::SendWithFd;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::HashMap, os::unix::prelude::IntoRawFd}; use std::{collections::HashMap, os::unix::prelude::IntoRawFd};
use tokio::{ use tokio::{
io::{AsyncBufReadExt, BufReader, Lines}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines},
net::{unix::ReadHalf, UnixStream}, net::{
unix::{ReadHalf, WriteHalf},
UnixStream,
},
sync::{ sync::{
mpsc::{self, unbounded_channel}, mpsc::{self, unbounded_channel},
oneshot, oneshot,
@ -16,7 +20,7 @@ use tokio_util::sync::CancellationToken;
#[serde(rename_all = "snake_case", tag = "message")] #[serde(rename_all = "snake_case", tag = "message")]
pub enum Message { pub enum Message {
SetEnv { variables: HashMap<String, String> }, SetEnv { variables: HashMap<String, String> },
NewPrivilegedClient { fd: u32 }, NewPrivilegedClient,
} }
async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option<()> { async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option<()> {
@ -57,21 +61,39 @@ async fn receive_ipc(
.expect("failed to send environmental variables"); .expect("failed to send environmental variables");
} }
} }
Message::NewPrivilegedClient { fd } => { Message::NewPrivilegedClient => {
unreachable!("compositor should not send NewPrivilegedClient") unreachable!("compositor should not send NewPrivilegedClient")
} }
} }
Some(()) Some(())
} }
async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: UnixStream) {
let fd = {
let std_stream = stream
.into_std()
.expect("failed to convert stream to std stream");
std_stream
.set_nonblocking(false)
.expect("failed to set stream as nonblocking");
std_stream.into_raw_fd()
};
let json = serde_json::to_string(&Message::NewPrivilegedClient).unwrap();
session_tx.write_all(json.as_bytes()).await.unwrap();
let _ = session_tx.write(&['\n' as u8]).await.unwrap();
let tx: &UnixStream = session_tx.as_ref();
tx.send_with_fd(&[], &[fd]).expect("failed to send fd");
}
pub async fn run_compositor( pub async fn run_compositor(
token: CancellationToken, token: CancellationToken,
mut socket_rx: mpsc::UnboundedReceiver<UnixStream>,
env_tx: oneshot::Sender<Vec<(String, String)>>, env_tx: oneshot::Sender<Vec<(String, String)>>,
) { ) {
let mut env_tx = Some(env_tx); let mut env_tx = Some(env_tx);
let (tx, mut rx) = unbounded_channel::<ProcessEvent>(); let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
let (mut session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets"); let (mut session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets");
let (session_rx, session_tx) = session.split(); let (session_rx, mut session_tx) = session.split();
let mut session = BufReader::new(session_rx).lines(); let mut session = BufReader::new(session_rx).lines();
let comp = { let comp = {
let std_stream = comp let std_stream = comp
@ -93,6 +115,9 @@ pub async fn run_compositor(
}, },
exit = receive_ipc(&mut session, &mut env_tx) => if exit.is_none() { exit = receive_ipc(&mut session, &mut env_tx) => if exit.is_none() {
break; break;
},
Some(socket) = socket_rx.recv() => {
send_fd(&mut session_tx, socket).await;
} }
} }
} }

View file

@ -9,7 +9,7 @@ mod process;
use async_signals::Signals; use async_signals::Signals;
use color_eyre::{eyre::WrapErr, Result}; use color_eyre::{eyre::WrapErr, Result};
use futures_util::StreamExt; use futures_util::StreamExt;
use tokio::sync::oneshot; use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::metadata::LevelFilter; use tracing::metadata::LevelFilter;
use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@ -32,7 +32,8 @@ async fn main() -> Result<()> {
let token = CancellationToken::new(); let token = CancellationToken::new();
let (env_tx, env_rx) = oneshot::channel(); let (env_tx, env_rx) = oneshot::channel();
tokio::spawn(comp::run_compositor(token.child_token(), env_tx)); let (socket_tx, socket_rx) = mpsc::unbounded_channel();
tokio::spawn(comp::run_compositor(token.child_token(), socket_rx, env_tx));
let env_vars = env_rx let env_vars = env_rx
.await .await
.expect("failed to receive environmental variables"); .expect("failed to receive environmental variables");