pub mod initializing; pub mod live; pub mod paused; pub mod utils; use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteString; use librqbit_core::id20::Id20; use librqbit_core::peer_id::generate_peer_id; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; pub use live::*; use parking_lot::RwLock; use tokio_stream::StreamExt; use tracing::trace_span; use url::Url; use crate::chunk_tracker::ChunkTracker; use crate::spawn_utils::spawn; use crate::spawn_utils::BlockingSpawner; use initializing::TorrentStateInitializing; use self::paused::TorrentStatePaused; pub enum ManagedTorrentState { Initializing(Arc), Paused(TorrentStatePaused), Live(Arc), Error(anyhow::Error), // This is used when swapping between states, outside world should never see it. None, } impl ManagedTorrentState { fn assert_paused(self) -> TorrentStatePaused { match self { Self::Paused(paused) => paused, _ => panic!("Expected paused state"), } } fn take(&mut self) -> Self { std::mem::replace(self, Self::None) } } pub(crate) struct ManagedTorrentLocked { pub state: ManagedTorrentState, } #[derive(Default)] pub(crate) struct ManagedTorrentOptions { pub force_tracker_interval: Option, pub peer_connect_timeout: Option, pub peer_read_write_timeout: Option, pub overwrite: bool, } pub struct ManagedTorrentInfo { pub info: TorrentMetaV1Info, pub info_hash: Id20, pub out_dir: PathBuf, pub spawner: BlockingSpawner, pub trackers: Vec, pub peer_id: Id20, pub(crate) options: ManagedTorrentOptions, } pub struct ManagedTorrent { pub info: Arc, only_files: Option>, locked: RwLock, } impl ManagedTorrent { pub fn info(&self) -> &ManagedTorrentInfo { &self.info } pub fn info_hash(&self) -> Id20 { self.info.info_hash } pub fn only_files(&self) -> Option> { self.only_files.clone() } pub fn with_state(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R { f(&self.locked.read().state) } pub fn with_chunk_tracker(&self, f: impl FnOnce(&ChunkTracker) -> R) -> anyhow::Result { let g = self.locked.read(); match &g.state { ManagedTorrentState::Paused(p) => Ok(f(&p.chunk_tracker)), ManagedTorrentState::Live(l) => Ok(f(&l.lock_read("chunk_tracker").chunks)), _ => bail!("no chunk tracker, torrent neither paused nor live"), } } pub fn live(&self) -> Option> { let g = self.locked.read(); match &g.state { ManagedTorrentState::Live(live) => Some(live.clone()), _ => None, } } pub fn start( self: &Arc, initial_peers: Vec, peer_rx: Option + Unpin + Send + Sync + 'static>, ) -> anyhow::Result<()> { let mut g = self.locked.write(); match &g.state { ManagedTorrentState::Live(_) => { bail!("torrent is already live"); } ManagedTorrentState::Initializing(init) => { let init = init.clone(); let t = self.clone(); spawn(trace_span!("initialize_and_start"), async move { match init.check().await { Ok(paused) => { let live = TorrentStateLive::new(paused); t.locked.write().state = ManagedTorrentState::Live(live.clone()); let live = Arc::downgrade(&live); spawn(trace_span!("peer_adder"), async move { { let live: Arc = live.upgrade().context("no longer live")?; for peer in initial_peers { live.add_peer_if_not_seen(peer); } } if let Some(mut peer_rx) = peer_rx { while let Some(peer) = peer_rx.next().await { live.upgrade() .context("no longer live")? .add_peer_if_not_seen(peer); } } Ok(()) }); Ok(()) } Err(err) => { let result = anyhow::anyhow!("{:?}", err); t.locked.write().state = ManagedTorrentState::Error(err); Err(result) } } }); Ok(()) } ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let live = TorrentStateLive::new(paused); g.state = ManagedTorrentState::Live(live); Ok(()) } ManagedTorrentState::Error(_) => { bail!("starting torrents from error state not implemented") } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), } } pub fn pause(&self) -> anyhow::Result<()> { let mut g = self.locked.write(); match &g.state { ManagedTorrentState::Live(live) => { let paused = live.pause()?; g.state = ManagedTorrentState::Paused(paused); Ok(()) } ManagedTorrentState::Initializing(_) => { bail!("torrent is initializing, can't pause"); } ManagedTorrentState::Paused(_) => { bail!("torrent is already paused"); } ManagedTorrentState::Error(_) => { bail!("can't pause torrent in error state") } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), } } pub async fn wait_until_completed(&self) -> anyhow::Result<()> { // TODO: rewrite self.live() .context("torrent isn't live")? .wait_until_completed() .await; Ok(()) } } pub struct ManagedTorrentBuilder { info: TorrentMetaV1Info, info_hash: Id20, output_folder: PathBuf, force_tracker_interval: Option, peer_connect_timeout: Option, peer_read_write_timeout: Option, only_files: Option>, trackers: Vec, peer_id: Option, overwrite: bool, spawner: Option, } impl ManagedTorrentBuilder { pub fn new>( info: TorrentMetaV1Info, info_hash: Id20, output_folder: P, ) -> Self { Self { info, info_hash, output_folder: output_folder.as_ref().into(), spawner: None, force_tracker_interval: None, peer_connect_timeout: None, peer_read_write_timeout: None, only_files: None, trackers: Default::default(), peer_id: None, overwrite: false, } } pub fn only_files(&mut self, only_files: Vec) -> &mut Self { self.only_files = Some(only_files); self } pub fn trackers(&mut self, trackers: Vec) -> &mut Self { self.trackers = trackers; self } pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { self.overwrite = overwrite; self } pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { self.force_tracker_interval = Some(force_tracker_interval); self } pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { self.spawner = Some(spawner); self } pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { self.peer_id = Some(peer_id); self } pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { self.peer_connect_timeout = Some(timeout); self } pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self { self.peer_read_write_timeout = Some(timeout); self } pub(crate) fn build(self) -> ManagedTorrentHandle { let info = Arc::new(ManagedTorrentInfo { info: self.info, info_hash: self.info_hash, out_dir: self.output_folder, trackers: self.trackers.into_iter().collect(), spawner: self.spawner.unwrap_or_default(), peer_id: self.peer_id.unwrap_or_else(generate_peer_id), options: ManagedTorrentOptions { force_tracker_interval: self.force_tracker_interval, peer_connect_timeout: self.peer_connect_timeout, peer_read_write_timeout: self.peer_read_write_timeout, overwrite: self.overwrite, }, }); let initializing = Arc::new(TorrentStateInitializing::new( info.clone(), self.only_files.clone(), )); Arc::new(ManagedTorrent { only_files: self.only_files, locked: RwLock::new(ManagedTorrentLocked { state: ManagedTorrentState::Initializing(initializing), }), info, }) } } pub type ManagedTorrentHandle = Arc;