From 41d7487ab0f59d92a801c0dbc8debac81e89fb8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A9ctor=20Ram=C3=B3n=20Jim=C3=A9nez?= Date: Sun, 20 Apr 2025 19:50:08 +0200 Subject: [PATCH] Replace `select!` with `into_split` in `beacon::client` --- beacon/src/client.rs | 48 ++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/beacon/src/client.rs b/beacon/src/client.rs index b7444617..271788ef 100644 --- a/beacon/src/client.rs +++ b/beacon/src/client.rs @@ -3,12 +3,12 @@ use crate::core::time::{Duration, SystemTime}; use crate::span; use crate::theme; -use futures::{FutureExt, select}; use semver::Version; use serde::{Deserialize, Serialize}; use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net; -use tokio::sync::mpsc; +use tokio::sync::{RwLock, mpsc}; +use tokio::task; use tokio::time; use std::sync::Arc; @@ -116,14 +116,16 @@ async fn run( let mut buffer = Vec::new(); loop { - let mut command_sender = None; + let command_sender = Arc::new(RwLock::new(None)); match _connect().await { - Ok(mut stream) => { + Ok(stream) => { is_connected.store(true, atomic::Ordering::Relaxed); + let (mut reader, mut writer) = stream.into_split(); + let _ = send( - &mut stream, + &mut writer, Message::Connected { at: SystemTime::now(), name: name.clone(), @@ -132,17 +134,18 @@ async fn run( ) .await; - loop { - select! { - action = receiver.recv().fuse() => { - let Some(action) = action else { break; }; + { + let command_sender = command_sender.clone(); + drop(task::spawn(async move { + while let Some(action) = receiver.recv().await { match action { Action::Send(message) => { - match send(&mut stream, message).await { + match send(&mut writer, message).await { Ok(()) => {} Err(error) => { - if error.kind() != io::ErrorKind::BrokenPipe + if error.kind() + != io::ErrorKind::BrokenPipe { log::warn!( "Error sending message to server: {error}" @@ -153,17 +156,22 @@ async fn run( } } Action::Forward(sender) => { - command_sender = Some(sender); + *command_sender.write().await = + Some(sender); } } } - command = receive(&mut stream, &mut buffer).fuse() => { - let Ok(command) = command else { continue; }; + })) + }; - if let Some(sender) = command_sender.as_mut() { - let _ = sender.send(command).await; - } - } + loop { + let Ok(command) = receive(&mut reader, &mut buffer).await + else { + continue; + }; + + if let Some(sender) = command_sender.read().await.as_ref() { + let _ = sender.send(command).await; } } } @@ -186,7 +194,7 @@ async fn _connect() -> Result { } async fn send( - stream: &mut net::TcpStream, + stream: &mut net::tcp::OwnedWriteHalf, message: Message, ) -> Result<(), io::Error> { let bytes = bincode::serialize(&message).expect("Encode input message"); @@ -200,7 +208,7 @@ async fn send( } async fn receive( - stream: &mut net::TcpStream, + stream: &mut net::tcp::OwnedReadHalf, buffer: &mut Vec, ) -> Result { let size = stream.read_u64().await? as usize;