settings dialog works now and reloads the session properly

This commit is contained in:
Igor Katson 2023-12-06 23:18:06 +00:00
parent e4543e1ba2
commit 9cbe16b387
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
13 changed files with 123 additions and 49 deletions

6
Cargo.lock generated
View file

@ -1253,7 +1253,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit" name = "librqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum 0.7.1", "axum 0.7.1",
@ -1343,7 +1343,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit-dht" name = "librqbit-dht"
version = "4.1.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"backoff", "backoff",
@ -2002,7 +2002,7 @@ dependencies = [
[[package]] [[package]]
name = "rqbit" name = "rqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librqbit-dht" name = "librqbit-dht"
version = "4.1.0" version = "5.0.0-beta.1"
edition = "2021" edition = "2021"
description = "DHT implementation, used in rqbit torrent client." description = "DHT implementation, used in rqbit torrent client."
license = "Apache-2.0" license = "Apache-2.0"

View file

@ -16,7 +16,9 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let dht = DhtBuilder::new().await.context("error initializing DHT")?; let (dht, worker) = DhtBuilder::new().await.context("error initializing DHT")?;
tokio::spawn(worker);
let mut stream = dht.get_peers(info_hash, None)?; let mut stream = dht.get_peers(info_hash, None)?;
let stats_printer = async { let stats_printer = async {

View file

@ -23,7 +23,7 @@ use anyhow::{bail, Context};
use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use bencode::ByteString; use bencode::ByteString;
use dashmap::DashMap; use dashmap::DashMap;
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt, TryFutureExt};
use leaky_bucket::RateLimiter; use leaky_bucket::RateLimiter;
use librqbit_core::{id20::Id20, peer_id::generate_peer_id, spawn_utils::spawn}; use librqbit_core::{id20::Id20, peer_id::generate_peer_id, spawn_utils::spawn};
@ -1127,10 +1127,18 @@ pub struct DhtConfig {
} }
impl DhtState { impl DhtState {
pub async fn new() -> anyhow::Result<Arc<Self>> { pub async fn new() -> anyhow::Result<(
Arc<Self>,
impl Future<Output = anyhow::Result<()>> + Send + Sync + 'static,
)> {
Self::with_config(DhtConfig::default()).await Self::with_config(DhtConfig::default()).await
} }
pub async fn with_config(config: DhtConfig) -> anyhow::Result<Arc<Self>> { pub async fn with_config(
config: DhtConfig,
) -> anyhow::Result<(
Arc<Self>,
impl Future<Output = anyhow::Result<()>> + Send + Sync + 'static,
)> {
let socket = match config.listen_addr { let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr) Some(addr) => UdpSocket::bind(addr)
.await .await
@ -1160,15 +1168,15 @@ impl DhtState {
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
)); ));
spawn(error_span!("dht"), { let run_worker = {
let state = state.clone(); let state = state.clone();
async move { async move {
let worker = DhtWorker { socket, dht: state }; let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await?; worker.start(in_rx, &bootstrap_addrs).await?;
Ok(()) Ok(())
} }
}); };
Ok(state) Ok((state, run_worker))
} }
pub fn get_peers( pub fn get_peers(

View file

@ -10,6 +10,7 @@ use std::time::Duration;
pub use crate::dht::DhtStats; pub use crate::dht::DhtStats;
pub use crate::dht::{DhtConfig, DhtState, RequestPeersStream}; pub use crate::dht::{DhtConfig, DhtState, RequestPeersStream};
use futures::Future;
pub use librqbit_core::id20::Id20; pub use librqbit_core::id20::Id20;
pub use persistence::{PersistentDht, PersistentDhtConfig}; pub use persistence::{PersistentDht, PersistentDhtConfig};
@ -26,11 +27,19 @@ pub struct DhtBuilder {}
impl DhtBuilder { impl DhtBuilder {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub async fn new() -> anyhow::Result<Dht> { pub async fn new() -> anyhow::Result<(
Dht,
impl Future<Output = anyhow::Result<()>> + Send + Sync + 'static,
)> {
DhtState::new().await DhtState::new().await
} }
pub async fn with_config(config: DhtConfig) -> anyhow::Result<Dht> { pub async fn with_config(
config: DhtConfig,
) -> anyhow::Result<(
Dht,
impl Future<Output = anyhow::Result<()>> + Send + Sync + 'static,
)> {
DhtState::with_config(config).await DhtState::with_config(config).await
} }
} }

View file

@ -1,7 +1,7 @@
// TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address...
use futures::Future;
use librqbit_core::directories::get_configuration_directory; use librqbit_core::directories::get_configuration_directory;
use librqbit_core::spawn_utils::spawn;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{BufReader, BufWriter}; use std::io::{BufReader, BufWriter};
@ -10,7 +10,7 @@ use std::path::{Path, PathBuf};
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use tracing::{debug, error, error_span, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
use crate::peer_store::PeerStore; use crate::peer_store::PeerStore;
use crate::routing_table::RoutingTable; use crate::routing_table::RoutingTable;
@ -74,7 +74,13 @@ impl PersistentDht {
Ok(path) Ok(path)
} }
pub async fn create(config: Option<PersistentDhtConfig>) -> anyhow::Result<Dht> { pub async fn create(
config: Option<PersistentDhtConfig>,
) -> anyhow::Result<(
Dht,
impl Future<Output = anyhow::Result<()>> + Send + Sync + 'static,
impl Future<Output = anyhow::Result<()>> + Send + Sync + 'static,
)> {
let mut config = config.unwrap_or_default(); let mut config = config.unwrap_or_default();
let config_filename = match config.config_filename.take() { let config_filename = match config.config_filename.take() {
Some(config_filename) => config_filename, Some(config_filename) => config_filename,
@ -125,9 +131,9 @@ impl PersistentDht {
peer_store, peer_store,
..Default::default() ..Default::default()
}; };
let dht = DhtState::with_config(dht_config).await?; let (dht, run_worker) = DhtState::with_config(dht_config).await?;
spawn(error_span!("dht_persistence"), { let run_persistence = {
let dht = dht.clone(); let dht = dht.clone();
let dump_interval = config let dump_interval = config
.dump_interval .dump_interval
@ -150,7 +156,8 @@ impl PersistentDht {
} }
} }
} }
}); };
Ok(dht)
Ok((dht, run_worker, run_persistence))
} }
} }

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librqbit" name = "librqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
authors = ["Igor Katson <igor.katson@gmail.com>"] authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021" edition = "2021"
description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it." description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it."
@ -28,7 +28,7 @@ librqbit-core = {path = "../librqbit_core", version = "3.3.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"} peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"} sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="4.1.0"} dht = {path = "../dht", package="librqbit-dht", version="5.0.0-beta.1"}
librqbit-upnp = {path = "../upnp", version = "0.1.0"} librqbit-upnp = {path = "../upnp", version = "0.1.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]} tokio = {version = "1", features = ["macros", "rt-multi-thread"]}

View file

@ -107,7 +107,9 @@ mod tests {
init_logging(); init_logging();
let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap(); let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap(); let (dht, run_dht) = DhtBuilder::new().await.unwrap();
tokio::spawn(run_dht);
let peer_rx = dht.get_peers(info_hash, None).unwrap(); let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_id = generate_peer_id(); let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await

View file

@ -150,6 +150,23 @@ struct SerializedSessionDatabase {
torrents: HashMap<usize, SerializedTorrent>, torrents: HashMap<usize, SerializedTorrent>,
} }
fn spawn_with_cancel_token(
mut cancel_rx: tokio::sync::watch::Receiver<()>,
name: &str,
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
spawn(name, span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
debug!("task canceled");
Ok(())
}
}
});
}
pub struct Session { pub struct Session {
peer_id: Id20, peer_id: Id20,
dht: Option<Dht>, dht: Option<Dht>,
@ -385,6 +402,8 @@ impl Session {
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range) let (l, p) = create_tcp_listener(port_range)
.await .await
@ -399,12 +418,26 @@ impl Session {
None None
} else { } else {
let dht = if opts.disable_dht_persistence { let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig::default()).await let (dht, run_worker) = DhtBuilder::with_config(DhtConfig::default())
.await
.context("error initializing DHT")?;
spawn_with_cancel_token(cancel_rx.clone(), "dht", error_span!("dht"), run_worker);
dht
} else { } else {
let pdht_config = opts.dht_config.take().unwrap_or_default(); let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config)).await let (dht, run_worker, run_persistence) = PersistentDht::create(Some(pdht_config))
} .await
.context("error initializing DHT")?; .context("error initializing persistent DHT")?;
spawn_with_cancel_token(cancel_rx.clone(), "dht", error_span!("dht"), run_worker);
spawn_with_cancel_token(
cancel_rx.clone(),
"dht_persistence",
error_span!("dht_persistence"),
run_persistence,
);
dht
};
Some(dht) Some(dht)
}; };
let peer_opts = opts.peer_opts.unwrap_or_default(); let peer_opts = opts.peer_opts.unwrap_or_default();
@ -414,8 +447,6 @@ impl Session {
}; };
let spawner = BlockingSpawner::default(); let spawner = BlockingSpawner::default();
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let session = Arc::new(Self { let session = Arc::new(Self {
persistence_filename, persistence_filename,
peer_id, peer_id,
@ -618,22 +649,26 @@ impl Session {
span: tracing::Span, span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static, fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) { ) {
let mut cancel_rx = self.cancel_rx.clone(); spawn_with_cancel_token(self.cancel_rx.clone(), name, span, fut);
spawn(name, span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
debug!("task canceled");
Ok(())
}
}
});
} }
/// Stop the session and all managed tasks. /// Stop the session and all managed tasks.
// TODO: this probably doesn't kill everything properly.
pub async fn stop(&self) { pub async fn stop(&self) {
let torrents = self
.db
.read()
.torrents
.values()
.cloned()
.collect::<Vec<_>>();
for torrent in torrents {
if let Err(e) = torrent.pause() {
debug!("error pausing torrent: {e:#}");
}
}
let _ = self.cancel_tx.send(()); let _ = self.cancel_tx.send(());
// this sucks, but hopefully will be enough
tokio::time::sleep(Duration::from_secs(1)).await;
} }
async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> { async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> {

View file

@ -1,6 +1,6 @@
[package] [package]
name = "rqbit" name = "rqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
authors = ["Igor Katson <igor.katson@gmail.com>"] authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021" edition = "2021"
description = "A bittorrent command line client and server." description = "A bittorrent command line client and server."
@ -23,7 +23,7 @@ default-tls = ["librqbit/default-tls"]
rust-tls = ["librqbit/rust-tls"] rust-tls = ["librqbit/rust-tls"]
[dependencies] [dependencies]
librqbit = {path="../librqbit", default-features=false, version = "5.0.0-beta.0"} librqbit = {path="../librqbit", default-features=false, version = "5.0.0-beta.1"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]} tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
console-subscriber = {version = "0.2", optional = true} console-subscriber = {version = "0.2", optional = true}
anyhow = "1" anyhow = "1"

View file

@ -1867,7 +1867,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit" name = "librqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum", "axum",
@ -1951,7 +1951,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit-dht" name = "librqbit-dht"
version = "4.1.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"backoff", "backoff",

View file

@ -58,8 +58,10 @@ export const ConfigModal: React.FC<{
handleConfigured: (config: RqbitDesktopConfig) => void, handleConfigured: (config: RqbitDesktopConfig) => void,
handleCancel?: () => void, handleCancel?: () => void,
initialConfig: RqbitDesktopConfig, initialConfig: RqbitDesktopConfig,
}> = ({ show, handleStartReconfigure, handleConfigured, handleCancel, initialConfig }) => { defaultConfig: RqbitDesktopConfig,
}> = ({ show, handleStartReconfigure, handleConfigured, handleCancel, initialConfig, defaultConfig }) => {
let [config, setConfig] = useState<RqbitDesktopConfig>(initialConfig); let [config, setConfig] = useState<RqbitDesktopConfig>(initialConfig);
let [loading, setLoading] = useState<boolean>(false);
const [error, setError] = useState<any | null>(null); const [error, setError] = useState<any | null>(null);
@ -107,9 +109,14 @@ export const ConfigModal: React.FC<{
const handleOkClick = () => { const handleOkClick = () => {
setError(null); setError(null);
handleStartReconfigure(); handleStartReconfigure();
setLoading(true);
invokeAPI<{}>("config_change", { config }).then( invokeAPI<{}>("config_change", { config }).then(
() => handleConfigured(config), () => {
setLoading(false);
handleConfigured(config);
},
(e: ErrorDetails) => { (e: ErrorDetails) => {
setLoading(false);
setError({ setError({
text: "Error saving configuration", text: "Error saving configuration",
details: e, details: e,
@ -228,6 +235,7 @@ export const ConfigModal: React.FC<{
inputType="text" inputType="text"
value={config.persistence.filename} value={config.persistence.filename}
onChange={handleInputChange} onChange={handleInputChange}
disabled={config.persistence.disable}
/> />
</Tab> </Tab>
@ -293,10 +301,10 @@ export const ConfigModal: React.FC<{
Cancel Cancel
</Button> </Button>
} }
<Button variant="secondary" onClick={() => setConfig(initialConfig)}> <Button variant="secondary" onClick={() => setConfig(defaultConfig)}>
Reset to defaults Reset to defaults
</Button> </Button>
<Button variant="primary" onClick={handleOkClick}> <Button variant="primary" onClick={handleOkClick} disabled={loading}>
OK OK
</Button> </Button>
</Modal.Footer> </Modal.Footer>

View file

@ -32,8 +32,11 @@ export const RqbitDesktop: React.FC<{
}} }}
handleConfigured={(config) => { handleConfigured={(config) => {
setConfig(config); setConfig(config);
setConfigurationOpened(false);
setConfigured(true); setConfigured(true);
}} }}
initialConfig={config} /> initialConfig={config}
defaultConfig={defaultConfig}
/>
</> </>
} }