From f5ccb8632b00069ef8d70aac1aabbc685d11ec23 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Feb 2024 20:11:12 +0000 Subject: [PATCH] Cancellation... It compiles now at least with latest changes, but not sure if they work or are correct --- crates/librqbit/src/session.rs | 98 +++++++++++++----------- crates/librqbit/src/torrent_state/mod.rs | 30 ++++---- 2 files changed, 71 insertions(+), 57 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 4508e9c..6250425 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -768,7 +768,10 @@ impl Session { self.tcp_listen_port }; - let (info_hash, info, trackers, peer_rx, initial_peers) = match add { + let cancellation_token = self.cancellation_token.child_token(); + let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); + + let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; @@ -776,23 +779,13 @@ impl Session { .as_id20() .context("magnet link didn't contain a BTv1 infohash")?; - let dht_peer_rx = self - .dht - .as_ref() - .map(|d| d.get_peers(info_hash, announce_port)) - .transpose()?; - - let tracker_peer_rx = TrackerComms::start( + let peer_rx = self.make_peer_rx( info_hash, - self.peer_id, magnet.trackers.clone(), - Box::new(TorrentStatsForTrackerDummy {}), - opts.force_tracker_interval, - self.cancellation_token().clone(), - self.tcp_listen_port, - ); - - let peer_rx = match merge_peer_rx(dht_peer_rx, tracker_peer_rx) { + cancellation_token.clone(), + announce_port, + )?; + let peer_rx = match peer_rx { Some(peer_rx) => peer_rx, None => bail!("can't find peers: DHT disabled and no trackers in magnet"), }; @@ -823,6 +816,7 @@ impl Session { Some(peer_rx) }, initial_peers, + cancellation_token, ) } other => { @@ -844,13 +838,6 @@ impl Session { AddTorrent::TorrentInfo(t) => *t, }; - let dht_rx = match self.dht.as_ref() { - Some(dht) if !opts.paused && !opts.list_only => { - debug!(info_hash=?torrent.info_hash, "reading peers from DHT"); - Some(dht.get_peers(torrent.info_hash, announce_port)?) - } - _ => None, - }; let trackers = torrent .iter_announce() .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { @@ -862,17 +849,12 @@ impl Session { }) .collect::>(); - let tracker_peer_rx = TrackerComms::start( + let peer_rx = self.make_peer_rx( torrent.info_hash, - self.peer_id, trackers.clone(), - Box::new(TorrentStatsForTrackerDummy {}), - opts.force_tracker_interval, - self.cancellation_token().clone(), - self.tcp_listen_port, - ); - - let peer_rx = merge_peer_rx(dht_rx, tracker_peer_rx); + cancellation_token.clone(), + announce_port, + )?; ( torrent.info_hash, @@ -884,10 +866,13 @@ impl Session { .unwrap_or_default() .into_iter() .collect(), + cancellation_token, ) } }; + cancellation_token_drop_guard.disarm(); + self.main_torrent_info( info_hash, info, @@ -895,6 +880,7 @@ impl Session { peer_rx, initial_peers.into_iter().collect(), opts, + cancellation_token, ) .await } @@ -908,9 +894,12 @@ impl Session { peer_rx: Option, initial_peers: Vec, opts: AddTorrentOptions, + cancellation_token: CancellationToken, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); + let drop_guard = cancellation_token.clone().drop_guard(); + let get_only_files = |only_files: Option>, only_files_regex: Option, list_only: bool| { match (only_files, only_files_regex) { @@ -991,7 +980,7 @@ impl Session { builder .overwrite(opts.overwrite) .spawner(self.spawner) - .cancellation_token(self.cancellation_token.child_token()) + .cancellation_token(cancellation_token.clone()) .trackers(trackers) .peer_id(self.peer_id); @@ -1029,10 +1018,16 @@ impl Session { let span = managed_torrent.info.span.clone(); let _ = span.enter(); managed_torrent - .start(initial_peers, peer_rx, opts.paused) + .start( + initial_peers, + peer_rx, + opts.paused, + cancellation_token.child_token(), + ) .context("error starting torrent")?; } + drop_guard.disarm(); Ok(AddTorrentResponse::Added(id, managed_torrent)) } @@ -1078,24 +1073,41 @@ impl Session { } } - pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + fn make_peer_rx( + &self, + info_hash: Id20, + trackers: Vec, + cancel: CancellationToken, + announce_port: Option, + ) -> anyhow::Result> { + let announce_port = announce_port.or(self.tcp_listen_port); let dht_rx = self .dht .as_ref() - .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) + .map(|dht| dht.get_peers(info_hash, announce_port)) .transpose()?; - let trackers = handle.info().trackers.clone(); let peer_rx = TrackerComms::start( - handle.info.info_hash, - handle.info.peer_id, - trackers.into_iter().collect(), + info_hash, + self.peer_id, + trackers, Box::new(tracker_comms::TorrentStatsForTrackerDummy {}), None, - self.cancellation_token.clone(), - self.tcp_listen_port, + cancel, + announce_port, ); let peer_rx = merge_peer_rx(dht_rx, peer_rx); - handle.start(Default::default(), peer_rx, false)?; + Ok(peer_rx) + } + + pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + let token = handle.cancellation_token.child_token(); + let peer_rx = self.make_peer_rx( + handle.info_hash(), + handle.info().trackers.clone().into_iter().collect(), + token.clone(), + self.tcp_listen_port, + )?; + handle.start(Default::default(), peer_rx, false, token)?; Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0899750..d8267fa 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -15,7 +15,6 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteString; -use dht::RequestPeersStream; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; use librqbit_core::peer_id::generate_peer_id; @@ -32,7 +31,6 @@ use tracing::debug; use tracing::error_span; use tracing::trace; use tracing::warn; -use url::Url; use crate::chunk_tracker::ChunkTracker; use crate::spawn_utils::BlockingSpawner; @@ -176,13 +174,15 @@ impl ManagedTorrent { initial_peers: Vec, peer_rx: Option, start_paused: bool, + live_cancellation_token: CancellationToken, ) -> anyhow::Result<()> { let mut g = self.locked.write(); let spawn_fatal_errors_receiver = - |state: &Arc, rx: tokio::sync::oneshot::Receiver| { + |state: &Arc, + rx: tokio::sync::oneshot::Receiver, + token: CancellationToken| { let span = state.info.span.clone(); - let token = state.cancellation_token.clone(); let state = Arc::downgrade(state); spawn_with_cancel( error_span!(parent: span, "fatal_errors_receiver"), @@ -258,7 +258,7 @@ impl ManagedTorrent { drop(g); let t = self.clone(); let span = self.info().span.clone(); - let token = self.cancellation_token.clone(); + let token = live_cancellation_token.clone(); spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), @@ -278,10 +278,11 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, token.child_token()); + let live = + TorrentStateLive::new(paused, tx, live_cancellation_token); g.state = ManagedTorrentState::Live(live.clone()); - spawn_fatal_errors_receiver(&t, rx); + spawn_fatal_errors_receiver(&t, rx, token); spawn_peer_adder(&live, initial_peers, peer_rx); Ok(()) @@ -299,13 +300,9 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new( - paused, - tx, - self.cancellation_token.child_token().clone(), - ); + let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone()); g.state = ManagedTorrentState::Live(live.clone()); - spawn_fatal_errors_receiver(self, rx); + spawn_fatal_errors_receiver(self, rx, live_cancellation_token); spawn_peer_adder(&live, initial_peers, peer_rx); Ok(()) } @@ -318,7 +315,12 @@ impl ManagedTorrent { drop(g); // Recurse. - self.start(initial_peers, peer_rx, start_paused) + self.start( + initial_peers, + peer_rx, + start_paused, + live_cancellation_token, + ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), }