This commit is contained in:
Igor Katson 2021-06-26 00:43:28 +01:00
parent e1354e8a85
commit 12b3f12859

View file

@ -17,7 +17,10 @@ use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use reqwest::Url;
use tokio::sync::{mpsc::Sender, Notify, Semaphore};
use tokio::{
sync::{mpsc::Sender, Notify, Semaphore},
task::JoinHandle,
};
use crate::{
buffers::ByteString,
@ -237,10 +240,10 @@ fn spawn<N: Display + 'static + Send>(
});
}
async fn spawn_blocking<T: Send + Sync + 'static, N: Display + 'static + Send>(
fn spawn_blocking<T: Send + Sync + 'static, N: Display + 'static + Send>(
name: N,
f: impl FnOnce() -> anyhow::Result<T> + Send + 'static,
) -> anyhow::Result<T> {
) -> JoinHandle<anyhow::Result<T>> {
debug!("starting blocking task \"{}\"", name);
tokio::task::spawn_blocking(move || match f() {
Ok(v) => {
@ -252,7 +255,6 @@ async fn spawn_blocking<T: Send + Sync + 'static, N: Display + 'static + Send>(
Err(e)
}
})
.await?
}
fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result<Lengths> {
@ -472,7 +474,7 @@ impl TorrentManager {
),
move || clone.read_chunk_blocking(peer_handle, chunk_info),
)
.await?;
.await??;
let tx = this
.inner
.locked
@ -490,6 +492,7 @@ impl TorrentManager {
chunk_info.offset,
chunk,
));
info!("sending to {}: {:?}", peer_handle, &message);
Ok::<_, anyhow::Error>(tx.send(message).await?)
});
}