📦 ♻️ Refactor a bunch of stuff; debian packaging

This commit is contained in:
Lucy 2022-07-07 16:45:17 -04:00
parent a0091235a4
commit a67a677a1a
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1
15 changed files with 1119 additions and 149 deletions

View file

@ -4,17 +4,25 @@ use color_eyre::eyre::{ContextCompat, Result, WrapErr};
use nix::fcntl;
use sendfd::SendWithFd;
use serde::{Deserialize, Serialize};
use std::os::unix::prelude::{AsRawFd, IntoRawFd};
use std::{
collections::HashMap,
os::unix::prelude::{AsRawFd, IntoRawFd},
};
use tokio::{
io::AsyncWriteExt,
net::UnixStream,
io::{AsyncReadExt, AsyncWriteExt},
net::{
unix::{OwnedReadHalf, OwnedWriteHalf},
UnixStream,
},
sync::mpsc::{self, unbounded_channel},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "message")]
pub enum Message {
SetEnv { variables: HashMap<String, String> },
NewPrivilegedClient { count: usize },
}
@ -77,7 +85,70 @@ async fn receive_event(rx: &mut mpsc::UnboundedReceiver<ProcessEvent>) -> Option
}
}
async fn send_fd(session_tx: &mut UnixStream, stream: Vec<UnixStream>) -> Result<()> {
// Cancellation safe!
#[derive(Default)]
struct IpcState {
length: Option<u16>,
bytes_read: usize,
buf: Vec<u8>,
}
fn parse_and_handle_ipc(bytes: &[u8]) {
match serde_json::from_slice::<Message>(bytes) {
Ok(Message::SetEnv { variables }) => {
debug!(?variables);
}
Ok(Message::NewPrivilegedClient { .. }) => {
unreachable!("NewPrivilegedClient should not be sent TO the session!");
}
Err(_) => {
warn!(
"Unknown session socket message, are you using incompatible cosmic-session and \
cosmic-comp versions?"
)
}
}
}
async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()> {
match state.length {
Some(length) => {
let index = state.bytes_read.saturating_sub(1);
state.bytes_read += rx
.read_exact(&mut state.buf[index..])
.await
.wrap_err("failed to read IPC length")?;
if state.bytes_read >= length as usize {
parse_and_handle_ipc(&state.buf);
state.length = None;
state.bytes_read = 0;
state.buf.clear();
}
Ok(())
}
None => {
state.buf.resize(2, 0);
let index = state.bytes_read.saturating_sub(1);
state.bytes_read += rx
.read_exact(&mut state.buf[index..])
.await
.wrap_err("failed to read IPC length")?;
if state.bytes_read >= 2 {
let length = u16::from_ne_bytes(
state.buf[..2]
.try_into()
.wrap_err("failed to convert IPC length to u16")?,
);
state.length = Some(length);
state.bytes_read = 0;
state.buf.resize(length as usize, 0);
}
Ok(())
}
}
}
async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec<UnixStream>) -> Result<()> {
let fds = stream
.into_iter()
.map(|stream| {
@ -103,20 +174,19 @@ async fn send_fd(session_tx: &mut UnixStream, stream: Vec<UnixStream>) -> Result
.await
.wrap_err("failed to write json")?;
tokio::time::sleep(std::time::Duration::from_micros(100)).await;
session_tx
.send_with_fd(&[0], &fds)
.wrap_err("failed to send fd")?;
let fd: &UnixStream = session_tx.as_ref();
fd.send_with_fd(&[0], &fds).wrap_err("failed to send fd")?;
info!("sent {} fds", fds.len());
Ok(())
}
pub async fn run_compositor(
pub fn run_compositor(
token: CancellationToken,
mut socket_rx: mpsc::UnboundedReceiver<Vec<UnixStream>>,
) -> Result<()> {
let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
let (mut session, comp) =
UnixStream::pair().wrap_err("failed to create pair of unix sockets")?;
let (session, comp) = UnixStream::pair().wrap_err("failed to create pair of unix sockets")?;
let (mut session_rx, mut session_tx) = session.into_split();
let comp = {
mark_as_not_cloexec(&comp).wrap_err("failed to mark compositor stream as CLOEXEC")?;
let std_stream = comp
@ -127,21 +197,35 @@ pub async fn run_compositor(
.wrap_err("failed to mark compositor unix stream as blocking")?;
std_stream.into_raw_fd()
};
ProcessHandler::new(tx, &token).run("cosmic-comp", vec![], vec![(
"COSMIC_SESSION_SOCK".into(),
comp.to_string(),
)]);
loop {
tokio::select! {
exit = receive_event(&mut rx) => if exit.is_none() {
break;
},
Some(socket) = socket_rx.recv() => {
send_fd(&mut session, socket)
.await
.wrap_err("failed to send file descriptor to compositor")?;
let span = info_span!(parent: None, "cosmic-comp");
let _span = span.clone();
tokio::spawn(
async move {
ProcessHandler::new(tx, &token).run(
"cosmic-comp",
vec![],
vec![("COSMIC_SESSION_SOCK".into(), comp.to_string())],
&span,
);
let mut ipc_state = IpcState::default();
loop {
tokio::select! {
exit = receive_event(&mut rx) => if exit.is_none() {
break;
},
result = receive_ipc(&mut ipc_state, &mut session_rx) => if let Err(err) = result {
error!("failed to receive IPC: {:?}", err);
},
Some(socket) = socket_rx.recv() => {
send_fd(&mut session_tx, socket)
.await
.wrap_err("failed to send file descriptor to compositor")?;
}
}
}
Result::<()>::Ok(())
}
}
.instrument(_span),
);
Ok(())
}

43
src/generic.rs Normal file
View file

@ -0,0 +1,43 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::process::{ProcessEvent, ProcessHandler};
use tokio::sync::mpsc::unbounded_channel;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Span};
pub fn run_executable(
token: CancellationToken,
span: Span,
executable: &'static str,
args: Vec<String>,
env_vars: Vec<(String, String)>,
) {
let span_2 = span.clone();
let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
tokio::spawn(
async move {
ProcessHandler::new(tx, &token).run(executable, args, env_vars, &span);
while let Some(event) = rx.recv().await {
match event {
ProcessEvent::Started => {
info!("started");
}
ProcessEvent::Stdout(line) => {
info!("{}", line);
}
ProcessEvent::Stderr(line) => {
error!("{}", line);
}
ProcessEvent::Ended(Some(status)) => {
error!("exited with status {}", status);
return;
}
ProcessEvent::Ended(None) => {
error!("exited");
return;
}
}
}
}
.instrument(span_2),
);
}

View file

@ -3,7 +3,7 @@
extern crate tracing;
mod comp;
mod panel;
mod generic;
mod process;
use async_signals::Signals;
@ -32,32 +32,39 @@ async fn main() -> Result<()> {
let token = CancellationToken::new();
let (socket_tx, socket_rx) = mpsc::unbounded_channel();
tokio::spawn({
let token = token.child_token();
async move {
if let Err(err) = comp::run_compositor(token, socket_rx).await {
error!("compositor errored: {:?}", err);
}
}
});
if let Err(err) = comp::run_compositor(token.child_token(), socket_rx) {
error!("compositor errored: {:?}", err);
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let env_vars = Vec::new();
info!("got environmental variables: {:?}", env_vars);
let mut sockets = Vec::with_capacity(2);
tokio::spawn(panel::run_panel(
generic::run_executable(
token.child_token(),
"testing-panel",
info_span!(parent: None, "cosmic-panel"),
"cosmic-panel",
vec!["testing-panel".into()],
comp::create_privileged_socket(&mut sockets, &env_vars)
.wrap_err("failed to create panel socket")?,
));
tokio::spawn(panel::run_panel(
);
generic::run_executable(
token.child_token(),
"testing-dock",
info_span!(parent: None, "cosmic-panel dock"),
"cosmic-panel",
vec!["testing-dock".into()],
comp::create_privileged_socket(&mut sockets, &env_vars)
.wrap_err("failed to create dock socket")?,
));
);
generic::run_executable(
token.child_token(),
info_span!(parent: None, "cosmic-app-library"),
"cosmic-app-library",
vec![],
comp::create_privileged_socket(&mut sockets, &env_vars)
.wrap_err("failed to create dock socket")?,
);
socket_tx.send(sockets).unwrap();

View file

@ -1,30 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::process::{ProcessEvent, ProcessHandler};
use tokio::sync::mpsc::unbounded_channel;
use tokio_util::sync::CancellationToken;
pub async fn run_panel(token: CancellationToken, config: &str, env_vars: Vec<(String, String)>) {
let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
ProcessHandler::new(tx, &token).run("cosmic-panel", vec![config.to_string()], env_vars);
while let Some(event) = rx.recv().await {
match event {
ProcessEvent::Started => {
info!("started");
}
ProcessEvent::Stdout(line) => {
info!("{}", line);
}
ProcessEvent::Stderr(line) => {
error!("{}", line);
}
ProcessEvent::Ended(Some(status)) => {
error!("exited with status {}", status);
return;
}
ProcessEvent::Ended(None) => {
error!("exited");
return;
}
}
}
}

View file

@ -6,6 +6,7 @@ use tokio::{
sync::mpsc::UnboundedSender,
};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Span};
pub enum ProcessEvent {
Started,
@ -27,82 +28,91 @@ impl ProcessHandler {
}
}
pub fn run(self, executable: impl ToString, args: Vec<String>, vars: Vec<(String, String)>) {
pub fn run(
self,
executable: impl ToString,
args: Vec<String>,
vars: Vec<(String, String)>,
span: &Span,
) {
let executable = executable.to_string();
tokio::spawn(async move {
let mut child = match Command::new(&executable)
.args(&args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.envs(vars)
.kill_on_drop(true)
.spawn()
{
Ok(child) => child,
Err(error) => {
error!(
"failed to launch '{} {}': {}",
executable,
args.join(" "),
error
);
return;
}
};
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines();
std::mem::drop(self.tx.send(ProcessEvent::Started));
loop {
tokio::select! {
status = child.wait() => match status {
Ok(status) => {
info!("'{}' exited with status {}", executable, status);
std::mem::drop(self.tx.send(ProcessEvent::Ended(Some(status))));
return;
}
Err(error) => {
error!(
"failed to wait for '{}' to end: {}",
executable,
error
);
return;
}
},
line = stdout.next_line() => match line {
Ok(Some(line)) => {
std::mem::drop(self.tx.send(ProcessEvent::Stdout(line)));
},
Ok(None) => (),
Err(error) => {
warn!(
"failed to get stdout line from '{}': {}",
executable,
error
);
}
},
line = stderr.next_line() => match line {
Ok(Some(line)) => {
std::mem::drop(self.tx.send(ProcessEvent::Stderr(line)));
},
Ok(None) => (),
Err(error) => {
warn!(
"failed to get stderr line from '{}': {}",
executable,
error
);
}
},
_ = self.cancellation_token.cancelled() => {
warn!("exiting '{}': cancelled", executable);
std::mem::drop(self.tx.send(ProcessEvent::Ended(None)));
tokio::spawn(
async move {
let mut child = match Command::new(&executable)
.args(&args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.envs(vars)
.kill_on_drop(true)
.spawn()
{
Ok(child) => child,
Err(error) => {
error!(
"failed to launch '{} {}': {}",
executable,
args.join(" "),
error
);
return;
}
};
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines();
std::mem::drop(self.tx.send(ProcessEvent::Started));
loop {
tokio::select! {
status = child.wait() => match status {
Ok(status) => {
info!("'{}' exited with status {}", executable, status);
std::mem::drop(self.tx.send(ProcessEvent::Ended(Some(status))));
return;
}
Err(error) => {
error!(
"failed to wait for '{}' to end: {}",
executable,
error
);
return;
}
},
line = stdout.next_line() => match line {
Ok(Some(line)) => {
std::mem::drop(self.tx.send(ProcessEvent::Stdout(line)));
},
Ok(None) => (),
Err(error) => {
warn!(
"failed to get stdout line from '{}': {}",
executable,
error
);
}
},
line = stderr.next_line() => match line {
Ok(Some(line)) => {
std::mem::drop(self.tx.send(ProcessEvent::Stderr(line)));
},
Ok(None) => (),
Err(error) => {
warn!(
"failed to get stderr line from '{}': {}",
executable,
error
);
}
},
_ = self.cancellation_token.cancelled() => {
warn!("exiting '{}': cancelled", executable);
std::mem::drop(self.tx.send(ProcessEvent::Ended(None)));
return;
}
}
}
}
});
.instrument(span.clone()),
);
}
}