refactor: improve and fix subscriptions and async logic

This commit is contained in:
Michael Aaron Murphy 2025-04-10 13:03:53 +02:00 committed by Ashley Wulber
parent 2d2543094e
commit 0600931abf
9 changed files with 290 additions and 269 deletions

View file

@ -7,9 +7,9 @@ use cosmic::iced::{Point, Rectangle, Size};
use cosmic::iced_runtime::platform_specific::wayland::subsurface::SctkSubsurfaceSettings;
use cosmic::surface;
use cosmic::{
executor,
Element, executor,
iced::{
self, alignment,
self, Length, Subscription, alignment,
event::{
self,
wayland::{Event as WaylandEvent, OutputEvent, SessionLockEvent},
@ -18,12 +18,12 @@ use cosmic::{
platform_specific::shell::wayland::commands::session_lock::{
destroy_lock_surface, get_lock_surface, lock, unlock,
},
Length, Subscription,
},
iced_runtime::core::window::Id as SurfaceId,
style, widget, Element,
style, widget,
};
use cosmic_config::CosmicConfigEntry;
use std::time::Duration;
use std::{
any::TypeId,
collections::HashMap,
@ -35,8 +35,8 @@ use std::{
process,
sync::Arc,
};
use tokio::{sync::mpsc, task, time};
use wayland_client::{protocol::wl_output::WlOutput, Proxy};
use tokio::{sync::mpsc, task};
use wayland_client::{Proxy, protocol::wl_output::WlOutput};
fn lockfile_opt() -> Option<PathBuf> {
let runtime_dir = dirs::runtime_dir()?;
@ -113,7 +113,7 @@ pub fn pam_thread(username: String, conversation: Conversation) -> Result<(), pa
}
pub struct Conversation {
msg_tx: futures::channel::mpsc::Sender<Message>,
msg_tx: futures::channel::mpsc::Sender<cosmic::Action<Message>>,
value_rx: mpsc::Receiver<String>,
}
@ -127,24 +127,20 @@ impl Conversation {
log::error!("failed to convert prompt to UTF-8: {:?}", err);
pam_client::ErrorCode::CONV_ERR
})?;
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime
.block_on(async {
self.msg_tx
.send(Message::Prompt(
prompt.to_string(),
secret,
Some(String::new()),
))
.await
})
.map_err(|err| {
log::error!("failed to send prompt: {:?}", err);
pam_client::ErrorCode::CONV_ERR
})?;
futures::executor::block_on(async {
self.msg_tx
.send(cosmic::Action::App(Message::Prompt(
prompt.to_string(),
secret,
Some(String::new()),
)))
.await
})
.map_err(|err| {
log::error!("failed to send prompt: {:?}", err);
pam_client::ErrorCode::CONV_ERR
})?;
let value = self.value_rx.blocking_recv().ok_or_else(|| {
log::error!("failed to receive value: channel closed");
@ -165,7 +161,11 @@ impl Conversation {
futures::executor::block_on(async {
self.msg_tx
.send(Message::Prompt(prompt.to_string(), false, None))
.send(cosmic::Action::App(Message::Prompt(
prompt.to_string(),
false,
None,
)))
.await
})
.map_err(|err| {
@ -238,11 +238,23 @@ pub enum Message {
#[derive(Clone, Debug)]
enum State {
Locking,
Locked,
Locked {
task_handle: cosmic::iced::task::Handle,
},
Unlocking,
Unlocked,
}
impl Drop for State {
fn drop(&mut self) {
// Abort the locked task when the state is changed.
if let Self::Locked { task_handle } = self {
log::info!("dropping lockscreen tasks");
task_handle.abort();
}
}
}
/// The [`App`] stores application-specific state.
pub struct App {
core: Core,
@ -264,7 +276,7 @@ pub struct App {
}
impl App {
fn menu<'a>(&'a self, surface_id: SurfaceId) -> Element<'a, Message> {
fn menu(&self, surface_id: SurfaceId) -> Element<Message> {
let left_element = {
let date_time_column = {
let mut column = widget::column::with_capacity(2).padding(16.0);
@ -389,7 +401,7 @@ impl App {
)
.id(text_input_id)
.manage_value(true)
.on_submit(|v| Message::Submit(v));
.on_submit(Message::Submit);
if *secret {
text_input = text_input.password()
@ -577,18 +589,18 @@ impl cosmic::Application for App {
let surface_id = SurfaceId::unique();
let subsurface_id = SurfaceId::unique();
match self.surface_ids.insert(output.clone(), surface_id) {
Some(old_surface_id) => {
//TODO: remove old surface?
log::warn!(
"output {}: already had surface ID {:?}",
output.id(),
old_surface_id
);
return Task::none();
}
None => {}
if let Some(old_surface_id) =
self.surface_ids.insert(output.clone(), surface_id)
{
//TODO: remove old surface?
log::warn!(
"output {}: already had surface ID {:?}",
output.id(),
old_surface_id
);
return Task::none();
}
let size = if let Some((w, h)) =
output_info_opt.as_ref().and_then(|info| info.logical_size)
{
@ -654,7 +666,7 @@ impl cosmic::Application for App {
})),
);
if matches!(self.state, State::Locked) {
if matches!(self.state, State::Locked { .. }) {
return Task::batch([
get_lock_surface(surface_id, output),
cosmic::task::message(cosmic::Action::Cosmic(
@ -672,7 +684,7 @@ impl cosmic::Application for App {
if let Some(n) = self.surface_names.remove(&surface_id) {
self.text_input_ids.remove(&n);
}
if matches!(self.state, State::Locked) {
if matches!(self.state, State::Locked { .. }) {
return destroy_lock_surface(surface_id);
}
}
@ -709,15 +721,89 @@ impl cosmic::Application for App {
SessionLockEvent::Focused(..) => {}
SessionLockEvent::Locked => {
log::info!("session locked");
if matches!(self.state, State::Locked) {
if matches!(self.state, State::Locked { .. }) {
return Task::none();
}
self.state = State::Locked;
let username = self.flags.current_user.name.clone();
let (locked_task, locked_handle) = cosmic::task::stream(
cosmic::iced_futures::stream::channel(16, |mut msg_tx| async move {
// Send heartbeat once a second to update time.
let heartbeat_future = {
let mut output = msg_tx.clone();
async move {
let mut interval =
tokio::time::interval(Duration::from_secs(1));
loop {
output
.send(cosmic::Action::App(Message::None))
.await
.unwrap();
interval.tick().await;
}
}
};
let pam_future = async {
loop {
let (value_tx, value_rx) = mpsc::channel(16);
msg_tx
.send(cosmic::Action::App(Message::Channel(value_tx)))
.await
.unwrap();
let pam_res = {
let username = username.clone();
let msg_tx = msg_tx.clone();
task::spawn_blocking(move || {
pam_thread(username, Conversation { msg_tx, value_rx })
})
.await
.unwrap()
};
match pam_res {
Ok(()) => {
log::info!("successfully authenticated");
msg_tx
.send(cosmic::Action::App(Message::Unlock))
.await
.unwrap();
break;
}
Err(err) => {
log::warn!("authentication error: {}", err);
msg_tx
.send(cosmic::Action::App(Message::Error(
err.to_string(),
)))
.await
.unwrap();
}
}
}
};
futures::pin_mut!(heartbeat_future);
futures::pin_mut!(pam_future);
futures::future::select(heartbeat_future, pam_future).await;
}),
)
.abortable();
let mut commands = Vec::with_capacity(self.surface_ids.len() + 1);
commands.push(locked_task);
self.state = State::Locked {
task_handle: locked_handle,
};
// Allow suspend
self.inhibit_opt = None;
// Create lock surfaces
let mut commands = Vec::with_capacity(self.surface_ids.len());
for (output, surface_id) in self.surface_ids.iter() {
commands.push(get_lock_surface(*surface_id, output.clone()));
@ -836,15 +922,11 @@ impl cosmic::Application for App {
},
Message::Suspend => {
#[cfg(feature = "logind")]
return cosmic::task::future(async move {
match crate::logind::suspend().await {
Ok(()) => cosmic::action::none(),
Err(err) => {
log::error!("failed to suspend: {:?}", err);
cosmic::Action::App(Message::Error(err.to_string()))
}
}
});
return cosmic::Task::future(async move { crate::logind::suspend().await.err() })
.and_then(|err| {
log::error!("failed to suspend: {:?}", err);
cosmic::task::message(cosmic::Action::App(Message::Error(err.to_string())))
});
}
Message::Error(error) => {
self.error_opt = Some(error);
@ -869,13 +951,13 @@ impl cosmic::Application for App {
State::Unlocking => {
log::info!("session still unlocking");
}
State::Locking | State::Locked => {
State::Locking | State::Locked { .. } => {
log::info!("session already locking or locked");
}
},
Message::Unlock => {
match self.state {
State::Locked => {
State::Locked { .. } => {
log::info!("sessing unlocking");
self.state = State::Unlocking;
// Clear errors
@ -891,6 +973,12 @@ impl cosmic::Application for App {
// Destroy lock surfaces
let mut commands = Vec::with_capacity(self.surface_ids.len() + 1);
for (_output, surface_id) in self.surface_ids.iter() {
self.surface_names.remove(surface_id);
commands.push(destroy_lock_surface(*surface_id));
}
// Tell compositor to unlock
commands.push(unlock());
@ -964,60 +1052,6 @@ impl cosmic::Application for App {
}),
);
if matches!(self.state, State::Locked) {
struct HeartbeatSubscription;
subscriptions.push(Subscription::run_with_id(
TypeId::of::<HeartbeatSubscription>(),
cosmic::iced_futures::stream::channel(16, |mut msg_tx| async move {
loop {
// Send heartbeat once a second to update time
//TODO: only send this when needed
msg_tx.send(Message::None).await.unwrap();
time::sleep(time::Duration::new(1, 0)).await;
}
}),
));
struct PamSubscription;
//TODO: how to avoid cloning this on every time subscription is called?
let username = self.flags.current_user.name.clone();
subscriptions.push(Subscription::run_with_id(
TypeId::of::<PamSubscription>(),
cosmic::iced_futures::stream::channel(16, |mut msg_tx| async move {
loop {
let (value_tx, value_rx) = mpsc::channel(16);
msg_tx.send(Message::Channel(value_tx)).await.unwrap();
let pam_res = {
let username = username.clone();
let msg_tx = msg_tx.clone();
task::spawn_blocking(move || {
pam_thread(username, Conversation { msg_tx, value_rx })
})
.await
.unwrap()
};
match pam_res {
Ok(()) => {
log::info!("successfully authenticated");
msg_tx.send(Message::Unlock).await.unwrap();
break;
}
Err(err) => {
log::warn!("authentication error: {}", err);
msg_tx.send(Message::Error(err.to_string())).await.unwrap();
}
}
}
loop {
time::sleep(time::Duration::new(60, 0)).await;
}
}),
));
}
#[cfg(feature = "logind")]
{
subscriptions.push(crate::logind::subscription());
@ -1025,16 +1059,12 @@ impl cosmic::Application for App {
#[cfg(feature = "networkmanager")]
{
subscriptions.push(
crate::networkmanager::subscription()
.map(|icon_opt| Message::NetworkIcon(icon_opt)),
);
subscriptions.push(crate::networkmanager::subscription().map(Message::NetworkIcon));
}
#[cfg(feature = "upower")]
{
subscriptions
.push(crate::upower::subscription().map(|info_opt| Message::PowerInfo(info_opt)));
subscriptions.push(crate::upower::subscription().map(Message::PowerInfo));
}
Subscription::batch(subscriptions)