diff --git a/src/comp.rs b/src/comp.rs index 06d8826..9332980 100644 --- a/src/comp.rs +++ b/src/comp.rs @@ -31,9 +31,13 @@ pub fn create_privileged_socket( sockets: &mut Vec, env_vars: &[(String, String)], ) -> Result<(Vec<(String, String)>, RawFd)> { + // Create a new pair of unnamed Unix sockets let (comp_socket, client_socket) = UnixStream::pair().wrap_err("failed to create socket pair")?; + // Push one socket to the list of sockets we were passed sockets.push(comp_socket); + // Turn the other socket into a non-blocking fd, which we can pass to the child + // process let client_fd = { let std_stream = client_socket .into_std() @@ -99,15 +103,22 @@ fn parse_and_handle_ipc(state: &mut IpcState) { } async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()> { + // This is kind of a doozy, but this is kinda complex so it can be + // cancellation-safe. match state.length { + // We already got the length, and are currently reading the message body. Some(length) => { let index = state.bytes_read.saturating_sub(1); + // Add the amount of bytes read to our state. + // I don't think this is entirely cancellation safe, which worries me. state.bytes_read += rx .read_exact(&mut state.buf[index..]) .await .wrap_err("failed to read IPC length")?; + // If we've read enough bytes, parse the message. if state.bytes_read >= length as usize { parse_and_handle_ipc(state); + // Set the state back to the default "waiting for a length" mode. state.length = None; state.bytes_read = 0; state.buf.clear(); @@ -115,18 +126,22 @@ async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()> Ok(()) } None => { + // Resize the state buffer enough to fit a u16./ state.buf.resize(2, 0); let index = state.bytes_read.saturating_sub(1); + // Read the remaining bytes of the length. state.bytes_read += rx .read_exact(&mut state.buf[index..]) .await .wrap_err("failed to read IPC length")?; + // If we've read two bytes, then parse a native-endian u16 from them. if state.bytes_read >= 2 { let length = u16::from_ne_bytes( state.buf[..2] .try_into() .wrap_err("failed to convert IPC length to u16")?, ); + // Set the state to "reading the message body" mode, as we now have the length. state.length = Some(length); state.bytes_read = 0; state.buf.resize(length as usize, 0); @@ -137,6 +152,7 @@ async fn receive_ipc(state: &mut IpcState, rx: &mut OwnedReadHalf) -> Result<()> } async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec) -> Result<()> { + // Turn our list of Unix streams into non-blocking file descriptors. let fds = stream .into_iter() .map(|stream| { @@ -150,19 +166,26 @@ async fn send_fd(session_tx: &mut OwnedWriteHalf, stream: Vec) -> Re }) .collect::>>() .wrap_err("failed to convert streams to file descriptors")?; + // Create a NewPrivilegedClient message, with a count of how many file + // descriptors we are about to send. let json = serde_json::to_string(&Message::NewPrivilegedClient { count: fds.len() }) .wrap_err("failed to encode json")?; + // Send the length of our NewPrivilegedClient message. session_tx .write_all(&(json.len() as u16).to_le_bytes()) .await .wrap_err("failed to write length")?; + // Send our NewPrivilegedClient message, in JSON form. session_tx .write_all(json.as_bytes()) .await .wrap_err("failed to write json")?; + // Wait 100 us for the session to acknowledge our message. tokio::time::sleep(std::time::Duration::from_micros(100)).await; + // Send our file descriptors. let fd: &UnixStream = session_tx.as_ref(); fd.send_with_fd(&[0], &fds).wrap_err("failed to send fd")?; + // Close our copy of each file descriptor. for fd in &fds { let _ = unistd::close(*fd); } @@ -176,8 +199,11 @@ pub fn run_compositor( env_tx: oneshot::Sender>, ) -> Result>> { let (tx, mut rx) = unbounded_channel::(); + // Create a pair of unix sockets - one for us (session), + // one for the compositor (comp) let (session, comp) = UnixStream::pair().wrap_err("failed to create pair of unix sockets")?; let (mut session_rx, mut session_tx) = session.into_split(); + // Convert our compositor socket to a non-blocking file descriptor. let comp = { let std_stream = comp .into_std() @@ -187,10 +213,13 @@ pub fn run_compositor( .wrap_err("failed to mark compositor unix stream as blocking")?; std_stream.into_raw_fd() }; + // Create a new span, marking the upcoming task as `cosmic-comp` with tracing. let span = info_span!(parent: None, "cosmic-comp"); let _span = span.clone(); Ok(tokio::spawn( async move { + // Create a new process handler for cosmic-comp, with our compositor socket's + // file descriptor as the `COSMIC_SESSION_SOCK` environment variable. ProcessHandler::new(tx, &token).run( "cosmic-comp", vec![], @@ -198,19 +227,25 @@ pub fn run_compositor( vec![comp], &span, ); + // Create a new state object for IPC purposes. let mut ipc_state = IpcState { env_tx: Some(env_tx), ..IpcState::default() }; loop { tokio::select! { + // Receive events from the process handler channel, + // exiting the loop if the process has exited. exit = receive_event(&mut rx) => if exit.is_none() { break; }, + // Receive IPC messages from the process, + // exiting the loop if IPC errors. result = receive_ipc(&mut ipc_state, &mut session_rx) => if let Err(err) = result { error!("failed to receive IPC: {:?}", err); break; }, + // Send any file descriptors we need to the compositor. Some(socket) = socket_rx.recv() => { send_fd(&mut session_tx, socket) .await