refactor: fully restart after the compositor exits

This commit is contained in:
Ashley Wulber 2023-10-31 11:38:33 -04:00 committed by Ashley Wulber
parent 9a94c8f516
commit fcb16e29c1
3 changed files with 103 additions and 65 deletions

View file

@ -15,7 +15,7 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use crate::process::mark_as_not_cloexec;
use crate::{process::mark_as_not_cloexec, service::SessionRequest};
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "message")]
@ -24,31 +24,6 @@ pub enum Message {
NewPrivilegedClient { count: usize },
}
pub fn create_privileged_socket(
sockets: &mut Vec<UnixStream>,
env_vars: &[(String, String)],
) -> Result<(Vec<(String, String)>, OwnedFd)> {
// Create a new pair of unnamed Unix sockets
let (comp_socket, client_socket) =
UnixStream::pair().wrap_err("failed to create socket pair")?;
// Push one socket to the list of sockets we were passed
sockets.push(comp_socket);
// Turn the other socket into a non-blocking fd, which we can pass to the child
// process
let client_fd = {
let std_stream = client_socket
.into_std()
.wrap_err("failed to convert client socket to std socket")?;
std_stream
.set_nonblocking(true)
.wrap_err("failed to mark client socket as non-blocking")?;
OwnedFd::from(std_stream)
};
let mut env_vars = env_vars.to_vec();
env_vars.push(("WAYLAND_SOCKET".into(), client_fd.as_raw_fd().to_string()));
Ok((env_vars, client_fd))
}
// Cancellation safe!
#[derive(Default)]
struct IpcState {
@ -173,6 +148,7 @@ pub fn run_compositor(
_token: CancellationToken,
mut socket_rx: mpsc::UnboundedReceiver<Vec<UnixStream>>,
env_tx: oneshot::Sender<HashMap<String, String>>,
session_dbus_tx: mpsc::Sender<SessionRequest>,
) -> Result<JoinHandle<Result<()>>> {
let process_manager = process_manager.clone();
// Create a pair of unix sockets - one for us (session),
@ -197,7 +173,23 @@ pub fn run_compositor(
.start_process(
Process::new()
.with_executable("cosmic-comp")
.with_env([("COSMIC_SESSION_SOCK", comp.as_raw_fd().to_string())]),
.with_env([("COSMIC_SESSION_SOCK", comp.as_raw_fd().to_string())])
.with_on_exit(move |pman, _, err_code, _will_restart| {
let session_dbus_tx = session_dbus_tx.clone();
async move {
pman.stop();
if err_code == Some(0) {
info!("cosmic-comp exited successfully");
session_dbus_tx.send(SessionRequest::Exit).await.unwrap();
} else if let Some(err_code) = err_code {
error!("cosmic-comp exited with error code {}", err_code);
session_dbus_tx.send(SessionRequest::Restart).await.unwrap();
} else {
warn!("cosmic-comp exited by signal");
session_dbus_tx.send(SessionRequest::Restart).await.unwrap();
}
}
}),
)
.await
.expect("failed to launch compositor");

View file

@ -18,8 +18,12 @@ use color_eyre::{eyre::WrapErr, Result};
use cosmic_notifications_util::{DAEMON_NOTIFICATIONS_FD, PANEL_NOTIFICATIONS_FD};
use futures_util::StreamExt;
use launch_pad::{process::Process, ProcessManager};
use service::SessionRequest;
use tokio::{
sync::{mpsc, oneshot},
sync::{
mpsc::{self, Receiver, Sender},
oneshot,
},
time::{sleep, Duration},
};
use tokio_util::sync::CancellationToken;
@ -45,6 +49,46 @@ async fn main() -> Result<()> {
.wrap_err("failed to initialize logger")?;
log_panics::init();
let (session_tx, mut session_rx) = tokio::sync::mpsc::channel(10);
let session_tx_clone = session_tx.clone();
let _conn = ConnectionBuilder::session()?
.name("com.system76.CosmicSession")?
.serve_at(
"/com/system76/CosmicSession",
service::SessionService { session_tx },
)?
.build()
.await?;
loop {
match start(session_tx_clone.clone(), &mut session_rx).await {
Ok(Status::Exited) => {
info!("Exited cleanly");
break;
}
Ok(Status::Restarted) => {
info!("Restarting");
}
Err(error) => {
error!("Restarting after error: {:?}", error);
}
};
// Drain the session channel.
while session_rx.try_recv().is_ok() {}
}
Ok(())
}
#[derive(Debug)]
pub enum Status {
Restarted,
Exited,
}
async fn start(
session_tx: Sender<SessionRequest>,
session_rx: &mut Receiver<SessionRequest>,
) -> Result<Status> {
info!("Starting cosmic-session");
let process_manager = ProcessManager::new().await;
@ -57,9 +101,14 @@ async fn main() -> Result<()> {
let token = CancellationToken::new();
let (_, socket_rx) = mpsc::unbounded_channel();
let (env_tx, env_rx) = oneshot::channel();
let compositor_handle =
comp::run_compositor(&process_manager, token.child_token(), socket_rx, env_tx)
.wrap_err("failed to start compositor")?;
let compositor_handle = comp::run_compositor(
&process_manager,
token.child_token(),
socket_rx,
env_tx,
session_tx,
)
.wrap_err("failed to start compositor")?;
sleep(Duration::from_millis(2000)).await;
systemd::start_systemd_target()
.await
@ -166,31 +215,27 @@ async fn main() -> Result<()> {
.await
.expect("failed to start settings daemon");
let (exit_tx, exit_rx) = oneshot::channel();
let _conn = ConnectionBuilder::session()?
.name("com.system76.CosmicSession")?
.serve_at(
"/com/system76/CosmicSession",
service::SessionService {
exit_tx: Some(exit_tx),
},
)?
.build()
.await?;
let mut signals = Signals::new(vec![libc::SIGTERM, libc::SIGINT]).unwrap();
let mut status = Status::Exited;
loop {
let session_dbus_rx_next = session_rx.recv();
tokio::select! {
_ = compositor_handle => {
info!("EXITING: compositor exited");
break;
},
res = exit_rx => {
if res.is_err() {
warn!("exit channel dropped session");
res = session_dbus_rx_next => {
match res {
Some(service::SessionRequest::Exit) => {
info!("EXITING: session exited by request");
break;
}
Some(service::SessionRequest::Restart) => {
info!("RESTARTING: session restarted by request");
status = Status::Restarted;
break;
}
None => {
warn!("exit channel dropped session");
break;
}
}
info!("EXITING: session exited by request");
break;
},
signal = signals.next() => match signal {
Some(libc::SIGTERM | libc::SIGINT) => {
@ -202,9 +247,10 @@ async fn main() -> Result<()> {
}
}
}
compositor_handle.abort();
token.cancel();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
Ok(())
Ok(status)
}
async fn start_component(

View file

@ -1,25 +1,25 @@
// SPDX-License-Identifier: GPL-3.0-only
use tokio::sync::oneshot;
use tokio::sync::mpsc;
use zbus::dbus_interface;
pub enum SessionRequest {
Exit,
Restart,
}
pub struct SessionService {
pub exit_tx: Option<oneshot::Sender<()>>,
pub session_tx: mpsc::Sender<SessionRequest>,
}
#[dbus_interface(name = "com.system76.CosmicSession")]
impl SessionService {
fn exit(&mut self) {
match self.exit_tx.take() {
Some(tx) => {
tx.send(()).ok();
}
None => {
warn!("previously failed to properly exit session");
}
}
async fn exit(&mut self) {
warn!("exiting session");
_ = self.session_tx.send(SessionRequest::Exit).await;
}
fn restart(&self) {
async fn restart(&self) {
warn!("restarting session");
_ = self.session_tx.send(SessionRequest::Restart).await;
}
}