🗑️ Code cleanup
This commit is contained in:
parent
5bf43a72af
commit
a0091235a4
2 changed files with 15 additions and 62 deletions
69
src/comp.rs
69
src/comp.rs
|
|
@ -4,31 +4,21 @@ use color_eyre::eyre::{ContextCompat, Result, WrapErr};
|
||||||
use nix::fcntl;
|
use nix::fcntl;
|
||||||
use sendfd::SendWithFd;
|
use sendfd::SendWithFd;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{
|
use std::os::unix::prelude::{AsRawFd, IntoRawFd};
|
||||||
collections::HashMap,
|
|
||||||
os::unix::prelude::{AsRawFd, IntoRawFd},
|
|
||||||
};
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines},
|
io::AsyncWriteExt,
|
||||||
net::{
|
net::UnixStream,
|
||||||
unix::{ReadHalf, WriteHalf},
|
sync::mpsc::{self, unbounded_channel},
|
||||||
UnixStream,
|
|
||||||
},
|
|
||||||
sync::{
|
|
||||||
mpsc::{self, unbounded_channel},
|
|
||||||
oneshot,
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "snake_case", tag = "message")]
|
#[serde(rename_all = "snake_case", tag = "message")]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
SetEnv { variables: HashMap<String, String> },
|
|
||||||
NewPrivilegedClient { count: usize },
|
NewPrivilegedClient { count: usize },
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mark_as_cloexec(stream: &UnixStream) -> Result<()> {
|
fn mark_as_not_cloexec(stream: &UnixStream) -> Result<()> {
|
||||||
let raw_fd = stream.as_raw_fd();
|
let raw_fd = stream.as_raw_fd();
|
||||||
let fd_flags = fcntl::FdFlag::from_bits(
|
let fd_flags = fcntl::FdFlag::from_bits(
|
||||||
fcntl::fcntl(raw_fd, fcntl::FcntlArg::F_GETFD)
|
fcntl::fcntl(raw_fd, fcntl::FcntlArg::F_GETFD)
|
||||||
|
|
@ -51,7 +41,7 @@ pub fn create_privileged_socket(
|
||||||
UnixStream::pair().wrap_err("failed to create socket pair")?;
|
UnixStream::pair().wrap_err("failed to create socket pair")?;
|
||||||
sockets.push(comp_socket);
|
sockets.push(comp_socket);
|
||||||
let client_fd = {
|
let client_fd = {
|
||||||
mark_as_cloexec(&client_socket).wrap_err("failed to mark client stream as CLOEXEC")?;
|
mark_as_not_cloexec(&client_socket).wrap_err("failed to mark client stream as CLOEXEC")?;
|
||||||
let std_stream = client_socket
|
let std_stream = client_socket
|
||||||
.into_std()
|
.into_std()
|
||||||
.wrap_err("failed to convert client socket to std socket")?;
|
.wrap_err("failed to convert client socket to std socket")?;
|
||||||
|
|
@ -87,38 +77,11 @@ async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_ipc(
|
async fn send_fd(session_tx: &mut UnixStream, stream: Vec<UnixStream>) -> Result<()> {
|
||||||
rx: &mut Lines<BufReader<ReadHalf<'_>>>,
|
|
||||||
env_tx: &mut Option<oneshot::Sender<Vec<(String, String)>>>,
|
|
||||||
) -> Result<Option<()>> {
|
|
||||||
let line = match rx
|
|
||||||
.next_line()
|
|
||||||
.await
|
|
||||||
.wrap_err("failed to get next line of ipc")?
|
|
||||||
{
|
|
||||||
Some(line) => line,
|
|
||||||
None => return Ok(None),
|
|
||||||
};
|
|
||||||
match serde_json::from_str::<Message>(&line).wrap_err("invalid message from cosmic-comp")? {
|
|
||||||
Message::SetEnv { variables } => {
|
|
||||||
if let Some(env_tx) = env_tx.take() {
|
|
||||||
env_tx
|
|
||||||
.send(variables.into_iter().collect())
|
|
||||||
.expect("failed to send environmental variables");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Message::NewPrivilegedClient { .. } => {
|
|
||||||
unreachable!("compositor should not send NewPrivilegedClient")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Some(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: Vec<UnixStream>) -> Result<()> {
|
|
||||||
let fds = stream
|
let fds = stream
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|stream| {
|
.map(|stream| {
|
||||||
mark_as_cloexec(&stream).wrap_err("failed to mark stream as CLOEXEC")?;
|
mark_as_not_cloexec(&stream).wrap_err("failed to mark stream as CLOEXEC")?;
|
||||||
let std_stream = stream
|
let std_stream = stream
|
||||||
.into_std()
|
.into_std()
|
||||||
.wrap_err("failed to convert stream to std stream")?;
|
.wrap_err("failed to convert stream to std stream")?;
|
||||||
|
|
@ -140,8 +103,9 @@ async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: Vec<UnixStream>) -> Res
|
||||||
.await
|
.await
|
||||||
.wrap_err("failed to write json")?;
|
.wrap_err("failed to write json")?;
|
||||||
tokio::time::sleep(std::time::Duration::from_micros(100)).await;
|
tokio::time::sleep(std::time::Duration::from_micros(100)).await;
|
||||||
let tx: &UnixStream = session_tx.as_ref();
|
session_tx
|
||||||
tx.send_with_fd(&[0], &fds).wrap_err("failed to send fd")?;
|
.send_with_fd(&[0], &fds)
|
||||||
|
.wrap_err("failed to send fd")?;
|
||||||
info!("sent {} fds", fds.len());
|
info!("sent {} fds", fds.len());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -149,16 +113,12 @@ async fn send_fd(session_tx: &mut WriteHalf<'_>, stream: Vec<UnixStream>) -> Res
|
||||||
pub async fn run_compositor(
|
pub async fn run_compositor(
|
||||||
token: CancellationToken,
|
token: CancellationToken,
|
||||||
mut socket_rx: mpsc::UnboundedReceiver<Vec<UnixStream>>,
|
mut socket_rx: mpsc::UnboundedReceiver<Vec<UnixStream>>,
|
||||||
env_tx: oneshot::Sender<Vec<(String, String)>>,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut env_tx = Some(env_tx);
|
|
||||||
let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
|
let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
|
||||||
let (mut session, comp) =
|
let (mut session, comp) =
|
||||||
UnixStream::pair().wrap_err("failed to create pair of unix sockets")?;
|
UnixStream::pair().wrap_err("failed to create pair of unix sockets")?;
|
||||||
let (session_rx, mut session_tx) = session.split();
|
|
||||||
let mut session = BufReader::new(session_rx).lines();
|
|
||||||
let comp = {
|
let comp = {
|
||||||
mark_as_cloexec(&comp).wrap_err("failed to mark compositor stream as CLOEXEC")?;
|
mark_as_not_cloexec(&comp).wrap_err("failed to mark compositor stream as CLOEXEC")?;
|
||||||
let std_stream = comp
|
let std_stream = comp
|
||||||
.into_std()
|
.into_std()
|
||||||
.wrap_err("failed to convert compositor unix stream to a standard unix stream")?;
|
.wrap_err("failed to convert compositor unix stream to a standard unix stream")?;
|
||||||
|
|
@ -176,11 +136,8 @@ pub async fn run_compositor(
|
||||||
exit = receive_event(&mut rx) => if exit.is_none() {
|
exit = receive_event(&mut rx) => if exit.is_none() {
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
exit = receive_ipc(&mut session, &mut env_tx) => if exit.wrap_err("failed to receive ipc message")?.is_none() {
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
Some(socket) = socket_rx.recv() => {
|
Some(socket) = socket_rx.recv() => {
|
||||||
send_fd(&mut session_tx, socket)
|
send_fd(&mut session, socket)
|
||||||
.await
|
.await
|
||||||
.wrap_err("failed to send file descriptor to compositor")?;
|
.wrap_err("failed to send file descriptor to compositor")?;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ mod process;
|
||||||
use async_signals::Signals;
|
use async_signals::Signals;
|
||||||
use color_eyre::{eyre::WrapErr, Result};
|
use color_eyre::{eyre::WrapErr, Result};
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::mpsc;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::metadata::LevelFilter;
|
use tracing::metadata::LevelFilter;
|
||||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||||
|
|
@ -31,19 +31,15 @@ async fn main() -> Result<()> {
|
||||||
info!("Starting cosmic-session");
|
info!("Starting cosmic-session");
|
||||||
|
|
||||||
let token = CancellationToken::new();
|
let token = CancellationToken::new();
|
||||||
let (env_tx, env_rx) = oneshot::channel();
|
|
||||||
let (socket_tx, socket_rx) = mpsc::unbounded_channel();
|
let (socket_tx, socket_rx) = mpsc::unbounded_channel();
|
||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
let token = token.child_token();
|
let token = token.child_token();
|
||||||
async move {
|
async move {
|
||||||
if let Err(err) = comp::run_compositor(token, socket_rx, env_tx).await {
|
if let Err(err) = comp::run_compositor(token, socket_rx).await {
|
||||||
error!("compositor errored: {:?}", err);
|
error!("compositor errored: {:?}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
// let env_vars = env_rx
|
|
||||||
// .await
|
|
||||||
// .expect("failed to receive environmental variables");
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
let env_vars = Vec::new();
|
let env_vars = Vec::new();
|
||||||
info!("got environmental variables: {:?}", env_vars);
|
info!("got environmental variables: {:?}", env_vars);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue