diff --git a/Cargo.toml b/Cargo.toml index 4d71f40..eb78aef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ async-signals = "0.4" color-eyre = "0.6" futures-util = "0.3" libc = "0.2" +sendfd = { version = "0.4.1", features = ["tokio"] } serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1", features = ["full"] } diff --git a/src/comp.rs b/src/comp.rs index 666ec1b..f88a7f3 100644 --- a/src/comp.rs +++ b/src/comp.rs @@ -1,10 +1,14 @@ // SPDX-License-Identifier: GPL-3.0-only use crate::process::{ProcessEvent, ProcessHandler}; +use sendfd::SendWithFd; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, os::unix::prelude::IntoRawFd}; use tokio::{ - io::{AsyncBufReadExt, BufReader, Lines}, - net::{unix::ReadHalf, UnixStream}, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}, + net::{ + unix::{ReadHalf, WriteHalf}, + UnixStream, + }, sync::{ mpsc::{self, unbounded_channel}, oneshot, @@ -16,7 +20,7 @@ use tokio_util::sync::CancellationToken; #[serde(rename_all = "snake_case", tag = "message")] pub enum Message { SetEnv { variables: HashMap }, - NewPrivilegedClient { fd: u32 }, + NewPrivilegedClient, } async fn receive_event(rx: &mut mpsc::UnboundedReceiver) -> Option<()> { @@ -57,21 +61,39 @@ async fn receive_ipc( .expect("failed to send environmental variables"); } } - Message::NewPrivilegedClient { fd } => { + Message::NewPrivilegedClient => { unreachable!("compositor should not send NewPrivilegedClient") } } Some(()) } +async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: UnixStream) { + let fd = { + let std_stream = stream + .into_std() + .expect("failed to convert stream to std stream"); + std_stream + .set_nonblocking(false) + .expect("failed to set stream as nonblocking"); + std_stream.into_raw_fd() + }; + let json = serde_json::to_string(&Message::NewPrivilegedClient).unwrap(); + session_tx.write_all(json.as_bytes()).await.unwrap(); + let _ = session_tx.write(&['\n' as u8]).await.unwrap(); + let tx: &UnixStream = session_tx.as_ref(); + tx.send_with_fd(&[], &[fd]).expect("failed to send fd"); +} + pub async fn run_compositor( token: CancellationToken, + mut socket_rx: mpsc::UnboundedReceiver, env_tx: oneshot::Sender>, ) { let mut env_tx = Some(env_tx); let (tx, mut rx) = unbounded_channel::(); let (mut session, comp) = UnixStream::pair().expect("failed to create pair of unix sockets"); - let (session_rx, session_tx) = session.split(); + let (session_rx, mut session_tx) = session.split(); let mut session = BufReader::new(session_rx).lines(); let comp = { let std_stream = comp @@ -93,6 +115,9 @@ pub async fn run_compositor( }, exit = receive_ipc(&mut session, &mut env_tx) => if exit.is_none() { break; + }, + Some(socket) = socket_rx.recv() => { + send_fd(&mut session_tx, socket).await; } } } diff --git a/src/main.rs b/src/main.rs index f0211ad..56b8bfd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ mod process; use async_signals::Signals; use color_eyre::{eyre::WrapErr, Result}; use futures_util::StreamExt; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::metadata::LevelFilter; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -32,7 +32,8 @@ async fn main() -> Result<()> { let token = CancellationToken::new(); let (env_tx, env_rx) = oneshot::channel(); - tokio::spawn(comp::run_compositor(token.child_token(), env_tx)); + let (socket_tx, socket_rx) = mpsc::unbounded_channel(); + tokio::spawn(comp::run_compositor(token.child_token(), socket_rx, env_tx)); let env_vars = env_rx .await .expect("failed to receive environmental variables");