diff --git a/TODO.md b/TODO.md index 6090d34..d91f504 100644 --- a/TODO.md +++ b/TODO.md @@ -18,4 +18,10 @@ - [ ] it's sending many requests now way too fast, locks up Mac OS UI annoyingly someday: -- [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager) \ No newline at end of file +- [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager) + + +refactor: +- [ ] where are peers stored +- [ ] http api pause/unpause etc +- [ ] when a live torrent fails writing to disk, it should transition to error state \ No newline at end of file diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index f5b16e3..21207fc 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -76,14 +76,6 @@ impl ChunkTracker { } } - pub fn get_have_bytes(&self) -> u64 { - todo!() - } - - pub fn get_needed_bytes(&self) -> u64 { - todo!() - } - pub fn get_lengths(&self) -> &Lengths { &self.lengths } diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 41cb617..bd94f2b 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,4 +1,4 @@ -use anyhow::{Context}; +use anyhow::Context; use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; @@ -20,7 +20,7 @@ use axum::Router; use crate::http_api_error::{ApiError, ApiErrorExt}; use crate::session::{ - AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, + AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId, }; use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; use crate::torrent_state::stats::snapshot::StatsSnapshot; @@ -38,9 +38,6 @@ impl HttpApi { inner: Arc::new(ApiInternal::new(session)), } } - pub fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize { - self.inner.add_torrent_handle(handle) - } pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { let state = self.inner; @@ -331,9 +328,7 @@ impl TorrentAddQueryParams { // Private HTTP API internals. Agnostic of web framework. struct ApiInternal { - dht: Option, startup_time: Instant, - torrent_managers: RwLock>, session: Arc, } @@ -342,41 +337,29 @@ type ApiState = Arc; impl ApiInternal { pub fn new(session: Arc) -> Self { Self { - dht: session.get_dht(), startup_time: Instant::now(), - torrent_managers: RwLock::new(Vec::new()), session, } } - fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize { - let mut g = self.torrent_managers.write(); - let idx = g.len(); - g.push(handle); - idx - } - - fn mgr_handle(&self, idx: usize) -> Result { - self.torrent_managers - .read() + fn mgr_handle(&self, idx: TorrentId) -> Result { + self.session .get(idx) - .cloned() .ok_or(ApiError::torrent_not_found(idx)) } fn api_torrent_list(&self) -> TorrentListResponse { - TorrentListResponse { - torrents: self - .torrent_managers - .read() + let items = self.session.with_torrents(|torrents| { + torrents .iter() .enumerate() .map(|(id, mgr)| TorrentListResponseItem { id, info_hash: mgr.info().info_hash.as_string(), }) - .collect(), - } + .collect() + }); + TorrentListResponse { torrents: items } } fn api_torrent_details(&self, idx: usize) -> Result { @@ -406,10 +389,11 @@ impl ApiInternal { .context("error adding torrent") .with_error_status_code(StatusCode::BAD_REQUEST)? { - AddTorrentResponse::AlreadyManaged(managed) => { + AddTorrentResponse::AlreadyManaged(id, managed) => { return Err(anyhow::anyhow!( - "{:?} is already managed, downloaded to {:?}", + "{:?} is already managed, id={}, downloaded to {:?}", managed.info_hash(), + id, &managed.info().out_dir )) .with_error_status_code(StatusCode::CONFLICT); @@ -423,14 +407,13 @@ impl ApiInternal { details: make_torrent_details(&info_hash, &info, only_files.as_deref()) .context("error making torrent details")?, }, - AddTorrentResponse::Added(handle) => { + AddTorrentResponse::Added(id, handle) => { let details = make_torrent_details( &handle.info_hash(), &handle.info().info, handle.only_files().as_deref(), ) .context("error making torrent details")?; - let id = self.add_torrent_handle(handle); ApiAddTorrentResponse { id: Some(id), details, @@ -441,14 +424,15 @@ impl ApiInternal { } fn api_dht_stats(&self) -> Result { - self.dht + self.session + .get_dht() .as_ref() .map(|d| d.stats()) .ok_or(ApiError::dht_disabled()) } fn api_dht_table(&self) -> Result { - let dht = self.dht.as_ref().ok_or(ApiError::dht_disabled())?; + let dht = self.session.get_dht().ok_or(ApiError::dht_disabled())?; Ok(dht.with_routing_table(|r| r.clone())) } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index d4bbd28..c5e5703 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -11,39 +11,29 @@ use librqbit_core::{ use parking_lot::RwLock; use reqwest::Url; use tokio_stream::StreamExt; -use tracing::{debug, info, span, warn, Level}; +use tracing::{debug, info, trace_span, warn}; use crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, peer_connection::PeerConnectionOptions, - spawn_utils::{spawn, BlockingSpawner}, + spawn_utils::BlockingSpawner, torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle}, }; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; +pub type TorrentId = usize; + #[derive(Default)] pub struct SessionLocked { torrents: Vec, } -enum SessionLockedAddTorrentResult { - AlreadyManaged(ManagedTorrentHandle), - Added(usize), -} - impl SessionLocked { - fn add_torrent(&mut self, torrent: ManagedTorrentHandle) -> SessionLockedAddTorrentResult { - if let Some(handle) = self - .torrents - .iter() - .find(|t| t.info_hash() == torrent.info_hash()) - { - return SessionLockedAddTorrentResult::AlreadyManaged(handle.clone()); - } + fn add_torrent(&mut self, torrent: ManagedTorrentHandle) -> TorrentId { let idx = self.torrents.len(); self.torrents.push(torrent); - SessionLockedAddTorrentResult::Added(idx) + idx } } @@ -109,9 +99,9 @@ pub struct ListOnlyResponse { } pub enum AddTorrentResponse { - AlreadyManaged(ManagedTorrentHandle), + AlreadyManaged(TorrentId, ManagedTorrentHandle), ListOnly(ListOnlyResponse), - Added(ManagedTorrentHandle), + Added(TorrentId, ManagedTorrentHandle), } pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result> { @@ -207,15 +197,14 @@ impl Session { locked: RwLock::new(SessionLocked::default()), }) } - pub fn get_dht(&self) -> Option { - self.dht.clone() + pub fn get_dht(&self) -> Option<&Dht> { + self.dht.as_ref() } - pub fn with_torrents(&self, callback: F) - where - F: Fn(&[ManagedTorrentHandle]), - { + + pub fn with_torrents(&self, callback: impl Fn(&[ManagedTorrentHandle]) -> R) -> R { callback(&self.locked.read().torrents) } + pub async fn add_torrent( &self, add: impl Into>, @@ -411,32 +400,37 @@ impl Session { builder.peer_read_write_timeout(t); } - let managed_torrent = builder.build(); - - match self.locked.write().add_torrent(managed_torrent.clone()) { - SessionLockedAddTorrentResult::AlreadyManaged(managed) => { - return Ok(AddTorrentResponse::AlreadyManaged(managed)) + let (managed_torrent, id) = { + let mut g = self.locked.write(); + if let Some((id, handle)) = g + .torrents + .iter() + .enumerate() + .find(|(_, t)| t.info_hash() == info_hash) + { + return Ok(AddTorrentResponse::AlreadyManaged(id, handle.clone())); } - SessionLockedAddTorrentResult::Added(_) => {} + let managed_torrent = builder.build(); + let id = g.add_torrent(managed_torrent.clone()); + (managed_torrent, id) + }; + + { + let span = trace_span!("torrent", id = id); + let _ = span.enter(); + managed_torrent + .start(initial_peers, dht_peer_rx) + .context("error starting torrent")?; } - for peer in initial_peers { - managed_torrent.add_peer(peer); - } + Ok(AddTorrentResponse::Added(id, managed_torrent)) + } - if let Some(mut dht_peer_rx) = dht_peer_rx { - spawn(span!(Level::INFO, "dht_peer_adder"), { - let handle = managed_torrent.clone(); - async move { - while let Some(peer) = dht_peer_rx.next().await { - handle.add_peer(peer); - } - warn!("dht was closed"); - Ok(()) - } - }); - } + pub fn get(&self, id: TorrentId) -> Option { + self.locked.read().torrents.get(id).cloned() + } - Ok(AddTorrentResponse::Added(managed_torrent)) + pub fn restart(&self, id: usize) -> anyhow::Result<()> { + todo!() } } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 0878be6..1858878 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -29,8 +29,8 @@ fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> { } pub struct TorrentStateInitializing { - meta: Arc, - only_files: Option>, + pub(crate) meta: Arc, + pub(crate) only_files: Option>, } impl TorrentStateInitializing { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index b48a206..957ca5d 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -64,7 +64,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ id20::Id20, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, - speed_estimator::{SpeedEstimator}, + speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Info, }; use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -88,7 +88,7 @@ use crate::{ peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, - spawn_utils::{spawn}, + spawn_utils::spawn, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, type_aliases::{PeerHandle, BF}, }; @@ -157,14 +157,13 @@ pub struct TorrentStateLive { } impl TorrentStateLive { - #[allow(clippy::too_many_arguments)] pub(crate) fn new(paused: TorrentStatePaused) -> Arc { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let speed_estimator = SpeedEstimator::new(5); - let have_bytes = paused.chunk_tracker.get_have_bytes(); - let needed_bytes = paused.chunk_tracker.get_needed_bytes(); + let have_bytes = paused.have_bytes; + let needed_bytes = paused.needed_bytes; let lengths = *paused.chunk_tracker.get_lengths(); let state = Arc::new(TorrentStateLive { @@ -560,6 +559,10 @@ impl TorrentStateLive { } self.finished_notify.notified().await; } + + pub fn pause(&self) -> anyhow::Result { + bail!("pause not implemented yet") + } } struct PeerHandlerLocked { diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 04e050f..9de0d9a 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -4,11 +4,12 @@ pub mod paused; pub mod utils; use std::net::SocketAddr; +use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use std::{path::Path}; +use anyhow::bail; use anyhow::Context; use buffers::ByteString; use librqbit_core::id20::Id20; @@ -18,20 +19,38 @@ use librqbit_core::torrent_metainfo::TorrentMetaV1Info; pub use live::*; use parking_lot::RwLock; - +use tokio_stream::StreamExt; +use tracing::trace_span; use url::Url; -use crate::spawn_utils::{BlockingSpawner}; +use crate::spawn_utils::spawn; +use crate::spawn_utils::BlockingSpawner; use initializing::TorrentStateInitializing; use self::paused::TorrentStatePaused; pub enum ManagedTorrentState { - Initializing(TorrentStateInitializing), + Initializing(Arc), Paused(TorrentStatePaused), Live(Arc), Error(anyhow::Error), + + // This is used when swapping between states, outside world should never see it. + None, +} + +impl ManagedTorrentState { + fn assert_paused(self) -> TorrentStatePaused { + match self { + Self::Paused(paused) => paused, + _ => panic!("Expected paused state"), + } + } + + fn take(&mut self) -> Self { + std::mem::replace(self, Self::None) + } } pub(crate) struct ManagedTorrentLocked { @@ -58,6 +77,7 @@ pub struct ManagedTorrentInfo { pub struct ManagedTorrent { pub info: Arc, + only_files: Option>, locked: RwLock, } @@ -70,13 +90,8 @@ impl ManagedTorrent { self.info.info_hash } - pub(crate) fn add_peer(&self, _peer: SocketAddr) -> bool { - todo!() - } - pub fn only_files(&self) -> Option> { - // self.locked.write().only_files.clone() - todo!() + self.only_files.clone() } pub fn with_state(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R { @@ -91,6 +106,91 @@ impl ManagedTorrent { } } + pub fn start( + self: &Arc, + initial_peers: Vec, + peer_rx: Option + Unpin + Send + Sync + 'static>, + ) -> anyhow::Result<()> { + let mut g = self.locked.write(); + match &g.state { + ManagedTorrentState::Live(_) => { + bail!("torrent is already live"); + } + ManagedTorrentState::Initializing(init) => { + let init = init.clone(); + let t = self.clone(); + spawn(trace_span!("initialize_and_start"), async move { + match init.check().await { + Ok(paused) => { + let live = TorrentStateLive::new(paused); + t.locked.write().state = ManagedTorrentState::Live(live.clone()); + + let live = Arc::downgrade(&live); + spawn(trace_span!("peer_adder"), async move { + { + let live: Arc = + live.upgrade().context("no longer live")?; + for peer in initial_peers { + live.add_peer_if_not_seen(peer); + } + } + + if let Some(mut peer_rx) = peer_rx { + while let Some(peer) = peer_rx.next().await { + live.upgrade() + .context("no longer live")? + .add_peer_if_not_seen(peer); + } + } + + Ok(()) + }); + + Ok(()) + } + Err(err) => { + let result = anyhow::anyhow!("{:?}", err); + t.locked.write().state = ManagedTorrentState::Error(err); + Err(result) + } + } + }); + Ok(()) + } + ManagedTorrentState::Paused(_) => { + let paused = g.state.take().assert_paused(); + let live = TorrentStateLive::new(paused); + g.state = ManagedTorrentState::Live(live); + Ok(()) + } + ManagedTorrentState::Error(_) => { + bail!("starting torrents from error state not implemented") + } + ManagedTorrentState::None => bail!("bug: torrent is in empty state"), + } + } + + pub fn pause(&self) -> anyhow::Result<()> { + let mut g = self.locked.write(); + match &g.state { + ManagedTorrentState::Live(live) => { + let paused = live.pause()?; + g.state = ManagedTorrentState::Paused(paused); + Ok(()) + } + ManagedTorrentState::Initializing(_) => { + bail!("torrent is initializing, can't pause"); + } + ManagedTorrentState::Paused(_) => { + bail!("torrent is already paused"); + } + ManagedTorrentState::Error(_) => { + bail!("can't pause torrent in error state") + } + ManagedTorrentState::None => bail!("bug: torrent is in empty state"), + } + } + pub async fn wait_until_completed(&self) -> anyhow::Result<()> { // TODO: rewrite self.live() @@ -191,8 +291,12 @@ impl ManagedTorrentBuilder { overwrite: self.overwrite, }, }); - let initializing = TorrentStateInitializing::new(info.clone(), self.only_files); + let initializing = Arc::new(TorrentStateInitializing::new( + info.clone(), + self.only_files.clone(), + )); Arc::new(ManagedTorrent { + only_files: self.only_files, locked: RwLock::new(ManagedTorrentLocked { state: ManagedTorrentState::Initializing(initializing), }), diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 0a2b2c9..b8ac7c1 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -14,3 +14,12 @@ pub struct TorrentStatePaused { pub(crate) have_bytes: u64, pub(crate) needed_bytes: u64, } + +// impl TorrentStatePaused { +// pub fn get_have_bytes(&self) -> u64 { +// self.have_bytes +// } +// pub fn get_needed_bytes(&self) -> u64 { +// self.needed_bytes +// } +// }