From 739666ff8899076cdffa0caf5068e9614f102fbd Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 24 Nov 2023 09:30:21 +0000 Subject: [PATCH] HUGE REFACTOR to suppor multiple states. Incomplete, broken --- crates/librqbit/src/http_api.rs | 55 +++--- crates/librqbit/src/lib.rs | 2 +- crates/librqbit/src/session.rs | 105 +++-------- crates/librqbit/src/torrent_manager.rs | 124 +------------ crates/librqbit/src/torrent_state/live/mod.rs | 11 ++ crates/librqbit/src/torrent_state/mod.rs | 172 +++++++++++++++++- crates/rqbit/src/main.rs | 17 +- 7 files changed, 242 insertions(+), 244 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 2249c4f..e86e31d 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,4 +1,4 @@ -use anyhow::Context; +use anyhow::{bail, Context}; use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; @@ -22,9 +22,9 @@ use crate::http_api_error::{ApiError, ApiErrorExt}; use crate::session::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, }; -use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; use crate::torrent_state::stats::snapshot::StatsSnapshot; +use crate::torrent_state::ManagedTorrentHandle; // Public API #[derive(Clone)] @@ -38,7 +38,7 @@ impl HttpApi { inner: Arc::new(ApiInternal::new(session)), } } - pub fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize { + pub fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize { self.inner.add_torrent_handle(handle) } @@ -333,7 +333,7 @@ impl TorrentAddQueryParams { struct ApiInternal { dht: Option, startup_time: Instant, - torrent_managers: RwLock>, + torrent_managers: RwLock>, session: Arc, } @@ -349,14 +349,14 @@ impl ApiInternal { } } - fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize { + fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize { let mut g = self.torrent_managers.write(); let idx = g.len(); g.push(handle); idx } - fn mgr_handle(&self, idx: usize) -> Result { + fn mgr_handle(&self, idx: usize) -> Result { self.torrent_managers .read() .get(idx) @@ -373,7 +373,7 @@ impl ApiInternal { .enumerate() .map(|(id, mgr)| TorrentListResponseItem { id, - info_hash: mgr.torrent_state().info_hash().as_string(), + info_hash: mgr.info().info_hash.as_string(), }) .collect(), } @@ -381,14 +381,17 @@ impl ApiInternal { fn api_torrent_details(&self, idx: usize) -> Result { let handle = self.mgr_handle(idx)?; - let info_hash = handle.torrent_state().info_hash(); + let info_hash = handle.info().info_hash; let only_files = handle.only_files(); - make_torrent_details(&info_hash, handle.torrent_state().info(), only_files) + make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) } fn api_peer_stats(&self, idx: usize, filter: PeerStatsFilter) -> Result { let handle = self.mgr_handle(idx)?; - Ok(handle.torrent_state().per_peer_stats_snapshot(filter)) + Ok(handle + .live() + .context("not live")? + .per_peer_stats_snapshot(filter)) } pub async fn api_add_torrent( @@ -406,8 +409,8 @@ impl ApiInternal { AddTorrentResponse::AlreadyManaged(managed) => { return Err(anyhow::anyhow!( "{:?} is already managed, downloaded to {:?}", - managed.info_hash, - managed.output_folder + managed.info_hash(), + &managed.info().out_dir )) .with_error_status_code(StatusCode::CONFLICT); } @@ -422,9 +425,9 @@ impl ApiInternal { }, AddTorrentResponse::Added(handle) => { let details = make_torrent_details( - &handle.torrent_state().info_hash(), - handle.torrent_state().info(), - handle.only_files(), + &handle.info_hash(), + &handle.info().info, + handle.only_files().as_deref(), ) .context("error making torrent details")?; let id = self.add_torrent_handle(handle); @@ -451,8 +454,9 @@ impl ApiInternal { fn api_stats(&self, idx: usize) -> Result { let mgr = self.mgr_handle(idx)?; - let snapshot = mgr.torrent_state().stats_snapshot(); - let estimator = mgr.speed_estimator(); + let live = mgr.live().context("not live")?; + let snapshot = live.stats_snapshot(); + let estimator = live.speed_estimator(); // Poor mans download speed computation let elapsed = self.startup_time.elapsed(); @@ -469,14 +473,15 @@ impl ApiInternal { } fn api_dump_haves(&self, idx: usize) -> Result { - let mgr = self.mgr_handle(idx)?; - Ok(format!( - "{:?}", - mgr.torrent_state() - .lock_read("api_dump_haves") - .chunks - .get_have_pieces(), - )) + Err(anyhow::anyhow!("not implemented").into()) + // let mgr = self.mgr_handle(idx)?; + // Ok(format!( + // "{:?}", + // mgr.live().conetext() + // .lock_read("api_dump_haves") + // .chunks + // .get_have_pieces(), + // )) } } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 78d44aa..4be7c2d 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -8,7 +8,7 @@ pub mod peer_connection; pub mod peer_info_reader; pub mod session; pub mod spawn_utils; -pub mod torrent_manager; +// pub mod torrent_manager; pub mod torrent_state; pub mod tracker_comms; pub mod type_aliases; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 644cb81..ddc213d 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -17,43 +17,28 @@ use crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, peer_connection::PeerConnectionOptions, spawn_utils::{spawn, BlockingSpawner}, - torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle}, + torrent_state::{ManagedTorrent, ManagedTorrentBuilder, ManagedTorrentHandle}, }; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; -#[derive(Clone)] -pub enum ManagedTorrentState { - Initializing, - Running(TorrentManagerHandle), -} - -#[derive(Clone)] -pub struct ManagedTorrent { - pub info_hash: Id20, - pub output_folder: PathBuf, - pub state: ManagedTorrentState, -} - -impl PartialEq for ManagedTorrent { - fn eq(&self, other: &Self) -> bool { - self.info_hash == other.info_hash && self.output_folder == other.output_folder - } -} - #[derive(Default)] pub struct SessionLocked { - torrents: Vec, + torrents: Vec, } enum SessionLockedAddTorrentResult { - AlreadyManaged(ManagedTorrent), + AlreadyManaged(ManagedTorrentHandle), Added(usize), } impl SessionLocked { - fn add_torrent(&mut self, torrent: ManagedTorrent) -> SessionLockedAddTorrentResult { - if let Some(handle) = self.torrents.iter().find(|t| **t == torrent) { + fn add_torrent(&mut self, torrent: ManagedTorrentHandle) -> SessionLockedAddTorrentResult { + if let Some(handle) = self + .torrents + .iter() + .find(|t| t.info_hash() == torrent.info_hash()) + { return SessionLockedAddTorrentResult::AlreadyManaged(handle.clone()); } let idx = self.torrents.len(); @@ -124,9 +109,9 @@ pub struct ListOnlyResponse { } pub enum AddTorrentResponse { - AlreadyManaged(ManagedTorrent), + AlreadyManaged(ManagedTorrentHandle), ListOnly(ListOnlyResponse), - Added(TorrentManagerHandle), + Added(ManagedTorrentHandle), } pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result> { @@ -227,7 +212,7 @@ impl Session { } pub fn with_torrents(&self, callback: F) where - F: Fn(&[ManagedTorrent]), + F: Fn(&[ManagedTorrentHandle]), { callback(&self.locked.read().torrents) } @@ -404,24 +389,13 @@ impl Session { .unwrap_or_else(|| self.output_folder.clone()) .join(sub_folder); - let managed_torrent = ManagedTorrent { - info_hash, - output_folder: output_folder.clone(), - state: ManagedTorrentState::Initializing, - }; - - match self.locked.write().add_torrent(managed_torrent) { - SessionLockedAddTorrentResult::AlreadyManaged(managed) => { - return Ok(AddTorrentResponse::AlreadyManaged(managed)) - } - SessionLockedAddTorrentResult::Added(_) => {} - } - - let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder.clone()); + let mut builder = ManagedTorrentBuilder::new(info, info_hash, output_folder.clone()); builder .overwrite(opts.overwrite) .spawner(self.spawner) - .peer_id(self.peer_id); + .peer_id(self.peer_id) + .trackers(trackers); + if let Some(only_files) = only_files { builder.only_files(only_files); } @@ -437,51 +411,22 @@ impl Session { builder.peer_read_write_timeout(t); } - let handle = match builder - .start_manager() - .context("error starting torrent manager") - { - Ok(handle) => { - let mut g = self.locked.write(); - let m = g - .torrents - .iter_mut() - .find(|t| t.info_hash == info_hash && t.output_folder == output_folder) - .unwrap(); - m.state = ManagedTorrentState::Running(handle.clone()); - handle + let managed_torrent = builder.build(); + + match self.locked.write().add_torrent(managed_torrent.clone()) { + SessionLockedAddTorrentResult::AlreadyManaged(managed) => { + return Ok(AddTorrentResponse::AlreadyManaged(managed)) } - Err(error) => { - let mut g = self.locked.write(); - let idx = g - .torrents - .iter() - .position(|t| t.info_hash == info_hash && t.output_folder == output_folder) - .unwrap(); - g.torrents.remove(idx); - return Err(error); - } - }; - { - let mut g = self.locked.write(); - let m = g - .torrents - .iter_mut() - .find(|t| t.info_hash == info_hash && t.output_folder == output_folder) - .unwrap(); - m.state = ManagedTorrentState::Running(handle.clone()); + SessionLockedAddTorrentResult::Added(_) => {} } - for url in trackers { - handle.add_tracker(url); - } for peer in initial_peers { - handle.add_peer(peer); + managed_torrent.add_peer(peer); } if let Some(mut dht_peer_rx) = dht_peer_rx { spawn(span!(Level::INFO, "dht_peer_adder"), { - let handle = handle.clone(); + let handle = managed_torrent.clone(); async move { while let Some(peer) = dht_peer_rx.next().await { handle.add_peer(peer); @@ -492,6 +437,6 @@ impl Session { }); } - Ok(AddTorrentResponse::Added(handle)) + Ok(AddTorrentResponse::Added(managed_torrent)) } } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 4be9c9c..16507af 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -24,128 +24,10 @@ use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, spawn_utils::{spawn, BlockingSpawner}, - torrent_state::{TorrentStateLive, TorrentStateOptions}, + torrent_state::{ManagedTorrent, ManagedTorrentHandle, TorrentStateLive, TorrentStateOptions}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, }; -#[derive(Default)] -struct TorrentManagerOptions { - force_tracker_interval: Option, - peer_connect_timeout: Option, - peer_read_write_timeout: Option, - only_files: Option>, - peer_id: Option, - overwrite: bool, -} - -pub struct TorrentManagerBuilder { - info: TorrentMetaV1Info, - info_hash: Id20, - output_folder: PathBuf, - options: TorrentManagerOptions, - spawner: Option, -} - -impl TorrentManagerBuilder { - pub fn new>( - info: TorrentMetaV1Info, - info_hash: Id20, - output_folder: P, - ) -> Self { - Self { - info, - info_hash, - output_folder: output_folder.as_ref().into(), - spawner: None, - options: TorrentManagerOptions::default(), - } - } - - pub fn only_files(&mut self, only_files: Vec) -> &mut Self { - self.options.only_files = Some(only_files); - self - } - - pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { - self.options.overwrite = overwrite; - self - } - - pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { - self.options.force_tracker_interval = Some(force_tracker_interval); - self - } - - pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { - self.spawner = Some(spawner); - self - } - - pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { - self.options.peer_id = Some(peer_id); - self - } - - pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { - self.options.peer_connect_timeout = Some(timeout); - self - } - - pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self { - self.options.peer_read_write_timeout = Some(timeout); - self - } - - pub fn start_manager(self) -> anyhow::Result { - TorrentManager::start( - self.info, - self.info_hash, - self.output_folder, - self.spawner.unwrap_or_else(|| BlockingSpawner::new(true)), - Some(self.options), - ) - } -} - -#[derive(Clone)] -pub struct TorrentManagerHandle { - manager: Arc, -} - -impl TorrentManagerHandle { - pub fn add_tracker(&self, url: Url) -> bool { - let mgr = self.manager.clone(); - if mgr.trackers.lock().insert(url.clone()) { - spawn( - span!(Level::ERROR, "tracker_monitor", url = url.to_string()), - async move { mgr.single_tracker_monitor(url).await }, - ); - true - } else { - false - } - } - pub fn only_files(&self) -> Option<&[usize]> { - self.manager.options.only_files.as_deref() - } - pub fn add_peer(&self, addr: SocketAddr) -> bool { - self.manager.state.add_peer_if_not_seen(addr) - } - pub fn torrent_state(&self) -> &TorrentStateLive { - &self.manager.state - } - pub fn speed_estimator(&self) -> &Arc { - &self.manager.speed_estimator - } - pub async fn cancel(&self) -> anyhow::Result<()> { - todo!() - } - pub async fn wait_until_completed(&self) -> anyhow::Result<()> { - self.manager.state.wait_until_completed().await; - Ok(()) - } -} - struct TorrentManager { state: Arc, #[allow(dead_code)] @@ -171,8 +53,8 @@ impl TorrentManager { info_hash: Id20, out: P, spawner: BlockingSpawner, - options: Option, - ) -> anyhow::Result { + options: Option, + ) -> anyhow::Result { let options = options.unwrap_or_default(); let (files, filenames) = { let mut files = diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 6335f6f..08650a5 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -63,6 +63,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ id20::Id20, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, + speed_estimator::{self, SpeedEstimator}, torrent_metainfo::TorrentMetaV1Info, }; use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -144,6 +145,8 @@ pub struct TorrentStateLive { peer_queue_tx: UnboundedSender, finished_notify: Notify, + + speed_estimator: SpeedEstimator, } impl TorrentStateLive { @@ -163,6 +166,9 @@ impl TorrentStateLive { ) -> Arc { let options = options.unwrap_or_default(); let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); + + let speed_estimator = SpeedEstimator::new(5); + let state = Arc::new(TorrentStateLive { info_hash, info, @@ -186,6 +192,7 @@ impl TorrentStateLive { peer_semaphore: Semaphore::new(128), peer_queue_tx, finished_notify: Notify::new(), + speed_estimator, }); spawn( span!(Level::ERROR, "peer_adder"), @@ -194,6 +201,10 @@ impl TorrentStateLive { state } + pub fn speed_estimator(&self) -> &SpeedEstimator { + &self.speed_estimator + } + async fn task_manage_peer( self: Arc, addr: SocketAddr, diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 6b8347b..f566f51 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -2,36 +2,188 @@ pub mod utils; pub mod live; +use std::net::SocketAddr; +use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; +use std::{collections::HashSet, path::Path}; +use anyhow::Context; use buffers::ByteString; use librqbit_core::id20::Id20; +use librqbit_core::speed_estimator::SpeedEstimator; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; pub use live::*; use parking_lot::RwLock; use tokio::sync::mpsc::Sender; +use tracing::trace_span; use url::Url; -pub(crate) enum ManagedTorrentState { - Live { - state: TorrentStateLive, - only_files_tx: Sender>, - trackers_tx: Sender, - }, +use crate::spawn_utils::{spawn, BlockingSpawner}; + +pub struct TorrentStateInitializing {} + +#[derive(Default, Clone)] +pub enum ManagedTorrentState { + #[default] + Created, + + Initializing(Arc), + + // TODO: only_files_tx + // TODO: trackers_tx?? + Live(Arc), } pub(crate) struct ManagedTorrentLocked { - pub trackers: Vec, - pub only_files: Vec, + pub only_files: Option>, pub state: ManagedTorrentState, } pub struct ManagedTorrentInfo { pub info: TorrentMetaV1Info, pub info_hash: Id20, + pub out_dir: PathBuf, + pub spawner: BlockingSpawner, + pub trackers: Vec, + // pub options: Option, } -pub(crate) struct ManagedTorrent { +pub struct ManagedTorrent { pub info: Arc, - pub(crate) locked: RwLock, + locked: RwLock, } + +impl ManagedTorrent { + pub fn info(&self) -> &ManagedTorrentInfo { + &self.info + } + + pub fn info_hash(&self) -> Id20 { + self.info.info_hash + } + + pub(crate) fn add_peer(&self, peer: SocketAddr) -> bool { + todo!() + } + + pub fn only_files(&self) -> Option> { + self.locked.write().only_files.clone() + } + + pub fn state(&self) -> ManagedTorrentState { + self.locked.read().state.clone() + } + + pub fn live(&self) -> Option> { + let g = self.locked.read(); + match &g.state { + ManagedTorrentState::Live(live) => Some(live.clone()), + _ => None, + } + } + + pub async fn wait_until_completed(&self) -> anyhow::Result<()> { + // TODO: rewrite + self.live() + .context("torrent isn't live")? + .wait_until_completed() + .await; + Ok(()) + } +} + +pub struct ManagedTorrentBuilder { + info: TorrentMetaV1Info, + info_hash: Id20, + output_folder: PathBuf, + force_tracker_interval: Option, + peer_connect_timeout: Option, + peer_read_write_timeout: Option, + only_files: Option>, + trackers: Vec, + peer_id: Option, + overwrite: bool, + spawner: Option, +} + +impl ManagedTorrentBuilder { + pub fn new>( + info: TorrentMetaV1Info, + info_hash: Id20, + output_folder: P, + ) -> Self { + Self { + info, + info_hash, + output_folder: output_folder.as_ref().into(), + spawner: None, + force_tracker_interval: None, + peer_connect_timeout: None, + peer_read_write_timeout: None, + only_files: None, + trackers: Default::default(), + peer_id: None, + overwrite: false, + } + } + + pub fn only_files(&mut self, only_files: Vec) -> &mut Self { + self.only_files = Some(only_files); + self + } + + pub fn trackers(&mut self, trackers: Vec) -> &mut Self { + self.trackers = trackers; + self + } + + pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { + self.overwrite = overwrite; + self + } + + pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { + self.force_tracker_interval = Some(force_tracker_interval); + self + } + + pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { + self.spawner = Some(spawner); + self + } + + pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { + self.peer_id = Some(peer_id); + self + } + + pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { + self.peer_connect_timeout = Some(timeout); + self + } + + pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self { + self.peer_read_write_timeout = Some(timeout); + self + } + + pub(crate) fn build(self) -> ManagedTorrentHandle { + Arc::new(ManagedTorrent { + locked: RwLock::new(ManagedTorrentLocked { + only_files: self.only_files, + state: Default::default(), + }), + info: Arc::new(ManagedTorrentInfo { + info: self.info, + info_hash: self.info_hash, + out_dir: self.output_folder, + trackers: self.trackers.into_iter().collect(), + spawner: self.spawner.unwrap_or_default(), + // options: Some(self.options), + }), + }) + } +} + +pub type ManagedTorrentHandle = Arc; diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index e457e36..8a94abe 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -7,10 +7,11 @@ use librqbit::{ http_api_client, peer_connection::PeerConnectionOptions, session::{ - AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, ManagedTorrentState, - Session, SessionOptions, + AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, + SessionOptions, }, spawn_utils::{spawn, BlockingSpawner}, + torrent_state::ManagedTorrentState, }; use size_format::SizeFormatterBinary as SF; use tracing::{error, info, span, warn, Level}; @@ -238,12 +239,12 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> loop { session.with_torrents(|torrents| { for (idx, torrent) in torrents.iter().enumerate() { - match &torrent.state { - ManagedTorrentState::Initializing => { + match torrent.state() { + ManagedTorrentState::Initializing(_) => { info!("[{}] initializing", idx); }, - ManagedTorrentState::Running(handle) => { - let stats = handle.torrent_state().stats_snapshot(); + ManagedTorrentState::Live(handle) => { + let stats = handle.stats_snapshot(); let speed = handle.speed_estimator(); let total = stats.total_bytes; let progress = stats.total_bytes - stats.remaining_bytes; @@ -269,6 +270,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> stats.peer_stats.dead, ); }, + ManagedTorrentState::Created => warn!("the torrent was just created, but not initializing"), } } }); @@ -394,7 +396,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> AddTorrentResponse::AlreadyManaged(handle) => { info!( "torrent {:?} is already managed, downloaded to {:?}", - handle.info_hash, handle.output_folder + handle.info_hash(), + handle.info().out_dir ); continue; }