Cancellation... It compiles now at least with latest changes, but not sure if they work or are correct

This commit is contained in:
Igor Katson 2024-02-18 20:11:12 +00:00
parent 1582d16cc5
commit f5ccb8632b
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 71 additions and 57 deletions

View file

@ -768,7 +768,10 @@ impl Session {
self.tcp_listen_port self.tcp_listen_port
}; };
let (info_hash, info, trackers, peer_rx, initial_peers) = match add { let cancellation_token = self.cancellation_token.child_token();
let cancellation_token_drop_guard = cancellation_token.clone().drop_guard();
let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let magnet = let magnet =
Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?;
@ -776,23 +779,13 @@ impl Session {
.as_id20() .as_id20()
.context("magnet link didn't contain a BTv1 infohash")?; .context("magnet link didn't contain a BTv1 infohash")?;
let dht_peer_rx = self let peer_rx = self.make_peer_rx(
.dht
.as_ref()
.map(|d| d.get_peers(info_hash, announce_port))
.transpose()?;
let tracker_peer_rx = TrackerComms::start(
info_hash, info_hash,
self.peer_id,
magnet.trackers.clone(), magnet.trackers.clone(),
Box::new(TorrentStatsForTrackerDummy {}), cancellation_token.clone(),
opts.force_tracker_interval, announce_port,
self.cancellation_token().clone(), )?;
self.tcp_listen_port, let peer_rx = match peer_rx {
);
let peer_rx = match merge_peer_rx(dht_peer_rx, tracker_peer_rx) {
Some(peer_rx) => peer_rx, Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"), None => bail!("can't find peers: DHT disabled and no trackers in magnet"),
}; };
@ -823,6 +816,7 @@ impl Session {
Some(peer_rx) Some(peer_rx)
}, },
initial_peers, initial_peers,
cancellation_token,
) )
} }
other => { other => {
@ -844,13 +838,6 @@ impl Session {
AddTorrent::TorrentInfo(t) => *t, AddTorrent::TorrentInfo(t) => *t,
}; };
let dht_rx = match self.dht.as_ref() {
Some(dht) if !opts.paused && !opts.list_only => {
debug!(info_hash=?torrent.info_hash, "reading peers from DHT");
Some(dht.get_peers(torrent.info_hash, announce_port)?)
}
_ => None,
};
let trackers = torrent let trackers = torrent
.iter_announce() .iter_announce()
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
@ -862,17 +849,12 @@ impl Session {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let tracker_peer_rx = TrackerComms::start( let peer_rx = self.make_peer_rx(
torrent.info_hash, torrent.info_hash,
self.peer_id,
trackers.clone(), trackers.clone(),
Box::new(TorrentStatsForTrackerDummy {}), cancellation_token.clone(),
opts.force_tracker_interval, announce_port,
self.cancellation_token().clone(), )?;
self.tcp_listen_port,
);
let peer_rx = merge_peer_rx(dht_rx, tracker_peer_rx);
( (
torrent.info_hash, torrent.info_hash,
@ -884,10 +866,13 @@ impl Session {
.unwrap_or_default() .unwrap_or_default()
.into_iter() .into_iter()
.collect(), .collect(),
cancellation_token,
) )
} }
}; };
cancellation_token_drop_guard.disarm();
self.main_torrent_info( self.main_torrent_info(
info_hash, info_hash,
info, info,
@ -895,6 +880,7 @@ impl Session {
peer_rx, peer_rx,
initial_peers.into_iter().collect(), initial_peers.into_iter().collect(),
opts, opts,
cancellation_token,
) )
.await .await
} }
@ -908,9 +894,12 @@ impl Session {
peer_rx: Option<PeerStream>, peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
opts: AddTorrentOptions, opts: AddTorrentOptions,
cancellation_token: CancellationToken,
) -> anyhow::Result<AddTorrentResponse> { ) -> anyhow::Result<AddTorrentResponse> {
debug!("Torrent info: {:#?}", &info); debug!("Torrent info: {:#?}", &info);
let drop_guard = cancellation_token.clone().drop_guard();
let get_only_files = let get_only_files =
|only_files: Option<Vec<usize>>, only_files_regex: Option<String>, list_only: bool| { |only_files: Option<Vec<usize>>, only_files_regex: Option<String>, list_only: bool| {
match (only_files, only_files_regex) { match (only_files, only_files_regex) {
@ -991,7 +980,7 @@ impl Session {
builder builder
.overwrite(opts.overwrite) .overwrite(opts.overwrite)
.spawner(self.spawner) .spawner(self.spawner)
.cancellation_token(self.cancellation_token.child_token()) .cancellation_token(cancellation_token.clone())
.trackers(trackers) .trackers(trackers)
.peer_id(self.peer_id); .peer_id(self.peer_id);
@ -1029,10 +1018,16 @@ impl Session {
let span = managed_torrent.info.span.clone(); let span = managed_torrent.info.span.clone();
let _ = span.enter(); let _ = span.enter();
managed_torrent managed_torrent
.start(initial_peers, peer_rx, opts.paused) .start(
initial_peers,
peer_rx,
opts.paused,
cancellation_token.child_token(),
)
.context("error starting torrent")?; .context("error starting torrent")?;
} }
drop_guard.disarm();
Ok(AddTorrentResponse::Added(id, managed_torrent)) Ok(AddTorrentResponse::Added(id, managed_torrent))
} }
@ -1078,24 +1073,41 @@ impl Session {
} }
} }
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { fn make_peer_rx(
&self,
info_hash: Id20,
trackers: Vec<String>,
cancel: CancellationToken,
announce_port: Option<u16>,
) -> anyhow::Result<Option<PeerStream>> {
let announce_port = announce_port.or(self.tcp_listen_port);
let dht_rx = self let dht_rx = self
.dht .dht
.as_ref() .as_ref()
.map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) .map(|dht| dht.get_peers(info_hash, announce_port))
.transpose()?; .transpose()?;
let trackers = handle.info().trackers.clone();
let peer_rx = TrackerComms::start( let peer_rx = TrackerComms::start(
handle.info.info_hash, info_hash,
handle.info.peer_id, self.peer_id,
trackers.into_iter().collect(), trackers,
Box::new(tracker_comms::TorrentStatsForTrackerDummy {}), Box::new(tracker_comms::TorrentStatsForTrackerDummy {}),
None, None,
self.cancellation_token.clone(), cancel,
self.tcp_listen_port, announce_port,
); );
let peer_rx = merge_peer_rx(dht_rx, peer_rx); let peer_rx = merge_peer_rx(dht_rx, peer_rx);
handle.start(Default::default(), peer_rx, false)?; Ok(peer_rx)
}
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let token = handle.cancellation_token.child_token();
let peer_rx = self.make_peer_rx(
handle.info_hash(),
handle.info().trackers.clone().into_iter().collect(),
token.clone(),
self.tcp_listen_port,
)?;
handle.start(Default::default(), peer_rx, false, token)?;
Ok(()) Ok(())
} }
} }

View file

@ -15,7 +15,6 @@ use std::time::Duration;
use anyhow::bail; use anyhow::bail;
use anyhow::Context; use anyhow::Context;
use buffers::ByteString; use buffers::ByteString;
use dht::RequestPeersStream;
use librqbit_core::hash_id::Id20; use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::Lengths; use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id; use librqbit_core::peer_id::generate_peer_id;
@ -32,7 +31,6 @@ use tracing::debug;
use tracing::error_span; use tracing::error_span;
use tracing::trace; use tracing::trace;
use tracing::warn; use tracing::warn;
use url::Url;
use crate::chunk_tracker::ChunkTracker; use crate::chunk_tracker::ChunkTracker;
use crate::spawn_utils::BlockingSpawner; use crate::spawn_utils::BlockingSpawner;
@ -176,13 +174,15 @@ impl ManagedTorrent {
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
peer_rx: Option<PeerStream>, peer_rx: Option<PeerStream>,
start_paused: bool, start_paused: bool,
live_cancellation_token: CancellationToken,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut g = self.locked.write(); let mut g = self.locked.write();
let spawn_fatal_errors_receiver = let spawn_fatal_errors_receiver =
|state: &Arc<Self>, rx: tokio::sync::oneshot::Receiver<anyhow::Error>| { |state: &Arc<Self>,
rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
token: CancellationToken| {
let span = state.info.span.clone(); let span = state.info.span.clone();
let token = state.cancellation_token.clone();
let state = Arc::downgrade(state); let state = Arc::downgrade(state);
spawn_with_cancel( spawn_with_cancel(
error_span!(parent: span, "fatal_errors_receiver"), error_span!(parent: span, "fatal_errors_receiver"),
@ -258,7 +258,7 @@ impl ManagedTorrent {
drop(g); drop(g);
let t = self.clone(); let t = self.clone();
let span = self.info().span.clone(); let span = self.info().span.clone();
let token = self.cancellation_token.clone(); let token = live_cancellation_token.clone();
spawn_with_cancel( spawn_with_cancel(
error_span!(parent: span.clone(), "initialize_and_start"), error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(), token.clone(),
@ -278,10 +278,11 @@ impl ManagedTorrent {
} }
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, token.child_token()); let live =
TorrentStateLive::new(paused, tx, live_cancellation_token);
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(&t, rx); spawn_fatal_errors_receiver(&t, rx, token);
spawn_peer_adder(&live, initial_peers, peer_rx); spawn_peer_adder(&live, initial_peers, peer_rx);
Ok(()) Ok(())
@ -299,13 +300,9 @@ impl ManagedTorrent {
ManagedTorrentState::Paused(_) => { ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused(); let paused = g.state.take().assert_paused();
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new( let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone());
paused,
tx,
self.cancellation_token.child_token().clone(),
);
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(self, rx); spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
spawn_peer_adder(&live, initial_peers, peer_rx); spawn_peer_adder(&live, initial_peers, peer_rx);
Ok(()) Ok(())
} }
@ -318,7 +315,12 @@ impl ManagedTorrent {
drop(g); drop(g);
// Recurse. // Recurse.
self.start(initial_peers, peer_rx, start_paused) self.start(
initial_peers,
peer_rx,
start_paused,
live_cancellation_token,
)
} }
ManagedTorrentState::None => bail!("bug: torrent is in empty state"), ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
} }