diff --git a/Cargo.lock b/Cargo.lock index d1e610ea..5318d98d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2453,6 +2453,7 @@ version = "0.14.0-dev" dependencies = [ "iced_beacon", "iced_core", + "iced_futures", "log", ] diff --git a/Cargo.toml b/Cargo.toml index b6638e66..133c2362 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,8 @@ markdown = ["iced_widget/markdown"] lazy = ["iced_widget/lazy"] # Enables a debug view in native platforms (press F12) debug = ["iced_winit/debug", "iced_devtools"] +# Enables time-travel debugging (very experimental!) +time-travel = ["debug", "iced_devtools/time-travel"] # Enables the `thread-pool` futures executor as the `executor::Default` on native platforms thread-pool = ["iced_futures/thread-pool"] # Enables `tokio` as the `executor::Default` on native platforms diff --git a/beacon/src/client.rs b/beacon/src/client.rs index 1ca5bc8c..b7444617 100644 --- a/beacon/src/client.rs +++ b/beacon/src/client.rs @@ -1,10 +1,12 @@ +use crate::Error; use crate::core::time::{Duration, SystemTime}; use crate::span; use crate::theme; +use futures::{FutureExt, select}; use semver::Version; use serde::{Deserialize, Serialize}; -use tokio::io::{self, AsyncWriteExt}; +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net; use tokio::sync::mpsc; use tokio::time; @@ -17,7 +19,7 @@ pub const SERVER_ADDRESS: &str = "127.0.0.1:9167"; #[derive(Debug, Clone)] pub struct Client { - sender: mpsc::Sender, + sender: mpsc::Sender, is_connected: Arc, _handle: Arc>, } @@ -43,17 +45,17 @@ pub enum Event { ThemeChanged(theme::Palette), SpanStarted(span::Stage), SpanFinished(span::Stage, Duration), - MessageLogged(String), + MessageLogged { number: usize, message: String }, CommandsSpawned(usize), SubscriptionsTracked(usize), } impl Client { pub fn log(&self, event: Event) { - let _ = self.sender.try_send(Message::EventLogged { + let _ = self.sender.try_send(Action::Send(Message::EventLogged { at: SystemTime::now(), event, - }); + })); } pub fn is_connected(&self) -> bool { @@ -61,21 +63,28 @@ impl Client { } pub fn quit(&self) { - let _ = self.sender.try_send(Message::Quit { + let _ = self.sender.try_send(Action::Send(Message::Quit { at: SystemTime::now(), - }); + })); + } + + pub fn subscribe(&self) -> mpsc::Receiver { + let (sender, receiver) = mpsc::channel(100); + let _ = self.sender.try_send(Action::Forward(sender)); + + receiver } } #[must_use] pub fn connect(name: String) -> Client { - let (sender, receiver) = mpsc::channel(100); + let (sender, receiver) = mpsc::channel(10_000); let is_connected = Arc::new(AtomicBool::new(false)); let handle = { let is_connected = is_connected.clone(); - std::thread::spawn(move || run(name, is_connected.clone(), receiver)) + std::thread::spawn(move || run(name, is_connected, receiver)) }; Client { @@ -85,16 +94,30 @@ pub fn connect(name: String) -> Client { } } +enum Action { + Send(Message), + Forward(mpsc::Sender), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Command { + RewindTo { message: usize }, +} + #[tokio::main] async fn run( name: String, is_connected: Arc, - mut receiver: mpsc::Receiver, + mut receiver: mpsc::Receiver, ) { let version = semver::Version::parse(env!("CARGO_PKG_VERSION")) .expect("Parse package version"); + let mut buffer = Vec::new(); + loop { + let mut command_sender = None; + match _connect().await { Ok(mut stream) => { is_connected.store(true, atomic::Ordering::Relaxed); @@ -109,16 +132,37 @@ async fn run( ) .await; - while let Some(output) = receiver.recv().await { - match send(&mut stream, output).await { - Ok(()) => {} - Err(error) => { - if error.kind() != io::ErrorKind::BrokenPipe { - log::warn!( - "Error sending message to server: {error}" - ); + loop { + select! { + action = receiver.recv().fuse() => { + let Some(action) = action else { break; }; + + match action { + Action::Send(message) => { + match send(&mut stream, message).await { + Ok(()) => {} + Err(error) => { + if error.kind() != io::ErrorKind::BrokenPipe + { + log::warn!( + "Error sending message to server: {error}" + ); + } + break; + } + } + } + Action::Forward(sender) => { + command_sender = Some(sender); + } + } + } + command = receive(&mut stream, &mut buffer).fuse() => { + let Ok(command) = command else { continue; }; + + if let Some(sender) = command_sender.as_mut() { + let _ = sender.send(command).await; } - break; } } } @@ -154,3 +198,18 @@ async fn send( Ok(()) } + +async fn receive( + stream: &mut net::TcpStream, + buffer: &mut Vec, +) -> Result { + let size = stream.read_u64().await? as usize; + + if buffer.len() < size { + buffer.resize(size, 0); + } + + let _n = stream.read_exact(&mut buffer[..size]).await?; + + Ok(bincode::deserialize(buffer)?) +} diff --git a/beacon/src/error.rs b/beacon/src/error.rs new file mode 100644 index 00000000..032e75d5 --- /dev/null +++ b/beacon/src/error.rs @@ -0,0 +1,9 @@ +use std::io; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("input/output operation failed: {0}")] + IOFailed(#[from] io::Error), + #[error("decoding failed: {0}")] + DecodingFailed(#[from] Box), +} diff --git a/beacon/src/lib.rs b/beacon/src/lib.rs index 76df883a..8d1c31f5 100644 --- a/beacon/src/lib.rs +++ b/beacon/src/lib.rs @@ -4,6 +4,7 @@ pub use semver::Version; pub mod client; pub mod span; +mod error; mod stream; pub use client::Client; @@ -11,14 +12,36 @@ pub use span::Span; use crate::core::theme; use crate::core::time::{Duration, SystemTime}; +use crate::error::Error; use futures::{SinkExt, Stream}; -use tokio::io::{self, AsyncReadExt}; +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net; +use tokio::sync::mpsc; +use tokio::task; + +#[derive(Debug, Clone)] +pub struct Connection { + commands: mpsc::Sender, +} + +impl Connection { + pub fn rewind_to<'a>( + &self, + message: usize, + ) -> impl Future + 'a { + let commands = self.commands.clone(); + + async move { + let _ = commands.send(client::Command::RewindTo { message }).await; + } + } +} #[derive(Debug, Clone)] pub enum Event { Connected { + connection: Connection, at: SystemTime, name: String, version: Version, @@ -86,18 +109,42 @@ pub fn run() -> impl Stream { }; loop { - let Ok((mut stream, _)) = server.accept().await else { + let Ok((stream, _)) = server.accept().await else { continue; }; - let _ = stream.set_nodelay(true); + let (mut reader, mut writer) = { + let _ = stream.set_nodelay(true); + stream.into_split() + }; + let (command_sender, mut command_receiver) = mpsc::channel(1); let mut last_message = String::new(); + let mut last_update_number = 0; let mut last_commands_spawned = 0; let mut last_present_window = None; + drop(task::spawn(async move { + let mut last_message_number = None; + + while let Some(command) = command_receiver.recv().await { + let client::Command::RewindTo { message } = command; + + if Some(message) == last_message_number { + continue; + } + + last_message_number = Some(message); + + let _ = + send(&mut writer, command).await.inspect_err(|error| { + log::error!("Error when sending command: {error}") + }); + } + })); + loop { - match receive(&mut stream, &mut buffer).await { + match receive(&mut reader, &mut buffer).await { Ok(message) => { match message { client::Message::Connected { @@ -107,6 +154,9 @@ pub fn run() -> impl Stream { } => { let _ = output .send(Event::Connected { + connection: Connection { + commands: command_sender.clone(), + }, at, name, version, @@ -133,7 +183,11 @@ pub fn run() -> impl Stream { }) .await; } - client::Event::MessageLogged(message) => { + client::Event::MessageLogged { + number, + message, + } => { + last_update_number = number; last_message = message; } client::Event::CommandsSpawned( @@ -161,6 +215,7 @@ pub fn run() -> impl Stream { span::Stage::Boot => Span::Boot, span::Stage::Update => { Span::Update { + number: last_update_number, message: last_message .clone(), commands_spawned: @@ -246,7 +301,7 @@ pub fn run() -> impl Stream { } async fn receive( - stream: &mut net::TcpStream, + stream: &mut net::tcp::OwnedReadHalf, buffer: &mut Vec, ) -> Result { let size = stream.read_u64().await? as usize; @@ -260,14 +315,20 @@ async fn receive( Ok(bincode::deserialize(buffer)?) } +async fn send( + stream: &mut net::tcp::OwnedWriteHalf, + command: client::Command, +) -> Result<(), io::Error> { + let bytes = bincode::serialize(&command).expect("Encode input message"); + let size = bytes.len() as u64; + + stream.write_all(&size.to_be_bytes()).await?; + stream.write_all(&bytes).await?; + stream.flush().await?; + + Ok(()) +} + async fn delay() { tokio::time::sleep(Duration::from_secs(2)).await; } - -#[derive(Debug, thiserror::Error)] -enum Error { - #[error("input/output operation failed: {0}")] - IOFailed(#[from] io::Error), - #[error("decoding failed: {0}")] - DecodingFailed(#[from] Box), -} diff --git a/beacon/src/span.rs b/beacon/src/span.rs index d35f7b54..453ef1bd 100644 --- a/beacon/src/span.rs +++ b/beacon/src/span.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; pub enum Span { Boot, Update { + number: usize, message: String, commands_spawned: usize, }, diff --git a/debug/Cargo.toml b/debug/Cargo.toml index 1c5e7324..f6c7c843 100644 --- a/debug/Cargo.toml +++ b/debug/Cargo.toml @@ -15,6 +15,7 @@ enable = ["dep:iced_beacon"] [dependencies] iced_core.workspace = true +iced_futures.workspace = true log.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/debug/src/lib.rs b/debug/src/lib.rs index f3335c70..8f5a0096 100644 --- a/debug/src/lib.rs +++ b/debug/src/lib.rs @@ -1,7 +1,9 @@ pub use iced_core as core; +pub use iced_futures as futures; use crate::core::theme; use crate::core::window; +use crate::futures::Subscription; pub use internal::Span; @@ -16,6 +18,11 @@ pub enum Primitive { Text, } +#[derive(Debug, Clone, Copy)] +pub enum Command { + RewindTo { message: usize }, +} + pub fn init(name: &str) { internal::init(name); } @@ -88,12 +95,18 @@ pub fn skip_next_timing() { internal::skip_next_timing(); } +pub fn commands() -> Subscription { + internal::commands() +} + #[cfg(all(feature = "enable", not(target_arch = "wasm32")))] mod internal { - use crate::Primitive; use crate::core::theme; use crate::core::time::Instant; use crate::core::window; + use crate::futures::Subscription; + use crate::futures::futures::Stream; + use crate::{Command, Primitive}; use iced_beacon as beacon; @@ -102,7 +115,7 @@ mod internal { use std::io; use std::process; - use std::sync::atomic::{self, AtomicBool}; + use std::sync::atomic::{self, AtomicBool, AtomicUsize}; use std::sync::{LazyLock, RwLock}; pub fn init(name: &str) { @@ -162,6 +175,8 @@ mod internal { pub fn update(message: &impl std::fmt::Debug) -> Span { let span = span(span::Stage::Update); + let number = LAST_UPDATE.fetch_add(1, atomic::Ordering::Relaxed); + let start = Instant::now(); let message = format!("{message:?}"); let elapsed = start.elapsed(); @@ -172,11 +187,13 @@ mod internal { ); } - BEACON.log(client::Event::MessageLogged(if message.len() > 49 { + let message = if message.len() > 49 { format!("{}...", &message[..49]) } else { message - })); + }; + + BEACON.log(client::Event::MessageLogged { number, message }); span } @@ -217,6 +234,24 @@ mod internal { SKIP_NEXT_SPAN.store(true, atomic::Ordering::Relaxed); } + pub fn commands() -> Subscription { + fn listen_for_commands() -> impl Stream { + use crate::futures::futures::stream; + + stream::unfold(BEACON.subscribe(), async move |mut receiver| { + let command = match receiver.recv().await? { + client::Command::RewindTo { message } => { + Command::RewindTo { message } + } + }; + + Some((command, receiver)) + }) + } + + Subscription::run(listen_for_commands) + } + fn span(span: span::Stage) -> Span { BEACON.log(client::Event::SpanStarted(span.clone())); @@ -260,15 +295,17 @@ mod internal { }); static NAME: RwLock = RwLock::new(String::new()); + static LAST_UPDATE: AtomicUsize = AtomicUsize::new(0); static LAST_PALETTE: RwLock> = RwLock::new(None); static SKIP_NEXT_SPAN: AtomicBool = AtomicBool::new(false); } #[cfg(any(not(feature = "enable"), target_arch = "wasm32"))] mod internal { - use crate::Primitive; use crate::core::theme; use crate::core::window; + use crate::futures::Subscription; + use crate::{Command, Primitive}; use std::io; @@ -326,6 +363,10 @@ mod internal { pub fn skip_next_timing() {} + pub fn commands() -> Subscription { + Subscription::none() + } + #[derive(Debug)] pub struct Span; diff --git a/devtools/Cargo.toml b/devtools/Cargo.toml index df7e5012..3034f9c3 100644 --- a/devtools/Cargo.toml +++ b/devtools/Cargo.toml @@ -13,6 +13,9 @@ rust-version.workspace = true [lints] workspace = true +[features] +time-travel = ["iced_program/time-travel"] + [dependencies] iced_program.workspace = true iced_widget.workspace = true diff --git a/devtools/src/lib.rs b/devtools/src/lib.rs index 8d19140f..7bcf100a 100644 --- a/devtools/src/lib.rs +++ b/devtools/src/lib.rs @@ -115,6 +115,8 @@ where state: P::State, mode: Mode, show_notification: bool, + rewind: Option, + log: Vec, } #[derive(Debug, Clone)] @@ -147,6 +149,8 @@ where state, mode: Mode::None, show_notification: true, + rewind: None, + log: Vec::new(), }, executor::spawn_blocking(|mut sender| { thread::sleep(seconds(2)); @@ -250,7 +254,46 @@ where } }, Event::Program(message) => { - program.update(&mut self.state, message).map(Event::Program) + if self.rewind.is_some() { + return Task::none(); + } + + #[cfg(feature = "time-travel")] + { + self.log.push(message.clone()); + } + + let span = debug::update(&message); + let task = program.update(&mut self.state, message); + debug::tasks_spawned(task.units()); + span.finish(); + + task.map(Event::Program) + } + Event::Command(command) => { + match command { + debug::Command::RewindTo { message } => { + #[cfg(feature = "time-travel")] + { + let (mut state, _) = program.boot(); + + if message < self.log.len() { + // TODO: Run concurrently (?) + for message in &self.log[0..message] { + let _ = program + .update(&mut state, message.clone()); + } + } + + self.rewind = Some(state); + } + + #[cfg(not(feature = "time-travel"))] + let _ = message; + } + } + + Task::none() } } } @@ -260,8 +303,10 @@ where program: &P, window: window::Id, ) -> Element<'_, Event

, P::Theme, P::Renderer> { - let view = program.view(&self.state, window).map(Event::Program); - let theme = program.theme(&self.state, window); + let state = self.rewind.as_ref().unwrap_or(&self.state); + + let view = program.view(state, window).map(Event::Program); + let theme = program.theme(state, window); let derive_theme = move || { theme @@ -363,6 +408,7 @@ where }); stack![view] + .height(Fill) .push_maybe(mode.map(opaque)) .push_maybe(notification) .into() @@ -372,6 +418,8 @@ where let subscription = program.subscription(&self.state).map(Event::Program); + debug::subscriptions_tracked(subscription.units()); + let hotkeys = futures::keyboard::on_key_press(|key, _modifiers| match key { keyboard::Key::Named(keyboard::key::Named::F12) => { @@ -381,19 +429,22 @@ where }) .map(Event::Message); - Subscription::batch([subscription, hotkeys]) + let commands = debug::commands().map(Event::Command); + + Subscription::batch([subscription, hotkeys, commands]) } pub fn theme(&self, program: &P, window: window::Id) -> P::Theme { - program.theme(&self.state, window) + program.theme(self.rewind.as_ref().unwrap_or(&self.state), window) } pub fn style(&self, program: &P, theme: &P::Theme) -> theme::Style { - program.style(&self.state, theme) + program.style(self.rewind.as_ref().unwrap_or(&self.state), theme) } pub fn scale_factor(&self, program: &P, window: window::Id) -> f64 { - program.scale_factor(&self.state, window) + program + .scale_factor(self.rewind.as_ref().unwrap_or(&self.state), window) } } @@ -403,6 +454,7 @@ where { Message(Message), Program(P::Message), + Command(debug::Command), } impl

fmt::Debug for Event

@@ -413,6 +465,21 @@ where match self { Self::Message(message) => message.fmt(f), Self::Program(message) => message.fmt(f), + Self::Command(command) => command.fmt(f), + } + } +} + +#[cfg(feature = "time-travel")] +impl

Clone for Event

+where + P: Program, +{ + fn clone(&self) -> Self { + match self { + Event::Message(message) => Event::Message(message.clone()), + Event::Program(message) => Event::Program(message.clone()), + Event::Command(command) => Event::Command(*command), } } } diff --git a/examples/multitouch/src/main.rs b/examples/multitouch/src/main.rs index 4f22f552..0db1c09a 100644 --- a/examples/multitouch/src/main.rs +++ b/examples/multitouch/src/main.rs @@ -23,7 +23,7 @@ struct Multitouch { fingers: HashMap, } -#[derive(Debug)] +#[derive(Debug, Clone)] enum Message { FingerPressed { id: touch::Finger, position: Point }, FingerLifted { id: touch::Finger }, diff --git a/examples/solar_system/src/main.rs b/examples/solar_system/src/main.rs index 07450309..a0b5c402 100644 --- a/examples/solar_system/src/main.rs +++ b/examples/solar_system/src/main.rs @@ -105,6 +105,10 @@ impl State { } pub fn update(&mut self, now: Instant) { + if self.start > now { + self.start = now; + } + self.now = now; self.system_cache.clear(); } diff --git a/examples/todos/Cargo.toml b/examples/todos/Cargo.toml index fd3433e6..5e16a2ac 100644 --- a/examples/todos/Cargo.toml +++ b/examples/todos/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] iced.workspace = true -iced.features = ["tokio", "debug"] +iced.features = ["tokio", "debug", "time-travel"] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index f799d5f8..e347e81f 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -285,6 +285,11 @@ impl Subscription { .collect(), } } + + /// Returns the amount of recipe units in this [`Subscription`]. + pub fn units(&self) -> usize { + self.recipes.len() + } } /// Creates a [`Subscription`] from a [`Recipe`] describing it. diff --git a/program/Cargo.toml b/program/Cargo.toml index 07880705..7aa6414d 100644 --- a/program/Cargo.toml +++ b/program/Cargo.toml @@ -10,9 +10,12 @@ categories.workspace = true keywords.workspace = true rust-version.workspace = true +[lints] +workspace = true + +[features] +time-travel = [] + [dependencies] iced_graphics.workspace = true iced_runtime.workspace = true - -[lints] -workspace = true diff --git a/program/src/lib.rs b/program/src/lib.rs index 7e5757de..e25cdb22 100644 --- a/program/src/lib.rs +++ b/program/src/lib.rs @@ -22,7 +22,7 @@ pub trait Program: Sized { type State; /// The message of the program. - type Message: Send + std::fmt::Debug + 'static; + type Message: Message + 'static; /// The theme of the program. type Theme: Default + theme::Base; @@ -642,3 +642,17 @@ impl Instance

{ self.program.scale_factor(&self.state, window) } } + +/// A trait alias for the [`Message`](Program::Message) of a [`Program`]. +#[cfg(feature = "time-travel")] +pub trait Message: Send + std::fmt::Debug + Clone {} + +#[cfg(feature = "time-travel")] +impl Message for T {} + +/// A trait alias for the [`Message`](Program::Message) of a [`Program`]. +#[cfg(not(feature = "time-travel"))] +pub trait Message: Send + std::fmt::Debug {} + +#[cfg(not(feature = "time-travel"))] +impl Message for T {} diff --git a/src/application.rs b/src/application.rs index 09dd2647..5438e97d 100644 --- a/src/application.rs +++ b/src/application.rs @@ -75,7 +75,7 @@ pub fn application( ) -> Application> where State: 'static, - Message: Send + std::fmt::Debug + 'static, + Message: program::Message + 'static, Theme: Default + theme::Base, Renderer: program::Renderer, { @@ -94,7 +94,7 @@ where impl Program for Instance where - Message: Send + std::fmt::Debug + 'static, + Message: program::Message + 'static, Theme: Default + theme::Base, Renderer: program::Renderer, Boot: self::Boot, diff --git a/src/daemon.rs b/src/daemon.rs index 8a356356..80271e73 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -25,7 +25,7 @@ pub fn daemon( ) -> Daemon> where State: 'static, - Message: Send + std::fmt::Debug + 'static, + Message: program::Message + 'static, Theme: Default + theme::Base, Renderer: program::Renderer, { @@ -44,7 +44,7 @@ where impl Program for Instance where - Message: Send + std::fmt::Debug + 'static, + Message: program::Message + 'static, Theme: Default + theme::Base, Renderer: program::Renderer, Boot: application::Boot, diff --git a/src/lib.rs b/src/lib.rs index b98dc4b9..c1c9fa38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -343,7 +343,7 @@ //! use iced::window; //! use iced::{Size, Subscription}; //! -//! #[derive(Debug)] +//! #[derive(Debug, Clone)] //! enum Message { //! WindowResized(Size), //! } @@ -387,7 +387,7 @@ //! # pub fn update(&mut self, message: Message) -> Action { unimplemented!() } //! # pub fn view(&self) -> Element { unimplemented!() } //! # } -//! # #[derive(Debug)] +//! # #[derive(Debug, Clone)] //! # pub enum Message {} //! # pub enum Action { None, Run(Task), Chat(()) } //! # } @@ -399,7 +399,7 @@ //! # pub fn update(&mut self, message: Message) -> Task { unimplemented!() } //! # pub fn view(&self) -> Element { unimplemented!() } //! # } -//! # #[derive(Debug)] +//! # #[derive(Debug, Clone)] //! # pub enum Message {} //! # } //! use contacts::Contacts; @@ -697,7 +697,7 @@ pub fn run( ) -> Result where State: Default + 'static, - Message: std::fmt::Debug + Send + 'static, + Message: program::Message + 'static, Theme: Default + theme::Base + 'static, Renderer: program::Renderer + 'static, { diff --git a/winit/src/lib.rs b/winit/src/lib.rs index 23bcc091..814d4f58 100644 --- a/winit/src/lib.rs +++ b/winit/src/lib.rs @@ -1069,10 +1069,7 @@ fn update( P::Theme: theme::Base, { for message in messages.drain(..) { - let update_span = debug::update(&message); let task = runtime.enter(|| program.update(message)); - debug::tasks_spawned(task.units()); - update_span.finish(); if let Some(stream) = runtime::task::into_stream(task) { runtime.run(stream); @@ -1082,7 +1079,6 @@ fn update( let subscription = runtime.enter(|| program.subscription()); let recipes = subscription::into_recipes(subscription.map(Action::Output)); - debug::subscriptions_tracked(recipes.len()); runtime.track(recipes); }