From 0bd20e57e7d9023f4372f5fb65ac77710b2fdae3 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 16:51:21 +0200 Subject: [PATCH] refactor: use select macro for gio copy futures --- src/operation/recursive.rs | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index befe69c..bbe89d1 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -9,7 +9,6 @@ use compio::buf::{IntoInner, IoBuf}; use compio::driver::{ToSharedFd, op::AsyncifyFd}; use compio::io::{AsyncReadAt, AsyncWriteAt}; use cosmic::iced::futures; -use futures::future::Either; use futures::{FutureExt, StreamExt}; use gio::prelude::FileExtManual; use std::future::Future; @@ -608,10 +607,11 @@ impl Op { glib::Priority::LOW, ); - let mut copy_fut = - gio_copy_fut.map(|result| result.map_err(GioCopyError::GLib)); + let mut copy_fut = gio_copy_fut + .map(|result| result.map_err(GioCopyError::GLib)) + .fuse(); - let mut progress_fut = std::pin::pin!(async { + let progress_fut = std::pin::pin!(async { while let Some((current_bytes, _)) = progress_stream.next().await { _ = progress_tx.send(current_bytes); } @@ -620,31 +620,23 @@ impl Op { futures::future::pending::<()>().await; }); - // Poll the progress future while waiting for the copy future to finish. - let mut main_fut = std::pin::pin!(async { - loop { - if let Either::Right((result, _)) = - futures::future::select(&mut progress_fut, &mut copy_fut).await - { - return result; - } - } - }); - + let mut progress_fut = progress_fut.fuse(); let mut pause_rx2 = pause_rx.clone(); loop { let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused)); - match futures::future::select(until_paused, &mut main_fut).await { - Either::Left(_) => { - _ = pause_rx2.wait_for(|paused| !*paused).await; - } + futures::select! { + _ = &mut progress_fut => {}, - Either::Right((result, _)) => { + result = &mut copy_fut => { _ = tx.send(result.map(|_| ())); glib_loop2.quit(); return; } + + _ = until_paused.fuse() => { + _ = pause_rx2.wait_for(|paused| !*paused).await; + } } } }); @@ -659,7 +651,6 @@ impl Op { loop { let until_paused = std::pin::pin!(ctx.controller.until_paused()); - futures::select! { value = progress_rx.recv().fuse() => { if let Some(current_bytes) = value { @@ -668,9 +659,6 @@ impl Op { if current.duration_since(last_progress_update).as_millis() > 49 { last_progress_update = current; (ctx.on_progress)(self, &progress); - - tracing::info!("checking controller"); - // Also check if the progress was cancelled. if let Err(state) = ctx.controller.check().await { tracing::warn!(