📝 Document compositor-related (comp.rs) code

This commit is contained in:
Lucy 2022-08-08 11:19:27 -04:00
parent 89c606ab18
commit e822a87072
No known key found for this signature in database
GPG key ID: EBC517FAD666BBF1

View file

@ -31,9 +31,13 @@ pub fn create_privileged_socket(
sockets: &mut Vec<UnixStream>, sockets: &mut Vec<UnixStream>,
env_vars: &[(String, String)], env_vars: &[(String, String)],
) -> Result<(Vec<(String, String)>, RawFd)> { ) -> Result<(Vec<(String, String)>, RawFd)> {
// Create a new pair of unnamed Unix sockets
let (comp_socket, client_socket) = let (comp_socket, client_socket) =
UnixStream::pair().wrap_err("failed to create socket pair")?; UnixStream::pair().wrap_err("failed to create socket pair")?;
// Push one socket to the list of sockets we were passed
sockets.push(comp_socket); sockets.push(comp_socket);
// Turn the other socket into a non-blocking fd, which we can pass to the child
// process
let client_fd = { let client_fd = {
let std_stream = client_socket let std_stream = client_socket
.into_std() .into_std()
@ -99,15 +103,22 @@ fn parse_and_handle_ipc(state: &mut IpcState) {
} }
async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()> { async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()> {
// This is kind of a doozy, but this is kinda complex so it can be
// cancellation-safe.
match state.length { match state.length {
// We already got the length, and are currently reading the message body.
Some(length) => { Some(length) => {
let index = state.bytes_read.saturating_sub(1); let index = state.bytes_read.saturating_sub(1);
// Add the amount of bytes read to our state.
// I don't think this is entirely cancellation safe, which worries me.
state.bytes_read += rx state.bytes_read += rx
.read_exact(&mut state.buf[index..]) .read_exact(&mut state.buf[index..])
.await .await
.wrap_err("failed to read IPC length")?; .wrap_err("failed to read IPC length")?;
// If we've read enough bytes, parse the message.
if state.bytes_read >= length as usize { if state.bytes_read >= length as usize {
parse_and_handle_ipc(state); parse_and_handle_ipc(state);
// Set the state back to the default "waiting for a length" mode.
state.length = None; state.length = None;
state.bytes_read = 0; state.bytes_read = 0;
state.buf.clear(); state.buf.clear();
@ -115,18 +126,22 @@ async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()>
Ok(()) Ok(())
} }
None => { None => {
// Resize the state buffer enough to fit a u16./
state.buf.resize(2, 0); state.buf.resize(2, 0);
let index = state.bytes_read.saturating_sub(1); let index = state.bytes_read.saturating_sub(1);
// Read the remaining bytes of the length.
state.bytes_read += rx state.bytes_read += rx
.read_exact(&mut state.buf[index..]) .read_exact(&mut state.buf[index..])
.await .await
.wrap_err("failed to read IPC length")?; .wrap_err("failed to read IPC length")?;
// If we've read two bytes, then parse a native-endian u16 from them.
if state.bytes_read >= 2 { if state.bytes_read >= 2 {
let length = u16::from_ne_bytes( let length = u16::from_ne_bytes(
state.buf[..2] state.buf[..2]
.try_into() .try_into()
.wrap_err("failed to convert IPC length to u16")?, .wrap_err("failed to convert IPC length to u16")?,
); );
// Set the state to "reading the message body" mode, as we now have the length.
state.length = Some(length); state.length = Some(length);
state.bytes_read = 0; state.bytes_read = 0;
state.buf.resize(length as usize, 0); state.buf.resize(length as usize, 0);
@ -137,6 +152,7 @@ async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()>
} }
async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec<UnixStream>) -> Result<()> { async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec<UnixStream>) -> Result<()> {
// Turn our list of Unix streams into non-blocking file descriptors.
let fds = stream let fds = stream
.into_iter() .into_iter()
.map(|stream| { .map(|stream| {
@ -150,19 +166,26 @@ async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec<UnixStream>) -> Re
}) })
.collect::<Result<Vec<_>>>() .collect::<Result<Vec<_>>>()
.wrap_err("failed to convert streams to file descriptors")?; .wrap_err("failed to convert streams to file descriptors")?;
// Create a NewPrivilegedClient message, with a count of how many file
// descriptors we are about to send.
let json = serde_json::to_string(&Message::NewPrivilegedClient { count: fds.len() }) let json = serde_json::to_string(&Message::NewPrivilegedClient { count: fds.len() })
.wrap_err("failed to encode json")?; .wrap_err("failed to encode json")?;
// Send the length of our NewPrivilegedClient message.
session_tx session_tx
.write_all(&(json.len() as u16).to_le_bytes()) .write_all(&(json.len() as u16).to_le_bytes())
.await .await
.wrap_err("failed to write length")?; .wrap_err("failed to write length")?;
// Send our NewPrivilegedClient message, in JSON form.
session_tx session_tx
.write_all(json.as_bytes()) .write_all(json.as_bytes())
.await .await
.wrap_err("failed to write json")?; .wrap_err("failed to write json")?;
// Wait 100 us for the session to acknowledge our message.
tokio::time::sleep(std::time::Duration::from_micros(100)).await; tokio::time::sleep(std::time::Duration::from_micros(100)).await;
// Send our file descriptors.
let fd: &UnixStream = session_tx.as_ref(); let fd: &UnixStream = session_tx.as_ref();
fd.send_with_fd(&[0], &fds).wrap_err("failed to send fd")?; fd.send_with_fd(&[0], &fds).wrap_err("failed to send fd")?;
// Close our copy of each file descriptor.
for fd in &fds { for fd in &fds {
let _ = unistd::close(*fd); let _ = unistd::close(*fd);
} }
@ -176,8 +199,11 @@ pub fn run_compositor(
env_tx: oneshot::Sender<HashMap<String, String>>, env_tx: oneshot::Sender<HashMap<String, String>>,
) -> Result<JoinHandle<Result<()>>> { ) -> Result<JoinHandle<Result<()>>> {
let (tx, mut rx) = unbounded_channel::<ProcessEvent>(); let (tx, mut rx) = unbounded_channel::<ProcessEvent>();
// Create a pair of unix sockets - one for us (session),
// one for the compositor (comp)
let (session, comp) = UnixStream::pair().wrap_err("failed to create pair of unix sockets")?; let (session, comp) = UnixStream::pair().wrap_err("failed to create pair of unix sockets")?;
let (mut session_rx, mut session_tx) = session.into_split(); let (mut session_rx, mut session_tx) = session.into_split();
// Convert our compositor socket to a non-blocking file descriptor.
let comp = { let comp = {
let std_stream = comp let std_stream = comp
.into_std() .into_std()
@ -187,10 +213,13 @@ pub fn run_compositor(
.wrap_err("failed to mark compositor unix stream as blocking")?; .wrap_err("failed to mark compositor unix stream as blocking")?;
std_stream.into_raw_fd() std_stream.into_raw_fd()
}; };
// Create a new span, marking the upcoming task as `cosmic-comp` with tracing.
let span = info_span!(parent: None, "cosmic-comp"); let span = info_span!(parent: None, "cosmic-comp");
let _span = span.clone(); let _span = span.clone();
Ok(tokio::spawn( Ok(tokio::spawn(
async move { async move {
// Create a new process handler for cosmic-comp, with our compositor socket's
// file descriptor as the `COSMIC_SESSION_SOCK` environment variable.
ProcessHandler::new(tx, &token).run( ProcessHandler::new(tx, &token).run(
"cosmic-comp", "cosmic-comp",
vec![], vec![],
@ -198,19 +227,25 @@ pub fn run_compositor(
vec![comp], vec![comp],
&span, &span,
); );
// Create a new state object for IPC purposes.
let mut ipc_state = IpcState { let mut ipc_state = IpcState {
env_tx: Some(env_tx), env_tx: Some(env_tx),
..IpcState::default() ..IpcState::default()
}; };
loop { loop {
tokio::select! { tokio::select! {
// Receive events from the process handler channel,
// exiting the loop if the process has exited.
exit = receive_event(&mut rx) => if exit.is_none() { exit = receive_event(&mut rx) => if exit.is_none() {
break; break;
}, },
// Receive IPC messages from the process,
// exiting the loop if IPC errors.
result = receive_ipc(&mut ipc_state, &mut session_rx) => if let Err(err) = result { result = receive_ipc(&mut ipc_state, &mut session_rx) => if let Err(err) = result {
error!("failed to receive IPC: {:?}", err); error!("failed to receive IPC: {:?}", err);
break; break;
}, },
// Send any file descriptors we need to the compositor.
Some(socket) = socket_rx.recv() => { Some(socket) = socket_rx.recv() => {
send_fd(&mut session_tx, socket) send_fd(&mut session_tx, socket)
.await .await