refactor(network): use channel subscriptions

This commit is contained in:
Ashley Wulber 2023-07-10 18:17:32 -04:00 committed by Ashley Wulber
parent fad3b097d9
commit 4b9f46b388
5 changed files with 154 additions and 204 deletions

View file

@ -725,8 +725,7 @@ impl Application for CosmicNetworkApplet {
} }
fn subscription(&self) -> Subscription<Message> { fn subscription(&self) -> Subscription<Message> {
let network_sub = let network_sub = network_manager_subscription(0).map(|e| Message::NetworkManagerEvent(e));
network_manager_subscription(0).map(|e| Message::NetworkManagerEvent(e.1));
let timeline = self let timeline = self
.timeline .timeline
.as_subscription() .as_subscription()
@ -737,11 +736,9 @@ impl Application for CosmicNetworkApplet {
self.applet_helper.theme_subscription(0).map(Message::Theme), self.applet_helper.theme_subscription(0).map(Message::Theme),
timeline, timeline,
network_sub, network_sub,
active_conns_subscription(0, conn.clone()) active_conns_subscription(0, conn.clone()).map(Message::NetworkManagerEvent),
.map(|e| Message::NetworkManagerEvent(e.1)), devices_subscription(0, conn.clone()).map(Message::NetworkManagerEvent),
devices_subscription(0, conn.clone()).map(|e| Message::NetworkManagerEvent(e.1)), wireless_enabled_subscription(0, conn.clone()).map(Message::NetworkManagerEvent),
wireless_enabled_subscription(0, conn.clone())
.map(|e| Message::NetworkManagerEvent(e.1)),
]) ])
} else { } else {
Subscription::batch(vec![timeline, network_sub]) Subscription::batch(vec![timeline, network_sub])

View file

@ -1,7 +1,7 @@
use super::{NetworkManagerEvent, NetworkManagerState}; use super::{NetworkManagerEvent, NetworkManagerState};
use cosmic::iced::{self, subscription}; use cosmic::iced::{self, subscription};
use cosmic_dbus_networkmanager::nm::NetworkManager; use cosmic_dbus_networkmanager::nm::NetworkManager;
use futures::StreamExt; use futures::{SinkExt, StreamExt};
use log::error; use log::error;
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
@ -10,13 +10,14 @@ use zbus::Connection;
pub fn active_conns_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>( pub fn active_conns_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
id: I, id: I,
conn: Connection, conn: Connection,
) -> iced::Subscription<(I, NetworkManagerEvent)> { ) -> iced::Subscription<NetworkManagerEvent> {
subscription::unfold(id, State::Continue(conn), move |mut state| async move { let initial = State::Continue(conn.clone());
loop { subscription::channel(id, 50, move |mut output| {
let (update, new_state) = start_listening(id, state).await; let mut state = initial.clone();
state = new_state;
if let Some(update) = update { async move {
return (update, state); loop {
state = start_listening(state, &mut output).await;
} }
} }
}) })
@ -28,10 +29,10 @@ pub enum State {
Error, Error,
} }
async fn start_listening<I: Copy + Debug>( async fn start_listening(
id: I,
state: State, state: State,
) -> (Option<(I, NetworkManagerEvent)>, State) { output: &mut futures::channel::mpsc::Sender<NetworkManagerEvent>,
) -> State {
let conn = match state { let conn = match state {
State::Continue(conn) => conn, State::Continue(conn) => conn,
State::Error => iced::futures::future::pending().await, State::Error => iced::futures::future::pending().await,
@ -40,7 +41,7 @@ async fn start_listening<I: Copy + Debug>(
Ok(n) => n, Ok(n) => n,
Err(e) => { Err(e) => {
error!("Failed to connect to NetworkManager: {}", e); error!("Failed to connect to NetworkManager: {}", e);
return (None, State::Error); return State::Error;
} }
}; };
@ -49,8 +50,8 @@ async fn start_listening<I: Copy + Debug>(
let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default();
( _ = output
Some((id, NetworkManagerEvent::ActiveConns(new_state))), .send(NetworkManagerEvent::ActiveConns(new_state))
State::Continue(conn), .await;
) State::Continue(conn)
} }

View file

@ -1,7 +1,7 @@
use super::{NetworkManagerEvent, NetworkManagerState}; use super::{NetworkManagerEvent, NetworkManagerState};
use cosmic::iced::{self, subscription}; use cosmic::iced::{self, subscription};
use cosmic_dbus_networkmanager::nm::NetworkManager; use cosmic_dbus_networkmanager::nm::NetworkManager;
use futures::StreamExt; use futures::{SinkExt, StreamExt};
use log::error; use log::error;
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
@ -10,13 +10,14 @@ use zbus::Connection;
pub fn devices_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>( pub fn devices_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
id: I, id: I,
conn: Connection, conn: Connection,
) -> iced::Subscription<(I, NetworkManagerEvent)> { ) -> iced::Subscription<NetworkManagerEvent> {
subscription::unfold(id, State::Continue(conn), move |mut state| async move { let initial = State::Continue(conn.clone());
loop { subscription::channel(id, 50, move |mut output| {
let (update, new_state) = start_listening(id, state).await; let mut state = initial.clone();
state = new_state;
if let Some(update) = update { async move {
return (update, state); loop {
state = start_listening(state, &mut output).await;
} }
} }
}) })
@ -28,10 +29,10 @@ pub enum State {
Error, Error,
} }
async fn start_listening<I: Copy + Debug>( async fn start_listening(
id: I,
state: State, state: State,
) -> (Option<(I, NetworkManagerEvent)>, State) { output: &mut futures::channel::mpsc::Sender<NetworkManagerEvent>,
) -> State {
let conn = match state { let conn = match state {
State::Continue(conn) => conn, State::Continue(conn) => conn,
State::Error => iced::futures::future::pending().await, State::Error => iced::futures::future::pending().await,
@ -40,7 +41,7 @@ async fn start_listening<I: Copy + Debug>(
Ok(n) => n, Ok(n) => n,
Err(e) => { Err(e) => {
error!("Failed to connect to NetworkManager: {}", e); error!("Failed to connect to NetworkManager: {}", e);
return (None, State::Error); return State::Error;
} }
}; };
@ -48,9 +49,8 @@ async fn start_listening<I: Copy + Debug>(
devices_changed.next().await; devices_changed.next().await;
let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default();
_ = output
( .send(NetworkManagerEvent::WirelessAccessPoints(new_state))
Some((id, NetworkManagerEvent::WirelessAccessPoints(new_state))), .await;
State::Continue(conn), State::Continue(conn)
)
} }

View file

@ -4,7 +4,7 @@ pub mod current_networks;
pub mod devices; pub mod devices;
pub mod wireless_enabled; pub mod wireless_enabled;
use std::{collections::HashMap, fmt::Debug, hash::Hash, ops::Deref, time::Duration}; use std::{collections::HashMap, fmt::Debug, ops::Deref, time::Duration};
use cosmic::iced::{self, subscription}; use cosmic::iced::{self, subscription};
use cosmic_dbus_networkmanager::{ use cosmic_dbus_networkmanager::{
@ -19,7 +19,7 @@ use cosmic_dbus_networkmanager::{
}; };
use futures::{ use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
StreamExt, SinkExt, StreamExt,
}; };
use tokio::{process::Command, time::timeout}; use tokio::{process::Command, time::timeout};
use zbus::{ use zbus::{
@ -32,14 +32,6 @@ use self::{
current_networks::{active_connections, ActiveConnectionInfo}, current_networks::{active_connections, ActiveConnectionInfo},
}; };
pub fn network_manager_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
id: I,
) -> iced::Subscription<(I, NetworkManagerEvent)> {
subscription::unfold(id, State::Ready, move |state| {
start_listening_loop(id, state)
})
}
#[derive(Debug)] #[derive(Debug)]
pub enum State { pub enum State {
Ready, Ready,
@ -47,51 +39,52 @@ pub enum State {
Finished, Finished,
} }
pub async fn start_listening_loop<I: Copy + Debug>( pub fn network_manager_subscription<I: Copy + Debug + std::hash::Hash + 'static>(
id: I, id: I,
mut state: State, ) -> iced::Subscription<NetworkManagerEvent> {
) -> ((I, NetworkManagerEvent), State) { subscription::channel(id, 50, |mut output| async move {
loop { let mut state = State::Ready;
let (update, new_state) = start_listening(id, state).await;
state = new_state; loop {
if let Some(update) = update { state = start_listening(state, &mut output).await;
return (update, state);
} }
} })
} }
async fn start_listening<I: Copy + Debug>( async fn start_listening(
id: I,
state: State, state: State,
) -> (Option<(I, NetworkManagerEvent)>, State) { output: &mut futures::channel::mpsc::Sender<NetworkManagerEvent>,
) -> State {
match state { match state {
State::Ready => { State::Ready => {
let conn = match Connection::system().await { let conn = match Connection::system().await {
Ok(c) => c, Ok(c) => c,
Err(_) => return (None, State::Finished), Err(_) => return State::Finished,
}; };
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let nm_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); let nm_state = NetworkManagerState::new(&conn).await.unwrap_or_default();
return ( if output
Some(( .send(NetworkManagerEvent::Init {
id, conn: conn.clone(),
NetworkManagerEvent::Init { sender: tx,
conn: conn.clone(), state: nm_state,
sender: tx, })
state: nm_state, .await
}, .is_ok()
)), {
State::Waiting(conn, rx), State::Waiting(conn, rx)
); } else {
State::Finished
}
} }
State::Waiting(conn, mut rx) => { State::Waiting(conn, mut rx) => {
let network_manager = match NetworkManager::new(&conn).await { let network_manager = match NetworkManager::new(&conn).await {
Ok(n) => n, Ok(n) => n,
Err(_) => return (None, State::Finished), Err(_) => return State::Finished,
}; };
let (update, should_exit) = match rx.next().await { match rx.next().await {
Some(NetworkManagerRequest::Disconnect(ssid)) => { Some(NetworkManagerRequest::Disconnect(ssid)) => {
let mut success = false; let mut success = false;
for c in network_manager for c in network_manager
@ -125,18 +118,13 @@ async fn start_listening<I: Copy + Debug>(
} }
} }
} }
_ = output
( .send(NetworkManagerEvent::RequestResponse {
Some(( req: NetworkManagerRequest::Disconnect(ssid.clone()),
id, success,
NetworkManagerEvent::RequestResponse { state: NetworkManagerState::new(&conn).await.unwrap_or_default(),
req: NetworkManagerRequest::Disconnect(ssid.clone()), })
success, .await;
state: NetworkManagerState::new(&conn).await.unwrap_or_default(),
},
)),
false,
)
} }
Some(NetworkManagerRequest::SetAirplaneMode(airplane_mode)) => { Some(NetworkManagerRequest::SetAirplaneMode(airplane_mode)) => {
// wifi // wifi
@ -152,12 +140,13 @@ async fn start_listening<I: Copy + Debug>(
.output() .output()
.await .await
.is_ok(); .is_ok();
let response = NetworkManagerEvent::RequestResponse { _ = output
req: NetworkManagerRequest::SetAirplaneMode(airplane_mode), .send(NetworkManagerEvent::RequestResponse {
success, req: NetworkManagerRequest::SetAirplaneMode(airplane_mode),
state: NetworkManagerState::new(&conn).await.unwrap_or_default(), success,
}; state: NetworkManagerState::new(&conn).await.unwrap_or_default(),
(Some((id, response)), false) })
.await;
} }
Some(NetworkManagerRequest::SetWiFi(enabled)) => { Some(NetworkManagerRequest::SetWiFi(enabled)) => {
let success = network_manager.set_wireless_enabled(enabled).await.is_ok(); let success = network_manager.set_wireless_enabled(enabled).await.is_ok();
@ -166,15 +155,15 @@ async fn start_listening<I: Copy + Debug>(
success, success,
state: NetworkManagerState::new(&conn).await.unwrap_or_default(), state: NetworkManagerState::new(&conn).await.unwrap_or_default(),
}; };
(Some((id, response)), false) _ = output.send(response).await;
} }
Some(NetworkManagerRequest::Password(ssid, password)) => { Some(NetworkManagerRequest::Password(ssid, password)) => {
let s = match NetworkManagerSettings::new(&conn).await { let s = match NetworkManagerSettings::new(&conn).await {
Ok(s) => s, Ok(s) => s,
Err(_) => return (None, State::Finished), Err(_) => return State::Finished,
}; };
let mut status = (None, false); let mut status: Option<NetworkManagerEvent> = None;
// First try known connections // First try known connections
// TODO more convenient methods of managing settings // TODO more convenient methods of managing settings
@ -268,22 +257,16 @@ async fn start_listening<I: Copy + Debug>(
} else { } else {
false false
}; };
status = ( status = Some(NetworkManagerEvent::RequestResponse {
Some(( req: NetworkManagerRequest::Password(
id, ssid.clone(),
NetworkManagerEvent::RequestResponse { password.clone(),
req: NetworkManagerRequest::Password( ),
ssid.clone(), success,
password.clone(), state: NetworkManagerState::new(&conn)
), .await
success, .unwrap_or_default(),
state: NetworkManagerState::new(&conn) });
.await
.unwrap_or_default(),
},
)),
false,
);
} }
break; break;
@ -291,7 +274,7 @@ async fn start_listening<I: Copy + Debug>(
} }
// create a connection // create a connection
if status.0.is_none() { if status.is_none() {
for device in network_manager.devices().await.ok().unwrap_or_default() { for device in network_manager.devices().await.ok().unwrap_or_default() {
if matches!( if matches!(
device.device_type().await.unwrap_or(DeviceType::Other), device.device_type().await.unwrap_or(DeviceType::Other),
@ -368,53 +351,42 @@ async fn start_listening<I: Copy + Debug>(
} else { } else {
false false
}; };
status = ( _ = output
Some(( .send(NetworkManagerEvent::RequestResponse {
id, req: NetworkManagerRequest::Password(
NetworkManagerEvent::RequestResponse { ssid.clone(),
req: NetworkManagerRequest::Password( password.clone(),
ssid.clone(), ),
password.clone(), success,
), state: NetworkManagerState::new(&conn)
success, .await
state: NetworkManagerState::new(&conn) .unwrap_or_default(),
.await })
.unwrap_or_default(), .await;
},
)),
false,
);
break; break;
} }
} }
} }
if status.0.is_none() { if let Some(e) = status {
status = ( _ = output.send(e).await;
Some(( } else {
id, _ = output
NetworkManagerEvent::RequestResponse { .send(NetworkManagerEvent::RequestResponse {
req: NetworkManagerRequest::Password(ssid, password), req: NetworkManagerRequest::Password(ssid, password),
success: false, success: false,
state: NetworkManagerState::new(&conn) state: NetworkManagerState::new(&conn).await.unwrap_or_default(),
.await })
.unwrap_or_default(), .await;
},
)),
false,
);
} }
status
} }
Some(NetworkManagerRequest::SelectAccessPoint(ssid)) => { Some(NetworkManagerRequest::SelectAccessPoint(ssid)) => {
let s = match NetworkManagerSettings::new(&conn).await { let s = match NetworkManagerSettings::new(&conn).await {
Ok(s) => s, Ok(s) => s,
Err(_) => return (None, State::Finished), Err(_) => return State::Finished,
}; };
// find known connection with matching ssid and activate // find known connection with matching ssid and activate
let mut status = (None, false);
for c in s.list_connections().await.unwrap_or_default() { for c in s.list_connections().await.unwrap_or_default() {
let settings = match c.get_settings().await.ok() { let settings = match c.get_settings().await.ok() {
@ -472,50 +444,30 @@ async fn start_listening<I: Copy + Debug>(
} else { } else {
false false
}; };
status = ( _ = output
Some(( .send(NetworkManagerEvent::RequestResponse {
id, req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()),
NetworkManagerEvent::RequestResponse { success,
req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()), state: NetworkManagerState::new(&conn).await.unwrap_or_default(),
success, })
state: NetworkManagerState::new(&conn) .await;
.await
.unwrap_or_default(),
},
)),
false,
);
break; break;
} }
_ = output
if status.0.is_none() { .send(NetworkManagerEvent::RequestResponse {
status = ( req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()),
Some(( success: false,
id, state: NetworkManagerState::new(&conn).await.unwrap_or_default(),
NetworkManagerEvent::RequestResponse { })
req: NetworkManagerRequest::SelectAccessPoint(ssid.clone()), .await;
success: false, }
state: NetworkManagerState::new(&conn) _ => {
.await return State::Finished;
.unwrap_or_default(),
},
)),
false,
);
}
status
} }
None => (None, true),
}; };
(
update, State::Waiting(conn, rx)
if should_exit {
State::Finished
} else {
State::Waiting(conn, rx)
},
)
} }
State::Finished => iced::futures::future::pending().await, State::Finished => iced::futures::future::pending().await,
} }

View file

@ -1,7 +1,7 @@
use super::{NetworkManagerEvent, NetworkManagerState}; use super::{NetworkManagerEvent, NetworkManagerState};
use cosmic::iced::{self, subscription}; use cosmic::iced::{self, subscription};
use cosmic_dbus_networkmanager::nm::NetworkManager; use cosmic_dbus_networkmanager::nm::NetworkManager;
use futures::StreamExt; use futures::{SinkExt, StreamExt};
use log::error; use log::error;
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
@ -10,13 +10,14 @@ use zbus::Connection;
pub fn wireless_enabled_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>( pub fn wireless_enabled_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
id: I, id: I,
conn: Connection, conn: Connection,
) -> iced::Subscription<(I, NetworkManagerEvent)> { ) -> iced::Subscription<NetworkManagerEvent> {
subscription::unfold(id, State::Continue(conn), move |mut state| async move { let initial = State::Continue(conn.clone());
loop { subscription::channel(id, 50, move |mut output| {
let (update, new_state) = start_listening(id, state).await; let mut state = initial.clone();
state = new_state;
if let Some(update) = update { async move {
return (update, state); loop {
state = start_listening(state, &mut output).await;
} }
} }
}) })
@ -28,10 +29,10 @@ pub enum State {
Error, Error,
} }
async fn start_listening<I: Copy + Debug>( async fn start_listening(
id: I,
state: State, state: State,
) -> (Option<(I, NetworkManagerEvent)>, State) { output: &mut futures::channel::mpsc::Sender<NetworkManagerEvent>,
) -> State {
let conn = match state { let conn = match state {
State::Continue(conn) => conn, State::Continue(conn) => conn,
State::Error => iced::futures::future::pending().await, State::Error => iced::futures::future::pending().await,
@ -40,7 +41,7 @@ async fn start_listening<I: Copy + Debug>(
Ok(n) => n, Ok(n) => n,
Err(e) => { Err(e) => {
error!("Failed to connect to NetworkManager: {}", e); error!("Failed to connect to NetworkManager: {}", e);
return (None, State::Error); return State::Error;
} }
}; };
@ -48,9 +49,8 @@ async fn start_listening<I: Copy + Debug>(
wireless_enabled_changed.next().await; wireless_enabled_changed.next().await;
let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default(); let new_state = NetworkManagerState::new(&conn).await.unwrap_or_default();
_ = output
( .send(NetworkManagerEvent::WiFiEnabled(new_state))
Some((id, NetworkManagerEvent::WiFiEnabled(new_state))), .await;
State::Continue(conn), State::Continue(conn)
)
} }