Replace select! with into_split in beacon::client
This commit is contained in:
parent
d5d4479a53
commit
41d7487ab0
1 changed files with 28 additions and 20 deletions
|
|
@ -3,12 +3,12 @@ use crate::core::time::{Duration, SystemTime};
|
|||
use crate::span;
|
||||
use crate::theme;
|
||||
|
||||
use futures::{FutureExt, select};
|
||||
use semver::Version;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tokio::task;
|
||||
use tokio::time;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
|
@ -116,14 +116,16 @@ async fn run(
|
|||
let mut buffer = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut command_sender = None;
|
||||
let command_sender = Arc::new(RwLock::new(None));
|
||||
|
||||
match _connect().await {
|
||||
Ok(mut stream) => {
|
||||
Ok(stream) => {
|
||||
is_connected.store(true, atomic::Ordering::Relaxed);
|
||||
|
||||
let (mut reader, mut writer) = stream.into_split();
|
||||
|
||||
let _ = send(
|
||||
&mut stream,
|
||||
&mut writer,
|
||||
Message::Connected {
|
||||
at: SystemTime::now(),
|
||||
name: name.clone(),
|
||||
|
|
@ -132,17 +134,18 @@ async fn run(
|
|||
)
|
||||
.await;
|
||||
|
||||
loop {
|
||||
select! {
|
||||
action = receiver.recv().fuse() => {
|
||||
let Some(action) = action else { break; };
|
||||
{
|
||||
let command_sender = command_sender.clone();
|
||||
|
||||
drop(task::spawn(async move {
|
||||
while let Some(action) = receiver.recv().await {
|
||||
match action {
|
||||
Action::Send(message) => {
|
||||
match send(&mut stream, message).await {
|
||||
match send(&mut writer, message).await {
|
||||
Ok(()) => {}
|
||||
Err(error) => {
|
||||
if error.kind() != io::ErrorKind::BrokenPipe
|
||||
if error.kind()
|
||||
!= io::ErrorKind::BrokenPipe
|
||||
{
|
||||
log::warn!(
|
||||
"Error sending message to server: {error}"
|
||||
|
|
@ -153,17 +156,22 @@ async fn run(
|
|||
}
|
||||
}
|
||||
Action::Forward(sender) => {
|
||||
command_sender = Some(sender);
|
||||
*command_sender.write().await =
|
||||
Some(sender);
|
||||
}
|
||||
}
|
||||
}
|
||||
command = receive(&mut stream, &mut buffer).fuse() => {
|
||||
let Ok(command) = command else { continue; };
|
||||
}))
|
||||
};
|
||||
|
||||
if let Some(sender) = command_sender.as_mut() {
|
||||
let _ = sender.send(command).await;
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let Ok(command) = receive(&mut reader, &mut buffer).await
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Some(sender) = command_sender.read().await.as_ref() {
|
||||
let _ = sender.send(command).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -186,7 +194,7 @@ async fn _connect() -> Result<net::TcpStream, io::Error> {
|
|||
}
|
||||
|
||||
async fn send(
|
||||
stream: &mut net::TcpStream,
|
||||
stream: &mut net::tcp::OwnedWriteHalf,
|
||||
message: Message,
|
||||
) -> Result<(), io::Error> {
|
||||
let bytes = bincode::serialize(&message).expect("Encode input message");
|
||||
|
|
@ -200,7 +208,7 @@ async fn send(
|
|||
}
|
||||
|
||||
async fn receive(
|
||||
stream: &mut net::TcpStream,
|
||||
stream: &mut net::tcp::OwnedReadHalf,
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<Command, Error> {
|
||||
let size = stream.read_u64().await? as usize;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue