Draft time-travel debugging feature

This commit is contained in:
Héctor Ramón Jiménez 2025-04-17 03:24:17 +02:00
parent 388a419ed5
commit d5d4479a53
No known key found for this signature in database
GPG key ID: 7CC46565708259A7
20 changed files with 330 additions and 63 deletions

View file

@ -1,10 +1,12 @@
use crate::Error;
use crate::core::time::{Duration, SystemTime};
use crate::span;
use crate::theme;
use futures::{FutureExt, select};
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::io::{self, AsyncWriteExt};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net;
use tokio::sync::mpsc;
use tokio::time;
@ -17,7 +19,7 @@ pub const SERVER_ADDRESS: &str = "127.0.0.1:9167";
#[derive(Debug, Clone)]
pub struct Client {
sender: mpsc::Sender<Message>,
sender: mpsc::Sender<Action>,
is_connected: Arc<AtomicBool>,
_handle: Arc<thread::JoinHandle<()>>,
}
@ -43,17 +45,17 @@ pub enum Event {
ThemeChanged(theme::Palette),
SpanStarted(span::Stage),
SpanFinished(span::Stage, Duration),
MessageLogged(String),
MessageLogged { number: usize, message: String },
CommandsSpawned(usize),
SubscriptionsTracked(usize),
}
impl Client {
pub fn log(&self, event: Event) {
let _ = self.sender.try_send(Message::EventLogged {
let _ = self.sender.try_send(Action::Send(Message::EventLogged {
at: SystemTime::now(),
event,
});
}));
}
pub fn is_connected(&self) -> bool {
@ -61,21 +63,28 @@ impl Client {
}
pub fn quit(&self) {
let _ = self.sender.try_send(Message::Quit {
let _ = self.sender.try_send(Action::Send(Message::Quit {
at: SystemTime::now(),
});
}));
}
pub fn subscribe(&self) -> mpsc::Receiver<Command> {
let (sender, receiver) = mpsc::channel(100);
let _ = self.sender.try_send(Action::Forward(sender));
receiver
}
}
#[must_use]
pub fn connect(name: String) -> Client {
let (sender, receiver) = mpsc::channel(100);
let (sender, receiver) = mpsc::channel(10_000);
let is_connected = Arc::new(AtomicBool::new(false));
let handle = {
let is_connected = is_connected.clone();
std::thread::spawn(move || run(name, is_connected.clone(), receiver))
std::thread::spawn(move || run(name, is_connected, receiver))
};
Client {
@ -85,16 +94,30 @@ pub fn connect(name: String) -> Client {
}
}
enum Action {
Send(Message),
Forward(mpsc::Sender<Command>),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
RewindTo { message: usize },
}
#[tokio::main]
async fn run(
name: String,
is_connected: Arc<AtomicBool>,
mut receiver: mpsc::Receiver<Message>,
mut receiver: mpsc::Receiver<Action>,
) {
let version = semver::Version::parse(env!("CARGO_PKG_VERSION"))
.expect("Parse package version");
let mut buffer = Vec::new();
loop {
let mut command_sender = None;
match _connect().await {
Ok(mut stream) => {
is_connected.store(true, atomic::Ordering::Relaxed);
@ -109,16 +132,37 @@ async fn run(
)
.await;
while let Some(output) = receiver.recv().await {
match send(&mut stream, output).await {
Ok(()) => {}
Err(error) => {
if error.kind() != io::ErrorKind::BrokenPipe {
log::warn!(
"Error sending message to server: {error}"
);
loop {
select! {
action = receiver.recv().fuse() => {
let Some(action) = action else { break; };
match action {
Action::Send(message) => {
match send(&mut stream, message).await {
Ok(()) => {}
Err(error) => {
if error.kind() != io::ErrorKind::BrokenPipe
{
log::warn!(
"Error sending message to server: {error}"
);
}
break;
}
}
}
Action::Forward(sender) => {
command_sender = Some(sender);
}
}
}
command = receive(&mut stream, &mut buffer).fuse() => {
let Ok(command) = command else { continue; };
if let Some(sender) = command_sender.as_mut() {
let _ = sender.send(command).await;
}
break;
}
}
}
@ -154,3 +198,18 @@ async fn send(
Ok(())
}
async fn receive(
stream: &mut net::TcpStream,
buffer: &mut Vec<u8>,
) -> Result<Command, Error> {
let size = stream.read_u64().await? as usize;
if buffer.len() < size {
buffer.resize(size, 0);
}
let _n = stream.read_exact(&mut buffer[..size]).await?;
Ok(bincode::deserialize(buffer)?)
}

9
beacon/src/error.rs Normal file
View file

@ -0,0 +1,9 @@
use std::io;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("input/output operation failed: {0}")]
IOFailed(#[from] io::Error),
#[error("decoding failed: {0}")]
DecodingFailed(#[from] Box<bincode::ErrorKind>),
}

View file

@ -4,6 +4,7 @@ pub use semver::Version;
pub mod client;
pub mod span;
mod error;
mod stream;
pub use client::Client;
@ -11,14 +12,36 @@ pub use span::Span;
use crate::core::theme;
use crate::core::time::{Duration, SystemTime};
use crate::error::Error;
use futures::{SinkExt, Stream};
use tokio::io::{self, AsyncReadExt};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net;
use tokio::sync::mpsc;
use tokio::task;
#[derive(Debug, Clone)]
pub struct Connection {
commands: mpsc::Sender<client::Command>,
}
impl Connection {
pub fn rewind_to<'a>(
&self,
message: usize,
) -> impl Future<Output = ()> + 'a {
let commands = self.commands.clone();
async move {
let _ = commands.send(client::Command::RewindTo { message }).await;
}
}
}
#[derive(Debug, Clone)]
pub enum Event {
Connected {
connection: Connection,
at: SystemTime,
name: String,
version: Version,
@ -86,18 +109,42 @@ pub fn run() -> impl Stream<Item = Event> {
};
loop {
let Ok((mut stream, _)) = server.accept().await else {
let Ok((stream, _)) = server.accept().await else {
continue;
};
let _ = stream.set_nodelay(true);
let (mut reader, mut writer) = {
let _ = stream.set_nodelay(true);
stream.into_split()
};
let (command_sender, mut command_receiver) = mpsc::channel(1);
let mut last_message = String::new();
let mut last_update_number = 0;
let mut last_commands_spawned = 0;
let mut last_present_window = None;
drop(task::spawn(async move {
let mut last_message_number = None;
while let Some(command) = command_receiver.recv().await {
let client::Command::RewindTo { message } = command;
if Some(message) == last_message_number {
continue;
}
last_message_number = Some(message);
let _ =
send(&mut writer, command).await.inspect_err(|error| {
log::error!("Error when sending command: {error}")
});
}
}));
loop {
match receive(&mut stream, &mut buffer).await {
match receive(&mut reader, &mut buffer).await {
Ok(message) => {
match message {
client::Message::Connected {
@ -107,6 +154,9 @@ pub fn run() -> impl Stream<Item = Event> {
} => {
let _ = output
.send(Event::Connected {
connection: Connection {
commands: command_sender.clone(),
},
at,
name,
version,
@ -133,7 +183,11 @@ pub fn run() -> impl Stream<Item = Event> {
})
.await;
}
client::Event::MessageLogged(message) => {
client::Event::MessageLogged {
number,
message,
} => {
last_update_number = number;
last_message = message;
}
client::Event::CommandsSpawned(
@ -161,6 +215,7 @@ pub fn run() -> impl Stream<Item = Event> {
span::Stage::Boot => Span::Boot,
span::Stage::Update => {
Span::Update {
number: last_update_number,
message: last_message
.clone(),
commands_spawned:
@ -246,7 +301,7 @@ pub fn run() -> impl Stream<Item = Event> {
}
async fn receive(
stream: &mut net::TcpStream,
stream: &mut net::tcp::OwnedReadHalf,
buffer: &mut Vec<u8>,
) -> Result<client::Message, Error> {
let size = stream.read_u64().await? as usize;
@ -260,14 +315,20 @@ async fn receive(
Ok(bincode::deserialize(buffer)?)
}
async fn send(
stream: &mut net::tcp::OwnedWriteHalf,
command: client::Command,
) -> Result<(), io::Error> {
let bytes = bincode::serialize(&command).expect("Encode input message");
let size = bytes.len() as u64;
stream.write_all(&size.to_be_bytes()).await?;
stream.write_all(&bytes).await?;
stream.flush().await?;
Ok(())
}
async fn delay() {
tokio::time::sleep(Duration::from_secs(2)).await;
}
#[derive(Debug, thiserror::Error)]
enum Error {
#[error("input/output operation failed: {0}")]
IOFailed(#[from] io::Error),
#[error("decoding failed: {0}")]
DecodingFailed(#[from] Box<bincode::ErrorKind>),
}

View file

@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
pub enum Span {
Boot,
Update {
number: usize,
message: String,
commands_spawned: usize,
},