From e22132bba06a52649f5c73ce547505184eff0622 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 21:40:40 +0000 Subject: [PATCH 01/19] "make_peer_rx" - include initial peers --- crates/librqbit/src/session.rs | 59 +++++++++++++++++----------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index ca9b923..a1104d5 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -480,7 +480,7 @@ struct InternalAddResult { info_bytes: Bytes, trackers: Vec, peer_rx: Option, - initial_peers: Vec, + seen_peers: Vec, } impl Session { @@ -889,8 +889,6 @@ impl Session { let paused = opts.list_only || opts.paused; - let announce_port = if paused { None } else { self.tcp_listen_port }; - // The main difference between magnet link and torrent file, is that we need to resolve the magnet link // into a torrent file by connecting to peers that support extended handshakes. // So we must discover at least one peer and connect to it to be able to proceed further. @@ -920,19 +918,10 @@ impl Session { } trackers }, - announce_port, + !paused, opts.force_tracker_interval, - )?; - let initial_peers_stream = opts - .initial_peers - .clone() - .and_then(|v| if v.is_empty() { None } else { Some(v) }) - .map(futures::stream::iter); - let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_stream); - let peer_rx = match peer_rx { - Some(peer_rx) => peer_rx, - None => bail!("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided"), - }; + opts.initial_peers.clone().unwrap_or_default() + )?.context("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided")?; debug!(?info_hash, "querying DHT"); match read_metainfo_from_peer_receiver( @@ -966,7 +955,7 @@ impl Session { info, trackers, peer_rx: Some(rx), - initial_peers: { + seen_peers: { let seen = seen.into_iter().collect_vec(); for peer in &seen { trace!(?peer, "seen") @@ -1023,8 +1012,9 @@ impl Session { } else { trackers.clone() }, - announce_port, + !paused, opts.force_tracker_interval, + opts.initial_peers.clone().unwrap_or_default() )? }; @@ -1035,7 +1025,7 @@ impl Session { info_bytes: torrent.info_bytes, trackers, peer_rx, - initial_peers: opts + seen_peers: opts .initial_peers .clone() .unwrap_or_default() @@ -1088,7 +1078,7 @@ impl Session { info_hash, trackers, peer_rx, - initial_peers, + seen_peers, torrent_bytes, info_bytes, } = add_res; @@ -1126,7 +1116,7 @@ impl Session { info, only_files, output_folder, - seen_peers: initial_peers, + seen_peers, torrent_bytes, })); } @@ -1225,12 +1215,12 @@ impl Session { // Merge "initial_peers" and "peer_rx" into one stream. let peer_rx = merge_two_optional_streams( - if !initial_peers.is_empty() { + if !seen_peers.is_empty() { debug!( - count = initial_peers.len(), + count = seen_peers.len(), "merging initial peers into peer_rx" ); - Some(futures::stream::iter(initial_peers.into_iter())) + Some(futures::stream::iter(seen_peers.into_iter())) } else { None }, @@ -1344,31 +1334,39 @@ impl Session { self: &Arc, info_hash: Id20, trackers: Vec, - announce_port: Option, + announce: bool, force_tracker_interval: Option, + initial_peers: Vec, ) -> anyhow::Result> { - let announce_port = announce_port.or(self.tcp_listen_port); + let announce_port = if announce { self.tcp_listen_port } else { None }; let dht_rx = self .dht .as_ref() .map(|dht| dht.get_peers(info_hash, announce_port)) .transpose()?; - let peer_rx_stats = PeerRxTorrentInfo { + let tracker_rx_stats = PeerRxTorrentInfo { info_hash, session: self.clone(), }; - let peer_rx = TrackerComms::start( + let tracker_rx = TrackerComms::start( info_hash, self.peer_id, trackers, - Box::new(peer_rx_stats), + Box::new(tracker_rx_stats), force_tracker_interval, announce_port, self.reqwest_client.clone(), ); - Ok(merge_two_optional_streams(dht_rx, peer_rx)) + let initial_peers_rx = if initial_peers.is_empty() { + None + } else { + Some(futures::stream::iter(initial_peers)) + }; + let peer_rx = merge_two_optional_streams(dht_rx, tracker_rx); + let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_rx); + Ok(peer_rx) } async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) { @@ -1391,8 +1389,9 @@ impl Session { let peer_rx = self.make_peer_rx( handle.info_hash(), handle.shared().trackers.clone().into_iter().collect(), - self.tcp_listen_port, + true, handle.shared().options.force_tracker_interval, + Default::default(), )?; handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await; From 8131ba04826ea841711e1a0a5076e99a21f24097 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 21:47:37 +0000 Subject: [PATCH 02/19] Store initial_peers in state --- crates/librqbit/src/session.rs | 3 ++- crates/librqbit/src/torrent_state/mod.rs | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index a1104d5..5f6b2c6 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1179,6 +1179,7 @@ impl Session { output_folder, disk_write_queue: self.disk_write_tx.clone(), ratelimits: opts.ratelimits, + initial_peers: opts.initial_peers.clone().unwrap_or_default(), #[cfg(feature = "disable-upload")] _disable_upload: self._disable_upload, }, @@ -1391,7 +1392,7 @@ impl Session { handle.shared().trackers.clone().into_iter().collect(), true, handle.shared().options.force_tracker_interval, - Default::default(), + handle.shared().options.initial_peers.clone(), )?; handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 4c16106..91aec07 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -6,6 +6,7 @@ mod streaming; pub mod utils; use std::collections::HashSet; +use std::net::SocketAddr; use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -105,6 +106,7 @@ pub(crate) struct ManagedTorrentOptions { pub output_folder: PathBuf, pub disk_write_queue: Option, pub ratelimits: LimitsConfig, + pub initial_peers: Vec, #[cfg(feature = "disable-upload")] pub _disable_upload: bool, } From 96946d5a818c01f9d8596fe2d4608a44b8c34289 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 21:57:44 +0000 Subject: [PATCH 03/19] "make_peer_rx_managed_torrent" --- crates/librqbit/src/session.rs | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 5f6b2c6..d2e0578 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1330,6 +1330,21 @@ impl Session { Ok(()) } + pub fn make_peer_rx_managed_torrent( + self: &Arc, + t: &Arc, + announce: bool, + ) -> anyhow::Result { + self.make_peer_rx( + t.info_hash(), + t.shared().trackers.iter().cloned().collect(), + announce, + t.shared().options.force_tracker_interval, + t.shared().options.initial_peers.clone(), + )? + .context("no peer source") + } + // Get a peer stream from both DHT and trackers. fn make_peer_rx( self: &Arc, @@ -1387,14 +1402,8 @@ impl Session { } pub async fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - let peer_rx = self.make_peer_rx( - handle.info_hash(), - handle.shared().trackers.clone().into_iter().collect(), - true, - handle.shared().options.force_tracker_interval, - handle.shared().options.initial_peers.clone(), - )?; - handle.start(peer_rx, false)?; + let peer_rx = self.make_peer_rx_managed_torrent(handle, true)?; + handle.start(Some(peer_rx), false)?; self.try_update_persistence_metadata(handle).await; Ok(()) } From b796a8767b566dfc30136ce8a437fbb9030f7a9b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 21:58:12 +0000 Subject: [PATCH 04/19] "main_torrent_info" -> "add_torrent_internal" --- crates/librqbit/src/session.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index d2e0578..019f39e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1035,7 +1035,7 @@ impl Session { } }; - self.main_torrent_info(add_res, opts).await + self.add_torrent_internal(add_res, opts).await } .instrument(error_span!(parent: self.rs(), "add_torrent")) .boxed() @@ -1068,7 +1068,7 @@ impl Session { Ok::<_, anyhow::Error>(Some(PathBuf::from(longest))) } - async fn main_torrent_info( + async fn add_torrent_internal( self: &Arc, add_res: InternalAddResult, mut opts: AddTorrentOptions, From e440f0397028dfe11f06e948b6fb644829c71c96 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 22:05:04 +0000 Subject: [PATCH 05/19] spawn_peer_adder: Option -> PeerStream --- crates/librqbit/src/torrent_state/mod.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 91aec07..32246e5 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -269,7 +269,7 @@ impl ManagedTorrent { ); }; - fn spawn_peer_adder(live: &Arc, peer_rx: Option) { + fn spawn_peer_adder(live: &Arc, mut peer_rx: PeerStream) { live.spawn( error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), { @@ -281,12 +281,6 @@ impl ManagedTorrent { weak }; - let mut peer_rx = if let Some(peer_rx) = peer_rx { - peer_rx - } else { - return Ok(()); - }; - loop { match timeout(Duration::from_secs(5), peer_rx.next()).await { Ok(Some(peer)) => { @@ -359,7 +353,9 @@ impl ManagedTorrent { t.state_change_notify.notify_waiters(); spawn_fatal_errors_receiver(&t, rx, token); - spawn_peer_adder(&live, peer_rx); + if let Some(peer_rx) = peer_rx { + spawn_peer_adder(&live, peer_rx); + } Ok(()) } @@ -382,7 +378,9 @@ impl ManagedTorrent { drop(g); spawn_fatal_errors_receiver(self, rx, cancellation_token); - spawn_peer_adder(&live, peer_rx); + if let Some(peer_rx) = peer_rx { + spawn_peer_adder(&live, peer_rx); + } Ok(()) } ManagedTorrentState::Error(_) => { From 100b7116df4c3300e9a95ed0904c3eb87b2f773d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 22:57:34 +0000 Subject: [PATCH 06/19] Split out TorrentMetadata --- crates/librqbit/examples/custom_storage.rs | 12 ++- crates/librqbit/examples/ubuntu.rs | 4 +- crates/librqbit/src/api.rs | 67 +++++++------- crates/librqbit/src/http_api.rs | 12 ++- crates/librqbit/src/lib.rs | 3 +- crates/librqbit/src/session.rs | 52 +++++------ .../librqbit/src/session_persistence/json.rs | 11 ++- .../src/session_persistence/postgres.rs | 9 +- crates/librqbit/src/storage/filesystem/fs.rs | 23 +++-- .../librqbit/src/storage/filesystem/mmap.rs | 20 ++-- .../librqbit/src/storage/middleware/slow.rs | 17 +++- .../librqbit/src/storage/middleware/timing.rs | 17 +++- .../storage/middleware/write_through_cache.rs | 23 +++-- crates/librqbit/src/storage/mod.rs | 48 +++++++--- .../src/torrent_state/initializing.rs | 44 +++++---- crates/librqbit/src/torrent_state/live/mod.rs | 91 ++++++++++--------- crates/librqbit/src/torrent_state/mod.rs | 72 +++++++++++++-- crates/librqbit/src/torrent_state/paused.rs | 5 +- .../librqbit/src/torrent_state/streaming.rs | 47 +++++++--- crates/librqbit/src/upnp_server_adapter.rs | 59 ++++++++---- 20 files changed, 411 insertions(+), 225 deletions(-) diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index f22daa8..af6f7dd 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -20,7 +20,11 @@ struct CustomStorage { impl StorageFactory for CustomStorageFactory { type Storage = CustomStorage; - fn create(&self, _info: &librqbit::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + _: &librqbit::ManagedTorrentShared, + _: &librqbit::TorrentMetadata, + ) -> anyhow::Result { Ok(CustomStorage::default()) } @@ -54,7 +58,11 @@ impl TorrentStorage for CustomStorage { anyhow::bail!("not implemented") } - fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { + fn init( + &mut self, + _meta: &librqbit::ManagedTorrentShared, + _: &librqbit::TorrentMetadata, + ) -> anyhow::Result<()> { anyhow::bail!("not implemented") } } diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs index 7dbd189..c35c907 100644 --- a/crates/librqbit/examples/ubuntu.rs +++ b/crates/librqbit/examples/ubuntu.rs @@ -49,7 +49,9 @@ async fn main() -> Result<(), anyhow::Error> { _ => unreachable!(), }; - info!("Details: {:?}", &handle.shared().info); + handle.with_metadata(|r| { + info!("Details: {:?}", &r.info); + })?; // Print stats periodically. tokio::spawn({ diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 230274c..ec8a4f6 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -209,7 +209,10 @@ impl Api { let mut r = TorrentDetailsResponse { id: Some(id), info_hash: mgr.shared().info_hash.as_string(), - name: mgr.shared().info.name.as_ref().map(|n| n.to_string()), + name: mgr + .with_metadata(|r| r.info.name.as_ref().map(|n| n.to_string())) + .ok() + .flatten(), output_folder: mgr .shared() .options @@ -245,7 +248,7 @@ impl Api { make_torrent_details( Some(handle.id()), &info_hash, - &handle.shared().info, + handle.metadata.load().as_ref().map(|r| &r.info), only_files.as_deref(), output_folder, ) @@ -261,8 +264,7 @@ impl Api { file_idx: usize, ) -> Result<&'static str> { let handle = self.mgr_handle(idx)?; - let info = &handle.shared().info; - torrent_file_mime_type(info, file_idx) + handle.with_metadata(|r| torrent_file_mime_type(&r.info, file_idx))? } pub fn api_peer_stats( @@ -380,7 +382,7 @@ impl Api { let details = make_torrent_details( Some(id), &handle.info_hash(), - &handle.shared().info, + handle.metadata.load().as_ref().map(|r| &r.info), handle.only_files().as_deref(), handle .shared() @@ -416,7 +418,7 @@ impl Api { details: make_torrent_details( None, &info_hash, - &info, + Some(&info), only_files.as_deref(), output_folder.to_string_lossy().into_owned().to_string(), ) @@ -426,7 +428,7 @@ impl Api { let details = make_torrent_details( Some(id), &handle.info_hash(), - &handle.shared().info, + handle.metadata.load().as_ref().map(|r| &r.info), handle.only_files().as_deref(), handle .shared() @@ -529,37 +531,40 @@ pub struct ApiAddTorrentResponse { fn make_torrent_details( id: Option, info_hash: &Id20, - info: &TorrentMetaV1Info, + info: Option<&TorrentMetaV1Info>, only_files: Option<&[usize]>, output_folder: String, ) -> Result { - let files = info - .iter_file_details() - .context("error iterating filenames and lengths")? - .enumerate() - .map(|(idx, d)| { - let name = match d.filename.to_string() { - Ok(s) => s, - Err(err) => { - warn!("error reading filename: {:?}", err); - "".to_string() + let files = match info { + Some(info) => info + .iter_file_details() + .context("error iterating filenames and lengths")? + .enumerate() + .map(|(idx, d)| { + let name = match d.filename.to_string() { + Ok(s) => s, + Err(err) => { + warn!("error reading filename: {:?}", err); + "".to_string() + } + }; + let components = d.filename.to_vec().unwrap_or_default(); + let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); + TorrentDetailsResponseFile { + name, + components, + length: d.len, + included, + attributes: d.attrs(), } - }; - let components = d.filename.to_vec().unwrap_or_default(); - let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); - TorrentDetailsResponseFile { - name, - components, - length: d.len, - included, - attributes: d.attrs(), - } - }) - .collect(); + }) + .collect(), + None => Default::default(), + }; Ok(TorrentDetailsResponse { id, info_hash: info_hash.as_string(), - name: info.name.as_ref().map(|b| b.to_string()), + name: info.and_then(|i| i.name.as_ref().map(|b| b.to_string())), files: Some(files), output_folder, stats: None, diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 69d3e59..e4bc289 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -262,7 +262,10 @@ impl HttpApi { fn torrent_playlist_items(handle: &ManagedTorrent) -> Result> { let mut playlist_items = handle - .shared() + .metadata + .load() + .as_ref() + .context("torrent metadata not resolved")? .info .iter_file_details()? .enumerate() @@ -340,10 +343,9 @@ impl HttpApi { .context("timeout")??; let (info, content) = match added { - crate::AddTorrentResponse::AlreadyManaged(_, handle) => ( - handle.shared().info.clone(), - handle.shared().torrent_bytes.clone(), - ), + crate::AddTorrentResponse::AlreadyManaged(_, handle) => { + handle.with_metadata(|r| (r.info.clone(), r.torrent_bytes.clone()))? + } crate::AddTorrentResponse::ListOnly(ListOnlyResponse { info, torrent_bytes, diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index e13b9f1..0b689dc 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -82,7 +82,8 @@ pub use session::{ }; pub use spawn_utils::spawn as librqbit_spawn; pub use torrent_state::{ - ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentStats, TorrentStatsState, + ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentMetadata, TorrentStats, + TorrentStatsState, }; pub use type_aliases::FileInfos; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 019f39e..8999820 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -12,7 +12,6 @@ use crate::{ api::TorrentIdOrHash, bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - file_info::FileInfo, limits::{Limits, LimitsConfig}, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, @@ -26,12 +25,13 @@ use crate::{ stream_connect::{SocksProxyConfig, StreamConnector}, torrent_state::{ initializing::TorrentStateInitializing, ManagedTorrentHandle, ManagedTorrentLocked, - ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive, + ManagedTorrentOptions, ManagedTorrentState, TorrentMetadata, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, - ManagedTorrent, ManagedTorrentShared, + FileInfos, ManagedTorrent, ManagedTorrentShared, }; use anyhow::{bail, Context}; +use arc_swap::ArcSwapOption; use bencode::bencode_serialize_to_writer; use buffers::{ByteBuf, ByteBufOwned, ByteBufT}; use bytes::Bytes; @@ -46,7 +46,6 @@ use itertools::Itertools; use librqbit_core::{ constants::CHUNK_SIZE, directories::get_configuration_directory, - lengths::Lengths, magnet::Magnet, peer_id::generate_peer_id, spawn_utils::spawn_with_cancel, @@ -1130,7 +1129,7 @@ impl Session { .fetch_add(1, std::sync::atomic::Ordering::Relaxed) }; - let managed_torrent = { + let (managed_torrent, metadata) = { let mut g = self.db.write(); if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| { if t.info_hash() == info_hash || *eid == id { @@ -1142,34 +1141,16 @@ impl Session { return Ok(AddTorrentResponse::AlreadyManaged(id, handle)); } - let lengths = Lengths::from_torrent(&info)?; - let file_infos = info - .iter_file_details_ext(&lengths)? - .map(|fd| { - Ok::<_, anyhow::Error>(FileInfo { - relative_filename: fd.details.filename.to_pathbuf()?, - offset_in_torrent: fd.offset, - piece_range: fd.pieces, - len: fd.details.len, - attrs: fd.details.attrs(), - }) - }) - .collect::>>()?; - let span = error_span!(parent: self.rs(), "torrent", id); let peer_opts = self.merge_peer_opts(opts.peer_opts); + let metadata = Arc::new(TorrentMetadata::new(info, torrent_bytes, info_bytes)?); let minfo = Arc::new(ManagedTorrentShared { id, span, - file_infos, - info, - torrent_bytes, - info_bytes, info_hash, trackers: trackers.into_iter().collect(), spawner: self.spawner, peer_id: self.peer_id, - lengths, storage_factory, options: ManagedTorrentOptions { force_tracker_interval: opts.force_tracker_interval, @@ -1189,8 +1170,9 @@ impl Session { let initializing = Arc::new(TorrentStateInitializing::new( minfo.clone(), + metadata.clone(), only_files.clone(), - minfo.storage_factory.create_and_init(&minfo)?, + minfo.storage_factory.create_and_init(&minfo, &metadata)?, false, )); let handle = Arc::new(ManagedTorrent { @@ -1201,10 +1183,11 @@ impl Session { }), state_change_notify: Notify::new(), shared: minfo, + metadata: ArcSwapOption::new(Some(metadata.clone())), }); g.add_torrent(handle.clone(), id); - handle + (handle, metadata) }; if let Some(p) = self.persistence.as_ref() { @@ -1233,7 +1216,7 @@ impl Session { .start(peer_rx, opts.paused) .context("error starting torrent")?; - if let Some(name) = managed_torrent.shared().info.name.as_ref() { + if let Some(name) = metadata.info.name.as_ref() { info!(?name, "added torrent"); } @@ -1281,6 +1264,8 @@ impl Session { debug!("error pausing torrent before deletion: {e:#}") } + let metadata = removed.metadata.load_full().expect("TODO"); + let storage = removed .with_state_mut(|s| match s.take() { ManagedTorrentState::Initializing(p) => p.files.take().ok(), @@ -1297,7 +1282,12 @@ impl Session { _ => None, }) .map(Ok) - .unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared())); + .unwrap_or_else(|| { + removed + .shared + .storage_factory + .create(removed.shared(), &metadata) + }); if let Some(p) = self.persistence.as_ref() { if let Err(e) = p.delete(id).await { @@ -1311,7 +1301,7 @@ impl Session { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), (Ok(storage), true) => { debug!("will delete files"); - remove_files_and_dirs(removed.shared(), &storage); + remove_files_and_dirs(&metadata.file_infos, &storage); if removed.shared().options.output_folder != self.output_folder { if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { warn!( @@ -1423,9 +1413,9 @@ impl Session { } } -fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage) { +fn remove_files_and_dirs(infos: &FileInfos, files: &dyn TorrentStorage) { let mut all_dirs = HashSet::new(); - for (id, fi) in info.file_infos.iter().enumerate() { + for (id, fi) in infos.iter().enumerate() { let mut fname = &*fi.relative_filename; if let Err(e) = files.remove_file(id, fname) { warn!(?fi.relative_filename, error=?e, "could not delete file"); diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index cda4e69..1314ebb 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -150,7 +150,14 @@ impl JsonSessionPersistenceStore { output_folder: torrent.shared().options.output_folder.clone(), }; - if write_torrent_file && !torrent.shared().torrent_bytes.is_empty() { + let torrent_bytes = torrent + .metadata + .load() + .as_ref() + .map(|i| i.torrent_bytes.clone()) + .unwrap_or_default(); + + if write_torrent_file && !torrent_bytes.is_empty() { let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash()); match tokio::fs::OpenOptions::new() .create(true) @@ -160,7 +167,7 @@ impl JsonSessionPersistenceStore { .await { Ok(mut f) => { - if let Err(e) = f.write_all(&torrent.shared().torrent_bytes).await { + if let Err(e) = f.write_all(&torrent_bytes).await { warn!(error=?e, file=?torrent_bytes_file, "error writing torrent bytes") } } diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 62d31a1..b70bbb1 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -96,14 +96,19 @@ impl SessionPersistenceStore for PostgresSessionStorage { } async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { - let torrent_bytes: &[u8] = &torrent.shared().torrent_bytes; + let torrent_bytes = torrent + .metadata + .load() + .as_ref() + .map(|i| i.torrent_bytes.clone()) + .unwrap_or_default(); let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused) VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(id) DO NOTHING"; sqlx::query(q) .bind::(id.try_into()?) .bind(&torrent.info_hash().0[..]) - .bind(torrent_bytes) + .bind(torrent_bytes.as_ref()) .bind( torrent .shared() diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 2f2cc14..d4051a7 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -6,7 +6,10 @@ use std::{ use anyhow::Context; use tracing::warn; -use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentShared}; +use crate::{ + storage::StorageFactoryExt, + torrent_state::{ManagedTorrentShared, TorrentMetadata}, +}; use crate::storage::{StorageFactory, TorrentStorage}; @@ -18,9 +21,13 @@ pub struct FilesystemStorageFactory {} impl StorageFactory for FilesystemStorageFactory { type Storage = FilesystemStorage; - fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &ManagedTorrentShared, + _metadata: &TorrentMetadata, + ) -> anyhow::Result { Ok(FilesystemStorage { - output_folder: meta.options.output_folder.clone(), + output_folder: shared.options.output_folder.clone(), opened_files: Default::default(), }) } @@ -149,9 +156,13 @@ impl TorrentStorage for FilesystemStorage { } } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { let mut files = Vec::::new(); - for file_details in meta.file_infos.iter() { + for file_details in metadata.file_infos.iter() { let mut full_path = self.output_folder.clone(); let relative_path = &file_details.relative_filename; full_path.push(relative_path); @@ -161,7 +172,7 @@ impl TorrentStorage for FilesystemStorage { continue; }; std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; - let f = if meta.options.allow_overwrite { + let f = if shared.options.allow_overwrite { OpenOptions::new() .create(true) .truncate(false) diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 5b92657..a1824c6 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -4,7 +4,7 @@ use anyhow::Context; use memmap2::{MmapMut, MmapOptions}; use parking_lot::RwLock; -use crate::torrent_state::ManagedTorrentShared; +use crate::torrent_state::{ManagedTorrentShared, TorrentMetadata}; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; @@ -22,8 +22,12 @@ fn dummy_mmap() -> anyhow::Result { impl StorageFactory for MmapFilesystemStorageFactory { type Storage = MmapFilesystemStorage; - fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { - let fs_storage = FilesystemStorageFactory::default().create(meta)?; + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + let fs_storage = FilesystemStorageFactory::default().create(shared, metadata)?; Ok(MmapFilesystemStorage { opened_mmaps: Vec::new(), @@ -97,13 +101,17 @@ impl TorrentStorage for MmapFilesystemStorage { })) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.fs.init(meta)?; + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.fs.init(shared, metadata)?; let mut mmaps = Vec::new(); for (idx, file) in self.fs.opened_files.iter().enumerate() { let fg = file.file.write(); let fg = fg.as_ref().context("file is None")?; - fg.set_len(meta.file_infos[idx].len) + fg.set_len(metadata.file_infos[idx].len) .context("mmap storage: error setting length")?; let mmap = unsafe { MmapOptions::new().map_mut(fg) }.context("error mapping file")?; mmaps.push(RwLock::new(mmap)); diff --git a/crates/librqbit/src/storage/middleware/slow.rs b/crates/librqbit/src/storage/middleware/slow.rs index 5dbdf8a..0629b00 100644 --- a/crates/librqbit/src/storage/middleware/slow.rs +++ b/crates/librqbit/src/storage/middleware/slow.rs @@ -16,6 +16,7 @@ use parking_lot::Mutex; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + torrent_state::TorrentMetadata, ManagedTorrentShared, }; @@ -35,9 +36,13 @@ impl SlowStorageFactory { impl StorageFactory for SlowStorageFactory { type Storage = SlowStorage; - fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &crate::ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { Ok(SlowStorage { - underlying: self.underlying_factory.create(info)?, + underlying: self.underlying_factory.create(shared, metadata)?, pwrite_all_bufread: Mutex::new(Box::new( BufReader::new( File::open( @@ -116,7 +121,11 @@ impl TorrentStorage for SlowStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.underlying.init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.underlying.init(shared, metadata) } } diff --git a/crates/librqbit/src/storage/middleware/timing.rs b/crates/librqbit/src/storage/middleware/timing.rs index 42c3382..379067b 100644 --- a/crates/librqbit/src/storage/middleware/timing.rs +++ b/crates/librqbit/src/storage/middleware/timing.rs @@ -4,6 +4,7 @@ A storage middleware that logs the time underlying storage operations took. use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + torrent_state::TorrentMetadata, ManagedTorrentShared, }; @@ -25,10 +26,14 @@ impl TimingStorageFactory { impl StorageFactory for TimingStorageFactory { type Storage = TimingStorage; - fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &crate::ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { Ok(TimingStorage { name: self.name.clone(), - underlying: self.underlying_factory.create(info)?, + underlying: self.underlying_factory.create(shared, metadata)?, }) } @@ -104,7 +109,11 @@ impl TorrentStorage for TimingStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.underlying.init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.underlying.init(shared, metadata) } } diff --git a/crates/librqbit/src/storage/middleware/write_through_cache.rs b/crates/librqbit/src/storage/middleware/write_through_cache.rs index 59e8cb0..48d7070 100644 --- a/crates/librqbit/src/storage/middleware/write_through_cache.rs +++ b/crates/librqbit/src/storage/middleware/write_through_cache.rs @@ -14,6 +14,7 @@ use parking_lot::RwLock; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + torrent_state::TorrentMetadata, FileInfos, ManagedTorrentShared, }; @@ -35,18 +36,22 @@ impl WriteThroughCacheStorageFactory { impl StorageFactory for WriteThroughCacheStorageFactory { type Storage = WriteThroughCacheStorage; - fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &crate::ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { let pieces = self .max_cache_bytes - .div_ceil(info.lengths.default_piece_length() as u64) + .div_ceil(metadata.lengths.default_piece_length() as u64) .try_into()?; let pieces = NonZeroUsize::new(pieces).context("bug: pieces == 0")?; let lru = RwLock::new(LruCache::new(pieces)); Ok(WriteThroughCacheStorage { lru, - underlying: self.underlying.create(info)?, - lengths: info.lengths, - file_infos: info.file_infos.clone(), + underlying: self.underlying.create(shared, metadata)?, + lengths: metadata.lengths, + file_infos: metadata.file_infos.clone(), }) } @@ -121,7 +126,11 @@ impl TorrentStorage for WriteThroughCacheStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.underlying.init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.underlying.init(shared, metadata) } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index efefa6e..e30cbbd 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -13,15 +13,23 @@ use std::{ use librqbit_core::lengths::ValidPieceIndex; -use crate::torrent_state::ManagedTorrentShared; +use crate::torrent_state::{ManagedTorrentShared, TorrentMetadata}; pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; - fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result; - fn create_and_init(&self, info: &ManagedTorrentShared) -> anyhow::Result { - let mut storage = self.create(info)?; - storage.init(info)?; + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result; + fn create_and_init( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + let mut storage = self.create(shared, metadata)?; + storage.init(shared, metadata)?; Ok(storage) } @@ -46,8 +54,12 @@ impl StorageFactoryExt for SF { impl StorageFactory for Wrapper { type Storage = Box; - fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { - let s = self.sf.create(info)?; + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + let s = self.sf.create(shared, metadata)?; Ok(Box::new(s)) } @@ -67,8 +79,12 @@ impl StorageFactoryExt for SF { impl StorageFactory for Box { type Storage = U::Storage; - fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { - (**self).create(info) + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + (**self).create(shared, metadata) } fn clone_box(&self) -> BoxStorageFactory { @@ -78,7 +94,11 @@ impl StorageFactory for Box { pub trait TorrentStorage: Send + Sync { // Create/open files etc. - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()>; + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()>; /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. @@ -132,8 +152,12 @@ impl TorrentStorage for Box { (**self).remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - (**self).init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + (**self).init(shared, metadata) } fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> { diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index b81ccb7..9b96a88 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -24,11 +24,12 @@ use crate::{ FileInfos, }; -use super::{paused::TorrentStatePaused, ManagedTorrentShared}; +use super::{paused::TorrentStatePaused, ManagedTorrentShared, TorrentMetadata}; pub struct TorrentStateInitializing { pub(crate) files: FileStorage, pub(crate) shared: Arc, + pub(crate) metadata: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, previously_errored: bool, @@ -54,13 +55,15 @@ fn compute_selected_pieces( impl TorrentStateInitializing { pub fn new( - meta: Arc, + shared: Arc, + metadata: Arc, only_files: Option>, files: FileStorage, previously_errored: bool, ) -> Self { Self { - shared: meta, + shared, + metadata, only_files, files, checked_bytes: AtomicU64::new(0), @@ -80,7 +83,7 @@ impl TorrentStateInitializing { ) -> Option> { let hp = have_pieces?; let actual = hp.as_bytes().len(); - let expected = self.shared.lengths.piece_bitfield_bytes(); + let expected = self.metadata.lengths.piece_bitfield_bytes(); if actual != expected { warn!( actual, @@ -92,21 +95,21 @@ impl TorrentStateInitializing { let is_broken = self.shared.spawner.spawn_block_in_place(|| { let fo = crate::file_ops::FileOps::new( - &self.shared.info, + &self.metadata.info, &self.files, - &self.shared.file_infos, - &self.shared.lengths, + &self.metadata.file_infos, + &self.metadata.lengths, ); use rand::seq::SliceRandom; let mut to_validate = BF::from_boxed_slice( - vec![0u8; self.shared.lengths.piece_bitfield_bytes()].into_boxed_slice(), + vec![0u8; self.metadata.lengths.piece_bitfield_bytes()].into_boxed_slice(), ); let mut queue = hp.as_slice().to_owned(); // Validate at least one piece from each file, if we claim we have it. - for fi in self.shared.file_infos.iter() { + for fi in self.metadata.file_infos.iter() { let prange = fi.piece_range_usize(); let offset = prange.start; for piece_id in hp @@ -136,7 +139,7 @@ impl TorrentStateInitializing { for (id, piece_id) in to_validate .iter_ones() .filter_map(|id| { - self.shared + self.metadata .lengths .validate_piece_index(id.try_into().ok()?) }) @@ -147,10 +150,10 @@ impl TorrentStateInitializing { } #[allow(clippy::cast_possible_truncation)] - let progress = (self.shared.lengths.total_length() as f64 + let progress = (self.metadata.lengths.total_length() as f64 / to_validate_count as f64 * (id + 1) as f64) as u64; - let progress = progress.min(self.shared.lengths.total_length()); + let progress = progress.min(self.metadata.lengths.total_length()); self.checked_bytes.store(progress, Ordering::Relaxed); } @@ -198,10 +201,10 @@ impl TorrentStateInitializing { info!("Doing initial checksum validation, this might take a while..."); let have_pieces = self.shared.spawner.spawn_block_in_place(|| { FileOps::new( - &self.shared.info, + &self.metadata.info, &self.files, - &self.shared.file_infos, - &self.shared.lengths, + &self.metadata.file_infos, + &self.metadata.lengths, ) .initial_check(&self.checked_bytes) })?; @@ -213,16 +216,16 @@ impl TorrentStateInitializing { }; let selected_pieces = compute_selected_pieces( - &self.shared.lengths, + &self.metadata.lengths, self.only_files.as_deref(), - &self.shared.file_infos, + &self.metadata.file_infos, ); let chunk_tracker = ChunkTracker::new( have_pieces.into_dyn(), selected_pieces, - self.shared.lengths, - &self.shared.file_infos, + self.metadata.lengths, + &self.metadata.file_infos, ) .context("error creating chunk tracker")?; @@ -237,7 +240,7 @@ impl TorrentStateInitializing { // Ensure file lenghts are correct, and reopen read-only. self.shared.spawner.spawn_block_in_place(|| { - for (idx, fi) in self.shared.file_infos.iter().enumerate() { + for (idx, fi) in self.metadata.file_infos.iter().enumerate() { if self .only_files .as_ref() @@ -268,6 +271,7 @@ impl TorrentStateInitializing { let paused = TorrentStatePaused { shared: self.shared.clone(), + metadata: self.metadata.clone(), files: self.files.take()?, chunk_tracker, streams: Arc::new(Default::default()), diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 159bb60..0db812a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -109,7 +109,7 @@ use super::{ paused::TorrentStatePaused, streaming::TorrentStreams, utils::{timeit, TimedExistence}, - ManagedTorrentShared, + ManagedTorrentShared, TorrentMetadata, }; #[derive(Debug)] @@ -175,7 +175,8 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; pub struct TorrentStateLive { peers: PeerStates, - torrent: Arc, + shared: Arc, + metadata: Arc, locked: RwLock, pub(crate) files: FileStorage, @@ -231,11 +232,11 @@ impl TorrentStateLive { // TODO: make it configurable let file_priorities = { - let mut pri = (0..paused.shared.file_infos.len()).collect::>(); + let mut pri = (0..paused.metadata.file_infos.len()).collect::>(); // sort by filename, cause many torrents have random sort order. pri.sort_unstable_by_key(|id| { paused - .shared + .metadata .file_infos .get(*id) .map(|fi| fi.relative_filename.as_path()) @@ -252,7 +253,8 @@ impl TorrentStateLive { let ratelimits = Limits::new(paused.shared.options.ratelimits); let state = Arc::new(TorrentStateLive { - torrent: paused.shared.clone(), + shared: paused.shared.clone(), + metadata: paused.metadata.clone(), peers: PeerStates { session_stats: session_stats.clone(), stats: Default::default(), @@ -291,7 +293,7 @@ impl TorrentStateLive { }); state.spawn( - error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"), + error_span!(parent: state.shared.span.clone(), "speed_estimator_updater"), { let state = Arc::downgrade(&state); async move { @@ -317,12 +319,12 @@ impl TorrentStateLive { ); state.spawn( - error_span!(parent: state.torrent.span.clone(), "peer_adder"), + error_span!(parent: state.shared.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); state.spawn( - error_span!(parent: state.torrent.span.clone(), "upload_scheduler"), + error_span!(parent: state.shared.span.clone(), "upload_scheduler"), state.clone().task_upload_scheduler(ratelimit_upload_rx), ); Ok(state) @@ -346,7 +348,7 @@ impl TorrentStateLive { } fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> { - self.torrent.options.disk_write_queue.as_ref() + self.shared.options.disk_write_queue.as_ref() } pub(crate) fn add_incoming_peer( @@ -394,7 +396,7 @@ impl TorrentStateLive { self.spawn( error_span!( - parent: self.torrent.span.clone(), + parent: self.shared.span.clone(), "manage_incoming_peer", addr = %checked_peer.addr ), @@ -416,7 +418,7 @@ impl TorrentStateLive { self.ratelimits .prepare_for_upload(NonZeroU32::new(ci.size).unwrap()) .await?; - if let Some(session) = self.torrent.session.upgrade() { + if let Some(session) = self.shared.session.upgrade() { session .ratelimits .prepare_for_upload(NonZeroU32::new(ci.size).unwrap()) @@ -449,18 +451,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: self.torrent.options.peer_connect_timeout, - read_write_timeout: self.torrent.options.peer_read_write_timeout, + connect_timeout: self.shared.options.peer_connect_timeout, + read_write_timeout: self.shared.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( checked_peer.addr, - self.torrent.info_hash, - self.torrent.peer_id, + self.shared.info_hash, + self.shared.peer_id, &handler, Some(options), - self.torrent.spawner, - self.torrent.connector.clone(), + self.shared.spawner, + self.shared.connector.clone(), ); let requester = handler.task_peer_chunk_requester(); @@ -514,18 +516,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: state.torrent.options.peer_connect_timeout, - read_write_timeout: state.torrent.options.peer_read_write_timeout, + connect_timeout: state.shared.options.peer_connect_timeout, + read_write_timeout: state.shared.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( addr, - state.torrent.info_hash, - state.torrent.peer_id, + state.shared.info_hash, + state.shared.peer_id, &handler, Some(options), - state.torrent.spawner, - state.torrent.connector.clone(), + state.shared.spawner, + state.shared.connector.clone(), ); let requester = aframe!(handler .task_peer_chunk_requester() @@ -564,7 +566,7 @@ impl TorrentStateLive { let state = self; loop { let addr = peer_queue_rx.recv().await.context("torrent closed")?; - if state.torrent.options.disable_upload() && state.is_finished_and_no_active_streams() { + if state.shared.options.disable_upload() && state.is_finished_and_no_active_streams() { debug!("ignoring peer {} as we are finished", addr); state.peers.mark_peer_not_needed(addr); continue; @@ -572,30 +574,30 @@ impl TorrentStateLive { let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( - error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()), + error_span!(parent: state.shared.span.clone(), "manage_peer", peer = addr.to_string()), aframe!(state.clone().task_manage_outgoing_peer(addr, permit)), ); } } pub fn torrent(&self) -> &ManagedTorrentShared { - &self.torrent + &self.shared } pub fn info(&self) -> &TorrentMetaV1Info { - &self.torrent.info + &self.metadata.info } pub fn info_hash(&self) -> Id20 { - self.torrent.info_hash + self.shared.info_hash } pub fn peer_id(&self) -> Id20 { - self.torrent.peer_id + self.shared.peer_id } pub(crate) fn file_ops(&self) -> FileOps<'_> { FileOps::new( - &self.torrent.info, + &self.metadata.info, &*self.files, - &self.torrent().file_infos, + &self.metadata.file_infos, &self.lengths, ) } @@ -703,7 +705,8 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { - shared: self.torrent.clone(), + shared: self.shared.clone(), + metadata: self.metadata.clone(), files: self.files.take()?, chunk_tracker, streams: self.streams.clone(), @@ -727,7 +730,7 @@ impl TorrentStateLive { let mut g = self.lock_write("update_only_files"); let ct = g.get_chunks_mut()?; let hns = - ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?; + ct.update_only_files(self.metadata.file_infos.iter().map(|f| f.len), only_files)?; if !hns.finished() { self.reconnect_all_not_needed_peers(); } @@ -746,7 +749,7 @@ impl TorrentStateLive { }; self.streams .streamed_file_ids() - .any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id])) + .any(|file_id| !chunks.is_file_finished(&self.metadata.file_infos[file_id])) } // We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite @@ -768,7 +771,7 @@ impl TorrentStateLive { // if we have all the pieces of the file, reopen it read only for (idx, file_info) in self - .torrent() + .metadata .file_infos .iter() .enumerate() @@ -779,9 +782,9 @@ impl TorrentStateLive { } self.streams - .wake_streams_on_piece_completed(id, &self.torrent.lengths); + .wake_streams_on_piece_completed(id, &self.metadata.lengths); - locked.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64; + locked.unflushed_bitv_bytes += self.metadata.lengths.piece_length(id) as u64; if locked.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES { locked.try_flush_bitv() } @@ -1021,7 +1024,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { if let Some(_peer_pex_msg_id) = hs.ut_pex() { self.state.clone().spawn( error_span!( - parent: self.state.torrent.span.clone(), + parent: self.state.shared.span.clone(), "sending_pex_to_peer", peer = self.addr.to_string() ), @@ -1054,7 +1057,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } fn should_transmit_have(&self, id: ValidPieceIndex) -> bool { - if self.state.torrent.options.disable_upload() { + if self.state.shared.options.disable_upload() { return false; } let have = self @@ -1071,7 +1074,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { &self, handshake: &mut ExtendedHandshake, ) -> anyhow::Result<()> { - let info_bytes = &self.state.torrent().info_bytes; + let info_bytes = &self.state.metadata.info_bytes; if !info_bytes.is_empty() { if let Ok(len) = info_bytes.len().try_into() { handshake.metadata_size = Some(len); @@ -1159,7 +1162,7 @@ impl PeerHandler { if let Some(dur) = backoff { self.state.clone().spawn( error_span!( - parent: self.state.torrent.span.clone(), + parent: self.state.shared.span.clone(), "wait_for_peer", peer = handle.to_string(), duration = format!("{dur:?}") @@ -1218,7 +1221,7 @@ impl PeerHandler { && !g.inflight_pieces.contains_key(pid) }); let natural_order_pieces = chunk_tracker - .iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos); + .iter_queued_pieces(&g.file_priorities, &self.state.metadata.file_infos); for n in priority_streamed_pieces.chain(natural_order_pieces) { if bf.get(n.get() as usize).map(|v| *v) == Some(true) { n_opt = Some(n); @@ -1787,7 +1790,7 @@ impl PeerHandler { dtx.send(Box::new(work)).await?; } else { self.state - .torrent + .shared .spawner .spawn_block_in_place(|| { write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info) @@ -1799,7 +1802,7 @@ impl PeerHandler { } fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> { - let data = &self.state.torrent().info_bytes; + let data = &self.state.metadata.info_bytes; let metadata_size = data.len(); if metadata_size == 0 { anyhow::bail!("peer requested for info metadata but we don't have it") diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 32246e5..15ec11f 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -15,6 +15,7 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; +use arc_swap::ArcSwapOption; use buffers::ByteBufOwned; use bytes::Bytes; use futures::future::BoxFuture; @@ -37,6 +38,7 @@ use tracing::trace; use tracing::warn; use crate::chunk_tracker::ChunkTracker; +use crate::file_info::FileInfo; use crate::limits::LimitsConfig; use crate::session::TorrentId; use crate::spawn_utils::BlockingSpawner; @@ -123,6 +125,44 @@ impl ManagedTorrentOptions { } } +// Torrent bencodee "info" + some precomputed fields based on it for frequent access. +pub struct TorrentMetadata { + pub info: TorrentMetaV1Info, + pub torrent_bytes: Bytes, + pub info_bytes: Bytes, + pub lengths: Lengths, + pub file_infos: FileInfos, +} + +impl TorrentMetadata { + pub(crate) fn new( + info: TorrentMetaV1Info, + torrent_bytes: Bytes, + info_bytes: Bytes, + ) -> anyhow::Result { + let lengths = Lengths::from_torrent(&info)?; + let file_infos = info + .iter_file_details_ext(&lengths)? + .map(|fd| { + Ok::<_, anyhow::Error>(FileInfo { + relative_filename: fd.details.filename.to_pathbuf()?, + offset_in_torrent: fd.offset, + piece_range: fd.pieces, + len: fd.details.len, + attrs: fd.details.attrs(), + }) + }) + .collect::>>()?; + Ok(Self { + info, + torrent_bytes, + info_bytes, + lengths, + file_infos, + }) + } +} + /// Common information about torrent shared among all possible states. /// // The reason it's not inlined into ManagedTorrent is to break the Arc cycle: @@ -130,15 +170,10 @@ impl ManagedTorrentOptions { // of stuff, but it shouldn't access the state. pub struct ManagedTorrentShared { pub id: TorrentId, - pub info: TorrentMetaV1Info, - pub torrent_bytes: Bytes, - pub info_bytes: Bytes, pub info_hash: Id20, pub(crate) spawner: BlockingSpawner, pub trackers: HashSet, pub peer_id: Id20, - pub lengths: Lengths, - pub file_infos: FileInfos, pub span: tracing::Span, pub(crate) options: ManagedTorrentOptions, pub(crate) connector: Arc, @@ -148,6 +183,7 @@ pub struct ManagedTorrentShared { pub struct ManagedTorrent { pub shared: Arc, + pub metadata: ArcSwapOption, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, } @@ -161,8 +197,13 @@ impl ManagedTorrent { &self.shared } - pub fn get_total_bytes(&self) -> u64 { - self.shared.lengths.total_length() + pub fn with_metadata( + &self, + mut f: impl FnMut(&Arc) -> R, + ) -> anyhow::Result { + let r = self.metadata.load(); + let r = r.as_ref().context("torrent is not resolved")?; + Ok(f(r)) } pub fn info_hash(&self) -> Id20 { @@ -384,10 +425,14 @@ impl ManagedTorrent { Ok(()) } ManagedTorrentState::Error(_) => { + let metadata = self.metadata.load_full().expect("TODO"); let initializing = Arc::new(TorrentStateInitializing::new( self.shared.clone(), + metadata.clone(), g.only_files.clone(), - self.shared.storage_factory.create_and_init(self.shared())?, + self.shared + .storage_factory + .create_and_init(self.shared(), &metadata)?, true, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); @@ -433,7 +478,12 @@ impl ManagedTorrent { pub fn stats(&self) -> TorrentStats { use stats::TorrentStatsState as S; let mut resp = TorrentStats { - total_bytes: self.shared().lengths.total_length(), + total_bytes: self + .metadata + .load() + .as_ref() + .map(|r| r.lengths.total_length()) + .unwrap_or_default(), file_progress: Vec::new(), state: S::Error, error: None, @@ -534,7 +584,9 @@ impl ManagedTorrent { // Returns true if needed to unpause torrent. // This is just implementation detail - it's easier to pause/unpause than to tinker with internals. pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { - let file_count = self.shared().info.iter_file_lengths()?.count(); + let metadata = self.metadata.load(); + let metadata = metadata.as_ref().context("torrent is not resolved")?; + let file_count = metadata.file_infos.len(); for f in only_files.iter().copied() { if f >= file_count { anyhow::bail!("only_files contains invalid value {f}") diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 9c78a26..52e8d6a 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -5,10 +5,11 @@ use crate::{ type_aliases::FileStorage, }; -use super::{streaming::TorrentStreams, ManagedTorrentShared}; +use super::{streaming::TorrentStreams, ManagedTorrentShared, TorrentMetadata}; pub struct TorrentStatePaused { pub(crate) shared: Arc, + pub(crate) metadata: Arc, pub(crate) files: FileStorage, pub(crate) chunk_tracker: ChunkTracker, pub(crate) streams: Arc, @@ -17,7 +18,7 @@ pub struct TorrentStatePaused { impl TorrentStatePaused { pub(crate) fn update_only_files(&mut self, only_files: &HashSet) -> anyhow::Result<()> { self.chunk_tracker - .update_only_files(self.shared.info.iter_file_lengths()?, only_files)?; + .update_only_files(self.metadata.info.iter_file_lengths()?, only_files)?; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 73fb9ea..3e7d8cb 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -19,7 +19,7 @@ use crate::{ file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent, }; -use super::ManagedTorrentHandle; +use super::{ManagedTorrentHandle, TorrentMetadata}; type StreamId = usize; @@ -130,6 +130,7 @@ impl TorrentStreams { pub struct FileStream { torrent: ManagedTorrentHandle, + metadata: Arc, streams: Arc, stream_id: usize, file_id: usize, @@ -178,8 +179,7 @@ impl AsyncRead for FileStream { } let current = poll_try_io!(self - .torrent - .shared() + .metadata .lengths .compute_current_piece(self.position, self.file_torrent_abs_offset) .context("invalid position")); @@ -216,11 +216,14 @@ impl AsyncRead for FileStream { ); poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| { - self.torrent - .with_storage_and_file(self.file_id, |files, _fi| { + self.torrent.with_storage_and_file( + self.file_id, + |files, _fi| { files.pread_exact(self.file_id, self.position, buf)?; Ok::<_, anyhow::Error>(()) - }) + }, + &self.metadata, + ) }))); self.as_mut().advance(bytes_to_read as u64); @@ -269,7 +272,12 @@ impl Drop for FileStream { } impl ManagedTorrent { - fn with_storage_and_file(&self, file_id: usize, f: F) -> anyhow::Result + fn with_storage_and_file( + &self, + file_id: usize, + f: F, + metadata: &TorrentMetadata, + ) -> anyhow::Result where F: FnOnce(&dyn TorrentStorage, &FileInfo) -> R, { @@ -279,11 +287,7 @@ impl ManagedTorrent { crate::ManagedTorrentState::Live(l) => &*l.files, s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()), }; - let fi = self - .shared() - .file_infos - .get(file_id) - .context("invalid file")?; + let fi = metadata.file_infos.get(file_id).context("invalid file")?; Ok(f(files, fi)) }) } @@ -310,14 +314,26 @@ impl ManagedTorrent { } fn is_file_finished(&self, file_id: usize) -> bool { + let metadata = self.metadata.load(); + let metadata = match metadata.as_ref() { + Some(r) => r, + None => return false, + }; // TODO: would be nice to remove locking - self.with_chunk_tracker(|ct| ct.is_file_finished(&self.shared.file_infos[file_id])) + self.with_chunk_tracker(|ct| ct.is_file_finished(&metadata.file_infos[file_id])) .unwrap_or(false) } pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { - let (fd_len, fd_offset) = - self.with_storage_and_file(file_id, |_fd, fi| (fi.len, fi.offset_in_torrent))?; + let metadata = self + .metadata + .load_full() + .context("torrent metadata is not resolved")?; + let (fd_len, fd_offset) = self.with_storage_and_file( + file_id, + |_fd, fi| (fi.len, fi.offset_in_torrent), + &metadata, + )?; let streams = self.streams()?; let s = FileStream { stream_id: streams.next_id(), @@ -329,6 +345,7 @@ impl ManagedTorrent { file_torrent_abs_offset: fd_offset, torrent: self, spawner: BlockingSpawner::default(), + metadata, }; s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert( diff --git a/crates/librqbit/src/upnp_server_adapter.rs b/crates/librqbit/src/upnp_server_adapter.rs index ab60f37..1885a6e 100644 --- a/crates/librqbit/src/upnp_server_adapter.rs +++ b/crates/librqbit/src/upnp_server_adapter.rs @@ -6,7 +6,7 @@ use std::{ sync::Arc, }; -use crate::{session::TorrentId, ManagedTorrent, Session}; +use crate::{session::TorrentId, torrent_state::TorrentMetadata, ManagedTorrentShared, Session}; #[derive(Clone)] pub struct UpnpServerSessionAdapter { @@ -55,18 +55,18 @@ impl TorrentFileTreeNode { &self, id: usize, http_host: &str, - torrent: &ManagedTorrent, + torrent: &ManagedTorrentShared, + metadata: &TorrentMetadata, adapter: &UpnpServerSessionAdapter, ) -> ItemOrContainer { - let encoded_id = encode_id(id, torrent.id()); - let encoded_parent_id = self.parent_id.map(|p| encode_id(p, torrent.id())); + let encoded_id = encode_id(id, torrent.id); + let encoded_parent_id = self.parent_id.map(|p| encode_id(p, torrent.id)); match self.real_torrent_file_id { Some(fid) => { - let fi = &torrent.shared().file_infos[fid]; + let fi = &metadata.file_infos[fid]; let filename = &fi.relative_filename; // Torrent path joined with "/" - let last_url_bit = torrent - .shared() + let last_url_bit = metadata .info .iter_file_details() .ok() @@ -86,11 +86,7 @@ impl TorrentFileTreeNode { mime_type: mime_guess::from_path(filename).first(), url: format!( "http://{}:{}/torrents/{}/stream/{}/{}", - http_host, - adapter.port, - torrent.id(), - fid, - last_url_bit + http_host, adapter.port, torrent.id, fid, last_url_bit ), size: fi.len, }) @@ -216,10 +212,15 @@ impl UpnpServerSessionAdapter { .filter_map(|t| { let real_id = t.id(); let upnp_id = real_id + 1; + let metadata = t.metadata.load(); + let metadata = match metadata.as_ref() { + Some(r) => r, + None => return None, + }; - if is_single_file_at_root(&t.shared().info) { + if is_single_file_at_root(&metadata.info) { // Just add the file directly - let rf = &t.shared().file_infos[0].relative_filename; + let rf = &metadata.file_infos[0].relative_filename; let title = rf.file_name()?.to_str()?.to_owned(); Some( TorrentFileTreeNode { @@ -228,11 +229,16 @@ impl UpnpServerSessionAdapter { children: vec![], real_torrent_file_id: Some(0), } - .as_item_or_container(0, hostname, t, self), + .as_item_or_container( + 0, + hostname, + t.shared(), + metadata, + self, + ), ) } else { - let title = t - .shared() + let title = metadata .info .name .as_ref() @@ -288,7 +294,13 @@ impl UpnpServerSessionAdapter { } }; - let tree = match TorrentFileTree::build(torrent.id(), &torrent.shared().info) { + let t_metadata = torrent.metadata.load(); + let t_metadata = match t_metadata.as_ref() { + Some(r) => r, + None => return vec![], + }; + + let tree = match TorrentFileTree::build(torrent.id(), &t_metadata.info) { Ok(tree) => tree, Err(e) => { warn!(object_id, error=?e, "error building torrent file tree"); @@ -309,7 +321,13 @@ impl UpnpServerSessionAdapter { let mut result = Vec::new(); if node.real_torrent_file_id.is_some() || metadata { - result.push(node.as_item_or_container(node_id, http_hostname, &torrent, self)) + result.push(node.as_item_or_container( + node_id, + http_hostname, + torrent.shared(), + t_metadata, + self, + )) } else { for (child_node_id, child_node) in node .children @@ -319,7 +337,8 @@ impl UpnpServerSessionAdapter { result.push(child_node.as_item_or_container( child_node_id, http_hostname, - &torrent, + torrent.shared(), + t_metadata, self, )); } From 456a51d4dbff90e7aabd36c794790000a3152621 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 23:37:13 +0000 Subject: [PATCH 07/19] Split up "add_torrent" method --- crates/librqbit/src/session.rs | 232 +++++++++++++++++---------------- 1 file changed, 117 insertions(+), 115 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 8999820..1a569f0 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -40,7 +40,7 @@ use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; use futures::{ future::BoxFuture, stream::{BoxStream, FuturesUnordered}, - FutureExt, Stream, TryFutureExt, + FutureExt, Stream, StreamExt, TryFutureExt, }; use itertools::Itertools; use librqbit_core::{ @@ -58,7 +58,6 @@ use tokio::{ net::{TcpListener, TcpStream}, sync::Notify, }; -use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span}; use tracker_comms::TrackerComms; @@ -474,12 +473,8 @@ pub(crate) struct CheckedIncomingConnection { struct InternalAddResult { info_hash: Id20, - info: TorrentMetaV1Info, - torrent_bytes: Bytes, - info_bytes: Bytes, + metadata: Option, trackers: Vec, - peer_rx: Option, - seen_peers: Vec, } impl Session { @@ -883,15 +878,7 @@ impl Session { opts: Option, ) -> BoxFuture<'a, anyhow::Result> { async move { - // Magnet links are different in that we first need to discover the metadata. let mut opts = opts.unwrap_or_default(); - - let paused = opts.list_only || opts.paused; - - // The main difference between magnet link and torrent file, is that we need to resolve the magnet link - // into a torrent file by connecting to peers that support extended handshakes. - // So we must discover at least one peer and connect to it to be able to proceed further. - let add_res = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") || magnet.len() == 40 => { let magnet = Magnet::parse(&magnet) @@ -906,66 +893,10 @@ impl Session { } } - let peer_rx = self.make_peer_rx( + InternalAddResult { info_hash, - if opts.disable_trackers { - Default::default() - } else { - let mut trackers = magnet.trackers.clone(); - if let Some(custom_trackers) = opts.trackers.clone() { - trackers.extend(custom_trackers); - } - trackers - }, - !paused, - opts.force_tracker_interval, - opts.initial_peers.clone().unwrap_or_default() - )?.context("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided")?; - - debug!(?info_hash, "querying DHT"); - match read_metainfo_from_peer_receiver( - self.peer_id, - info_hash, - Default::default(), - peer_rx, - Some(self.merge_peer_opts(opts.peer_opts)), - self.connector.clone(), - ) - .await - { - ReadMetainfoResult::Found { - info, - info_bytes, - rx, - seen, - } => { - trace!(?info, "received result from DHT"); - let mut trackers = magnet.trackers.into_iter().unique().collect_vec(); - if let Some(custom_trackers) = opts.trackers.clone() { - trackers.extend(custom_trackers); - } - InternalAddResult { - info_hash, - torrent_bytes: torrent_file_from_info_bytes( - &info_bytes, - &trackers, - )?, - info_bytes: info_bytes.0, - info, - trackers, - peer_rx: Some(rx), - seen_peers: { - let seen = seen.into_iter().collect_vec(); - for peer in &seen { - trace!(?peer, "seen") - } - seen - }, - } - } - ReadMetainfoResult::ChannelClosed { .. } => { - bail!("input address stream exhausted, no way to discover torrent metainfo") - } + trackers: magnet.trackers, + metadata: None, } } other => { @@ -981,12 +912,13 @@ impl Session { url ) } - AddTorrent::TorrentFileBytes(bytes) => - torrent_from_bytes(bytes) - .context("error decoding torrent")? + AddTorrent::TorrentFileBytes(bytes) => { + torrent_from_bytes(bytes).context("error decoding torrent")? + } }; - let mut trackers = torrent.info + let mut trackers = torrent + .info .iter_announce() .unique() .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { @@ -1001,35 +933,14 @@ impl Session { trackers.extend(custom_trackers); } - let peer_rx = if paused { - None - } else { - self.make_peer_rx( - torrent.info.info_hash, - if opts.disable_trackers { - Default::default() - } else { - trackers.clone() - }, - !paused, - opts.force_tracker_interval, - opts.initial_peers.clone().unwrap_or_default() - )? - }; - InternalAddResult { info_hash: torrent.info.info_hash, - info: torrent.info.info, - torrent_bytes: torrent.torrent_bytes, - info_bytes: torrent.info_bytes, + metadata: Some(TorrentMetadata::new( + torrent.info.info, + torrent.torrent_bytes, + torrent.info_bytes, + )?), trackers, - peer_rx, - seen_peers: opts - .initial_peers - .clone() - .unwrap_or_default() - .into_iter() - .collect(), } } }; @@ -1073,19 +984,58 @@ impl Session { mut opts: AddTorrentOptions, ) -> anyhow::Result { let InternalAddResult { - info, info_hash, + metadata, trackers, - peer_rx, - seen_peers, - torrent_bytes, - info_bytes, } = add_res; - trace!("Torrent info: {:#?}", &info); + let peer_stream_permanent = !opts.paused && !opts.list_only; + let make_peer_rx = || { + self.make_peer_rx( + info_hash, + trackers.clone(), + peer_stream_permanent, + opts.force_tracker_interval, + opts.initial_peers.clone().unwrap_or_default(), + ) + .context("error creating peer stream") + }; + + let mut seen_peers = Vec::new(); + + let (metadata, peer_rx) = { + match metadata { + Some(metadata) => { + let mut peer_rx = None; + if peer_stream_permanent { + peer_rx = make_peer_rx()?; + } + (metadata, peer_rx) + } + None => { + let peer_rx = make_peer_rx()?.context( + "no known way to resolve peers (no DHT, no trackers, no initial_peers)", + )?; + let resolved_magnet = self + .resolve_magnet(info_hash, peer_rx, &trackers, opts.peer_opts) + .await?; + seen_peers = resolved_magnet.seen_peers.clone(); + let peer_rx = Some( + merge_streams( + resolved_magnet.peer_rx, + futures::stream::iter(resolved_magnet.seen_peers), + ) + .boxed(), + ); + (resolved_magnet.metadata, peer_rx) + } + } + }; + + trace!("Torrent metadata: {:#?}", &metadata.info); let only_files = compute_only_files( - &info, + &metadata.info, opts.only_files, opts.only_files_regex, opts.list_only, @@ -1093,7 +1043,7 @@ impl Session { let output_folder = match (opts.output_folder, opts.sub_folder) { (None, None) => self.output_folder.join( - self.get_default_subfolder_for_torrent(&info)? + self.get_default_subfolder_for_torrent(&metadata.info)? .unwrap_or_default(), ), (Some(o), None) => PathBuf::from(o), @@ -1112,11 +1062,11 @@ impl Session { if opts.list_only { return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse { info_hash, - info, + info: metadata.info, only_files, output_folder, seen_peers, - torrent_bytes, + torrent_bytes: metadata.torrent_bytes, })); } @@ -1143,7 +1093,7 @@ impl Session { let span = error_span!(parent: self.rs(), "torrent", id); let peer_opts = self.merge_peer_opts(opts.peer_opts); - let metadata = Arc::new(TorrentMetadata::new(info, torrent_bytes, info_bytes)?); + let metadata = Arc::new(metadata); let minfo = Arc::new(ManagedTorrentShared { id, span, @@ -1411,6 +1361,58 @@ impl Session { pub fn tcp_listen_port(&self) -> Option { self.tcp_listen_port } + + async fn resolve_magnet( + self: &Arc, + info_hash: Id20, + peer_rx: PeerStream, + trackers: &[String], + peer_opts: Option, + ) -> anyhow::Result { + match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + Default::default(), + peer_rx, + Some(self.merge_peer_opts(peer_opts)), + self.connector.clone(), + ) + .await + { + ReadMetainfoResult::Found { + info, + info_bytes, + rx, + seen, + } => { + trace!(?info, "received result from DHT"); + Ok(ResolveMagnetResult { + metadata: TorrentMetadata::new( + info, + torrent_file_from_info_bytes(&info_bytes, trackers)?, + info_bytes.0, + )?, + peer_rx: rx, + seen_peers: { + let seen = seen.into_iter().collect_vec(); + for peer in &seen { + trace!(?peer, "seen") + } + seen + }, + }) + } + ReadMetainfoResult::ChannelClosed { .. } => { + bail!("input address stream exhausted, no way to discover torrent metainfo") + } + } + } +} + +pub(crate) struct ResolveMagnetResult { + pub metadata: TorrentMetadata, + pub peer_rx: PeerStream, + pub seen_peers: Vec, } fn remove_files_and_dirs(infos: &FileInfos, files: &dyn TorrentStorage) { From f637959ba7fa7214e8552719fe0f095296906e07 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 00:08:11 +0000 Subject: [PATCH 08/19] Group session fields + add comments (Claude) --- crates/librqbit/src/session.rs | 38 ++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 1a569f0..cffc43e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -97,39 +97,41 @@ impl SessionDatabase { } pub struct Session { - peer_id: Id20, - dht: Option, - persistence: Option>, - pub(crate) bitv_factory: Arc, - peer_opts: PeerConnectionOptions, - spawner: BlockingSpawner, + // Core state and services + pub(crate) db: RwLock, next_id: AtomicUsize, - db: RwLock, - output_folder: PathBuf, + pub(crate) bitv_factory: Arc, + spawner: BlockingSpawner, + // Network + peer_id: Id20, tcp_listen_port: Option, + dht: Option, + pub(crate) connector: Arc, + reqwest_client: reqwest::Client, + // Lifecycle management cancellation_token: CancellationToken, + _cancellation_token_drop_guard: DropGuard, + // Runtime settings + output_folder: PathBuf, + peer_opts: PeerConnectionOptions, + default_storage_factory: Option, + persistence: Option>, disk_write_tx: Option, - default_storage_factory: Option, - - reqwest_client: reqwest::Client, - pub(crate) connector: Arc, + // Limits and throttling pub(crate) concurrent_initialize_semaphore: Arc, - - root_span: Option, - pub(crate) ratelimits: Limits, + // Monitoring / tracing / logging pub(crate) stats: SessionStats, + root_span: Option, + // Feature flags #[cfg(feature = "disable-upload")] _disable_upload: bool, - - // This is stored for all tasks to stop when session is dropped. - _cancellation_token_drop_guard: DropGuard, } async fn torrent_from_url( From 7ed037cba83695fcc5510779f9374e5a5756c6d1 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 11:30:04 +0000 Subject: [PATCH 09/19] comments --- crates/librqbit/src/torrent_state/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 15ec11f..137618b 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -182,7 +182,9 @@ pub struct ManagedTorrentShared { } pub struct ManagedTorrent { + // Static torrent configuration that doesn't change. pub shared: Arc, + // Torrent metadata. Maybe be None when the magnet is resolving. pub metadata: ArcSwapOption, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, From 38fec48879e793ee138470f9d5660b09ac65eeb6 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 12:08:03 +0000 Subject: [PATCH 10/19] Reorganize _start() a bit --- crates/librqbit/src/torrent_state/mod.rs | 341 ++++++++++++----------- 1 file changed, 182 insertions(+), 159 deletions(-) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 137618b..87ba127 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -279,6 +279,116 @@ impl ManagedTorrent { peer_rx: Option, start_paused: bool, ) -> anyhow::Result<()> { + // State machine transitions. + // + // - error -> initializing + // - initializing -> paused + // - paused -> live + // - live -> paused + // + // - initializing -> error + // - live -> error + + fn _start<'a>( + t: &'a Arc, + peer_rx: Option, + start_paused: bool, + session: Arc, + g: Option>, + token: CancellationToken, + ) -> anyhow::Result<()> { + let mut g = g.unwrap_or_else(|| t.locked.write()); + + match &g.state { + ManagedTorrentState::Live(_) => { + bail!("torrent is already live"); + } + ManagedTorrentState::Initializing(init) => { + let init = init.clone(); + drop(g); + let t = t.clone(); + let span = t.shared().span.clone(); + let token = token.clone(); + + spawn_with_cancel( + error_span!(parent: span.clone(), "initialize_and_start"), + token.clone(), + async move { + let concurrent_init_semaphore = + session.concurrent_initialize_semaphore.clone(); + let _permit = concurrent_init_semaphore + .acquire() + .await + .context("bug: concurrent init semaphore was closed")?; + + match init.check().await { + Ok(paused) => { + let mut g = t.locked.write(); + if let ManagedTorrentState::Initializing(_) = &g.state { + } else { + debug!("no need to start torrent anymore, as it switched state from initilizing"); + return Ok(()); + } + + g.state = ManagedTorrentState::Paused(paused); + t.state_change_notify.notify_waiters(); + + if start_paused { + return Ok(()); + } + + _start(&t, peer_rx, start_paused, session, Some(g), token) + } + Err(err) => { + let result = anyhow::anyhow!("{:?}", err); + t.locked.write().state = ManagedTorrentState::Error(err); + t.state_change_notify.notify_waiters(); + Err(result) + } + } + }, + ); + Ok(()) + } + ManagedTorrentState::Paused(_) => { + if start_paused { + warn!("start(start_paused=true) called, but torrent already paused"); + return Ok(()); + } + let paused = g.state.take().assert_paused(); + let (tx, rx) = tokio::sync::oneshot::channel(); + let live = TorrentStateLive::new(paused, tx, token.clone())?; + g.state = ManagedTorrentState::Live(live.clone()); + t.state_change_notify.notify_waiters(); + drop(g); + + spawn_fatal_errors_receiver(t, rx, token); + if let Some(peer_rx) = peer_rx { + spawn_peer_adder(&live, peer_rx); + } + Ok(()) + } + ManagedTorrentState::Error(_) => { + let metadata = t.metadata.load_full().expect("TODO"); + let initializing = Arc::new(TorrentStateInitializing::new( + t.shared.clone(), + metadata.clone(), + g.only_files.clone(), + t.shared + .storage_factory + .create_and_init(t.shared(), &metadata)?, + true, + )); + g.state = ManagedTorrentState::Initializing(initializing.clone()); + t.state_change_notify.notify_waiters(); + + // Recurse. + _start(t, peer_rx, start_paused, session, Some(g), token) + } + ManagedTorrentState::None => bail!("bug: torrent is in empty state"), + } + } + let session = self .shared .session @@ -288,165 +398,14 @@ impl ManagedTorrent { g.paused = start_paused; let cancellation_token = session.cancellation_token().child_token(); - let spawn_fatal_errors_receiver = - |state: &Arc, - rx: tokio::sync::oneshot::Receiver, - token: CancellationToken| { - let span = state.shared.span.clone(); - let state = Arc::downgrade(state); - spawn_with_cancel( - error_span!(parent: span, "fatal_errors_receiver"), - token, - async move { - let e = match rx.await { - Ok(e) => e, - Err(_) => return Ok(()), - }; - if let Some(state) = state.upgrade() { - state.stop_with_error(e); - } else { - warn!("tried to stop the torrent with error, but couldn't upgrade the arc"); - } - Ok(()) - }, - ); - }; - - fn spawn_peer_adder(live: &Arc, mut peer_rx: PeerStream) { - live.spawn( - error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), - { - let live = live.clone(); - async move { - let live = { - let weak = Arc::downgrade(&live); - drop(live); - weak - }; - - loop { - match timeout(Duration::from_secs(5), peer_rx.next()).await { - Ok(Some(peer)) => { - trace!(?peer, "received peer from peer_rx"); - let live = match live.upgrade() { - Some(live) => live, - None => return Ok(()), - }; - live.add_peer_if_not_seen(peer).context("torrent closed")?; - } - Ok(None) => { - debug!("peer_rx closed, closing peer adder"); - return Ok(()); - } - // If timeout, check if the torrent is live. - Err(_) if live.strong_count() == 0 => { - debug!("timed out waiting for peers, torrent isn't live, closing peer adder"); - return Ok(()); - } - Err(_) => continue, - } - } - } - }, - ); - } - - match &g.state { - ManagedTorrentState::Live(_) => { - bail!("torrent is already live"); - } - ManagedTorrentState::Initializing(init) => { - let init = init.clone(); - drop(g); - let t = self.clone(); - let span = self.shared().span.clone(); - let token = cancellation_token.clone(); - - spawn_with_cancel( - error_span!(parent: span.clone(), "initialize_and_start"), - token.clone(), - async move { - let concurrent_init_semaphore = - session.concurrent_initialize_semaphore.clone(); - let _permit = concurrent_init_semaphore - .acquire() - .await - .context("bug: concurrent init semaphore was closed")?; - - match init.check().await { - Ok(paused) => { - let mut g = t.locked.write(); - if let ManagedTorrentState::Initializing(_) = &g.state { - } else { - debug!("no need to start torrent anymore, as it switched state from initilizing"); - return Ok(()); - } - - if start_paused { - g.state = ManagedTorrentState::Paused(paused); - t.state_change_notify.notify_waiters(); - return Ok(()); - } - - let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, cancellation_token)?; - g.state = ManagedTorrentState::Live(live.clone()); - drop(g); - - t.state_change_notify.notify_waiters(); - - spawn_fatal_errors_receiver(&t, rx, token); - if let Some(peer_rx) = peer_rx { - spawn_peer_adder(&live, peer_rx); - } - - Ok(()) - } - Err(err) => { - let result = anyhow::anyhow!("{:?}", err); - t.locked.write().state = ManagedTorrentState::Error(err); - t.state_change_notify.notify_waiters(); - Err(result) - } - } - }, - ); - Ok(()) - } - ManagedTorrentState::Paused(_) => { - let paused = g.state.take().assert_paused(); - let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, cancellation_token.clone())?; - g.state = ManagedTorrentState::Live(live.clone()); - drop(g); - - spawn_fatal_errors_receiver(self, rx, cancellation_token); - if let Some(peer_rx) = peer_rx { - spawn_peer_adder(&live, peer_rx); - } - Ok(()) - } - ManagedTorrentState::Error(_) => { - let metadata = self.metadata.load_full().expect("TODO"); - let initializing = Arc::new(TorrentStateInitializing::new( - self.shared.clone(), - metadata.clone(), - g.only_files.clone(), - self.shared - .storage_factory - .create_and_init(self.shared(), &metadata)?, - true, - )); - g.state = ManagedTorrentState::Initializing(initializing.clone()); - drop(g); - - self.state_change_notify.notify_waiters(); - - // Recurse. - self.start(peer_rx, start_paused) - } - ManagedTorrentState::None => bail!("bug: torrent is in empty state"), - } + _start( + self, + peer_rx, + start_paused, + session, + Some(g), + cancellation_token, + ) } pub fn is_paused(&self) -> bool { @@ -618,3 +577,67 @@ impl ManagedTorrent { } pub type ManagedTorrentHandle = Arc; + +fn spawn_fatal_errors_receiver( + state: &Arc, + rx: tokio::sync::oneshot::Receiver, + token: CancellationToken, +) { + let span = state.shared.span.clone(); + let state = Arc::downgrade(state); + spawn_with_cancel( + error_span!(parent: span, "fatal_errors_receiver"), + token, + async move { + let e = match rx.await { + Ok(e) => e, + Err(_) => return Ok(()), + }; + if let Some(state) = state.upgrade() { + state.stop_with_error(e); + } else { + warn!("tried to stop the torrent with error, but couldn't upgrade the arc"); + } + Ok(()) + }, + ); +} + +fn spawn_peer_adder(live: &Arc, mut peer_rx: PeerStream) { + live.spawn( + error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), + { + let live = live.clone(); + async move { + let live = { + let weak = Arc::downgrade(&live); + drop(live); + weak + }; + + loop { + match timeout(Duration::from_secs(5), peer_rx.next()).await { + Ok(Some(peer)) => { + trace!(?peer, "received peer from peer_rx"); + let live = match live.upgrade() { + Some(live) => live, + None => return Ok(()), + }; + live.add_peer_if_not_seen(peer).context("torrent closed")?; + } + Ok(None) => { + debug!("peer_rx closed, closing peer adder"); + return Ok(()); + } + // If timeout, check if the torrent is live. + Err(_) if live.strong_count() == 0 => { + debug!("timed out waiting for peers, torrent isn't live, closing peer adder"); + return Ok(()); + } + Err(_) => continue, + } + } + } + }, + ); +} From 5f07872725542f23e8297cb69aec50348b144b48 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 12:14:34 +0000 Subject: [PATCH 11/19] Docs for _start() --- crates/librqbit/src/session.rs | 4 +--- crates/librqbit/src/torrent_state/mod.rs | 7 +++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index cffc43e..61a5401 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1336,9 +1336,7 @@ impl Session { } pub async fn pause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - handle - .pause() - .map(|_| handle.locked.write().paused = true)?; + handle.pause()?; self.try_update_persistence_metadata(handle).await; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 87ba127..93d2a2b 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -274,6 +274,8 @@ impl ManagedTorrent { g.state = ManagedTorrentState::Error(error) } + /// peer_rx: the peer stream. If start_paused=false, must be set. + /// start_paused: if set, the torrent will initialize (check file integrity), but will not start pub(crate) fn start( self: &Arc, peer_rx: Option, @@ -389,6 +391,10 @@ impl ManagedTorrent { } } + if !start_paused && peer_rx.is_none() { + bail!("logic bug: start(start_paused=true, peer_rx=None) called. peer_rx must be set if starting the torrent") + } + let session = self .shared .session @@ -419,6 +425,7 @@ impl ManagedTorrent { ManagedTorrentState::Live(live) => { let paused = live.pause()?; g.state = ManagedTorrentState::Paused(paused); + g.paused = true; self.state_change_notify.notify_waiters(); Ok(()) } From 4c654a0631eb08cf50bc36209d9b379e5bce2f92 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 12:17:52 +0000 Subject: [PATCH 12/19] Tiny optimisation for code size --- crates/librqbit/src/torrent_state/mod.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 93d2a2b..569c48b 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -334,11 +334,6 @@ impl ManagedTorrent { g.state = ManagedTorrentState::Paused(paused); t.state_change_notify.notify_waiters(); - - if start_paused { - return Ok(()); - } - _start(&t, peer_rx, start_paused, session, Some(g), token) } Err(err) => { @@ -354,7 +349,6 @@ impl ManagedTorrent { } ManagedTorrentState::Paused(_) => { if start_paused { - warn!("start(start_paused=true) called, but torrent already paused"); return Ok(()); } let paused = g.state.take().assert_paused(); From 5df342e442d6fbd2bc00737beb5eccbc311a2005 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 12:25:05 +0000 Subject: [PATCH 13/19] Nothing: move a comment --- crates/librqbit/src/torrent_state/mod.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 569c48b..3cefaa3 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -56,6 +56,15 @@ use self::paused::TorrentStatePaused; pub use self::stats::{TorrentStats, TorrentStatsState}; pub use self::streaming::FileStream; +// State machine transitions. +// +// - error -> initializing +// - initializing -> paused +// - paused -> live +// - live -> paused +// +// - initializing -> error +// - live -> error pub enum ManagedTorrentState { Initializing(Arc), Paused(TorrentStatePaused), @@ -281,16 +290,6 @@ impl ManagedTorrent { peer_rx: Option, start_paused: bool, ) -> anyhow::Result<()> { - // State machine transitions. - // - // - error -> initializing - // - initializing -> paused - // - paused -> live - // - live -> paused - // - // - initializing -> error - // - live -> error - fn _start<'a>( t: &'a Arc, peer_rx: Option, From 0a92cf1d65f67e04b8b0282b261af87a4304ff09 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 12:37:53 +0000 Subject: [PATCH 14/19] cleanup a couple unnecessary drop(g) calls --- crates/librqbit/src/torrent_state/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 3cefaa3..e559be5 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -306,7 +306,6 @@ impl ManagedTorrent { } ManagedTorrentState::Initializing(init) => { let init = init.clone(); - drop(g); let t = t.clone(); let span = t.shared().span.clone(); let token = token.clone(); @@ -355,7 +354,6 @@ impl ManagedTorrent { let live = TorrentStateLive::new(paused, tx, token.clone())?; g.state = ManagedTorrentState::Live(live.clone()); t.state_change_notify.notify_waiters(); - drop(g); spawn_fatal_errors_receiver(t, rx, token); if let Some(peer_rx) = peer_rx { From 0fabb453aa0685c10cf61540a612825baa89eba2 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 6 Dec 2024 12:57:26 +0000 Subject: [PATCH 15/19] Storing torrent name in ManagedTorrentShared --- crates/librqbit/src/api.rs | 14 +++++++++----- crates/librqbit/src/session.rs | 20 +++++++++++++++++++- crates/librqbit/src/torrent_state/mod.rs | 17 +++++++++++++++++ crates/librqbit_core/src/magnet.rs | 10 ++++++++++ 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index ec8a4f6..c9f90fe 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -209,10 +209,7 @@ impl Api { let mut r = TorrentDetailsResponse { id: Some(id), info_hash: mgr.shared().info_hash.as_string(), - name: mgr - .with_metadata(|r| r.info.name.as_ref().map(|n| n.to_string())) - .ok() - .flatten(), + name: mgr.name(), output_folder: mgr .shared() .options @@ -249,6 +246,7 @@ impl Api { Some(handle.id()), &info_hash, handle.metadata.load().as_ref().map(|r| &r.info), + handle.name().as_deref(), only_files.as_deref(), output_folder, ) @@ -383,6 +381,7 @@ impl Api { Some(id), &handle.info_hash(), handle.metadata.load().as_ref().map(|r| &r.info), + handle.name().as_deref(), handle.only_files().as_deref(), handle .shared() @@ -419,6 +418,7 @@ impl Api { None, &info_hash, Some(&info), + None, only_files.as_deref(), output_folder.to_string_lossy().into_owned().to_string(), ) @@ -429,6 +429,7 @@ impl Api { Some(id), &handle.info_hash(), handle.metadata.load().as_ref().map(|r| &r.info), + handle.name().as_deref(), handle.only_files().as_deref(), handle .shared() @@ -532,6 +533,7 @@ fn make_torrent_details( id: Option, info_hash: &Id20, info: Option<&TorrentMetaV1Info>, + name: Option<&str>, only_files: Option<&[usize]>, output_folder: String, ) -> Result { @@ -564,7 +566,9 @@ fn make_torrent_details( Ok(TorrentDetailsResponse { id, info_hash: info_hash.as_string(), - name: info.and_then(|i| i.name.as_ref().map(|b| b.to_string())), + name: name + .map(|s| s.to_owned()) + .or_else(|| info.and_then(|i| i.name.as_ref().map(|b| b.to_string()))), files: Some(files), output_folder, stats: None, diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 61a5401..4131c39 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -477,6 +477,7 @@ struct InternalAddResult { info_hash: Id20, metadata: Option, trackers: Vec, + name: Option, } impl Session { @@ -899,6 +900,7 @@ impl Session { info_hash, trackers: magnet.trackers, metadata: None, + name: magnet.name, } } other => { @@ -943,6 +945,7 @@ impl Session { torrent.info_bytes, )?), trackers, + name: None, } } }; @@ -956,6 +959,7 @@ impl Session { fn get_default_subfolder_for_torrent( &self, info: &TorrentMetaV1Info, + magnet_name: Option<&str>, ) -> anyhow::Result> { let files = info .iter_file_details()? @@ -964,11 +968,23 @@ impl Session { if files.len() < 2 { return Ok(None); } + fn check_valid(name: &str) -> anyhow::Result<()> { + if name.contains("/") || name.contains("\\") || name.contains("..") { + bail!("path traversal in torrent name detected") + } + Ok(()) + } + if let Some(name) = &info.name { let s = std::str::from_utf8(name.as_slice()).context("invalid UTF-8 in torrent name")?; + check_valid(s)?; return Ok(Some(PathBuf::from(s))); }; + if let Some(name) = magnet_name { + check_valid(name)?; + return Ok(Some(PathBuf::from(name))); + } // Let the subfolder name be the longest filename let longest = files .iter() @@ -989,6 +1005,7 @@ impl Session { info_hash, metadata, trackers, + name, } = add_res; let peer_stream_permanent = !opts.paused && !opts.list_only; @@ -1045,7 +1062,7 @@ impl Session { let output_folder = match (opts.output_folder, opts.sub_folder) { (None, None) => self.output_folder.join( - self.get_default_subfolder_for_torrent(&metadata.info)? + self.get_default_subfolder_for_torrent(&metadata.info, name.as_deref())? .unwrap_or_default(), ), (Some(o), None) => PathBuf::from(o), @@ -1118,6 +1135,7 @@ impl Session { }, connector: self.connector.clone(), session: Arc::downgrade(self), + magnet_name: name, }); let initializing = Arc::new(TorrentStateInitializing::new( diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index e559be5..d0454ce 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -141,6 +141,7 @@ pub struct TorrentMetadata { pub info_bytes: Bytes, pub lengths: Lengths, pub file_infos: FileInfos, + pub name: Option, } impl TorrentMetadata { @@ -162,12 +163,18 @@ impl TorrentMetadata { }) }) .collect::>>()?; + let name = info + .name + .as_ref() + .and_then(|n| std::str::from_utf8(n.as_ref()).ok()) + .map(|s| s.to_owned()); Ok(Self { info, torrent_bytes, info_bytes, lengths, file_infos, + name, }) } } @@ -188,6 +195,9 @@ pub struct ManagedTorrentShared { pub(crate) connector: Arc, pub(crate) storage_factory: BoxStorageFactory, pub(crate) session: Weak, + + // "dn" from magnet link + pub(crate) magnet_name: Option, } pub struct ManagedTorrent { @@ -204,6 +214,13 @@ impl ManagedTorrent { self.shared.id } + pub fn name(&self) -> Option { + if let Some(m) = &*self.metadata.load() { + return m.name.clone().or_else(|| self.shared.magnet_name.clone()); + } + self.shared.magnet_name.clone() + } + pub fn shared(&self) -> &ManagedTorrentShared { &self.shared } diff --git a/crates/librqbit_core/src/magnet.rs b/crates/librqbit_core/src/magnet.rs index 2a3d35a..fdd22c0 100644 --- a/crates/librqbit_core/src/magnet.rs +++ b/crates/librqbit_core/src/magnet.rs @@ -9,6 +9,7 @@ pub struct Magnet { id20: Option, id32: Option, pub trackers: Vec, + pub name: Option, select_only: Option>, } @@ -29,6 +30,7 @@ impl Magnet { id20: Some(id20), id32: None, trackers, + name: None, select_only, } } @@ -40,6 +42,7 @@ impl Magnet { return Ok(Magnet { id20: Some(id20), id32: None, + name: None, trackers: vec![], select_only: None, }); @@ -52,6 +55,7 @@ impl Magnet { let mut info_hash_found = false; let mut id20: Option = None; let mut id32: Option = None; + let mut name: Option = None; let mut trackers = Vec::::new(); let mut files = Vec::::new(); for (key, value) in url.query_pairs() { @@ -70,6 +74,11 @@ impl Magnet { } } "tr" => trackers.push(value.into()), + "dn" => { + if !value.is_empty() { + name = Some(value.into_owned()) + } + } "so" => { // Process 'so' values, but silently ignore any which fail parsing for file_desc in value.split(',') { @@ -100,6 +109,7 @@ impl Magnet { id20, id32, trackers, + name, select_only: if files.is_empty() { None } else { Some(files) }, }), false => { From e4845e2368460f869fe116280fb57079f36e56fb Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 7 Dec 2024 11:51:58 +0000 Subject: [PATCH 16/19] Add a clarifying comment --- crates/librqbit/src/torrent_state/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index d0454ce..3243e52 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -203,7 +203,7 @@ pub struct ManagedTorrentShared { pub struct ManagedTorrent { // Static torrent configuration that doesn't change. pub shared: Arc, - // Torrent metadata. Maybe be None when the magnet is resolving. + // Torrent metadata. Maybe be None when the magnet is resolving (not implemented yet) pub metadata: ArcSwapOption, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, From afca0fbbe38e72a1bbe44b417ffef01e93024fd1 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 7 Dec 2024 12:06:06 +0000 Subject: [PATCH 17/19] Remove unnecessary initial_peers merging code --- crates/librqbit/src/session.rs | 14 -------------- crates/librqbit/src/torrent_state/mod.rs | 2 +- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 4131c39..f942f42 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1167,20 +1167,6 @@ impl Session { } } - // Merge "initial_peers" and "peer_rx" into one stream. - let peer_rx = merge_two_optional_streams( - if !seen_peers.is_empty() { - debug!( - count = seen_peers.len(), - "merging initial peers into peer_rx" - ); - Some(futures::stream::iter(seen_peers.into_iter())) - } else { - None - }, - peer_rx, - ); - let _e = managed_torrent.shared.span.clone().entered(); managed_torrent .start(peer_rx, opts.paused) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 3243e52..c98afd4 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -400,7 +400,7 @@ impl ManagedTorrent { } if !start_paused && peer_rx.is_none() { - bail!("logic bug: start(start_paused=true, peer_rx=None) called. peer_rx must be set if starting the torrent") + bail!("logic bug: start(start_paused=false, peer_rx=None) called. peer_rx must be set if starting the torrent") } let session = self From 82f167d50dddd99602b221ea7bcf8a693c401f26 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 7 Dec 2024 12:14:59 +0000 Subject: [PATCH 18/19] Removed the overly conservative check --- crates/librqbit/src/session.rs | 1 + crates/librqbit/src/torrent_state/mod.rs | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index f942f42..fb3da41 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1168,6 +1168,7 @@ impl Session { } let _e = managed_torrent.shared.span.clone().entered(); + managed_torrent .start(peer_rx, opts.paused) .context("error starting torrent")?; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index c98afd4..90b8f60 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -399,10 +399,6 @@ impl ManagedTorrent { } } - if !start_paused && peer_rx.is_none() { - bail!("logic bug: start(start_paused=false, peer_rx=None) called. peer_rx must be set if starting the torrent") - } - let session = self .shared .session From 9e4c465336a444f307d77cddb240642c4901242e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 7 Dec 2024 12:17:17 +0000 Subject: [PATCH 19/19] A couple touch-ups --- crates/librqbit/src/session.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index fb3da41..a22b53f 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1008,12 +1008,11 @@ impl Session { name, } = add_res; - let peer_stream_permanent = !opts.paused && !opts.list_only; let make_peer_rx = || { self.make_peer_rx( info_hash, trackers.clone(), - peer_stream_permanent, + !opts.paused && !opts.list_only, opts.force_tracker_interval, opts.initial_peers.clone().unwrap_or_default(), ) @@ -1026,7 +1025,7 @@ impl Session { match metadata { Some(metadata) => { let mut peer_rx = None; - if peer_stream_permanent { + if !opts.paused && !opts.list_only { peer_rx = make_peer_rx()?; } (metadata, peer_rx) @@ -1038,6 +1037,9 @@ impl Session { let resolved_magnet = self .resolve_magnet(info_hash, peer_rx, &trackers, opts.peer_opts) .await?; + + // Add back seen_peers into the peer stream, as we consumed some peers + // while resolving the magnet. seen_peers = resolved_magnet.seen_peers.clone(); let peer_rx = Some( merge_streams( @@ -1072,12 +1074,6 @@ impl Session { (None, Some(s)) => self.output_folder.join(s), }; - let storage_factory = opts - .storage_factory - .take() - .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) - .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()); - if opts.list_only { return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse { info_hash, @@ -1089,6 +1085,12 @@ impl Session { })); } + let storage_factory = opts + .storage_factory + .take() + .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) + .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()); + let id = if let Some(id) = opts.preferred_id { id } else if let Some(p) = self.persistence.as_ref() {