Desktop: button to show a modal with logs (#48)
* Add an endpoint to stream logs /stream_logs * Display logs in desktop app * UI component to stream logs
This commit is contained in:
parent
f7345ae6df
commit
9385524a1a
21 changed files with 521 additions and 125 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -1262,6 +1262,7 @@ dependencies = [
|
|||
"bincode",
|
||||
"bitvec",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"crypto-hash",
|
||||
"dashmap",
|
||||
"futures",
|
||||
|
|
@ -2008,6 +2009,7 @@ name = "rqbit"
|
|||
version = "5.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"clap",
|
||||
"console-subscriber",
|
||||
"futures",
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ rand = "0.8"
|
|||
openssl = {version="0.10", optional=true}
|
||||
crypto-hash = {version="0.3", optional=true}
|
||||
sha1 = {version = "0.10", optional=true}
|
||||
tracing-subscriber = {version = "0.3", default-features = false}
|
||||
|
||||
uuid = {version = "1.2", features = ["v4"]}
|
||||
futures = "0.3"
|
||||
|
|
@ -65,6 +66,7 @@ dashmap = "5.5.3"
|
|||
base64 = "0.21.5"
|
||||
serde_with = "3.4.0"
|
||||
tokio-util = "0.7.10"
|
||||
bytes = "1.5.0"
|
||||
|
||||
[dev-dependencies]
|
||||
futures = {version = "0.3"}
|
||||
|
|
|
|||
|
|
@ -3,10 +3,12 @@ use std::{net::SocketAddr, sync::Arc};
|
|||
use anyhow::Context;
|
||||
use buffers::ByteString;
|
||||
use dht::{DhtStats, Id20};
|
||||
use futures::Stream;
|
||||
use http::StatusCode;
|
||||
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -17,7 +19,7 @@ use crate::{
|
|||
torrent_state::{
|
||||
peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
|
||||
ManagedTorrentHandle,
|
||||
},
|
||||
}, log_subscriber::LineBroadcast,
|
||||
};
|
||||
|
||||
pub use crate::torrent_state::stats::{LiveStats, TorrentStats};
|
||||
|
|
@ -30,13 +32,19 @@ pub type Result<T> = std::result::Result<T, ApiError>;
|
|||
pub struct Api {
|
||||
session: Arc<Session>,
|
||||
rust_log_reload_tx: Option<UnboundedSender<String>>,
|
||||
line_broadcast: Option<LineBroadcast>,
|
||||
}
|
||||
|
||||
impl Api {
|
||||
pub fn new(session: Arc<Session>, rust_log_reload_tx: Option<UnboundedSender<String>>) -> Self {
|
||||
pub fn new(
|
||||
session: Arc<Session>,
|
||||
rust_log_reload_tx: Option<UnboundedSender<String>>,
|
||||
line_broadcast: Option<LineBroadcast>
|
||||
) -> Self {
|
||||
Self {
|
||||
session,
|
||||
rust_log_reload_tx,
|
||||
line_broadcast
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -123,6 +131,21 @@ impl Api {
|
|||
Ok(Default::default())
|
||||
}
|
||||
|
||||
pub fn api_log_lines_stream(
|
||||
&self,
|
||||
) -> Result<
|
||||
impl Stream<Item = std::result::Result<bytes::Bytes, BroadcastStreamRecvError>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
> {
|
||||
Ok(self
|
||||
.line_broadcast
|
||||
.as_ref()
|
||||
.map(|sender| BroadcastStream::new(sender.subscribe()))
|
||||
.context("line_rx wasn't set")?)
|
||||
}
|
||||
|
||||
pub async fn api_add_torrent(
|
||||
&self,
|
||||
add: AddTorrent<'_>,
|
||||
|
|
|
|||
|
|
@ -8,42 +8,43 @@ use itertools::Itertools;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::net::SocketAddr;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::info;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use axum::Router;
|
||||
|
||||
use crate::api::Api;
|
||||
use crate::peer_connection::PeerConnectionOptions;
|
||||
use crate::session::{AddTorrent, AddTorrentOptions, Session, SUPPORTED_SCHEMES};
|
||||
use crate::session::{AddTorrent, AddTorrentOptions, SUPPORTED_SCHEMES};
|
||||
use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter;
|
||||
|
||||
type ApiState = Arc<Api>;
|
||||
type ApiState = Api;
|
||||
|
||||
use crate::api::Result;
|
||||
|
||||
/// An HTTP server for the API.
|
||||
#[derive(Clone)]
|
||||
pub struct HttpApi {
|
||||
inner: ApiState,
|
||||
opts: HttpApiOptions,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct HttpApiOptions {
|
||||
pub cors_enable_all: bool,
|
||||
pub read_only: bool,
|
||||
}
|
||||
|
||||
impl HttpApi {
|
||||
pub fn new(session: Arc<Session>, rust_log_reload_tx: Option<UnboundedSender<String>>) -> Self {
|
||||
pub fn new(api: Api, opts: Option<HttpApiOptions>) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Api::new(session, rust_log_reload_tx)),
|
||||
inner: api,
|
||||
opts: opts.unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the HTTP server forever on the given address.
|
||||
/// If read_only is passed, no state-modifying methods will be exposed.
|
||||
pub async fn make_http_api_and_run(
|
||||
self,
|
||||
addr: SocketAddr,
|
||||
read_only: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> {
|
||||
let state = self.inner;
|
||||
|
||||
async fn api_root() -> impl IntoResponse {
|
||||
|
|
@ -185,8 +186,14 @@ impl HttpApi {
|
|||
state.api_set_rust_log(new_value).map(axum::Json)
|
||||
}
|
||||
|
||||
async fn stream_logs(State(state): State<ApiState>) -> Result<impl IntoResponse> {
|
||||
let s = state.api_log_lines_stream()?;
|
||||
Ok(axum::body::Body::from_stream(s))
|
||||
}
|
||||
|
||||
let mut app = Router::new()
|
||||
.route("/", get(api_root))
|
||||
.route("/stream_logs", get(stream_logs))
|
||||
.route("/rust_log", post(set_rust_log))
|
||||
.route("/dht/stats", get(dht_stats))
|
||||
.route("/dht/table", get(dht_table))
|
||||
|
|
@ -197,7 +204,7 @@ impl HttpApi {
|
|||
.route("/torrents/:id/stats/v1", get(torrent_stats_v1))
|
||||
.route("/torrents/:id/peer_stats", get(peer_stats));
|
||||
|
||||
if !read_only {
|
||||
if !self.opts.read_only {
|
||||
app = app
|
||||
.route("/torrents", post(torrents_post))
|
||||
.route("/torrents/:id/pause", post(torrent_action_pause))
|
||||
|
|
@ -208,7 +215,6 @@ impl HttpApi {
|
|||
|
||||
#[cfg(feature = "webui")]
|
||||
{
|
||||
use tracing::warn;
|
||||
let webui_router = Router::new()
|
||||
.route(
|
||||
"/",
|
||||
|
|
@ -238,23 +244,25 @@ impl HttpApi {
|
|||
}),
|
||||
);
|
||||
|
||||
// This is to develop webui by just doing "open index.html && tsc --watch"
|
||||
let cors_layer = std::env::var("CORS_DEBUG")
|
||||
.ok()
|
||||
.map(|_| {
|
||||
use tower_http::cors::{AllowHeaders, AllowOrigin};
|
||||
|
||||
warn!("CorsLayer: allowing everything because CORS_DEBUG is set");
|
||||
tower_http::cors::CorsLayer::default()
|
||||
.allow_origin(AllowOrigin::predicate(|_, _| true))
|
||||
.allow_headers(AllowHeaders::any())
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
app = app.nest("/web/", webui_router).layer(cors_layer);
|
||||
app = app.nest("/web/", webui_router);
|
||||
}
|
||||
|
||||
let enable_cors = std::env::var("CORS_DEBUG").is_ok() || self.opts.cors_enable_all;
|
||||
|
||||
// This is to develop webui by just doing "open index.html && tsc --watch"
|
||||
let cors_layer = if enable_cors {
|
||||
use tower_http::cors::{AllowHeaders, AllowOrigin};
|
||||
|
||||
warn!("CorsLayer: allowing everything");
|
||||
tower_http::cors::CorsLayer::default()
|
||||
.allow_origin(AllowOrigin::predicate(|_, _| true))
|
||||
.allow_headers(AllowHeaders::any())
|
||||
} else {
|
||||
Default::default()
|
||||
};
|
||||
|
||||
let app = app
|
||||
.layer(cors_layer)
|
||||
.layer(tower_http::trace::TraceLayer::new_for_http())
|
||||
.with_state(state)
|
||||
.into_make_service();
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ mod dht_utils;
|
|||
mod file_ops;
|
||||
pub mod http_api;
|
||||
pub mod http_api_client;
|
||||
pub mod log_subscriber;
|
||||
mod peer_connection;
|
||||
mod peer_info_reader;
|
||||
mod session;
|
||||
|
|
|
|||
47
crates/librqbit/src/log_subscriber.rs
Normal file
47
crates/librqbit/src/log_subscriber.rs
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
use std::io::LineWriter;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tracing_subscriber::fmt::MakeWriter;
|
||||
|
||||
pub struct Subscriber {
|
||||
tx: tokio::sync::broadcast::Sender<Bytes>,
|
||||
}
|
||||
|
||||
pub struct Writer {
|
||||
tx: tokio::sync::broadcast::Sender<Bytes>,
|
||||
}
|
||||
|
||||
pub type LineBroadcast = tokio::sync::broadcast::Sender<Bytes>;
|
||||
|
||||
impl Subscriber {
|
||||
pub fn new() -> (Self, LineBroadcast) {
|
||||
let (tx, _) = tokio::sync::broadcast::channel(100);
|
||||
(Self { tx: tx.clone() }, tx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> MakeWriter<'a> for Subscriber {
|
||||
type Writer = LineWriter<Writer>;
|
||||
|
||||
fn make_writer(&self) -> Self::Writer {
|
||||
LineWriter::new(Writer {
|
||||
tx: self.tx.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::io::Write for Writer {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
let len = buf.len();
|
||||
if self.tx.receiver_count() == 0 {
|
||||
return Ok(len);
|
||||
}
|
||||
let arc = buf.to_vec().into();
|
||||
let _ = self.tx.send(arc);
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
@ -117,6 +117,7 @@ export interface AddTorrentOptions {
|
|||
}
|
||||
|
||||
export interface RqbitAPI {
|
||||
getHttpBaseUrl: () => string | null;
|
||||
listTorrents: () => Promise<ListTorrentsResponse>;
|
||||
getTorrentDetails: (index: number) => Promise<TorrentDetails>;
|
||||
getTorrentStats: (index: number) => Promise<TorrentStats>;
|
||||
|
|
|
|||
181
crates/librqbit/webui/src/components/LogStream.tsx
Normal file
181
crates/librqbit/webui/src/components/LogStream.tsx
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
import React, { useEffect, useState } from "react";
|
||||
import { ErrorWithLabel } from "../rqbit-web";
|
||||
import { ErrorComponent } from "./ErrorComponent";
|
||||
|
||||
interface LogStreamProps {
|
||||
httpApiBase: string;
|
||||
maxLines?: number;
|
||||
}
|
||||
|
||||
interface Line {
|
||||
id: number;
|
||||
content: string;
|
||||
}
|
||||
|
||||
const mergeBuffers = (a1: Uint8Array, a2: Uint8Array): Uint8Array => {
|
||||
const merged = new Uint8Array(a1.length + a2.length);
|
||||
merged.set(a1);
|
||||
merged.set(a2, a1.length);
|
||||
return merged;
|
||||
};
|
||||
|
||||
const streamLogs = (
|
||||
httpApiBase: string,
|
||||
addLine: (text: string) => void,
|
||||
setError: (error: ErrorWithLabel | null) => void
|
||||
): (() => void) => {
|
||||
const controller = new AbortController();
|
||||
const signal = controller.signal;
|
||||
|
||||
let canceled = true;
|
||||
|
||||
const cancel = () => {
|
||||
console.log("cancelling fetch");
|
||||
canceled = true;
|
||||
controller.abort();
|
||||
};
|
||||
|
||||
const run = async () => {
|
||||
let response = null;
|
||||
try {
|
||||
response = await fetch(httpApiBase + "/stream_logs", { signal });
|
||||
} catch (e: any) {
|
||||
if (canceled) {
|
||||
return;
|
||||
}
|
||||
setError({
|
||||
text: "network error fetching logs",
|
||||
details: {
|
||||
text: e.toString(),
|
||||
},
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
let text = await response.text();
|
||||
setError({
|
||||
text: "error fetching logs",
|
||||
details: {
|
||||
statusText: response.statusText,
|
||||
text,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
setError({
|
||||
text: "error fetching logs: ReadableStream not supported.",
|
||||
});
|
||||
throw new Error("ReadableStream not supported.");
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
|
||||
let buffer = new Uint8Array();
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
|
||||
if (done) {
|
||||
// Handle stream completion or errors
|
||||
break;
|
||||
}
|
||||
|
||||
buffer = mergeBuffers(buffer, value);
|
||||
|
||||
while (true) {
|
||||
const newLineIdx = buffer.indexOf(10);
|
||||
if (newLineIdx === -1) {
|
||||
break;
|
||||
}
|
||||
let lineBytes = buffer.slice(0, newLineIdx);
|
||||
let line = new TextDecoder().decode(lineBytes);
|
||||
addLine(line);
|
||||
buffer = buffer.slice(newLineIdx + 1);
|
||||
}
|
||||
}
|
||||
};
|
||||
run();
|
||||
|
||||
return cancel;
|
||||
};
|
||||
|
||||
const SplitByLevelRegexp = new RegExp(
|
||||
/(.*?) +(INFO|WARN|TRACE|ERROR|DEBUG) +(.*)/
|
||||
);
|
||||
|
||||
const LogLine = ({ line }: { line: string }) => {
|
||||
line.split;
|
||||
const getClassNameByLevel = (level: string) => {
|
||||
switch (level) {
|
||||
case "INFO":
|
||||
return "text-success";
|
||||
case "WARN":
|
||||
return "text-warning";
|
||||
case "ERROR":
|
||||
return "text-danger";
|
||||
case "DEBUG":
|
||||
return "text-primary";
|
||||
default:
|
||||
return "text-secondary";
|
||||
}
|
||||
};
|
||||
|
||||
const getContent = () => {
|
||||
let match = line.match(SplitByLevelRegexp);
|
||||
if (!match) {
|
||||
return line;
|
||||
}
|
||||
const [beforeLevel, level, afterLevel] = match.slice(1);
|
||||
return (
|
||||
<>
|
||||
{beforeLevel}
|
||||
<span className={`${getClassNameByLevel(level)} m-2`}>{level}</span>
|
||||
{afterLevel}
|
||||
</>
|
||||
);
|
||||
};
|
||||
|
||||
return (
|
||||
<p className="font-monospace m-0" style={{ fontSize: "10px" }}>
|
||||
{getContent()}
|
||||
</p>
|
||||
);
|
||||
};
|
||||
|
||||
export const LogStream: React.FC<LogStreamProps> = ({
|
||||
httpApiBase,
|
||||
maxLines,
|
||||
}) => {
|
||||
const [logLines, setLogLines] = useState<Line[]>([]);
|
||||
const [error, setError] = useState<ErrorWithLabel | null>(null);
|
||||
const maxL = maxLines ?? 1000;
|
||||
|
||||
const addLine = (text: string) => {
|
||||
setLogLines((logLines: Line[]) => {
|
||||
const nextLineId = logLines.length == 0 ? 0 : logLines[0].id + 1;
|
||||
|
||||
let newLogLines = [
|
||||
{
|
||||
id: nextLineId,
|
||||
content: text,
|
||||
},
|
||||
...logLines.slice(0, maxL - 1),
|
||||
];
|
||||
return newLogLines;
|
||||
});
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
return streamLogs(httpApiBase, addLine, setError);
|
||||
}, [httpApiBase]);
|
||||
|
||||
return (
|
||||
<div className="row">
|
||||
<ErrorComponent error={error} />
|
||||
{logLines.map((line) => (
|
||||
<LogLine key={line.id} line={line.content} />
|
||||
))}
|
||||
</div>
|
||||
);
|
||||
};
|
||||
37
crates/librqbit/webui/src/components/LogStreamModal.tsx
Normal file
37
crates/librqbit/webui/src/components/LogStreamModal.tsx
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
import { useContext } from "react";
|
||||
import { Button, Modal } from "react-bootstrap";
|
||||
import { APIContext } from "../context";
|
||||
import { ErrorComponent } from "./ErrorComponent";
|
||||
import { LogStream } from "./LogStream";
|
||||
|
||||
interface Props {
|
||||
show: boolean;
|
||||
onClose: () => void;
|
||||
}
|
||||
|
||||
export const LogStreamModal: React.FC<Props> = ({ show, onClose }) => {
|
||||
const api = useContext(APIContext);
|
||||
const apiBase = api.getHttpBaseUrl();
|
||||
|
||||
return (
|
||||
<Modal size="xl" show={show} onHide={onClose}>
|
||||
<Modal.Header closeButton>
|
||||
<Modal.Title>rqbit server logs</Modal.Title>
|
||||
</Modal.Header>
|
||||
<Modal.Body>
|
||||
{apiBase ? (
|
||||
<LogStream httpApiBase={apiBase} />
|
||||
) : (
|
||||
<ErrorComponent
|
||||
error={{ text: "HTTP API not available to stream logs" }}
|
||||
></ErrorComponent>
|
||||
)}
|
||||
</Modal.Body>
|
||||
<Modal.Footer>
|
||||
<Button variant="primary" onClick={onClose}>
|
||||
Close
|
||||
</Button>
|
||||
</Modal.Footer>
|
||||
</Modal>
|
||||
);
|
||||
};
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
import { useContext } from "react";
|
||||
import { useContext, useState } from "react";
|
||||
import { Container } from "react-bootstrap";
|
||||
import { TorrentId, ErrorDetails as ApiErrorDetails } from "../api-types";
|
||||
import { AppContext } from "../context";
|
||||
import { APIContext, AppContext } from "../context";
|
||||
import { TorrentsList } from "./TorrentsList";
|
||||
import { ErrorComponent } from "./ErrorComponent";
|
||||
import { Buttons } from "./Buttons";
|
||||
|
|
|
|||
|
|
@ -64,6 +64,7 @@ const makeRequest = async (
|
|||
};
|
||||
|
||||
export const API: RqbitAPI & { getVersion: () => Promise<string> } = {
|
||||
getHttpBaseUrl: () => apiUrl,
|
||||
listTorrents: (): Promise<ListTorrentsResponse> =>
|
||||
makeRequest("GET", "/torrents"),
|
||||
getTorrentDetails: (index: number): Promise<TorrentDetails> => {
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ reqwest = "0.11"
|
|||
serde = {version = "1", features=["derive"]}
|
||||
serde_json = "1"
|
||||
size_format = "1"
|
||||
bytes = "1.5.0"
|
||||
|
||||
[dev-dependencies]
|
||||
futures = {version = "0.3"}
|
||||
futures = {version = "0.3"}
|
||||
|
|
|
|||
|
|
@ -3,9 +3,9 @@ use std::{io::LineWriter, net::SocketAddr, path::PathBuf, sync::Arc, time::Durat
|
|||
use anyhow::Context;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use librqbit::{
|
||||
api::ApiAddTorrentResponse, http_api::HttpApi, http_api_client, librqbit_spawn, AddTorrent,
|
||||
api::ApiAddTorrentResponse, http_api::{HttpApi, HttpApiOptions}, http_api_client, librqbit_spawn, AddTorrent,
|
||||
AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, ManagedTorrentState,
|
||||
PeerConnectionOptions, Session, SessionOptions,
|
||||
PeerConnectionOptions, Session, SessionOptions, log_subscriber::LineBroadcast, Api,
|
||||
};
|
||||
use size_format::SizeFormatterBinary as SF;
|
||||
use tracing::{error, error_span, info, trace_span, warn};
|
||||
|
|
@ -178,8 +178,13 @@ enum SubCommand {
|
|||
Download(DownloadOpts),
|
||||
}
|
||||
|
||||
struct InitLoggingResult {
|
||||
rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||
line_broadcast: LineBroadcast,
|
||||
}
|
||||
|
||||
// Init logging and make a channel to send new RUST_LOG values to.
|
||||
fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender<String> {
|
||||
fn init_logging(opts: &Opts) -> InitLoggingResult {
|
||||
let default_rust_log = match opts.log_level.as_ref() {
|
||||
Some(level) => match level {
|
||||
LogLevel::Trace => "trace",
|
||||
|
|
@ -204,6 +209,8 @@ fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender<String> {
|
|||
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
|
||||
let (line_sub, line_broadcast) = librqbit::log_subscriber::Subscriber::new();
|
||||
|
||||
#[cfg(feature = "tokio-console")]
|
||||
{
|
||||
let (console_layer, server) = console_subscriber::Builder::default()
|
||||
|
|
@ -230,7 +237,15 @@ fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender<String> {
|
|||
|
||||
#[cfg(not(feature = "tokio-console"))]
|
||||
{
|
||||
let layered = tracing_subscriber::registry().with(fmt::layer().with_filter(stderr_filter));
|
||||
let layered = tracing_subscriber::registry()
|
||||
.with(fmt::layer().with_filter(stderr_filter))
|
||||
.with(
|
||||
fmt::layer()
|
||||
.event_format(fmt::format().with_ansi(false).compact())
|
||||
.with_ansi(false)
|
||||
.with_writer(line_sub)
|
||||
.with_filter(EnvFilter::builder().parse("info").unwrap()),
|
||||
);
|
||||
if let Some(log_file) = &opts.log_file {
|
||||
let log_file = log_file.clone();
|
||||
let log_file = move || {
|
||||
|
|
@ -276,7 +291,10 @@ fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender<String> {
|
|||
Ok(())
|
||||
},
|
||||
);
|
||||
reload_tx
|
||||
InitLoggingResult {
|
||||
rust_log_reload_tx: reload_tx,
|
||||
line_broadcast,
|
||||
}
|
||||
}
|
||||
|
||||
fn _start_deadlock_detector_thread() {
|
||||
|
|
@ -332,7 +350,7 @@ fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
async fn async_main(opts: Opts) -> anyhow::Result<()> {
|
||||
let logging_reload_tx = init_logging(&opts);
|
||||
let log_config = init_logging(&opts);
|
||||
|
||||
let mut sopts = SessionOptions {
|
||||
disable_dht: opts.disable_dht,
|
||||
|
|
@ -427,10 +445,17 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
|
|||
trace_span!("stats_printer"),
|
||||
stats_printer(session.clone()),
|
||||
);
|
||||
let http_api = HttpApi::new(session, Some(logging_reload_tx));
|
||||
let api = Api::new(session, Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast));
|
||||
let http_api = HttpApi::new(
|
||||
api,
|
||||
Some(HttpApiOptions{
|
||||
read_only: false,
|
||||
cors_enable_all: false,
|
||||
}),
|
||||
);
|
||||
let http_api_listen_addr = opts.http_api_listen_addr;
|
||||
http_api
|
||||
.make_http_api_and_run(http_api_listen_addr, false)
|
||||
.make_http_api_and_run(http_api_listen_addr)
|
||||
.await
|
||||
.context("error running HTTP API")
|
||||
}
|
||||
|
|
@ -507,14 +532,15 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
|
|||
trace_span!("stats_printer"),
|
||||
stats_printer(session.clone()),
|
||||
);
|
||||
let http_api = HttpApi::new(session.clone(), Some(logging_reload_tx));
|
||||
let api = Api::new(session.clone(), Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast));
|
||||
let http_api = HttpApi::new(
|
||||
api, Some(HttpApiOptions { cors_enable_all: false, read_only: true })
|
||||
);
|
||||
let http_api_listen_addr = opts.http_api_listen_addr;
|
||||
librqbit_spawn(
|
||||
"http_api",
|
||||
error_span!("http_api"),
|
||||
http_api
|
||||
.clone()
|
||||
.make_http_api_and_run(http_api_listen_addr, true),
|
||||
http_api.make_http_api_and_run(http_api_listen_addr),
|
||||
);
|
||||
|
||||
let mut added = false;
|
||||
|
|
|
|||
2
desktop/src-tauri/Cargo.lock
generated
2
desktop/src-tauri/Cargo.lock
generated
|
|
@ -1876,6 +1876,7 @@ dependencies = [
|
|||
"bincode",
|
||||
"bitvec",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"dashmap",
|
||||
"futures",
|
||||
"hex 0.4.3",
|
||||
|
|
@ -1903,6 +1904,7 @@ dependencies = [
|
|||
"tokio-util",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"urlencoding",
|
||||
"uuid",
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
|
|||
use serde_with::serde_as;
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RqbitDesktopConfigDht {
|
||||
pub disable: bool,
|
||||
pub disable_persistence: bool,
|
||||
|
|
@ -26,6 +27,7 @@ impl Default for RqbitDesktopConfigDht {
|
|||
}
|
||||
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RqbitDesktopConfigTcpListen {
|
||||
pub disable: bool,
|
||||
pub min_port: u16,
|
||||
|
|
@ -44,6 +46,7 @@ impl Default for RqbitDesktopConfigTcpListen {
|
|||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RqbitDesktopConfigPersistence {
|
||||
pub disable: bool,
|
||||
pub filename: PathBuf,
|
||||
|
|
@ -60,6 +63,7 @@ impl Default for RqbitDesktopConfigPersistence {
|
|||
|
||||
#[serde_as]
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RqbitDesktopConfigPeerOpts {
|
||||
#[serde_as(as = "serde_with::DurationSeconds")]
|
||||
pub connect_timeout: Duration,
|
||||
|
|
@ -79,10 +83,12 @@ impl Default for RqbitDesktopConfigPeerOpts {
|
|||
|
||||
#[serde_as]
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RqbitDesktopConfigHttpApi {
|
||||
pub disable: bool,
|
||||
pub listen_addr: SocketAddr,
|
||||
pub read_only: bool,
|
||||
pub cors_enable_all: bool,
|
||||
}
|
||||
|
||||
impl Default for RqbitDesktopConfigHttpApi {
|
||||
|
|
@ -91,16 +97,19 @@ impl Default for RqbitDesktopConfigHttpApi {
|
|||
disable: Default::default(),
|
||||
listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 3030)),
|
||||
read_only: false,
|
||||
cors_enable_all: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RqbitDesktopConfigUpnp {
|
||||
pub disable: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RqbitDesktopConfig {
|
||||
pub default_download_location: PathBuf,
|
||||
pub dht: RqbitDesktopConfigDht,
|
||||
|
|
|
|||
|
|
@ -19,8 +19,9 @@ use librqbit::{
|
|||
TorrentStats,
|
||||
},
|
||||
dht::PersistentDhtConfig,
|
||||
librqbit_spawn, AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session,
|
||||
SessionOptions,
|
||||
librqbit_spawn,
|
||||
log_subscriber::LineBroadcast,
|
||||
AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, SessionOptions,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use serde::Serialize;
|
||||
|
|
@ -36,12 +37,10 @@ struct StateShared {
|
|||
|
||||
type RustLogReloadTx = tokio::sync::mpsc::UnboundedSender<String>;
|
||||
|
||||
impl StateShared {}
|
||||
|
||||
struct State {
|
||||
config_filename: String,
|
||||
shared: Arc<RwLock<Option<StateShared>>>,
|
||||
rust_log_reload_tx: RustLogReloadTx,
|
||||
init_logging: InitLogging,
|
||||
}
|
||||
|
||||
fn read_config(path: &str) -> anyhow::Result<RqbitDesktopConfig> {
|
||||
|
|
@ -66,7 +65,7 @@ fn write_config(path: &str, config: &RqbitDesktopConfig) -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
async fn api_from_config(
|
||||
rust_log_reload_tx: &RustLogReloadTx,
|
||||
init_logging: &InitLogging,
|
||||
config: &RqbitDesktopConfig,
|
||||
) -> anyhow::Result<Api> {
|
||||
let session = Session::new_with_opts(
|
||||
|
|
@ -97,12 +96,21 @@ async fn api_from_config(
|
|||
.await
|
||||
.context("couldn't set up librqbit session")?;
|
||||
|
||||
let api = Api::new(session.clone(), None);
|
||||
let api = Api::new(
|
||||
session.clone(),
|
||||
Some(init_logging.reload_stdout_tx.clone()),
|
||||
Some(init_logging.line_broadcast.clone()),
|
||||
);
|
||||
|
||||
if !config.http_api.disable {
|
||||
let http_api_task =
|
||||
librqbit::http_api::HttpApi::new(session.clone(), Some(rust_log_reload_tx.clone()))
|
||||
.make_http_api_and_run(config.http_api.listen_addr, config.http_api.read_only);
|
||||
let http_api_task = librqbit::http_api::HttpApi::new(
|
||||
api.clone(),
|
||||
Some(librqbit::http_api::HttpApiOptions {
|
||||
cors_enable_all: config.http_api.cors_enable_all,
|
||||
read_only: config.http_api.read_only,
|
||||
}),
|
||||
)
|
||||
.make_http_api_and_run(config.http_api.listen_addr);
|
||||
|
||||
session.spawn(error_span!("http_api"), http_api_task);
|
||||
}
|
||||
|
|
@ -110,7 +118,7 @@ async fn api_from_config(
|
|||
}
|
||||
|
||||
impl State {
|
||||
async fn new(rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender<String>) -> Self {
|
||||
async fn new(init_logging: InitLogging) -> Self {
|
||||
let config_filename = directories::ProjectDirs::from("com", "rqbit", "desktop")
|
||||
.expect("directories::ProjectDirs::from")
|
||||
.config_dir()
|
||||
|
|
@ -120,19 +128,19 @@ impl State {
|
|||
.to_owned();
|
||||
|
||||
if let Ok(config) = read_config(&config_filename) {
|
||||
let api = api_from_config(&rust_log_reload_tx, &config).await.ok();
|
||||
let api = api_from_config(&init_logging, &config).await.ok();
|
||||
let shared = Arc::new(RwLock::new(Some(StateShared { config, api })));
|
||||
|
||||
return Self {
|
||||
config_filename,
|
||||
shared,
|
||||
rust_log_reload_tx,
|
||||
init_logging,
|
||||
};
|
||||
}
|
||||
|
||||
Self {
|
||||
config_filename,
|
||||
rust_log_reload_tx,
|
||||
init_logging,
|
||||
shared: Arc::new(RwLock::new(None)),
|
||||
}
|
||||
}
|
||||
|
|
@ -162,7 +170,7 @@ impl State {
|
|||
api.session().stop().await;
|
||||
}
|
||||
|
||||
let api = api_from_config(&self.rust_log_reload_tx, &config).await?;
|
||||
let api = api_from_config(&self.init_logging, &config).await?;
|
||||
if let Err(e) = write_config(&self.config_filename, &config) {
|
||||
error!("error writing config: {:#}", e);
|
||||
}
|
||||
|
|
@ -294,12 +302,32 @@ fn get_version() -> &'static str {
|
|||
env!("CARGO_PKG_VERSION")
|
||||
}
|
||||
|
||||
fn init_logging() -> tokio::sync::mpsc::UnboundedSender<String> {
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
let (stderr_filter, reload_stderr_filter) =
|
||||
tracing_subscriber::reload::Layer::new(EnvFilter::builder().parse("info").unwrap());
|
||||
struct InitLogging {
|
||||
reload_stdout_tx: RustLogReloadTx,
|
||||
line_broadcast: LineBroadcast,
|
||||
}
|
||||
|
||||
let layered = tracing_subscriber::registry().with(fmt::layer().with_filter(stderr_filter));
|
||||
fn init_logging() -> InitLogging {
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
let (stderr_filter, reload_stderr_filter) = tracing_subscriber::reload::Layer::new(
|
||||
EnvFilter::builder()
|
||||
.with_default_directive("info".parse().unwrap())
|
||||
.from_env()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let (line_sub, line_broadcast) = librqbit::log_subscriber::Subscriber::new();
|
||||
|
||||
let layered = tracing_subscriber::registry()
|
||||
.with(fmt::layer().with_filter(stderr_filter))
|
||||
.with(
|
||||
fmt::layer()
|
||||
.with_ansi(false)
|
||||
.fmt_fields(tracing_subscriber::fmt::format::DefaultFields::new().delimited(","))
|
||||
.event_format(fmt::format().with_ansi(false))
|
||||
.with_writer(line_sub)
|
||||
.with_filter(EnvFilter::builder().parse("info,librqbit=debug").unwrap()),
|
||||
);
|
||||
layered.init();
|
||||
|
||||
let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
|
|
@ -321,7 +349,10 @@ fn init_logging() -> tokio::sync::mpsc::UnboundedSender<String> {
|
|||
Ok(())
|
||||
},
|
||||
);
|
||||
reload_tx
|
||||
InitLogging {
|
||||
reload_stdout_tx: reload_tx,
|
||||
line_broadcast,
|
||||
}
|
||||
}
|
||||
|
||||
async fn start() {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { RqbitDesktopConfig } from "./configuration";
|
||||
import {
|
||||
AddTorrentResponse,
|
||||
ListTorrentsResponse,
|
||||
|
|
@ -66,42 +67,49 @@ async function readFileAsBase64(file: File): Promise<string> {
|
|||
});
|
||||
}
|
||||
|
||||
export const API: RqbitAPI = {
|
||||
listTorrents: async function (): Promise<ListTorrentsResponse> {
|
||||
return await invokeAPI<ListTorrentsResponse>("torrents_list");
|
||||
},
|
||||
getTorrentDetails: async function (id: number): Promise<TorrentDetails> {
|
||||
return await invokeAPI<TorrentDetails>("torrent_details", { id });
|
||||
},
|
||||
getTorrentStats: async function (id: number): Promise<TorrentStats> {
|
||||
return await invokeAPI<TorrentStats>("torrent_stats", { id });
|
||||
},
|
||||
uploadTorrent: async function (data, opts): Promise<AddTorrentResponse> {
|
||||
if (data instanceof File) {
|
||||
let contents = await readFileAsBase64(data);
|
||||
return await invokeAPI<AddTorrentResponse>(
|
||||
"torrent_create_from_base64_file",
|
||||
{
|
||||
contents,
|
||||
opts: opts ?? {},
|
||||
}
|
||||
);
|
||||
}
|
||||
return await invokeAPI<AddTorrentResponse>("torrent_create_from_url", {
|
||||
url: data,
|
||||
opts: opts ?? {},
|
||||
});
|
||||
},
|
||||
pause: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_pause", { id });
|
||||
},
|
||||
start: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_start", { id });
|
||||
},
|
||||
forget: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_forget", { id });
|
||||
},
|
||||
delete: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_delete", { id });
|
||||
},
|
||||
export const makeAPI = (configuration: RqbitDesktopConfig): RqbitAPI => {
|
||||
return {
|
||||
getHttpBaseUrl: () => {
|
||||
return configuration.http_api.listen_addr
|
||||
? `http://${configuration.http_api.listen_addr}`
|
||||
: null;
|
||||
},
|
||||
listTorrents: async function (): Promise<ListTorrentsResponse> {
|
||||
return await invokeAPI<ListTorrentsResponse>("torrents_list");
|
||||
},
|
||||
getTorrentDetails: async function (id: number): Promise<TorrentDetails> {
|
||||
return await invokeAPI<TorrentDetails>("torrent_details", { id });
|
||||
},
|
||||
getTorrentStats: async function (id: number): Promise<TorrentStats> {
|
||||
return await invokeAPI<TorrentStats>("torrent_stats", { id });
|
||||
},
|
||||
uploadTorrent: async function (data, opts): Promise<AddTorrentResponse> {
|
||||
if (data instanceof File) {
|
||||
let contents = await readFileAsBase64(data);
|
||||
return await invokeAPI<AddTorrentResponse>(
|
||||
"torrent_create_from_base64_file",
|
||||
{
|
||||
contents,
|
||||
opts: opts ?? {},
|
||||
}
|
||||
);
|
||||
}
|
||||
return await invokeAPI<AddTorrentResponse>("torrent_create_from_url", {
|
||||
url: data,
|
||||
opts: opts ?? {},
|
||||
});
|
||||
},
|
||||
pause: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_pause", { id });
|
||||
},
|
||||
start: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_start", { id });
|
||||
},
|
||||
forget: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_forget", { id });
|
||||
},
|
||||
delete: function (id: number): Promise<void> {
|
||||
return invokeAPI<void>("torrent_action_delete", { id });
|
||||
},
|
||||
};
|
||||
};
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ interface RqbitDesktopConfigHttpApi {
|
|||
disable: boolean;
|
||||
listen_addr: SocketAddr;
|
||||
read_only: boolean;
|
||||
cors_enable_all: boolean;
|
||||
}
|
||||
|
||||
interface RqbitDesktopConfigUpnp {
|
||||
|
|
|
|||
|
|
@ -295,6 +295,15 @@ export const ConfigModal: React.FC<{
|
|||
help="If enabled, only GET requests will be allowed through the API"
|
||||
/>
|
||||
|
||||
<FormCheck
|
||||
label="CORS any"
|
||||
name="http_api.cors_enable_all"
|
||||
checked={config.http_api.cors_enable_all}
|
||||
disabled={config.http_api.disable}
|
||||
onChange={handleToggleChange}
|
||||
help="If enabled, the API will allow Cross Origin requests (including this app)"
|
||||
/>
|
||||
|
||||
<FormInput
|
||||
label="Listen address"
|
||||
inputType="text"
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
import { StrictMode } from "react";
|
||||
import ReactDOM from "react-dom/client";
|
||||
import { API } from "./api";
|
||||
import { invoke } from "@tauri-apps/api";
|
||||
import { CurrentDesktopState, RqbitDesktopConfig } from "./configuration";
|
||||
import { RqbitDesktop } from "./rqbit-desktop";
|
||||
import { APIContext } from "./rqbit-webui-src/context";
|
||||
|
||||
async function get_version(): Promise<string> {
|
||||
return invoke<string>("get_version");
|
||||
|
|
@ -22,13 +20,11 @@ Promise.all([get_version(), get_default_config(), get_current_config()]).then(
|
|||
([version, defaultConfig, currentState]) => {
|
||||
ReactDOM.createRoot(document.getElementById("root") as HTMLElement).render(
|
||||
<StrictMode>
|
||||
<APIContext.Provider value={API}>
|
||||
<RqbitDesktop
|
||||
version={version}
|
||||
defaultConfig={defaultConfig}
|
||||
currentState={currentState}
|
||||
/>
|
||||
</APIContext.Provider>
|
||||
<RqbitDesktop
|
||||
version={version}
|
||||
defaultConfig={defaultConfig}
|
||||
currentState={currentState}
|
||||
/>
|
||||
</StrictMode>
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,10 @@ import { RqbitWebUI } from "./rqbit-webui-src/rqbit-web";
|
|||
import { CurrentDesktopState, RqbitDesktopConfig } from "./configuration";
|
||||
import { ConfigModal } from "./configure";
|
||||
import { IconButton } from "./rqbit-webui-src/components/IconButton";
|
||||
import { BsSliders2 } from "react-icons/bs";
|
||||
import { BsBodyText, BsSliders2 } from "react-icons/bs";
|
||||
import { LogStreamModal } from "./rqbit-webui-src/components/LogStreamModal";
|
||||
import { APIContext } from "./rqbit-webui-src/context";
|
||||
import { makeAPI } from "./api";
|
||||
|
||||
export const RqbitDesktop: React.FC<{
|
||||
version: string;
|
||||
|
|
@ -15,21 +18,27 @@ export const RqbitDesktop: React.FC<{
|
|||
currentState.config ?? defaultConfig
|
||||
);
|
||||
let [configurationOpened, setConfigurationOpened] = useState<boolean>(false);
|
||||
let [logsOpened, setLogsOpened] = useState<boolean>(false);
|
||||
|
||||
return (
|
||||
<>
|
||||
<APIContext.Provider value={makeAPI(config)}>
|
||||
{configured && (
|
||||
<RqbitWebUI title={`Rqbit Desktop v${version}`}></RqbitWebUI>
|
||||
)}
|
||||
{configured && (
|
||||
<IconButton
|
||||
className="position-absolute top-0 start-0 p-3 text-primary"
|
||||
onClick={() => {
|
||||
setConfigurationOpened(true);
|
||||
}}
|
||||
>
|
||||
<BsSliders2 />
|
||||
</IconButton>
|
||||
<div className="position-absolute top-0 start-0">
|
||||
<IconButton
|
||||
className="p-3 text-primary"
|
||||
onClick={() => {
|
||||
setConfigurationOpened(true);
|
||||
}}
|
||||
>
|
||||
<BsSliders2 />
|
||||
</IconButton>
|
||||
<IconButton onClick={() => setLogsOpened(true)}>
|
||||
<BsBodyText />
|
||||
</IconButton>
|
||||
</div>
|
||||
)}
|
||||
<ConfigModal
|
||||
show={!configured || configurationOpened}
|
||||
|
|
@ -47,6 +56,7 @@ export const RqbitDesktop: React.FC<{
|
|||
initialConfig={config}
|
||||
defaultConfig={defaultConfig}
|
||||
/>
|
||||
</>
|
||||
<LogStreamModal show={logsOpened} onClose={() => setLogsOpened(false)} />
|
||||
</APIContext.Provider>
|
||||
);
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue