diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index 2be41c9..f22daa8 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -20,7 +20,7 @@ struct CustomStorage { impl StorageFactory for CustomStorageFactory { type Storage = CustomStorage; - fn create(&self, _info: &librqbit::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, _info: &librqbit::ManagedTorrentShared) -> anyhow::Result { Ok(CustomStorage::default()) } @@ -54,7 +54,7 @@ impl TorrentStorage for CustomStorage { anyhow::bail!("not implemented") } - fn init(&mut self, _meta: &librqbit::ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { anyhow::bail!("not implemented") } } diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs index 6c7f414..7dbd189 100644 --- a/crates/librqbit/examples/ubuntu.rs +++ b/crates/librqbit/examples/ubuntu.rs @@ -49,7 +49,7 @@ async fn main() -> Result<(), anyhow::Error> { _ => unreachable!(), }; - info!("Details: {:?}", &handle.info().info); + info!("Details: {:?}", &handle.shared().info); // Print stats periodically. tokio::spawn({ diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 17e2fcf..65609c3 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -197,7 +197,7 @@ impl Api { torrents .map(|(id, mgr)| TorrentListResponseItem { id, - info_hash: mgr.info().info_hash.as_string(), + info_hash: mgr.shared().info_hash.as_string(), }) .collect() }); @@ -206,9 +206,9 @@ impl Api { pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result { let handle = self.mgr_handle(idx)?; - let info_hash = handle.info().info_hash; + let info_hash = handle.shared().info_hash; let only_files = handle.only_files(); - make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) + make_torrent_details(&info_hash, &handle.shared().info, only_files.as_deref()) } pub fn api_session_stats(&self) -> SessionStatsSnapshot { @@ -221,7 +221,7 @@ impl Api { file_idx: usize, ) -> Result<&'static str> { let handle = self.mgr_handle(idx)?; - let info = &handle.info().info; + let info = &handle.shared().info; torrent_file_mime_type(info, file_idx) } @@ -361,7 +361,7 @@ impl Api { AddTorrentResponse::Added(id, handle) => { let details = make_torrent_details( &handle.info_hash(), - &handle.info().info, + &handle.shared().info, handle.only_files().as_deref(), ) .context("error making torrent details")?; @@ -370,7 +370,7 @@ impl Api { details, seen_peers: None, output_folder: handle - .info() + .shared() .options .output_folder .to_string_lossy() diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index f72b95b..fd672d0 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -142,7 +142,7 @@ impl HttpApi { fn torrent_playlist_items(handle: &ManagedTorrent) -> Result> { let mut playlist_items = handle - .info() + .shared() .info .iter_filenames_and_lengths()? .enumerate() @@ -216,8 +216,8 @@ impl HttpApi { .await?; let (info, content) = match added { crate::AddTorrentResponse::AlreadyManaged(_, handle) => ( - handle.info().info.clone(), - handle.info().torrent_bytes.clone(), + handle.shared().info.clone(), + handle.shared().torrent_bytes.clone(), ), crate::AddTorrentResponse::ListOnly(ListOnlyResponse { info, diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 4dda659..6b67b4e 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -77,7 +77,7 @@ pub use session::{ }; pub use spawn_utils::spawn as librqbit_spawn; pub use torrent_state::{ - ManagedTorrent, ManagedTorrentInfo, ManagedTorrentState, TorrentStats, TorrentStatsState, + ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentStats, TorrentStatsState, }; pub use type_aliases::FileInfos; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 670cd8f..096e086 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -12,6 +12,7 @@ use crate::{ api::TorrentIdOrHash, bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + file_info::FileInfo, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, @@ -23,10 +24,11 @@ use crate::{ }, stream_connect::{SocksProxyConfig, StreamConnector}, torrent_state::{ - ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, + initializing::TorrentStateInitializing, ManagedTorrentHandle, ManagedTorrentLocked, + ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, - ManagedTorrentInfo, + ManagedTorrent, ManagedTorrentShared, }; use anyhow::{bail, Context}; use bencode::bencode_serialize_to_writer; @@ -43,6 +45,7 @@ use itertools::Itertools; use librqbit_core::{ constants::CHUNK_SIZE, directories::get_configuration_directory, + lengths::Lengths, magnet::Magnet, peer_id::generate_peer_id, spawn_utils::spawn_with_cancel, @@ -51,7 +54,10 @@ use librqbit_core::{ use parking_lot::RwLock; use peer_binary_protocol::Handshake; use serde::{Deserialize, Serialize}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::Notify, +}; use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span}; @@ -95,7 +101,7 @@ pub struct Session { peer_id: Id20, dht: Option, persistence: Option>, - bitv_factory: Arc, + pub(crate) bitv_factory: Arc, peer_opts: PeerConnectionOptions, spawner: BlockingSpawner, next_id: AtomicUsize, @@ -111,9 +117,8 @@ pub struct Session { default_storage_factory: Option, reqwest_client: reqwest::Client, - connector: Arc, - - concurrent_initialize_semaphore: Arc, + pub(crate) connector: Arc, + pub(crate) concurrent_initialize_semaphore: Arc, root_span: Option, @@ -637,7 +642,7 @@ impl Session { if opts.enable_upnp_port_forwarding { session.spawn( error_span!(parent: session.rs(), "upnp_forward", port = listen_port), - session.clone().task_upnp_port_forwarder(listen_port), + Self::task_upnp_port_forwarder(listen_port), ); } } @@ -686,7 +691,7 @@ impl Session { } async fn check_incoming_connection( - &self, + self: Arc, addr: SocketAddr, mut stream: TcpStream, ) -> anyhow::Result<(Arc, CheckedIncomingConnection)> { @@ -739,6 +744,8 @@ impl Session { async fn task_tcp_listener(self: Arc, l: TcpListener) -> anyhow::Result<()> { let mut futs = FuturesUnordered::new(); + let session = Arc::downgrade(&self); + drop(self); loop { tokio::select! { @@ -746,13 +753,15 @@ impl Session { match r { Ok((stream, addr)) => { trace!("accepted connection from {addr}"); + let session = session.upgrade().context("session is dead")?; + let span = error_span!(parent: session.rs(), "incoming", addr=%addr); futs.push( - self.check_incoming_connection(addr, stream) + session.check_incoming_connection(addr, stream) .map_err(|e| { debug!("error checking incoming connection: {e:#}"); e }) - .instrument(error_span!(parent: self.rs(), "incoming", addr=%addr)) + .instrument(span) ); } Err(e) => { @@ -770,7 +779,7 @@ impl Session { } } - async fn task_upnp_port_forwarder(self: Arc, port: u16) -> anyhow::Result<()> { + async fn task_upnp_port_forwarder(port: u16) -> anyhow::Result<()> { let pf = librqbit_upnp::UpnpPortForwarder::new(vec![port], None)?; pf.run_forever().await } @@ -914,7 +923,6 @@ impl Session { peer_rx: Some(rx), initial_peers: { let seen = seen.into_iter().collect_vec(); - info!(count=seen.len(), "seen"); for peer in &seen { debug!(?peer, "seen") } @@ -1023,7 +1031,7 @@ impl Session { } async fn main_torrent_info( - &self, + self: &Arc, add_res: InternalAddResult, mut opts: AddTorrentOptions, ) -> anyhow::Result { @@ -1084,43 +1092,6 @@ impl Session { .fetch_add(1, std::sync::atomic::Ordering::Relaxed) }; - let mut builder = ManagedTorrentBuilder::new( - id, - info, - info_hash, - torrent_bytes, - info_bytes, - output_folder, - storage_factory, - ); - builder - .allow_overwrite(opts.overwrite) - .spawner(self.spawner) - .trackers(trackers) - .connector(self.connector.clone()) - .peer_id(self.peer_id); - - if let Some(d) = self.disk_write_tx.clone() { - builder.disk_writer(d); - } - - if let Some(only_files) = only_files { - builder.only_files(only_files); - } - if let Some(interval) = opts.force_tracker_interval { - builder.force_tracker_interval(interval); - } - - let peer_opts = self.merge_peer_opts(opts.peer_opts); - - if let Some(t) = peer_opts.connect_timeout { - builder.peer_connect_timeout(t); - } - - if let Some(t) = peer_opts.read_write_timeout { - builder.peer_read_write_timeout(t); - } - let managed_torrent = { let mut g = self.db.write(); if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| { @@ -1132,13 +1103,70 @@ impl Session { }) { return Ok(AddTorrentResponse::AlreadyManaged(id, handle)); } - let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?; - g.add_torrent(managed_torrent.clone(), id); - managed_torrent + + let lengths = Lengths::from_torrent(&info)?; + let file_infos = info + .iter_file_details(&lengths)? + .map(|fd| { + Ok::<_, anyhow::Error>(FileInfo { + relative_filename: fd.filename.to_pathbuf()?, + offset_in_torrent: fd.offset, + piece_range: fd.pieces, + len: fd.len, + }) + }) + .collect::>>()?; + + let span = error_span!(parent: self.rs(), "torrent", id); + let peer_opts = self.merge_peer_opts(opts.peer_opts); + let minfo = Arc::new(ManagedTorrentShared { + id, + span, + file_infos, + info, + torrent_bytes, + info_bytes, + info_hash, + trackers: trackers.into_iter().collect(), + spawner: self.spawner, + peer_id: self.peer_id, + lengths, + storage_factory, + options: ManagedTorrentOptions { + force_tracker_interval: opts.force_tracker_interval, + peer_connect_timeout: peer_opts.connect_timeout, + peer_read_write_timeout: peer_opts.read_write_timeout, + allow_overwrite: opts.overwrite, + output_folder, + disk_write_queue: self.disk_write_tx.clone(), + }, + connector: self.connector.clone(), + session: Arc::downgrade(self), + }); + + let initializing = Arc::new(TorrentStateInitializing::new( + minfo.clone(), + only_files.clone(), + minfo.storage_factory.create_and_init(&minfo)?, + )); + let handle = Arc::new(ManagedTorrent { + locked: RwLock::new(ManagedTorrentLocked { + state: ManagedTorrentState::Initializing(initializing), + only_files, + }), + state_change_notify: Notify::new(), + shared: minfo, + }); + + g.add_torrent(handle.clone(), id); + handle }; if let Some(p) = self.persistence.as_ref() { - p.store(id, &managed_torrent).await?; + if let Err(e) = p.store(id, &managed_torrent).await { + self.db.write().torrents.remove(&id); + return Err(e); + } } // Merge "initial_peers" and "peer_rx" into one stream. @@ -1156,18 +1184,11 @@ impl Session { ); { - let span = managed_torrent.info.span.clone(); + let span = managed_torrent.shared.span.clone(); let _ = span.enter(); managed_torrent - .start( - peer_rx, - opts.paused, - self.cancellation_token.child_token(), - self.concurrent_initialize_semaphore.clone(), - self.bitv_factory.clone(), - self.stats.atomic.clone(), - ) + .start(peer_rx, opts.paused) .context("error starting torrent")?; } @@ -1239,18 +1260,18 @@ impl Session { _ => None, }) .map(Ok) - .unwrap_or_else(|| removed.storage_factory.create(removed.info())); + .unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared())); match (storage, delete_files) { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), (Ok(storage), true) => { debug!("will delete files"); - remove_files_and_dirs(removed.info(), &storage); - if removed.info().options.output_folder != self.output_folder { + remove_files_and_dirs(removed.shared(), &storage); + if removed.shared().options.output_folder != self.output_folder { if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { warn!( "error removing {:?}: {e:?}", - removed.info().options.output_folder + removed.shared().options.output_folder ) } } @@ -1313,18 +1334,11 @@ impl Session { pub async fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { let peer_rx = self.make_peer_rx( handle.info_hash(), - handle.info().trackers.clone().into_iter().collect(), + handle.shared().trackers.clone().into_iter().collect(), self.tcp_listen_port, - handle.info().options.force_tracker_interval, - )?; - handle.start( - peer_rx, - false, - self.cancellation_token.child_token(), - self.concurrent_initialize_semaphore.clone(), - self.bitv_factory.clone(), - self.stats.atomic.clone(), + handle.shared().options.force_tracker_interval, )?; + handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await; Ok(()) } @@ -1344,7 +1358,7 @@ impl Session { } } -fn remove_files_and_dirs(info: &ManagedTorrentInfo, files: &dyn TorrentStorage) { +fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage) { let mut all_dirs = HashSet::new(); for (id, fi) in info.file_infos.iter().enumerate() { let mut fname = &*fi.relative_filename; diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index d415375..bd2d888 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -128,6 +128,7 @@ impl JsonSessionPersistenceStore { write_torrent_file: bool, ) -> anyhow::Result<()> { if !torrent + .shared .storage_factory .is_type_id(TypeId::of::()) { @@ -136,7 +137,7 @@ impl JsonSessionPersistenceStore { let st = SerializedTorrent { trackers: torrent - .info() + .shared() .trackers .iter() .map(|u| u.to_string()) @@ -146,10 +147,10 @@ impl JsonSessionPersistenceStore { torrent_bytes: Default::default(), only_files: torrent.only_files().clone(), is_paused: torrent.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))), - output_folder: torrent.info().options.output_folder.clone(), + output_folder: torrent.shared().options.output_folder.clone(), }; - if write_torrent_file && !torrent.info().torrent_bytes.is_empty() { + if write_torrent_file && !torrent.shared().torrent_bytes.is_empty() { let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash()); match tokio::fs::OpenOptions::new() .create(true) @@ -159,7 +160,7 @@ impl JsonSessionPersistenceStore { .await { Ok(mut f) => { - if let Err(e) = f.write_all(&torrent.info().torrent_bytes).await { + if let Err(e) = f.write_all(&torrent.shared().torrent_bytes).await { warn!(error=?e, file=?torrent_bytes_file, "error writing torrent bytes") } } diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 98dd6f3..10d593c 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -96,7 +96,7 @@ impl SessionPersistenceStore for PostgresSessionStorage { } async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { - let torrent_bytes: &[u8] = &torrent.info().torrent_bytes; + let torrent_bytes: &[u8] = &torrent.shared().torrent_bytes; let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused) VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(id) DO NOTHING"; @@ -104,10 +104,17 @@ impl SessionPersistenceStore for PostgresSessionStorage { .bind::(id.try_into()?) .bind(&torrent.info_hash().0[..]) .bind(torrent_bytes) - .bind(torrent.info().trackers.iter().cloned().collect::>()) .bind( torrent - .info() + .shared() + .trackers + .iter() + .cloned() + .collect::>(), + ) + .bind( + torrent + .shared() .options .output_folder .to_str() diff --git a/crates/librqbit/src/session_stats/mod.rs b/crates/librqbit/src/session_stats/mod.rs index 8cf5eb0..2939694 100644 --- a/crates/librqbit/src/session_stats/mod.rs +++ b/crates/librqbit/src/session_stats/mod.rs @@ -3,6 +3,7 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::Context; use atomic::AtomicSessionStats; use librqbit_core::speed_estimator::SpeedEstimator; use snapshot::SessionStatsSnapshot; @@ -40,12 +41,13 @@ impl Default for SessionStats { impl Session { pub(crate) fn start_speed_estimator_updater(self: &Arc) { self.spawn(error_span!(parent: self.rs(), "speed_estimator"), { - let s = self.clone(); + let s = Arc::downgrade(self); async move { let mut i = tokio::time::interval(Duration::from_secs(1)); loop { i.tick().await; + let s = s.upgrade().context("session is dead")?; let now = Instant::now(); let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed); let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed); diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 651d2fc..74ac596 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Context; use tracing::warn; -use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentInfo}; +use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentShared}; use crate::storage::{StorageFactory, TorrentStorage}; @@ -18,7 +18,7 @@ pub struct FilesystemStorageFactory {} impl StorageFactory for FilesystemStorageFactory { type Storage = FilesystemStorage; - fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { Ok(FilesystemStorage { output_folder: meta.options.output_folder.clone(), opened_files: Default::default(), @@ -149,7 +149,7 @@ impl TorrentStorage for FilesystemStorage { } } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { let mut files = Vec::::new(); for file_details in meta.info.iter_file_details(&meta.lengths)? { let mut full_path = self.output_folder.clone(); diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 1e68b24..5b92657 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -4,7 +4,7 @@ use anyhow::Context; use memmap2::{MmapMut, MmapOptions}; use parking_lot::RwLock; -use crate::torrent_state::ManagedTorrentInfo; +use crate::torrent_state::ManagedTorrentShared; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; @@ -22,7 +22,7 @@ fn dummy_mmap() -> anyhow::Result { impl StorageFactory for MmapFilesystemStorageFactory { type Storage = MmapFilesystemStorage; - fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { let fs_storage = FilesystemStorageFactory::default().create(meta)?; Ok(MmapFilesystemStorage { @@ -97,7 +97,7 @@ impl TorrentStorage for MmapFilesystemStorage { })) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.fs.init(meta)?; let mut mmaps = Vec::new(); for (idx, file) in self.fs.opened_files.iter().enumerate() { diff --git a/crates/librqbit/src/storage/middleware/slow.rs b/crates/librqbit/src/storage/middleware/slow.rs index fba5160..5dbdf8a 100644 --- a/crates/librqbit/src/storage/middleware/slow.rs +++ b/crates/librqbit/src/storage/middleware/slow.rs @@ -16,7 +16,7 @@ use parking_lot::Mutex; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Clone)] @@ -35,7 +35,7 @@ impl SlowStorageFactory { impl StorageFactory for SlowStorageFactory { type Storage = SlowStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { Ok(SlowStorage { underlying: self.underlying_factory.create(info)?, pwrite_all_bufread: Mutex::new(Box::new( @@ -116,7 +116,7 @@ impl TorrentStorage for SlowStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/middleware/timing.rs b/crates/librqbit/src/storage/middleware/timing.rs index 7079db2..42c3382 100644 --- a/crates/librqbit/src/storage/middleware/timing.rs +++ b/crates/librqbit/src/storage/middleware/timing.rs @@ -4,7 +4,7 @@ A storage middleware that logs the time underlying storage operations took. use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Clone)] @@ -25,7 +25,7 @@ impl TimingStorageFactory { impl StorageFactory for TimingStorageFactory { type Storage = TimingStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { Ok(TimingStorage { name: self.name.clone(), underlying: self.underlying_factory.create(info)?, @@ -104,7 +104,7 @@ impl TorrentStorage for TimingStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/middleware/write_through_cache.rs b/crates/librqbit/src/storage/middleware/write_through_cache.rs index 7b979a6..59e8cb0 100644 --- a/crates/librqbit/src/storage/middleware/write_through_cache.rs +++ b/crates/librqbit/src/storage/middleware/write_through_cache.rs @@ -14,7 +14,7 @@ use parking_lot::RwLock; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - FileInfos, ManagedTorrentInfo, + FileInfos, ManagedTorrentShared, }; #[derive(Clone, Copy)] @@ -35,7 +35,7 @@ impl WriteThroughCacheStorageFactory { impl StorageFactory for WriteThroughCacheStorageFactory { type Storage = WriteThroughCacheStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { let pieces = self .max_cache_bytes .div_ceil(info.lengths.default_piece_length() as u64) @@ -121,7 +121,7 @@ impl TorrentStorage for WriteThroughCacheStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index 14632cd..ff710d0 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -11,13 +11,13 @@ use std::{ path::Path, }; -use crate::torrent_state::ManagedTorrentInfo; +use crate::torrent_state::ManagedTorrentShared; pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result; - fn create_and_init(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result; + fn create_and_init(&self, info: &ManagedTorrentShared) -> anyhow::Result { let mut storage = self.create(info)?; storage.init(info)?; Ok(storage) @@ -44,7 +44,7 @@ impl StorageFactoryExt for SF { impl StorageFactory for Wrapper { type Storage = Box; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { let s = self.sf.create(info)?; Ok(Box::new(s)) } @@ -65,7 +65,7 @@ impl StorageFactoryExt for SF { impl StorageFactory for Box { type Storage = U::Storage; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { (**self).create(info) } @@ -76,7 +76,7 @@ impl StorageFactory for Box { pub trait TorrentStorage: Send + Sync { // Create/open files etc. - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()>; + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()>; /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. @@ -124,7 +124,7 @@ impl TorrentStorage for Box { (**self).remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { (**self).init(meta) } } diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index ee6f5db..67b6fcb 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -16,7 +16,7 @@ use crate::{ create_torrent, tests::test_util::{ create_default_random_dir_with_torrents, setup_test_logging, spawn_debug_server, - TestPeerMetadata, + wait_until_i_am_the_last_task, DropChecks, TestPeerMetadata, }, AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, SessionPersistenceConfig, }; @@ -28,13 +28,22 @@ async fn test_e2e_download() { .and_then(|v| v.parse().ok()) .unwrap_or(180); - tokio::time::timeout(Duration::from_secs(timeout), _test_e2e_download()) - .await - .context("test_e2e_download timed out") - .unwrap() + let drop_checks = DropChecks::default(); + tokio::time::timeout( + Duration::from_secs(timeout), + _test_e2e_download(&drop_checks), + ) + .await + .context("test_e2e_download timed out") + .unwrap(); + + // Wait to ensure everything is dropped. + wait_until_i_am_the_last_task().await; + + drop_checks.check().unwrap(); } -async fn _test_e2e_download() { +async fn _test_e2e_download(drop_checks: &DropChecks) { setup_test_logging(); match crate::try_increase_nofile_limit() { Ok(limit) => info!(limit, "increased ulimit"), @@ -75,6 +84,7 @@ async fn _test_e2e_download() { for i in 0..num_servers { let torrent_file_bytes = torrent_file_bytes.clone(); let tempdir = tempdir.path().to_owned(); + let drop_checks = drop_checks.clone(); let fut = spawn( async move { let peer_id = TestPeerMetadata { @@ -104,6 +114,8 @@ async fn _test_e2e_download() { .await .context("error starting session")?; + drop_checks.add(&session, format!("server session {i}")); + info!("started session"); let handle = session @@ -118,6 +130,9 @@ async fn _test_e2e_download() { .await .context("error adding torrent")?; let h = handle.into_handle().context("into_handle()")?; + + drop_checks.add(&h.shared, format!("server {i} torrent shared handle")); + let mut interval = interval(Duration::from_millis(100)); info!("added torrent"); @@ -141,12 +156,13 @@ async fn _test_e2e_download() { } } info!("torrent is live"); - Ok::<_, anyhow::Error>(SocketAddr::new( + let addr = SocketAddr::new( std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), session .tcp_listen_port() .context("expected session.tcp_listen_port() to be set")?, - )) + ); + Ok::<_, anyhow::Error>((session, addr)) } .instrument(error_span!("server", id = i)), ); @@ -154,12 +170,15 @@ async fn _test_e2e_download() { } let mut peers = Vec::new(); + + // This is around just not to drop. + let mut _servers = Vec::new(); for (id, peer) in futures::future::join_all(futs) .await .into_iter() .enumerate() { - let peer = peer + let (server, peer) = peer .with_context(|| format!("join error, server={id}")) .unwrap() .with_context(|| format!("timeout, server={id}")) @@ -167,6 +186,7 @@ async fn _test_e2e_download() { .with_context(|| format!("server couldn't start, server={id}")) .unwrap(); peers.push(peer); + _servers.push(server); } info!("started all servers, starting client"); @@ -201,6 +221,7 @@ async fn _test_e2e_download() { ) .await .unwrap(); + drop_checks.add(&session, "client session"); info!("started client session"); diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 4aac149..051d773 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -1,14 +1,20 @@ -use std::{io::Write, path::Path}; +use std::{ + io::Write, + path::Path, + sync::{Arc, Weak}, + time::Duration, +}; -use anyhow::Context; +use anyhow::{bail, Context}; use axum::{response::IntoResponse, routing::get, Router}; use librqbit_core::Id20; +use parking_lot::RwLock; use rand::{thread_rng, Rng, RngCore, SeedableRng}; use tempfile::TempDir; use tracing::{debug, info}; pub fn setup_test_logging() { - if let Err(_) = std::env::var("RUST_LOG") { + if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "debug"); } let _ = tracing_subscriber::fmt::try_init(); @@ -124,3 +130,77 @@ async fn debug_server() -> anyhow::Result<()> { pub fn spawn_debug_server() { tokio::spawn(debug_server()); } + +pub trait DropPlaceholder: Send + Sync {} +impl DropPlaceholder for T {} + +struct DropCheck { + obj: Weak, + name: String, +} + +#[derive(Default, Clone)] +pub struct DropChecks(Arc>>); + +impl DropChecks { + pub fn add>(&self, obj: &Arc, name: S) { + let weak = Arc::downgrade(obj); + self.0.write().push(DropCheck { + obj: weak as Weak, + name: name.into(), + }) + } + + pub fn check(&self) -> anyhow::Result<()> { + let mut still_running = Vec::new(); + for dc in self.0.read().iter() { + if dc.obj.upgrade().is_some() { + still_running.push(dc.name.clone()) + } + } + if !still_running.is_empty() { + anyhow::bail!( + "still existing objects that were supposed to be dropped: {still_running:#?}" + ) + } + Ok(()) + } +} + +pub async fn wait_until( + mut cond: impl FnMut() -> anyhow::Result<()>, + timeout: Duration, +) -> anyhow::Result<()> { + let mut interval = tokio::time::interval(Duration::from_millis(10)); + let mut last_err: Option = None; + let res = tokio::time::timeout(timeout, async { + loop { + interval.tick().await; + match cond() { + Ok(()) => return Ok::<_, anyhow::Error>(()), + Err(e) => last_err = Some(e), + } + } + }) + .await; + if res.is_err() { + bail!("wait_until timeout: last result = {last_err:?}") + } + Ok(()) +} + +pub async fn wait_until_i_am_the_last_task() { + let metrics = tokio::runtime::Handle::current().metrics(); + wait_until( + || { + let num_alive = metrics.num_alive_tasks(); + if num_alive != 1 { + bail!("metrics.num_alive_tasks() = {num_alive}, expected 1") + } + Ok(()) + }, + Duration::from_secs(5), + ) + .await + .unwrap(); +} diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 5f7fdb8..915139b 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -19,11 +19,11 @@ use crate::{ FileInfos, }; -use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; +use super::{paused::TorrentStatePaused, ManagedTorrentShared}; pub struct TorrentStateInitializing { pub(crate) files: FileStorage, - pub(crate) meta: Arc, + pub(crate) meta: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, } @@ -37,7 +37,7 @@ fn compute_selected_pieces( for (_, fi) in file_infos .iter() .enumerate() - .filter(|(id, _)| only_files.map(|of| of.contains(id)).unwrap_or(false)) + .filter(|(id, _)| only_files.map(|of| of.contains(id)).unwrap_or(true)) { if let Some(r) = bf.get_mut(fi.piece_range_usize()) { r.fill(true); @@ -48,7 +48,7 @@ fn compute_selected_pieces( impl TorrentStateInitializing { pub fn new( - meta: Arc, + meta: Arc, only_files: Option>, files: FileStorage, ) -> Self { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index e9da863..0c583cc 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -106,7 +106,7 @@ use super::{ paused::TorrentStatePaused, streaming::TorrentStreams, utils::{timeit, TimedExistence}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Debug)] @@ -180,7 +180,7 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; pub struct TorrentStateLive { peers: PeerStates, - meta: Arc, + torrent: Arc, locked: RwLock, pub(crate) files: FileStorage, @@ -214,12 +214,16 @@ impl TorrentStateLive { paused: TorrentStatePaused, fatal_errors_tx: tokio::sync::oneshot::Sender, cancellation_token: CancellationToken, - session_stats: Arc, ) -> anyhow::Result> { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); - - let down_speed_estimator = SpeedEstimator::new(5); - let up_speed_estimator = SpeedEstimator::new(5); + let session = paused + .info + .session + .upgrade() + .context("session is dead, cannot start torrent")?; + let session_stats = session.stats.atomic.clone(); + let down_speed_estimator = SpeedEstimator::default(); + let up_speed_estimator = SpeedEstimator::default(); let have_bytes = paused.chunk_tracker.get_hns().have_bytes; let lengths = *paused.chunk_tracker.get_lengths(); @@ -241,7 +245,7 @@ impl TorrentStateLive { let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128); let state = Arc::new(TorrentStateLive { - meta: paused.info.clone(), + torrent: paused.info.clone(), peers: PeerStates { session_stats: session_stats.clone(), stats: Default::default(), @@ -277,7 +281,7 @@ impl TorrentStateLive { }); state.spawn( - error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), + error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"), { let state = Arc::downgrade(&state); async move { @@ -303,7 +307,7 @@ impl TorrentStateLive { ); state.spawn( - error_span!(parent: state.meta.span.clone(), "peer_adder"), + error_span!(parent: state.torrent.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); Ok(state) @@ -330,7 +334,7 @@ impl TorrentStateLive { } fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> { - self.meta.options.disk_write_queue.as_ref() + self.torrent.options.disk_write_queue.as_ref() } pub(crate) fn add_incoming_peer( @@ -378,7 +382,7 @@ impl TorrentStateLive { self.spawn( error_span!( - parent: self.meta.span.clone(), + parent: self.torrent.span.clone(), "manage_incoming_peer", addr = %checked_peer.addr ), @@ -410,18 +414,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: self.meta.options.peer_connect_timeout, - read_write_timeout: self.meta.options.peer_read_write_timeout, + connect_timeout: self.torrent.options.peer_connect_timeout, + read_write_timeout: self.torrent.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( checked_peer.addr, - self.meta.info_hash, - self.meta.peer_id, + self.torrent.info_hash, + self.torrent.peer_id, &handler, Some(options), - self.meta.spawner, - self.meta.connector.clone(), + self.torrent.spawner, + self.torrent.connector.clone(), ); let requester = handler.task_peer_chunk_requester(); @@ -474,18 +478,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: state.meta.options.peer_connect_timeout, - read_write_timeout: state.meta.options.peer_read_write_timeout, + connect_timeout: state.torrent.options.peer_connect_timeout, + read_write_timeout: state.torrent.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( addr, - state.meta.info_hash, - state.meta.peer_id, + state.torrent.info_hash, + state.torrent.peer_id, &handler, Some(options), - state.meta.spawner, - state.meta.connector.clone(), + state.torrent.spawner, + state.torrent.connector.clone(), ); let requester = aframe!(handler .task_peer_chunk_requester() @@ -532,30 +536,30 @@ impl TorrentStateLive { let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( - error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), + error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()), aframe!(state.clone().task_manage_outgoing_peer(addr, permit)), ); } } - pub fn meta(&self) -> &ManagedTorrentInfo { - &self.meta + pub fn torrent(&self) -> &ManagedTorrentShared { + &self.torrent } pub fn info(&self) -> &TorrentMetaV1Info { - &self.meta.info + &self.torrent.info } pub fn info_hash(&self) -> Id20 { - self.meta.info_hash + self.torrent.info_hash } pub fn peer_id(&self) -> Id20 { - self.meta.peer_id + self.torrent.peer_id } pub(crate) fn file_ops(&self) -> FileOps<'_> { FileOps::new( - &self.meta.info, + &self.torrent.info, &*self.files, - &self.meta().file_infos, + &self.torrent().file_infos, &self.lengths, ) } @@ -664,7 +668,7 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { - info: self.meta.clone(), + info: self.torrent.clone(), files: self.files.take()?, chunk_tracker, streams: self.streams.clone(), @@ -687,7 +691,8 @@ impl TorrentStateLive { pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { let mut g = self.lock_write("update_only_files"); let ct = g.get_chunks_mut()?; - let hns = ct.update_only_files(self.meta().file_infos.iter().map(|f| f.len), only_files)?; + let hns = + ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?; if !hns.finished() { self.reconnect_all_not_needed_peers(); } @@ -706,7 +711,7 @@ impl TorrentStateLive { }; self.streams .streamed_file_ids() - .any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id])) + .any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id])) } // We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite @@ -725,7 +730,7 @@ impl TorrentStateLive { // if we have all the pieces of the file, reopen it read only for (idx, file_info) in self - .meta() + .torrent() .file_infos .iter() .enumerate() @@ -736,9 +741,9 @@ impl TorrentStateLive { } self.streams - .wake_streams_on_piece_completed(id, &self.meta.lengths); + .wake_streams_on_piece_completed(id, &self.torrent.lengths); - g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64; + g.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64; if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES { g.try_flush_bitv() } @@ -930,7 +935,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { &self, handshake: &mut ExtendedHandshake, ) -> anyhow::Result<()> { - let info_bytes = &self.state.meta().info_bytes; + let info_bytes = &self.state.torrent().info_bytes; if !info_bytes.is_empty() { if let Ok(len) = info_bytes.len().try_into() { handshake.metadata_size = Some(len); @@ -1010,7 +1015,7 @@ impl PeerHandler { if let Some(dur) = backoff { self.state.clone().spawn( error_span!( - parent: self.state.meta.span.clone(), + parent: self.state.torrent.span.clone(), "wait_for_peer", peer = handle.to_string(), duration = format!("{dur:?}") @@ -1069,7 +1074,7 @@ impl PeerHandler { && !g.inflight_pieces.contains_key(pid) }); let natural_order_pieces = chunk_tracker - .iter_queued_pieces(&g.file_priorities, &self.state.meta().file_infos); + .iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos); for n in priority_streamed_pieces.chain(natural_order_pieces) { if bf.get(n.get() as usize).map(|v| *v) == Some(true) { n_opt = Some(n); @@ -1612,7 +1617,7 @@ impl PeerHandler { dtx.send(Box::new(work)).await?; } else { self.state - .meta + .torrent .spawner .spawn_block_in_place(|| { write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info) @@ -1624,7 +1629,7 @@ impl PeerHandler { } fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> { - let data = &self.state.meta().info_bytes; + let data = &self.state.torrent().info_bytes; let metadata_size = data.len(); if metadata_size == 0 { anyhow::bail!("peer requested for info metadata but we don't have it") diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0b506ab..0feaaf1 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -9,6 +9,7 @@ use std::collections::HashSet; use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::sync::Weak; use std::time::Duration; use anyhow::bail; @@ -19,7 +20,6 @@ use futures::future::BoxFuture; use futures::FutureExt; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; -use librqbit_core::peer_id::generate_peer_id; use librqbit_core::spawn_utils::spawn_with_cancel; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; @@ -34,11 +34,8 @@ use tracing::debug; use tracing::error_span; use tracing::warn; -use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; -use crate::file_info::FileInfo; use crate::session::TorrentId; -use crate::session_stats::atomic::AtomicSessionStats; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; use crate::stream_connect::StreamConnector; @@ -46,6 +43,7 @@ use crate::torrent_state::stats::LiveStats; use crate::type_aliases::DiskWorkQueueSender; use crate::type_aliases::FileInfos; use crate::type_aliases::PeerStream; +use crate::Session; use initializing::TorrentStateInitializing; @@ -87,7 +85,7 @@ impl ManagedTorrentState { } pub(crate) struct ManagedTorrentLocked { - pub state: ManagedTorrentState, + pub(crate) state: ManagedTorrentState, pub(crate) only_files: Option>, } @@ -101,7 +99,13 @@ pub(crate) struct ManagedTorrentOptions { pub disk_write_queue: Option, } -pub struct ManagedTorrentInfo { +/// Common information about torrent shared among all possible states. +/// +// The reason it's not inlined into ManagedTorrent is to break the Arc cycle: +// ManagedTorrent contains the current torrent state, which in turn needs access to a bunch +// of stuff, but it shouldn't access the state. +pub struct ManagedTorrentShared { + pub id: TorrentId, pub info: TorrentMetaV1Info, pub torrent_bytes: Bytes, pub info_bytes: Bytes, @@ -114,33 +118,31 @@ pub struct ManagedTorrentInfo { pub span: tracing::Span, pub(crate) options: ManagedTorrentOptions, pub(crate) connector: Arc, + pub(crate) storage_factory: BoxStorageFactory, + pub(crate) session: Weak, } pub struct ManagedTorrent { - pub id: TorrentId, - // TODO: merge ManagedTorrent and ManagedTorrentInfo - pub info: Arc, - pub(crate) storage_factory: BoxStorageFactory, - - state_change_notify: Notify, - locked: RwLock, + pub shared: Arc, + pub(crate) state_change_notify: Notify, + pub(crate) locked: RwLock, } impl ManagedTorrent { pub fn id(&self) -> TorrentId { - self.id + self.shared.id } - pub fn info(&self) -> &ManagedTorrentInfo { - &self.info + pub fn shared(&self) -> &ManagedTorrentShared { + &self.shared } pub fn get_total_bytes(&self) -> u64 { - self.info.lengths.total_length() + self.shared.lengths.total_length() } pub fn info_hash(&self) -> Id20 { - self.info.info_hash + self.shared.info_hash } pub fn only_files(&self) -> Option> { @@ -209,18 +211,20 @@ impl ManagedTorrent { self: &Arc, peer_rx: Option, start_paused: bool, - live_cancellation_token: CancellationToken, - init_semaphore: Arc, - bitv_factory: Arc, - session_stats: Arc, ) -> anyhow::Result<()> { + let session = self + .shared + .session + .upgrade() + .context("session is dead, cannot start torrent")?; let mut g = self.locked.write(); + let cancellation_token = session.cancellation_token().child_token(); let spawn_fatal_errors_receiver = |state: &Arc, rx: tokio::sync::oneshot::Receiver, token: CancellationToken| { - let span = state.info.span.clone(); + let span = state.shared.span.clone(); let state = Arc::downgrade(state); spawn_with_cancel( error_span!(parent: span, "fatal_errors_receiver"), @@ -242,7 +246,7 @@ impl ManagedTorrent { fn spawn_peer_adder(live: &Arc, peer_rx: Option) { live.spawn( - error_span!(parent: live.meta().span.clone(), "external_peer_adder"), + error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), { let live = live.clone(); async move { @@ -293,19 +297,21 @@ impl ManagedTorrent { let init = init.clone(); drop(g); let t = self.clone(); - let span = self.info().span.clone(); - let token = live_cancellation_token.clone(); + let span = self.shared().span.clone(); + let token = cancellation_token.clone(); spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), async move { - let _permit = init_semaphore + let concurrent_init_semaphore = + session.concurrent_initialize_semaphore.clone(); + let _permit = concurrent_init_semaphore .acquire() .await .context("bug: concurrent init semaphore was closed")?; - match init.check(bitv_factory).await { + match init.check(session.bitv_factory.clone()).await { Ok(paused) => { let mut g = t.locked.write(); if let ManagedTorrentState::Initializing(_) = &g.state { @@ -321,12 +327,7 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new( - paused, - tx, - live_cancellation_token, - session_stats, - )?; + let live = TorrentStateLive::new(paused, tx, cancellation_token)?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -351,24 +352,19 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new( - paused, - tx, - live_cancellation_token.clone(), - session_stats, - )?; + let live = TorrentStateLive::new(paused, tx, cancellation_token.clone())?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); - spawn_fatal_errors_receiver(self, rx, live_cancellation_token); + spawn_fatal_errors_receiver(self, rx, cancellation_token); spawn_peer_adder(&live, peer_rx); Ok(()) } ManagedTorrentState::Error(_) => { let initializing = Arc::new(TorrentStateInitializing::new( - self.info.clone(), + self.shared.clone(), g.only_files.clone(), - self.storage_factory.create_and_init(self.info())?, + self.shared.storage_factory.create_and_init(self.shared())?, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); drop(g); @@ -376,14 +372,7 @@ impl ManagedTorrent { self.state_change_notify.notify_waiters(); // Recurse. - self.start( - peer_rx, - start_paused, - live_cancellation_token, - init_semaphore, - bitv_factory, - session_stats, - ) + self.start(peer_rx, start_paused) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), } @@ -420,7 +409,7 @@ impl ManagedTorrent { pub fn stats(&self) -> TorrentStats { use stats::TorrentStatsState as S; let mut resp = TorrentStats { - total_bytes: self.info().lengths.total_length(), + total_bytes: self.shared().lengths.total_length(), file_progress: Vec::new(), state: S::Error, error: None, @@ -521,7 +510,7 @@ impl ManagedTorrent { // Returns true if needed to unpause torrent. // This is just implementation detail - it's easier to pause/unpause than to tinker with internals. pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { - let file_count = self.info().info.iter_file_lengths()?.count(); + let file_count = self.shared().info.iter_file_lengths()?.count(); for f in only_files.iter().copied() { if f >= file_count { anyhow::bail!("only_files contains invalid value {f}") @@ -550,160 +539,4 @@ impl ManagedTorrent { } } -pub(crate) struct ManagedTorrentBuilder { - id: TorrentId, - info: TorrentMetaV1Info, - output_folder: PathBuf, - info_hash: Id20, - torrent_bytes: Bytes, - info_bytes: Bytes, - force_tracker_interval: Option, - peer_connect_timeout: Option, - peer_read_write_timeout: Option, - only_files: Option>, - trackers: Vec, - peer_id: Option, - spawner: Option, - allow_overwrite: bool, - storage_factory: BoxStorageFactory, - disk_writer: Option, - connector: Arc, -} - -impl ManagedTorrentBuilder { - pub fn new( - id: usize, - info: TorrentMetaV1Info, - info_hash: Id20, - torrent_bytes: Bytes, - info_bytes: Bytes, - output_folder: PathBuf, - storage_factory: BoxStorageFactory, - ) -> Self { - Self { - id, - info, - info_hash, - torrent_bytes, - info_bytes, - spawner: None, - force_tracker_interval: None, - peer_connect_timeout: None, - peer_read_write_timeout: None, - only_files: None, - trackers: Default::default(), - peer_id: None, - allow_overwrite: false, - output_folder, - storage_factory, - disk_writer: None, - connector: Arc::new(Default::default()), - } - } - - pub fn only_files(&mut self, only_files: Vec) -> &mut Self { - self.only_files = Some(only_files); - self - } - - pub fn trackers(&mut self, trackers: Vec) -> &mut Self { - self.trackers = trackers; - self - } - - pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { - self.force_tracker_interval = Some(force_tracker_interval); - self - } - - pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { - self.spawner = Some(spawner); - self - } - - pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { - self.peer_id = Some(peer_id); - self - } - - pub fn allow_overwrite(&mut self, value: bool) -> &mut Self { - self.allow_overwrite = value; - self - } - - pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { - self.peer_connect_timeout = Some(timeout); - self - } - - pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self { - self.peer_read_write_timeout = Some(timeout); - self - } - - pub fn disk_writer(&mut self, value: DiskWorkQueueSender) -> &mut Self { - self.disk_writer = Some(value); - self - } - - pub fn connector(&mut self, value: Arc) -> &mut Self { - self.connector = value; - self - } - - pub fn build(self, span: tracing::Span) -> anyhow::Result { - let lengths = Lengths::from_torrent(&self.info)?; - let file_infos = self - .info - .iter_file_details(&lengths)? - .map(|fd| { - Ok::<_, anyhow::Error>(FileInfo { - relative_filename: fd.filename.to_pathbuf()?, - offset_in_torrent: fd.offset, - piece_range: fd.pieces, - len: fd.len, - }) - }) - .collect::>>()?; - - let info = Arc::new(ManagedTorrentInfo { - span, - file_infos, - info: self.info, - torrent_bytes: self.torrent_bytes, - info_bytes: self.info_bytes, - info_hash: self.info_hash, - trackers: self.trackers.into_iter().collect(), - spawner: self.spawner.unwrap_or_default(), - peer_id: self.peer_id.unwrap_or_else(generate_peer_id), - lengths, - options: ManagedTorrentOptions { - force_tracker_interval: self.force_tracker_interval, - peer_connect_timeout: self.peer_connect_timeout, - peer_read_write_timeout: self.peer_read_write_timeout, - allow_overwrite: self.allow_overwrite, - output_folder: self.output_folder, - disk_write_queue: self.disk_writer, - }, - connector: self.connector, - }); - - let initializing = Arc::new(TorrentStateInitializing::new( - info.clone(), - self.only_files.clone(), - self.storage_factory.create_and_init(&info)?, - )); - Ok(Arc::new(ManagedTorrent { - id: self.id, - locked: RwLock::new(ManagedTorrentLocked { - state: ManagedTorrentState::Initializing(initializing), - only_files: self.only_files, - }), - state_change_notify: Notify::new(), - storage_factory: self.storage_factory, - info, - })) - } -} - pub type ManagedTorrentHandle = Arc; diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 805b8d5..b92304c 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -5,10 +5,10 @@ use crate::{ type_aliases::FileStorage, }; -use super::{streaming::TorrentStreams, ManagedTorrentInfo}; +use super::{streaming::TorrentStreams, ManagedTorrentShared}; pub struct TorrentStatePaused { - pub(crate) info: Arc, + pub(crate) info: Arc, pub(crate) files: FileStorage, pub(crate) chunk_tracker: ChunkTracker, pub(crate) streams: Arc, diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index cd711a2..deadf3b 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -179,7 +179,7 @@ impl AsyncRead for FileStream { let current = poll_try_io!(self .torrent - .info() + .shared() .lengths .compute_current_piece(self.position, self.file_torrent_abs_offset) .context("invalid position")); @@ -280,7 +280,7 @@ impl ManagedTorrent { s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()), }; let fi = self - .info() + .shared() .file_infos .get(file_id) .context("invalid file")?; @@ -311,7 +311,7 @@ impl ManagedTorrent { fn is_file_finished(&self, file_id: usize) -> bool { // TODO: would be nice to remove locking - self.with_chunk_tracker(|ct| ct.is_file_finished(&self.info.file_infos[file_id])) + self.with_chunk_tracker(|ct| ct.is_file_finished(&self.shared.file_infos[file_id])) .unwrap_or(false) } diff --git a/crates/librqbit_core/src/speed_estimator.rs b/crates/librqbit_core/src/speed_estimator.rs index 6fbb1c7..953d3b1 100644 --- a/crates/librqbit_core/src/speed_estimator.rs +++ b/crates/librqbit_core/src/speed_estimator.rs @@ -19,6 +19,12 @@ pub struct SpeedEstimator { time_remaining_millis: AtomicU64, } +impl Default for SpeedEstimator { + fn default() -> Self { + Self::new(5) + } +} + impl SpeedEstimator { pub fn new(window_seconds: usize) -> Self { assert!(window_seconds > 1);