refactor(display): use Task::stream instead of on_enter sender
The on_enter sender was introduced before iced supported creating tasks from streams
This commit is contained in:
parent
c8b9ee951d
commit
855ba59cab
1 changed files with 47 additions and 23 deletions
|
|
@ -139,7 +139,7 @@ pub struct Page {
|
|||
mirror_map: SecondaryMap<OutputKey, OutputKey>,
|
||||
mirror_menu: widget::dropdown::multi::Model<String, Mirroring>,
|
||||
active_display: OutputKey,
|
||||
background_service_cancel: Option<oneshot::Sender<()>>,
|
||||
randr_handle: Option<(oneshot::Sender<()>, cosmic::iced::task::Handle)>,
|
||||
hotplug_handle: Option<(oneshot::Sender<()>, cosmic::iced::task::Handle)>,
|
||||
config: Config,
|
||||
cache: ViewCache,
|
||||
|
|
@ -176,7 +176,7 @@ impl Default for Page {
|
|||
mirror_map: SecondaryMap::new(),
|
||||
mirror_menu: widget::dropdown::multi::model(),
|
||||
active_display: OutputKey::default(),
|
||||
background_service_cancel: None,
|
||||
randr_handle: None,
|
||||
hotplug_handle: None,
|
||||
config: Config::default(),
|
||||
cache: ViewCache::default(),
|
||||
|
|
@ -254,7 +254,7 @@ impl page::Page<crate::pages::Message> for Page {
|
|||
#[cfg(not(feature = "test"))]
|
||||
fn on_enter(
|
||||
&mut self,
|
||||
sender: tokio::sync::mpsc::Sender<crate::pages::Message>,
|
||||
_sender: tokio::sync::mpsc::Sender<crate::pages::Message>,
|
||||
) -> Task<crate::pages::Message> {
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
@ -265,8 +265,17 @@ impl page::Page<crate::pages::Message> for Page {
|
|||
fl!("orientation", "rotate-270"),
|
||||
];
|
||||
|
||||
if let Some(canceller) = self.background_service_cancel.take() {
|
||||
let mut tasks = Vec::with_capacity(3);
|
||||
tasks.push(cosmic::task::future(on_enter()));
|
||||
|
||||
if let Some((canceller, handle)) = self.randr_handle.take() {
|
||||
_ = canceller.send(());
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
if let Some((canceller, handle)) = self.hotplug_handle.take() {
|
||||
_ = canceller.send(());
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
self.refreshing_page.store(true, Ordering::SeqCst);
|
||||
|
|
@ -275,7 +284,7 @@ impl page::Page<crate::pages::Message> for Page {
|
|||
{
|
||||
let refreshing_page = self.refreshing_page.clone();
|
||||
let (tx, mut rx) = tachyonix::channel(4);
|
||||
let (canceller, cancelled) = oneshot::channel();
|
||||
let (canceller, cancelled) = oneshot::channel::<()>();
|
||||
let runtime = tokio::runtime::Handle::current();
|
||||
|
||||
// Spawns a background service to monitor for display state changes.
|
||||
|
|
@ -298,28 +307,41 @@ impl page::Page<crate::pages::Message> for Page {
|
|||
});
|
||||
|
||||
// Forward messages from another thread to prevent the monitoring thread from blocking.
|
||||
tokio::task::spawn(async move {
|
||||
while let Ok(message) = rx.recv().await {
|
||||
if sender.is_closed() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let cosmic_randr::Message::ManagerDone = message {
|
||||
if !refreshing_page.swap(true, Ordering::SeqCst) {
|
||||
let sender = sender.clone();
|
||||
tokio::spawn(async move {
|
||||
_ = sender.send(on_enter().await).await;
|
||||
});
|
||||
let (randr_task, randr_handle) =
|
||||
Task::stream(async_fn_stream::fn_stream(|emitter| async move {
|
||||
while let Ok(message) = rx.recv().await {
|
||||
if let cosmic_randr::Message::ManagerDone = message {
|
||||
if !refreshing_page.swap(true, Ordering::SeqCst) {
|
||||
_ = emitter.emit(on_enter().await).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}))
|
||||
.abortable();
|
||||
|
||||
self.background_service_cancel = Some(canceller);
|
||||
// tokio::task::spawn(async move {
|
||||
// while let Ok(message) = rx.recv().await {
|
||||
// if sender.is_closed() {
|
||||
// return;
|
||||
// }
|
||||
|
||||
// if let cosmic_randr::Message::ManagerDone = message {
|
||||
// if !refreshing_page.swap(true, Ordering::SeqCst) {
|
||||
// let sender = sender.clone();
|
||||
// tokio::spawn(async move {
|
||||
// _ = sender.send(on_enter().await).await;
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
|
||||
tasks.push(randr_task);
|
||||
self.randr_handle = Some((canceller, randr_handle));
|
||||
}
|
||||
|
||||
// Channels for communicating messages from the DRM hotplug thread.
|
||||
let (hotplug_cancel_tx, hotplug_cancel_rx) = tokio::sync::oneshot::channel::<()>();
|
||||
let (hotplug_cancel_tx, hotplug_cancel_rx) = oneshot::channel::<()>();
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
// Spawn a background thread for asynchronously polling a udev monitor for DRM
|
||||
|
|
@ -383,9 +405,10 @@ impl page::Page<crate::pages::Message> for Page {
|
|||
}))
|
||||
.abortable();
|
||||
|
||||
tasks.push(hotplug_task);
|
||||
self.hotplug_handle = Some((hotplug_cancel_tx, hotplug_handle));
|
||||
|
||||
cosmic::task::batch(vec![cosmic::task::future(on_enter()), hotplug_task])
|
||||
cosmic::task::batch(tasks)
|
||||
}
|
||||
|
||||
fn on_leave(&mut self) -> Task<crate::pages::Message> {
|
||||
|
|
@ -394,8 +417,9 @@ impl page::Page<crate::pages::Message> for Page {
|
|||
handle.abort();
|
||||
}
|
||||
|
||||
if let Some(canceller) = self.background_service_cancel.take() {
|
||||
if let Some((canceller, handle)) = self.randr_handle.take() {
|
||||
_ = canceller.send(());
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
Task::none()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue