diff --git a/cosmic-applet-network/src/app.rs b/cosmic-applet-network/src/app.rs index a49de9a5..f3df9032 100644 --- a/cosmic-applet-network/src/app.rs +++ b/cosmic-applet-network/src/app.rs @@ -725,8 +725,7 @@ impl Application for CosmicNetworkApplet { } fn subscription(&self) -> Subscription { - let network_sub = - network_manager_subscription(0).map(|e| Message::NetworkManagerEvent(e.1)); + let network_sub = network_manager_subscription(0).map(|e| Message::NetworkManagerEvent(e)); let timeline = self .timeline .as_subscription() @@ -737,11 +736,9 @@ impl Application for CosmicNetworkApplet { self.applet_helper.theme_subscription(0).map(Message::Theme), timeline, network_sub, - active_conns_subscription(0, conn.clone()) - .map(|e| Message::NetworkManagerEvent(e.1)), - devices_subscription(0, conn.clone()).map(|e| Message::NetworkManagerEvent(e.1)), - wireless_enabled_subscription(0, conn.clone()) - .map(|e| Message::NetworkManagerEvent(e.1)), + active_conns_subscription(0, conn.clone()).map(Message::NetworkManagerEvent), + devices_subscription(0, conn.clone()).map(Message::NetworkManagerEvent), + wireless_enabled_subscription(0, conn.clone()).map(Message::NetworkManagerEvent), ]) } else { Subscription::batch(vec![timeline, network_sub]) diff --git a/cosmic-applet-network/src/network_manager/active_conns.rs b/cosmic-applet-network/src/network_manager/active_conns.rs index bc116e95..fe728625 100644 --- a/cosmic-applet-network/src/network_manager/active_conns.rs +++ b/cosmic-applet-network/src/network_manager/active_conns.rs @@ -1,7 +1,7 @@ use super::{NetworkManagerEvent, NetworkManagerState}; use cosmic::iced::{self, subscription}; use cosmic_dbus_networkmanager::nm::NetworkManager; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use log::error; use std::fmt::Debug; use std::hash::Hash; @@ -10,13 +10,14 @@ use zbus::Connection; pub fn active_conns_subscription( id: I, conn: Connection, -) -> iced::Subscription<(I, NetworkManagerEvent)> { - subscription::unfold(id, State::Continue(conn), move |mut state| async move { - loop { - let (update, new_state) = start_listening(id, state).await; - state = new_state; - if let Some(update) = update { - return (update, state); +) -> iced::Subscription { + let initial = State::Continue(conn.clone()); + subscription::channel(id, 50, move |mut output| { + let mut state = initial.clone(); + + async move { + loop { + state = start_listening(state, &mut output).await; } } }) @@ -28,10 +29,10 @@ pub enum State { Error, } -async fn start_listening( - id: I, +async fn start_listening( state: State, -) -> (Option<(I, NetworkManagerEvent)>, State) { + output: &mut futures::channel::mpsc::Sender, +) -> State { let conn = match state { State::Continue(conn) => conn, State::Error => iced::futures::future::pending().await, @@ -40,7 +41,7 @@ async fn start_listening( Ok(n) => n, Err(e) => { error!("Failed to connect to NetworkManager: {}", e); - return (None, State::Error); + return State::Error; } }; @@ -49,8 +50,8 @@ async fn start_listening( let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); - ( - Some((id, NetworkManagerEvent::ActiveConns(new_state))), - State::Continue(conn), - ) + _ = output + .send(NetworkManagerEvent::ActiveConns(new_state)) + .await; + State::Continue(conn) } diff --git a/cosmic-applet-network/src/network_manager/devices.rs b/cosmic-applet-network/src/network_manager/devices.rs index c19fefda..d2f2f237 100644 --- a/cosmic-applet-network/src/network_manager/devices.rs +++ b/cosmic-applet-network/src/network_manager/devices.rs @@ -1,7 +1,7 @@ use super::{NetworkManagerEvent, NetworkManagerState}; use cosmic::iced::{self, subscription}; use cosmic_dbus_networkmanager::nm::NetworkManager; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use log::error; use std::fmt::Debug; use std::hash::Hash; @@ -10,13 +10,14 @@ use zbus::Connection; pub fn devices_subscription( id: I, conn: Connection, -) -> iced::Subscription<(I, NetworkManagerEvent)> { - subscription::unfold(id, State::Continue(conn), move |mut state| async move { - loop { - let (update, new_state) = start_listening(id, state).await; - state = new_state; - if let Some(update) = update { - return (update, state); +) -> iced::Subscription { + let initial = State::Continue(conn.clone()); + subscription::channel(id, 50, move |mut output| { + let mut state = initial.clone(); + + async move { + loop { + state = start_listening(state, &mut output).await; } } }) @@ -28,10 +29,10 @@ pub enum State { Error, } -async fn start_listening( - id: I, +async fn start_listening( state: State, -) -> (Option<(I, NetworkManagerEvent)>, State) { + output: &mut futures::channel::mpsc::Sender, +) -> State { let conn = match state { State::Continue(conn) => conn, State::Error => iced::futures::future::pending().await, @@ -40,7 +41,7 @@ async fn start_listening( Ok(n) => n, Err(e) => { error!("Failed to connect to NetworkManager: {}", e); - return (None, State::Error); + return State::Error; } }; @@ -48,9 +49,8 @@ async fn start_listening( devices_changed.next().await; let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); - - ( - Some((id, NetworkManagerEvent::WirelessAccessPoints(new_state))), - State::Continue(conn), - ) + _ = output + .send(NetworkManagerEvent::WirelessAccessPoints(new_state)) + .await; + State::Continue(conn) } diff --git a/cosmic-applet-network/src/network_manager/mod.rs b/cosmic-applet-network/src/network_manager/mod.rs index a4ac7097..d091e2ce 100644 --- a/cosmic-applet-network/src/network_manager/mod.rs +++ b/cosmic-applet-network/src/network_manager/mod.rs @@ -4,7 +4,7 @@ pub mod current_networks; pub mod devices; pub mod wireless_enabled; -use std::{collections::HashMap, fmt::Debug, hash::Hash, ops::Deref, time::Duration}; +use std::{collections::HashMap, fmt::Debug, ops::Deref, time::Duration}; use cosmic::iced::{self, subscription}; use cosmic_dbus_networkmanager::{ @@ -19,7 +19,7 @@ use cosmic_dbus_networkmanager::{ }; use futures::{ channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, - StreamExt, + SinkExt, StreamExt, }; use tokio::{process::Command, time::timeout}; use zbus::{ @@ -32,14 +32,6 @@ use self::{ current_networks::{active_connections, ActiveConnectionInfo}, }; -pub fn network_manager_subscription( - id: I, -) -> iced::Subscription<(I, NetworkManagerEvent)> { - subscription::unfold(id, State::Ready, move |state| { - start_listening_loop(id, state) - }) -} - #[derive(Debug)] pub enum State { Ready, @@ -47,51 +39,52 @@ pub enum State { Finished, } -pub async fn start_listening_loop( +pub fn network_manager_subscription( id: I, - mut state: State, -) -> ((I, NetworkManagerEvent), State) { - loop { - let (update, new_state) = start_listening(id, state).await; - state = new_state; - if let Some(update) = update { - return (update, state); +) -> iced::Subscription { + subscription::channel(id, 50, |mut output| async move { + let mut state = State::Ready; + + loop { + state = start_listening(state, &mut output).await; } - } + }) } -async fn start_listening( - id: I, +async fn start_listening( state: State, -) -> (Option<(I, NetworkManagerEvent)>, State) { + output: &mut futures::channel::mpsc::Sender, +) -> State { match state { State::Ready => { let conn = match Connection::system().await { Ok(c) => c, - Err(_) => return (None, State::Finished), + Err(_) => return State::Finished, }; let (tx, rx) = unbounded(); let nm_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); - return ( - Some(( - id, - NetworkManagerEvent::Init { - conn: conn.clone(), - sender: tx, - state: nm_state, - }, - )), - State::Waiting(conn, rx), - ); + if output + .send(NetworkManagerEvent::Init { + conn: conn.clone(), + sender: tx, + state: nm_state, + }) + .await + .is_ok() + { + State::Waiting(conn, rx) + } else { + State::Finished + } } State::Waiting(conn, mut rx) => { let network_manager = match NetworkManager::new(&conn).await { Ok(n) => n, - Err(_) => return (None, State::Finished), + Err(_) => return State::Finished, }; - let (update, should_exit) = match rx.next().await { + match rx.next().await { Some(NetworkManagerRequest::Disconnect(ssid)) => { let mut success = false; for c in network_manager @@ -125,18 +118,13 @@ async fn start_listening( } } } - - ( - Some(( - id, - NetworkManagerEvent::RequestResponse { - req: NetworkManagerRequest::Disconnect(ssid.clone()), - success, - state: NetworkManagerState::new(&conn).await.unwrap_or_default(), - }, - )), - false, - ) + _ = output + .send(NetworkManagerEvent::RequestResponse { + req: NetworkManagerRequest::Disconnect(ssid.clone()), + success, + state: NetworkManagerState::new(&conn).await.unwrap_or_default(), + }) + .await; } Some(NetworkManagerRequest::SetAirplaneMode(airplane_mode)) => { // wifi @@ -152,12 +140,13 @@ async fn start_listening( .output() .await .is_ok(); - let response = NetworkManagerEvent::RequestResponse { - req: NetworkManagerRequest::SetAirplaneMode(airplane_mode), - success, - state: NetworkManagerState::new(&conn).await.unwrap_or_default(), - }; - (Some((id, response)), false) + _ = output + .send(NetworkManagerEvent::RequestResponse { + req: NetworkManagerRequest::SetAirplaneMode(airplane_mode), + success, + state: NetworkManagerState::new(&conn).await.unwrap_or_default(), + }) + .await; } Some(NetworkManagerRequest::SetWiFi(enabled)) => { let success = network_manager.set_wireless_enabled(enabled).await.is_ok(); @@ -166,15 +155,15 @@ async fn start_listening( success, state: NetworkManagerState::new(&conn).await.unwrap_or_default(), }; - (Some((id, response)), false) + _ = output.send(response).await; } Some(NetworkManagerRequest::Password(ssid, password)) => { let s = match NetworkManagerSettings::new(&conn).await { Ok(s) => s, - Err(_) => return (None, State::Finished), + Err(_) => return State::Finished, }; - let mut status = (None, false); + let mut status: Option = None; // First try known connections // TODO more convenient methods of managing settings @@ -268,22 +257,16 @@ async fn start_listening( } else { false }; - status = ( - Some(( - id, - NetworkManagerEvent::RequestResponse { - req: NetworkManagerRequest::Password( - ssid.clone(), - password.clone(), - ), - success, - state: NetworkManagerState::new(&conn) - .await - .unwrap_or_default(), - }, - )), - false, - ); + status = Some(NetworkManagerEvent::RequestResponse { + req: NetworkManagerRequest::Password( + ssid.clone(), + password.clone(), + ), + success, + state: NetworkManagerState::new(&conn) + .await + .unwrap_or_default(), + }); } break; @@ -291,7 +274,7 @@ async fn start_listening( } // create a connection - if status.0.is_none() { + if status.is_none() { for device in network_manager.devices().await.ok().unwrap_or_default() { if matches!( device.device_type().await.unwrap_or(DeviceType::Other), @@ -368,53 +351,42 @@ async fn start_listening( } else { false }; - status = ( - Some(( - id, - NetworkManagerEvent::RequestResponse { - req: NetworkManagerRequest::Password( - ssid.clone(), - password.clone(), - ), - success, - state: NetworkManagerState::new(&conn) - .await - .unwrap_or_default(), - }, - )), - false, - ); + _ = output + .send(NetworkManagerEvent::RequestResponse { + req: NetworkManagerRequest::Password( + ssid.clone(), + password.clone(), + ), + success, + state: NetworkManagerState::new(&conn) + .await + .unwrap_or_default(), + }) + .await; break; } } } - if status.0.is_none() { - status = ( - Some(( - id, - NetworkManagerEvent::RequestResponse { - req: NetworkManagerRequest::Password(ssid, password), - success: false, - state: NetworkManagerState::new(&conn) - .await - .unwrap_or_default(), - }, - )), - false, - ); + if let Some(e) = status { + _ = output.send(e).await; + } else { + _ = output + .send(NetworkManagerEvent::RequestResponse { + req: NetworkManagerRequest::Password(ssid, password), + success: false, + state: NetworkManagerState::new(&conn).await.unwrap_or_default(), + }) + .await; } - - status } Some(NetworkManagerRequest::SelectAccessPoint(ssid)) => { let s = match NetworkManagerSettings::new(&conn).await { Ok(s) => s, - Err(_) => return (None, State::Finished), + Err(_) => return State::Finished, }; // find known connection with matching ssid and activate - let mut status = (None, false); for c in s.list_connections().await.unwrap_or_default() { let settings = match c.get_settings().await.ok() { @@ -472,50 +444,30 @@ async fn start_listening( } else { false }; - status = ( - Some(( - id, - NetworkManagerEvent::RequestResponse { - req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()), - success, - state: NetworkManagerState::new(&conn) - .await - .unwrap_or_default(), - }, - )), - false, - ); + _ = output + .send(NetworkManagerEvent::RequestResponse { + req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()), + success, + state: NetworkManagerState::new(&conn).await.unwrap_or_default(), + }) + .await; break; } - - if status.0.is_none() { - status = ( - Some(( - id, - NetworkManagerEvent::RequestResponse { - req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()), - success: false, - state: NetworkManagerState::new(&conn) - .await - .unwrap_or_default(), - }, - )), - false, - ); - } - status + _ = output + .send(NetworkManagerEvent::RequestResponse { + req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()), + success: false, + state: NetworkManagerState::new(&conn).await.unwrap_or_default(), + }) + .await; + } + _ => { + return State::Finished; } - None => (None, true), }; - ( - update, - if should_exit { - State::Finished - } else { - State::Waiting(conn, rx) - }, - ) + + State::Waiting(conn, rx) } State::Finished => iced::futures::future::pending().await, } diff --git a/cosmic-applet-network/src/network_manager/wireless_enabled.rs b/cosmic-applet-network/src/network_manager/wireless_enabled.rs index e79f498d..fc4c5d60 100644 --- a/cosmic-applet-network/src/network_manager/wireless_enabled.rs +++ b/cosmic-applet-network/src/network_manager/wireless_enabled.rs @@ -1,7 +1,7 @@ use super::{NetworkManagerEvent, NetworkManagerState}; use cosmic::iced::{self, subscription}; use cosmic_dbus_networkmanager::nm::NetworkManager; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use log::error; use std::fmt::Debug; use std::hash::Hash; @@ -10,13 +10,14 @@ use zbus::Connection; pub fn wireless_enabled_subscription( id: I, conn: Connection, -) -> iced::Subscription<(I, NetworkManagerEvent)> { - subscription::unfold(id, State::Continue(conn), move |mut state| async move { - loop { - let (update, new_state) = start_listening(id, state).await; - state = new_state; - if let Some(update) = update { - return (update, state); +) -> iced::Subscription { + let initial = State::Continue(conn.clone()); + subscription::channel(id, 50, move |mut output| { + let mut state = initial.clone(); + + async move { + loop { + state = start_listening(state, &mut output).await; } } }) @@ -28,10 +29,10 @@ pub enum State { Error, } -async fn start_listening( - id: I, +async fn start_listening( state: State, -) -> (Option<(I, NetworkManagerEvent)>, State) { + output: &mut futures::channel::mpsc::Sender, +) -> State { let conn = match state { State::Continue(conn) => conn, State::Error => iced::futures::future::pending().await, @@ -40,7 +41,7 @@ async fn start_listening( Ok(n) => n, Err(e) => { error!("Failed to connect to NetworkManager: {}", e); - return (None, State::Error); + return State::Error; } }; @@ -48,9 +49,8 @@ async fn start_listening( wireless_enabled_changed.next().await; let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); - - ( - Some((id, NetworkManagerEvent::WiFiEnabled(new_state))), - State::Continue(conn), - ) + _ = output + .send(NetworkManagerEvent::WiFiEnabled(new_state)) + .await; + State::Continue(conn) }