From 80f4d3b1b212675803c84deb51887574c3fe78b5 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 15:51:58 +0100 Subject: [PATCH] Remove ManagedTorrentBuilder --- crates/librqbit/src/session.rs | 116 ++++++++++------ crates/librqbit/src/torrent_state/mod.rs | 167 +---------------------- 2 files changed, 77 insertions(+), 206 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 670cd8f..eab7a5e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -12,6 +12,7 @@ use crate::{ api::TorrentIdOrHash, bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + file_info::FileInfo, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, @@ -23,10 +24,11 @@ use crate::{ }, stream_connect::{SocksProxyConfig, StreamConnector}, torrent_state::{ - ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, + initializing::TorrentStateInitializing, ManagedTorrentHandle, ManagedTorrentLocked, + ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, - ManagedTorrentInfo, + ManagedTorrent, ManagedTorrentInfo, }; use anyhow::{bail, Context}; use bencode::bencode_serialize_to_writer; @@ -43,6 +45,7 @@ use itertools::Itertools; use librqbit_core::{ constants::CHUNK_SIZE, directories::get_configuration_directory, + lengths::Lengths, magnet::Magnet, peer_id::generate_peer_id, spawn_utils::spawn_with_cancel, @@ -51,7 +54,10 @@ use librqbit_core::{ use parking_lot::RwLock; use peer_binary_protocol::Handshake; use serde::{Deserialize, Serialize}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::Notify, +}; use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span}; @@ -1023,7 +1029,7 @@ impl Session { } async fn main_torrent_info( - &self, + self: &Arc, add_res: InternalAddResult, mut opts: AddTorrentOptions, ) -> anyhow::Result { @@ -1084,43 +1090,6 @@ impl Session { .fetch_add(1, std::sync::atomic::Ordering::Relaxed) }; - let mut builder = ManagedTorrentBuilder::new( - id, - info, - info_hash, - torrent_bytes, - info_bytes, - output_folder, - storage_factory, - ); - builder - .allow_overwrite(opts.overwrite) - .spawner(self.spawner) - .trackers(trackers) - .connector(self.connector.clone()) - .peer_id(self.peer_id); - - if let Some(d) = self.disk_write_tx.clone() { - builder.disk_writer(d); - } - - if let Some(only_files) = only_files { - builder.only_files(only_files); - } - if let Some(interval) = opts.force_tracker_interval { - builder.force_tracker_interval(interval); - } - - let peer_opts = self.merge_peer_opts(opts.peer_opts); - - if let Some(t) = peer_opts.connect_timeout { - builder.peer_connect_timeout(t); - } - - if let Some(t) = peer_opts.read_write_timeout { - builder.peer_read_write_timeout(t); - } - let managed_torrent = { let mut g = self.db.write(); if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| { @@ -1132,13 +1101,70 @@ impl Session { }) { return Ok(AddTorrentResponse::AlreadyManaged(id, handle)); } - let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?; - g.add_torrent(managed_torrent.clone(), id); - managed_torrent + + let lengths = Lengths::from_torrent(&info)?; + let file_infos = info + .iter_file_details(&lengths)? + .map(|fd| { + Ok::<_, anyhow::Error>(FileInfo { + relative_filename: fd.filename.to_pathbuf()?, + offset_in_torrent: fd.offset, + piece_range: fd.pieces, + len: fd.len, + }) + }) + .collect::>>()?; + + let span = error_span!(parent: self.rs(), "torrent", id); + let peer_opts = self.merge_peer_opts(opts.peer_opts); + let minfo = Arc::new(ManagedTorrentInfo { + span, + file_infos, + info, + torrent_bytes, + info_bytes, + info_hash, + trackers: trackers.into_iter().collect(), + spawner: self.spawner, + peer_id: self.peer_id, + lengths, + options: ManagedTorrentOptions { + force_tracker_interval: opts.force_tracker_interval, + peer_connect_timeout: peer_opts.connect_timeout, + peer_read_write_timeout: peer_opts.read_write_timeout, + allow_overwrite: opts.overwrite, + output_folder, + disk_write_queue: self.disk_write_tx.clone(), + }, + connector: self.connector.clone(), + }); + + let initializing = Arc::new(TorrentStateInitializing::new( + minfo.clone(), + only_files.clone(), + storage_factory.create_and_init(&minfo)?, + )); + let handle = Arc::new(ManagedTorrent { + id, + locked: RwLock::new(ManagedTorrentLocked { + state: ManagedTorrentState::Initializing(initializing), + only_files, + }), + state_change_notify: Notify::new(), + storage_factory, + info: minfo, + session: Arc::downgrade(self), + }); + + g.add_torrent(handle.clone(), id); + handle }; if let Some(p) = self.persistence.as_ref() { - p.store(id, &managed_torrent).await?; + if let Err(e) = p.store(id, &managed_torrent).await { + self.db.write().torrents.remove(&id); + return Err(e); + } } // Merge "initial_peers" and "peer_rx" into one stream. diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0b506ab..ac7b3b8 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -9,6 +9,7 @@ use std::collections::HashSet; use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::sync::Weak; use std::time::Duration; use anyhow::bail; @@ -19,7 +20,6 @@ use futures::future::BoxFuture; use futures::FutureExt; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; -use librqbit_core::peer_id::generate_peer_id; use librqbit_core::spawn_utils::spawn_with_cancel; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; @@ -36,7 +36,6 @@ use tracing::warn; use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; -use crate::file_info::FileInfo; use crate::session::TorrentId; use crate::session_stats::atomic::AtomicSessionStats; use crate::spawn_utils::BlockingSpawner; @@ -46,6 +45,7 @@ use crate::torrent_state::stats::LiveStats; use crate::type_aliases::DiskWorkQueueSender; use crate::type_aliases::FileInfos; use crate::type_aliases::PeerStream; +use crate::Session; use initializing::TorrentStateInitializing; @@ -87,7 +87,7 @@ impl ManagedTorrentState { } pub(crate) struct ManagedTorrentLocked { - pub state: ManagedTorrentState, + pub(crate) state: ManagedTorrentState, pub(crate) only_files: Option>, } @@ -122,8 +122,9 @@ pub struct ManagedTorrent { pub info: Arc, pub(crate) storage_factory: BoxStorageFactory, - state_change_notify: Notify, - locked: RwLock, + pub(crate) session: Weak, + pub(crate) state_change_notify: Notify, + pub(crate) locked: RwLock, } impl ManagedTorrent { @@ -550,160 +551,4 @@ impl ManagedTorrent { } } -pub(crate) struct ManagedTorrentBuilder { - id: TorrentId, - info: TorrentMetaV1Info, - output_folder: PathBuf, - info_hash: Id20, - torrent_bytes: Bytes, - info_bytes: Bytes, - force_tracker_interval: Option, - peer_connect_timeout: Option, - peer_read_write_timeout: Option, - only_files: Option>, - trackers: Vec, - peer_id: Option, - spawner: Option, - allow_overwrite: bool, - storage_factory: BoxStorageFactory, - disk_writer: Option, - connector: Arc, -} - -impl ManagedTorrentBuilder { - pub fn new( - id: usize, - info: TorrentMetaV1Info, - info_hash: Id20, - torrent_bytes: Bytes, - info_bytes: Bytes, - output_folder: PathBuf, - storage_factory: BoxStorageFactory, - ) -> Self { - Self { - id, - info, - info_hash, - torrent_bytes, - info_bytes, - spawner: None, - force_tracker_interval: None, - peer_connect_timeout: None, - peer_read_write_timeout: None, - only_files: None, - trackers: Default::default(), - peer_id: None, - allow_overwrite: false, - output_folder, - storage_factory, - disk_writer: None, - connector: Arc::new(Default::default()), - } - } - - pub fn only_files(&mut self, only_files: Vec) -> &mut Self { - self.only_files = Some(only_files); - self - } - - pub fn trackers(&mut self, trackers: Vec) -> &mut Self { - self.trackers = trackers; - self - } - - pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { - self.force_tracker_interval = Some(force_tracker_interval); - self - } - - pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { - self.spawner = Some(spawner); - self - } - - pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { - self.peer_id = Some(peer_id); - self - } - - pub fn allow_overwrite(&mut self, value: bool) -> &mut Self { - self.allow_overwrite = value; - self - } - - pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { - self.peer_connect_timeout = Some(timeout); - self - } - - pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self { - self.peer_read_write_timeout = Some(timeout); - self - } - - pub fn disk_writer(&mut self, value: DiskWorkQueueSender) -> &mut Self { - self.disk_writer = Some(value); - self - } - - pub fn connector(&mut self, value: Arc) -> &mut Self { - self.connector = value; - self - } - - pub fn build(self, span: tracing::Span) -> anyhow::Result { - let lengths = Lengths::from_torrent(&self.info)?; - let file_infos = self - .info - .iter_file_details(&lengths)? - .map(|fd| { - Ok::<_, anyhow::Error>(FileInfo { - relative_filename: fd.filename.to_pathbuf()?, - offset_in_torrent: fd.offset, - piece_range: fd.pieces, - len: fd.len, - }) - }) - .collect::>>()?; - - let info = Arc::new(ManagedTorrentInfo { - span, - file_infos, - info: self.info, - torrent_bytes: self.torrent_bytes, - info_bytes: self.info_bytes, - info_hash: self.info_hash, - trackers: self.trackers.into_iter().collect(), - spawner: self.spawner.unwrap_or_default(), - peer_id: self.peer_id.unwrap_or_else(generate_peer_id), - lengths, - options: ManagedTorrentOptions { - force_tracker_interval: self.force_tracker_interval, - peer_connect_timeout: self.peer_connect_timeout, - peer_read_write_timeout: self.peer_read_write_timeout, - allow_overwrite: self.allow_overwrite, - output_folder: self.output_folder, - disk_write_queue: self.disk_writer, - }, - connector: self.connector, - }); - - let initializing = Arc::new(TorrentStateInitializing::new( - info.clone(), - self.only_files.clone(), - self.storage_factory.create_and_init(&info)?, - )); - Ok(Arc::new(ManagedTorrent { - id: self.id, - locked: RwLock::new(ManagedTorrentLocked { - state: ManagedTorrentState::Initializing(initializing), - only_files: self.only_files, - }), - state_change_notify: Notify::new(), - storage_factory: self.storage_factory, - info, - })) - } -} - pub type ManagedTorrentHandle = Arc;