Workaround rustfmt bug in session.rs

This commit is contained in:
Igor Katson 2024-08-21 12:07:42 +01:00
parent 06e88c138f
commit c601b399f2
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -512,8 +512,10 @@ impl Session {
async fn persistence_factory( async fn persistence_factory(
opts: &SessionOptions, opts: &SessionOptions,
) -> anyhow::Result<(Option<Arc<dyn SessionPersistenceStore>>, Arc<dyn BitVFactory>)> { ) -> anyhow::Result<(
Option<Arc<dyn SessionPersistenceStore>>,
Arc<dyn BitVFactory>,
)> {
macro_rules! make_result { macro_rules! make_result {
($store:expr) => { ($store:expr) => {
if opts.fastresume { if opts.fastresume {
@ -538,7 +540,7 @@ impl Session {
); );
make_result!(s) make_result!(s)
}, }
#[cfg(feature = "postgres")] #[cfg(feature = "postgres")]
Some(SessionPersistenceConfig::Postgres { connection_string }) => { Some(SessionPersistenceConfig::Postgres { connection_string }) => {
use crate::session_persistence::postgres::PostgresSessionStorage; use crate::session_persistence::postgres::PostgresSessionStorage;
@ -606,17 +608,22 @@ impl Session {
connector: stream_connector, connector: stream_connector,
root_span: opts.root_span, root_span: opts.root_span,
stats: SessionStats::new(), stats: SessionStats::new(),
concurrent_initialize_semaphore: Arc::new( tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(
opts.concurrent_init_limit.unwrap_or(3),
)),
}); });
if let Some(mut disk_write_rx) = disk_write_rx { if let Some(mut disk_write_rx) = disk_write_rx {
session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move { session.spawn(
while let Some(work) = disk_write_rx.recv().await { error_span!(parent: session.rs(), "disk_writer"),
trace!(disk_write_rx_queue_len = disk_write_rx.len()); async move {
spawner.spawn_block_in_place(work); while let Some(work) = disk_write_rx.recv().await {
} trace!(disk_write_rx_queue_len = disk_write_rx.len());
Ok(()) spawner.spawn_block_in_place(work);
}); }
Ok(())
},
);
} }
if let Some(tcp_listener) = tcp_listener { if let Some(tcp_listener) = tcp_listener {
@ -638,30 +645,36 @@ impl Session {
if let Some(persistence) = session.persistence.as_ref() { if let Some(persistence) = session.persistence.as_ref() {
info!("will use {persistence:?} for session persistence"); info!("will use {persistence:?} for session persistence");
let mut ps = persistence.stream_all().await?; let mut ps = persistence.stream_all().await?;
let mut added_all = false; let mut added_all = false;
let mut futs = FuturesUnordered::new(); let mut futs = FuturesUnordered::new();
while !added_all || !futs.is_empty() { while !added_all || !futs.is_empty() {
// NOTE: this closure exists purely to workaround rustfmt screwing up when inlining it.
let add_torrent_span = |info_hash: &Id20| -> tracing::Span {
error_span!(parent: session.rs(), "add_torrent", info_hash=?info_hash)
};
tokio::select! { tokio::select! {
Some(res) = futs.next(), if !futs.is_empty() => { Some(res) = futs.next(), if !futs.is_empty() => {
if let Err(e) = res { if let Err(e) = res {
error!("error adding torrent to session: {e:?}"); error!("error adding torrent to session: {e:?}");
} }
}, }
st = ps.next(), if !added_all => { st = ps.next(), if !added_all => {
if let Some(st) = st { match st {
let (id, st) = st?; Some(st) => {
let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash()); let (id, st) = st?;
let (add_torrent, mut opts) = st.into_add_torrent()?; let span = add_torrent_span(st.info_hash());
opts.preferred_id = Some(id); let (add_torrent, mut opts) = st.into_add_torrent()?;
let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span); opts.preferred_id = Some(id);
futs.push(fut); let fut = session.add_torrent(add_torrent, Some(opts));
} else { let fut = fut.instrument(span);
added_all = true; futs.push(fut);
} },
}, None => added_all = true
} };
}
};
} }
} }