diff --git a/Cargo.lock b/Cargo.lock index 1b88261..cd5519b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1253,7 +1253,7 @@ dependencies = [ [[package]] name = "librqbit" -version = "5.0.0-beta.0" +version = "5.0.0-beta.1" dependencies = [ "anyhow", "axum 0.7.1", @@ -1343,7 +1343,7 @@ dependencies = [ [[package]] name = "librqbit-dht" -version = "4.1.0" +version = "5.0.0-beta.1" dependencies = [ "anyhow", "backoff", @@ -2002,7 +2002,7 @@ dependencies = [ [[package]] name = "rqbit" -version = "5.0.0-beta.0" +version = "5.0.0-beta.1" dependencies = [ "anyhow", "clap", diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index e3a55b3..220147c 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-dht" -version = "4.1.0" +version = "5.0.0-beta.1" edition = "2021" description = "DHT implementation, used in rqbit torrent client." license = "Apache-2.0" diff --git a/crates/dht/examples/dht.rs b/crates/dht/examples/dht.rs index 11c289c..07a3375 100644 --- a/crates/dht/examples/dht.rs +++ b/crates/dht/examples/dht.rs @@ -16,7 +16,9 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); - let dht = DhtBuilder::new().await.context("error initializing DHT")?; + let (dht, worker) = DhtBuilder::new().await.context("error initializing DHT")?; + tokio::spawn(worker); + let mut stream = dht.get_peers(info_hash, None)?; let stats_printer = async { diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index b55a54c..da58179 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -23,7 +23,7 @@ use anyhow::{bail, Context}; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use bencode::ByteString; use dashmap::DashMap; -use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; +use futures::{stream::FuturesUnordered, Future, Stream, StreamExt, TryFutureExt}; use leaky_bucket::RateLimiter; use librqbit_core::{id20::Id20, peer_id::generate_peer_id, spawn_utils::spawn}; @@ -1127,10 +1127,18 @@ pub struct DhtConfig { } impl DhtState { - pub async fn new() -> anyhow::Result> { + pub async fn new() -> anyhow::Result<( + Arc, + impl Future> + Send + Sync + 'static, + )> { Self::with_config(DhtConfig::default()).await } - pub async fn with_config(config: DhtConfig) -> anyhow::Result> { + pub async fn with_config( + config: DhtConfig, + ) -> anyhow::Result<( + Arc, + impl Future> + Send + Sync + 'static, + )> { let socket = match config.listen_addr { Some(addr) => UdpSocket::bind(addr) .await @@ -1160,15 +1168,15 @@ impl DhtState { config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), )); - spawn(error_span!("dht"), { + let run_worker = { let state = state.clone(); async move { let worker = DhtWorker { socket, dht: state }; worker.start(in_rx, &bootstrap_addrs).await?; Ok(()) } - }); - Ok(state) + }; + Ok((state, run_worker)) } pub fn get_peers( diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index 94188d0..325c789 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -10,6 +10,7 @@ use std::time::Duration; pub use crate::dht::DhtStats; pub use crate::dht::{DhtConfig, DhtState, RequestPeersStream}; +use futures::Future; pub use librqbit_core::id20::Id20; pub use persistence::{PersistentDht, PersistentDhtConfig}; @@ -26,11 +27,19 @@ pub struct DhtBuilder {} impl DhtBuilder { #[allow(clippy::new_ret_no_self)] - pub async fn new() -> anyhow::Result { + pub async fn new() -> anyhow::Result<( + Dht, + impl Future> + Send + Sync + 'static, + )> { DhtState::new().await } - pub async fn with_config(config: DhtConfig) -> anyhow::Result { + pub async fn with_config( + config: DhtConfig, + ) -> anyhow::Result<( + Dht, + impl Future> + Send + Sync + 'static, + )> { DhtState::with_config(config).await } } diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index b931910..4b8a44a 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -1,7 +1,7 @@ // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... +use futures::Future; use librqbit_core::directories::get_configuration_directory; -use librqbit_core::spawn_utils::spawn; use serde::{Deserialize, Serialize}; use std::fs::OpenOptions; use std::io::{BufReader, BufWriter}; @@ -10,7 +10,7 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use anyhow::Context; -use tracing::{debug, error, error_span, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use crate::peer_store::PeerStore; use crate::routing_table::RoutingTable; @@ -74,7 +74,13 @@ impl PersistentDht { Ok(path) } - pub async fn create(config: Option) -> anyhow::Result { + pub async fn create( + config: Option, + ) -> anyhow::Result<( + Dht, + impl Future> + Send + Sync + 'static, + impl Future> + Send + Sync + 'static, + )> { let mut config = config.unwrap_or_default(); let config_filename = match config.config_filename.take() { Some(config_filename) => config_filename, @@ -125,9 +131,9 @@ impl PersistentDht { peer_store, ..Default::default() }; - let dht = DhtState::with_config(dht_config).await?; + let (dht, run_worker) = DhtState::with_config(dht_config).await?; - spawn(error_span!("dht_persistence"), { + let run_persistence = { let dht = dht.clone(); let dump_interval = config .dump_interval @@ -150,7 +156,8 @@ impl PersistentDht { } } } - }); - Ok(dht) + }; + + Ok((dht, run_worker, run_persistence)) } } diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index e79e223..4259b90 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit" -version = "5.0.0-beta.0" +version = "5.0.0-beta.1" authors = ["Igor Katson "] edition = "2021" description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it." @@ -28,7 +28,7 @@ librqbit-core = {path = "../librqbit_core", version = "3.3.0"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"} sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"} -dht = {path = "../dht", package="librqbit-dht", version="4.1.0"} +dht = {path = "../dht", package="librqbit-dht", version="5.0.0-beta.1"} librqbit-upnp = {path = "../upnp", version = "0.1.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index ea1225b..ac84fd5 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -107,7 +107,9 @@ mod tests { init_logging(); let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap(); - let dht = DhtBuilder::new().await.unwrap(); + let (dht, run_dht) = DhtBuilder::new().await.unwrap(); + tokio::spawn(run_dht); + let peer_rx = dht.get_peers(info_hash, None).unwrap(); let peer_id = generate_peer_id(); match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 1b74d88..814ebfc 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -150,6 +150,23 @@ struct SerializedSessionDatabase { torrents: HashMap, } +fn spawn_with_cancel_token( + mut cancel_rx: tokio::sync::watch::Receiver<()>, + name: &str, + span: tracing::Span, + fut: impl std::future::Future> + Send + 'static, +) { + spawn(name, span, async move { + tokio::select! { + r = fut => r, + _ = cancel_rx.changed() => { + debug!("task canceled"); + Ok(()) + } + } + }); +} + pub struct Session { peer_id: Id20, dht: Option, @@ -385,6 +402,8 @@ impl Session { ) -> anyhow::Result> { let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(()); + let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { let (l, p) = create_tcp_listener(port_range) .await @@ -399,12 +418,26 @@ impl Session { None } else { let dht = if opts.disable_dht_persistence { - DhtBuilder::with_config(DhtConfig::default()).await + let (dht, run_worker) = DhtBuilder::with_config(DhtConfig::default()) + .await + .context("error initializing DHT")?; + spawn_with_cancel_token(cancel_rx.clone(), "dht", error_span!("dht"), run_worker); + dht } else { let pdht_config = opts.dht_config.take().unwrap_or_default(); - PersistentDht::create(Some(pdht_config)).await - } - .context("error initializing DHT")?; + let (dht, run_worker, run_persistence) = PersistentDht::create(Some(pdht_config)) + .await + .context("error initializing persistent DHT")?; + spawn_with_cancel_token(cancel_rx.clone(), "dht", error_span!("dht"), run_worker); + spawn_with_cancel_token( + cancel_rx.clone(), + "dht_persistence", + error_span!("dht_persistence"), + run_persistence, + ); + dht + }; + Some(dht) }; let peer_opts = opts.peer_opts.unwrap_or_default(); @@ -414,8 +447,6 @@ impl Session { }; let spawner = BlockingSpawner::default(); - let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(()); - let session = Arc::new(Self { persistence_filename, peer_id, @@ -618,22 +649,26 @@ impl Session { span: tracing::Span, fut: impl std::future::Future> + Send + 'static, ) { - let mut cancel_rx = self.cancel_rx.clone(); - spawn(name, span, async move { - tokio::select! { - r = fut => r, - _ = cancel_rx.changed() => { - debug!("task canceled"); - Ok(()) - } - } - }); + spawn_with_cancel_token(self.cancel_rx.clone(), name, span, fut); } /// Stop the session and all managed tasks. - // TODO: this probably doesn't kill everything properly. pub async fn stop(&self) { + let torrents = self + .db + .read() + .torrents + .values() + .cloned() + .collect::>(); + for torrent in torrents { + if let Err(e) = torrent.pause() { + debug!("error pausing torrent: {e:#}"); + } + } let _ = self.cancel_tx.send(()); + // this sucks, but hopefully will be enough + tokio::time::sleep(Duration::from_secs(1)).await; } async fn populate_from_stored(self: &Arc) -> anyhow::Result<()> { diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 00b6fd3..b4469d8 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rqbit" -version = "5.0.0-beta.0" +version = "5.0.0-beta.1" authors = ["Igor Katson "] edition = "2021" description = "A bittorrent command line client and server." @@ -23,7 +23,7 @@ default-tls = ["librqbit/default-tls"] rust-tls = ["librqbit/rust-tls"] [dependencies] -librqbit = {path="../librqbit", default-features=false, version = "5.0.0-beta.0"} +librqbit = {path="../librqbit", default-features=false, version = "5.0.0-beta.1"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} console-subscriber = {version = "0.2", optional = true} anyhow = "1" diff --git a/desktop/src-tauri/Cargo.lock b/desktop/src-tauri/Cargo.lock index 6c31929..d1be380 100644 --- a/desktop/src-tauri/Cargo.lock +++ b/desktop/src-tauri/Cargo.lock @@ -1867,7 +1867,7 @@ dependencies = [ [[package]] name = "librqbit" -version = "5.0.0-beta.0" +version = "5.0.0-beta.1" dependencies = [ "anyhow", "axum", @@ -1951,7 +1951,7 @@ dependencies = [ [[package]] name = "librqbit-dht" -version = "4.1.0" +version = "5.0.0-beta.1" dependencies = [ "anyhow", "backoff", diff --git a/desktop/src/configure.tsx b/desktop/src/configure.tsx index 11616e6..9044abc 100644 --- a/desktop/src/configure.tsx +++ b/desktop/src/configure.tsx @@ -58,8 +58,10 @@ export const ConfigModal: React.FC<{ handleConfigured: (config: RqbitDesktopConfig) => void, handleCancel?: () => void, initialConfig: RqbitDesktopConfig, -}> = ({ show, handleStartReconfigure, handleConfigured, handleCancel, initialConfig }) => { + defaultConfig: RqbitDesktopConfig, +}> = ({ show, handleStartReconfigure, handleConfigured, handleCancel, initialConfig, defaultConfig }) => { let [config, setConfig] = useState(initialConfig); + let [loading, setLoading] = useState(false); const [error, setError] = useState(null); @@ -107,9 +109,14 @@ export const ConfigModal: React.FC<{ const handleOkClick = () => { setError(null); handleStartReconfigure(); + setLoading(true); invokeAPI<{}>("config_change", { config }).then( - () => handleConfigured(config), + () => { + setLoading(false); + handleConfigured(config); + }, (e: ErrorDetails) => { + setLoading(false); setError({ text: "Error saving configuration", details: e, @@ -228,6 +235,7 @@ export const ConfigModal: React.FC<{ inputType="text" value={config.persistence.filename} onChange={handleInputChange} + disabled={config.persistence.disable} /> @@ -293,10 +301,10 @@ export const ConfigModal: React.FC<{ Cancel } - - diff --git a/desktop/src/rqbit-desktop.tsx b/desktop/src/rqbit-desktop.tsx index c739776..a20699c 100644 --- a/desktop/src/rqbit-desktop.tsx +++ b/desktop/src/rqbit-desktop.tsx @@ -32,8 +32,11 @@ export const RqbitDesktop: React.FC<{ }} handleConfigured={(config) => { setConfig(config); + setConfigurationOpened(false); setConfigured(true); }} - initialConfig={config} /> + initialConfig={config} + defaultConfig={defaultConfig} + /> } \ No newline at end of file