Use tokio_util::CancellationToken everywhere

This commit is contained in:
Igor Katson 2023-12-07 08:10:17 +00:00
parent 53868ad45e
commit bed7433d8e
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
16 changed files with 176 additions and 178 deletions

View file

@ -21,6 +21,7 @@ use librqbit_core::{
directories::get_configuration_directory,
magnet::Magnet,
peer_id::generate_peer_id,
spawn_utils::spawn_with_cancel,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
};
use parking_lot::RwLock;
@ -32,12 +33,13 @@ use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::{with_timeout, PeerConnectionOptions},
spawn_utils::{spawn, BlockingSpawner},
spawn_utils::BlockingSpawner,
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
@ -150,23 +152,6 @@ struct SerializedSessionDatabase {
torrents: HashMap<usize, SerializedTorrent>,
}
fn spawn_with_cancel_token(
mut cancel_rx: tokio::sync::watch::Receiver<()>,
name: &str,
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
spawn(name, span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
debug!("task canceled");
Ok(())
}
}
});
}
pub struct Session {
peer_id: Id20,
dht: Option<Dht>,
@ -178,8 +163,7 @@ pub struct Session {
tcp_listen_port: Option<u16>,
cancel_tx: tokio::sync::watch::Sender<()>,
cancel_rx: tokio::sync::watch::Receiver<()>,
cancellation_token: CancellationToken,
}
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
@ -395,14 +379,17 @@ impl Session {
Ok(dir.data_dir().join("session.json"))
}
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
/// Create a new session with options.
pub async fn new_with_opts(
output_folder: PathBuf,
mut opts: SessionOptions,
) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let token = CancellationToken::new();
let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range)
@ -418,24 +405,17 @@ impl Session {
None
} else {
let dht = if opts.disable_dht_persistence {
let (dht, run_worker) = DhtBuilder::with_config(DhtConfig::default())
.await
.context("error initializing DHT")?;
spawn_with_cancel_token(cancel_rx.clone(), "dht", error_span!("dht"), run_worker);
dht
DhtBuilder::with_config(DhtConfig {
cancellation_token: Some(token.child_token()),
..Default::default()
})
.await
.context("error initializing DHT")?
} else {
let pdht_config = opts.dht_config.take().unwrap_or_default();
let (dht, run_worker, run_persistence) = PersistentDht::create(Some(pdht_config))
PersistentDht::create(Some(pdht_config), Some(token.clone()))
.await
.context("error initializing persistent DHT")?;
spawn_with_cancel_token(cancel_rx.clone(), "dht", error_span!("dht"), run_worker);
spawn_with_cancel_token(
cancel_rx.clone(),
"dht_persistence",
error_span!("dht_persistence"),
run_persistence,
);
dht
.context("error initializing persistent DHT")?
};
Some(dht)
@ -455,14 +435,12 @@ impl Session {
spawner,
output_folder,
db: RwLock::new(Default::default()),
cancel_rx,
cancel_tx,
cancellation_token: token,
tcp_listen_port,
});
if let Some(tcp_listener) = tcp_listener {
session.spawn(
"tcp listener",
error_span!("tcp_listen", port = tcp_listen_port),
session.clone().task_tcp_listener(tcp_listener),
);
@ -471,7 +449,6 @@ impl Session {
if let Some(listen_port) = tcp_listen_port {
if opts.enable_upnp_port_forwarding {
session.spawn(
"upnp_forward",
error_span!("upnp_forward", port = listen_port),
session.clone().task_upnp_port_forwarder(listen_port),
);
@ -489,11 +466,7 @@ impl Session {
})?;
}
let persistence_task = session.clone().task_persistence();
session.spawn(
"session persistene",
error_span!("session_persistence"),
persistence_task,
);
session.spawn(error_span!("session_persistence"), persistence_task);
}
Ok(session)
@ -645,11 +618,10 @@ impl Session {
/// Spawn a task in the context of the session.
pub fn spawn(
&self,
name: &str,
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
spawn_with_cancel_token(self.cancel_rx.clone(), name, span, fut);
spawn_with_cancel(span, self.cancellation_token.clone(), fut);
}
/// Stop the session and all managed tasks.
@ -666,7 +638,7 @@ impl Session {
debug!("error pausing torrent: {e:#}");
}
}
let _ = self.cancel_tx.send(());
self.cancellation_token.cancel();
// this sucks, but hopefully will be enough
tokio::time::sleep(Duration::from_secs(1)).await;
}
@ -999,6 +971,7 @@ impl Session {
builder
.overwrite(opts.overwrite)
.spawner(self.spawner)
.cancellation_token(self.cancellation_token.child_token())
.peer_id(self.peer_id);
if opts.disable_trackers {