From 338b221e8df90291458d8bc4fd8c8b4fbe568fa1 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 15:32:45 +0100 Subject: [PATCH 01/13] clippy --- crates/librqbit/src/tests/test_util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 4aac149..c785f8f 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -8,7 +8,7 @@ use tempfile::TempDir; use tracing::{debug, info}; pub fn setup_test_logging() { - if let Err(_) = std::env::var("RUST_LOG") { + if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "debug"); } let _ = tracing_subscriber::fmt::try_init(); From 80f4d3b1b212675803c84deb51887574c3fe78b5 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 15:51:58 +0100 Subject: [PATCH 02/13] Remove ManagedTorrentBuilder --- crates/librqbit/src/session.rs | 116 ++++++++++------ crates/librqbit/src/torrent_state/mod.rs | 167 +---------------------- 2 files changed, 77 insertions(+), 206 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 670cd8f..eab7a5e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -12,6 +12,7 @@ use crate::{ api::TorrentIdOrHash, bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + file_info::FileInfo, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, @@ -23,10 +24,11 @@ use crate::{ }, stream_connect::{SocksProxyConfig, StreamConnector}, torrent_state::{ - ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, + initializing::TorrentStateInitializing, ManagedTorrentHandle, ManagedTorrentLocked, + ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, - ManagedTorrentInfo, + ManagedTorrent, ManagedTorrentInfo, }; use anyhow::{bail, Context}; use bencode::bencode_serialize_to_writer; @@ -43,6 +45,7 @@ use itertools::Itertools; use librqbit_core::{ constants::CHUNK_SIZE, directories::get_configuration_directory, + lengths::Lengths, magnet::Magnet, peer_id::generate_peer_id, spawn_utils::spawn_with_cancel, @@ -51,7 +54,10 @@ use librqbit_core::{ use parking_lot::RwLock; use peer_binary_protocol::Handshake; use serde::{Deserialize, Serialize}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::Notify, +}; use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span}; @@ -1023,7 +1029,7 @@ impl Session { } async fn main_torrent_info( - &self, + self: &Arc, add_res: InternalAddResult, mut opts: AddTorrentOptions, ) -> anyhow::Result { @@ -1084,43 +1090,6 @@ impl Session { .fetch_add(1, std::sync::atomic::Ordering::Relaxed) }; - let mut builder = ManagedTorrentBuilder::new( - id, - info, - info_hash, - torrent_bytes, - info_bytes, - output_folder, - storage_factory, - ); - builder - .allow_overwrite(opts.overwrite) - .spawner(self.spawner) - .trackers(trackers) - .connector(self.connector.clone()) - .peer_id(self.peer_id); - - if let Some(d) = self.disk_write_tx.clone() { - builder.disk_writer(d); - } - - if let Some(only_files) = only_files { - builder.only_files(only_files); - } - if let Some(interval) = opts.force_tracker_interval { - builder.force_tracker_interval(interval); - } - - let peer_opts = self.merge_peer_opts(opts.peer_opts); - - if let Some(t) = peer_opts.connect_timeout { - builder.peer_connect_timeout(t); - } - - if let Some(t) = peer_opts.read_write_timeout { - builder.peer_read_write_timeout(t); - } - let managed_torrent = { let mut g = self.db.write(); if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| { @@ -1132,13 +1101,70 @@ impl Session { }) { return Ok(AddTorrentResponse::AlreadyManaged(id, handle)); } - let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?; - g.add_torrent(managed_torrent.clone(), id); - managed_torrent + + let lengths = Lengths::from_torrent(&info)?; + let file_infos = info + .iter_file_details(&lengths)? + .map(|fd| { + Ok::<_, anyhow::Error>(FileInfo { + relative_filename: fd.filename.to_pathbuf()?, + offset_in_torrent: fd.offset, + piece_range: fd.pieces, + len: fd.len, + }) + }) + .collect::>>()?; + + let span = error_span!(parent: self.rs(), "torrent", id); + let peer_opts = self.merge_peer_opts(opts.peer_opts); + let minfo = Arc::new(ManagedTorrentInfo { + span, + file_infos, + info, + torrent_bytes, + info_bytes, + info_hash, + trackers: trackers.into_iter().collect(), + spawner: self.spawner, + peer_id: self.peer_id, + lengths, + options: ManagedTorrentOptions { + force_tracker_interval: opts.force_tracker_interval, + peer_connect_timeout: peer_opts.connect_timeout, + peer_read_write_timeout: peer_opts.read_write_timeout, + allow_overwrite: opts.overwrite, + output_folder, + disk_write_queue: self.disk_write_tx.clone(), + }, + connector: self.connector.clone(), + }); + + let initializing = Arc::new(TorrentStateInitializing::new( + minfo.clone(), + only_files.clone(), + storage_factory.create_and_init(&minfo)?, + )); + let handle = Arc::new(ManagedTorrent { + id, + locked: RwLock::new(ManagedTorrentLocked { + state: ManagedTorrentState::Initializing(initializing), + only_files, + }), + state_change_notify: Notify::new(), + storage_factory, + info: minfo, + session: Arc::downgrade(self), + }); + + g.add_torrent(handle.clone(), id); + handle }; if let Some(p) = self.persistence.as_ref() { - p.store(id, &managed_torrent).await?; + if let Err(e) = p.store(id, &managed_torrent).await { + self.db.write().torrents.remove(&id); + return Err(e); + } } // Merge "initial_peers" and "peer_rx" into one stream. diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0b506ab..ac7b3b8 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -9,6 +9,7 @@ use std::collections::HashSet; use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::sync::Weak; use std::time::Duration; use anyhow::bail; @@ -19,7 +20,6 @@ use futures::future::BoxFuture; use futures::FutureExt; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; -use librqbit_core::peer_id::generate_peer_id; use librqbit_core::spawn_utils::spawn_with_cancel; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; @@ -36,7 +36,6 @@ use tracing::warn; use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; -use crate::file_info::FileInfo; use crate::session::TorrentId; use crate::session_stats::atomic::AtomicSessionStats; use crate::spawn_utils::BlockingSpawner; @@ -46,6 +45,7 @@ use crate::torrent_state::stats::LiveStats; use crate::type_aliases::DiskWorkQueueSender; use crate::type_aliases::FileInfos; use crate::type_aliases::PeerStream; +use crate::Session; use initializing::TorrentStateInitializing; @@ -87,7 +87,7 @@ impl ManagedTorrentState { } pub(crate) struct ManagedTorrentLocked { - pub state: ManagedTorrentState, + pub(crate) state: ManagedTorrentState, pub(crate) only_files: Option>, } @@ -122,8 +122,9 @@ pub struct ManagedTorrent { pub info: Arc, pub(crate) storage_factory: BoxStorageFactory, - state_change_notify: Notify, - locked: RwLock, + pub(crate) session: Weak, + pub(crate) state_change_notify: Notify, + pub(crate) locked: RwLock, } impl ManagedTorrent { @@ -550,160 +551,4 @@ impl ManagedTorrent { } } -pub(crate) struct ManagedTorrentBuilder { - id: TorrentId, - info: TorrentMetaV1Info, - output_folder: PathBuf, - info_hash: Id20, - torrent_bytes: Bytes, - info_bytes: Bytes, - force_tracker_interval: Option, - peer_connect_timeout: Option, - peer_read_write_timeout: Option, - only_files: Option>, - trackers: Vec, - peer_id: Option, - spawner: Option, - allow_overwrite: bool, - storage_factory: BoxStorageFactory, - disk_writer: Option, - connector: Arc, -} - -impl ManagedTorrentBuilder { - pub fn new( - id: usize, - info: TorrentMetaV1Info, - info_hash: Id20, - torrent_bytes: Bytes, - info_bytes: Bytes, - output_folder: PathBuf, - storage_factory: BoxStorageFactory, - ) -> Self { - Self { - id, - info, - info_hash, - torrent_bytes, - info_bytes, - spawner: None, - force_tracker_interval: None, - peer_connect_timeout: None, - peer_read_write_timeout: None, - only_files: None, - trackers: Default::default(), - peer_id: None, - allow_overwrite: false, - output_folder, - storage_factory, - disk_writer: None, - connector: Arc::new(Default::default()), - } - } - - pub fn only_files(&mut self, only_files: Vec) -> &mut Self { - self.only_files = Some(only_files); - self - } - - pub fn trackers(&mut self, trackers: Vec) -> &mut Self { - self.trackers = trackers; - self - } - - pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { - self.force_tracker_interval = Some(force_tracker_interval); - self - } - - pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { - self.spawner = Some(spawner); - self - } - - pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { - self.peer_id = Some(peer_id); - self - } - - pub fn allow_overwrite(&mut self, value: bool) -> &mut Self { - self.allow_overwrite = value; - self - } - - pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { - self.peer_connect_timeout = Some(timeout); - self - } - - pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self { - self.peer_read_write_timeout = Some(timeout); - self - } - - pub fn disk_writer(&mut self, value: DiskWorkQueueSender) -> &mut Self { - self.disk_writer = Some(value); - self - } - - pub fn connector(&mut self, value: Arc) -> &mut Self { - self.connector = value; - self - } - - pub fn build(self, span: tracing::Span) -> anyhow::Result { - let lengths = Lengths::from_torrent(&self.info)?; - let file_infos = self - .info - .iter_file_details(&lengths)? - .map(|fd| { - Ok::<_, anyhow::Error>(FileInfo { - relative_filename: fd.filename.to_pathbuf()?, - offset_in_torrent: fd.offset, - piece_range: fd.pieces, - len: fd.len, - }) - }) - .collect::>>()?; - - let info = Arc::new(ManagedTorrentInfo { - span, - file_infos, - info: self.info, - torrent_bytes: self.torrent_bytes, - info_bytes: self.info_bytes, - info_hash: self.info_hash, - trackers: self.trackers.into_iter().collect(), - spawner: self.spawner.unwrap_or_default(), - peer_id: self.peer_id.unwrap_or_else(generate_peer_id), - lengths, - options: ManagedTorrentOptions { - force_tracker_interval: self.force_tracker_interval, - peer_connect_timeout: self.peer_connect_timeout, - peer_read_write_timeout: self.peer_read_write_timeout, - allow_overwrite: self.allow_overwrite, - output_folder: self.output_folder, - disk_write_queue: self.disk_writer, - }, - connector: self.connector, - }); - - let initializing = Arc::new(TorrentStateInitializing::new( - info.clone(), - self.only_files.clone(), - self.storage_factory.create_and_init(&info)?, - )); - Ok(Arc::new(ManagedTorrent { - id: self.id, - locked: RwLock::new(ManagedTorrentLocked { - state: ManagedTorrentState::Initializing(initializing), - only_files: self.only_files, - }), - state_change_notify: Notify::new(), - storage_factory: self.storage_factory, - info, - })) - } -} - pub type ManagedTorrentHandle = Arc; From ad7b59ea3c9277164a22a1088e79480e192de25b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 15:57:18 +0100 Subject: [PATCH 03/13] Remove some args in start() function --- crates/librqbit/src/session.rs | 25 ++++------------ crates/librqbit/src/torrent_state/mod.rs | 37 ++++++++++-------------- 2 files changed, 20 insertions(+), 42 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index eab7a5e..aa5261d 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -101,7 +101,7 @@ pub struct Session { peer_id: Id20, dht: Option, persistence: Option>, - bitv_factory: Arc, + pub(crate) bitv_factory: Arc, peer_opts: PeerConnectionOptions, spawner: BlockingSpawner, next_id: AtomicUsize, @@ -117,9 +117,8 @@ pub struct Session { default_storage_factory: Option, reqwest_client: reqwest::Client, - connector: Arc, - - concurrent_initialize_semaphore: Arc, + pub(crate) connector: Arc, + pub(crate) concurrent_initialize_semaphore: Arc, root_span: Option, @@ -1186,14 +1185,7 @@ impl Session { let _ = span.enter(); managed_torrent - .start( - peer_rx, - opts.paused, - self.cancellation_token.child_token(), - self.concurrent_initialize_semaphore.clone(), - self.bitv_factory.clone(), - self.stats.atomic.clone(), - ) + .start(peer_rx, opts.paused) .context("error starting torrent")?; } @@ -1343,14 +1335,7 @@ impl Session { self.tcp_listen_port, handle.info().options.force_tracker_interval, )?; - handle.start( - peer_rx, - false, - self.cancellation_token.child_token(), - self.concurrent_initialize_semaphore.clone(), - self.bitv_factory.clone(), - self.stats.atomic.clone(), - )?; + handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index ac7b3b8..c8c5846 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -34,10 +34,8 @@ use tracing::debug; use tracing::error_span; use tracing::warn; -use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; use crate::session::TorrentId; -use crate::session_stats::atomic::AtomicSessionStats; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; use crate::stream_connect::StreamConnector; @@ -210,11 +208,11 @@ impl ManagedTorrent { self: &Arc, peer_rx: Option, start_paused: bool, - live_cancellation_token: CancellationToken, - init_semaphore: Arc, - bitv_factory: Arc, - session_stats: Arc, ) -> anyhow::Result<()> { + let session = self + .session + .upgrade() + .context("session is dead, cannot start torrent")?; let mut g = self.locked.write(); let spawn_fatal_errors_receiver = @@ -295,18 +293,20 @@ impl ManagedTorrent { drop(g); let t = self.clone(); let span = self.info().span.clone(); - let token = live_cancellation_token.clone(); + let token = session.cancellation_token().child_token().clone(); spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), async move { - let _permit = init_semaphore + let concurrent_init_semaphore = + session.concurrent_initialize_semaphore.clone(); + let _permit = concurrent_init_semaphore .acquire() .await .context("bug: concurrent init semaphore was closed")?; - match init.check(bitv_factory).await { + match init.check(session.bitv_factory.clone()).await { Ok(paused) => { let mut g = t.locked.write(); if let ManagedTorrentState::Initializing(_) = &g.state { @@ -325,8 +325,8 @@ impl ManagedTorrent { let live = TorrentStateLive::new( paused, tx, - live_cancellation_token, - session_stats, + session.cancellation_token().child_token(), + session.stats.atomic.clone(), )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -355,13 +355,13 @@ impl ManagedTorrent { let live = TorrentStateLive::new( paused, tx, - live_cancellation_token.clone(), - session_stats, + session.cancellation_token().child_token().clone(), + session.stats.atomic.clone(), )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); - spawn_fatal_errors_receiver(self, rx, live_cancellation_token); + spawn_fatal_errors_receiver(self, rx, session.cancellation_token().child_token()); spawn_peer_adder(&live, peer_rx); Ok(()) } @@ -377,14 +377,7 @@ impl ManagedTorrent { self.state_change_notify.notify_waiters(); // Recurse. - self.start( - peer_rx, - start_paused, - live_cancellation_token, - init_semaphore, - bitv_factory, - session_stats, - ) + self.start(peer_rx, start_paused) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), } From b4512e48098990e76ad4c9b2b7ccc38f6ee1af67 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 16:06:16 +0100 Subject: [PATCH 04/13] Move some fields into ManagedTorrentInfo --- crates/librqbit/src/session.rs | 8 ++++---- crates/librqbit/src/session_persistence/json.rs | 1 + crates/librqbit/src/torrent_state/mod.rs | 10 ++++------ 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index aa5261d..a9263d0 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1117,6 +1117,7 @@ impl Session { let span = error_span!(parent: self.rs(), "torrent", id); let peer_opts = self.merge_peer_opts(opts.peer_opts); let minfo = Arc::new(ManagedTorrentInfo { + id, span, file_infos, info, @@ -1127,6 +1128,7 @@ impl Session { spawner: self.spawner, peer_id: self.peer_id, lengths, + storage_factory, options: ManagedTorrentOptions { force_tracker_interval: opts.force_tracker_interval, peer_connect_timeout: peer_opts.connect_timeout, @@ -1141,16 +1143,14 @@ impl Session { let initializing = Arc::new(TorrentStateInitializing::new( minfo.clone(), only_files.clone(), - storage_factory.create_and_init(&minfo)?, + minfo.storage_factory.create_and_init(&minfo)?, )); let handle = Arc::new(ManagedTorrent { - id, locked: RwLock::new(ManagedTorrentLocked { state: ManagedTorrentState::Initializing(initializing), only_files, }), state_change_notify: Notify::new(), - storage_factory, info: minfo, session: Arc::downgrade(self), }); @@ -1257,7 +1257,7 @@ impl Session { _ => None, }) .map(Ok) - .unwrap_or_else(|| removed.storage_factory.create(removed.info())); + .unwrap_or_else(|| removed.info.storage_factory.create(removed.info())); match (storage, delete_files) { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index d415375..b8cc7dc 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -128,6 +128,7 @@ impl JsonSessionPersistenceStore { write_torrent_file: bool, ) -> anyhow::Result<()> { if !torrent + .info .storage_factory .is_type_id(TypeId::of::()) { diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index c8c5846..9da473b 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -100,6 +100,7 @@ pub(crate) struct ManagedTorrentOptions { } pub struct ManagedTorrentInfo { + pub id: TorrentId, pub info: TorrentMetaV1Info, pub torrent_bytes: Bytes, pub info_bytes: Bytes, @@ -112,14 +113,11 @@ pub struct ManagedTorrentInfo { pub span: tracing::Span, pub(crate) options: ManagedTorrentOptions, pub(crate) connector: Arc, + pub(crate) storage_factory: BoxStorageFactory, } pub struct ManagedTorrent { - pub id: TorrentId, - // TODO: merge ManagedTorrent and ManagedTorrentInfo pub info: Arc, - pub(crate) storage_factory: BoxStorageFactory, - pub(crate) session: Weak, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, @@ -127,7 +125,7 @@ pub struct ManagedTorrent { impl ManagedTorrent { pub fn id(&self) -> TorrentId { - self.id + self.info.id } pub fn info(&self) -> &ManagedTorrentInfo { @@ -369,7 +367,7 @@ impl ManagedTorrent { let initializing = Arc::new(TorrentStateInitializing::new( self.info.clone(), g.only_files.clone(), - self.storage_factory.create_and_init(self.info())?, + self.info.storage_factory.create_and_init(self.info())?, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); drop(g); From 451debedbbd69377a1f22d7e370cf34f3715952d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 16:12:20 +0100 Subject: [PATCH 05/13] Renames --- crates/librqbit/examples/custom_storage.rs | 4 +- crates/librqbit/examples/ubuntu.rs | 2 +- crates/librqbit/src/api.rs | 12 +-- crates/librqbit/src/http_api.rs | 6 +- crates/librqbit/src/lib.rs | 2 +- crates/librqbit/src/session.rs | 22 +++--- .../librqbit/src/session_persistence/json.rs | 10 +-- .../src/session_persistence/postgres.rs | 13 +++- crates/librqbit/src/storage/filesystem/fs.rs | 6 +- .../librqbit/src/storage/filesystem/mmap.rs | 6 +- .../librqbit/src/storage/middleware/slow.rs | 6 +- .../librqbit/src/storage/middleware/timing.rs | 6 +- .../storage/middleware/write_through_cache.rs | 6 +- crates/librqbit/src/storage/mod.rs | 14 ++-- .../src/torrent_state/initializing.rs | 6 +- crates/librqbit/src/torrent_state/live/mod.rs | 77 ++++++++++--------- crates/librqbit/src/torrent_state/mod.rs | 33 ++++---- crates/librqbit/src/torrent_state/paused.rs | 4 +- .../librqbit/src/torrent_state/streaming.rs | 6 +- 19 files changed, 127 insertions(+), 114 deletions(-) diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index 2be41c9..f22daa8 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -20,7 +20,7 @@ struct CustomStorage { impl StorageFactory for CustomStorageFactory { type Storage = CustomStorage; - fn create(&self, _info: &librqbit::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, _info: &librqbit::ManagedTorrentShared) -> anyhow::Result { Ok(CustomStorage::default()) } @@ -54,7 +54,7 @@ impl TorrentStorage for CustomStorage { anyhow::bail!("not implemented") } - fn init(&mut self, _meta: &librqbit::ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { anyhow::bail!("not implemented") } } diff --git a/crates/librqbit/examples/ubuntu.rs b/crates/librqbit/examples/ubuntu.rs index 6c7f414..7dbd189 100644 --- a/crates/librqbit/examples/ubuntu.rs +++ b/crates/librqbit/examples/ubuntu.rs @@ -49,7 +49,7 @@ async fn main() -> Result<(), anyhow::Error> { _ => unreachable!(), }; - info!("Details: {:?}", &handle.info().info); + info!("Details: {:?}", &handle.shared().info); // Print stats periodically. tokio::spawn({ diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 17e2fcf..65609c3 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -197,7 +197,7 @@ impl Api { torrents .map(|(id, mgr)| TorrentListResponseItem { id, - info_hash: mgr.info().info_hash.as_string(), + info_hash: mgr.shared().info_hash.as_string(), }) .collect() }); @@ -206,9 +206,9 @@ impl Api { pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result { let handle = self.mgr_handle(idx)?; - let info_hash = handle.info().info_hash; + let info_hash = handle.shared().info_hash; let only_files = handle.only_files(); - make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) + make_torrent_details(&info_hash, &handle.shared().info, only_files.as_deref()) } pub fn api_session_stats(&self) -> SessionStatsSnapshot { @@ -221,7 +221,7 @@ impl Api { file_idx: usize, ) -> Result<&'static str> { let handle = self.mgr_handle(idx)?; - let info = &handle.info().info; + let info = &handle.shared().info; torrent_file_mime_type(info, file_idx) } @@ -361,7 +361,7 @@ impl Api { AddTorrentResponse::Added(id, handle) => { let details = make_torrent_details( &handle.info_hash(), - &handle.info().info, + &handle.shared().info, handle.only_files().as_deref(), ) .context("error making torrent details")?; @@ -370,7 +370,7 @@ impl Api { details, seen_peers: None, output_folder: handle - .info() + .shared() .options .output_folder .to_string_lossy() diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index f72b95b..fd672d0 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -142,7 +142,7 @@ impl HttpApi { fn torrent_playlist_items(handle: &ManagedTorrent) -> Result> { let mut playlist_items = handle - .info() + .shared() .info .iter_filenames_and_lengths()? .enumerate() @@ -216,8 +216,8 @@ impl HttpApi { .await?; let (info, content) = match added { crate::AddTorrentResponse::AlreadyManaged(_, handle) => ( - handle.info().info.clone(), - handle.info().torrent_bytes.clone(), + handle.shared().info.clone(), + handle.shared().torrent_bytes.clone(), ), crate::AddTorrentResponse::ListOnly(ListOnlyResponse { info, diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 4dda659..6b67b4e 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -77,7 +77,7 @@ pub use session::{ }; pub use spawn_utils::spawn as librqbit_spawn; pub use torrent_state::{ - ManagedTorrent, ManagedTorrentInfo, ManagedTorrentState, TorrentStats, TorrentStatsState, + ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentStats, TorrentStatsState, }; pub use type_aliases::FileInfos; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index a9263d0..377a744 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -28,7 +28,7 @@ use crate::{ ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, - ManagedTorrent, ManagedTorrentInfo, + ManagedTorrent, ManagedTorrentShared, }; use anyhow::{bail, Context}; use bencode::bencode_serialize_to_writer; @@ -1116,7 +1116,7 @@ impl Session { let span = error_span!(parent: self.rs(), "torrent", id); let peer_opts = self.merge_peer_opts(opts.peer_opts); - let minfo = Arc::new(ManagedTorrentInfo { + let minfo = Arc::new(ManagedTorrentShared { id, span, file_infos, @@ -1151,7 +1151,7 @@ impl Session { only_files, }), state_change_notify: Notify::new(), - info: minfo, + shared: minfo, session: Arc::downgrade(self), }); @@ -1181,7 +1181,7 @@ impl Session { ); { - let span = managed_torrent.info.span.clone(); + let span = managed_torrent.shared.span.clone(); let _ = span.enter(); managed_torrent @@ -1257,18 +1257,18 @@ impl Session { _ => None, }) .map(Ok) - .unwrap_or_else(|| removed.info.storage_factory.create(removed.info())); + .unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared())); match (storage, delete_files) { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), (Ok(storage), true) => { debug!("will delete files"); - remove_files_and_dirs(removed.info(), &storage); - if removed.info().options.output_folder != self.output_folder { + remove_files_and_dirs(removed.shared(), &storage); + if removed.shared().options.output_folder != self.output_folder { if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { warn!( "error removing {:?}: {e:?}", - removed.info().options.output_folder + removed.shared().options.output_folder ) } } @@ -1331,9 +1331,9 @@ impl Session { pub async fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { let peer_rx = self.make_peer_rx( handle.info_hash(), - handle.info().trackers.clone().into_iter().collect(), + handle.shared().trackers.clone().into_iter().collect(), self.tcp_listen_port, - handle.info().options.force_tracker_interval, + handle.shared().options.force_tracker_interval, )?; handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await; @@ -1355,7 +1355,7 @@ impl Session { } } -fn remove_files_and_dirs(info: &ManagedTorrentInfo, files: &dyn TorrentStorage) { +fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage) { let mut all_dirs = HashSet::new(); for (id, fi) in info.file_infos.iter().enumerate() { let mut fname = &*fi.relative_filename; diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index b8cc7dc..bd2d888 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -128,7 +128,7 @@ impl JsonSessionPersistenceStore { write_torrent_file: bool, ) -> anyhow::Result<()> { if !torrent - .info + .shared .storage_factory .is_type_id(TypeId::of::()) { @@ -137,7 +137,7 @@ impl JsonSessionPersistenceStore { let st = SerializedTorrent { trackers: torrent - .info() + .shared() .trackers .iter() .map(|u| u.to_string()) @@ -147,10 +147,10 @@ impl JsonSessionPersistenceStore { torrent_bytes: Default::default(), only_files: torrent.only_files().clone(), is_paused: torrent.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))), - output_folder: torrent.info().options.output_folder.clone(), + output_folder: torrent.shared().options.output_folder.clone(), }; - if write_torrent_file && !torrent.info().torrent_bytes.is_empty() { + if write_torrent_file && !torrent.shared().torrent_bytes.is_empty() { let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash()); match tokio::fs::OpenOptions::new() .create(true) @@ -160,7 +160,7 @@ impl JsonSessionPersistenceStore { .await { Ok(mut f) => { - if let Err(e) = f.write_all(&torrent.info().torrent_bytes).await { + if let Err(e) = f.write_all(&torrent.shared().torrent_bytes).await { warn!(error=?e, file=?torrent_bytes_file, "error writing torrent bytes") } } diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 98dd6f3..10d593c 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -96,7 +96,7 @@ impl SessionPersistenceStore for PostgresSessionStorage { } async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { - let torrent_bytes: &[u8] = &torrent.info().torrent_bytes; + let torrent_bytes: &[u8] = &torrent.shared().torrent_bytes; let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused) VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(id) DO NOTHING"; @@ -104,10 +104,17 @@ impl SessionPersistenceStore for PostgresSessionStorage { .bind::(id.try_into()?) .bind(&torrent.info_hash().0[..]) .bind(torrent_bytes) - .bind(torrent.info().trackers.iter().cloned().collect::>()) .bind( torrent - .info() + .shared() + .trackers + .iter() + .cloned() + .collect::>(), + ) + .bind( + torrent + .shared() .options .output_folder .to_str() diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 651d2fc..74ac596 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Context; use tracing::warn; -use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentInfo}; +use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentShared}; use crate::storage::{StorageFactory, TorrentStorage}; @@ -18,7 +18,7 @@ pub struct FilesystemStorageFactory {} impl StorageFactory for FilesystemStorageFactory { type Storage = FilesystemStorage; - fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { Ok(FilesystemStorage { output_folder: meta.options.output_folder.clone(), opened_files: Default::default(), @@ -149,7 +149,7 @@ impl TorrentStorage for FilesystemStorage { } } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { let mut files = Vec::::new(); for file_details in meta.info.iter_file_details(&meta.lengths)? { let mut full_path = self.output_folder.clone(); diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 1e68b24..5b92657 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -4,7 +4,7 @@ use anyhow::Context; use memmap2::{MmapMut, MmapOptions}; use parking_lot::RwLock; -use crate::torrent_state::ManagedTorrentInfo; +use crate::torrent_state::ManagedTorrentShared; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; @@ -22,7 +22,7 @@ fn dummy_mmap() -> anyhow::Result { impl StorageFactory for MmapFilesystemStorageFactory { type Storage = MmapFilesystemStorage; - fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result { let fs_storage = FilesystemStorageFactory::default().create(meta)?; Ok(MmapFilesystemStorage { @@ -97,7 +97,7 @@ impl TorrentStorage for MmapFilesystemStorage { })) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.fs.init(meta)?; let mut mmaps = Vec::new(); for (idx, file) in self.fs.opened_files.iter().enumerate() { diff --git a/crates/librqbit/src/storage/middleware/slow.rs b/crates/librqbit/src/storage/middleware/slow.rs index fba5160..5dbdf8a 100644 --- a/crates/librqbit/src/storage/middleware/slow.rs +++ b/crates/librqbit/src/storage/middleware/slow.rs @@ -16,7 +16,7 @@ use parking_lot::Mutex; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Clone)] @@ -35,7 +35,7 @@ impl SlowStorageFactory { impl StorageFactory for SlowStorageFactory { type Storage = SlowStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { Ok(SlowStorage { underlying: self.underlying_factory.create(info)?, pwrite_all_bufread: Mutex::new(Box::new( @@ -116,7 +116,7 @@ impl TorrentStorage for SlowStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/middleware/timing.rs b/crates/librqbit/src/storage/middleware/timing.rs index 7079db2..42c3382 100644 --- a/crates/librqbit/src/storage/middleware/timing.rs +++ b/crates/librqbit/src/storage/middleware/timing.rs @@ -4,7 +4,7 @@ A storage middleware that logs the time underlying storage operations took. use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Clone)] @@ -25,7 +25,7 @@ impl TimingStorageFactory { impl StorageFactory for TimingStorageFactory { type Storage = TimingStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { Ok(TimingStorage { name: self.name.clone(), underlying: self.underlying_factory.create(info)?, @@ -104,7 +104,7 @@ impl TorrentStorage for TimingStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/middleware/write_through_cache.rs b/crates/librqbit/src/storage/middleware/write_through_cache.rs index 7b979a6..59e8cb0 100644 --- a/crates/librqbit/src/storage/middleware/write_through_cache.rs +++ b/crates/librqbit/src/storage/middleware/write_through_cache.rs @@ -14,7 +14,7 @@ use parking_lot::RwLock; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - FileInfos, ManagedTorrentInfo, + FileInfos, ManagedTorrentShared, }; #[derive(Clone, Copy)] @@ -35,7 +35,7 @@ impl WriteThroughCacheStorageFactory { impl StorageFactory for WriteThroughCacheStorageFactory { type Storage = WriteThroughCacheStorage; - fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result { let pieces = self .max_cache_bytes .div_ceil(info.lengths.default_piece_length() as u64) @@ -121,7 +121,7 @@ impl TorrentStorage for WriteThroughCacheStorage { self.underlying.remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { self.underlying.init(meta) } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index 14632cd..ff710d0 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -11,13 +11,13 @@ use std::{ path::Path, }; -use crate::torrent_state::ManagedTorrentInfo; +use crate::torrent_state::ManagedTorrentShared; pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result; - fn create_and_init(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result; + fn create_and_init(&self, info: &ManagedTorrentShared) -> anyhow::Result { let mut storage = self.create(info)?; storage.init(info)?; Ok(storage) @@ -44,7 +44,7 @@ impl StorageFactoryExt for SF { impl StorageFactory for Wrapper { type Storage = Box; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { let s = self.sf.create(info)?; Ok(Box::new(s)) } @@ -65,7 +65,7 @@ impl StorageFactoryExt for SF { impl StorageFactory for Box { type Storage = U::Storage; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { (**self).create(info) } @@ -76,7 +76,7 @@ impl StorageFactory for Box { pub trait TorrentStorage: Send + Sync { // Create/open files etc. - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()>; + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()>; /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. @@ -124,7 +124,7 @@ impl TorrentStorage for Box { (**self).remove_directory_if_empty(path) } - fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { (**self).init(meta) } } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 5f7fdb8..0c32b4a 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -19,11 +19,11 @@ use crate::{ FileInfos, }; -use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; +use super::{paused::TorrentStatePaused, ManagedTorrentShared}; pub struct TorrentStateInitializing { pub(crate) files: FileStorage, - pub(crate) meta: Arc, + pub(crate) meta: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, } @@ -48,7 +48,7 @@ fn compute_selected_pieces( impl TorrentStateInitializing { pub fn new( - meta: Arc, + meta: Arc, only_files: Option>, files: FileStorage, ) -> Self { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index e9da863..ee2245c 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -106,7 +106,7 @@ use super::{ paused::TorrentStatePaused, streaming::TorrentStreams, utils::{timeit, TimedExistence}, - ManagedTorrentInfo, + ManagedTorrentShared, }; #[derive(Debug)] @@ -180,7 +180,7 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; pub struct TorrentStateLive { peers: PeerStates, - meta: Arc, + torrent: Arc, locked: RwLock, pub(crate) files: FileStorage, @@ -241,7 +241,7 @@ impl TorrentStateLive { let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128); let state = Arc::new(TorrentStateLive { - meta: paused.info.clone(), + torrent: paused.info.clone(), peers: PeerStates { session_stats: session_stats.clone(), stats: Default::default(), @@ -277,7 +277,7 @@ impl TorrentStateLive { }); state.spawn( - error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), + error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"), { let state = Arc::downgrade(&state); async move { @@ -303,7 +303,7 @@ impl TorrentStateLive { ); state.spawn( - error_span!(parent: state.meta.span.clone(), "peer_adder"), + error_span!(parent: state.torrent.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); Ok(state) @@ -330,7 +330,7 @@ impl TorrentStateLive { } fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> { - self.meta.options.disk_write_queue.as_ref() + self.torrent.options.disk_write_queue.as_ref() } pub(crate) fn add_incoming_peer( @@ -378,7 +378,7 @@ impl TorrentStateLive { self.spawn( error_span!( - parent: self.meta.span.clone(), + parent: self.torrent.span.clone(), "manage_incoming_peer", addr = %checked_peer.addr ), @@ -410,18 +410,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: self.meta.options.peer_connect_timeout, - read_write_timeout: self.meta.options.peer_read_write_timeout, + connect_timeout: self.torrent.options.peer_connect_timeout, + read_write_timeout: self.torrent.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( checked_peer.addr, - self.meta.info_hash, - self.meta.peer_id, + self.torrent.info_hash, + self.torrent.peer_id, &handler, Some(options), - self.meta.spawner, - self.meta.connector.clone(), + self.torrent.spawner, + self.torrent.connector.clone(), ); let requester = handler.task_peer_chunk_requester(); @@ -474,18 +474,18 @@ impl TorrentStateLive { first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { - connect_timeout: state.meta.options.peer_connect_timeout, - read_write_timeout: state.meta.options.peer_read_write_timeout, + connect_timeout: state.torrent.options.peer_connect_timeout, + read_write_timeout: state.torrent.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( addr, - state.meta.info_hash, - state.meta.peer_id, + state.torrent.info_hash, + state.torrent.peer_id, &handler, Some(options), - state.meta.spawner, - state.meta.connector.clone(), + state.torrent.spawner, + state.torrent.connector.clone(), ); let requester = aframe!(handler .task_peer_chunk_requester() @@ -532,30 +532,30 @@ impl TorrentStateLive { let permit = state.peer_semaphore.clone().acquire_owned().await?; state.spawn( - error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), + error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()), aframe!(state.clone().task_manage_outgoing_peer(addr, permit)), ); } } - pub fn meta(&self) -> &ManagedTorrentInfo { - &self.meta + pub fn torrent(&self) -> &ManagedTorrentShared { + &self.torrent } pub fn info(&self) -> &TorrentMetaV1Info { - &self.meta.info + &self.torrent.info } pub fn info_hash(&self) -> Id20 { - self.meta.info_hash + self.torrent.info_hash } pub fn peer_id(&self) -> Id20 { - self.meta.peer_id + self.torrent.peer_id } pub(crate) fn file_ops(&self) -> FileOps<'_> { FileOps::new( - &self.meta.info, + &self.torrent.info, &*self.files, - &self.meta().file_infos, + &self.torrent().file_infos, &self.lengths, ) } @@ -664,7 +664,7 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { - info: self.meta.clone(), + info: self.torrent.clone(), files: self.files.take()?, chunk_tracker, streams: self.streams.clone(), @@ -687,7 +687,8 @@ impl TorrentStateLive { pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { let mut g = self.lock_write("update_only_files"); let ct = g.get_chunks_mut()?; - let hns = ct.update_only_files(self.meta().file_infos.iter().map(|f| f.len), only_files)?; + let hns = + ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?; if !hns.finished() { self.reconnect_all_not_needed_peers(); } @@ -706,7 +707,7 @@ impl TorrentStateLive { }; self.streams .streamed_file_ids() - .any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id])) + .any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id])) } // We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite @@ -725,7 +726,7 @@ impl TorrentStateLive { // if we have all the pieces of the file, reopen it read only for (idx, file_info) in self - .meta() + .torrent() .file_infos .iter() .enumerate() @@ -736,9 +737,9 @@ impl TorrentStateLive { } self.streams - .wake_streams_on_piece_completed(id, &self.meta.lengths); + .wake_streams_on_piece_completed(id, &self.torrent.lengths); - g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64; + g.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64; if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES { g.try_flush_bitv() } @@ -930,7 +931,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { &self, handshake: &mut ExtendedHandshake, ) -> anyhow::Result<()> { - let info_bytes = &self.state.meta().info_bytes; + let info_bytes = &self.state.torrent().info_bytes; if !info_bytes.is_empty() { if let Ok(len) = info_bytes.len().try_into() { handshake.metadata_size = Some(len); @@ -1010,7 +1011,7 @@ impl PeerHandler { if let Some(dur) = backoff { self.state.clone().spawn( error_span!( - parent: self.state.meta.span.clone(), + parent: self.state.torrent.span.clone(), "wait_for_peer", peer = handle.to_string(), duration = format!("{dur:?}") @@ -1069,7 +1070,7 @@ impl PeerHandler { && !g.inflight_pieces.contains_key(pid) }); let natural_order_pieces = chunk_tracker - .iter_queued_pieces(&g.file_priorities, &self.state.meta().file_infos); + .iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos); for n in priority_streamed_pieces.chain(natural_order_pieces) { if bf.get(n.get() as usize).map(|v| *v) == Some(true) { n_opt = Some(n); @@ -1612,7 +1613,7 @@ impl PeerHandler { dtx.send(Box::new(work)).await?; } else { self.state - .meta + .torrent .spawner .spawn_block_in_place(|| { write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info) @@ -1624,7 +1625,7 @@ impl PeerHandler { } fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> { - let data = &self.state.meta().info_bytes; + let data = &self.state.torrent().info_bytes; let metadata_size = data.len(); if metadata_size == 0 { anyhow::bail!("peer requested for info metadata but we don't have it") diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 9da473b..3de4956 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -99,7 +99,12 @@ pub(crate) struct ManagedTorrentOptions { pub disk_write_queue: Option, } -pub struct ManagedTorrentInfo { +/// Common information about torrent shared among all possible states. +/// +// The reason it's not inlined into ManagedTorrent is to break the Arc cycle: +// ManagedTorrent contains the current torrent state, which in turn needs access to a bunch +// of stuff, but it shouldn't access the state. +pub struct ManagedTorrentShared { pub id: TorrentId, pub info: TorrentMetaV1Info, pub torrent_bytes: Bytes, @@ -117,7 +122,7 @@ pub struct ManagedTorrentInfo { } pub struct ManagedTorrent { - pub info: Arc, + pub shared: Arc, pub(crate) session: Weak, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, @@ -125,19 +130,19 @@ pub struct ManagedTorrent { impl ManagedTorrent { pub fn id(&self) -> TorrentId { - self.info.id + self.shared.id } - pub fn info(&self) -> &ManagedTorrentInfo { - &self.info + pub fn shared(&self) -> &ManagedTorrentShared { + &self.shared } pub fn get_total_bytes(&self) -> u64 { - self.info.lengths.total_length() + self.shared.lengths.total_length() } pub fn info_hash(&self) -> Id20 { - self.info.info_hash + self.shared.info_hash } pub fn only_files(&self) -> Option> { @@ -217,7 +222,7 @@ impl ManagedTorrent { |state: &Arc, rx: tokio::sync::oneshot::Receiver, token: CancellationToken| { - let span = state.info.span.clone(); + let span = state.shared.span.clone(); let state = Arc::downgrade(state); spawn_with_cancel( error_span!(parent: span, "fatal_errors_receiver"), @@ -239,7 +244,7 @@ impl ManagedTorrent { fn spawn_peer_adder(live: &Arc, peer_rx: Option) { live.spawn( - error_span!(parent: live.meta().span.clone(), "external_peer_adder"), + error_span!(parent: live.torrent().span.clone(), "external_peer_adder"), { let live = live.clone(); async move { @@ -290,7 +295,7 @@ impl ManagedTorrent { let init = init.clone(); drop(g); let t = self.clone(); - let span = self.info().span.clone(); + let span = self.shared().span.clone(); let token = session.cancellation_token().child_token().clone(); spawn_with_cancel( @@ -365,9 +370,9 @@ impl ManagedTorrent { } ManagedTorrentState::Error(_) => { let initializing = Arc::new(TorrentStateInitializing::new( - self.info.clone(), + self.shared.clone(), g.only_files.clone(), - self.info.storage_factory.create_and_init(self.info())?, + self.shared.storage_factory.create_and_init(self.shared())?, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); drop(g); @@ -412,7 +417,7 @@ impl ManagedTorrent { pub fn stats(&self) -> TorrentStats { use stats::TorrentStatsState as S; let mut resp = TorrentStats { - total_bytes: self.info().lengths.total_length(), + total_bytes: self.shared().lengths.total_length(), file_progress: Vec::new(), state: S::Error, error: None, @@ -513,7 +518,7 @@ impl ManagedTorrent { // Returns true if needed to unpause torrent. // This is just implementation detail - it's easier to pause/unpause than to tinker with internals. pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { - let file_count = self.info().info.iter_file_lengths()?.count(); + let file_count = self.shared().info.iter_file_lengths()?.count(); for f in only_files.iter().copied() { if f >= file_count { anyhow::bail!("only_files contains invalid value {f}") diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 805b8d5..b92304c 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -5,10 +5,10 @@ use crate::{ type_aliases::FileStorage, }; -use super::{streaming::TorrentStreams, ManagedTorrentInfo}; +use super::{streaming::TorrentStreams, ManagedTorrentShared}; pub struct TorrentStatePaused { - pub(crate) info: Arc, + pub(crate) info: Arc, pub(crate) files: FileStorage, pub(crate) chunk_tracker: ChunkTracker, pub(crate) streams: Arc, diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index cd711a2..deadf3b 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -179,7 +179,7 @@ impl AsyncRead for FileStream { let current = poll_try_io!(self .torrent - .info() + .shared() .lengths .compute_current_piece(self.position, self.file_torrent_abs_offset) .context("invalid position")); @@ -280,7 +280,7 @@ impl ManagedTorrent { s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()), }; let fi = self - .info() + .shared() .file_infos .get(file_id) .context("invalid file")?; @@ -311,7 +311,7 @@ impl ManagedTorrent { fn is_file_finished(&self, file_id: usize) -> bool { // TODO: would be nice to remove locking - self.with_chunk_tracker(|ct| ct.is_file_finished(&self.info.file_infos[file_id])) + self.with_chunk_tracker(|ct| ct.is_file_finished(&self.shared.file_infos[file_id])) .unwrap_or(false) } From ad5f62b9b9d2a2fbf3c05d9ea3a115f110b366ce Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 16:15:15 +0100 Subject: [PATCH 06/13] Move session to shared state --- crates/librqbit/src/session.rs | 2 +- crates/librqbit/src/torrent_state/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 377a744..60491eb 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1138,6 +1138,7 @@ impl Session { disk_write_queue: self.disk_write_tx.clone(), }, connector: self.connector.clone(), + session: Arc::downgrade(self), }); let initializing = Arc::new(TorrentStateInitializing::new( @@ -1152,7 +1153,6 @@ impl Session { }), state_change_notify: Notify::new(), shared: minfo, - session: Arc::downgrade(self), }); g.add_torrent(handle.clone(), id); diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 3de4956..d602f5b 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -119,11 +119,11 @@ pub struct ManagedTorrentShared { pub(crate) options: ManagedTorrentOptions, pub(crate) connector: Arc, pub(crate) storage_factory: BoxStorageFactory, + pub(crate) session: Weak, } pub struct ManagedTorrent { pub shared: Arc, - pub(crate) session: Weak, pub(crate) state_change_notify: Notify, pub(crate) locked: RwLock, } @@ -213,6 +213,7 @@ impl ManagedTorrent { start_paused: bool, ) -> anyhow::Result<()> { let session = self + .shared .session .upgrade() .context("session is dead, cannot start torrent")?; From 3067ad21d5d00e9c949b4f74223175878209da6b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 16:20:46 +0100 Subject: [PATCH 07/13] Cleanups --- crates/librqbit/src/torrent_state/live/mod.rs | 12 ++++++++---- crates/librqbit/src/torrent_state/mod.rs | 19 +++++-------------- crates/librqbit_core/src/speed_estimator.rs | 6 ++++++ 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ee2245c..0c583cc 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -214,12 +214,16 @@ impl TorrentStateLive { paused: TorrentStatePaused, fatal_errors_tx: tokio::sync::oneshot::Sender, cancellation_token: CancellationToken, - session_stats: Arc, ) -> anyhow::Result> { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); - - let down_speed_estimator = SpeedEstimator::new(5); - let up_speed_estimator = SpeedEstimator::new(5); + let session = paused + .info + .session + .upgrade() + .context("session is dead, cannot start torrent")?; + let session_stats = session.stats.atomic.clone(); + let down_speed_estimator = SpeedEstimator::default(); + let up_speed_estimator = SpeedEstimator::default(); let have_bytes = paused.chunk_tracker.get_hns().have_bytes; let lengths = *paused.chunk_tracker.get_lengths(); diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index d602f5b..0feaaf1 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -218,6 +218,7 @@ impl ManagedTorrent { .upgrade() .context("session is dead, cannot start torrent")?; let mut g = self.locked.write(); + let cancellation_token = session.cancellation_token().child_token(); let spawn_fatal_errors_receiver = |state: &Arc, @@ -297,7 +298,7 @@ impl ManagedTorrent { drop(g); let t = self.clone(); let span = self.shared().span.clone(); - let token = session.cancellation_token().child_token().clone(); + let token = cancellation_token.clone(); spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), @@ -326,12 +327,7 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new( - paused, - tx, - session.cancellation_token().child_token(), - session.stats.atomic.clone(), - )?; + let live = TorrentStateLive::new(paused, tx, cancellation_token)?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -356,16 +352,11 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new( - paused, - tx, - session.cancellation_token().child_token().clone(), - session.stats.atomic.clone(), - )?; + let live = TorrentStateLive::new(paused, tx, cancellation_token.clone())?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); - spawn_fatal_errors_receiver(self, rx, session.cancellation_token().child_token()); + spawn_fatal_errors_receiver(self, rx, cancellation_token); spawn_peer_adder(&live, peer_rx); Ok(()) } diff --git a/crates/librqbit_core/src/speed_estimator.rs b/crates/librqbit_core/src/speed_estimator.rs index 6fbb1c7..953d3b1 100644 --- a/crates/librqbit_core/src/speed_estimator.rs +++ b/crates/librqbit_core/src/speed_estimator.rs @@ -19,6 +19,12 @@ pub struct SpeedEstimator { time_remaining_millis: AtomicU64, } +impl Default for SpeedEstimator { + fn default() -> Self { + Self::new(5) + } +} + impl SpeedEstimator { pub fn new(window_seconds: usize) -> Self { assert!(window_seconds > 1); From 2ad5fa2f12a65902c5d50692d7ecff6ba409c0fc Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 16:47:48 +0100 Subject: [PATCH 08/13] E2E drop check for memory leaks --- crates/librqbit/src/tests/e2e.rs | 24 ++++++++++++----- crates/librqbit/src/tests/test_util.rs | 36 +++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index ee6f5db..00fcefe 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -16,7 +16,7 @@ use crate::{ create_torrent, tests::test_util::{ create_default_random_dir_with_torrents, setup_test_logging, spawn_debug_server, - TestPeerMetadata, + DropChecks, TestPeerMetadata, }, AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, SessionPersistenceConfig, }; @@ -28,13 +28,20 @@ async fn test_e2e_download() { .and_then(|v| v.parse().ok()) .unwrap_or(180); - tokio::time::timeout(Duration::from_secs(timeout), _test_e2e_download()) - .await - .context("test_e2e_download timed out") - .unwrap() + let drop_checks = DropChecks::default(); + tokio::time::timeout( + Duration::from_secs(timeout), + _test_e2e_download(&drop_checks), + ) + .await + .context("test_e2e_download timed out") + .unwrap(); + + // Wait to ensure everything is dropped. + tokio::time::sleep(Duration::from_secs(1)).await; } -async fn _test_e2e_download() { +async fn _test_e2e_download(drop_checks: &DropChecks) { setup_test_logging(); match crate::try_increase_nofile_limit() { Ok(limit) => info!(limit, "increased ulimit"), @@ -75,6 +82,7 @@ async fn _test_e2e_download() { for i in 0..num_servers { let torrent_file_bytes = torrent_file_bytes.clone(); let tempdir = tempdir.path().to_owned(); + let drop_checks = drop_checks.clone(); let fut = spawn( async move { let peer_id = TestPeerMetadata { @@ -104,6 +112,8 @@ async fn _test_e2e_download() { .await .context("error starting session")?; + drop_checks.add(&session, format!("server session {i}")); + info!("started session"); let handle = session @@ -130,6 +140,7 @@ async fn _test_e2e_download() { if !l.is_finished() { bail!("torrent went live, but expected it to be finished"); } + drop_checks.add(l, format!("server {i} live")); Ok(true) } crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), @@ -201,6 +212,7 @@ async fn _test_e2e_download() { ) .await .unwrap(); + drop_checks.add(&session, "client session"); info!("started client session"); diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index c785f8f..5aca403 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -1,8 +1,13 @@ -use std::{io::Write, path::Path}; +use std::{ + io::Write, + path::Path, + sync::{Arc, Weak}, +}; use anyhow::Context; use axum::{response::IntoResponse, routing::get, Router}; use librqbit_core::Id20; +use parking_lot::RwLock; use rand::{thread_rng, Rng, RngCore, SeedableRng}; use tempfile::TempDir; use tracing::{debug, info}; @@ -124,3 +129,32 @@ async fn debug_server() -> anyhow::Result<()> { pub fn spawn_debug_server() { tokio::spawn(debug_server()); } + +pub trait DropPlaceholder: Send + Sync {} +impl DropPlaceholder for T {} + +struct DropCheck { + obj: Weak, + name: String, +} + +#[derive(Default, Clone)] +pub struct DropChecks(Arc>>); + +impl DropChecks { + pub fn add>(&self, obj: &Arc, name: S) { + let weak = Arc::downgrade(obj); + self.0.write().push(DropCheck { + obj: weak as Weak, + name: name.into(), + }) + } +} + +impl Drop for DropCheck { + fn drop(&mut self) { + if self.obj.upgrade().is_some() { + panic!("memory leak: {}", self.name); + } + } +} From a5abe977353e9c4a226a0d8dbc20f07bb78de8ae Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 16:56:06 +0100 Subject: [PATCH 09/13] Drop check better message --- crates/librqbit/src/tests/e2e.rs | 7 ++++++- crates/librqbit/src/tests/test_util.rs | 17 ++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 00fcefe..b43b4ac 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -38,7 +38,12 @@ async fn test_e2e_download() { .unwrap(); // Wait to ensure everything is dropped. - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(10)).await; + + let metrics = tokio::runtime::Handle::current().metrics(); + assert_eq!(metrics.num_alive_tasks(), 1); + + drop_checks.check().unwrap(); } async fn _test_e2e_download(drop_checks: &DropChecks) { diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 5aca403..1c2e79a 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -149,12 +149,19 @@ impl DropChecks { name: name.into(), }) } -} -impl Drop for DropCheck { - fn drop(&mut self) { - if self.obj.upgrade().is_some() { - panic!("memory leak: {}", self.name); + pub fn check(&self) -> anyhow::Result<()> { + let mut still_running = Vec::new(); + for dc in self.0.read().iter() { + if dc.obj.upgrade().is_some() { + still_running.push(dc.name.clone()) + } } + if !still_running.is_empty() { + anyhow::bail!( + "still existing objects that were supposed to be dropped: {still_running:#?}" + ) + } + Ok(()) } } From d5ddf4d2947c9afefcdf3627f4aced4d7005efe0 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 17:25:00 +0100 Subject: [PATCH 10/13] Session is now properly cleaned up --- crates/librqbit/src/session.rs | 14 +++++++++----- crates/librqbit/src/session_stats/mod.rs | 4 +++- crates/librqbit/src/tests/e2e.rs | 13 +++++++++---- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 60491eb..06bbf3e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -642,7 +642,7 @@ impl Session { if opts.enable_upnp_port_forwarding { session.spawn( error_span!(parent: session.rs(), "upnp_forward", port = listen_port), - session.clone().task_upnp_port_forwarder(listen_port), + Self::task_upnp_port_forwarder(listen_port), ); } } @@ -691,7 +691,7 @@ impl Session { } async fn check_incoming_connection( - &self, + self: Arc, addr: SocketAddr, mut stream: TcpStream, ) -> anyhow::Result<(Arc, CheckedIncomingConnection)> { @@ -744,6 +744,8 @@ impl Session { async fn task_tcp_listener(self: Arc, l: TcpListener) -> anyhow::Result<()> { let mut futs = FuturesUnordered::new(); + let session = Arc::downgrade(&self); + drop(self); loop { tokio::select! { @@ -751,13 +753,15 @@ impl Session { match r { Ok((stream, addr)) => { trace!("accepted connection from {addr}"); + let session = session.upgrade().context("session is dead")?; + let span = error_span!(parent: session.rs(), "incoming", addr=%addr); futs.push( - self.check_incoming_connection(addr, stream) + session.check_incoming_connection(addr, stream) .map_err(|e| { debug!("error checking incoming connection: {e:#}"); e }) - .instrument(error_span!(parent: self.rs(), "incoming", addr=%addr)) + .instrument(span) ); } Err(e) => { @@ -775,7 +779,7 @@ impl Session { } } - async fn task_upnp_port_forwarder(self: Arc, port: u16) -> anyhow::Result<()> { + async fn task_upnp_port_forwarder(port: u16) -> anyhow::Result<()> { let pf = librqbit_upnp::UpnpPortForwarder::new(vec![port], None)?; pf.run_forever().await } diff --git a/crates/librqbit/src/session_stats/mod.rs b/crates/librqbit/src/session_stats/mod.rs index 8cf5eb0..2939694 100644 --- a/crates/librqbit/src/session_stats/mod.rs +++ b/crates/librqbit/src/session_stats/mod.rs @@ -3,6 +3,7 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::Context; use atomic::AtomicSessionStats; use librqbit_core::speed_estimator::SpeedEstimator; use snapshot::SessionStatsSnapshot; @@ -40,12 +41,13 @@ impl Default for SessionStats { impl Session { pub(crate) fn start_speed_estimator_updater(self: &Arc) { self.spawn(error_span!(parent: self.rs(), "speed_estimator"), { - let s = self.clone(); + let s = Arc::downgrade(self); async move { let mut i = tokio::time::interval(Duration::from_secs(1)); loop { i.tick().await; + let s = s.upgrade().context("session is dead")?; let now = Instant::now(); let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed); let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed); diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index b43b4ac..d762d04 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -38,7 +38,7 @@ async fn test_e2e_download() { .unwrap(); // Wait to ensure everything is dropped. - tokio::time::sleep(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(1)).await; let metrics = tokio::runtime::Handle::current().metrics(); assert_eq!(metrics.num_alive_tasks(), 1); @@ -157,12 +157,13 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { } } info!("torrent is live"); - Ok::<_, anyhow::Error>(SocketAddr::new( + let addr = SocketAddr::new( std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), session .tcp_listen_port() .context("expected session.tcp_listen_port() to be set")?, - )) + ); + Ok::<_, anyhow::Error>((session, addr)) } .instrument(error_span!("server", id = i)), ); @@ -170,12 +171,15 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { } let mut peers = Vec::new(); + + // This is around just not to drop. + let mut _servers = Vec::new(); for (id, peer) in futures::future::join_all(futs) .await .into_iter() .enumerate() { - let peer = peer + let (server, peer) = peer .with_context(|| format!("join error, server={id}")) .unwrap() .with_context(|| format!("timeout, server={id}")) @@ -183,6 +187,7 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { .with_context(|| format!("server couldn't start, server={id}")) .unwrap(); peers.push(peer); + _servers.push(server); } info!("started all servers, starting client"); From 6bfb8f9e15f2e4d3fddf8208a0ea0776f65fae3e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 17:39:21 +0100 Subject: [PATCH 11/13] E2E test: wait until i am the last task properly --- crates/librqbit/src/tests/e2e.rs | 11 ++++--- crates/librqbit/src/tests/test_util.rs | 41 +++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index d762d04..67b6fcb 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -16,7 +16,7 @@ use crate::{ create_torrent, tests::test_util::{ create_default_random_dir_with_torrents, setup_test_logging, spawn_debug_server, - DropChecks, TestPeerMetadata, + wait_until_i_am_the_last_task, DropChecks, TestPeerMetadata, }, AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, SessionPersistenceConfig, }; @@ -38,10 +38,7 @@ async fn test_e2e_download() { .unwrap(); // Wait to ensure everything is dropped. - tokio::time::sleep(Duration::from_secs(1)).await; - - let metrics = tokio::runtime::Handle::current().metrics(); - assert_eq!(metrics.num_alive_tasks(), 1); + wait_until_i_am_the_last_task().await; drop_checks.check().unwrap(); } @@ -133,6 +130,9 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { .await .context("error adding torrent")?; let h = handle.into_handle().context("into_handle()")?; + + drop_checks.add(&h.shared, format!("server {i} torrent shared handle")); + let mut interval = interval(Duration::from_millis(100)); info!("added torrent"); @@ -145,7 +145,6 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { if !l.is_finished() { bail!("torrent went live, but expected it to be finished"); } - drop_checks.add(l, format!("server {i} live")); Ok(true) } crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 1c2e79a..051d773 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -2,9 +2,10 @@ use std::{ io::Write, path::Path, sync::{Arc, Weak}, + time::Duration, }; -use anyhow::Context; +use anyhow::{bail, Context}; use axum::{response::IntoResponse, routing::get, Router}; use librqbit_core::Id20; use parking_lot::RwLock; @@ -165,3 +166,41 @@ impl DropChecks { Ok(()) } } + +pub async fn wait_until( + mut cond: impl FnMut() -> anyhow::Result<()>, + timeout: Duration, +) -> anyhow::Result<()> { + let mut interval = tokio::time::interval(Duration::from_millis(10)); + let mut last_err: Option = None; + let res = tokio::time::timeout(timeout, async { + loop { + interval.tick().await; + match cond() { + Ok(()) => return Ok::<_, anyhow::Error>(()), + Err(e) => last_err = Some(e), + } + } + }) + .await; + if res.is_err() { + bail!("wait_until timeout: last result = {last_err:?}") + } + Ok(()) +} + +pub async fn wait_until_i_am_the_last_task() { + let metrics = tokio::runtime::Handle::current().metrics(); + wait_until( + || { + let num_alive = metrics.num_alive_tasks(); + if num_alive != 1 { + bail!("metrics.num_alive_tasks() = {num_alive}, expected 1") + } + Ok(()) + }, + Duration::from_secs(5), + ) + .await + .unwrap(); +} From d1f6a57e5413f398bab3bfef3d9d1b5dba027cff Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 17:42:19 +0100 Subject: [PATCH 12/13] Remove bad log message --- crates/librqbit/src/session.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 06bbf3e..096e086 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -923,7 +923,6 @@ impl Session { peer_rx: Some(rx), initial_peers: { let seen = seen.into_iter().collect_vec(); - info!(count=seen.len(), "seen"); for peer in &seen { debug!(?peer, "seen") } From 3469dfce935c0b0a466e3e16909ba56daf91d6a8 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 17:51:55 +0100 Subject: [PATCH 13/13] Fix a new recent bug --- crates/librqbit/src/torrent_state/initializing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 0c32b4a..915139b 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -37,7 +37,7 @@ fn compute_selected_pieces( for (_, fi) in file_infos .iter() .enumerate() - .filter(|(id, _)| only_files.map(|of| of.contains(id)).unwrap_or(false)) + .filter(|(id, _)| only_files.map(|of| of.contains(id)).unwrap_or(true)) { if let Some(r) = bf.get_mut(fi.piece_range_usize()) { r.fill(true);