refactor: use select macro for gio copy futures
This commit is contained in:
parent
e35d5123f0
commit
0bd20e57e7
1 changed files with 12 additions and 24 deletions
|
|
@ -9,7 +9,6 @@ use compio::buf::{IntoInner, IoBuf};
|
|||
use compio::driver::{ToSharedFd, op::AsyncifyFd};
|
||||
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
||||
use cosmic::iced::futures;
|
||||
use futures::future::Either;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use gio::prelude::FileExtManual;
|
||||
use std::future::Future;
|
||||
|
|
@ -608,10 +607,11 @@ impl Op {
|
|||
glib::Priority::LOW,
|
||||
);
|
||||
|
||||
let mut copy_fut =
|
||||
gio_copy_fut.map(|result| result.map_err(GioCopyError::GLib));
|
||||
let mut copy_fut = gio_copy_fut
|
||||
.map(|result| result.map_err(GioCopyError::GLib))
|
||||
.fuse();
|
||||
|
||||
let mut progress_fut = std::pin::pin!(async {
|
||||
let progress_fut = std::pin::pin!(async {
|
||||
while let Some((current_bytes, _)) = progress_stream.next().await {
|
||||
_ = progress_tx.send(current_bytes);
|
||||
}
|
||||
|
|
@ -620,31 +620,23 @@ impl Op {
|
|||
futures::future::pending::<()>().await;
|
||||
});
|
||||
|
||||
// Poll the progress future while waiting for the copy future to finish.
|
||||
let mut main_fut = std::pin::pin!(async {
|
||||
loop {
|
||||
if let Either::Right((result, _)) =
|
||||
futures::future::select(&mut progress_fut, &mut copy_fut).await
|
||||
{
|
||||
return result;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut progress_fut = progress_fut.fuse();
|
||||
let mut pause_rx2 = pause_rx.clone();
|
||||
|
||||
loop {
|
||||
let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused));
|
||||
match futures::future::select(until_paused, &mut main_fut).await {
|
||||
Either::Left(_) => {
|
||||
_ = pause_rx2.wait_for(|paused| !*paused).await;
|
||||
}
|
||||
futures::select! {
|
||||
_ = &mut progress_fut => {},
|
||||
|
||||
Either::Right((result, _)) => {
|
||||
result = &mut copy_fut => {
|
||||
_ = tx.send(result.map(|_| ()));
|
||||
glib_loop2.quit();
|
||||
return;
|
||||
}
|
||||
|
||||
_ = until_paused.fuse() => {
|
||||
_ = pause_rx2.wait_for(|paused| !*paused).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
@ -659,7 +651,6 @@ impl Op {
|
|||
|
||||
loop {
|
||||
let until_paused = std::pin::pin!(ctx.controller.until_paused());
|
||||
|
||||
futures::select! {
|
||||
value = progress_rx.recv().fuse() => {
|
||||
if let Some(current_bytes) = value {
|
||||
|
|
@ -668,9 +659,6 @@ impl Op {
|
|||
if current.duration_since(last_progress_update).as_millis() > 49 {
|
||||
last_progress_update = current;
|
||||
(ctx.on_progress)(self, &progress);
|
||||
|
||||
tracing::info!("checking controller");
|
||||
|
||||
// Also check if the progress was cancelled.
|
||||
if let Err(state) = ctx.controller.check().await {
|
||||
tracing::warn!(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue