diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index 2be41c9..f22daa8 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -20,7 +20,7 @@ struct CustomStorage { impl StorageFactory for CustomStorageFactory { type Storage = CustomStorage; - fn create(&self, _info: &librqbit::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, _info: &librqbit::ManagedTorrentShared) -> anyhow::Result { Ok(CustomStorage::default()) } @@ -54,7 +54,7 @@ impl TorrentStorage for CustomStorage { anyhow::bail!("not implemented") } - fn init(&mut self, _meta: &librqbit::ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { anyhow::bail!("not implemented") } } diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs index 6c7f414..7dbd189 100644 --- a/crates/librqbit/examples/ubuntu.rs +++ b/crates/librqbit/examples/ubuntu.rs @@ -49,7 +49,7 @@ async fn main() -> Result<(), anyhow::Error> { _ => unreachable!(), }; - info!("Details: {:?}", &handle.info().info); + info!("Details: {:?}", &handle.shared().info); // Print stats periodically. tokio::spawn({ diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 17e2fcf..65609c3 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -197,7 +197,7 @@ impl Api { torrents .map(|(id, mgr)| TorrentListResponseItem { id, - info_hash: mgr.info().info_hash.as_string(), + info_hash: mgr.shared().info_hash.as_string(), }) .collect() }); @@ -206,9 +206,9 @@ impl Api { pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result { let handle = self.mgr_handle(idx)?; - let info_hash = handle.info().info_hash; + let info_hash = handle.shared().info_hash; let only_files = handle.only_files(); - make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) + make_torrent_details(&info_hash, &handle.shared().info, only_files.as_deref()) } pub fn api_session_stats(&self) -> SessionStatsSnapshot { @@ -221,7 +221,7 @@ impl Api { file_idx: usize, ) -> Result<&'static str> { let handle = self.mgr_handle(idx)?; - let info = &handle.info().info; + let info = &handle.shared().info; torrent_file_mime_type(info, file_idx) } @@ -361,7 +361,7 @@ impl Api { AddTorrentResponse::Added(id, handle) => { let details = make_torrent_details( &handle.info_hash(), - &handle.info().info, + &handle.shared().info, handle.only_files().as_deref(), ) .context("error making torrent details")?; @@ -370,7 +370,7 @@ impl Api { details, seen_peers: None, output_folder: handle - .info() + .shared() .options .output_folder .to_string_lossy() diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index f72b95b..fd672d0 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -142,7 +142,7 @@ impl HttpApi { fn torrent_playlist_items(handle: &ManagedTorrent) -> Result> { let mut playlist_items = handle - .info() + .shared() .info .iter_filenames_and_lengths()? .enumerate() @@ -216,8 +216,8 @@ impl HttpApi { .await?; let (info, content) = match added { crate::AddTorrentResponse::AlreadyManaged(_, handle) => ( - handle.info().info.clone(), - handle.info().torrent_bytes.clone(), + handle.shared().info.clone(), + handle.shared().torrent_bytes.clone(), ), crate::AddTorrentResponse::ListOnly(ListOnlyResponse { info, diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 4dda659..6b67b4e 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -77,7 +77,7 @@ pub use session::{ }; pub use spawn_utils::spawn as librqbit_spawn; pub use torrent_state::{ - ManagedTorrent, ManagedTorrentInfo, ManagedTorrentState, TorrentStats, TorrentStatsState, + ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentStats, TorrentStatsState, }; pub use type_aliases::FileInfos; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index a9263d0..377a744 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -28,7 +28,7 @@ use crate::{ ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, - ManagedTorrent, ManagedTorrentInfo, + ManagedTorrent, ManagedTorrentShared, }; use anyhow::{bail, Context}; use bencode::bencode_serialize_to_writer; @@ -1116,7 +1116,7 @@ impl Session { let span = error_span!(parent: self.rs(), "torrent", id); let peer_opts = self.merge_peer_opts(opts.peer_opts); - let minfo = Arc::new(ManagedTorrentInfo { + let minfo = Arc::new(ManagedTorrentShared { id, span, file_infos, @@ -1151,7 +1151,7 @@ impl Session { only_files, }), state_change_notify: Notify::new(), - info: minfo, + shared: minfo, session: Arc::downgrade(self), }); @@ -1181,7 +1181,7 @@ impl Session { ); { - let span = managed_torrent.info.span.clone(); + let span = managed_torrent.shared.span.clone(); let _ = span.enter(); managed_torrent @@ -1257,18 +1257,18 @@ impl Session { _ => None, }) .map(Ok) - .unwrap_or_else(|| removed.info.storage_factory.create(removed.info())); + .unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared())); match (storage, delete_files) { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), (Ok(storage), true) => { debug!("will delete files"); - remove_files_and_dirs(removed.info(), &storage); - if removed.info().options.output_folder != self.output_folder { + remove_files_and_dirs(removed.shared(), &storage); + if removed.shared().options.output_folder != self.output_folder { if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { warn!( "error removing {:?}: {e:?}", - removed.info().options.output_folder + removed.shared().options.output_folder ) } } @@ -1331,9 +1331,9 @@ impl Session { pub async fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { let peer_rx = self.make_peer_rx( handle.info_hash(), - handle.info().trackers.clone().into_iter().collect(), + handle.shared().trackers.clone().into_iter().collect(), self.tcp_listen_port, - handle.info().options.force_tracker_interval, + handle.shared().options.force_tracker_interval, )?; handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await; @@ -1355,7 +1355,7 @@ impl Session { } } -fn remove_files_and_dirs(info: &ManagedTorrentInfo, files: &dyn TorrentStorage) { +fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage) { let mut all_dirs = HashSet::new(); for (id, fi) in info.file_infos.iter().enumerate() { let mut fname = &*fi.relative_filename; diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index b8cc7dc..bd2d888 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -128,7 +128,7 @@ impl JsonSessionPersistenceStore { write_torrent_file: bool, ) -> anyhow::Result<()> { if !torrent - .info + .shared .storage_factory .is_type_id(TypeId::of::()) { @@ -137,7 +137,7 @@ impl JsonSessionPersistenceStore { let st = SerializedTorrent { trackers: torrent - .info() + .shared() .trackers .iter() .map(|u| u.to_string()) @@ -147,10 +147,10 @@ impl JsonSessionPersistenceStore { torrent_bytes: Default::default(), only_files: torrent.only_files().clone(), is_paused: torrent.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))), - output_folder: torrent.info().options.output_folder.clone(), + output_folder: torrent.shared().options.output_folder.clone(), }; - if write_torrent_file && !torrent.info().torrent_bytes.is_empty() { + if write_torrent_file && !torrent.shared().torrent_bytes.is_empty() { let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash()); match tokio::fs::OpenOptions::new() .create(true) @@ -160,7 +160,7 @@ impl JsonSessionPersistenceStore { .await { Ok(mut f) => { - if let Err(e) = f.write_all(&torrent.info().torrent_bytes).await { + if let Err(e) = f.write_all(&torrent.shared().torrent_bytes).await { warn!(error=?e, file=?torrent_bytes_file, "error writing torrent bytes") } } diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 98dd6f3..10d593c 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -96,7 +96,7 @@ impl SessionPersistenceStore for PostgresSessionStorage { } async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { - let torrent_bytes: &[u8] = &torrent.info().torrent_bytes; + let torrent_bytes: &[u8] = &torrent.shared().torrent_bytes; let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused) VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(id) DO NOTHING"; @@ -104,10 +104,17 @@ impl SessionPersistenceStore for PostgresSessionStorage { .bind::(id.try_into()?) .bind(&torrent.info_hash().0[..]) .bind(torrent_bytes) - .bind(torrent.info().trackers.iter().cloned().collect::>()) .bind( torrent - .info() + .shared() + .trackers + .iter() + .cloned() + .collect::>(), + ) + .bind( + torrent + .shared() .options .output_folder .to_str() diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 651d2fc..74ac596 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Context; use tracing::warn; -use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentInfo}; +use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentShared}; use crate::storage::{StorageFactory, TorrentStorage}; @@ -18,7 +18,7 @@ pub struct FilesystemStorageFactory {} impl StorageFactory for FilesystemStorageFactory { type Storage = FilesystemStorage; - fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { Ok(FilesystemStorage { output_folder: meta.options.output_folder.clone(), opened_files: Default::default(), @@ -149,7 +149,7 @@ impl TorrentStorage for FilesystemStorage { } } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { let mut files = Vec::::new(); for file_details in meta.info.iter_file_details(&meta.lengths)? { let mut full_path = self.output_folder.clone(); diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 1e68b24..5b92657 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -4,7 +4,7 @@ use anyhow::Context; use memmap2::{MmapMut, MmapOptions}; use parking_lot::RwLock; -use crate::torrent_state::ManagedTorrentInfo; +use crate::torrent_state::ManagedTorrentShared; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; @@ -22,7 +22,7 @@ fn dummy_mmap() -> anyhow::Result { impl StorageFactory for MmapFilesystemStorageFactory { type Storage = MmapFilesystemStorage; - fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { let fs_storage = FilesystemStorageFactory::default().create(meta)?; Ok(MmapFilesystemStorage { @@ -97,7 +97,7 @@ impl TorrentStorage for MmapFilesystemStorage { })) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.fs.init(meta)?; let mut mmaps = Vec::new(); for (idx, file) in self.fs.opened_files.iter().enumerate() { diff --git a/crates/librqbit/src/storage/middleware/slow.rs b/crates/librqbit/src/storage/middleware/slow.rs index fba5160..5dbdf8a 100644 --- a/crates/librqbit/src/storage/middleware/slow.rs +++ b/crates/librqbit/src/storage/middleware/slow.rs @@ -16,7 +16,7 @@ use parking_lot::Mutex; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Clone)] @@ -35,7 +35,7 @@ impl SlowStorageFactory { impl StorageFactory for SlowStorageFactory { type Storage = SlowStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { Ok(SlowStorage { underlying: self.underlying_factory.create(info)?, pwrite_all_bufread: Mutex::new(Box::new( @@ -116,7 +116,7 @@ impl TorrentStorage for SlowStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/middleware/timing.rs b/crates/librqbit/src/storage/middleware/timing.rs index 7079db2..42c3382 100644 --- a/crates/librqbit/src/storage/middleware/timing.rs +++ b/crates/librqbit/src/storage/middleware/timing.rs @@ -4,7 +4,7 @@ A storage middleware that logs the time underlying storage operations took. use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Clone)] @@ -25,7 +25,7 @@ impl TimingStorageFactory { impl StorageFactory for TimingStorageFactory { type Storage = TimingStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { Ok(TimingStorage { name: self.name.clone(), underlying: self.underlying_factory.create(info)?, @@ -104,7 +104,7 @@ impl TorrentStorage for TimingStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/middleware/write_through_cache.rs b/crates/librqbit/src/storage/middleware/write_through_cache.rs index 7b979a6..59e8cb0 100644 --- a/crates/librqbit/src/storage/middleware/write_through_cache.rs +++ b/crates/librqbit/src/storage/middleware/write_through_cache.rs @@ -14,7 +14,7 @@ use parking_lot::RwLock; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - FileInfos, ManagedTorrentInfo, + FileInfos, ManagedTorrentShared, }; #[derive(Clone, Copy)] @@ -35,7 +35,7 @@ impl WriteThroughCacheStorageFactory { impl StorageFactory for WriteThroughCacheStorageFactory { type Storage = WriteThroughCacheStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { let pieces = self .max_cache_bytes .div_ceil(info.lengths.default_piece_length() as u64) @@ -121,7 +121,7 @@ impl TorrentStorage for WriteThroughCacheStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index 14632cd..ff710d0 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -11,13 +11,13 @@ use std::{ path::Path, }; -use crate::torrent_state::ManagedTorrentInfo; +use crate::torrent_state::ManagedTorrentShared; pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result; - fn create_and_init(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result; + fn create_and_init(&self, info: &ManagedTorrentShared) -> anyhow::Result { let mut storage = self.create(info)?; storage.init(info)?; Ok(storage) @@ -44,7 +44,7 @@ impl StorageFactoryExt for SF { impl StorageFactory for Wrapper { type Storage = Box; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { let s = self.sf.create(info)?; Ok(Box::new(s)) } @@ -65,7 +65,7 @@ impl StorageFactoryExt for SF { impl StorageFactory for Box { type Storage = U::Storage; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { (**self).create(info) } @@ -76,7 +76,7 @@ impl StorageFactory for Box { pub trait TorrentStorage: Send + Sync { // Create/open files etc. - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()>; + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()>; /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. @@ -124,7 +124,7 @@ impl TorrentStorage for Box { (**self).remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { (**self).init(meta) } } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 5f7fdb8..0c32b4a 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -19,11 +19,11 @@ use crate::{ FileInfos, }; -use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; +use super::{paused::TorrentStatePaused, ManagedTorrentShared}; pub struct TorrentStateInitializing { pub(crate) files: FileStorage, - pub(crate) meta: Arc, + pub(crate) meta: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, } @@ -48,7 +48,7 @@ fn compute_selected_pieces( impl TorrentStateInitializing { pub fn new( - meta: Arc, + meta: Arc, only_files: Option>, files: FileStorage, ) -> Self { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index e9da863..ee2245c 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -106,7 +106,7 @@ use super::{ paused::TorrentStatePaused, streaming::TorrentStreams, utils::{timeit, TimedExistence}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Debug)] @@ -180,7 +180,7 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; pub struct TorrentStateLive { peers: PeerStates, - meta: Arc, + torrent: Arc, locked: RwLock, pub(crate) files: FileStorage, @@ -241,7 +241,7 @@ impl TorrentStateLive { let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128); let state = Arc::new(TorrentStateLive { - meta: paused.info.clone(), + torrent: paused.info.clone(), peers: PeerStates { session_stats: session_stats.clone(), stats: Default::default(), @@ -277,7 +277,7 @@ impl TorrentStateLive { }); state.spawn( - error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), + error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"), { let state = Arc::downgrade(&state); async move { @@ -303,7 +303,7 @@ impl TorrentStateLive { ); state.spawn( - error_span!(parent: state.meta.span.clone(), "peer_adder"), + error_span!(parent: state.torrent.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); Ok(state) @@ -330,7 +330,7 @@ impl TorrentStateLive { } fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> { - self.meta.options.disk_write_queue.as_ref() + self.torrent.options.disk_write_queue.as_ref() } pub(crate) fn add_incoming_peer( @@ -378,7 +378,7 @@ impl TorrentStateLive { self.spawn( error_span!( - parent: self.meta.span.clone(), + parent: self.torrent.span.clone(), "manage_incoming_peer", addr = %checked_peer.addr ), @@ -410,18 +410,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: self.meta.options.peer_connect_timeout, - read_write_timeout: self.meta.options.peer_read_write_timeout, + connect_timeout: self.torrent.options.peer_connect_timeout, + read_write_timeout: self.torrent.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( checked_peer.addr, - self.meta.info_hash, - self.meta.peer_id, + self.torrent.info_hash, + self.torrent.peer_id, &handler, Some(options), - self.meta.spawner, - self.meta.connector.clone(), + self.torrent.spawner, + self.torrent.connector.clone(), ); let requester = handler.task_peer_chunk_requester(); @@ -474,18 +474,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: state.meta.options.peer_connect_timeout, - read_write_timeout: state.meta.options.peer_read_write_timeout, + connect_timeout: state.torrent.options.peer_connect_timeout, + read_write_timeout: state.torrent.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( addr, - state.meta.info_hash, - state.meta.peer_id, + state.torrent.info_hash, + state.torrent.peer_id, &handler, Some(options), - state.meta.spawner, - state.meta.connector.clone(), + state.torrent.spawner, + state.torrent.connector.clone(), ); let requester = aframe!(handler .task_peer_chunk_requester() @@ -532,30 +532,30 @@ impl TorrentStateLive { let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( - error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), + error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()), aframe!(state.clone().task_manage_outgoing_peer(addr, permit)), ); } } - pub fn meta(&self) -> &ManagedTorrentInfo { - &self.meta + pub fn torrent(&self) -> &ManagedTorrentShared { + &self.torrent } pub fn info(&self) -> &TorrentMetaV1Info { - &self.meta.info + &self.torrent.info } pub fn info_hash(&self) -> Id20 { - self.meta.info_hash + self.torrent.info_hash } pub fn peer_id(&self) -> Id20 { - self.meta.peer_id + self.torrent.peer_id } pub(crate) fn file_ops(&self) -> FileOps<'_> { FileOps::new( - &self.meta.info, + &self.torrent.info, &*self.files, - &self.meta().file_infos, + &self.torrent().file_infos, &self.lengths, ) } @@ -664,7 +664,7 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { - info: self.meta.clone(), + info: self.torrent.clone(), files: self.files.take()?, chunk_tracker, streams: self.streams.clone(), @@ -687,7 +687,8 @@ impl TorrentStateLive { pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { let mut g = self.lock_write("update_only_files"); let ct = g.get_chunks_mut()?; - let hns = ct.update_only_files(self.meta().file_infos.iter().map(|f| f.len), only_files)?; + let hns = + ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?; if !hns.finished() { self.reconnect_all_not_needed_peers(); } @@ -706,7 +707,7 @@ impl TorrentStateLive { }; self.streams .streamed_file_ids() - .any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id])) + .any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id])) } // We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite @@ -725,7 +726,7 @@ impl TorrentStateLive { // if we have all the pieces of the file, reopen it read only for (idx, file_info) in self - .meta() + .torrent() .file_infos .iter() .enumerate() @@ -736,9 +737,9 @@ impl TorrentStateLive { } self.streams - .wake_streams_on_piece_completed(id, &self.meta.lengths); + .wake_streams_on_piece_completed(id, &self.torrent.lengths); - g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64; + g.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64; if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES { g.try_flush_bitv() } @@ -930,7 +931,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { &self, handshake: &mut ExtendedHandshake, ) -> anyhow::Result<()> { - let info_bytes = &self.state.meta().info_bytes; + let info_bytes = &self.state.torrent().info_bytes; if !info_bytes.is_empty() { if let Ok(len) = info_bytes.len().try_into() { handshake.metadata_size = Some(len); @@ -1010,7 +1011,7 @@ impl PeerHandler { if let Some(dur) = backoff { self.state.clone().spawn( error_span!( - parent: self.state.meta.span.clone(), + parent: self.state.torrent.span.clone(), "wait_for_peer", peer = handle.to_string(), duration = format!("{dur:?}") @@ -1069,7 +1070,7 @@ impl PeerHandler { && !g.inflight_pieces.contains_key(pid) }); let natural_order_pieces = chunk_tracker - .iter_queued_pieces(&g.file_priorities, &self.state.meta().file_infos); + .iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos); for n in priority_streamed_pieces.chain(natural_order_pieces) { if bf.get(n.get() as usize).map(|v| *v) == Some(true) { n_opt = Some(n); @@ -1612,7 +1613,7 @@ impl PeerHandler { dtx.send(Box::new(work)).await?; } else { self.state - .meta + .torrent .spawner .spawn_block_in_place(|| { write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info) @@ -1624,7 +1625,7 @@ impl PeerHandler { } fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> { - let data = &self.state.meta().info_bytes; + let data = &self.state.torrent().info_bytes; let metadata_size = data.len(); if metadata_size == 0 { anyhow::bail!("peer requested for info metadata but we don't have it") diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 9da473b..3de4956 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -99,7 +99,12 @@ pub(crate) struct ManagedTorrentOptions { pub disk_write_queue: Option, } -pub struct ManagedTorrentInfo { +/// Common information about torrent shared among all possible states. +/// +// The reason it's not inlined into ManagedTorrent is to break the Arc cycle: +// ManagedTorrent contains the current torrent state, which in turn needs access to a bunch +// of stuff, but it shouldn't access the state. +pub struct ManagedTorrentShared { pub id: TorrentId, pub info: TorrentMetaV1Info, pub torrent_bytes: Bytes, @@ -117,7 +122,7 @@ pub struct ManagedTorrentInfo { } pub struct ManagedTorrent { - pub info: Arc, + pub shared: Arc, pub(crate) session: Weak, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, @@ -125,19 +130,19 @@ pub struct ManagedTorrent { impl ManagedTorrent { pub fn id(&self) -> TorrentId { - self.info.id + self.shared.id } - pub fn info(&self) -> &ManagedTorrentInfo { - &self.info + pub fn shared(&self) -> &ManagedTorrentShared { + &self.shared } pub fn get_total_bytes(&self) -> u64 { - self.info.lengths.total_length() + self.shared.lengths.total_length() } pub fn info_hash(&self) -> Id20 { - self.info.info_hash + self.shared.info_hash } pub fn only_files(&self) -> Option> { @@ -217,7 +222,7 @@ impl ManagedTorrent { |state: &Arc, rx: tokio::sync::oneshot::Receiver, token: CancellationToken| { - let span = state.info.span.clone(); + let span = state.shared.span.clone(); let state = Arc::downgrade(state); spawn_with_cancel( error_span!(parent: span, "fatal_errors_receiver"), @@ -239,7 +244,7 @@ impl ManagedTorrent { fn spawn_peer_adder(live: &Arc, peer_rx: Option) { live.spawn( - error_span!(parent: live.meta().span.clone(), "external_peer_adder"), + error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), { let live = live.clone(); async move { @@ -290,7 +295,7 @@ impl ManagedTorrent { let init = init.clone(); drop(g); let t = self.clone(); - let span = self.info().span.clone(); + let span = self.shared().span.clone(); let token = session.cancellation_token().child_token().clone(); spawn_with_cancel( @@ -365,9 +370,9 @@ impl ManagedTorrent { } ManagedTorrentState::Error(_) => { let initializing = Arc::new(TorrentStateInitializing::new( - self.info.clone(), + self.shared.clone(), g.only_files.clone(), - self.info.storage_factory.create_and_init(self.info())?, + self.shared.storage_factory.create_and_init(self.shared())?, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); drop(g); @@ -412,7 +417,7 @@ impl ManagedTorrent { pub fn stats(&self) -> TorrentStats { use stats::TorrentStatsState as S; let mut resp = TorrentStats { - total_bytes: self.info().lengths.total_length(), + total_bytes: self.shared().lengths.total_length(), file_progress: Vec::new(), state: S::Error, error: None, @@ -513,7 +518,7 @@ impl ManagedTorrent { // Returns true if needed to unpause torrent. // This is just implementation detail - it's easier to pause/unpause than to tinker with internals. pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { - let file_count = self.info().info.iter_file_lengths()?.count(); + let file_count = self.shared().info.iter_file_lengths()?.count(); for f in only_files.iter().copied() { if f >= file_count { anyhow::bail!("only_files contains invalid value {f}") diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 805b8d5..b92304c 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -5,10 +5,10 @@ use crate::{ type_aliases::FileStorage, }; -use super::{streaming::TorrentStreams, ManagedTorrentInfo}; +use super::{streaming::TorrentStreams, ManagedTorrentShared}; pub struct TorrentStatePaused { - pub(crate) info: Arc, + pub(crate) info: Arc, pub(crate) files: FileStorage, pub(crate) chunk_tracker: ChunkTracker, pub(crate) streams: Arc, diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index cd711a2..deadf3b 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -179,7 +179,7 @@ impl AsyncRead for FileStream { let current = poll_try_io!(self .torrent - .info() + .shared() .lengths .compute_current_piece(self.position, self.file_torrent_abs_offset) .context("invalid position")); @@ -280,7 +280,7 @@ impl ManagedTorrent { s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()), }; let fi = self - .info() + .shared() .file_infos .get(file_id) .context("invalid file")?; @@ -311,7 +311,7 @@ impl ManagedTorrent { fn is_file_finished(&self, file_id: usize) -> bool { // TODO: would be nice to remove locking - self.with_chunk_tracker(|ct| ct.is_file_finished(&self.info.file_infos[file_id])) + self.with_chunk_tracker(|ct| ct.is_file_finished(&self.shared.file_infos[file_id])) .unwrap_or(false) }