chore: drop async-fn-stream for cosmic::iced_futures::stream::channel

This commit is contained in:
Michael Aaron Murphy 2026-01-10 03:48:49 +01:00 committed by Michael Murphy
parent 0de9c33822
commit d2d0b51da6
13 changed files with 176 additions and 166 deletions

12
Cargo.lock generated
View file

@ -467,17 +467,6 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "async-fn-stream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4ba0c4baf81a0d8ab31618ffa3ae29ceeb970a6d0d82f76130753462e39d0ea"
dependencies = [
"futures-util",
"pin-project-lite",
"smallvec",
]
[[package]] [[package]]
name = "async-io" name = "async-io"
version = "1.13.0" version = "1.13.0"
@ -1676,7 +1665,6 @@ dependencies = [
"accounts-zbus", "accounts-zbus",
"anyhow", "anyhow",
"ashpd 0.12.0", "ashpd 0.12.0",
"async-fn-stream",
"bluez-zbus", "bluez-zbus",
"chrono", "chrono",
"clap", "clap",

View file

@ -90,7 +90,6 @@ rustix = { version = "1.1.3", features = ["process"] }
gettext-rs = { version = "0.7.7", features = [ gettext-rs = { version = "0.7.7", features = [
"gettext-system", "gettext-system",
], optional = true } ], optional = true }
async-fn-stream = "0.3"
num-traits = "0.2" num-traits = "0.2"
pwhash = "1" pwhash = "1"
# Pinned to fix ashpd. Remove this when fixed. # Pinned to fix ashpd. Remove this when fixed.

View file

@ -15,6 +15,7 @@ use cosmic_settings_page::{
self as page, Entity, self as page, Entity,
section::{self, Section}, section::{self, Section},
}; };
use futures::SinkExt;
use slotmap::SlotMap; use slotmap::SlotMap;
use tracing::error; use tracing::error;
@ -128,18 +129,19 @@ impl page::Page<crate::pages::Message> for Page {
Ok((tx, mut rx)) => { Ok((tx, mut rx)) => {
self.wayland_thread = Some(tx); self.wayland_thread = Some(tx);
return cosmic::Task::stream(async_fn_stream::fn_stream( return cosmic::Task::stream(cosmic::iced_futures::stream::channel(
|emitter| async move { 1,
|mut sender| async move {
while let Some(event) = rx.recv().await { while let Some(event) = rx.recv().await {
let _ = emitter let _ = sender
.emit(crate::pages::Message::AccessibilityMagnifier( .send(crate::pages::Message::AccessibilityMagnifier(
Message::Event(event), Message::Event(event),
)) ))
.await; .await;
} }
let _ = emitter let _ = sender
.emit(crate::pages::Message::AccessibilityMagnifier( .send(crate::pages::Message::AccessibilityMagnifier(
Message::ProtocolUnavailable, Message::ProtocolUnavailable,
)) ))
.await; .await;

View file

@ -15,6 +15,7 @@ use cosmic_settings_page::{
self as page, Insert, self as page, Insert,
section::{self, Section}, section::{self, Section},
}; };
use futures::SinkExt;
use num_traits::FromPrimitive; use num_traits::FromPrimitive;
use slotmap::SlotMap; use slotmap::SlotMap;
@ -126,18 +127,19 @@ impl page::Page<crate::pages::Message> for Page {
Ok((tx, mut rx)) => { Ok((tx, mut rx)) => {
self.wayland_thread = Some(tx); self.wayland_thread = Some(tx);
return cosmic::Task::stream(async_fn_stream::fn_stream( return cosmic::Task::stream(cosmic::iced_futures::stream::channel(
|emitter| async move { 1,
|mut sender| async move {
while let Some(event) = rx.recv().await { while let Some(event) = rx.recv().await {
let _ = emitter let _ = sender
.emit(crate::pages::Message::Accessibility(Message::Event( .send(crate::pages::Message::Accessibility(Message::Event(
event, event,
))) )))
.await; .await;
} }
let _ = emitter let _ = sender
.emit(crate::pages::Message::Accessibility( .send(crate::pages::Message::Accessibility(
Message::ProtocolUnavailable, Message::ProtocolUnavailable,
)) ))
.await; .await;

View file

@ -20,6 +20,7 @@ use cosmic_comp_config::{EavesdroppingKeyboardMode, XwaylandDescaling, XwaylandE
use cosmic_randr_shell::List; use cosmic_randr_shell::List;
use cosmic_settings_page::Section; use cosmic_settings_page::Section;
use cosmic_settings_page::{self as page, section}; use cosmic_settings_page::{self as page, section};
use futures::SinkExt;
use slab::Slab; use slab::Slab;
use slotmap::SlotMap; use slotmap::SlotMap;
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -135,17 +136,19 @@ impl page::Page<crate::pages::Message> for Page {
}); });
// Forward messages from another thread to prevent the monitoring thread from blocking. // Forward messages from another thread to prevent the monitoring thread from blocking.
let (randr_task, randr_handle) = let (randr_task, randr_handle) = Task::stream(cosmic::iced_futures::stream::channel(
Task::stream(async_fn_stream::fn_stream(|emitter| async move { 1,
|mut sender| async move {
while let Some(message) = rx.recv().await { while let Some(message) = rx.recv().await {
if let cosmic_randr::Message::ManagerDone = message if let cosmic_randr::Message::ManagerDone = message
&& !refresh_pending.swap(true, Ordering::SeqCst) && !refresh_pending.swap(true, Ordering::SeqCst)
{ {
_ = emitter.emit(on_enter().await).await; _ = sender.send(on_enter().await).await;
} }
} }
})) },
.abortable(); ))
.abortable();
tasks.push(randr_task); tasks.push(randr_task);
self.randr_handle = Some((canceller, randr_handle)); self.randr_handle = Some((canceller, randr_handle));

View file

@ -7,8 +7,8 @@ use cosmic::widget::{self, settings, text};
use cosmic::{Apply, Element, Task, theme}; use cosmic::{Apply, Element, Task, theme};
use cosmic_settings_bluetooth_subscription::*; use cosmic_settings_bluetooth_subscription::*;
use cosmic_settings_page::{self as page, Section, section}; use cosmic_settings_page::{self as page, Section, section};
use futures::StreamExt;
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::{SinkExt, StreamExt};
use slab::Slab; use slab::Slab;
use slotmap::SlotMap; use slotmap::SlotMap;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -562,33 +562,34 @@ impl Page {
if self.subscription.is_none() { if self.subscription.is_none() {
let connection = connection.clone(); let connection = connection.clone();
let (cancellation, task) = crate::utils::forward_event_loop(move |emitter| { let (cancellation, task) =
let connection = connection.clone(); crate::utils::forward_event_loop(move |mut sender| {
let connection = connection.clone();
async move { async move {
let (tx, mut rx) = futures::channel::mpsc::channel(1); let (tx, mut rx) = futures::channel::mpsc::channel(1);
let watchers = std::pin::pin!(async move { let watchers = std::pin::pin!(async move {
_ = futures::future::join( _ = futures::future::join(
subscription::watch(connection.clone(), tx.clone()), subscription::watch(connection.clone(), tx.clone()),
agent::watch(connection, tx), agent::watch(connection, tx),
) )
.await; .await;
}); });
let forwarder = std::pin::pin!(async move { let forwarder = std::pin::pin!(async move {
while let Some(message) = rx.next().await { while let Some(message) = rx.next().await {
_ = emitter _ = sender
.emit(crate::pages::Message::Bluetooth( .send(crate::pages::Message::Bluetooth(
Message::BluetoothEvent(message), Message::BluetoothEvent(message),
)) ))
.await; .await;
} }
}); });
futures::future::select(watchers, forwarder).await; futures::future::select(watchers, forwarder).await;
} }
}); });
self.subscription = Some(cancellation); self.subscription = Some(cancellation);
return cosmic::task::batch(vec![cosmic::task::future(get_adapters_fut), task]); return cosmic::task::batch(vec![cosmic::task::future(get_adapters_fut), task]);

View file

@ -791,6 +791,7 @@ pub fn window_management() -> Section<crate::pages::Message> {
.add(settings::item::builder(&descriptions[active_hint]).control( .add(settings::item::builder(&descriptions[active_hint]).control(
widget::spin_button( widget::spin_button(
page.theme_manager.builder().active_hint.to_string(), page.theme_manager.builder().active_hint.to_string(),
"active hint",
page.theme_manager.builder().active_hint, page.theme_manager.builder().active_hint,
1, 1,
0, 0,
@ -801,6 +802,7 @@ pub fn window_management() -> Section<crate::pages::Message> {
.add( .add(
settings::item::builder(&descriptions[gaps]).control(widget::spin_button( settings::item::builder(&descriptions[gaps]).control(widget::spin_button(
page.theme_manager.builder().gaps.1.to_string(), page.theme_manager.builder().gaps.1.to_string(),
"gaps",
page.theme_manager.builder().gaps.1, page.theme_manager.builder().gaps.1,
1, 1,
page.theme_manager.builder().active_hint, page.theme_manager.builder().active_hint,

View file

@ -16,6 +16,7 @@ use cosmic_randr_shell::{
AdaptiveSyncAvailability, AdaptiveSyncState, List, Output, OutputKey, Transform, AdaptiveSyncAvailability, AdaptiveSyncState, List, Output, OutputKey, Transform,
}; };
use cosmic_settings_page::{self as page, Section, section}; use cosmic_settings_page::{self as page, Section, section};
use futures::SinkExt;
use indexmap::Equivalent; use indexmap::Equivalent;
use slab::Slab; use slab::Slab;
use slotmap::{Key, SecondaryMap, SlotMap}; use slotmap::{Key, SecondaryMap, SlotMap};
@ -285,17 +286,19 @@ impl page::Page<crate::pages::Message> for Page {
}); });
// Forward messages from another thread to prevent the monitoring thread from blocking. // Forward messages from another thread to prevent the monitoring thread from blocking.
let (randr_task, randr_handle) = let (randr_task, randr_handle) = Task::stream(cosmic::iced_futures::stream::channel(
Task::stream(async_fn_stream::fn_stream(|emitter| async move { 1,
|mut emitter| async move {
while let Some(message) = rx.recv().await { while let Some(message) = rx.recv().await {
if let cosmic_randr::Message::ManagerDone = message if let cosmic_randr::Message::ManagerDone = message
&& !refreshing_page.swap(true, Ordering::SeqCst) && !refreshing_page.swap(true, Ordering::SeqCst)
{ {
_ = emitter.emit(on_enter().await).await; _ = emitter.send(on_enter().await).await;
} }
} }
})) },
.abortable(); ))
.abortable();
tasks.push(randr_task); tasks.push(randr_task);
self.randr_handle = Some((canceller, randr_handle)); self.randr_handle = Some((canceller, randr_handle));
@ -355,13 +358,15 @@ impl page::Page<crate::pages::Message> for Page {
}); });
// Forward messages from the DRM hotplug thread. // Forward messages from the DRM hotplug thread.
let (hotplug_task, hotplug_handle) = let (hotplug_task, hotplug_handle) = Task::stream(cosmic::iced_futures::stream::channel(
Task::stream(async_fn_stream::fn_stream(|emitter| async move { 1,
|mut emitter| async move {
while let Some(message) = rx.recv().await { while let Some(message) = rx.recv().await {
_ = emitter.emit(message).await; _ = emitter.send(message).await;
} }
})) },
.abortable(); ))
.abortable();
tasks.push(hotplug_task); tasks.push(hotplug_task);
self.hotplug_handle = Some((hotplug_cancel_tx, hotplug_handle)); self.hotplug_handle = Some((hotplug_cancel_tx, hotplug_handle));
@ -1310,6 +1315,7 @@ pub fn display_configuration() -> Section<crate::pages::Message> {
&descriptions[additional_scale_options], &descriptions[additional_scale_options],
widget::spin_button( widget::spin_button(
format!("{}%", page.adjusted_scale), format!("{}%", page.adjusted_scale),
"additional display scale",
page.adjusted_scale, page.adjusted_scale,
5, 5,
0, 0,

View file

@ -15,7 +15,7 @@ use cosmic_dbus_networkmanager::{
}; };
use cosmic_settings_network_manager_subscription as network_manager; use cosmic_settings_network_manager_subscription as network_manager;
use cosmic_settings_page::{self as page, Section, section}; use cosmic_settings_page::{self as page, Section, section};
use futures::StreamExt; use futures::{SinkExt, StreamExt};
use secure_string::SecureString; use secure_string::SecureString;
use slotmap::SlotMap; use slotmap::SlotMap;
@ -312,37 +312,38 @@ impl Page {
fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> { fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> {
if self.nm_task.is_none() { if self.nm_task.is_none() {
let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { let (canceller, task) =
let network_manager = match NetworkManager::new(&conn).await { crate::utils::forward_event_loop(move |mut sender| async move {
Ok(n) => n, let network_manager = match NetworkManager::new(&conn).await {
Err(why) => { Ok(n) => n,
tracing::error!( Err(why) => {
why = why.to_string(), tracing::error!(
"failed to connect to network_manager" why = why.to_string(),
); "failed to connect to network_manager"
);
return futures::future::pending().await; return futures::future::pending().await;
}
};
let mut devices_changed =
std::pin::pin!(network_manager.receive_devices_changed().await.then(
|_| async {
match network_manager::devices::list(&conn, |_| true).await {
Ok(devices) => Message::UpdateDevices(
devices.into_iter().map(Arc::new).collect(),
),
Err(why) => Message::Error(why.to_string()),
}
} }
)); };
while let Some(message) = devices_changed.next().await { let mut devices_changed =
_ = emitter std::pin::pin!(network_manager.receive_devices_changed().await.then(
.emit(crate::pages::Message::Networking(message)) |_| async {
.await; match network_manager::devices::list(&conn, |_| true).await {
} Ok(devices) => Message::UpdateDevices(
}); devices.into_iter().map(Arc::new).collect(),
),
Err(why) => Message::Error(why.to_string()),
}
}
));
while let Some(message) = devices_changed.next().await {
_ = sender
.send(crate::pages::Message::Networking(message))
.await;
}
});
self.nm_task = Some(canceller); self.nm_task = Some(canceller);
return task.map(crate::app::Message::from); return task.map(crate::app::Message::from);

View file

@ -21,7 +21,7 @@ use cosmic_settings_network_manager_subscription::{
self as network_manager, NetworkManagerState, UUID, current_networks::ActiveConnectionInfo, self as network_manager, NetworkManagerState, UUID, current_networks::ActiveConnectionInfo,
}; };
use cosmic_settings_page::{self as page, Section, section}; use cosmic_settings_page::{self as page, Section, section};
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, SinkExt, StreamExt};
use indexmap::IndexMap; use indexmap::IndexMap;
use secure_string::SecureString; use secure_string::SecureString;
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -900,28 +900,29 @@ impl Page {
fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> { fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> {
if self.nm_task.is_none() { if self.nm_task.is_none() {
let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { let (canceller, task) =
let (tx, mut rx) = futures::channel::mpsc::channel(1); crate::utils::forward_event_loop(move |mut sender| async move {
let (tx, mut rx) = futures::channel::mpsc::channel(1);
let watchers = std::pin::pin!(async move { let watchers = std::pin::pin!(async move {
futures::join!( futures::join!(
network_manager::watch(conn.clone(), tx.clone()), network_manager::watch(conn.clone(), tx.clone()),
network_manager::active_conns::watch(conn.clone(), tx.clone()), network_manager::active_conns::watch(conn.clone(), tx.clone()),
network_manager::devices::watch(conn, true, tx) network_manager::devices::watch(conn, true, tx)
) )
});
let forwarder = std::pin::pin!(async move {
while let Some(message) = rx.next().await {
_ = sender
.send(crate::pages::Message::Vpn(Message::NetworkManager(message)))
.await;
}
});
futures::future::select(watchers, forwarder).await;
}); });
let forwarder = std::pin::pin!(async move {
while let Some(message) = rx.next().await {
_ = emitter
.emit(crate::pages::Message::Vpn(Message::NetworkManager(message)))
.await;
}
});
futures::future::select(watchers, forwarder).await;
});
self.nm_task = Some(canceller); self.nm_task = Some(canceller);
return task.map(crate::app::Message::from); return task.map(crate::app::Message::from);

View file

@ -24,7 +24,7 @@ use cosmic_settings_network_manager_subscription::{
nm_secret_agent, nm_secret_agent,
}; };
use cosmic_settings_page::{self as page, Section, section}; use cosmic_settings_page::{self as page, Section, section};
use futures::StreamExt; use futures::{SinkExt, StreamExt};
use secure_string::SecureString; use secure_string::SecureString;
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -776,31 +776,32 @@ impl Page {
fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> { fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> {
if self.nm_task.is_none() { if self.nm_task.is_none() {
let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { let (canceller, task) =
let (tx, mut rx) = futures::channel::mpsc::channel(1); crate::utils::forward_event_loop(move |mut sender| async move {
let (tx, mut rx) = futures::channel::mpsc::channel(1);
let watchers = std::pin::pin!(async move { let watchers = std::pin::pin!(async move {
futures::join!( futures::join!(
network_manager::watch(conn.clone(), tx.clone()), network_manager::watch(conn.clone(), tx.clone()),
network_manager::active_conns::watch(conn.clone(), tx.clone()), network_manager::active_conns::watch(conn.clone(), tx.clone()),
network_manager::wireless_enabled::watch(conn.clone(), tx.clone()), network_manager::wireless_enabled::watch(conn.clone(), tx.clone()),
network_manager::watch_connections_changed(conn, tx) network_manager::watch_connections_changed(conn, tx)
); );
});
let forwarder = std::pin::pin!(async move {
while let Some(message) = rx.next().await {
_ = sender
.send(crate::pages::Message::WiFi(Message::NetworkManager(
message,
)))
.await;
}
});
futures::future::select(watchers, forwarder).await;
}); });
let forwarder = std::pin::pin!(async move {
while let Some(message) = rx.next().await {
_ = emitter
.emit(crate::pages::Message::WiFi(Message::NetworkManager(
message,
)))
.await;
}
});
futures::future::select(watchers, forwarder).await;
});
self.nm_task = Some(canceller); self.nm_task = Some(canceller);
return task.map(crate::app::Message::from); return task.map(crate::app::Message::from);
} }

View file

@ -15,7 +15,7 @@ use cosmic_settings_network_manager_subscription::{
self as network_manager, NetworkManagerState, current_networks::ActiveConnectionInfo, self as network_manager, NetworkManagerState, current_networks::ActiveConnectionInfo,
}; };
use cosmic_settings_page::{self as page, Section, section}; use cosmic_settings_page::{self as page, Section, section};
use futures::StreamExt; use futures::{SinkExt, StreamExt};
pub type ConnectionId = Arc<str>; pub type ConnectionId = Arc<str>;
@ -359,30 +359,31 @@ impl Page {
fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> { fn connect(&mut self, conn: zbus::Connection) -> Task<crate::app::Message> {
if self.nm_task.is_none() { if self.nm_task.is_none() {
let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { let (canceller, task) =
let (tx, mut rx) = futures::channel::mpsc::channel(1); crate::utils::forward_event_loop(move |mut sender| async move {
let (tx, mut rx) = futures::channel::mpsc::channel(1);
let watchers = std::pin::pin!(async move { let watchers = std::pin::pin!(async move {
futures::join!( futures::join!(
network_manager::watch(conn.clone(), tx.clone()), network_manager::watch(conn.clone(), tx.clone()),
network_manager::active_conns::watch(conn.clone(), tx.clone()), network_manager::active_conns::watch(conn.clone(), tx.clone()),
network_manager::devices::watch(conn, true, tx) network_manager::devices::watch(conn, true, tx)
) )
});
let forwarder = std::pin::pin!(async move {
while let Some(message) = rx.next().await {
_ = sender
.send(crate::pages::Message::Wired(Message::NetworkManager(
message,
)))
.await;
}
});
futures::future::select(watchers, forwarder).await;
}); });
let forwarder = std::pin::pin!(async move {
while let Some(message) = rx.next().await {
_ = emitter
.emit(crate::pages::Message::Wired(Message::NetworkManager(
message,
)))
.await;
}
});
futures::future::select(watchers, forwarder).await;
});
self.nm_task = Some(canceller); self.nm_task = Some(canceller);
return task.map(crate::app::Message::from); return task.map(crate::app::Message::from);
} }

View file

@ -21,17 +21,20 @@ pub fn display_name(name: &str, physical: (u32, u32)) -> String {
/// Spawn a background tasks and forward its messages /// Spawn a background tasks and forward its messages
pub fn forward_event_loop<M: 'static + Send, T: Future<Output = ()> + Send + 'static>( pub fn forward_event_loop<M: 'static + Send, T: Future<Output = ()> + Send + 'static>(
event_loop: impl FnOnce(async_fn_stream::StreamEmitter<M>) -> T + Send + 'static, event_loop: impl FnOnce(futures::channel::mpsc::Sender<M>) -> T + Send + 'static,
) -> (tokio::sync::oneshot::Sender<()>, cosmic::Task<M>) { ) -> (tokio::sync::oneshot::Sender<()>, cosmic::Task<M>) {
let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>(); let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
let task = cosmic::Task::stream(async_fn_stream::fn_stream(|emitter| async move { let task = cosmic::Task::stream(cosmic::iced_futures::stream::channel(
select( 1,
std::pin::pin!(cancel_rx), |emitter| async move {
std::pin::pin!(event_loop(emitter)), select(
) std::pin::pin!(cancel_rx),
.await; std::pin::pin!(event_loop(emitter)),
})); )
.await;
},
));
(cancel_tx, task) (cancel_tx, task)
} }