Split up "add_torrent" method

This commit is contained in:
Igor Katson 2024-12-05 23:37:13 +00:00
parent 100b7116df
commit 456a51d4db
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -40,7 +40,7 @@ use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig};
use futures::{
future::BoxFuture,
stream::{BoxStream, FuturesUnordered},
FutureExt, Stream, TryFutureExt,
FutureExt, Stream, StreamExt, TryFutureExt,
};
use itertools::Itertools;
use librqbit_core::{
@ -58,7 +58,6 @@ use tokio::{
net::{TcpListener, TcpStream},
sync::Notify,
};
use tokio_stream::StreamExt;
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span};
use tracker_comms::TrackerComms;
@ -474,12 +473,8 @@ pub(crate) struct CheckedIncomingConnection {
struct InternalAddResult {
info_hash: Id20,
info: TorrentMetaV1Info<ByteBufOwned>,
torrent_bytes: Bytes,
info_bytes: Bytes,
metadata: Option<TorrentMetadata>,
trackers: Vec<String>,
peer_rx: Option<PeerStream>,
seen_peers: Vec<SocketAddr>,
}
impl Session {
@ -883,15 +878,7 @@ impl Session {
opts: Option<AddTorrentOptions>,
) -> BoxFuture<'a, anyhow::Result<AddTorrentResponse>> {
async move {
// Magnet links are different in that we first need to discover the metadata.
let mut opts = opts.unwrap_or_default();
let paused = opts.list_only || opts.paused;
// The main difference between magnet link and torrent file, is that we need to resolve the magnet link
// into a torrent file by connecting to peers that support extended handshakes.
// So we must discover at least one peer and connect to it to be able to proceed further.
let add_res = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") || magnet.len() == 40 => {
let magnet = Magnet::parse(&magnet)
@ -906,66 +893,10 @@ impl Session {
}
}
let peer_rx = self.make_peer_rx(
InternalAddResult {
info_hash,
if opts.disable_trackers {
Default::default()
} else {
let mut trackers = magnet.trackers.clone();
if let Some(custom_trackers) = opts.trackers.clone() {
trackers.extend(custom_trackers);
}
trackers
},
!paused,
opts.force_tracker_interval,
opts.initial_peers.clone().unwrap_or_default()
)?.context("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided")?;
debug!(?info_hash, "querying DHT");
match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
Default::default(),
peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)),
self.connector.clone(),
)
.await
{
ReadMetainfoResult::Found {
info,
info_bytes,
rx,
seen,
} => {
trace!(?info, "received result from DHT");
let mut trackers = magnet.trackers.into_iter().unique().collect_vec();
if let Some(custom_trackers) = opts.trackers.clone() {
trackers.extend(custom_trackers);
}
InternalAddResult {
info_hash,
torrent_bytes: torrent_file_from_info_bytes(
&info_bytes,
&trackers,
)?,
info_bytes: info_bytes.0,
info,
trackers,
peer_rx: Some(rx),
seen_peers: {
let seen = seen.into_iter().collect_vec();
for peer in &seen {
trace!(?peer, "seen")
}
seen
},
}
}
ReadMetainfoResult::ChannelClosed { .. } => {
bail!("input address stream exhausted, no way to discover torrent metainfo")
}
trackers: magnet.trackers,
metadata: None,
}
}
other => {
@ -981,12 +912,13 @@ impl Session {
url
)
}
AddTorrent::TorrentFileBytes(bytes) =>
torrent_from_bytes(bytes)
.context("error decoding torrent")?
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(bytes).context("error decoding torrent")?
}
};
let mut trackers = torrent.info
let mut trackers = torrent
.info
.iter_announce()
.unique()
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
@ -1001,35 +933,14 @@ impl Session {
trackers.extend(custom_trackers);
}
let peer_rx = if paused {
None
} else {
self.make_peer_rx(
torrent.info.info_hash,
if opts.disable_trackers {
Default::default()
} else {
trackers.clone()
},
!paused,
opts.force_tracker_interval,
opts.initial_peers.clone().unwrap_or_default()
)?
};
InternalAddResult {
info_hash: torrent.info.info_hash,
info: torrent.info.info,
torrent_bytes: torrent.torrent_bytes,
info_bytes: torrent.info_bytes,
metadata: Some(TorrentMetadata::new(
torrent.info.info,
torrent.torrent_bytes,
torrent.info_bytes,
)?),
trackers,
peer_rx,
seen_peers: opts
.initial_peers
.clone()
.unwrap_or_default()
.into_iter()
.collect(),
}
}
};
@ -1073,19 +984,58 @@ impl Session {
mut opts: AddTorrentOptions,
) -> anyhow::Result<AddTorrentResponse> {
let InternalAddResult {
info,
info_hash,
metadata,
trackers,
peer_rx,
seen_peers,
torrent_bytes,
info_bytes,
} = add_res;
trace!("Torrent info: {:#?}", &info);
let peer_stream_permanent = !opts.paused && !opts.list_only;
let make_peer_rx = || {
self.make_peer_rx(
info_hash,
trackers.clone(),
peer_stream_permanent,
opts.force_tracker_interval,
opts.initial_peers.clone().unwrap_or_default(),
)
.context("error creating peer stream")
};
let mut seen_peers = Vec::new();
let (metadata, peer_rx) = {
match metadata {
Some(metadata) => {
let mut peer_rx = None;
if peer_stream_permanent {
peer_rx = make_peer_rx()?;
}
(metadata, peer_rx)
}
None => {
let peer_rx = make_peer_rx()?.context(
"no known way to resolve peers (no DHT, no trackers, no initial_peers)",
)?;
let resolved_magnet = self
.resolve_magnet(info_hash, peer_rx, &trackers, opts.peer_opts)
.await?;
seen_peers = resolved_magnet.seen_peers.clone();
let peer_rx = Some(
merge_streams(
resolved_magnet.peer_rx,
futures::stream::iter(resolved_magnet.seen_peers),
)
.boxed(),
);
(resolved_magnet.metadata, peer_rx)
}
}
};
trace!("Torrent metadata: {:#?}", &metadata.info);
let only_files = compute_only_files(
&info,
&metadata.info,
opts.only_files,
opts.only_files_regex,
opts.list_only,
@ -1093,7 +1043,7 @@ impl Session {
let output_folder = match (opts.output_folder, opts.sub_folder) {
(None, None) => self.output_folder.join(
self.get_default_subfolder_for_torrent(&info)?
self.get_default_subfolder_for_torrent(&metadata.info)?
.unwrap_or_default(),
),
(Some(o), None) => PathBuf::from(o),
@ -1112,11 +1062,11 @@ impl Session {
if opts.list_only {
return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse {
info_hash,
info,
info: metadata.info,
only_files,
output_folder,
seen_peers,
torrent_bytes,
torrent_bytes: metadata.torrent_bytes,
}));
}
@ -1143,7 +1093,7 @@ impl Session {
let span = error_span!(parent: self.rs(), "torrent", id);
let peer_opts = self.merge_peer_opts(opts.peer_opts);
let metadata = Arc::new(TorrentMetadata::new(info, torrent_bytes, info_bytes)?);
let metadata = Arc::new(metadata);
let minfo = Arc::new(ManagedTorrentShared {
id,
span,
@ -1411,6 +1361,58 @@ impl Session {
pub fn tcp_listen_port(&self) -> Option<u16> {
self.tcp_listen_port
}
async fn resolve_magnet(
self: &Arc<Self>,
info_hash: Id20,
peer_rx: PeerStream,
trackers: &[String],
peer_opts: Option<PeerConnectionOptions>,
) -> anyhow::Result<ResolveMagnetResult> {
match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
Default::default(),
peer_rx,
Some(self.merge_peer_opts(peer_opts)),
self.connector.clone(),
)
.await
{
ReadMetainfoResult::Found {
info,
info_bytes,
rx,
seen,
} => {
trace!(?info, "received result from DHT");
Ok(ResolveMagnetResult {
metadata: TorrentMetadata::new(
info,
torrent_file_from_info_bytes(&info_bytes, trackers)?,
info_bytes.0,
)?,
peer_rx: rx,
seen_peers: {
let seen = seen.into_iter().collect_vec();
for peer in &seen {
trace!(?peer, "seen")
}
seen
},
})
}
ReadMetainfoResult::ChannelClosed { .. } => {
bail!("input address stream exhausted, no way to discover torrent metainfo")
}
}
}
}
pub(crate) struct ResolveMagnetResult {
pub metadata: TorrentMetadata,
pub peer_rx: PeerStream,
pub seen_peers: Vec<SocketAddr>,
}
fn remove_files_and_dirs(infos: &FileInfos, files: &dyn TorrentStorage) {