From d8538af25d7295acd5ef24469a4c80bf11c62114 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 24 Nov 2023 12:44:36 +0000 Subject: [PATCH] [2/n] HUGE REFACTOR to suppor multiple states. Incomplete, broken --- crates/librqbit/src/chunk_tracker.rs | 12 + crates/librqbit/src/torrent_manager.rs | 262 ------------------ .../src/torrent_state/initializing.rs | 157 +++++++++++ crates/librqbit/src/torrent_state/live/mod.rs | 181 ++++++++---- crates/librqbit/src/torrent_state/mod.rs | 67 +++-- crates/librqbit/src/torrent_state/paused.rs | 14 + crates/rqbit/src/main.rs | 70 ++--- 7 files changed, 394 insertions(+), 369 deletions(-) create mode 100644 crates/librqbit/src/torrent_state/initializing.rs create mode 100644 crates/librqbit/src/torrent_state/paused.rs diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 880ce53..f5b16e3 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -76,6 +76,18 @@ impl ChunkTracker { } } + pub fn get_have_bytes(&self) -> u64 { + todo!() + } + + pub fn get_needed_bytes(&self) -> u64 { + todo!() + } + + pub fn get_lengths(&self) -> &Lengths { + &self.lengths + } + pub fn get_have_pieces(&self) -> &BF { &self.have } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 16507af..e69de29 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -1,262 +0,0 @@ -use std::{ - collections::HashSet, - fs::{File, OpenOptions}, - net::SocketAddr, - path::{Path, PathBuf}, - sync::Arc, - time::{Duration, Instant}, -}; - -use anyhow::Context; -use bencode::from_bytes; -use buffers::ByteString; -use librqbit_core::{ - id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, - torrent_metainfo::TorrentMetaV1Info, -}; -use parking_lot::Mutex; -use reqwest::Url; -use sha1w::Sha1; -use size_format::SizeFormatterBinary as SF; -use tracing::{debug, info, span, warn, Level}; - -use crate::{ - chunk_tracker::ChunkTracker, - file_ops::FileOps, - spawn_utils::{spawn, BlockingSpawner}, - torrent_state::{ManagedTorrent, ManagedTorrentHandle, TorrentStateLive, TorrentStateOptions}, - tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, -}; - -struct TorrentManager { - state: Arc, - #[allow(dead_code)] - speed_estimator: Arc, - trackers: Mutex>, - options: TorrentManagerOptions, -} - -fn make_lengths>( - torrent: &TorrentMetaV1Info, -) -> anyhow::Result { - let total_length = torrent.iter_file_lengths()?.sum(); - Lengths::new(total_length, torrent.piece_length, None) -} - -fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> { - Ok(file.set_len(length)?) -} - -impl TorrentManager { - fn start>( - info: TorrentMetaV1Info, - info_hash: Id20, - out: P, - spawner: BlockingSpawner, - options: Option, - ) -> anyhow::Result { - let options = options.unwrap_or_default(); - let (files, filenames) = { - let mut files = - Vec::>>::with_capacity(info.iter_file_lengths()?.count()); - let mut filenames = Vec::new(); - for (path_bits, _) in info.iter_filenames_and_lengths()? { - let mut full_path = out.as_ref().to_owned(); - let relative_path = path_bits - .to_pathbuf() - .context("error converting file to path")?; - full_path.push(relative_path); - - std::fs::create_dir_all(full_path.parent().unwrap())?; - let file = if options.overwrite { - OpenOptions::new() - .create(true) - .read(true) - .write(true) - .open(&full_path)? - } else { - // TODO: create_new does not seem to work with read(true), so calling this twice. - OpenOptions::new() - .create_new(true) - .write(true) - .open(&full_path) - .with_context(|| format!("error creating {:?}", &full_path))?; - OpenOptions::new().read(true).write(true).open(&full_path)? - }; - filenames.push(full_path); - files.push(Arc::new(Mutex::new(file))) - } - (files, filenames) - }; - - let peer_id = options.peer_id.unwrap_or_else(generate_peer_id); - let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?; - debug!("computed lengths: {:?}", &lengths); - - info!("Doing initial checksum validation, this might take a while..."); - let initial_check_results = spawner.spawn_block_in_place(|| { - FileOps::::new(&info, &files, &lengths) - .initial_check(options.only_files.as_deref()) - })?; - - info!( - "Initial check results: have {}, needed {}", - SF::new(initial_check_results.have_bytes), - SF::new(initial_check_results.needed_bytes) - ); - - spawner.spawn_block_in_place(|| { - for (idx, (file, (name, length))) in files - .iter() - .zip(info.iter_filenames_and_lengths().unwrap()) - .enumerate() - { - if options - .only_files - .as_ref() - .map(|v| !v.contains(&idx)) - .unwrap_or(false) - { - continue; - } - let now = Instant::now(); - if let Err(err) = ensure_file_length(&file.lock(), length) { - warn!( - "Error setting length for file {:?} to {}: {:#?}", - name, length, err - ); - } else { - debug!( - "Set length for file {:?} to {} in {:?}", - name, - SF::new(length), - now.elapsed() - ); - } - } - }); - - let chunk_tracker = ChunkTracker::new( - initial_check_results.needed_pieces, - initial_check_results.have_pieces, - lengths, - ); - - #[allow(clippy::needless_update)] - let state_options = TorrentStateOptions { - peer_connect_timeout: options.peer_connect_timeout, - peer_read_write_timeout: options.peer_read_write_timeout, - ..Default::default() - }; - - let state = TorrentStateLive::new( - info, - info_hash, - peer_id, - files, - filenames, - chunk_tracker, - lengths, - initial_check_results.have_bytes, - initial_check_results.needed_bytes, - spawner, - Some(state_options), - ); - - let estimator = Arc::new(SpeedEstimator::new(5)); - - let mgr = Arc::new(Self { - state, - speed_estimator: estimator.clone(), - trackers: Mutex::new(HashSet::new()), - options, - }); - - spawn(span!(Level::ERROR, "speed_estimator_updater"), { - let state = mgr.state.clone(); - async move { - loop { - let stats = state.stats_snapshot(); - let fetched = stats.fetched_bytes; - let needed = state.initially_needed(); - // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. - let remaining = needed - .wrapping_sub(fetched) - .min(needed - stats.downloaded_and_checked_bytes); - estimator.add_snapshot(fetched, remaining, Instant::now()); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }); - - Ok(mgr.into_handle()) - } - - fn into_handle(self: Arc) -> TorrentManagerHandle { - TorrentManagerHandle { manager: self } - } - - async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { - let response: reqwest::Response = reqwest::get(tracker_url).await?; - if !response.status().is_success() { - anyhow::bail!("tracker responded with {:?}", response.status()); - } - let bytes = response.bytes().await?; - if let Ok(error) = from_bytes::(&bytes) { - anyhow::bail!( - "tracker returned failure. Failure reason: {}", - error.failure_reason - ) - }; - let response = from_bytes::(&bytes)?; - - for peer in response.peers.iter_sockaddrs() { - self.state.add_peer_if_not_seen(peer); - } - Ok(response.interval) - } - - async fn single_tracker_monitor(&self, mut tracker_url: Url) -> anyhow::Result<()> { - let mut event = Some(TrackerRequestEvent::Started); - loop { - let request = TrackerRequest { - info_hash: self.state.info_hash(), - peer_id: self.state.peer_id(), - port: 6778, - uploaded: self.state.get_uploaded_bytes(), - downloaded: self.state.get_downloaded_bytes(), - left: self.state.get_left_to_download_bytes(), - compact: true, - no_peer_id: false, - event, - ip: None, - numwant: None, - key: None, - trackerid: None, - }; - - let request_query = request.as_querystring(); - tracker_url.set_query(Some(&request_query)); - - match self.tracker_one_request(tracker_url.clone()).await { - Ok(interval) => { - event = None; - let interval = self - .options - .force_tracker_interval - .unwrap_or_else(|| Duration::from_secs(interval)); - debug!( - "sleeping for {:?} after calling tracker {}", - interval, - tracker_url.host().unwrap() - ); - tokio::time::sleep(interval).await; - } - Err(e) => { - debug!("error calling the tracker {}: {:#}", tracker_url, e); - tokio::time::sleep(Duration::from_secs(60)).await; - } - }; - } - } -} diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs new file mode 100644 index 0000000..2873cb1 --- /dev/null +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -0,0 +1,157 @@ +use std::{ + collections::HashSet, + fs::{File, OpenOptions}, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, Instant}, +}; + +use anyhow::Context; +use bencode::from_bytes; +use buffers::ByteString; +use librqbit_core::{ + id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, + torrent_metainfo::TorrentMetaV1Info, +}; +use parking_lot::Mutex; +use reqwest::Url; +use sha1w::Sha1; +use size_format::SizeFormatterBinary as SF; +use tracing::{debug, info, span, warn, Level}; + +use crate::{ + chunk_tracker::ChunkTracker, + file_ops::FileOps, + spawn_utils::{spawn, BlockingSpawner}, + torrent_state::{ManagedTorrent, ManagedTorrentHandle, TorrentStateLive, TorrentStateOptions}, + tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, +}; + +use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; + +fn make_lengths>( + torrent: &TorrentMetaV1Info, +) -> anyhow::Result { + let total_length = torrent.iter_file_lengths()?.sum(); + Lengths::new(total_length, torrent.piece_length, None) +} + +fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> { + Ok(file.set_len(length)?) +} + +pub struct TorrentStateInitializing { + info: Arc, + only_files: Option>, +} + +impl TorrentStateInitializing { + pub fn new(info: Arc, only_files: Option>) -> Self { + Self { info, only_files } + } + + pub async fn check(&self) -> anyhow::Result { + let (files, filenames) = { + let mut files = Vec::>>::with_capacity( + (&self.info).info.iter_file_lengths()?.count(), + ); + let mut filenames = Vec::new(); + for (path_bits, _) in (&self.info).info.iter_filenames_and_lengths()? { + let mut full_path = (&self.info).out_dir.clone(); + let relative_path = path_bits + .to_pathbuf() + .context("error converting file to path")?; + full_path.push(relative_path); + + std::fs::create_dir_all(full_path.parent().unwrap())?; + let file = if (&self.info).options.overwrite { + OpenOptions::new() + .create(true) + .read(true) + .write(true) + .open(&full_path)? + } else { + // TODO: create_new does not seem to work with read(true), so calling this twice. + OpenOptions::new() + .create_new(true) + .write(true) + .open(&full_path) + .with_context(|| format!("error creating {:?}", &full_path))?; + OpenOptions::new().read(true).write(true).open(&full_path)? + }; + filenames.push(full_path); + files.push(Arc::new(Mutex::new(file))) + } + (files, filenames) + }; + + let lengths = + make_lengths(&(&self.info).info).context("unable to compute Lengths from torrent")?; + debug!("computed lengths: {:?}", &lengths); + + info!("Doing initial checksum validation, this might take a while..."); + let initial_check_results = (&self.info).spawner.spawn_block_in_place(|| { + FileOps::::new(&(&self.info).info, &files, &lengths) + .initial_check(self.only_files.as_deref()) + })?; + + info!( + "Initial check results: have {}, needed {}", + SF::new(initial_check_results.have_bytes), + SF::new(initial_check_results.needed_bytes) + ); + + (&self.info).spawner.spawn_block_in_place(|| { + for (idx, (file, (name, length))) in files + .iter() + .zip((&self.info).info.iter_filenames_and_lengths().unwrap()) + .enumerate() + { + if self + .only_files + .as_ref() + .map(|v| !v.contains(&idx)) + .unwrap_or(false) + { + continue; + } + let now = Instant::now(); + if let Err(err) = ensure_file_length(&file.lock(), length) { + warn!( + "Error setting length for file {:?} to {}: {:#?}", + name, length, err + ); + } else { + debug!( + "Set length for file {:?} to {} in {:?}", + name, + SF::new(length), + now.elapsed() + ); + } + } + }); + + let chunk_tracker = ChunkTracker::new( + initial_check_results.needed_pieces, + initial_check_results.have_pieces, + lengths, + ); + + #[allow(clippy::needless_update)] + let state_options = TorrentStateOptions { + peer_connect_timeout: (&self.info).options.peer_connect_timeout, + peer_read_write_timeout: (&self.info).options.peer_read_write_timeout, + ..Default::default() + }; + + let paused = TorrentStatePaused { + info: self.info.clone(), + files, + filenames, + chunk_tracker, + }; + Ok(paused) + } +} diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 08650a5..e59a6d3 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -57,12 +57,13 @@ use std::{ use anyhow::{bail, Context}; use backoff::backoff::Backoff; +use bencode::from_bytes; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ id20::Id20, - lengths::{ChunkInfo, Lengths, ValidPieceIndex}, + lengths::{self, ChunkInfo, Lengths, ValidPieceIndex}, speed_estimator::{self, SpeedEstimator}, torrent_metainfo::TorrentMetaV1Info, }; @@ -78,7 +79,8 @@ use tokio::{ }, time::timeout, }; -use tracing::{debug, error, info, span, trace, warn, Level}; +use tracing::{debug, error, info, span, trace, trace_span, warn, Level}; +use url::Url; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker}, @@ -87,6 +89,7 @@ use crate::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, spawn_utils::{spawn, BlockingSpawner}, + tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, type_aliases::{PeerHandle, BF}, }; @@ -102,7 +105,11 @@ use self::{ stats::{atomic::AtomicStats, snapshot::StatsSnapshot}, }; -use super::utils::{timeit, TimedExistence}; +use super::{ + paused::TorrentStatePaused, + utils::{timeit, TimedExistence}, + ManagedTorrentInfo, +}; struct InflightPiece { peer: PeerHandle, @@ -126,17 +133,17 @@ pub struct TorrentStateOptions { pub struct TorrentStateLive { peers: PeerStates, - info: TorrentMetaV1Info, + meta: Arc, locked: Arc>, files: Vec>>, filenames: Vec, - info_hash: Id20, - peer_id: Id20, - lengths: Lengths, + + // TODO: why the hell do we need these here, remove it. needed_bytes: u64, have_plus_needed_bytes: u64, + stats: AtomicStats, - options: TorrentStateOptions, + lengths: Lengths, // Limits how many active (occupying network resources) peers there are at a moment in time. peer_semaphore: Semaphore, @@ -151,35 +158,24 @@ pub struct TorrentStateLive { impl TorrentStateLive { #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - info: TorrentMetaV1Info, - info_hash: Id20, - peer_id: Id20, - files: Vec>>, - filenames: Vec, - chunk_tracker: ChunkTracker, - lengths: Lengths, - have_bytes: u64, - needed_bytes: u64, - spawner: BlockingSpawner, - options: Option, - ) -> Arc { - let options = options.unwrap_or_default(); + pub(crate) fn new(paused: TorrentStatePaused) -> Arc { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let speed_estimator = SpeedEstimator::new(5); + let have_bytes = paused.chunk_tracker.get_have_bytes(); + let needed_bytes = paused.chunk_tracker.get_needed_bytes(); + let lengths = paused.chunk_tracker.get_lengths().clone(); + let state = Arc::new(TorrentStateLive { - info_hash, - info, - peer_id, + meta: paused.info.clone(), peers: Default::default(), locked: Arc::new(RwLock::new(TorrentStateLocked { - chunks: chunk_tracker, + chunks: paused.chunk_tracker, inflight_pieces: Default::default(), })), - files, - filenames, + files: paused.files, + filenames: paused.filenames, stats: AtomicStats { have_bytes: AtomicU64::new(have_bytes), ..Default::default() @@ -187,16 +183,42 @@ impl TorrentStateLive { needed_bytes, have_plus_needed_bytes: needed_bytes + have_bytes, lengths, - options, peer_semaphore: Semaphore::new(128), peer_queue_tx, finished_notify: Notify::new(), speed_estimator, }); + + for tracker in state.meta.trackers.iter() { + spawn( + trace_span!("tracker_monitor", url = tracker.to_string()), + state.clone().task_single_tracker_monitor(tracker.clone()), + ); + } + + spawn(span!(Level::ERROR, "speed_estimator_updater"), { + let state = state.clone(); + async move { + loop { + let stats = state.stats_snapshot(); + let fetched = stats.fetched_bytes; + let needed = state.initially_needed(); + // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. + let remaining = needed + .wrapping_sub(fetched) + .min(needed - stats.downloaded_and_checked_bytes); + state + .speed_estimator + .add_snapshot(fetched, remaining, Instant::now()); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + spawn( span!(Level::ERROR, "peer_adder"), - state.clone().task_peer_adder(peer_queue_rx, spawner), + state.clone().task_peer_adder(peer_queue_rx), ); state } @@ -205,11 +227,75 @@ impl TorrentStateLive { &self.speed_estimator } - async fn task_manage_peer( + async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { + let response: reqwest::Response = reqwest::get(tracker_url).await?; + if !response.status().is_success() { + anyhow::bail!("tracker responded with {:?}", response.status()); + } + let bytes = response.bytes().await?; + if let Ok(error) = from_bytes::(&bytes) { + anyhow::bail!( + "tracker returned failure. Failure reason: {}", + error.failure_reason + ) + }; + let response = from_bytes::(&bytes)?; + + for peer in response.peers.iter_sockaddrs() { + self.add_peer_if_not_seen(peer); + } + Ok(response.interval) + } + + async fn task_single_tracker_monitor( self: Arc, - addr: SocketAddr, - spawner: BlockingSpawner, + mut tracker_url: Url, ) -> anyhow::Result<()> { + let mut event = Some(TrackerRequestEvent::Started); + loop { + let request = TrackerRequest { + info_hash: self.info_hash(), + peer_id: self.peer_id(), + port: 6778, + uploaded: self.get_uploaded_bytes(), + downloaded: self.get_downloaded_bytes(), + left: self.get_left_to_download_bytes(), + compact: true, + no_peer_id: false, + event, + ip: None, + numwant: None, + key: None, + trackerid: None, + }; + + let request_query = request.as_querystring(); + tracker_url.set_query(Some(&request_query)); + + match self.tracker_one_request(tracker_url.clone()).await { + Ok(interval) => { + event = None; + let interval = self + .meta + .options + .force_tracker_interval + .unwrap_or_else(|| Duration::from_secs(interval)); + debug!( + "sleeping for {:?} after calling tracker {}", + interval, + tracker_url.host().unwrap() + ); + tokio::time::sleep(interval).await; + } + Err(e) => { + debug!("error calling the tracker {}: {:#}", tracker_url, e); + tokio::time::sleep(Duration::from_secs(60)).await; + } + }; + } + } + + async fn task_manage_peer(self: Arc, addr: SocketAddr) -> anyhow::Result<()> { let state = self; let (rx, tx) = state.peers.mark_peer_connecting(addr)?; @@ -229,21 +315,20 @@ impl TorrentStateLive { requests_sem: Semaphore::new(0), state: state.clone(), tx, - spawner, counters, }; let options = PeerConnectionOptions { - connect_timeout: state.options.peer_connect_timeout, - read_write_timeout: state.options.peer_read_write_timeout, + connect_timeout: state.meta.options.peer_connect_timeout, + read_write_timeout: state.meta.options.peer_read_write_timeout, ..Default::default() }; let peer_connection = PeerConnection::new( addr, - state.info_hash, - state.peer_id, + state.meta.info_hash, + state.meta.peer_id, &handler, Some(options), - spawner, + state.meta.spawner, ); let requester = handler.task_peer_chunk_requester(addr); @@ -274,7 +359,6 @@ impl TorrentStateLive { async fn task_peer_adder( self: Arc, mut peer_queue_rx: UnboundedReceiver, - spawner: BlockingSpawner, ) -> anyhow::Result<()> { let state = self; loop { @@ -289,22 +373,22 @@ impl TorrentStateLive { permit.forget(); spawn( span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()), - state.clone().task_manage_peer(addr, spawner), + state.clone().task_manage_peer(addr), ); } } pub fn info(&self) -> &TorrentMetaV1Info { - &self.info + &self.meta.info } pub fn info_hash(&self) -> Id20 { - self.info_hash + self.meta.info_hash } pub fn peer_id(&self) -> Id20 { - self.peer_id + self.meta.peer_id } pub(crate) fn file_ops(&self) -> FileOps<'_, Sha1> { - FileOps::new(&self.info, &self.files, &self.lengths) + FileOps::new(&self.meta.info, &self.files, &self.lengths) } pub fn initially_needed(&self) -> u64 { self.needed_bytes @@ -429,7 +513,7 @@ impl TorrentStateLive { ); } - pub(crate) fn add_peer_if_not_seen(self: &Arc, addr: SocketAddr) -> bool { + pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> bool { match self.peers.add_if_not_seen(addr) { Some(handle) => handle, None => return false, @@ -509,7 +593,6 @@ struct PeerHandler { requests_sem: Semaphore, addr: SocketAddr, - spawner: BlockingSpawner, tx: PeerTx, } @@ -1083,7 +1166,9 @@ impl PeerHandler { // By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would // have fallen off above in one of the defensive checks. - self.spawner + self.state + .meta + .spawner .spawn_block_in_place(move || { let index = piece.index; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index f566f51..eda146a 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -1,6 +1,7 @@ -pub mod utils; - +pub mod initializing; pub mod live; +pub mod paused; +pub mod utils; use std::net::SocketAddr; use std::path::PathBuf; @@ -11,6 +12,7 @@ use std::{collections::HashSet, path::Path}; use anyhow::Context; use buffers::ByteString; use librqbit_core::id20::Id20; +use librqbit_core::peer_id::generate_peer_id; use librqbit_core::speed_estimator::SpeedEstimator; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; pub use live::*; @@ -21,32 +23,37 @@ use url::Url; use crate::spawn_utils::{spawn, BlockingSpawner}; -pub struct TorrentStateInitializing {} +use initializing::TorrentStateInitializing; + +use self::paused::TorrentStatePaused; -#[derive(Default, Clone)] pub enum ManagedTorrentState { - #[default] - Created, - - Initializing(Arc), - - // TODO: only_files_tx - // TODO: trackers_tx?? + Initializing(TorrentStateInitializing), + Paused(TorrentStatePaused), Live(Arc), + Error(anyhow::Error), } pub(crate) struct ManagedTorrentLocked { - pub only_files: Option>, pub state: ManagedTorrentState, } +#[derive(Default)] +pub(crate) struct ManagedTorrentOptions { + pub force_tracker_interval: Option, + pub peer_connect_timeout: Option, + pub peer_read_write_timeout: Option, + pub overwrite: bool, +} + pub struct ManagedTorrentInfo { pub info: TorrentMetaV1Info, pub info_hash: Id20, pub out_dir: PathBuf, pub spawner: BlockingSpawner, pub trackers: Vec, - // pub options: Option, + pub peer_id: Id20, + pub(crate) options: ManagedTorrentOptions, } pub struct ManagedTorrent { @@ -68,11 +75,12 @@ impl ManagedTorrent { } pub fn only_files(&self) -> Option> { - self.locked.write().only_files.clone() + // self.locked.write().only_files.clone() + todo!() } - pub fn state(&self) -> ManagedTorrentState { - self.locked.read().state.clone() + pub fn with_state(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R { + f(&self.locked.read().state) } pub fn live(&self) -> Option> { @@ -169,19 +177,26 @@ impl ManagedTorrentBuilder { } pub(crate) fn build(self) -> ManagedTorrentHandle { + let info = Arc::new(ManagedTorrentInfo { + info: self.info, + info_hash: self.info_hash, + out_dir: self.output_folder, + trackers: self.trackers.into_iter().collect(), + spawner: self.spawner.unwrap_or_default(), + peer_id: self.peer_id.unwrap_or_else(generate_peer_id), + options: ManagedTorrentOptions { + force_tracker_interval: self.force_tracker_interval, + peer_connect_timeout: self.peer_connect_timeout, + peer_read_write_timeout: self.peer_read_write_timeout, + overwrite: self.overwrite, + }, + }); + let initializing = TorrentStateInitializing::new(info.clone(), self.only_files); Arc::new(ManagedTorrent { locked: RwLock::new(ManagedTorrentLocked { - only_files: self.only_files, - state: Default::default(), - }), - info: Arc::new(ManagedTorrentInfo { - info: self.info, - info_hash: self.info_hash, - out_dir: self.output_folder, - trackers: self.trackers.into_iter().collect(), - spawner: self.spawner.unwrap_or_default(), - // options: Some(self.options), + state: ManagedTorrentState::Initializing(initializing), }), + info, }) } } diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs new file mode 100644 index 0000000..d4f9da7 --- /dev/null +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -0,0 +1,14 @@ +use std::{fs::File, path::PathBuf, sync::Arc}; + +use parking_lot::Mutex; + +use crate::chunk_tracker::ChunkTracker; + +use super::ManagedTorrentInfo; + +pub struct TorrentStatePaused { + pub(crate) info: Arc, + pub(crate) files: Vec>>, + pub(crate) filenames: Vec, + pub(crate) chunk_tracker: ChunkTracker, +} diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 8a94abe..c46d0e2 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -239,39 +239,43 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> loop { session.with_torrents(|torrents| { for (idx, torrent) in torrents.iter().enumerate() { - match torrent.state() { - ManagedTorrentState::Initializing(_) => { - info!("[{}] initializing", idx); - }, - ManagedTorrentState::Live(handle) => { - let stats = handle.stats_snapshot(); - let speed = handle.speed_estimator(); - let total = stats.total_bytes; - let progress = stats.total_bytes - stats.remaining_bytes; - let downloaded_pct = if stats.remaining_bytes == 0 { - 100f64 - } else { - (progress as f64 / total as f64) * 100f64 - }; - info!( - "[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}", - idx, - downloaded_pct, - SF::new(progress), - speed.download_mbps(), - SF::new(stats.fetched_bytes), - SF::new(stats.remaining_bytes), - SF::new(total), - SF::new(stats.uploaded_bytes), - stats.peer_stats.live, - stats.peer_stats.connecting, - stats.peer_stats.queued, - stats.peer_stats.seen, - stats.peer_stats.dead, - ); - }, - ManagedTorrentState::Created => warn!("the torrent was just created, but not initializing"), - } + let live = torrent.with_state(|s| { + match s { + ManagedTorrentState::Initializing(_) => info!("[{}] initializing", idx), + ManagedTorrentState::Live(h) => return Some(h.clone()), + ManagedTorrentState::Error(_) | ManagedTorrentState::Paused(_) => {}, + }; + None + }); + let handle = match live { + Some(live) => live, + None => continue + }; + let stats = handle.stats_snapshot(); + let speed = handle.speed_estimator(); + let total = stats.total_bytes; + let progress = stats.total_bytes - stats.remaining_bytes; + let downloaded_pct = if stats.remaining_bytes == 0 { + 100f64 + } else { + (progress as f64 / total as f64) * 100f64 + }; + info!( + "[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}", + idx, + downloaded_pct, + SF::new(progress), + speed.download_mbps(), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), + SF::new(total), + SF::new(stats.uploaded_bytes), + stats.peer_stats.live, + stats.peer_stats.connecting, + stats.peer_stats.queued, + stats.peer_stats.seen, + stats.peer_stats.dead, + ); } }); tokio::time::sleep(Duration::from_secs(1)).await;