Fix disconnection logic in beacon::client

This commit is contained in:
Héctor Ramón Jiménez 2025-04-20 20:20:03 +02:00
parent 162f8c0c29
commit 5b649541b6
No known key found for this signature in database
GPG key ID: 7CC46565708259A7

View file

@ -113,11 +113,14 @@ async fn run(
let version = semver::Version::parse(env!("CARGO_PKG_VERSION"))
.expect("Parse package version");
let mut buffer = Vec::new();
let command_sender = {
// Discard by default
let (sender, _receiver) = mpsc::channel(1);
Arc::new(Mutex::new(sender))
};
loop {
let command_sender = Arc::new(Mutex::new(None));
match _connect().await {
Ok(stream) => {
is_connected.store(true, atomic::Ordering::Relaxed);
@ -138,39 +141,45 @@ async fn run(
let command_sender = command_sender.clone();
drop(task::spawn(async move {
while let Some(action) = receiver.recv().await {
match action {
Action::Send(message) => {
match send(&mut writer, message).await {
Ok(()) => {}
Err(error) => {
if error.kind()
!= io::ErrorKind::BrokenPipe
{
log::warn!(
"Error sending message to server: {error}"
);
}
break;
}
}
}
Action::Forward(sender) => {
*command_sender.lock().await = Some(sender);
let mut buffer = Vec::new();
loop {
match receive(&mut reader, &mut buffer).await {
Ok(command) => {
let sender = command_sender.lock().await;
let _ = sender.send(command).await;
}
Err(Error::DecodingFailed(_)) => {}
Err(Error::IOFailed(_)) => break,
}
}
}))
};
loop {
let Ok(command) = receive(&mut reader, &mut buffer).await
else {
continue;
};
while let Some(action) = receiver.recv().await {
match action {
Action::Send(message) => {
match send(&mut writer, message).await {
Ok(()) => {}
Err(error) => {
if error.kind() != io::ErrorKind::BrokenPipe
{
log::warn!(
"Error sending message to server: {error}"
);
}
if let Some(sender) = command_sender.lock().await.as_ref() {
let _ = sender.send(command).await;
is_connected.store(
false,
atomic::Ordering::Relaxed,
);
break;
}
}
}
Action::Forward(sender) => {
*command_sender.lock().await = sender;
}
}
}
}