Merge pull request #43 from ikatson/desktop-configuration

Desktop configuration
This commit is contained in:
Igor Katson 2023-12-07 12:28:16 +00:00 committed by GitHub
commit e480ca71ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 1071 additions and 227 deletions

9
Cargo.lock generated
View file

@ -1253,7 +1253,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit" name = "librqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum 0.7.1", "axum 0.7.1",
@ -1290,6 +1290,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-test", "tokio-test",
"tokio-util",
"tower-http", "tower-http",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
@ -1336,6 +1337,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-util",
"tracing", "tracing",
"url", "url",
"uuid", "uuid",
@ -1343,7 +1345,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit-dht" name = "librqbit-dht"
version = "4.1.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"backoff", "backoff",
@ -1362,6 +1364,7 @@ dependencies = [
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
] ]
@ -2002,7 +2005,7 @@ dependencies = [
[[package]] [[package]]
name = "rqbit" name = "rqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librqbit-dht" name = "librqbit-dht"
version = "4.1.0" version = "5.0.0-beta.1"
edition = "2021" edition = "2021"
description = "DHT implementation, used in rqbit torrent client." description = "DHT implementation, used in rqbit torrent client."
license = "Apache-2.0" license = "Apache-2.0"
@ -32,10 +32,10 @@ futures = "0.3"
rand = "0.8" rand = "0.8"
indexmap = "2" indexmap = "2"
dashmap = {version = "5.5.3", features = ["serde"]} dashmap = {version = "5.5.3", features = ["serde"]}
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.3.0"} librqbit-core = {path="../librqbit_core", version = "3.3.0"}
chrono = {version = "0.4.31", features = ["serde"]} chrono = {version = "0.4.31", features = ["serde"]}
tokio-util = "0.7.10"
[dev-dependencies] [dev-dependencies]
tracing-subscriber = "0.3" tracing-subscriber = "0.3"

View file

@ -17,6 +17,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let dht = DhtBuilder::new().await.context("error initializing DHT")?; let dht = DhtBuilder::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash, None)?; let mut stream = dht.get_peers(info_hash, None)?;
let stats_printer = async { let stats_printer = async {

View file

@ -26,7 +26,11 @@ use dashmap::DashMap;
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use leaky_bucket::RateLimiter; use leaky_bucket::RateLimiter;
use librqbit_core::{id20::Id20, peer_id::generate_peer_id, spawn_utils::spawn}; use librqbit_core::{
id20::Id20,
peer_id::generate_peer_id,
spawn_utils::{spawn, spawn_with_cancel},
};
use parking_lot::RwLock; use parking_lot::RwLock;
use serde::Serialize; use serde::Serialize;
@ -35,6 +39,7 @@ use tokio::{
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
}; };
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, error_span, info, trace, warn, Instrument}; use tracing::{debug, debug_span, error, error_span, info, trace, warn, Instrument};
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -535,6 +540,8 @@ pub struct DhtState {
// This is to send raw messages // This is to send raw messages
worker_sender: UnboundedSender<WorkerSendRequest>, worker_sender: UnboundedSender<WorkerSendRequest>,
cancellation_token: CancellationToken,
pub(crate) peer_store: PeerStore, pub(crate) peer_store: PeerStore,
} }
@ -545,6 +552,7 @@ impl DhtState {
routing_table: Option<RoutingTable>, routing_table: Option<RoutingTable>,
listen_addr: SocketAddr, listen_addr: SocketAddr,
peer_store: PeerStore, peer_store: PeerStore,
cancellation_token: CancellationToken,
) -> Self { ) -> Self {
let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None)); let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None));
Self { Self {
@ -556,6 +564,7 @@ impl DhtState {
listen_addr, listen_addr,
rate_limiter: make_rate_limiter(), rate_limiter: make_rate_limiter(),
peer_store, peer_store,
cancellation_token,
} }
} }
@ -1124,13 +1133,18 @@ pub struct DhtConfig {
pub routing_table: Option<RoutingTable>, pub routing_table: Option<RoutingTable>,
pub listen_addr: Option<SocketAddr>, pub listen_addr: Option<SocketAddr>,
pub peer_store: Option<PeerStore>, pub peer_store: Option<PeerStore>,
pub cancellation_token: Option<CancellationToken>,
} }
impl DhtState { impl DhtState {
pub async fn new() -> anyhow::Result<Arc<Self>> { pub async fn new() -> anyhow::Result<Arc<Self>> {
Self::with_config(DhtConfig::default()).await Self::with_config(DhtConfig::default()).await
} }
pub async fn with_config(config: DhtConfig) -> anyhow::Result<Arc<Self>> { pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
pub async fn with_config(mut config: DhtConfig) -> anyhow::Result<Arc<Self>> {
let socket = match config.listen_addr { let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr) Some(addr) => UdpSocket::bind(addr)
.await .await
@ -1151,6 +1165,8 @@ impl DhtState {
.bootstrap_addrs .bootstrap_addrs
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());
let token = config.cancellation_token.take().unwrap_or_default();
let (in_tx, in_rx) = unbounded_channel(); let (in_tx, in_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal( let state = Arc::new(Self::new_internal(
peer_id, peer_id,
@ -1158,14 +1174,14 @@ impl DhtState {
config.routing_table, config.routing_table,
listen_addr, listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
token,
)); ));
spawn(error_span!("dht"), { spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), {
let state = state.clone(); let state = state.clone();
async move { async move {
let worker = DhtWorker { socket, dht: state }; let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await?; worker.start(in_rx, &bootstrap_addrs).await
Ok(())
} }
}); });
Ok(state) Ok(state)

View file

@ -1,13 +1,14 @@
// TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address...
use librqbit_core::directories::get_configuration_directory; use librqbit_core::directories::get_configuration_directory;
use librqbit_core::spawn_utils::spawn; use librqbit_core::spawn_utils::spawn_with_cancel;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{BufReader, BufWriter}; use std::io::{BufReader, BufWriter};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::Duration; use std::time::Duration;
use tokio_util::sync::CancellationToken;
use anyhow::Context; use anyhow::Context;
use tracing::{debug, error, error_span, info, trace, warn}; use tracing::{debug, error, error_span, info, trace, warn};
@ -68,18 +69,27 @@ fn dump_dht(dht: &Dht, filename: &Path, tempfile_name: &Path) -> anyhow::Result<
} }
impl PersistentDht { impl PersistentDht {
pub async fn create(config: Option<PersistentDhtConfig>) -> anyhow::Result<Dht> { pub fn default_persistence_filename() -> anyhow::Result<PathBuf> {
let dirs = get_configuration_directory("dht")?;
let path = dirs.cache_dir().join("dht.json");
Ok(path)
}
pub async fn create(
config: Option<PersistentDhtConfig>,
cancellation_token: Option<CancellationToken>,
) -> anyhow::Result<Dht> {
let mut config = config.unwrap_or_default(); let mut config = config.unwrap_or_default();
let config_filename = match config.config_filename.take() { let config_filename = match config.config_filename.take() {
Some(config_filename) => config_filename, Some(config_filename) => config_filename,
None => { None => Self::default_persistence_filename()?,
let dirs = get_configuration_directory("dht")?;
let path = dirs.cache_dir().join("dht.json");
info!("will store DHT routing table to {:?} periodically", &path);
path
}
}; };
info!(
"will store DHT routing table to {:?} periodically",
&config_filename
);
if let Some(parent) = config_filename.parent() { if let Some(parent) = config_filename.parent() {
std::fs::create_dir_all(parent) std::fs::create_dir_all(parent)
.with_context(|| format!("error creating dir {:?}", &parent))?; .with_context(|| format!("error creating dir {:?}", &parent))?;
@ -117,34 +127,41 @@ impl PersistentDht {
routing_table, routing_table,
listen_addr, listen_addr,
peer_store, peer_store,
cancellation_token,
..Default::default() ..Default::default()
}; };
let dht = DhtState::with_config(dht_config).await?; let dht = DhtState::with_config(dht_config).await?;
spawn_with_cancel(
error_span!("dht_persistence"),
dht.cancellation_token().clone(),
{
let dht = dht.clone();
let dump_interval = config
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};
spawn(error_span!("dht_persistence"), { loop {
let dht = dht.clone(); trace!("sleeping for {:?}", &dump_interval);
let dump_interval = config tokio::time::sleep(dump_interval).await;
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};
loop { match dump_dht(&dht, &config_filename, &tempfile_name) {
trace!("sleeping for {:?}", &dump_interval); Ok(_) => debug!("dumped DHT to {:?}", &config_filename),
tokio::time::sleep(dump_interval).await; Err(e) => {
error!("error dumping DHT to {:?}: {:#}", &config_filename, e)
match dump_dht(&dht, &config_filename, &tempfile_name) { }
Ok(_) => debug!("dumped DHT to {:?}", &config_filename), }
Err(e) => error!("error dumping DHT to {:?}: {:#}", &config_filename, e),
} }
} }
} },
}); );
Ok(dht) Ok(dht)
} }
} }

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librqbit" name = "librqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
authors = ["Igor Katson <igor.katson@gmail.com>"] authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021" edition = "2021"
description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it." description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it."
@ -28,7 +28,7 @@ librqbit-core = {path = "../librqbit_core", version = "3.3.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"} peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"} sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="4.1.0"} dht = {path = "../dht", package="librqbit-dht", version="5.0.0-beta.1"}
librqbit-upnp = {path = "../upnp", version = "0.1.0"} librqbit-upnp = {path = "../upnp", version = "0.1.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]} tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
@ -64,8 +64,9 @@ backoff = "0.4.0"
dashmap = "5.5.3" dashmap = "5.5.3"
base64 = "0.21.5" base64 = "0.21.5"
serde_with = "3.4.0" serde_with = "3.4.0"
tokio-util = "0.7.10"
[dev-dependencies] [dev-dependencies]
futures = {version = "0.3"} futures = {version = "0.3"}
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
tokio-test = "0.4" tokio-test = "0.4"

View file

@ -26,6 +26,7 @@ pub type Result<T> = std::result::Result<T, ApiError>;
/// Library API for use in different web frameworks. /// Library API for use in different web frameworks.
/// Contains all methods you might want to expose with (de)serializable inputs/outputs. /// Contains all methods you might want to expose with (de)serializable inputs/outputs.
#[derive(Clone)]
pub struct Api { pub struct Api {
session: Arc<Session>, session: Arc<Session>,
rust_log_reload_tx: Option<UnboundedSender<String>>, rust_log_reload_tx: Option<UnboundedSender<String>>,
@ -39,6 +40,10 @@ impl Api {
} }
} }
pub fn session(&self) -> &Arc<Session> {
&self.session
}
pub fn mgr_handle(&self, idx: TorrentId) -> Result<ManagedTorrentHandle> { pub fn mgr_handle(&self, idx: TorrentId) -> Result<ManagedTorrentHandle> {
self.session self.session
.get(idx) .get(idx)

View file

@ -27,6 +27,14 @@ impl ApiError {
} }
} }
pub const fn new_from_text(status: StatusCode, text: &'static str) -> Self {
Self {
status: Some(status),
kind: ApiErrorKind::Text(text),
plaintext: false,
}
}
#[allow(dead_code)] #[allow(dead_code)]
pub fn not_implemented(msg: &str) -> Self { pub fn not_implemented(msg: &str) -> Self {
Self { Self {
@ -69,6 +77,7 @@ impl ApiError {
enum ApiErrorKind { enum ApiErrorKind {
TorrentNotFound(usize), TorrentNotFound(usize),
DhtDisabled, DhtDisabled,
Text(&'static str),
Other(anyhow::Error), Other(anyhow::Error),
} }
@ -91,6 +100,7 @@ impl Serialize for ApiError {
ApiErrorKind::TorrentNotFound(_) => "torrent_not_found", ApiErrorKind::TorrentNotFound(_) => "torrent_not_found",
ApiErrorKind::DhtDisabled => "dht_disabled", ApiErrorKind::DhtDisabled => "dht_disabled",
ApiErrorKind::Other(_) => "internal_error", ApiErrorKind::Other(_) => "internal_error",
ApiErrorKind::Text(_) => "internal_error",
}, },
human_readable: format!("{self}"), human_readable: format!("{self}"),
status: self.status().as_u16(), status: self.status().as_u16(),
@ -130,6 +140,7 @@ impl std::fmt::Display for ApiError {
ApiErrorKind::TorrentNotFound(idx) => write!(f, "torrent {idx} not found"), ApiErrorKind::TorrentNotFound(idx) => write!(f, "torrent {idx} not found"),
ApiErrorKind::Other(err) => write!(f, "{err:?}"), ApiErrorKind::Other(err) => write!(f, "{err:?}"),
ApiErrorKind::DhtDisabled => write!(f, "DHT is disabled"), ApiErrorKind::DhtDisabled => write!(f, "DHT is disabled"),
ApiErrorKind::Text(t) => write!(f, "{t}"),
} }
} }
} }

View file

@ -108,6 +108,7 @@ mod tests {
let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap(); let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap(); let dht = DhtBuilder::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash, None).unwrap(); let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_id = generate_peer_id(); let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await

View file

@ -66,6 +66,7 @@ impl HttpApi {
"GET /web/": "Web UI", "GET /web/": "Web UI",
}, },
"server": "rqbit", "server": "rqbit",
"version": env!("CARGO_PKG_VERSION"),
})) }))
} }

View file

@ -21,6 +21,7 @@ use librqbit_core::{
directories::get_configuration_directory, directories::get_configuration_directory,
magnet::Magnet, magnet::Magnet,
peer_id::generate_peer_id, peer_id::generate_peer_id,
spawn_utils::spawn_with_cancel,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
@ -32,12 +33,13 @@ use tokio::{
io::AsyncReadExt, io::AsyncReadExt,
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
}; };
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{ use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::{with_timeout, PeerConnectionOptions}, peer_connection::{with_timeout, PeerConnectionOptions},
spawn_utils::{spawn, BlockingSpawner}, spawn_utils::BlockingSpawner,
torrent_state::{ torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
}, },
@ -161,8 +163,7 @@ pub struct Session {
tcp_listen_port: Option<u16>, tcp_listen_port: Option<u16>,
cancel_tx: tokio::sync::watch::Sender<()>, cancellation_token: CancellationToken,
cancel_rx: tokio::sync::watch::Receiver<()>,
} }
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> { async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
@ -373,12 +374,22 @@ impl Session {
Self::new_with_opts(output_folder, SessionOptions::default()).await Self::new_with_opts(output_folder, SessionOptions::default()).await
} }
pub fn default_persistence_filename() -> anyhow::Result<PathBuf> {
let dir = get_configuration_directory("session")?;
Ok(dir.data_dir().join("session.json"))
}
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
/// Create a new session with options. /// Create a new session with options.
pub async fn new_with_opts( pub async fn new_with_opts(
output_folder: PathBuf, output_folder: PathBuf,
mut opts: SessionOptions, mut opts: SessionOptions,
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let token = CancellationToken::new();
let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range) let (l, p) = create_tcp_listener(port_range)
@ -394,25 +405,28 @@ impl Session {
None None
} else { } else {
let dht = if opts.disable_dht_persistence { let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig::default()).await DhtBuilder::with_config(DhtConfig {
cancellation_token: Some(token.child_token()),
..Default::default()
})
.await
.context("error initializing DHT")?
} else { } else {
let pdht_config = opts.dht_config.take().unwrap_or_default(); let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config)).await PersistentDht::create(Some(pdht_config), Some(token.clone()))
} .await
.context("error initializing DHT")?; .context("error initializing persistent DHT")?
};
Some(dht) Some(dht)
}; };
let peer_opts = opts.peer_opts.unwrap_or_default(); let peer_opts = opts.peer_opts.unwrap_or_default();
let persistence_filename = match opts.persistence_filename { let persistence_filename = match opts.persistence_filename {
Some(filename) => filename, Some(filename) => filename,
None => get_configuration_directory("session")? None => Self::default_persistence_filename()?,
.data_dir()
.join("session.json"),
}; };
let spawner = BlockingSpawner::default(); let spawner = BlockingSpawner::default();
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let session = Arc::new(Self { let session = Arc::new(Self {
persistence_filename, persistence_filename,
peer_id, peer_id,
@ -421,14 +435,12 @@ impl Session {
spawner, spawner,
output_folder, output_folder,
db: RwLock::new(Default::default()), db: RwLock::new(Default::default()),
cancel_rx, cancellation_token: token,
cancel_tx,
tcp_listen_port, tcp_listen_port,
}); });
if let Some(tcp_listener) = tcp_listener { if let Some(tcp_listener) = tcp_listener {
session.spawn( session.spawn(
"tcp listener",
error_span!("tcp_listen", port = tcp_listen_port), error_span!("tcp_listen", port = tcp_listen_port),
session.clone().task_tcp_listener(tcp_listener), session.clone().task_tcp_listener(tcp_listener),
); );
@ -437,7 +449,6 @@ impl Session {
if let Some(listen_port) = tcp_listen_port { if let Some(listen_port) = tcp_listen_port {
if opts.enable_upnp_port_forwarding { if opts.enable_upnp_port_forwarding {
session.spawn( session.spawn(
"upnp_forward",
error_span!("upnp_forward", port = listen_port), error_span!("upnp_forward", port = listen_port),
session.clone().task_upnp_port_forwarder(listen_port), session.clone().task_upnp_port_forwarder(listen_port),
); );
@ -455,11 +466,7 @@ impl Session {
})?; })?;
} }
let persistence_task = session.clone().task_persistence(); let persistence_task = session.clone().task_persistence();
session.spawn( session.spawn(error_span!("session_persistence"), persistence_task);
"session persistene",
error_span!("session_persistence"),
persistence_task,
);
} }
Ok(session) Ok(session)
@ -608,26 +615,32 @@ impl Session {
} }
} }
fn spawn( /// Spawn a task in the context of the session.
pub fn spawn(
&self, &self,
name: &str,
span: tracing::Span, span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static, fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) { ) {
let mut cancel_rx = self.cancel_rx.clone(); spawn_with_cancel(span, self.cancellation_token.clone(), fut);
spawn(name, span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
debug!("task canceled");
Ok(())
}
}
});
} }
pub fn stop(&self) { /// Stop the session and all managed tasks.
let _ = self.cancel_tx.send(()); pub async fn stop(&self) {
let torrents = self
.db
.read()
.torrents
.values()
.cloned()
.collect::<Vec<_>>();
for torrent in torrents {
if let Err(e) = torrent.pause() {
debug!("error pausing torrent: {e:#}");
}
}
self.cancellation_token.cancel();
// this sucks, but hopefully will be enough
tokio::time::sleep(Duration::from_secs(1)).await;
} }
async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> { async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> {
@ -958,6 +971,7 @@ impl Session {
builder builder
.overwrite(opts.overwrite) .overwrite(opts.overwrite)
.spawner(self.spawner) .spawner(self.spawner)
.cancellation_token(self.cancellation_token.child_token())
.peer_id(self.peer_id); .peer_id(self.peer_id);
if opts.disable_trackers { if opts.disable_trackers {

View file

@ -65,6 +65,7 @@ use itertools::Itertools;
use librqbit_core::{ use librqbit_core::{
id20::Id20, id20::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex}, lengths::{ChunkInfo, Lengths, ValidPieceIndex},
spawn_utils::spawn_with_cancel,
speed_estimator::SpeedEstimator, speed_estimator::SpeedEstimator,
torrent_metainfo::TorrentMetaV1Info, torrent_metainfo::TorrentMetaV1Info,
}; };
@ -80,6 +81,7 @@ use tokio::{
}, },
time::timeout, time::timeout,
}; };
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, trace, warn}; use tracing::{debug, error, error_span, info, trace, warn};
use url::Url; use url::Url;
@ -90,7 +92,6 @@ use crate::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
}, },
session::CheckedIncomingConnection, session::CheckedIncomingConnection,
spawn_utils::spawn,
torrent_state::{peer::Peer, utils::atomic_inc}, torrent_state::{peer::Peer, utils::atomic_inc},
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
type_aliases::{PeerHandle, BF}, type_aliases::{PeerHandle, BF},
@ -185,17 +186,16 @@ pub struct TorrentStateLive {
finished_notify: Notify, finished_notify: Notify,
cancel_tx: tokio::sync::watch::Sender<()>,
cancel_rx: tokio::sync::watch::Receiver<()>,
down_speed_estimator: SpeedEstimator, down_speed_estimator: SpeedEstimator,
up_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken,
} }
impl TorrentStateLive { impl TorrentStateLive {
pub(crate) fn new( pub(crate) fn new(
paused: TorrentStatePaused, paused: TorrentStatePaused,
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>, fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
cancellation_token: CancellationToken,
) -> Arc<Self> { ) -> Arc<Self> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
@ -206,8 +206,6 @@ impl TorrentStateLive {
let needed_bytes = paused.info.lengths.total_length() - have_bytes; let needed_bytes = paused.info.lengths.total_length() - have_bytes;
let lengths = *paused.chunk_tracker.get_lengths(); let lengths = *paused.chunk_tracker.get_lengths();
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let state = Arc::new(TorrentStateLive { let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(), meta: paused.info.clone(),
peers: Default::default(), peers: Default::default(),
@ -229,20 +227,17 @@ impl TorrentStateLive {
finished_notify: Notify::new(), finished_notify: Notify::new(),
down_speed_estimator, down_speed_estimator,
up_speed_estimator, up_speed_estimator,
cancel_rx, cancellation_token,
cancel_tx,
}); });
for tracker in state.meta.trackers.iter() { for tracker in state.meta.trackers.iter() {
state.spawn( state.spawn(
"tracker_monitor",
error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()), error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()),
state.clone().task_single_tracker_monitor(tracker.clone()), state.clone().task_single_tracker_monitor(tracker.clone()),
); );
} }
state.spawn( state.spawn(
"speed_estimator_updater",
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
{ {
let state = Arc::downgrade(&state); let state = Arc::downgrade(&state);
@ -273,29 +268,18 @@ impl TorrentStateLive {
); );
state.spawn( state.spawn(
"peer_adder",
error_span!(parent: state.meta.span.clone(), "peer_adder"), error_span!(parent: state.meta.span.clone(), "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx), state.clone().task_peer_adder(peer_queue_rx),
); );
state state
} }
fn spawn( pub(crate) fn spawn(
&self, &self,
name: &str,
span: tracing::Span, span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static, fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) { ) {
let mut cancel_rx = self.cancel_rx.clone(); spawn_with_cancel(span, self.cancellation_token.clone(), fut);
spawn(name, span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
debug!("task canceled");
Ok(())
}
}
});
} }
pub fn down_speed_estimator(&self) -> &SpeedEstimator { pub fn down_speed_estimator(&self) -> &SpeedEstimator {
@ -418,7 +402,6 @@ impl TorrentStateLive {
atomic_inc(&counters.incoming_connections); atomic_inc(&counters.incoming_connections);
self.spawn( self.spawn(
"incoming peer",
error_span!( error_span!(
parent: self.meta.span.clone(), parent: self.meta.span.clone(),
"manage_incoming_peer", "manage_incoming_peer",
@ -570,7 +553,6 @@ impl TorrentStateLive {
let permit = state.peer_semaphore.clone().acquire_owned().await?; let permit = state.peer_semaphore.clone().acquire_owned().await?;
state.spawn( state.spawn(
"manage_peer",
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
state.clone().task_manage_outgoing_peer(addr, permit), state.clone().task_manage_outgoing_peer(addr, permit),
); );
@ -682,7 +664,6 @@ impl TorrentStateLive {
// We don't want to remember this task as there may be too many. // We don't want to remember this task as there may be too many.
self.spawn( self.spawn(
"transmit_haves",
error_span!( error_span!(
parent: self.meta.span.clone(), parent: self.meta.span.clone(),
"transmit_haves", "transmit_haves",
@ -744,7 +725,7 @@ impl TorrentStateLive {
} }
pub fn pause(&self) -> anyhow::Result<TorrentStatePaused> { pub fn pause(&self) -> anyhow::Result<TorrentStatePaused> {
let _ = self.cancel_tx.send(()); self.cancellation_token.cancel();
let mut g = self.locked.write(); let mut g = self.locked.write();
@ -856,7 +837,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
} }
Message::Have(h) => self.on_have(h), Message::Have(h) => self.on_have(h),
Message::NotInterested => { Message::NotInterested => {
debug!("received \"not interested\", but we don't care yet") debug!("received \"not interested\", but we don't process it yet")
}
Message::Cancel(_) => {
debug!("received \"cancel\", but we don't process it yet")
} }
message => { message => {
warn!("received unsupported message {:?}, ignoring", message); warn!("received unsupported message {:?}, ignoring", message);
@ -968,7 +952,6 @@ impl PeerHandler {
if let Some(dur) = backoff { if let Some(dur) = backoff {
self.state.clone().spawn( self.state.clone().spawn(
"wait_for_peer",
error_span!( error_span!(
parent: self.state.meta.span.clone(), parent: self.state.meta.span.clone(),
"wait_for_peer", "wait_for_peer",

View file

@ -20,12 +20,14 @@ use librqbit_core::id20::Id20;
use librqbit_core::lengths::Lengths; use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id; use librqbit_core::peer_id::generate_peer_id;
use librqbit_core::spawn_utils::spawn_with_cancel;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
pub use live::*; pub use live::*;
use parking_lot::RwLock; use parking_lot::RwLock;
use tokio::time::timeout; use tokio::time::timeout;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::debug; use tracing::debug;
use tracing::error_span; use tracing::error_span;
use tracing::trace; use tracing::trace;
@ -33,7 +35,6 @@ use tracing::warn;
use url::Url; use url::Url;
use crate::chunk_tracker::ChunkTracker; use crate::chunk_tracker::ChunkTracker;
use crate::spawn_utils::spawn;
use crate::spawn_utils::BlockingSpawner; use crate::spawn_utils::BlockingSpawner;
use crate::torrent_state::stats::LiveStats; use crate::torrent_state::stats::LiveStats;
@ -91,6 +92,7 @@ pub struct ManagedTorrentInfo {
pub struct ManagedTorrent { pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>, pub info: Arc<ManagedTorrentInfo>,
pub cancellation_token: CancellationToken,
pub(crate) only_files: Option<Vec<usize>>, pub(crate) only_files: Option<Vec<usize>>,
locked: RwLock<ManagedTorrentLocked>, locked: RwLock<ManagedTorrentLocked>,
} }
@ -179,10 +181,11 @@ impl ManagedTorrent {
let spawn_fatal_errors_receiver = let spawn_fatal_errors_receiver =
|state: &Arc<Self>, rx: tokio::sync::oneshot::Receiver<anyhow::Error>| { |state: &Arc<Self>, rx: tokio::sync::oneshot::Receiver<anyhow::Error>| {
let span = state.info.span.clone(); let span = state.info.span.clone();
let token = state.cancellation_token.clone();
let state = Arc::downgrade(state); let state = Arc::downgrade(state);
spawn( spawn_with_cancel(
"fatal_errors_receiver",
error_span!(parent: span, "fatal_errors_receiver"), error_span!(parent: span, "fatal_errors_receiver"),
token,
async move { async move {
let e = match rx.await { let e = match rx.await {
Ok(e) => e, Ok(e) => e,
@ -191,7 +194,7 @@ impl ManagedTorrent {
if let Some(state) = state.upgrade() { if let Some(state) = state.upgrade() {
state.stop_with_error(e); state.stop_with_error(e);
} else { } else {
warn!("tried to stop the torrent with error, but it's couldn't upgrade the arc"); warn!("tried to stop the torrent with error, but couldn't upgrade the arc");
} }
Ok(()) Ok(())
}, },
@ -203,40 +206,42 @@ impl ManagedTorrent {
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
peer_rx: Option<RequestPeersStream>, peer_rx: Option<RequestPeersStream>,
) { ) {
let span = live.meta().span.clone(); live.spawn(
let live = Arc::downgrade(live); error_span!(parent: live.meta().span.clone(), "external_peer_adder"),
spawn( {
"external_peer_adder", let live = live.clone();
error_span!(parent: span, "external_peer_adder"), async move {
async move {
{
let live: Arc<TorrentStateLive> =
live.upgrade().context("no longer live")?;
trace!("adding {} initial peers", initial_peers.len()); trace!("adding {} initial peers", initial_peers.len());
for peer in initial_peers { for peer in initial_peers {
live.add_peer_if_not_seen(peer).context("torrent closed")?; live.add_peer_if_not_seen(peer).context("torrent closed")?;
} }
}
let mut peer_rx = if let Some(peer_rx) = peer_rx { let live = {
peer_rx let weak = Arc::downgrade(&live);
} else { drop(live);
return Ok(()); weak
}; };
loop { let mut peer_rx = if let Some(peer_rx) = peer_rx {
match timeout(Duration::from_secs(5), peer_rx.next()).await { peer_rx
Ok(Some(peer)) => { } else {
let live = match live.upgrade() { return Ok(());
Some(live) => live, };
None => return Ok(()),
}; loop {
live.add_peer_if_not_seen(peer).context("torrent closed")?; match timeout(Duration::from_secs(5), peer_rx.next()).await {
Ok(Some(peer)) => {
let live = match live.upgrade() {
Some(live) => live,
None => return Ok(()),
};
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
Ok(None) => return Ok(()),
// If timeout, check if the torrent is live.
Err(_) if live.strong_count() == 0 => return Ok(()),
Err(_) => continue,
} }
Ok(None) => return Ok(()),
// If timeout, check if the torrent is live.
Err(_) if live.strong_count() == 0 => return Ok(()),
Err(_) => continue,
} }
} }
}, },
@ -252,9 +257,10 @@ impl ManagedTorrent {
drop(g); drop(g);
let t = self.clone(); let t = self.clone();
let span = self.info().span.clone(); let span = self.info().span.clone();
spawn( let token = self.cancellation_token.clone();
"initialize_and_start", spawn_with_cancel(
error_span!(parent: span.clone(), "initialize_and_start"), error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(),
async move { async move {
match init.check().await { match init.check().await {
Ok(paused) => { Ok(paused) => {
@ -271,7 +277,7 @@ impl ManagedTorrent {
} }
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx); let live = TorrentStateLive::new(paused, tx, token.child_token());
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(&t, rx); spawn_fatal_errors_receiver(&t, rx);
@ -292,7 +298,11 @@ impl ManagedTorrent {
ManagedTorrentState::Paused(_) => { ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused(); let paused = g.state.take().assert_paused();
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx); let live = TorrentStateLive::new(
paused,
tx,
self.cancellation_token.child_token().clone(),
);
g.state = ManagedTorrentState::Live(live.clone()); g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(self, rx); spawn_fatal_errors_receiver(self, rx);
spawn_peer_adder(&live, initial_peers, peer_rx); spawn_peer_adder(&live, initial_peers, peer_rx);
@ -409,6 +419,7 @@ pub struct ManagedTorrentBuilder {
peer_id: Option<Id20>, peer_id: Option<Id20>,
overwrite: bool, overwrite: bool,
spawner: Option<BlockingSpawner>, spawner: Option<BlockingSpawner>,
cancellation_token: Option<CancellationToken>,
} }
impl ManagedTorrentBuilder { impl ManagedTorrentBuilder {
@ -429,9 +440,15 @@ impl ManagedTorrentBuilder {
trackers: Default::default(), trackers: Default::default(),
peer_id: None, peer_id: None,
overwrite: false, overwrite: false,
cancellation_token: None,
} }
} }
pub fn cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
self.cancellation_token = Some(token);
self
}
pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self { pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self {
self.only_files = Some(only_files); self.only_files = Some(only_files);
self self
@ -472,7 +489,7 @@ impl ManagedTorrentBuilder {
self self
} }
pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> { pub(crate) fn build(mut self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?; let lengths = Lengths::from_torrent(&self.info)?;
let info = Arc::new(ManagedTorrentInfo { let info = Arc::new(ManagedTorrentInfo {
span, span,
@ -499,6 +516,7 @@ impl ManagedTorrentBuilder {
locked: RwLock::new(ManagedTorrentLocked { locked: RwLock::new(ManagedTorrentLocked {
state: ManagedTorrentState::Initializing(initializing), state: ManagedTorrentState::Initializing(initializing),
}), }),
cancellation_token: self.cancellation_token.take().unwrap_or_default(),
info, info,
})) }))
} }

File diff suppressed because one or more lines are too long

View file

@ -4,7 +4,7 @@
"src": "assets/logo.svg" "src": "assets/logo.svg"
}, },
"index.html": { "index.html": {
"file": "assets/index-687608b7.js", "file": "assets/index-f791e636.js",
"isEntry": true, "isEntry": true,
"src": "index.html" "src": "index.html"
} }

View file

@ -46,7 +46,7 @@ const makeRequest = async (method: string, path: string, data?: any): Promise<an
return result; return result;
} }
export const API: RqbitAPI = { export const API: RqbitAPI & { getVersion: () => Promise<string> } = {
listTorrents: (): Promise<ListTorrentsResponse> => makeRequest('GET', '/torrents'), listTorrents: (): Promise<ListTorrentsResponse> => makeRequest('GET', '/torrents'),
getTorrentDetails: (index: number): Promise<TorrentDetails> => { getTorrentDetails: (index: number): Promise<TorrentDetails> => {
return makeRequest('GET', `/torrents/${index}`); return makeRequest('GET', `/torrents/${index}`);
@ -95,5 +95,9 @@ export const API: RqbitAPI = {
delete: (index: number): Promise<void> => { delete: (index: number): Promise<void> => {
return makeRequest('POST', `/torrents/${index}/delete`); return makeRequest('POST', `/torrents/${index}/delete`);
},
getVersion: async (): Promise<string> => {
const r = await makeRequest('GET', '/');
return r.version;
} }
} }

View file

@ -1,12 +1,27 @@
import { StrictMode } from "react"; import { StrictMode, useEffect, useState } from "react";
import ReactDOM from 'react-dom/client'; import ReactDOM from 'react-dom/client';
import { RqbitWebUI, APIContext } from "./rqbit-web"; import { RqbitWebUI, APIContext, customSetInterval } from "./rqbit-web";
import { API } from "./http-api"; import { API } from "./http-api";
ReactDOM.createRoot(document.getElementById('app') as HTMLInputElement).render( const RootWithVersion = () => {
<StrictMode> let [title, setTitle] = useState<string>("rqbit web UI");
useEffect(() => {
const refreshVersion = () => API.getVersion().then((version) => {
setTitle(`rqbit web UI - v${version}`);
return 10000;
}, (e) => {
return 1000;
});
return customSetInterval(refreshVersion, 0)
}, [])
return <StrictMode>
<APIContext.Provider value={API}> <APIContext.Provider value={API}>
<RqbitWebUI title="rqbit web UI - v5.0.0-beta.0" /> <RqbitWebUI title={title} />
</APIContext.Provider> </APIContext.Provider>
</StrictMode> </StrictMode>;
}
ReactDOM.createRoot(document.getElementById('app') as HTMLInputElement).render(
<RootWithVersion />
); );

View file

@ -2,7 +2,7 @@ import { MouseEventHandler, RefObject, createContext, useContext, useEffect, use
import { ProgressBar, Button, Container, Row, Col, Alert, Modal, Form, Spinner } from 'react-bootstrap'; import { ProgressBar, Button, Container, Row, Col, Alert, Modal, Form, Spinner } from 'react-bootstrap';
import { AddTorrentResponse, TorrentDetails, TorrentId, TorrentStats, ErrorDetails as ApiErrorDetails, STATE_INITIALIZING, STATE_LIVE, STATE_PAUSED, STATE_ERROR, RqbitAPI, AddTorrentOptions } from './api-types'; import { AddTorrentResponse, TorrentDetails, TorrentId, TorrentStats, ErrorDetails as ApiErrorDetails, STATE_INITIALIZING, STATE_LIVE, STATE_PAUSED, STATE_ERROR, RqbitAPI, AddTorrentOptions } from './api-types';
interface Error { export interface Error {
text: string, text: string,
details?: ApiErrorDetails, details?: ApiErrorDetails,
} }
@ -409,7 +409,7 @@ const ErrorDetails = (props: { details: ApiErrorDetails | null | undefined }) =>
</> </>
} }
const ErrorComponent = (props: { error: Error | null, remove?: () => void }) => { export const ErrorComponent = (props: { error: Error | null, remove?: () => void }) => {
let { error, remove } = props; let { error, remove } = props;
if (error == null) { if (error == null) {
@ -786,7 +786,7 @@ function formatSecondsToTime(seconds: number): string {
// Run a function with initial interval, then run it forever with the interval that the // Run a function with initial interval, then run it forever with the interval that the
// callback returns. // callback returns.
// Returns a callback to clear it. // Returns a callback to clear it.
function customSetInterval(asyncCallback: () => Promise<number>, initialInterval: number): () => void { export function customSetInterval(asyncCallback: () => Promise<number>, initialInterval: number): () => void {
let timeoutId: number; let timeoutId: number;
let currentInterval: number = initialInterval; let currentInterval: number = initialInterval;
@ -809,7 +809,7 @@ function customSetInterval(asyncCallback: () => Promise<number>, initialInterval
}; };
} }
function loopUntilSuccess<T>(callback: () => Promise<T>, interval: number): () => void { export function loopUntilSuccess<T>(callback: () => Promise<T>, interval: number): () => void {
let timeoutId: number; let timeoutId: number;
const executeCallback = async () => { const executeCallback = async () => {

View file

@ -30,6 +30,7 @@ bencode = {path = "../bencode", default-features=false, package="librqbit-bencod
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
itertools = "0.12" itertools = "0.12"
directories = "5" directories = "5"
tokio-util = "0.7.10"
[dev-dependencies] [dev-dependencies]
serde_json = "1" serde_json = "1"

View file

@ -1,3 +1,5 @@
use anyhow::bail;
use tokio_util::sync::CancellationToken;
use tracing::{error, trace, Instrument}; use tracing::{error, trace, Instrument};
/// Spawns a future with tracing instrumentation. /// Spawns a future with tracing instrumentation.
@ -32,3 +34,18 @@ pub fn spawn(
.instrument(span); .instrument(span);
tokio::task::spawn(fut) tokio::task::spawn(fut)
} }
pub fn spawn_with_cancel(
span: tracing::Span,
cancellation_token: CancellationToken,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) -> tokio::task::JoinHandle<()> {
spawn(span, async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
bail!("cancelled");
},
r = fut => r
}
})
}

View file

@ -40,6 +40,7 @@ const MSGID_HAVE: u8 = 4;
const MSGID_BITFIELD: u8 = 5; const MSGID_BITFIELD: u8 = 5;
const MSGID_REQUEST: u8 = 6; const MSGID_REQUEST: u8 = 6;
const MSGID_PIECE: u8 = 7; const MSGID_PIECE: u8 = 7;
const MSGID_CANCEL: u8 = 8;
const MSGID_EXTENDED: u8 = 20; const MSGID_EXTENDED: u8 = 20;
pub const MY_EXTENDED_UT_METADATA: u8 = 3; pub const MY_EXTENDED_UT_METADATA: u8 = 3;
@ -169,6 +170,7 @@ impl From<anyhow::Error> for MessageDeserializeError {
#[derive(Debug)] #[derive(Debug)]
pub enum Message<ByteBuf: std::hash::Hash + Eq> { pub enum Message<ByteBuf: std::hash::Hash + Eq> {
Request(Request), Request(Request),
Cancel(Request),
Bitfield(ByteBuf), Bitfield(ByteBuf),
KeepAlive, KeepAlive,
Have(u32), Have(u32),
@ -200,6 +202,7 @@ where
fn clone_to_owned(&self) -> Self::Target { fn clone_to_owned(&self) -> Self::Target {
match self { match self {
Message::Request(req) => Message::Request(*req), Message::Request(req) => Message::Request(*req),
Message::Cancel(req) => Message::Cancel(*req),
Message::Bitfield(b) => Message::Bitfield(b.clone_to_owned()), Message::Bitfield(b) => Message::Bitfield(b.clone_to_owned()),
Message::Choke => Message::Choke, Message::Choke => Message::Choke,
Message::Unchoke => Message::Unchoke, Message::Unchoke => Message::Unchoke,
@ -240,7 +243,7 @@ where
{ {
pub fn len_prefix_and_msg_id(&self) -> (u32, u8) { pub fn len_prefix_and_msg_id(&self) -> (u32, u8) {
match self { match self {
Message::Request(_) => (LEN_PREFIX_REQUEST, MSGID_REQUEST), Message::Request(_) | Message::Cancel(_) => (LEN_PREFIX_REQUEST, MSGID_REQUEST),
Message::Bitfield(b) => (1 + b.as_ref().len() as u32, MSGID_BITFIELD), Message::Bitfield(b) => (1 + b.as_ref().len() as u32, MSGID_BITFIELD),
Message::Choke => (LEN_PREFIX_CHOKE, MSGID_CHOKE), Message::Choke => (LEN_PREFIX_CHOKE, MSGID_CHOKE),
Message::Unchoke => (LEN_PREFIX_UNCHOKE, MSGID_UNCHOKE), Message::Unchoke => (LEN_PREFIX_UNCHOKE, MSGID_UNCHOKE),
@ -270,7 +273,7 @@ where
let ser = bopts(); let ser = bopts();
match self { match self {
Message::Request(request) => { Message::Request(request) | Message::Cancel(request) => {
const MSG_LEN: usize = PREAMBLE_LEN + 12; const MSG_LEN: usize = PREAMBLE_LEN + 12;
out.resize(MSG_LEN, 0); out.resize(MSG_LEN, 0);
debug_assert_eq!(out[PREAMBLE_LEN..].len(), 12); debug_assert_eq!(out[PREAMBLE_LEN..].len(), 12);
@ -411,16 +414,28 @@ where
} }
} }
} }
MSGID_REQUEST => { MSGID_REQUEST | MSGID_CANCEL => {
let expected_len = 12; let expected_len = 12;
match rest.get(..expected_len) { match rest.get(..expected_len) {
Some(b) => { Some(b) => {
let request = decoder_config.deserialize::<Request>(b).unwrap(); let request = decoder_config.deserialize::<Request>(b).unwrap();
Ok((Message::Request(request), PREAMBLE_LEN + expected_len)) let req = if msg_id == MSGID_REQUEST {
Message::Request(request)
} else {
Message::Cancel(request)
};
Ok((req, PREAMBLE_LEN + expected_len))
} }
None => { None => {
let missing = expected_len - rest.len(); let missing = expected_len - rest.len();
Err(MessageDeserializeError::NotEnoughData(missing, "request")) Err(MessageDeserializeError::NotEnoughData(
missing,
if msg_id == MSGID_REQUEST {
"request"
} else {
"cancel"
},
))
} }
} }
} }

View file

@ -1,6 +1,6 @@
[package] [package]
name = "rqbit" name = "rqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
authors = ["Igor Katson <igor.katson@gmail.com>"] authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021" edition = "2021"
description = "A bittorrent command line client and server." description = "A bittorrent command line client and server."
@ -23,7 +23,7 @@ default-tls = ["librqbit/default-tls"]
rust-tls = ["librqbit/rust-tls"] rust-tls = ["librqbit/rust-tls"]
[dependencies] [dependencies]
librqbit = {path="../librqbit", default-features=false, version = "5.0.0-beta.0"} librqbit = {path="../librqbit", default-features=false, version = "5.0.0-beta.1"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]} tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
console-subscriber = {version = "0.2", optional = true} console-subscriber = {version = "0.2", optional = true}
anyhow = "1" anyhow = "1"

View file

@ -501,6 +501,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
) )
.await .await
.context("error initializing rqbit session")?; .context("error initializing rqbit session")?;
librqbit_spawn( librqbit_spawn(
"stats_printer", "stats_printer",
trace_span!("stats_printer"), trace_span!("stats_printer"),

View file

@ -1867,7 +1867,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit" name = "librqbit"
version = "5.0.0-beta.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum", "axum",
@ -1900,6 +1900,7 @@ dependencies = [
"size_format", "size_format",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util",
"tower-http", "tower-http",
"tracing", "tracing",
"url", "url",
@ -1944,6 +1945,7 @@ dependencies = [
"parking_lot", "parking_lot",
"serde", "serde",
"tokio", "tokio",
"tokio-util",
"tracing", "tracing",
"url", "url",
"uuid", "uuid",
@ -1951,7 +1953,7 @@ dependencies = [
[[package]] [[package]]
name = "librqbit-dht" name = "librqbit-dht"
version = "4.1.0" version = "5.0.0-beta.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"backoff", "backoff",
@ -1970,6 +1972,7 @@ dependencies = [
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util",
"tracing", "tracing",
] ]
@ -3015,8 +3018,10 @@ dependencies = [
"directories", "directories",
"http 1.0.0", "http 1.0.0",
"librqbit", "librqbit",
"parking_lot",
"serde", "serde",
"serde_json", "serde_json",
"serde_with",
"tauri", "tauri",
"tauri-build", "tauri-build",
"tokio", "tokio",

View file

@ -24,6 +24,8 @@ http = "1.0.0"
directories = "5.0.1" directories = "5.0.1"
tracing-subscriber = {version = "0.3.18", features = ["env-filter"] } tracing-subscriber = {version = "0.3.18", features = ["env-filter"] }
tracing = "0.1" tracing = "0.1"
serde_with = "3.4.0"
parking_lot = "0.12.1"
[features] [features]
# this feature is used for production builds or when `devPath` points to the filesystem # this feature is used for production builds or when `devPath` points to the filesystem

View file

@ -0,0 +1,132 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
time::Duration,
};
use librqbit::{dht::PersistentDht, Session};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RqbitDesktopConfigDht {
pub disable: bool,
pub disable_persistence: bool,
pub persistence_filename: PathBuf,
}
impl Default for RqbitDesktopConfigDht {
fn default() -> Self {
Self {
disable: false,
disable_persistence: false,
persistence_filename: PersistentDht::default_persistence_filename().unwrap(),
}
}
}
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct RqbitDesktopConfigTcpListen {
pub disable: bool,
pub min_port: u16,
pub max_port: u16,
}
impl Default for RqbitDesktopConfigTcpListen {
fn default() -> Self {
Self {
disable: false,
// TODO: use consts from librqbit
min_port: 4240,
max_port: 4260,
}
}
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RqbitDesktopConfigPersistence {
pub disable: bool,
pub filename: PathBuf,
}
impl Default for RqbitDesktopConfigPersistence {
fn default() -> Self {
Self {
disable: false,
filename: Session::default_persistence_filename().unwrap(),
}
}
}
#[serde_as]
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct RqbitDesktopConfigPeerOpts {
#[serde_as(as = "serde_with::DurationSeconds")]
pub connect_timeout: Duration,
#[serde_as(as = "serde_with::DurationSeconds")]
pub read_write_timeout: Duration,
}
impl Default for RqbitDesktopConfigPeerOpts {
fn default() -> Self {
Self {
connect_timeout: Duration::from_secs(2),
read_write_timeout: Duration::from_secs(10),
}
}
}
#[serde_as]
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RqbitDesktopConfigHttpApi {
pub disable: bool,
pub listen_addr: SocketAddr,
pub read_only: bool,
}
impl Default for RqbitDesktopConfigHttpApi {
fn default() -> Self {
Self {
disable: Default::default(),
listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 3030)),
read_only: false,
}
}
}
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RqbitDesktopConfigUpnp {
pub disable: bool,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RqbitDesktopConfig {
pub default_download_location: PathBuf,
pub dht: RqbitDesktopConfigDht,
pub tcp_listen: RqbitDesktopConfigTcpListen,
pub upnp: RqbitDesktopConfigUpnp,
pub persistence: RqbitDesktopConfigPersistence,
pub peer_opts: RqbitDesktopConfigPeerOpts,
pub http_api: RqbitDesktopConfigHttpApi,
}
impl Default for RqbitDesktopConfig {
fn default() -> Self {
let download_folder = directories::UserDirs::new()
.expect("directories::UserDirs::new()")
.download_dir()
.expect("download_dir()")
.to_path_buf();
Self {
default_download_location: download_folder,
dht: Default::default(),
tcp_listen: Default::default(),
upnp: Default::default(),
persistence: Default::default(),
peer_opts: Default::default(),
http_api: Default::default(),
}
}
}

View file

@ -1,24 +1,211 @@
// Prevents additional console window on Windows in release, DO NOT REMOVE!! // Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
mod config;
use std::{
fs::{File, OpenOptions},
io::{BufReader, BufWriter},
sync::Arc,
};
use anyhow::Context; use anyhow::Context;
use config::RqbitDesktopConfig;
use http::StatusCode; use http::StatusCode;
use librqbit::{ use librqbit::{
api::{ api::{
ApiAddTorrentResponse, EmptyJsonResponse, TorrentDetailsResponse, TorrentListResponse, ApiAddTorrentResponse, EmptyJsonResponse, TorrentDetailsResponse, TorrentListResponse,
TorrentStats, TorrentStats,
}, },
librqbit_spawn, AddTorrent, AddTorrentOptions, Api, ApiError, Session, SessionOptions, dht::PersistentDhtConfig,
librqbit_spawn, AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session,
SessionOptions,
}; };
use tracing::error_span; use parking_lot::RwLock;
use serde::Serialize;
use tracing::{error, error_span};
const ERR_NOT_CONFIGURED: ApiError =
ApiError::new_from_text(StatusCode::FAILED_DEPENDENCY, "not configured");
struct StateShared {
config: config::RqbitDesktopConfig,
api: Option<Api>,
}
type RustLogReloadTx = tokio::sync::mpsc::UnboundedSender<String>;
impl StateShared {}
struct State { struct State {
api: Api, config_filename: String,
shared: Arc<RwLock<Option<StateShared>>>,
rust_log_reload_tx: RustLogReloadTx,
}
fn read_config(path: &str) -> anyhow::Result<RqbitDesktopConfig> {
let rdr = BufReader::new(File::open(path)?);
Ok(serde_json::from_reader(rdr)?)
}
fn write_config(path: &str, config: &RqbitDesktopConfig) -> anyhow::Result<()> {
let tmp = format!("{}.tmp", path);
let mut tmp_file = BufWriter::new(
OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&tmp)?,
);
serde_json::to_writer(&mut tmp_file, config)?;
std::fs::rename(tmp, path)?;
Ok(())
}
async fn api_from_config(
rust_log_reload_tx: &RustLogReloadTx,
config: &RqbitDesktopConfig,
) -> anyhow::Result<Api> {
let session = Session::new_with_opts(
config.default_download_location.clone(),
SessionOptions {
disable_dht: config.dht.disable,
disable_dht_persistence: config.dht.disable_persistence,
dht_config: Some(PersistentDhtConfig {
config_filename: Some(config.dht.persistence_filename.clone()),
..Default::default()
}),
persistence: !config.persistence.disable,
persistence_filename: Some(config.persistence.filename.clone()),
peer_opts: Some(PeerConnectionOptions {
connect_timeout: Some(config.peer_opts.connect_timeout),
read_write_timeout: Some(config.peer_opts.read_write_timeout),
..Default::default()
}),
listen_port_range: if !config.tcp_listen.disable {
Some(config.tcp_listen.min_port..config.tcp_listen.max_port)
} else {
None
},
enable_upnp_port_forwarding: !config.upnp.disable,
..Default::default()
},
)
.await
.context("couldn't set up librqbit session")?;
let api = Api::new(session.clone(), None);
if !config.http_api.disable {
let http_api_task =
librqbit::http_api::HttpApi::new(session.clone(), Some(rust_log_reload_tx.clone()))
.make_http_api_and_run(config.http_api.listen_addr, config.http_api.read_only);
session.spawn(error_span!("http_api"), http_api_task);
}
Ok(api)
}
impl State {
async fn new(rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender<String>) -> Self {
let config_filename = directories::ProjectDirs::from("com", "rqbit", "desktop")
.expect("directories::ProjectDirs::from")
.config_dir()
.to_str()
.expect("to_str()")
.to_owned();
if let Ok(config) = read_config(&config_filename) {
let api = api_from_config(&rust_log_reload_tx, &config).await.ok();
let shared = Arc::new(RwLock::new(Some(StateShared { config, api })));
return Self {
config_filename,
shared,
rust_log_reload_tx,
};
}
Self {
config_filename,
rust_log_reload_tx,
shared: Arc::new(RwLock::new(None)),
}
}
fn api(&self) -> Result<Api, ApiError> {
let g = self.shared.read();
match g.as_ref().and_then(|s| s.api.as_ref()) {
Some(api) => Ok(api.clone()),
None => Err(ERR_NOT_CONFIGURED),
}
}
async fn configure(&self, config: RqbitDesktopConfig) -> Result<(), ApiError> {
{
let g = self.shared.read();
if let Some(shared) = g.as_ref() {
if shared.api.is_some() && shared.config == config {
// The config didn't change, and the API is running, nothing to do.
return Ok(());
}
}
}
let existing = self.shared.write().as_mut().and_then(|s| s.api.take());
if let Some(api) = existing {
api.session().stop().await;
}
let api = api_from_config(&self.rust_log_reload_tx, &config).await?;
if let Err(e) = write_config(&self.config_filename, &config) {
error!("error writing config: {:#}", e);
}
let mut g = self.shared.write();
*g = Some(StateShared {
config,
api: Some(api),
});
Ok(())
}
}
#[derive(Default, Serialize)]
struct CurrentState {
config: Option<RqbitDesktopConfig>,
configured: bool,
} }
#[tauri::command] #[tauri::command]
fn torrents_list(state: tauri::State<State>) -> TorrentListResponse { fn config_default() -> config::RqbitDesktopConfig {
state.api.api_torrent_list() config::RqbitDesktopConfig::default()
}
#[tauri::command]
fn config_current(state: tauri::State<'_, State>) -> CurrentState {
let g = state.shared.read();
match &*g {
Some(s) => CurrentState {
config: Some(s.config.clone()),
configured: s.api.is_some(),
},
None => Default::default(),
}
}
#[tauri::command]
async fn config_change(
state: tauri::State<'_, State>,
config: RqbitDesktopConfig,
) -> Result<EmptyJsonResponse, ApiError> {
state.configure(config).await.map(|_| EmptyJsonResponse {})
}
#[tauri::command]
fn torrents_list(state: tauri::State<State>) -> Result<TorrentListResponse, ApiError> {
Ok(state.api()?.api_torrent_list())
} }
#[tauri::command] #[tauri::command]
@ -28,7 +215,7 @@ async fn torrent_create_from_url(
opts: Option<AddTorrentOptions>, opts: Option<AddTorrentOptions>,
) -> Result<ApiAddTorrentResponse, ApiError> { ) -> Result<ApiAddTorrentResponse, ApiError> {
state state
.api .api()?
.api_add_torrent(AddTorrent::Url(url.into()), opts) .api_add_torrent(AddTorrent::Url(url.into()), opts)
.await .await
} }
@ -45,7 +232,7 @@ async fn torrent_create_from_base64_file(
.context("invalid base64") .context("invalid base64")
.map_err(|e| ApiError::new_from_anyhow(StatusCode::BAD_REQUEST, e))?; .map_err(|e| ApiError::new_from_anyhow(StatusCode::BAD_REQUEST, e))?;
state state
.api .api()?
.api_add_torrent(AddTorrent::TorrentFileBytes(bytes.into()), opts) .api_add_torrent(AddTorrent::TorrentFileBytes(bytes.into()), opts)
.await .await
} }
@ -55,7 +242,7 @@ async fn torrent_details(
state: tauri::State<'_, State>, state: tauri::State<'_, State>,
id: usize, id: usize,
) -> Result<TorrentDetailsResponse, ApiError> { ) -> Result<TorrentDetailsResponse, ApiError> {
state.api.api_torrent_details(id) state.api()?.api_torrent_details(id)
} }
#[tauri::command] #[tauri::command]
@ -63,7 +250,7 @@ async fn torrent_stats(
state: tauri::State<'_, State>, state: tauri::State<'_, State>,
id: usize, id: usize,
) -> Result<TorrentStats, ApiError> { ) -> Result<TorrentStats, ApiError> {
state.api.api_stats_v1(id) state.api()?.api_stats_v1(id)
} }
#[tauri::command] #[tauri::command]
@ -71,7 +258,7 @@ async fn torrent_action_delete(
state: tauri::State<'_, State>, state: tauri::State<'_, State>,
id: usize, id: usize,
) -> Result<EmptyJsonResponse, ApiError> { ) -> Result<EmptyJsonResponse, ApiError> {
state.api.api_torrent_action_delete(id) state.api()?.api_torrent_action_delete(id)
} }
#[tauri::command] #[tauri::command]
@ -79,7 +266,7 @@ async fn torrent_action_pause(
state: tauri::State<'_, State>, state: tauri::State<'_, State>,
id: usize, id: usize,
) -> Result<EmptyJsonResponse, ApiError> { ) -> Result<EmptyJsonResponse, ApiError> {
state.api.api_torrent_action_pause(id) state.api()?.api_torrent_action_pause(id)
} }
#[tauri::command] #[tauri::command]
@ -87,7 +274,7 @@ async fn torrent_action_forget(
state: tauri::State<'_, State>, state: tauri::State<'_, State>,
id: usize, id: usize,
) -> Result<EmptyJsonResponse, ApiError> { ) -> Result<EmptyJsonResponse, ApiError> {
state.api.api_torrent_action_forget(id) state.api()?.api_torrent_action_forget(id)
} }
#[tauri::command] #[tauri::command]
@ -95,7 +282,7 @@ async fn torrent_action_start(
state: tauri::State<'_, State>, state: tauri::State<'_, State>,
id: usize, id: usize,
) -> Result<EmptyJsonResponse, ApiError> { ) -> Result<EmptyJsonResponse, ApiError> {
state.api.api_torrent_action_start(id) state.api()?.api_torrent_action_start(id)
} }
#[tauri::command] #[tauri::command]
@ -133,41 +320,14 @@ fn init_logging() -> tokio::sync::mpsc::UnboundedSender<String> {
reload_tx reload_tx
} }
async fn start_session() { async fn start() {
tauri::async_runtime::set(tokio::runtime::Handle::current());
let rust_log_reload_tx = init_logging(); let rust_log_reload_tx = init_logging();
tauri::async_runtime::set(tokio::runtime::Handle::current()); let state = State::new(rust_log_reload_tx).await;
let download_folder = directories::UserDirs::new()
.expect("directories::UserDirs::new()")
.download_dir()
.expect("download_dir()")
.to_path_buf();
let session = Session::new_with_opts(
download_folder,
SessionOptions {
disable_dht: false,
disable_dht_persistence: false,
persistence: true,
listen_port_range: Some(4240..4260),
..Default::default()
},
)
.await
.expect("couldn't set up librqbit session");
let api = Api::new(session.clone(), None);
librqbit_spawn(
"http api",
error_span!("http_api"),
librqbit::http_api::HttpApi::new(session, Some(rust_log_reload_tx))
.make_http_api_and_run("127.0.0.1:3030".parse().unwrap(), false),
);
tauri::Builder::default() tauri::Builder::default()
.manage(State { api }) .manage(state)
.invoke_handler(tauri::generate_handler![ .invoke_handler(tauri::generate_handler![
torrents_list, torrents_list,
torrent_details, torrent_details,
@ -178,7 +338,10 @@ async fn start_session() {
torrent_action_forget, torrent_action_forget,
torrent_action_start, torrent_action_start,
torrent_create_from_base64_file, torrent_create_from_base64_file,
get_version get_version,
config_default,
config_current,
config_change,
]) ])
.run(tauri::generate_context!()) .run(tauri::generate_context!())
.expect("error while running tauri application"); .expect("error while running tauri application");
@ -189,5 +352,5 @@ fn main() {
.enable_all() .enable_all()
.build() .build()
.expect("couldn't set up tokio runtime") .expect("couldn't set up tokio runtime")
.block_on(start_session()) .block_on(start())
} }

View file

@ -23,7 +23,7 @@ function errorToUIError(path: string): (e: InvokeErrorResponse) => Promise<never
} }
} }
async function invokeAPI<Response>(name: string, params?: InvokeArgs): Promise<Response> { export async function invokeAPI<Response>(name: string, params?: InvokeArgs): Promise<Response> {
console.log("invoking", name, params); console.log("invoking", name, params);
const result = await invoke<Response>(name, params).catch(errorToUIError(name)); const result = await invoke<Response>(name, params).catch(errorToUIError(name));
console.log(result); console.log(result);

View file

@ -0,0 +1,50 @@
type PathLike = string;
type Duration = string;
type SocketAddr = string;
interface RqbitDesktopConfigDht {
disable: boolean;
disable_persistence: boolean;
persistence_filename: PathLike;
}
interface RqbitDesktopConfigTcpListen {
disable: boolean;
min_port: number;
max_port: number;
}
interface RqbitDesktopConfigPersistence {
disable: boolean;
filename: PathLike;
}
interface RqbitDesktopConfigPeerOpts {
connect_timeout: Duration;
read_write_timeout: Duration;
}
interface RqbitDesktopConfigHttpApi {
disable: boolean;
listen_addr: SocketAddr;
read_only: boolean;
}
interface RqbitDesktopConfigUpnp {
disable: boolean;
}
export interface RqbitDesktopConfig {
default_download_location: PathLike;
dht: RqbitDesktopConfigDht;
tcp_listen: RqbitDesktopConfigTcpListen;
upnp: RqbitDesktopConfigUpnp;
persistence: RqbitDesktopConfigPersistence;
peer_opts: RqbitDesktopConfigPeerOpts;
http_api: RqbitDesktopConfigHttpApi;
}
export interface CurrentDesktopState {
config: RqbitDesktopConfig | null,
configured: boolean,
}

315
desktop/src/configure.tsx Normal file
View file

@ -0,0 +1,315 @@
import React, { useState } from "react";
import { RqbitDesktopConfig } from "./configuration";
import { Button, Form, Modal, Row, Tab, Tabs } from "react-bootstrap";
import { ErrorComponent } from "./rqbit-webui-src/rqbit-web";
import { invokeAPI } from "./api";
import { ErrorDetails } from "./rqbit-webui-src/api-types";
const FormCheck: React.FC<{
label: string,
name: string,
checked: boolean,
onChange: (e: any) => void,
disabled?: boolean,
help?: string,
}> = ({ label, name, checked, onChange, disabled, help }) => {
return <Form.Group as={Row} controlId={name} className="mb-3">
<Form.Label className="col-4">{label}</Form.Label>
<div className="col-8">
<Form.Check
type="switch"
name={name}
checked={checked}
onChange={onChange}
disabled={disabled}
/>
</div>
{help && <div className="form-text">{help}</div>}
</Form.Group>
}
const FormInput: React.FC<{
label: string,
name: string,
value: string | number,
inputType: string,
onChange: (e: any) => void,
disabled?: boolean,
help?: string
}> = ({ label, name, value, inputType, onChange, disabled, help }) => {
return <Form.Group as={Row} controlId={name} className="mb-3">
<Form.Label className="col-4 col-form-label">{label}</Form.Label>
<div className="col-8">
<Form.Control
type={inputType}
name={name}
value={value}
onChange={onChange}
disabled={disabled}
/>
</div>
{help && <div className="form-text">{help}</div>}
</Form.Group>
}
export const ConfigModal: React.FC<{
show: boolean,
handleStartReconfigure: () => void,
handleConfigured: (config: RqbitDesktopConfig) => void,
handleCancel?: () => void,
initialConfig: RqbitDesktopConfig,
defaultConfig: RqbitDesktopConfig,
}> = ({ show, handleStartReconfigure, handleConfigured, handleCancel, initialConfig, defaultConfig }) => {
let [config, setConfig] = useState<RqbitDesktopConfig>(initialConfig);
let [loading, setLoading] = useState<boolean>(false);
const [error, setError] = useState<any | null>(null);
const handleInputChange = (e: any) => {
const name: string = e.target.name;
const value: any = e.target.value;
const [mainField, subField] = name.split('.', 2);
if (subField) {
setConfig((prevConfig: any) => ({
...prevConfig,
[mainField]: {
...prevConfig[mainField],
[subField]: value,
},
}));
} else {
setConfig((prevConfig) => ({
...prevConfig,
[name]: value,
}));
}
};
const handleToggleChange = (e: any) => {
const name: string = e.target.name;
const [mainField, subField] = name.split('.', 2);
if (subField) {
setConfig((prevConfig: any) => ({
...prevConfig,
[mainField]: {
...prevConfig[mainField],
[subField]: !prevConfig[mainField][subField],
},
}));
} else {
setConfig((prevConfig: any) => ({
...prevConfig,
[name]: !prevConfig[name],
}));
}
};
const handleOkClick = () => {
setError(null);
handleStartReconfigure();
setLoading(true);
invokeAPI<{}>("config_change", { config }).then(
() => {
setLoading(false);
handleConfigured(config);
},
(e: ErrorDetails) => {
setLoading(false);
setError({
text: "Error saving configuration",
details: e,
});
}
)
};
return (
<Modal show={show} size='xl' onHide={handleCancel}>
<Modal.Header closeButton>
<Modal.Title>Configure Rqbit desktop</Modal.Title>
</Modal.Header>
<Modal.Body>
<ErrorComponent error={error}></ErrorComponent>
<Tabs
defaultActiveKey="home"
id="rqbit-config"
className="mb-3">
<Tab className="mb-3" eventKey="home" title="Home">
<FormInput
label="Default download folder"
name="default_download_location"
value={config.default_download_location}
inputType="text"
onChange={handleInputChange}
help="Where to download torrents by default. You can override this per torrent."
/>
</Tab>
<Tab className="mb-3" eventKey="dht" title="DHT">
<legend>DHT config</legend>
<FormCheck
label="Enable DHT"
name="dht.disable"
checked={!config.dht.disable}
onChange={handleToggleChange}
help="DHT is required to read magnet links. There's no good reason to disable it, unless you know what you are doing."
/>
<FormCheck
label="Enable DHT persistence"
name="dht.disable_persistence"
checked={!config.dht.disable_persistence}
onChange={handleToggleChange}
disabled={config.dht.disable}
help="Enable to store DHT state in a file periodically. If disabled, DHT will bootstrap from scratch on restart."
/>
<FormInput
label="Persistence filename"
name="dht.persistence_filename"
value={config.dht.persistence_filename}
inputType="text"
disabled={config.dht.disable}
onChange={handleInputChange}
help="The filename to store DHT state into"
/>
</Tab>
<Tab className="mb-3" eventKey="tcp_listen" title="TCP">
<legend>TCP Listener config</legend>
<FormCheck
label="Listen on TCP"
name="tcp_listen.disable"
checked={!config.tcp_listen.disable}
onChange={handleToggleChange}
help="Listen for torrent requests on TCP. Required for peers to be able to connect to you, mainly for uploading."
/>
<FormCheck
label="Advertise over UPnP"
name="tcp_listen.disable"
checked={!config.tcp_listen.disable}
onChange={handleToggleChange}
help="Advertise your port over UPnP. This is required for peers to be able to connect to you from the internet. Will only work if your router has a static IP."
/>
<FormInput
inputType="number"
label="Min port"
name="tcp_listen.min_port"
value={config.tcp_listen.min_port}
disabled={config.tcp_listen.disable}
onChange={handleInputChange}
help="The min port to try to listen on. First successful is taken."
/>
<FormInput
inputType="number"
label="Max port"
name="tcp_listen.max_port"
value={config.tcp_listen.max_port}
disabled={config.tcp_listen.disable}
onChange={handleInputChange}
help="The max port to try to listen on."
/>
</Tab>
<Tab className="mb-3" eventKey="session_persistence" title="Session">
<legend>Session persistence</legend>
<FormCheck
label="Enable persistence"
name="persistence.disable"
checked={!config.persistence.disable}
onChange={handleToggleChange}
help="If you disable session persistence, rqbit won't remember the torrents you had before restart."
/>
<FormInput
label="Persistence filename"
name="persistence.filename"
inputType="text"
value={config.persistence.filename}
onChange={handleInputChange}
disabled={config.persistence.disable}
/>
</Tab>
<Tab className="mb-3" eventKey="peer_opts" title="Peer options">
<legend>Peer connection options</legend>
<FormInput
label="Connect timeout (seconds)"
inputType="number"
name="peer_opts.connect_timeout"
value={config.peer_opts.connect_timeout}
onChange={handleInputChange}
help="How much to wait for outgoing connections to connect. Default is low to prefer faster peers."
/>
<FormInput
label="Read/write timeout (seconds)"
inputType="number"
name="peer_opts.read_write_timeout"
value={config.peer_opts.read_write_timeout}
onChange={handleInputChange}
help="Peer socket read/write timeout."
/>
</Tab>
<Tab className="mb-3" eventKey="http_api" title="HTTP API">
<legend>HTTP API config</legend>
<FormCheck
label="Enable HTTP API"
name="http_api.disable"
checked={!config.http_api.disable}
onChange={handleToggleChange}
help="If enabled you can access the HTTP API at the address below"
/>
<FormCheck
label="Read only"
name="http_api.read_only"
checked={config.http_api.read_only}
disabled={config.http_api.disable}
onChange={handleToggleChange}
help="If enabled, only GET requests will be allowed through the API"
/>
<FormInput
label="Listen address"
inputType="text"
name="http_api.listen_addr"
value={config.http_api.listen_addr}
disabled={config.http_api.disable}
onChange={handleInputChange}
help={`You'll access the API at http://${config.http_api.listen_addr}`}
/>
</Tab>
</Tabs>
</Modal.Body>
<Modal.Footer>
{!!handleCancel &&
<Button variant="secondary" onClick={handleCancel}>
Cancel
</Button>
}
<Button variant="secondary" onClick={() => setConfig(defaultConfig)}>
Reset to defaults
</Button>
<Button variant="primary" onClick={handleOkClick} disabled={loading}>
OK
</Button>
</Modal.Footer>
</Modal>
);
};

View file

@ -1,19 +1,29 @@
import { StrictMode } from "react"; import { StrictMode } from "react";
import ReactDOM from 'react-dom/client'; import ReactDOM from 'react-dom/client';
import { APIContext, RqbitWebUI } from "./rqbit-webui-src/rqbit-web"; import { APIContext } from "./rqbit-webui-src/rqbit-web";
import { API } from "./api"; import { API } from "./api";
import { invoke } from "@tauri-apps/api"; import { invoke } from "@tauri-apps/api";
import { CurrentDesktopState, RqbitDesktopConfig } from "./configuration";
import { RqbitDesktop } from "./rqbit-desktop";
let version = invoke<string>("get_version").then((version) => { async function get_version(): Promise<string> {
return invoke<string>("get_version");
}
async function get_default_config(): Promise<RqbitDesktopConfig> {
return invoke<RqbitDesktopConfig>("config_default");
}
async function get_current_config(): Promise<CurrentDesktopState> {
return invoke<CurrentDesktopState>("config_current");
}
Promise.all([get_version(), get_default_config(), get_current_config()]).then(([version, defaultConfig, currentState]) => {
ReactDOM.createRoot(document.getElementById('root') as HTMLElement).render( ReactDOM.createRoot(document.getElementById('root') as HTMLElement).render(
<StrictMode> <StrictMode>
<APIContext.Provider value={API}> <APIContext.Provider value={API}>
<RqbitWebUI title={`Rqbit Desktop v${version}`} /> <RqbitDesktop version={version} defaultConfig={defaultConfig} currentState={currentState} />
</APIContext.Provider> </APIContext.Provider>
</StrictMode> </StrictMode>
); );
}); })

View file

@ -0,0 +1,43 @@
import { useState } from "react";
import { RqbitWebUI } from "./rqbit-webui-src/rqbit-web";
import { CurrentDesktopState, RqbitDesktopConfig } from "./configuration";
import { ConfigModal } from "./configure";
export const RqbitDesktop: React.FC<{
version: string,
defaultConfig: RqbitDesktopConfig,
currentState: CurrentDesktopState,
}> = ({ version, defaultConfig, currentState }) => {
let [configured, setConfigured] = useState<boolean>(currentState.configured);
let [config, setConfig] = useState<RqbitDesktopConfig>(currentState.config ?? defaultConfig);
let [configurationOpened, setConfigurationOpened] = useState<boolean>(false);
return <>
{configured && <RqbitWebUI title={`Rqbit Desktop v${version}`}></RqbitWebUI>}
{configured && <a
className="bi bi-sliders2 position-absolute top-0 start-0 p-3 text-primary"
onClick={(e) => {
e.stopPropagation();
setConfigurationOpened(true);
}}
href="#"
aria-label="Settings" />}
<ConfigModal
show={!configured || configurationOpened}
handleStartReconfigure={() => {
setConfigured(false);
}}
handleCancel={() => {
setConfigurationOpened(false);
}}
handleConfigured={(config) => {
setConfig(config);
setConfigurationOpened(false);
setConfigured(true);
}}
initialConfig={config}
defaultConfig={defaultConfig}
/>
</>
}