Better cancellation

This commit is contained in:
Igor Katson 2024-02-26 09:25:01 +00:00
parent 51dba8ab67
commit e263441fb6
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 30 additions and 22 deletions

View file

@ -17,8 +17,7 @@ webui-build: webui-deps
@PHONY: devserver @PHONY: devserver
devserver: devserver:
echo -n '' > /tmp/rqbit-log echo -n '' > /tmp/rqbit-log && cargo run --release -- \
cargo run --release -- \
--log-file /tmp/rqbit-log \ --log-file /tmp/rqbit-log \
--log-file-rust-log=debug,librqbit=trace \ --log-file-rust-log=debug,librqbit=trace \
server start /tmp/scratch/ server start /tmp/scratch/

View file

@ -666,7 +666,7 @@ impl Session {
.collect(); .collect();
let info = TorrentMetaV1Owned { let info = TorrentMetaV1Owned {
announce: trackers announce: trackers
.get(0) .first()
.cloned() .cloned()
.unwrap_or_else(|| ByteString(b"http://retracker.local/announce".to_vec())), .unwrap_or_else(|| ByteString(b"http://retracker.local/announce".to_vec())),
announce_list: vec![trackers], announce_list: vec![trackers],
@ -760,6 +760,7 @@ impl Session {
let cancellation_token = self.cancellation_token.child_token(); let cancellation_token = self.cancellation_token.child_token();
let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); let cancellation_token_drop_guard = cancellation_token.clone().drop_guard();
let paused = opts.list_only || opts.paused;
let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
@ -769,10 +770,11 @@ impl Session {
.as_id20() .as_id20()
.context("magnet link didn't contain a BTv1 infohash")?; .context("magnet link didn't contain a BTv1 infohash")?;
let peer_token = cancellation_token.child_token();
let peer_rx = self.make_peer_rx( let peer_rx = self.make_peer_rx(
info_hash, info_hash,
magnet.trackers.clone(), magnet.trackers.clone(),
cancellation_token.clone(), peer_token.clone(),
announce_port, announce_port,
opts.force_tracker_interval, opts.force_tracker_interval,
)?; )?;
@ -796,16 +798,15 @@ impl Session {
anyhow::bail!("DHT died, no way to discover torrent metainfo") anyhow::bail!("DHT died, no way to discover torrent metainfo")
} }
}; };
if paused {
peer_token.cancel();
}
debug!(?info, "received result from DHT"); debug!(?info, "received result from DHT");
( (
info_hash, info_hash,
info, info,
magnet.trackers, magnet.trackers,
if opts.paused || opts.list_only { Some(peer_rx),
None
} else {
Some(peer_rx)
},
initial_peers, initial_peers,
cancellation_token, cancellation_token,
) )
@ -840,13 +841,17 @@ impl Session {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let peer_rx = self.make_peer_rx( let peer_rx = if paused {
torrent.info_hash, None
trackers.clone(), } else {
cancellation_token.clone(), self.make_peer_rx(
announce_port, torrent.info_hash,
opts.force_tracker_interval, trackers.clone(),
)?; cancellation_token.clone(),
announce_port,
opts.force_tracker_interval,
)?
};
( (
torrent.info_hash, torrent.info_hash,
@ -1008,13 +1013,17 @@ impl Session {
{ {
let span = managed_torrent.info.span.clone(); let span = managed_torrent.info.span.clone();
let _ = span.enter(); let _ = span.enter();
// Just in case, cancel all tasks started for this torrent so far.
// This is defensive, and not proven necessary.
let token = if opts.paused {
cancellation_token.cancel();
self.cancellation_token.child_token()
} else {
cancellation_token
};
managed_torrent managed_torrent
.start( .start(initial_peers, peer_rx, opts.paused, token)
initial_peers,
peer_rx,
opts.paused,
cancellation_token.child_token(),
)
.context("error starting torrent")?; .context("error starting torrent")?;
} }