From 100b7116df4c3300e9a95ed0904c3eb87b2f773d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 22:57:34 +0000 Subject: [PATCH] Split out TorrentMetadata --- crates/librqbit/examples/custom_storage.rs | 12 ++- crates/librqbit/examples/ubuntu.rs | 4 +- crates/librqbit/src/api.rs | 67 +++++++------- crates/librqbit/src/http_api.rs | 12 ++- crates/librqbit/src/lib.rs | 3 +- crates/librqbit/src/session.rs | 52 +++++------ .../librqbit/src/session_persistence/json.rs | 11 ++- .../src/session_persistence/postgres.rs | 9 +- crates/librqbit/src/storage/filesystem/fs.rs | 23 +++-- .../librqbit/src/storage/filesystem/mmap.rs | 20 ++-- .../librqbit/src/storage/middleware/slow.rs | 17 +++- .../librqbit/src/storage/middleware/timing.rs | 17 +++- .../storage/middleware/write_through_cache.rs | 23 +++-- crates/librqbit/src/storage/mod.rs | 48 +++++++--- .../src/torrent_state/initializing.rs | 44 +++++---- crates/librqbit/src/torrent_state/live/mod.rs | 91 ++++++++++--------- crates/librqbit/src/torrent_state/mod.rs | 72 +++++++++++++-- crates/librqbit/src/torrent_state/paused.rs | 5 +- .../librqbit/src/torrent_state/streaming.rs | 47 +++++++--- crates/librqbit/src/upnp_server_adapter.rs | 59 ++++++++---- 20 files changed, 411 insertions(+), 225 deletions(-) diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index f22daa8..af6f7dd 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -20,7 +20,11 @@ struct CustomStorage { impl StorageFactory for CustomStorageFactory { type Storage = CustomStorage; - fn create(&self, _info: &librqbit::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + _: &librqbit::ManagedTorrentShared, + _: &librqbit::TorrentMetadata, + ) -> anyhow::Result { Ok(CustomStorage::default()) } @@ -54,7 +58,11 @@ impl TorrentStorage for CustomStorage { anyhow::bail!("not implemented") } - fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { + fn init( + &mut self, + _meta: &librqbit::ManagedTorrentShared, + _: &librqbit::TorrentMetadata, + ) -> anyhow::Result<()> { anyhow::bail!("not implemented") } } diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs index 7dbd189..c35c907 100644 --- a/crates/librqbit/examples/ubuntu.rs +++ b/crates/librqbit/examples/ubuntu.rs @@ -49,7 +49,9 @@ async fn main() -> Result<(), anyhow::Error> { _ => unreachable!(), }; - info!("Details: {:?}", &handle.shared().info); + handle.with_metadata(|r| { + info!("Details: {:?}", &r.info); + })?; // Print stats periodically. tokio::spawn({ diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 230274c..ec8a4f6 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -209,7 +209,10 @@ impl Api { let mut r = TorrentDetailsResponse { id: Some(id), info_hash: mgr.shared().info_hash.as_string(), - name: mgr.shared().info.name.as_ref().map(|n| n.to_string()), + name: mgr + .with_metadata(|r| r.info.name.as_ref().map(|n| n.to_string())) + .ok() + .flatten(), output_folder: mgr .shared() .options @@ -245,7 +248,7 @@ impl Api { make_torrent_details( Some(handle.id()), &info_hash, - &handle.shared().info, + handle.metadata.load().as_ref().map(|r| &r.info), only_files.as_deref(), output_folder, ) @@ -261,8 +264,7 @@ impl Api { file_idx: usize, ) -> Result<&'static str> { let handle = self.mgr_handle(idx)?; - let info = &handle.shared().info; - torrent_file_mime_type(info, file_idx) + handle.with_metadata(|r| torrent_file_mime_type(&r.info, file_idx))? } pub fn api_peer_stats( @@ -380,7 +382,7 @@ impl Api { let details = make_torrent_details( Some(id), &handle.info_hash(), - &handle.shared().info, + handle.metadata.load().as_ref().map(|r| &r.info), handle.only_files().as_deref(), handle .shared() @@ -416,7 +418,7 @@ impl Api { details: make_torrent_details( None, &info_hash, - &info, + Some(&info), only_files.as_deref(), output_folder.to_string_lossy().into_owned().to_string(), ) @@ -426,7 +428,7 @@ impl Api { let details = make_torrent_details( Some(id), &handle.info_hash(), - &handle.shared().info, + handle.metadata.load().as_ref().map(|r| &r.info), handle.only_files().as_deref(), handle .shared() @@ -529,37 +531,40 @@ pub struct ApiAddTorrentResponse { fn make_torrent_details( id: Option, info_hash: &Id20, - info: &TorrentMetaV1Info, + info: Option<&TorrentMetaV1Info>, only_files: Option<&[usize]>, output_folder: String, ) -> Result { - let files = info - .iter_file_details() - .context("error iterating filenames and lengths")? - .enumerate() - .map(|(idx, d)| { - let name = match d.filename.to_string() { - Ok(s) => s, - Err(err) => { - warn!("error reading filename: {:?}", err); - "".to_string() + let files = match info { + Some(info) => info + .iter_file_details() + .context("error iterating filenames and lengths")? + .enumerate() + .map(|(idx, d)| { + let name = match d.filename.to_string() { + Ok(s) => s, + Err(err) => { + warn!("error reading filename: {:?}", err); + "".to_string() + } + }; + let components = d.filename.to_vec().unwrap_or_default(); + let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); + TorrentDetailsResponseFile { + name, + components, + length: d.len, + included, + attributes: d.attrs(), } - }; - let components = d.filename.to_vec().unwrap_or_default(); - let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true); - TorrentDetailsResponseFile { - name, - components, - length: d.len, - included, - attributes: d.attrs(), - } - }) - .collect(); + }) + .collect(), + None => Default::default(), + }; Ok(TorrentDetailsResponse { id, info_hash: info_hash.as_string(), - name: info.name.as_ref().map(|b| b.to_string()), + name: info.and_then(|i| i.name.as_ref().map(|b| b.to_string())), files: Some(files), output_folder, stats: None, diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 69d3e59..e4bc289 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -262,7 +262,10 @@ impl HttpApi { fn torrent_playlist_items(handle: &ManagedTorrent) -> Result> { let mut playlist_items = handle - .shared() + .metadata + .load() + .as_ref() + .context("torrent metadata not resolved")? .info .iter_file_details()? .enumerate() @@ -340,10 +343,9 @@ impl HttpApi { .context("timeout")??; let (info, content) = match added { - crate::AddTorrentResponse::AlreadyManaged(_, handle) => ( - handle.shared().info.clone(), - handle.shared().torrent_bytes.clone(), - ), + crate::AddTorrentResponse::AlreadyManaged(_, handle) => { + handle.with_metadata(|r| (r.info.clone(), r.torrent_bytes.clone()))? + } crate::AddTorrentResponse::ListOnly(ListOnlyResponse { info, torrent_bytes, diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index e13b9f1..0b689dc 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -82,7 +82,8 @@ pub use session::{ }; pub use spawn_utils::spawn as librqbit_spawn; pub use torrent_state::{ - ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentStats, TorrentStatsState, + ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentMetadata, TorrentStats, + TorrentStatsState, }; pub use type_aliases::FileInfos; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 019f39e..8999820 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -12,7 +12,6 @@ use crate::{ api::TorrentIdOrHash, bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - file_info::FileInfo, limits::{Limits, LimitsConfig}, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, @@ -26,12 +25,13 @@ use crate::{ stream_connect::{SocksProxyConfig, StreamConnector}, torrent_state::{ initializing::TorrentStateInitializing, ManagedTorrentHandle, ManagedTorrentLocked, - ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive, + ManagedTorrentOptions, ManagedTorrentState, TorrentMetadata, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, - ManagedTorrent, ManagedTorrentShared, + FileInfos, ManagedTorrent, ManagedTorrentShared, }; use anyhow::{bail, Context}; +use arc_swap::ArcSwapOption; use bencode::bencode_serialize_to_writer; use buffers::{ByteBuf, ByteBufOwned, ByteBufT}; use bytes::Bytes; @@ -46,7 +46,6 @@ use itertools::Itertools; use librqbit_core::{ constants::CHUNK_SIZE, directories::get_configuration_directory, - lengths::Lengths, magnet::Magnet, peer_id::generate_peer_id, spawn_utils::spawn_with_cancel, @@ -1130,7 +1129,7 @@ impl Session { .fetch_add(1, std::sync::atomic::Ordering::Relaxed) }; - let managed_torrent = { + let (managed_torrent, metadata) = { let mut g = self.db.write(); if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| { if t.info_hash() == info_hash || *eid == id { @@ -1142,34 +1141,16 @@ impl Session { return Ok(AddTorrentResponse::AlreadyManaged(id, handle)); } - let lengths = Lengths::from_torrent(&info)?; - let file_infos = info - .iter_file_details_ext(&lengths)? - .map(|fd| { - Ok::<_, anyhow::Error>(FileInfo { - relative_filename: fd.details.filename.to_pathbuf()?, - offset_in_torrent: fd.offset, - piece_range: fd.pieces, - len: fd.details.len, - attrs: fd.details.attrs(), - }) - }) - .collect::>>()?; - let span = error_span!(parent: self.rs(), "torrent", id); let peer_opts = self.merge_peer_opts(opts.peer_opts); + let metadata = Arc::new(TorrentMetadata::new(info, torrent_bytes, info_bytes)?); let minfo = Arc::new(ManagedTorrentShared { id, span, - file_infos, - info, - torrent_bytes, - info_bytes, info_hash, trackers: trackers.into_iter().collect(), spawner: self.spawner, peer_id: self.peer_id, - lengths, storage_factory, options: ManagedTorrentOptions { force_tracker_interval: opts.force_tracker_interval, @@ -1189,8 +1170,9 @@ impl Session { let initializing = Arc::new(TorrentStateInitializing::new( minfo.clone(), + metadata.clone(), only_files.clone(), - minfo.storage_factory.create_and_init(&minfo)?, + minfo.storage_factory.create_and_init(&minfo, &metadata)?, false, )); let handle = Arc::new(ManagedTorrent { @@ -1201,10 +1183,11 @@ impl Session { }), state_change_notify: Notify::new(), shared: minfo, + metadata: ArcSwapOption::new(Some(metadata.clone())), }); g.add_torrent(handle.clone(), id); - handle + (handle, metadata) }; if let Some(p) = self.persistence.as_ref() { @@ -1233,7 +1216,7 @@ impl Session { .start(peer_rx, opts.paused) .context("error starting torrent")?; - if let Some(name) = managed_torrent.shared().info.name.as_ref() { + if let Some(name) = metadata.info.name.as_ref() { info!(?name, "added torrent"); } @@ -1281,6 +1264,8 @@ impl Session { debug!("error pausing torrent before deletion: {e:#}") } + let metadata = removed.metadata.load_full().expect("TODO"); + let storage = removed .with_state_mut(|s| match s.take() { ManagedTorrentState::Initializing(p) => p.files.take().ok(), @@ -1297,7 +1282,12 @@ impl Session { _ => None, }) .map(Ok) - .unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared())); + .unwrap_or_else(|| { + removed + .shared + .storage_factory + .create(removed.shared(), &metadata) + }); if let Some(p) = self.persistence.as_ref() { if let Err(e) = p.delete(id).await { @@ -1311,7 +1301,7 @@ impl Session { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), (Ok(storage), true) => { debug!("will delete files"); - remove_files_and_dirs(removed.shared(), &storage); + remove_files_and_dirs(&metadata.file_infos, &storage); if removed.shared().options.output_folder != self.output_folder { if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { warn!( @@ -1423,9 +1413,9 @@ impl Session { } } -fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage) { +fn remove_files_and_dirs(infos: &FileInfos, files: &dyn TorrentStorage) { let mut all_dirs = HashSet::new(); - for (id, fi) in info.file_infos.iter().enumerate() { + for (id, fi) in infos.iter().enumerate() { let mut fname = &*fi.relative_filename; if let Err(e) = files.remove_file(id, fname) { warn!(?fi.relative_filename, error=?e, "could not delete file"); diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index cda4e69..1314ebb 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -150,7 +150,14 @@ impl JsonSessionPersistenceStore { output_folder: torrent.shared().options.output_folder.clone(), }; - if write_torrent_file && !torrent.shared().torrent_bytes.is_empty() { + let torrent_bytes = torrent + .metadata + .load() + .as_ref() + .map(|i| i.torrent_bytes.clone()) + .unwrap_or_default(); + + if write_torrent_file && !torrent_bytes.is_empty() { let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash()); match tokio::fs::OpenOptions::new() .create(true) @@ -160,7 +167,7 @@ impl JsonSessionPersistenceStore { .await { Ok(mut f) => { - if let Err(e) = f.write_all(&torrent.shared().torrent_bytes).await { + if let Err(e) = f.write_all(&torrent_bytes).await { warn!(error=?e, file=?torrent_bytes_file, "error writing torrent bytes") } } diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 62d31a1..b70bbb1 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -96,14 +96,19 @@ impl SessionPersistenceStore for PostgresSessionStorage { } async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { - let torrent_bytes: &[u8] = &torrent.shared().torrent_bytes; + let torrent_bytes = torrent + .metadata + .load() + .as_ref() + .map(|i| i.torrent_bytes.clone()) + .unwrap_or_default(); let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused) VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(id) DO NOTHING"; sqlx::query(q) .bind::(id.try_into()?) .bind(&torrent.info_hash().0[..]) - .bind(torrent_bytes) + .bind(torrent_bytes.as_ref()) .bind( torrent .shared() diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 2f2cc14..d4051a7 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -6,7 +6,10 @@ use std::{ use anyhow::Context; use tracing::warn; -use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentShared}; +use crate::{ + storage::StorageFactoryExt, + torrent_state::{ManagedTorrentShared, TorrentMetadata}, +}; use crate::storage::{StorageFactory, TorrentStorage}; @@ -18,9 +21,13 @@ pub struct FilesystemStorageFactory {} impl StorageFactory for FilesystemStorageFactory { type Storage = FilesystemStorage; - fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &ManagedTorrentShared, + _metadata: &TorrentMetadata, + ) -> anyhow::Result { Ok(FilesystemStorage { - output_folder: meta.options.output_folder.clone(), + output_folder: shared.options.output_folder.clone(), opened_files: Default::default(), }) } @@ -149,9 +156,13 @@ impl TorrentStorage for FilesystemStorage { } } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { let mut files = Vec::::new(); - for file_details in meta.file_infos.iter() { + for file_details in metadata.file_infos.iter() { let mut full_path = self.output_folder.clone(); let relative_path = &file_details.relative_filename; full_path.push(relative_path); @@ -161,7 +172,7 @@ impl TorrentStorage for FilesystemStorage { continue; }; std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; - let f = if meta.options.allow_overwrite { + let f = if shared.options.allow_overwrite { OpenOptions::new() .create(true) .truncate(false) diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 5b92657..a1824c6 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -4,7 +4,7 @@ use anyhow::Context; use memmap2::{MmapMut, MmapOptions}; use parking_lot::RwLock; -use crate::torrent_state::ManagedTorrentShared; +use crate::torrent_state::{ManagedTorrentShared, TorrentMetadata}; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; @@ -22,8 +22,12 @@ fn dummy_mmap() -> anyhow::Result { impl StorageFactory for MmapFilesystemStorageFactory { type Storage = MmapFilesystemStorage; - fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { - let fs_storage = FilesystemStorageFactory::default().create(meta)?; + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + let fs_storage = FilesystemStorageFactory::default().create(shared, metadata)?; Ok(MmapFilesystemStorage { opened_mmaps: Vec::new(), @@ -97,13 +101,17 @@ impl TorrentStorage for MmapFilesystemStorage { })) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.fs.init(meta)?; + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.fs.init(shared, metadata)?; let mut mmaps = Vec::new(); for (idx, file) in self.fs.opened_files.iter().enumerate() { let fg = file.file.write(); let fg = fg.as_ref().context("file is None")?; - fg.set_len(meta.file_infos[idx].len) + fg.set_len(metadata.file_infos[idx].len) .context("mmap storage: error setting length")?; let mmap = unsafe { MmapOptions::new().map_mut(fg) }.context("error mapping file")?; mmaps.push(RwLock::new(mmap)); diff --git a/crates/librqbit/src/storage/middleware/slow.rs b/crates/librqbit/src/storage/middleware/slow.rs index 5dbdf8a..0629b00 100644 --- a/crates/librqbit/src/storage/middleware/slow.rs +++ b/crates/librqbit/src/storage/middleware/slow.rs @@ -16,6 +16,7 @@ use parking_lot::Mutex; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + torrent_state::TorrentMetadata, ManagedTorrentShared, }; @@ -35,9 +36,13 @@ impl SlowStorageFactory { impl StorageFactory for SlowStorageFactory { type Storage = SlowStorage; - fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &crate::ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { Ok(SlowStorage { - underlying: self.underlying_factory.create(info)?, + underlying: self.underlying_factory.create(shared, metadata)?, pwrite_all_bufread: Mutex::new(Box::new( BufReader::new( File::open( @@ -116,7 +121,11 @@ impl TorrentStorage for SlowStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.underlying.init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.underlying.init(shared, metadata) } } diff --git a/crates/librqbit/src/storage/middleware/timing.rs b/crates/librqbit/src/storage/middleware/timing.rs index 42c3382..379067b 100644 --- a/crates/librqbit/src/storage/middleware/timing.rs +++ b/crates/librqbit/src/storage/middleware/timing.rs @@ -4,6 +4,7 @@ A storage middleware that logs the time underlying storage operations took. use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + torrent_state::TorrentMetadata, ManagedTorrentShared, }; @@ -25,10 +26,14 @@ impl TimingStorageFactory { impl StorageFactory for TimingStorageFactory { type Storage = TimingStorage; - fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &crate::ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { Ok(TimingStorage { name: self.name.clone(), - underlying: self.underlying_factory.create(info)?, + underlying: self.underlying_factory.create(shared, metadata)?, }) } @@ -104,7 +109,11 @@ impl TorrentStorage for TimingStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.underlying.init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.underlying.init(shared, metadata) } } diff --git a/crates/librqbit/src/storage/middleware/write_through_cache.rs b/crates/librqbit/src/storage/middleware/write_through_cache.rs index 59e8cb0..48d7070 100644 --- a/crates/librqbit/src/storage/middleware/write_through_cache.rs +++ b/crates/librqbit/src/storage/middleware/write_through_cache.rs @@ -14,6 +14,7 @@ use parking_lot::RwLock; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + torrent_state::TorrentMetadata, FileInfos, ManagedTorrentShared, }; @@ -35,18 +36,22 @@ impl WriteThroughCacheStorageFactory { impl StorageFactory for WriteThroughCacheStorageFactory { type Storage = WriteThroughCacheStorage; - fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { + fn create( + &self, + shared: &crate::ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { let pieces = self .max_cache_bytes - .div_ceil(info.lengths.default_piece_length() as u64) + .div_ceil(metadata.lengths.default_piece_length() as u64) .try_into()?; let pieces = NonZeroUsize::new(pieces).context("bug: pieces == 0")?; let lru = RwLock::new(LruCache::new(pieces)); Ok(WriteThroughCacheStorage { lru, - underlying: self.underlying.create(info)?, - lengths: info.lengths, - file_infos: info.file_infos.clone(), + underlying: self.underlying.create(shared, metadata)?, + lengths: metadata.lengths, + file_infos: metadata.file_infos.clone(), }) } @@ -121,7 +126,11 @@ impl TorrentStorage for WriteThroughCacheStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - self.underlying.init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + self.underlying.init(shared, metadata) } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index efefa6e..e30cbbd 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -13,15 +13,23 @@ use std::{ use librqbit_core::lengths::ValidPieceIndex; -use crate::torrent_state::ManagedTorrentShared; +use crate::torrent_state::{ManagedTorrentShared, TorrentMetadata}; pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; - fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result; - fn create_and_init(&self, info: &ManagedTorrentShared) -> anyhow::Result { - let mut storage = self.create(info)?; - storage.init(info)?; + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result; + fn create_and_init( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + let mut storage = self.create(shared, metadata)?; + storage.init(shared, metadata)?; Ok(storage) } @@ -46,8 +54,12 @@ impl StorageFactoryExt for SF { impl StorageFactory for Wrapper { type Storage = Box; - fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { - let s = self.sf.create(info)?; + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + let s = self.sf.create(shared, metadata)?; Ok(Box::new(s)) } @@ -67,8 +79,12 @@ impl StorageFactoryExt for SF { impl StorageFactory for Box { type Storage = U::Storage; - fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { - (**self).create(info) + fn create( + &self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result { + (**self).create(shared, metadata) } fn clone_box(&self) -> BoxStorageFactory { @@ -78,7 +94,11 @@ impl StorageFactory for Box { pub trait TorrentStorage: Send + Sync { // Create/open files etc. - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()>; + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()>; /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. @@ -132,8 +152,12 @@ impl TorrentStorage for Box { (**self).remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { - (**self).init(meta) + fn init( + &mut self, + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + (**self).init(shared, metadata) } fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> { diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index b81ccb7..9b96a88 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -24,11 +24,12 @@ use crate::{ FileInfos, }; -use super::{paused::TorrentStatePaused, ManagedTorrentShared}; +use super::{paused::TorrentStatePaused, ManagedTorrentShared, TorrentMetadata}; pub struct TorrentStateInitializing { pub(crate) files: FileStorage, pub(crate) shared: Arc, + pub(crate) metadata: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, previously_errored: bool, @@ -54,13 +55,15 @@ fn compute_selected_pieces( impl TorrentStateInitializing { pub fn new( - meta: Arc, + shared: Arc, + metadata: Arc, only_files: Option>, files: FileStorage, previously_errored: bool, ) -> Self { Self { - shared: meta, + shared, + metadata, only_files, files, checked_bytes: AtomicU64::new(0), @@ -80,7 +83,7 @@ impl TorrentStateInitializing { ) -> Option> { let hp = have_pieces?; let actual = hp.as_bytes().len(); - let expected = self.shared.lengths.piece_bitfield_bytes(); + let expected = self.metadata.lengths.piece_bitfield_bytes(); if actual != expected { warn!( actual, @@ -92,21 +95,21 @@ impl TorrentStateInitializing { let is_broken = self.shared.spawner.spawn_block_in_place(|| { let fo = crate::file_ops::FileOps::new( - &self.shared.info, + &self.metadata.info, &self.files, - &self.shared.file_infos, - &self.shared.lengths, + &self.metadata.file_infos, + &self.metadata.lengths, ); use rand::seq::SliceRandom; let mut to_validate = BF::from_boxed_slice( - vec![0u8; self.shared.lengths.piece_bitfield_bytes()].into_boxed_slice(), + vec![0u8; self.metadata.lengths.piece_bitfield_bytes()].into_boxed_slice(), ); let mut queue = hp.as_slice().to_owned(); // Validate at least one piece from each file, if we claim we have it. - for fi in self.shared.file_infos.iter() { + for fi in self.metadata.file_infos.iter() { let prange = fi.piece_range_usize(); let offset = prange.start; for piece_id in hp @@ -136,7 +139,7 @@ impl TorrentStateInitializing { for (id, piece_id) in to_validate .iter_ones() .filter_map(|id| { - self.shared + self.metadata .lengths .validate_piece_index(id.try_into().ok()?) }) @@ -147,10 +150,10 @@ impl TorrentStateInitializing { } #[allow(clippy::cast_possible_truncation)] - let progress = (self.shared.lengths.total_length() as f64 + let progress = (self.metadata.lengths.total_length() as f64 / to_validate_count as f64 * (id + 1) as f64) as u64; - let progress = progress.min(self.shared.lengths.total_length()); + let progress = progress.min(self.metadata.lengths.total_length()); self.checked_bytes.store(progress, Ordering::Relaxed); } @@ -198,10 +201,10 @@ impl TorrentStateInitializing { info!("Doing initial checksum validation, this might take a while..."); let have_pieces = self.shared.spawner.spawn_block_in_place(|| { FileOps::new( - &self.shared.info, + &self.metadata.info, &self.files, - &self.shared.file_infos, - &self.shared.lengths, + &self.metadata.file_infos, + &self.metadata.lengths, ) .initial_check(&self.checked_bytes) })?; @@ -213,16 +216,16 @@ impl TorrentStateInitializing { }; let selected_pieces = compute_selected_pieces( - &self.shared.lengths, + &self.metadata.lengths, self.only_files.as_deref(), - &self.shared.file_infos, + &self.metadata.file_infos, ); let chunk_tracker = ChunkTracker::new( have_pieces.into_dyn(), selected_pieces, - self.shared.lengths, - &self.shared.file_infos, + self.metadata.lengths, + &self.metadata.file_infos, ) .context("error creating chunk tracker")?; @@ -237,7 +240,7 @@ impl TorrentStateInitializing { // Ensure file lenghts are correct, and reopen read-only. self.shared.spawner.spawn_block_in_place(|| { - for (idx, fi) in self.shared.file_infos.iter().enumerate() { + for (idx, fi) in self.metadata.file_infos.iter().enumerate() { if self .only_files .as_ref() @@ -268,6 +271,7 @@ impl TorrentStateInitializing { let paused = TorrentStatePaused { shared: self.shared.clone(), + metadata: self.metadata.clone(), files: self.files.take()?, chunk_tracker, streams: Arc::new(Default::default()), diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 159bb60..0db812a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -109,7 +109,7 @@ use super::{ paused::TorrentStatePaused, streaming::TorrentStreams, utils::{timeit, TimedExistence}, - ManagedTorrentShared, + ManagedTorrentShared, TorrentMetadata, }; #[derive(Debug)] @@ -175,7 +175,8 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; pub struct TorrentStateLive { peers: PeerStates, - torrent: Arc, + shared: Arc, + metadata: Arc, locked: RwLock, pub(crate) files: FileStorage, @@ -231,11 +232,11 @@ impl TorrentStateLive { // TODO: make it configurable let file_priorities = { - let mut pri = (0..paused.shared.file_infos.len()).collect::>(); + let mut pri = (0..paused.metadata.file_infos.len()).collect::>(); // sort by filename, cause many torrents have random sort order. pri.sort_unstable_by_key(|id| { paused - .shared + .metadata .file_infos .get(*id) .map(|fi| fi.relative_filename.as_path()) @@ -252,7 +253,8 @@ impl TorrentStateLive { let ratelimits = Limits::new(paused.shared.options.ratelimits); let state = Arc::new(TorrentStateLive { - torrent: paused.shared.clone(), + shared: paused.shared.clone(), + metadata: paused.metadata.clone(), peers: PeerStates { session_stats: session_stats.clone(), stats: Default::default(), @@ -291,7 +293,7 @@ impl TorrentStateLive { }); state.spawn( - error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"), + error_span!(parent: state.shared.span.clone(), "speed_estimator_updater"), { let state = Arc::downgrade(&state); async move { @@ -317,12 +319,12 @@ impl TorrentStateLive { ); state.spawn( - error_span!(parent: state.torrent.span.clone(), "peer_adder"), + error_span!(parent: state.shared.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); state.spawn( - error_span!(parent: state.torrent.span.clone(), "upload_scheduler"), + error_span!(parent: state.shared.span.clone(), "upload_scheduler"), state.clone().task_upload_scheduler(ratelimit_upload_rx), ); Ok(state) @@ -346,7 +348,7 @@ impl TorrentStateLive { } fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> { - self.torrent.options.disk_write_queue.as_ref() + self.shared.options.disk_write_queue.as_ref() } pub(crate) fn add_incoming_peer( @@ -394,7 +396,7 @@ impl TorrentStateLive { self.spawn( error_span!( - parent: self.torrent.span.clone(), + parent: self.shared.span.clone(), "manage_incoming_peer", addr = %checked_peer.addr ), @@ -416,7 +418,7 @@ impl TorrentStateLive { self.ratelimits .prepare_for_upload(NonZeroU32::new(ci.size).unwrap()) .await?; - if let Some(session) = self.torrent.session.upgrade() { + if let Some(session) = self.shared.session.upgrade() { session .ratelimits .prepare_for_upload(NonZeroU32::new(ci.size).unwrap()) @@ -449,18 +451,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: self.torrent.options.peer_connect_timeout, - read_write_timeout: self.torrent.options.peer_read_write_timeout, + connect_timeout: self.shared.options.peer_connect_timeout, + read_write_timeout: self.shared.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( checked_peer.addr, - self.torrent.info_hash, - self.torrent.peer_id, + self.shared.info_hash, + self.shared.peer_id, &handler, Some(options), - self.torrent.spawner, - self.torrent.connector.clone(), + self.shared.spawner, + self.shared.connector.clone(), ); let requester = handler.task_peer_chunk_requester(); @@ -514,18 +516,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: state.torrent.options.peer_connect_timeout, - read_write_timeout: state.torrent.options.peer_read_write_timeout, + connect_timeout: state.shared.options.peer_connect_timeout, + read_write_timeout: state.shared.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( addr, - state.torrent.info_hash, - state.torrent.peer_id, + state.shared.info_hash, + state.shared.peer_id, &handler, Some(options), - state.torrent.spawner, - state.torrent.connector.clone(), + state.shared.spawner, + state.shared.connector.clone(), ); let requester = aframe!(handler .task_peer_chunk_requester() @@ -564,7 +566,7 @@ impl TorrentStateLive { let state = self; loop { let addr = peer_queue_rx.recv().await.context("torrent closed")?; - if state.torrent.options.disable_upload() && state.is_finished_and_no_active_streams() { + if state.shared.options.disable_upload() && state.is_finished_and_no_active_streams() { debug!("ignoring peer {} as we are finished", addr); state.peers.mark_peer_not_needed(addr); continue; @@ -572,30 +574,30 @@ impl TorrentStateLive { let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( - error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()), + error_span!(parent: state.shared.span.clone(), "manage_peer", peer = addr.to_string()), aframe!(state.clone().task_manage_outgoing_peer(addr, permit)), ); } } pub fn torrent(&self) -> &ManagedTorrentShared { - &self.torrent + &self.shared } pub fn info(&self) -> &TorrentMetaV1Info { - &self.torrent.info + &self.metadata.info } pub fn info_hash(&self) -> Id20 { - self.torrent.info_hash + self.shared.info_hash } pub fn peer_id(&self) -> Id20 { - self.torrent.peer_id + self.shared.peer_id } pub(crate) fn file_ops(&self) -> FileOps<'_> { FileOps::new( - &self.torrent.info, + &self.metadata.info, &*self.files, - &self.torrent().file_infos, + &self.metadata.file_infos, &self.lengths, ) } @@ -703,7 +705,8 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { - shared: self.torrent.clone(), + shared: self.shared.clone(), + metadata: self.metadata.clone(), files: self.files.take()?, chunk_tracker, streams: self.streams.clone(), @@ -727,7 +730,7 @@ impl TorrentStateLive { let mut g = self.lock_write("update_only_files"); let ct = g.get_chunks_mut()?; let hns = - ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?; + ct.update_only_files(self.metadata.file_infos.iter().map(|f| f.len), only_files)?; if !hns.finished() { self.reconnect_all_not_needed_peers(); } @@ -746,7 +749,7 @@ impl TorrentStateLive { }; self.streams .streamed_file_ids() - .any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id])) + .any(|file_id| !chunks.is_file_finished(&self.metadata.file_infos[file_id])) } // We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite @@ -768,7 +771,7 @@ impl TorrentStateLive { // if we have all the pieces of the file, reopen it read only for (idx, file_info) in self - .torrent() + .metadata .file_infos .iter() .enumerate() @@ -779,9 +782,9 @@ impl TorrentStateLive { } self.streams - .wake_streams_on_piece_completed(id, &self.torrent.lengths); + .wake_streams_on_piece_completed(id, &self.metadata.lengths); - locked.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64; + locked.unflushed_bitv_bytes += self.metadata.lengths.piece_length(id) as u64; if locked.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES { locked.try_flush_bitv() } @@ -1021,7 +1024,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { if let Some(_peer_pex_msg_id) = hs.ut_pex() { self.state.clone().spawn( error_span!( - parent: self.state.torrent.span.clone(), + parent: self.state.shared.span.clone(), "sending_pex_to_peer", peer = self.addr.to_string() ), @@ -1054,7 +1057,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } fn should_transmit_have(&self, id: ValidPieceIndex) -> bool { - if self.state.torrent.options.disable_upload() { + if self.state.shared.options.disable_upload() { return false; } let have = self @@ -1071,7 +1074,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { &self, handshake: &mut ExtendedHandshake, ) -> anyhow::Result<()> { - let info_bytes = &self.state.torrent().info_bytes; + let info_bytes = &self.state.metadata.info_bytes; if !info_bytes.is_empty() { if let Ok(len) = info_bytes.len().try_into() { handshake.metadata_size = Some(len); @@ -1159,7 +1162,7 @@ impl PeerHandler { if let Some(dur) = backoff { self.state.clone().spawn( error_span!( - parent: self.state.torrent.span.clone(), + parent: self.state.shared.span.clone(), "wait_for_peer", peer = handle.to_string(), duration = format!("{dur:?}") @@ -1218,7 +1221,7 @@ impl PeerHandler { && !g.inflight_pieces.contains_key(pid) }); let natural_order_pieces = chunk_tracker - .iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos); + .iter_queued_pieces(&g.file_priorities, &self.state.metadata.file_infos); for n in priority_streamed_pieces.chain(natural_order_pieces) { if bf.get(n.get() as usize).map(|v| *v) == Some(true) { n_opt = Some(n); @@ -1787,7 +1790,7 @@ impl PeerHandler { dtx.send(Box::new(work)).await?; } else { self.state - .torrent + .shared .spawner .spawn_block_in_place(|| { write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info) @@ -1799,7 +1802,7 @@ impl PeerHandler { } fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> { - let data = &self.state.torrent().info_bytes; + let data = &self.state.metadata.info_bytes; let metadata_size = data.len(); if metadata_size == 0 { anyhow::bail!("peer requested for info metadata but we don't have it") diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 32246e5..15ec11f 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -15,6 +15,7 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; +use arc_swap::ArcSwapOption; use buffers::ByteBufOwned; use bytes::Bytes; use futures::future::BoxFuture; @@ -37,6 +38,7 @@ use tracing::trace; use tracing::warn; use crate::chunk_tracker::ChunkTracker; +use crate::file_info::FileInfo; use crate::limits::LimitsConfig; use crate::session::TorrentId; use crate::spawn_utils::BlockingSpawner; @@ -123,6 +125,44 @@ impl ManagedTorrentOptions { } } +// Torrent bencodee "info" + some precomputed fields based on it for frequent access. +pub struct TorrentMetadata { + pub info: TorrentMetaV1Info, + pub torrent_bytes: Bytes, + pub info_bytes: Bytes, + pub lengths: Lengths, + pub file_infos: FileInfos, +} + +impl TorrentMetadata { + pub(crate) fn new( + info: TorrentMetaV1Info, + torrent_bytes: Bytes, + info_bytes: Bytes, + ) -> anyhow::Result { + let lengths = Lengths::from_torrent(&info)?; + let file_infos = info + .iter_file_details_ext(&lengths)? + .map(|fd| { + Ok::<_, anyhow::Error>(FileInfo { + relative_filename: fd.details.filename.to_pathbuf()?, + offset_in_torrent: fd.offset, + piece_range: fd.pieces, + len: fd.details.len, + attrs: fd.details.attrs(), + }) + }) + .collect::>>()?; + Ok(Self { + info, + torrent_bytes, + info_bytes, + lengths, + file_infos, + }) + } +} + /// Common information about torrent shared among all possible states. /// // The reason it's not inlined into ManagedTorrent is to break the Arc cycle: @@ -130,15 +170,10 @@ impl ManagedTorrentOptions { // of stuff, but it shouldn't access the state. pub struct ManagedTorrentShared { pub id: TorrentId, - pub info: TorrentMetaV1Info, - pub torrent_bytes: Bytes, - pub info_bytes: Bytes, pub info_hash: Id20, pub(crate) spawner: BlockingSpawner, pub trackers: HashSet, pub peer_id: Id20, - pub lengths: Lengths, - pub file_infos: FileInfos, pub span: tracing::Span, pub(crate) options: ManagedTorrentOptions, pub(crate) connector: Arc, @@ -148,6 +183,7 @@ pub struct ManagedTorrentShared { pub struct ManagedTorrent { pub shared: Arc, + pub metadata: ArcSwapOption, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, } @@ -161,8 +197,13 @@ impl ManagedTorrent { &self.shared } - pub fn get_total_bytes(&self) -> u64 { - self.shared.lengths.total_length() + pub fn with_metadata( + &self, + mut f: impl FnMut(&Arc) -> R, + ) -> anyhow::Result { + let r = self.metadata.load(); + let r = r.as_ref().context("torrent is not resolved")?; + Ok(f(r)) } pub fn info_hash(&self) -> Id20 { @@ -384,10 +425,14 @@ impl ManagedTorrent { Ok(()) } ManagedTorrentState::Error(_) => { + let metadata = self.metadata.load_full().expect("TODO"); let initializing = Arc::new(TorrentStateInitializing::new( self.shared.clone(), + metadata.clone(), g.only_files.clone(), - self.shared.storage_factory.create_and_init(self.shared())?, + self.shared + .storage_factory + .create_and_init(self.shared(), &metadata)?, true, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); @@ -433,7 +478,12 @@ impl ManagedTorrent { pub fn stats(&self) -> TorrentStats { use stats::TorrentStatsState as S; let mut resp = TorrentStats { - total_bytes: self.shared().lengths.total_length(), + total_bytes: self + .metadata + .load() + .as_ref() + .map(|r| r.lengths.total_length()) + .unwrap_or_default(), file_progress: Vec::new(), state: S::Error, error: None, @@ -534,7 +584,9 @@ impl ManagedTorrent { // Returns true if needed to unpause torrent. // This is just implementation detail - it's easier to pause/unpause than to tinker with internals. pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { - let file_count = self.shared().info.iter_file_lengths()?.count(); + let metadata = self.metadata.load(); + let metadata = metadata.as_ref().context("torrent is not resolved")?; + let file_count = metadata.file_infos.len(); for f in only_files.iter().copied() { if f >= file_count { anyhow::bail!("only_files contains invalid value {f}") diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 9c78a26..52e8d6a 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -5,10 +5,11 @@ use crate::{ type_aliases::FileStorage, }; -use super::{streaming::TorrentStreams, ManagedTorrentShared}; +use super::{streaming::TorrentStreams, ManagedTorrentShared, TorrentMetadata}; pub struct TorrentStatePaused { pub(crate) shared: Arc, + pub(crate) metadata: Arc, pub(crate) files: FileStorage, pub(crate) chunk_tracker: ChunkTracker, pub(crate) streams: Arc, @@ -17,7 +18,7 @@ pub struct TorrentStatePaused { impl TorrentStatePaused { pub(crate) fn update_only_files(&mut self, only_files: &HashSet) -> anyhow::Result<()> { self.chunk_tracker - .update_only_files(self.shared.info.iter_file_lengths()?, only_files)?; + .update_only_files(self.metadata.info.iter_file_lengths()?, only_files)?; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 73fb9ea..3e7d8cb 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -19,7 +19,7 @@ use crate::{ file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent, }; -use super::ManagedTorrentHandle; +use super::{ManagedTorrentHandle, TorrentMetadata}; type StreamId = usize; @@ -130,6 +130,7 @@ impl TorrentStreams { pub struct FileStream { torrent: ManagedTorrentHandle, + metadata: Arc, streams: Arc, stream_id: usize, file_id: usize, @@ -178,8 +179,7 @@ impl AsyncRead for FileStream { } let current = poll_try_io!(self - .torrent - .shared() + .metadata .lengths .compute_current_piece(self.position, self.file_torrent_abs_offset) .context("invalid position")); @@ -216,11 +216,14 @@ impl AsyncRead for FileStream { ); poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| { - self.torrent - .with_storage_and_file(self.file_id, |files, _fi| { + self.torrent.with_storage_and_file( + self.file_id, + |files, _fi| { files.pread_exact(self.file_id, self.position, buf)?; Ok::<_, anyhow::Error>(()) - }) + }, + &self.metadata, + ) }))); self.as_mut().advance(bytes_to_read as u64); @@ -269,7 +272,12 @@ impl Drop for FileStream { } impl ManagedTorrent { - fn with_storage_and_file(&self, file_id: usize, f: F) -> anyhow::Result + fn with_storage_and_file( + &self, + file_id: usize, + f: F, + metadata: &TorrentMetadata, + ) -> anyhow::Result where F: FnOnce(&dyn TorrentStorage, &FileInfo) -> R, { @@ -279,11 +287,7 @@ impl ManagedTorrent { crate::ManagedTorrentState::Live(l) => &*l.files, s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()), }; - let fi = self - .shared() - .file_infos - .get(file_id) - .context("invalid file")?; + let fi = metadata.file_infos.get(file_id).context("invalid file")?; Ok(f(files, fi)) }) } @@ -310,14 +314,26 @@ impl ManagedTorrent { } fn is_file_finished(&self, file_id: usize) -> bool { + let metadata = self.metadata.load(); + let metadata = match metadata.as_ref() { + Some(r) => r, + None => return false, + }; // TODO: would be nice to remove locking - self.with_chunk_tracker(|ct| ct.is_file_finished(&self.shared.file_infos[file_id])) + self.with_chunk_tracker(|ct| ct.is_file_finished(&metadata.file_infos[file_id])) .unwrap_or(false) } pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { - let (fd_len, fd_offset) = - self.with_storage_and_file(file_id, |_fd, fi| (fi.len, fi.offset_in_torrent))?; + let metadata = self + .metadata + .load_full() + .context("torrent metadata is not resolved")?; + let (fd_len, fd_offset) = self.with_storage_and_file( + file_id, + |_fd, fi| (fi.len, fi.offset_in_torrent), + &metadata, + )?; let streams = self.streams()?; let s = FileStream { stream_id: streams.next_id(), @@ -329,6 +345,7 @@ impl ManagedTorrent { file_torrent_abs_offset: fd_offset, torrent: self, spawner: BlockingSpawner::default(), + metadata, }; s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert( diff --git a/crates/librqbit/src/upnp_server_adapter.rs b/crates/librqbit/src/upnp_server_adapter.rs index ab60f37..1885a6e 100644 --- a/crates/librqbit/src/upnp_server_adapter.rs +++ b/crates/librqbit/src/upnp_server_adapter.rs @@ -6,7 +6,7 @@ use std::{ sync::Arc, }; -use crate::{session::TorrentId, ManagedTorrent, Session}; +use crate::{session::TorrentId, torrent_state::TorrentMetadata, ManagedTorrentShared, Session}; #[derive(Clone)] pub struct UpnpServerSessionAdapter { @@ -55,18 +55,18 @@ impl TorrentFileTreeNode { &self, id: usize, http_host: &str, - torrent: &ManagedTorrent, + torrent: &ManagedTorrentShared, + metadata: &TorrentMetadata, adapter: &UpnpServerSessionAdapter, ) -> ItemOrContainer { - let encoded_id = encode_id(id, torrent.id()); - let encoded_parent_id = self.parent_id.map(|p| encode_id(p, torrent.id())); + let encoded_id = encode_id(id, torrent.id); + let encoded_parent_id = self.parent_id.map(|p| encode_id(p, torrent.id)); match self.real_torrent_file_id { Some(fid) => { - let fi = &torrent.shared().file_infos[fid]; + let fi = &metadata.file_infos[fid]; let filename = &fi.relative_filename; // Torrent path joined with "/" - let last_url_bit = torrent - .shared() + let last_url_bit = metadata .info .iter_file_details() .ok() @@ -86,11 +86,7 @@ impl TorrentFileTreeNode { mime_type: mime_guess::from_path(filename).first(), url: format!( "http://{}:{}/torrents/{}/stream/{}/{}", - http_host, - adapter.port, - torrent.id(), - fid, - last_url_bit + http_host, adapter.port, torrent.id, fid, last_url_bit ), size: fi.len, }) @@ -216,10 +212,15 @@ impl UpnpServerSessionAdapter { .filter_map(|t| { let real_id = t.id(); let upnp_id = real_id + 1; + let metadata = t.metadata.load(); + let metadata = match metadata.as_ref() { + Some(r) => r, + None => return None, + }; - if is_single_file_at_root(&t.shared().info) { + if is_single_file_at_root(&metadata.info) { // Just add the file directly - let rf = &t.shared().file_infos[0].relative_filename; + let rf = &metadata.file_infos[0].relative_filename; let title = rf.file_name()?.to_str()?.to_owned(); Some( TorrentFileTreeNode { @@ -228,11 +229,16 @@ impl UpnpServerSessionAdapter { children: vec![], real_torrent_file_id: Some(0), } - .as_item_or_container(0, hostname, t, self), + .as_item_or_container( + 0, + hostname, + t.shared(), + metadata, + self, + ), ) } else { - let title = t - .shared() + let title = metadata .info .name .as_ref() @@ -288,7 +294,13 @@ impl UpnpServerSessionAdapter { } }; - let tree = match TorrentFileTree::build(torrent.id(), &torrent.shared().info) { + let t_metadata = torrent.metadata.load(); + let t_metadata = match t_metadata.as_ref() { + Some(r) => r, + None => return vec![], + }; + + let tree = match TorrentFileTree::build(torrent.id(), &t_metadata.info) { Ok(tree) => tree, Err(e) => { warn!(object_id, error=?e, "error building torrent file tree"); @@ -309,7 +321,13 @@ impl UpnpServerSessionAdapter { let mut result = Vec::new(); if node.real_torrent_file_id.is_some() || metadata { - result.push(node.as_item_or_container(node_id, http_hostname, &torrent, self)) + result.push(node.as_item_or_container( + node_id, + http_hostname, + torrent.shared(), + t_metadata, + self, + )) } else { for (child_node_id, child_node) in node .children @@ -319,7 +337,8 @@ impl UpnpServerSessionAdapter { result.push(child_node.as_item_or_container( child_node_id, http_hostname, - &torrent, + torrent.shared(), + t_metadata, self, )); }