From c601b399f2035a74c680e8560f3213be3ebeeffd Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 12:07:42 +0100 Subject: [PATCH] Workaround rustfmt bug in session.rs --- crates/librqbit/src/session.rs | 63 ++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index d707b12..670cd8f 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -512,8 +512,10 @@ impl Session { async fn persistence_factory( opts: &SessionOptions, - ) -> anyhow::Result<(Option>, Arc)> { - + ) -> anyhow::Result<( + Option>, + Arc, + )> { macro_rules! make_result { ($store:expr) => { if opts.fastresume { @@ -538,7 +540,7 @@ impl Session { ); make_result!(s) - }, + } #[cfg(feature = "postgres")] Some(SessionPersistenceConfig::Postgres { connection_string }) => { use crate::session_persistence::postgres::PostgresSessionStorage; @@ -606,17 +608,22 @@ impl Session { connector: stream_connector, root_span: opts.root_span, stats: SessionStats::new(), - concurrent_initialize_semaphore: Arc::new( tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) + concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new( + opts.concurrent_init_limit.unwrap_or(3), + )), }); if let Some(mut disk_write_rx) = disk_write_rx { - session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move { - while let Some(work) = disk_write_rx.recv().await { - trace!(disk_write_rx_queue_len = disk_write_rx.len()); - spawner.spawn_block_in_place(work); - } - Ok(()) - }); + session.spawn( + error_span!(parent: session.rs(), "disk_writer"), + async move { + while let Some(work) = disk_write_rx.recv().await { + trace!(disk_write_rx_queue_len = disk_write_rx.len()); + spawner.spawn_block_in_place(work); + } + Ok(()) + }, + ); } if let Some(tcp_listener) = tcp_listener { @@ -638,30 +645,36 @@ impl Session { if let Some(persistence) = session.persistence.as_ref() { info!("will use {persistence:?} for session persistence"); - let mut ps = persistence.stream_all().await?; + let mut ps = persistence.stream_all().await?; let mut added_all = false; let mut futs = FuturesUnordered::new(); while !added_all || !futs.is_empty() { + // NOTE: this closure exists purely to workaround rustfmt screwing up when inlining it. + let add_torrent_span = |info_hash: &Id20| -> tracing::Span { + error_span!(parent: session.rs(), "add_torrent", info_hash=?info_hash) + }; tokio::select! { Some(res) = futs.next(), if !futs.is_empty() => { if let Err(e) = res { error!("error adding torrent to session: {e:?}"); } - }, + } st = ps.next(), if !added_all => { - if let Some(st) = st { - let (id, st) = st?; - let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash()); - let (add_torrent, mut opts) = st.into_add_torrent()?; - opts.preferred_id = Some(id); - let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span); - futs.push(fut); - } else { - added_all = true; - } - }, - } + match st { + Some(st) => { + let (id, st) = st?; + let span = add_torrent_span(st.info_hash()); + let (add_torrent, mut opts) = st.into_add_torrent()?; + opts.preferred_id = Some(id); + let fut = session.add_torrent(add_torrent, Some(opts)); + let fut = fut.instrument(span); + futs.push(fut); + }, + None => added_all = true + }; + } + }; } }