From d2d0b51da6ab2e9b35c3e6a3beb5c63a1062d60e Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Sat, 10 Jan 2026 03:48:49 +0100 Subject: [PATCH] chore: drop `async-fn-stream` for `cosmic::iced_futures::stream::channel` --- Cargo.lock | 12 ---- cosmic-settings/Cargo.toml | 1 - .../src/pages/accessibility/magnifier.rs | 14 +++-- .../src/pages/accessibility/mod.rs | 14 +++-- .../pages/applications/legacy_applications.rs | 13 ++-- cosmic-settings/src/pages/bluetooth/mod.rs | 49 +++++++-------- .../src/pages/desktop/appearance/mod.rs | 2 + cosmic-settings/src/pages/display/mod.rs | 26 ++++---- cosmic-settings/src/pages/networking/mod.rs | 59 ++++++++++--------- .../src/pages/networking/vpn/mod.rs | 41 ++++++------- cosmic-settings/src/pages/networking/wifi.rs | 47 +++++++-------- cosmic-settings/src/pages/networking/wired.rs | 45 +++++++------- cosmic-settings/src/utils.rs | 19 +++--- 13 files changed, 176 insertions(+), 166 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9cb3fd9..fcb4d5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,17 +467,6 @@ dependencies = [ "slab", ] -[[package]] -name = "async-fn-stream" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4ba0c4baf81a0d8ab31618ffa3ae29ceeb970a6d0d82f76130753462e39d0ea" -dependencies = [ - "futures-util", - "pin-project-lite", - "smallvec", -] - [[package]] name = "async-io" version = "1.13.0" @@ -1676,7 +1665,6 @@ dependencies = [ "accounts-zbus", "anyhow", "ashpd 0.12.0", - "async-fn-stream", "bluez-zbus", "chrono", "clap", diff --git a/cosmic-settings/Cargo.toml b/cosmic-settings/Cargo.toml index 51583ad..2e28c30 100644 --- a/cosmic-settings/Cargo.toml +++ b/cosmic-settings/Cargo.toml @@ -90,7 +90,6 @@ rustix = { version = "1.1.3", features = ["process"] } gettext-rs = { version = "0.7.7", features = [ "gettext-system", ], optional = true } -async-fn-stream = "0.3" num-traits = "0.2" pwhash = "1" # Pinned to fix ashpd. Remove this when fixed. diff --git a/cosmic-settings/src/pages/accessibility/magnifier.rs b/cosmic-settings/src/pages/accessibility/magnifier.rs index 2ba99a0..92606ba 100644 --- a/cosmic-settings/src/pages/accessibility/magnifier.rs +++ b/cosmic-settings/src/pages/accessibility/magnifier.rs @@ -15,6 +15,7 @@ use cosmic_settings_page::{ self as page, Entity, section::{self, Section}, }; +use futures::SinkExt; use slotmap::SlotMap; use tracing::error; @@ -128,18 +129,19 @@ impl page::Page for Page { Ok((tx, mut rx)) => { self.wayland_thread = Some(tx); - return cosmic::Task::stream(async_fn_stream::fn_stream( - |emitter| async move { + return cosmic::Task::stream(cosmic::iced_futures::stream::channel( + 1, + |mut sender| async move { while let Some(event) = rx.recv().await { - let _ = emitter - .emit(crate::pages::Message::AccessibilityMagnifier( + let _ = sender + .send(crate::pages::Message::AccessibilityMagnifier( Message::Event(event), )) .await; } - let _ = emitter - .emit(crate::pages::Message::AccessibilityMagnifier( + let _ = sender + .send(crate::pages::Message::AccessibilityMagnifier( Message::ProtocolUnavailable, )) .await; diff --git a/cosmic-settings/src/pages/accessibility/mod.rs b/cosmic-settings/src/pages/accessibility/mod.rs index fbaf3d0..cad369c 100644 --- a/cosmic-settings/src/pages/accessibility/mod.rs +++ b/cosmic-settings/src/pages/accessibility/mod.rs @@ -15,6 +15,7 @@ use cosmic_settings_page::{ self as page, Insert, section::{self, Section}, }; +use futures::SinkExt; use num_traits::FromPrimitive; use slotmap::SlotMap; @@ -126,18 +127,19 @@ impl page::Page for Page { Ok((tx, mut rx)) => { self.wayland_thread = Some(tx); - return cosmic::Task::stream(async_fn_stream::fn_stream( - |emitter| async move { + return cosmic::Task::stream(cosmic::iced_futures::stream::channel( + 1, + |mut sender| async move { while let Some(event) = rx.recv().await { - let _ = emitter - .emit(crate::pages::Message::Accessibility(Message::Event( + let _ = sender + .send(crate::pages::Message::Accessibility(Message::Event( event, ))) .await; } - let _ = emitter - .emit(crate::pages::Message::Accessibility( + let _ = sender + .send(crate::pages::Message::Accessibility( Message::ProtocolUnavailable, )) .await; diff --git a/cosmic-settings/src/pages/applications/legacy_applications.rs b/cosmic-settings/src/pages/applications/legacy_applications.rs index 9cab593..4a94867 100644 --- a/cosmic-settings/src/pages/applications/legacy_applications.rs +++ b/cosmic-settings/src/pages/applications/legacy_applications.rs @@ -20,6 +20,7 @@ use cosmic_comp_config::{EavesdroppingKeyboardMode, XwaylandDescaling, XwaylandE use cosmic_randr_shell::List; use cosmic_settings_page::Section; use cosmic_settings_page::{self as page, section}; +use futures::SinkExt; use slab::Slab; use slotmap::SlotMap; use tokio::sync::oneshot; @@ -135,17 +136,19 @@ impl page::Page for Page { }); // Forward messages from another thread to prevent the monitoring thread from blocking. - let (randr_task, randr_handle) = - Task::stream(async_fn_stream::fn_stream(|emitter| async move { + let (randr_task, randr_handle) = Task::stream(cosmic::iced_futures::stream::channel( + 1, + |mut sender| async move { while let Some(message) = rx.recv().await { if let cosmic_randr::Message::ManagerDone = message && !refresh_pending.swap(true, Ordering::SeqCst) { - _ = emitter.emit(on_enter().await).await; + _ = sender.send(on_enter().await).await; } } - })) - .abortable(); + }, + )) + .abortable(); tasks.push(randr_task); self.randr_handle = Some((canceller, randr_handle)); diff --git a/cosmic-settings/src/pages/bluetooth/mod.rs b/cosmic-settings/src/pages/bluetooth/mod.rs index 7b598dd..4e22e42 100644 --- a/cosmic-settings/src/pages/bluetooth/mod.rs +++ b/cosmic-settings/src/pages/bluetooth/mod.rs @@ -7,8 +7,8 @@ use cosmic::widget::{self, settings, text}; use cosmic::{Apply, Element, Task, theme}; use cosmic_settings_bluetooth_subscription::*; use cosmic_settings_page::{self as page, Section, section}; -use futures::StreamExt; use futures::channel::oneshot; +use futures::{SinkExt, StreamExt}; use slab::Slab; use slotmap::SlotMap; use std::collections::{HashMap, HashSet}; @@ -562,33 +562,34 @@ impl Page { if self.subscription.is_none() { let connection = connection.clone(); - let (cancellation, task) = crate::utils::forward_event_loop(move |emitter| { - let connection = connection.clone(); + let (cancellation, task) = + crate::utils::forward_event_loop(move |mut sender| { + let connection = connection.clone(); - async move { - let (tx, mut rx) = futures::channel::mpsc::channel(1); + async move { + let (tx, mut rx) = futures::channel::mpsc::channel(1); - let watchers = std::pin::pin!(async move { - _ = futures::future::join( - subscription::watch(connection.clone(), tx.clone()), - agent::watch(connection, tx), - ) - .await; - }); + let watchers = std::pin::pin!(async move { + _ = futures::future::join( + subscription::watch(connection.clone(), tx.clone()), + agent::watch(connection, tx), + ) + .await; + }); - let forwarder = std::pin::pin!(async move { - while let Some(message) = rx.next().await { - _ = emitter - .emit(crate::pages::Message::Bluetooth( - Message::BluetoothEvent(message), - )) - .await; - } - }); + let forwarder = std::pin::pin!(async move { + while let Some(message) = rx.next().await { + _ = sender + .send(crate::pages::Message::Bluetooth( + Message::BluetoothEvent(message), + )) + .await; + } + }); - futures::future::select(watchers, forwarder).await; - } - }); + futures::future::select(watchers, forwarder).await; + } + }); self.subscription = Some(cancellation); return cosmic::task::batch(vec![cosmic::task::future(get_adapters_fut), task]); diff --git a/cosmic-settings/src/pages/desktop/appearance/mod.rs b/cosmic-settings/src/pages/desktop/appearance/mod.rs index 23577e7..263c9a2 100644 --- a/cosmic-settings/src/pages/desktop/appearance/mod.rs +++ b/cosmic-settings/src/pages/desktop/appearance/mod.rs @@ -791,6 +791,7 @@ pub fn window_management() -> Section { .add(settings::item::builder(&descriptions[active_hint]).control( widget::spin_button( page.theme_manager.builder().active_hint.to_string(), + "active hint", page.theme_manager.builder().active_hint, 1, 0, @@ -801,6 +802,7 @@ pub fn window_management() -> Section { .add( settings::item::builder(&descriptions[gaps]).control(widget::spin_button( page.theme_manager.builder().gaps.1.to_string(), + "gaps", page.theme_manager.builder().gaps.1, 1, page.theme_manager.builder().active_hint, diff --git a/cosmic-settings/src/pages/display/mod.rs b/cosmic-settings/src/pages/display/mod.rs index 7097646..7de2ef4 100644 --- a/cosmic-settings/src/pages/display/mod.rs +++ b/cosmic-settings/src/pages/display/mod.rs @@ -16,6 +16,7 @@ use cosmic_randr_shell::{ AdaptiveSyncAvailability, AdaptiveSyncState, List, Output, OutputKey, Transform, }; use cosmic_settings_page::{self as page, Section, section}; +use futures::SinkExt; use indexmap::Equivalent; use slab::Slab; use slotmap::{Key, SecondaryMap, SlotMap}; @@ -285,17 +286,19 @@ impl page::Page for Page { }); // Forward messages from another thread to prevent the monitoring thread from blocking. - let (randr_task, randr_handle) = - Task::stream(async_fn_stream::fn_stream(|emitter| async move { + let (randr_task, randr_handle) = Task::stream(cosmic::iced_futures::stream::channel( + 1, + |mut emitter| async move { while let Some(message) = rx.recv().await { if let cosmic_randr::Message::ManagerDone = message && !refreshing_page.swap(true, Ordering::SeqCst) { - _ = emitter.emit(on_enter().await).await; + _ = emitter.send(on_enter().await).await; } } - })) - .abortable(); + }, + )) + .abortable(); tasks.push(randr_task); self.randr_handle = Some((canceller, randr_handle)); @@ -355,13 +358,15 @@ impl page::Page for Page { }); // Forward messages from the DRM hotplug thread. - let (hotplug_task, hotplug_handle) = - Task::stream(async_fn_stream::fn_stream(|emitter| async move { + let (hotplug_task, hotplug_handle) = Task::stream(cosmic::iced_futures::stream::channel( + 1, + |mut emitter| async move { while let Some(message) = rx.recv().await { - _ = emitter.emit(message).await; + _ = emitter.send(message).await; } - })) - .abortable(); + }, + )) + .abortable(); tasks.push(hotplug_task); self.hotplug_handle = Some((hotplug_cancel_tx, hotplug_handle)); @@ -1310,6 +1315,7 @@ pub fn display_configuration() -> Section { &descriptions[additional_scale_options], widget::spin_button( format!("{}%", page.adjusted_scale), + "additional display scale", page.adjusted_scale, 5, 0, diff --git a/cosmic-settings/src/pages/networking/mod.rs b/cosmic-settings/src/pages/networking/mod.rs index 1692547..5b5cfdf 100644 --- a/cosmic-settings/src/pages/networking/mod.rs +++ b/cosmic-settings/src/pages/networking/mod.rs @@ -15,7 +15,7 @@ use cosmic_dbus_networkmanager::{ }; use cosmic_settings_network_manager_subscription as network_manager; use cosmic_settings_page::{self as page, Section, section}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use secure_string::SecureString; use slotmap::SlotMap; @@ -312,37 +312,38 @@ impl Page { fn connect(&mut self, conn: zbus::Connection) -> Task { if self.nm_task.is_none() { - let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { - let network_manager = match NetworkManager::new(&conn).await { - Ok(n) => n, - Err(why) => { - tracing::error!( - why = why.to_string(), - "failed to connect to network_manager" - ); + let (canceller, task) = + crate::utils::forward_event_loop(move |mut sender| async move { + let network_manager = match NetworkManager::new(&conn).await { + Ok(n) => n, + Err(why) => { + tracing::error!( + why = why.to_string(), + "failed to connect to network_manager" + ); - return futures::future::pending().await; - } - }; - - let mut devices_changed = - std::pin::pin!(network_manager.receive_devices_changed().await.then( - |_| async { - match network_manager::devices::list(&conn, |_| true).await { - Ok(devices) => Message::UpdateDevices( - devices.into_iter().map(Arc::new).collect(), - ), - Err(why) => Message::Error(why.to_string()), - } + return futures::future::pending().await; } - )); + }; - while let Some(message) = devices_changed.next().await { - _ = emitter - .emit(crate::pages::Message::Networking(message)) - .await; - } - }); + let mut devices_changed = + std::pin::pin!(network_manager.receive_devices_changed().await.then( + |_| async { + match network_manager::devices::list(&conn, |_| true).await { + Ok(devices) => Message::UpdateDevices( + devices.into_iter().map(Arc::new).collect(), + ), + Err(why) => Message::Error(why.to_string()), + } + } + )); + + while let Some(message) = devices_changed.next().await { + _ = sender + .send(crate::pages::Message::Networking(message)) + .await; + } + }); self.nm_task = Some(canceller); return task.map(crate::app::Message::from); diff --git a/cosmic-settings/src/pages/networking/vpn/mod.rs b/cosmic-settings/src/pages/networking/vpn/mod.rs index 0c4570b..75f446a 100644 --- a/cosmic-settings/src/pages/networking/vpn/mod.rs +++ b/cosmic-settings/src/pages/networking/vpn/mod.rs @@ -21,7 +21,7 @@ use cosmic_settings_network_manager_subscription::{ self as network_manager, NetworkManagerState, UUID, current_networks::ActiveConnectionInfo, }; use cosmic_settings_page::{self as page, Section, section}; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, SinkExt, StreamExt}; use indexmap::IndexMap; use secure_string::SecureString; use tokio::sync::Mutex; @@ -900,28 +900,29 @@ impl Page { fn connect(&mut self, conn: zbus::Connection) -> Task { if self.nm_task.is_none() { - let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { - let (tx, mut rx) = futures::channel::mpsc::channel(1); + let (canceller, task) = + crate::utils::forward_event_loop(move |mut sender| async move { + let (tx, mut rx) = futures::channel::mpsc::channel(1); - let watchers = std::pin::pin!(async move { - futures::join!( - network_manager::watch(conn.clone(), tx.clone()), - network_manager::active_conns::watch(conn.clone(), tx.clone()), - network_manager::devices::watch(conn, true, tx) - ) + let watchers = std::pin::pin!(async move { + futures::join!( + network_manager::watch(conn.clone(), tx.clone()), + network_manager::active_conns::watch(conn.clone(), tx.clone()), + network_manager::devices::watch(conn, true, tx) + ) + }); + + let forwarder = std::pin::pin!(async move { + while let Some(message) = rx.next().await { + _ = sender + .send(crate::pages::Message::Vpn(Message::NetworkManager(message))) + .await; + } + }); + + futures::future::select(watchers, forwarder).await; }); - let forwarder = std::pin::pin!(async move { - while let Some(message) = rx.next().await { - _ = emitter - .emit(crate::pages::Message::Vpn(Message::NetworkManager(message))) - .await; - } - }); - - futures::future::select(watchers, forwarder).await; - }); - self.nm_task = Some(canceller); return task.map(crate::app::Message::from); diff --git a/cosmic-settings/src/pages/networking/wifi.rs b/cosmic-settings/src/pages/networking/wifi.rs index f4b068f..7a8a09a 100644 --- a/cosmic-settings/src/pages/networking/wifi.rs +++ b/cosmic-settings/src/pages/networking/wifi.rs @@ -24,7 +24,7 @@ use cosmic_settings_network_manager_subscription::{ nm_secret_agent, }; use cosmic_settings_page::{self as page, Section, section}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use secure_string::SecureString; use tokio::sync::Mutex; @@ -776,31 +776,32 @@ impl Page { fn connect(&mut self, conn: zbus::Connection) -> Task { if self.nm_task.is_none() { - let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { - let (tx, mut rx) = futures::channel::mpsc::channel(1); + let (canceller, task) = + crate::utils::forward_event_loop(move |mut sender| async move { + let (tx, mut rx) = futures::channel::mpsc::channel(1); - let watchers = std::pin::pin!(async move { - futures::join!( - network_manager::watch(conn.clone(), tx.clone()), - network_manager::active_conns::watch(conn.clone(), tx.clone()), - network_manager::wireless_enabled::watch(conn.clone(), tx.clone()), - network_manager::watch_connections_changed(conn, tx) - ); + let watchers = std::pin::pin!(async move { + futures::join!( + network_manager::watch(conn.clone(), tx.clone()), + network_manager::active_conns::watch(conn.clone(), tx.clone()), + network_manager::wireless_enabled::watch(conn.clone(), tx.clone()), + network_manager::watch_connections_changed(conn, tx) + ); + }); + + let forwarder = std::pin::pin!(async move { + while let Some(message) = rx.next().await { + _ = sender + .send(crate::pages::Message::WiFi(Message::NetworkManager( + message, + ))) + .await; + } + }); + + futures::future::select(watchers, forwarder).await; }); - let forwarder = std::pin::pin!(async move { - while let Some(message) = rx.next().await { - _ = emitter - .emit(crate::pages::Message::WiFi(Message::NetworkManager( - message, - ))) - .await; - } - }); - - futures::future::select(watchers, forwarder).await; - }); - self.nm_task = Some(canceller); return task.map(crate::app::Message::from); } diff --git a/cosmic-settings/src/pages/networking/wired.rs b/cosmic-settings/src/pages/networking/wired.rs index eb29cd3..b183a97 100644 --- a/cosmic-settings/src/pages/networking/wired.rs +++ b/cosmic-settings/src/pages/networking/wired.rs @@ -15,7 +15,7 @@ use cosmic_settings_network_manager_subscription::{ self as network_manager, NetworkManagerState, current_networks::ActiveConnectionInfo, }; use cosmic_settings_page::{self as page, Section, section}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; pub type ConnectionId = Arc; @@ -359,30 +359,31 @@ impl Page { fn connect(&mut self, conn: zbus::Connection) -> Task { if self.nm_task.is_none() { - let (canceller, task) = crate::utils::forward_event_loop(move |emitter| async move { - let (tx, mut rx) = futures::channel::mpsc::channel(1); + let (canceller, task) = + crate::utils::forward_event_loop(move |mut sender| async move { + let (tx, mut rx) = futures::channel::mpsc::channel(1); - let watchers = std::pin::pin!(async move { - futures::join!( - network_manager::watch(conn.clone(), tx.clone()), - network_manager::active_conns::watch(conn.clone(), tx.clone()), - network_manager::devices::watch(conn, true, tx) - ) + let watchers = std::pin::pin!(async move { + futures::join!( + network_manager::watch(conn.clone(), tx.clone()), + network_manager::active_conns::watch(conn.clone(), tx.clone()), + network_manager::devices::watch(conn, true, tx) + ) + }); + + let forwarder = std::pin::pin!(async move { + while let Some(message) = rx.next().await { + _ = sender + .send(crate::pages::Message::Wired(Message::NetworkManager( + message, + ))) + .await; + } + }); + + futures::future::select(watchers, forwarder).await; }); - let forwarder = std::pin::pin!(async move { - while let Some(message) = rx.next().await { - _ = emitter - .emit(crate::pages::Message::Wired(Message::NetworkManager( - message, - ))) - .await; - } - }); - - futures::future::select(watchers, forwarder).await; - }); - self.nm_task = Some(canceller); return task.map(crate::app::Message::from); } diff --git a/cosmic-settings/src/utils.rs b/cosmic-settings/src/utils.rs index a1935b6..1814c6e 100644 --- a/cosmic-settings/src/utils.rs +++ b/cosmic-settings/src/utils.rs @@ -21,17 +21,20 @@ pub fn display_name(name: &str, physical: (u32, u32)) -> String { /// Spawn a background tasks and forward its messages pub fn forward_event_loop + Send + 'static>( - event_loop: impl FnOnce(async_fn_stream::StreamEmitter) -> T + Send + 'static, + event_loop: impl FnOnce(futures::channel::mpsc::Sender) -> T + Send + 'static, ) -> (tokio::sync::oneshot::Sender<()>, cosmic::Task) { let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>(); - let task = cosmic::Task::stream(async_fn_stream::fn_stream(|emitter| async move { - select( - std::pin::pin!(cancel_rx), - std::pin::pin!(event_loop(emitter)), - ) - .await; - })); + let task = cosmic::Task::stream(cosmic::iced_futures::stream::channel( + 1, + |emitter| async move { + select( + std::pin::pin!(cancel_rx), + std::pin::pin!(event_loop(emitter)), + ) + .await; + }, + )); (cancel_tx, task) }