refactor: use the cards widget and use peer to peer dbus
This commit is contained in:
parent
c8273f0b4d
commit
c8891c8af9
9 changed files with 357 additions and 269 deletions
|
|
@ -1,4 +1,4 @@
|
|||
use crate::subscriptions::dbus_proxy::NotificationsProxy;
|
||||
use crate::subscriptions::freedesktop_proxy::NotificationsProxy;
|
||||
use cosmic::{
|
||||
iced::{
|
||||
futures::{self, SinkExt},
|
||||
|
|
@ -8,23 +8,25 @@ use cosmic::{
|
|||
};
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tracing::{error, warn};
|
||||
use zbus::Connection;
|
||||
use zbus::{export::futures_util::StreamExt, Connection};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum State {
|
||||
Ready,
|
||||
WaitingForNotificationEvent(Connection, Receiver<Input>),
|
||||
WaitingForNotificationEvent(NotificationsProxy<'static>, Receiver<Input>),
|
||||
Finished,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum Input {
|
||||
Dismiss(u32),
|
||||
CloseEvent(u32),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Output {
|
||||
Ready(Sender<Input>),
|
||||
CloseEvent(u32),
|
||||
}
|
||||
|
||||
pub fn proxy() -> Subscription<Output> {
|
||||
|
|
@ -45,34 +47,54 @@ pub fn proxy() -> Subscription<Output> {
|
|||
state = State::Finished;
|
||||
continue;
|
||||
};
|
||||
if let Err(err) = output.send(Output::Ready(sender)).await {
|
||||
error!("Failed to send sender: {}", err);
|
||||
state = State::Finished;
|
||||
continue;
|
||||
}
|
||||
|
||||
state = State::WaitingForNotificationEvent(conn, receiver);
|
||||
}
|
||||
State::WaitingForNotificationEvent(conn, rx) => {
|
||||
let Ok(proxy) = NotificationsProxy::new(&conn).await else {
|
||||
error!("Failed to create proxy from session connection");
|
||||
state = State::Finished;
|
||||
continue;
|
||||
};
|
||||
|
||||
match rx.recv().await {
|
||||
Some(Input::Dismiss(id)) => {
|
||||
if let Err(err) = proxy.close_notification(id).await {
|
||||
error!("Failed to close notification: {}", err);
|
||||
}
|
||||
let tx = sender.clone();
|
||||
if let Err(err) = output.send(Output::Ready(sender)).await {
|
||||
error!("Failed to send sender: {}", err);
|
||||
state = State::Finished;
|
||||
continue;
|
||||
}
|
||||
state = match proxy.receive_notification_closed().await {
|
||||
Ok(mut s) => {
|
||||
tokio::spawn(async move {
|
||||
while let Some(msg) = s.next().await {
|
||||
let Ok(id) = msg.args().map(|args| args.id) else {
|
||||
continue;
|
||||
};
|
||||
_ = tx.send(Input::CloseEvent(id)).await;
|
||||
}
|
||||
});
|
||||
State::WaitingForNotificationEvent(proxy, receiver)
|
||||
}
|
||||
None => {
|
||||
warn!("Notification event channel closed");
|
||||
state = State::Finished;
|
||||
continue;
|
||||
Err(err) => {
|
||||
error!(
|
||||
"failed to get a stream of signals for notifications. {}",
|
||||
err
|
||||
);
|
||||
State::Finished
|
||||
}
|
||||
};
|
||||
}
|
||||
State::WaitingForNotificationEvent(proxy, rx) => match rx.recv().await {
|
||||
Some(Input::Dismiss(id)) => {
|
||||
if let Err(err) = proxy.close_notification(id).await {
|
||||
error!("Failed to close notification: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Input::CloseEvent(id)) => {
|
||||
_ = output.send(Output::CloseEvent(id)).await;
|
||||
}
|
||||
None => {
|
||||
warn!("Notification event channel closed");
|
||||
state = State::Finished;
|
||||
continue;
|
||||
}
|
||||
},
|
||||
State::Finished => {
|
||||
let () = futures::future::pending().await;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,3 +1,3 @@
|
|||
pub mod dbus;
|
||||
mod dbus_proxy;
|
||||
mod freedesktop_proxy;
|
||||
pub mod notifications;
|
||||
|
|
|
|||
|
|
@ -1,28 +1,28 @@
|
|||
use cosmic::{
|
||||
iced::{
|
||||
futures::{self, SinkExt},
|
||||
subscription,
|
||||
},
|
||||
iced::{futures, subscription},
|
||||
iced_futures::Subscription,
|
||||
};
|
||||
use cosmic_notifications_util::AppletEvent;
|
||||
use sendfd::RecvWithFd;
|
||||
use std::os::unix::io::{FromRawFd, RawFd};
|
||||
use tokio::{
|
||||
io::{self, AsyncBufReadExt, BufReader},
|
||||
net::UnixStream,
|
||||
use cosmic_notifications_util::Notification;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
os::unix::io::{FromRawFd, RawFd},
|
||||
};
|
||||
|
||||
use tracing::{error, info};
|
||||
use zbus::{
|
||||
dbus_proxy,
|
||||
export::futures_util::{SinkExt, StreamExt},
|
||||
ConnectionBuilder,
|
||||
};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum State {
|
||||
Ready,
|
||||
WaitingForDaemon(UnixStream),
|
||||
WaitingForNotificationEvent(UnixStream),
|
||||
WaitingForNotificationEvent(NotificationsAppletProxy<'static>),
|
||||
Finished,
|
||||
}
|
||||
|
||||
pub fn notifications() -> Subscription<AppletEvent> {
|
||||
pub fn notifications() -> Subscription<Notification> {
|
||||
struct SomeWorker;
|
||||
|
||||
subscription::channel(
|
||||
|
|
@ -34,97 +34,45 @@ pub fn notifications() -> Subscription<AppletEvent> {
|
|||
loop {
|
||||
match &mut state {
|
||||
State::Ready => {
|
||||
info!("Reading COSMIC_NOTIFICATIONS env var");
|
||||
let Ok(Some(raw_fd)) = std::env::var("COSMIC_NOTIFICATIONS")
|
||||
.map(|fd| fd.parse::<RawFd>().ok()) else
|
||||
{
|
||||
error!("Failed to parse COSMIC_NOTIFICATIONS env var");
|
||||
state = State::Finished;
|
||||
continue;
|
||||
};
|
||||
|
||||
let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) };
|
||||
let Ok(stream) = UnixStream::from_std(stream) else {
|
||||
error!("Failed to convert std stream to unix stream");
|
||||
state = State::Finished;
|
||||
continue;
|
||||
};
|
||||
state = State::WaitingForDaemon(stream);
|
||||
|
||||
}
|
||||
State::WaitingForDaemon(stream) => {
|
||||
info!("Waiting for panel to send us a stream");
|
||||
if let Err(err) = stream.readable().await {
|
||||
error!("Failed to wait for stream to be readable {}", err);
|
||||
state = State::Finished;
|
||||
continue;
|
||||
};
|
||||
|
||||
// we are expecting a single RawFd from the panel on this stream and the applet id
|
||||
let mut buf = [0u8; 4];
|
||||
let mut fd_buf = [0i32; 1];
|
||||
|
||||
match stream.recv_with_fd(&mut buf, &mut fd_buf) {
|
||||
Ok((data_cnt, fd_cnt)) => {
|
||||
if data_cnt == 0 && fd_cnt == 0 {
|
||||
warn!("Received EOF from panel");
|
||||
state = State::Finished;
|
||||
|
||||
continue;
|
||||
}
|
||||
if data_cnt != 4 || fd_cnt != 1 {
|
||||
error!(
|
||||
"Invalid data received from panel {} {}",
|
||||
data_cnt, fd_cnt
|
||||
);
|
||||
state = State::Finished;
|
||||
|
||||
continue;
|
||||
}
|
||||
let notif_stream = unsafe {
|
||||
std::os::unix::net::UnixStream::from_raw_fd(fd_buf[0])
|
||||
};
|
||||
let Ok(notif_stream) = UnixStream::from_std(notif_stream) else {
|
||||
error!("Failed to convert raw fd to unix stream");
|
||||
state = State::Finished;
|
||||
continue;
|
||||
};
|
||||
|
||||
state = State::WaitingForNotificationEvent(notif_stream);
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
continue;
|
||||
}
|
||||
state = match get_proxy().await {
|
||||
Ok(p) => State::WaitingForNotificationEvent(p),
|
||||
Err(err) => {
|
||||
error!("Failed to receive fd from panel: {}", err);
|
||||
state = State::Finished;
|
||||
|
||||
error!("Failed to connect to notifications daemon {}", err);
|
||||
State::Finished
|
||||
}
|
||||
};
|
||||
}
|
||||
State::WaitingForNotificationEvent(proxy) => {
|
||||
info!("Waiting for notification events...");
|
||||
let mut signal = match proxy.receive_notify().await {
|
||||
Ok(s) => s,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"failed to get a stream of signals for notifications. {}",
|
||||
err
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
while let Some(msg) = signal.next().await {
|
||||
info!("Notification event");
|
||||
let Some(args) = msg.args().into_iter().next() else {
|
||||
error!("Failed to get arguments from notification signal.");
|
||||
break;
|
||||
};
|
||||
let notification = Notification::new(
|
||||
args.app_name,
|
||||
args.id,
|
||||
args.app_icon,
|
||||
args.summary,
|
||||
args.body,
|
||||
args.actions,
|
||||
args.hints,
|
||||
args.expire_timeout,
|
||||
);
|
||||
_ = output.send(notification).await;
|
||||
}
|
||||
}
|
||||
State::WaitingForNotificationEvent(stream) => {
|
||||
info!("Waiting for notification event");
|
||||
let reader = BufReader::new(stream);
|
||||
// todo read messages
|
||||
|
||||
let mut lines = reader.lines();
|
||||
while let Ok(Some(line)) = lines.next_line().await {
|
||||
if line.is_empty() {
|
||||
warn!("Received empty line from notification stream. The notification daemon probably crashed, so we will exit.");
|
||||
std::process::exit(1);
|
||||
}
|
||||
if let Ok(event) = ron::de::from_str::<AppletEvent>(line.as_str()) {
|
||||
if let Err(_err) = output.send(event).await {
|
||||
error!("Error sending event");
|
||||
}
|
||||
} else {
|
||||
error!("Failed to deserialize event from notification stream");
|
||||
}
|
||||
}
|
||||
warn!("Notification stream closed. The notification daemon probably crashed, so we will exit.");
|
||||
std::process::exit(1);
|
||||
}
|
||||
State::Finished => {
|
||||
let () = futures::future::pending().await;
|
||||
}
|
||||
|
|
@ -133,3 +81,37 @@ pub fn notifications() -> Subscription<AppletEvent> {
|
|||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[dbus_proxy(
|
||||
default_service = "com.system76.NotificationsApplet",
|
||||
interface = "com.system76.NotificationsApplet",
|
||||
default_path = "/com/system76/NotificationsApplet"
|
||||
)]
|
||||
trait NotificationsApplet {
|
||||
#[dbus_proxy(signal)]
|
||||
fn notify(
|
||||
&self,
|
||||
app_name: &str,
|
||||
id: u32,
|
||||
app_icon: &str,
|
||||
summary: &str,
|
||||
body: &str,
|
||||
actions: Vec<&str>,
|
||||
hints: HashMap<&str, zbus::zvariant::Value<'_>>,
|
||||
expire_timeout: i32,
|
||||
) -> zbus::Result<()>;
|
||||
}
|
||||
|
||||
async fn get_proxy() -> anyhow::Result<NotificationsAppletProxy<'static>> {
|
||||
let raw_fd = std::env::var("COSMIC_NOTIFICATIONS")?;
|
||||
let raw_fd = raw_fd.parse::<RawFd>()?;
|
||||
|
||||
let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) };
|
||||
stream.set_nonblocking(true)?;
|
||||
let stream = tokio::net::UnixStream::from_std(stream)?;
|
||||
let conn = ConnectionBuilder::socket(stream).p2p().build().await?;
|
||||
info!("Applet connection created");
|
||||
let proxy = NotificationsAppletProxy::new(&conn).await?;
|
||||
|
||||
Ok(proxy)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue