From 5b649541b6d3e94d2dc6ad60f70076c00f7d142c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A9ctor=20Ram=C3=B3n=20Jim=C3=A9nez?= Date: Sun, 20 Apr 2025 20:20:03 +0200 Subject: [PATCH] Fix disconnection logic in `beacon::client` --- beacon/src/client.rs | 67 +++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/beacon/src/client.rs b/beacon/src/client.rs index d32dc589..bfd945d2 100644 --- a/beacon/src/client.rs +++ b/beacon/src/client.rs @@ -113,11 +113,14 @@ async fn run( let version = semver::Version::parse(env!("CARGO_PKG_VERSION")) .expect("Parse package version"); - let mut buffer = Vec::new(); + let command_sender = { + // Discard by default + let (sender, _receiver) = mpsc::channel(1); + + Arc::new(Mutex::new(sender)) + }; loop { - let command_sender = Arc::new(Mutex::new(None)); - match _connect().await { Ok(stream) => { is_connected.store(true, atomic::Ordering::Relaxed); @@ -138,39 +141,45 @@ async fn run( let command_sender = command_sender.clone(); drop(task::spawn(async move { - while let Some(action) = receiver.recv().await { - match action { - Action::Send(message) => { - match send(&mut writer, message).await { - Ok(()) => {} - Err(error) => { - if error.kind() - != io::ErrorKind::BrokenPipe - { - log::warn!( - "Error sending message to server: {error}" - ); - } - break; - } - } - } - Action::Forward(sender) => { - *command_sender.lock().await = Some(sender); + let mut buffer = Vec::new(); + + loop { + match receive(&mut reader, &mut buffer).await { + Ok(command) => { + let sender = command_sender.lock().await; + let _ = sender.send(command).await; } + Err(Error::DecodingFailed(_)) => {} + Err(Error::IOFailed(_)) => break, } } })) }; - loop { - let Ok(command) = receive(&mut reader, &mut buffer).await - else { - continue; - }; + while let Some(action) = receiver.recv().await { + match action { + Action::Send(message) => { + match send(&mut writer, message).await { + Ok(()) => {} + Err(error) => { + if error.kind() != io::ErrorKind::BrokenPipe + { + log::warn!( + "Error sending message to server: {error}" + ); + } - if let Some(sender) = command_sender.lock().await.as_ref() { - let _ = sender.send(command).await; + is_connected.store( + false, + atomic::Ordering::Relaxed, + ); + break; + } + } + } + Action::Forward(sender) => { + *command_sender.lock().await = sender; + } } } }