Add an HTTP API endpoint + UI widgets to stream logs (#49)

* Added JSON logs to Desktop

* Move logging config into librqbit for reuse

* Log printer now available in both Desktop and Web UI

* Fix JS type error
This commit is contained in:
Igor Katson 2023-12-09 00:26:14 +00:00 committed by GitHub
parent 9385524a1a
commit 2017c5ec94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 462 additions and 333 deletions

View file

@ -2,5 +2,8 @@
"editor.defaultFormatter": "esbenp.prettier-vscode", "editor.defaultFormatter": "esbenp.prettier-vscode",
"[javascript]": { "[javascript]": {
"editor.defaultFormatter": "esbenp.prettier-vscode" "editor.defaultFormatter": "esbenp.prettier-vscode"
},
"[rust]": {
"editor.defaultFormatter": "rust-lang.rust-analyzer"
} }
} }

13
Cargo.lock generated
View file

@ -2667,6 +2667,16 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.18" version = "0.3.18"
@ -2677,12 +2687,15 @@ dependencies = [
"nu-ansi-term", "nu-ansi-term",
"once_cell", "once_cell",
"regex", "regex",
"serde",
"serde_json",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing", "tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
"tracing-serde",
] ]
[[package]] [[package]]

View file

@ -11,7 +11,7 @@ use std::time::Duration;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use anyhow::Context; use anyhow::Context;
use tracing::{debug, error, error_span, info, trace, warn}; use tracing::{error, error_span, info, trace, warn};
use crate::peer_store::PeerStore; use crate::peer_store::PeerStore;
use crate::routing_table::RoutingTable; use crate::routing_table::RoutingTable;
@ -86,8 +86,8 @@ impl PersistentDht {
}; };
info!( info!(
"will store DHT routing table to {:?} periodically", filename=?config_filename,
&config_filename "will store DHT routing table periodically",
); );
if let Some(parent) = config_filename.parent() { if let Some(parent) = config_filename.parent() {
@ -100,13 +100,14 @@ impl PersistentDht {
let reader = BufReader::new(dht_json); let reader = BufReader::new(dht_json);
match serde_json::from_reader::<_, DhtSerialize<RoutingTable, PeerStore>>(reader) { match serde_json::from_reader::<_, DhtSerialize<RoutingTable, PeerStore>>(reader) {
Ok(r) => { Ok(r) => {
info!("loaded DHT routing table from {:?}", &config_filename); info!(filename=?config_filename, "loaded DHT routing table from");
Some(r) Some(r)
} }
Err(e) => { Err(e) => {
warn!( warn!(
"cannot deserialize routing table from file {:?}: {:#}", filename=?config_filename,
&config_filename, e "cannot deserialize routing table: {:#}",
e
); );
None None
} }
@ -152,9 +153,9 @@ impl PersistentDht {
tokio::time::sleep(dump_interval).await; tokio::time::sleep(dump_interval).await;
match dump_dht(&dht, &config_filename, &tempfile_name) { match dump_dht(&dht, &config_filename, &tempfile_name) {
Ok(_) => debug!("dumped DHT to {:?}", &config_filename), Ok(_) => trace!(filename=?config_filename, "dumped DHT"),
Err(e) => { Err(e) => {
error!("error dumping DHT to {:?}: {:#}", &config_filename, e) error!(filename=?config_filename, "error dumping DHT: {:#}", e)
} }
} }
} }

View file

@ -55,7 +55,7 @@ rand = "0.8"
openssl = {version="0.10", optional=true} openssl = {version="0.10", optional=true}
crypto-hash = {version="0.3", optional=true} crypto-hash = {version="0.3", optional=true}
sha1 = {version = "0.10", optional=true} sha1 = {version = "0.10", optional=true}
tracing-subscriber = {version = "0.3", default-features = false} tracing-subscriber = {version = "0.3", default-features = false, features = ["json"]}
uuid = {version = "1.2", features = ["v4"]} uuid = {version = "1.2", features = ["v4"]}
futures = "0.3" futures = "0.3"

View file

@ -19,7 +19,8 @@ use crate::{
torrent_state::{ torrent_state::{
peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
ManagedTorrentHandle, ManagedTorrentHandle,
}, log_subscriber::LineBroadcast, },
tracing_subscriber_config_utils::LineBroadcast,
}; };
pub use crate::torrent_state::stats::{LiveStats, TorrentStats}; pub use crate::torrent_state::stats::{LiveStats, TorrentStats};
@ -39,12 +40,12 @@ impl Api {
pub fn new( pub fn new(
session: Arc<Session>, session: Arc<Session>,
rust_log_reload_tx: Option<UnboundedSender<String>>, rust_log_reload_tx: Option<UnboundedSender<String>>,
line_broadcast: Option<LineBroadcast> line_broadcast: Option<LineBroadcast>,
) -> Self { ) -> Self {
Self { Self {
session, session,
rust_log_reload_tx, rust_log_reload_tx,
line_broadcast line_broadcast,
} }
} }

View file

@ -262,12 +262,12 @@ impl HttpApi {
}; };
let app = app let app = app
.layer(cors_layer) .layer(cors_layer)
.layer(tower_http::trace::TraceLayer::new_for_http()) .layer(tower_http::trace::TraceLayer::new_for_http())
.with_state(state) .with_state(state)
.into_make_service(); .into_make_service();
info!("starting HTTP server on {}", addr); info!(%addr, "starting HTTP server");
use tokio::net::TcpListener; use tokio::net::TcpListener;
let listener = TcpListener::bind(&addr) let listener = TcpListener::bind(&addr)

View file

@ -29,12 +29,12 @@ mod dht_utils;
mod file_ops; mod file_ops;
pub mod http_api; pub mod http_api;
pub mod http_api_client; pub mod http_api_client;
pub mod log_subscriber;
mod peer_connection; mod peer_connection;
mod peer_info_reader; mod peer_info_reader;
mod session; mod session;
mod spawn_utils; mod spawn_utils;
mod torrent_state; mod torrent_state;
pub mod tracing_subscriber_config_utils;
mod tracker_comms; mod tracker_comms;
mod type_aliases; mod type_aliases;

View file

@ -1,47 +0,0 @@
use std::io::LineWriter;
use bytes::Bytes;
use tracing_subscriber::fmt::MakeWriter;
pub struct Subscriber {
tx: tokio::sync::broadcast::Sender<Bytes>,
}
pub struct Writer {
tx: tokio::sync::broadcast::Sender<Bytes>,
}
pub type LineBroadcast = tokio::sync::broadcast::Sender<Bytes>;
impl Subscriber {
pub fn new() -> (Self, LineBroadcast) {
let (tx, _) = tokio::sync::broadcast::channel(100);
(Self { tx: tx.clone() }, tx)
}
}
impl<'a> MakeWriter<'a> for Subscriber {
type Writer = LineWriter<Writer>;
fn make_writer(&self) -> Self::Writer {
LineWriter::new(Writer {
tx: self.tx.clone(),
})
}
}
impl std::io::Write for Writer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let len = buf.len();
if self.tx.receiver_count() == 0 {
return Ok(len);
}
let arc = buf.to_vec().into();
let _ = self.tx.send(arc);
Ok(len)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

View file

@ -725,7 +725,7 @@ impl Session {
std::fs::rename(&tmp_filename, &self.persistence_filename) std::fs::rename(&tmp_filename, &self.persistence_filename)
.context("error renaming persistence file")?; .context("error renaming persistence file")?;
debug!("wrote persistence to {:?}", &self.persistence_filename); trace!(filename=?self.persistence_filename, "wrote persistence");
Ok(()) Ok(())
} }
@ -779,7 +779,7 @@ impl Session {
}) })
.collect(); .collect();
debug!("querying DHT for {:?}", info_hash); debug!(?info_hash, "querying DHT");
let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id, self.peer_id,
info_hash, info_hash,
@ -794,7 +794,7 @@ impl Session {
anyhow::bail!("DHT died, no way to discover torrent metainfo") anyhow::bail!("DHT died, no way to discover torrent metainfo")
} }
}; };
debug!("received result from DHT: {:?}", info); debug!(?info, "received result from DHT");
( (
info_hash, info_hash,
info, info,
@ -828,7 +828,7 @@ impl Session {
let dht_rx = match self.dht.as_ref() { let dht_rx = match self.dht.as_ref() {
Some(dht) if !opts.paused && !opts.list_only => { Some(dht) if !opts.paused && !opts.list_only => {
debug!("reading peers for {:?} from DHT", torrent.info_hash); debug!(info_hash=?torrent.info_hash, "reading peers from DHT");
Some(dht.get_peers(torrent.info_hash, announce_port)?) Some(dht.get_peers(torrent.info_hash, announce_port)?)
} }
_ => None, _ => None,
@ -911,7 +911,7 @@ impl Session {
continue; continue;
} }
if !list_only { if !list_only {
info!("Will download {:?}", filename); info!(?filename, "will download");
} }
} }
Ok(Some(only_files)) Ok(Some(only_files))
@ -1043,14 +1043,14 @@ impl Session {
match (paused, delete_files) { match (paused, delete_files) {
(Err(e), true) => Err(e).context("torrent deleted, but could not delete files"), (Err(e), true) => Err(e).context("torrent deleted, but could not delete files"),
(Err(e), false) => { (Err(e), false) => {
warn!("could not delete torrent files: {:?}", e); warn!(error=?e, "could not delete torrent files");
Ok(()) Ok(())
} }
(Ok(Some(paused)), true) => { (Ok(Some(paused)), true) => {
drop(paused.files); drop(paused.files);
for file in paused.filenames { for file in paused.filenames {
if let Err(e) = std::fs::remove_file(&file) { if let Err(e) = std::fs::remove_file(&file) {
warn!("could not delete file {:?}: {:?}", file, e); warn!(?file, error=?e, "could not delete file");
} }
} }
Ok(()) Ok(())

View file

@ -0,0 +1,142 @@
use std::io::LineWriter;
use anyhow::Context;
use bytes::Bytes;
use librqbit_core::spawn_utils::spawn;
use tracing::error_span;
use tracing_subscriber::fmt::MakeWriter;
struct Subscriber {
tx: tokio::sync::broadcast::Sender<Bytes>,
}
struct Writer {
tx: tokio::sync::broadcast::Sender<Bytes>,
}
pub type LineBroadcast = tokio::sync::broadcast::Sender<Bytes>;
impl Subscriber {
pub fn new() -> (Self, LineBroadcast) {
let (tx, _) = tokio::sync::broadcast::channel(100);
(Self { tx: tx.clone() }, tx)
}
}
impl<'a> MakeWriter<'a> for Subscriber {
type Writer = LineWriter<Writer>;
fn make_writer(&self) -> Self::Writer {
LineWriter::new(Writer {
tx: self.tx.clone(),
})
}
}
impl std::io::Write for Writer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let len = buf.len();
if self.tx.receiver_count() == 0 {
return Ok(len);
}
let arc = buf.to_vec().into();
let _ = self.tx.send(arc);
Ok(len)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
pub struct InitLoggingOptions<'a> {
pub default_rust_log_value: Option<&'a str>,
pub log_file: Option<&'a str>,
pub log_file_rust_log: Option<&'a str>,
}
pub struct InitLoggingResult {
pub rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender<String>,
pub line_broadcast: LineBroadcast,
}
pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result<InitLoggingResult> {
let stderr_filter = EnvFilter::builder()
.with_default_directive(
opts.default_rust_log_value
.unwrap_or("info")
.parse()
.context("can't parse provided rust_log value")?,
)
.from_env()
.context("invalid RUST_LOG value")?;
let (stderr_filter, reload_stderr_filter) =
tracing_subscriber::reload::Layer::new(stderr_filter);
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
let (line_sub, line_broadcast) = Subscriber::new();
let layered = tracing_subscriber::registry()
// Stderr logging layer.
.with(fmt::layer().with_filter(stderr_filter))
// HTTP API log broadcast layer.
.with(
fmt::layer()
.with_ansi(false)
.fmt_fields(tracing_subscriber::fmt::format::JsonFields::new())
.event_format(fmt::format().with_ansi(false).json())
.with_writer(line_sub)
.with_filter(EnvFilter::builder().parse("info,librqbit=debug").unwrap()),
);
if let Some(log_file) = &opts.log_file {
let log_file = log_file.to_string();
let log_file = move || {
LineWriter::new(
std::fs::OpenOptions::new()
.create(true)
.append(true)
.write(true)
.open(&log_file)
.with_context(|| format!("error opening log file {:?}", log_file))
.unwrap(),
)
};
layered
.with(
fmt::layer()
.with_ansi(false)
.with_writer(log_file)
.with_filter(
EnvFilter::builder()
.parse(opts.log_file_rust_log.unwrap_or("info,librqbit=debug"))
.context("can't parse log-file-rust-log")?,
),
)
.try_init()
.context("can't init logging")?;
} else {
layered.try_init().context("can't init logging")?;
}
let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
spawn(error_span!("fmt_filter_reloader"), async move {
while let Some(rust_log) = reload_rx.recv().await {
let stderr_env_filter = match EnvFilter::builder().parse(&rust_log) {
Ok(f) => f,
Err(e) => {
eprintln!("can't parse env filter {:?}: {:#?}", rust_log, e);
continue;
}
};
eprintln!("setting RUST_LOG to {:?}", rust_log);
let _ = reload_stderr_filter.reload(stderr_env_filter);
}
Ok(())
});
Ok(InitLoggingResult {
rust_log_reload_tx: reload_tx,
line_broadcast,
})
}

File diff suppressed because one or more lines are too long

View file

@ -4,7 +4,7 @@
"src": "assets/logo.svg" "src": "assets/logo.svg"
}, },
"index.html": { "index.html": {
"file": "assets/index-b673c0d1.js", "file": "assets/index-050cef91.js",
"isEntry": true, "isEntry": true,
"src": "index.html" "src": "index.html"
} }

View file

@ -1,6 +1,13 @@
import React, { useEffect, useState } from "react"; import React, {
useCallback,
useEffect,
useMemo,
useRef,
useState,
} from "react";
import { ErrorWithLabel } from "../rqbit-web"; import { ErrorWithLabel } from "../rqbit-web";
import { ErrorComponent } from "./ErrorComponent"; import { ErrorComponent } from "./ErrorComponent";
import { Form } from "react-bootstrap";
interface LogStreamProps { interface LogStreamProps {
httpApiBase: string; httpApiBase: string;
@ -10,6 +17,8 @@ interface LogStreamProps {
interface Line { interface Line {
id: number; id: number;
content: string; content: string;
parsed: JSONLogLine;
show: boolean;
} }
const mergeBuffers = (a1: Uint8Array, a2: Uint8Array): Uint8Array => { const mergeBuffers = (a1: Uint8Array, a2: Uint8Array): Uint8Array => {
@ -21,13 +30,13 @@ const mergeBuffers = (a1: Uint8Array, a2: Uint8Array): Uint8Array => {
const streamLogs = ( const streamLogs = (
httpApiBase: string, httpApiBase: string,
addLine: (text: string) => void, addLine: React.MutableRefObject<(text: string) => void>,
setError: (error: ErrorWithLabel | null) => void setError: (error: ErrorWithLabel | null) => void
): (() => void) => { ): (() => void) => {
const controller = new AbortController(); const controller = new AbortController();
const signal = controller.signal; const signal = controller.signal;
let canceled = true; let canceled = false;
const cancel = () => { const cancel = () => {
console.log("cancelling fetch"); console.log("cancelling fetch");
@ -90,7 +99,7 @@ const streamLogs = (
} }
let lineBytes = buffer.slice(0, newLineIdx); let lineBytes = buffer.slice(0, newLineIdx);
let line = new TextDecoder().decode(lineBytes); let line = new TextDecoder().decode(lineBytes);
addLine(line); addLine.current(line);
buffer = buffer.slice(newLineIdx + 1); buffer = buffer.slice(newLineIdx + 1);
} }
} }
@ -100,45 +109,125 @@ const streamLogs = (
return cancel; return cancel;
}; };
const SplitByLevelRegexp = new RegExp( type Value = string | number | boolean;
/(.*?) +(INFO|WARN|TRACE|ERROR|DEBUG) +(.*)/
);
const LogLine = ({ line }: { line: string }) => { interface Span {
line.split; name: string;
const getClassNameByLevel = (level: string) => { [key: string]: Value;
}
interface JSONLogLine {
level: string;
timestamp: string;
fields: {
message: string;
[key: string]: Value;
};
target: string;
span: Span;
spans: Span[];
}
const EXAMPLE_LOG_JSON: JSONLogLine = {
timestamp: "2023-12-08T21:48:13.649165Z",
level: "DEBUG",
fields: { message: "successfully port forwarded 192.168.0.112:4225" },
target: "librqbit_upnp",
span: { port: 4225, name: "manage_port" },
spans: [
{ port: 4225, name: "upnp_forward" },
{
location: "http://192.168.0.1:49152/IGDdevicedesc_brlan0.xml",
name: "upnp_endpoint",
},
{ device: "ARRIS TG3492LG", name: "device" },
{ device: "WANDevice:1", name: "device" },
{ device: "WANConnectionDevice:1", name: "device" },
{ url: "/upnp/control/WANIPConnection0", name: "service" },
{ port: 4225, name: "manage_port" },
],
};
const LogLine = ({ line }: { line: Line }) => {
const parsed = line.parsed;
const classNameByLevel = (level: string) => {
switch (level) { switch (level) {
case "DEBUG":
return "text-primary";
case "INFO": case "INFO":
return "text-success"; return "text-success";
case "WARN": case "WARN":
return "text-warning"; return "text-warning";
case "ERROR": case "ERROR":
return "text-danger"; return "text-danger";
case "DEBUG":
return "text-primary";
default: default:
return "text-secondary"; return "text-muted";
} }
}; };
const getContent = () => { const spanFields = (span: Span) => {
let match = line.match(SplitByLevelRegexp); let fields = Object.entries(span).filter(([name, value]) => name != "name");
if (!match) { if (fields.length == 0) {
return line; return null;
} }
const [beforeLevel, level, afterLevel] = match.slice(1);
return ( return (
<> <>
{beforeLevel} {"{"}
<span className={`${getClassNameByLevel(level)} m-2`}>{level}</span> {fields
{afterLevel} .map(([name, value]) => {
return (
<span key={name}>
{name} = {value}
</span>
);
})
.reduce((prev, curr) => (
<>
{prev}, {curr}
</>
))}
{"}"}
</> </>
); );
}; };
return ( return (
<p className="font-monospace m-0" style={{ fontSize: "10px" }}> <p
{getContent()} hidden={!line.show}
className="font-monospace m-0 text-break"
style={{ fontSize: "10px" }}
>
<span className="m-1">{parsed.timestamp}</span>
<span className={`m-1 ${classNameByLevel(parsed.level)}`}>
{parsed.level}
</span>
<span className="m-1">
{parsed.spans?.map((span, i) => (
<span key={i}>
<span className="fw-bold">{span.name}</span>
{spanFields(span)}:
</span>
))}
</span>
<span className="m-1 text-muted">{parsed.target}</span>
<span
className={`m-1 ${
parsed.fields.message.match(/error|fail/g)
? "text-danger"
: "text-muted"
}`}
>
{parsed.fields.message}
{Object.entries(parsed.fields)
.filter(([key, value]) => key != "message")
.map(([key, value]) => (
<span className="m-1" key={key}>
<span className="fst-italic fw-bold">{key}</span>={value}
</span>
))}
</span>
</p> </p>
); );
}; };
@ -149,32 +238,72 @@ export const LogStream: React.FC<LogStreamProps> = ({
}) => { }) => {
const [logLines, setLogLines] = useState<Line[]>([]); const [logLines, setLogLines] = useState<Line[]>([]);
const [error, setError] = useState<ErrorWithLabel | null>(null); const [error, setError] = useState<ErrorWithLabel | null>(null);
const [filter, setFilter] = useState<string>("");
const filterRegex = useRef(new RegExp(""));
const maxL = maxLines ?? 1000; const maxL = maxLines ?? 1000;
const addLine = (text: string) => { const addLine = useCallback(
setLogLines((logLines: Line[]) => { (text: string) => {
const nextLineId = logLines.length == 0 ? 0 : logLines[0].id + 1; setLogLines((logLines: Line[]) => {
const nextLineId = logLines.length == 0 ? 0 : logLines[0].id + 1;
let newLogLines = [ let newLogLines = [
{ {
id: nextLineId, id: nextLineId,
content: text, content: text,
}, parsed: JSON.parse(text) as JSONLogLine,
...logLines.slice(0, maxL - 1), show: !!text.match(filterRegex.current),
]; },
return newLogLines; ...logLines.slice(0, maxL - 1),
}); ];
return newLogLines;
});
},
[filterRegex.current, maxLines]
);
const addLineRef = useRef(addLine);
addLineRef.current = addLine;
const handleFilterChange = (value: string) => {
setFilter(value);
try {
let regex = new RegExp(value);
filterRegex.current = regex;
setLogLines((logLines) => {
let tmp = [...logLines];
for (let line of tmp) {
line.show = !!line.content.match(regex);
}
return tmp;
});
} catch (e) {}
}; };
useEffect(() => { useEffect(() => {
return streamLogs(httpApiBase, addLine, setError); return streamLogs(httpApiBase, addLineRef, setError);
}, [httpApiBase]); }, [httpApiBase]);
return ( return (
<div className="row"> <div className="row">
<ErrorComponent error={error} /> <ErrorComponent error={error} />
<div className="mb-3">
Showing last {maxL} logs since this window was opened
</div>
<Form>
<Form.Group className="mb-3">
<Form.Control
type="text"
value={filter}
placeholder="Enter filter (regex)"
onChange={(e) => handleFilterChange(e.target.value)}
/>
</Form.Group>
</Form>
{logLines.map((line) => ( {logLines.map((line) => (
<LogLine key={line.id} line={line.content} /> <LogLine key={line.id} line={line} />
))} ))}
</div> </div>
); );

View file

@ -27,6 +27,9 @@ export const APIContext = createContext<RqbitAPI>({
delete: () => { delete: () => {
throw new Error("Function not implemented."); throw new Error("Function not implemented.");
}, },
getHttpBaseUrl: () => {
throw new Error("Function not implemented.");
},
}); });
export const AppContext = createContext<ContextType>({ export const AppContext = createContext<ContextType>({
setCloseableError: (_) => {}, setCloseableError: (_) => {},

View file

@ -3,6 +3,9 @@ import { TorrentId, ErrorDetails as ApiErrorDetails } from "./api-types";
import { AppContext, APIContext } from "./context"; import { AppContext, APIContext } from "./context";
import { RootContent } from "./components/RootContent"; import { RootContent } from "./components/RootContent";
import { customSetInterval } from "./helper/customSetInterval"; import { customSetInterval } from "./helper/customSetInterval";
import { IconButton } from "./components/IconButton";
import { BsBodyText } from "react-icons/bs";
import { LogStreamModal } from "./components/LogStreamModal";
export interface ErrorWithLabel { export interface ErrorWithLabel {
text: string; text: string;
@ -14,7 +17,10 @@ export interface ContextType {
refreshTorrents: () => void; refreshTorrents: () => void;
} }
export const RqbitWebUI = (props: { title: string }) => { export const RqbitWebUI = (props: {
title: string;
menuButtons?: JSX.Element[];
}) => {
const [closeableError, setCloseableError] = useState<ErrorWithLabel | null>( const [closeableError, setCloseableError] = useState<ErrorWithLabel | null>(
null null
); );
@ -22,6 +28,8 @@ export const RqbitWebUI = (props: { title: string }) => {
const [torrents, setTorrents] = useState<Array<TorrentId> | null>(null); const [torrents, setTorrents] = useState<Array<TorrentId> | null>(null);
const [torrentsLoading, setTorrentsLoading] = useState(false); const [torrentsLoading, setTorrentsLoading] = useState(false);
let [logsOpened, setLogsOpened] = useState<boolean>(false);
const API = useContext(APIContext); const API = useContext(APIContext);
const refreshTorrents = async () => { const refreshTorrents = async () => {
@ -66,6 +74,17 @@ export const RqbitWebUI = (props: { title: string }) => {
torrentsLoading={torrentsLoading} torrentsLoading={torrentsLoading}
/> />
</div> </div>
{/* Menu buttons */}
<div className="position-absolute top-0 start-0 p-1">
{props.menuButtons &&
props.menuButtons.map((b, i) => <span key={i}>{b}</span>)}
<IconButton onClick={() => setLogsOpened(true)}>
<BsBodyText />
</IconButton>
</div>
<LogStreamModal show={logsOpened} onClose={() => setLogsOpened(false)} />
</AppContext.Provider> </AppContext.Provider>
); );
}; };

View file

@ -1,11 +1,14 @@
use std::{io::LineWriter, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use anyhow::Context; use anyhow::Context;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use librqbit::{ use librqbit::{
api::ApiAddTorrentResponse, http_api::{HttpApi, HttpApiOptions}, http_api_client, librqbit_spawn, AddTorrent, api::ApiAddTorrentResponse,
AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, ManagedTorrentState, http_api::{HttpApi, HttpApiOptions},
PeerConnectionOptions, Session, SessionOptions, log_subscriber::LineBroadcast, Api, http_api_client, librqbit_spawn,
tracing_subscriber_config_utils::{init_logging, InitLoggingOptions},
AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse, ManagedTorrentState,
PeerConnectionOptions, Session, SessionOptions,
}; };
use size_format::SizeFormatterBinary as SF; use size_format::SizeFormatterBinary as SF;
use tracing::{error, error_span, info, trace_span, warn}; use tracing::{error, error_span, info, trace_span, warn};
@ -178,125 +181,6 @@ enum SubCommand {
Download(DownloadOpts), Download(DownloadOpts),
} }
struct InitLoggingResult {
rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender<String>,
line_broadcast: LineBroadcast,
}
// Init logging and make a channel to send new RUST_LOG values to.
fn init_logging(opts: &Opts) -> InitLoggingResult {
let default_rust_log = match opts.log_level.as_ref() {
Some(level) => match level {
LogLevel::Trace => "trace",
LogLevel::Debug => "debug",
LogLevel::Info => "info",
LogLevel::Warn => "warn",
LogLevel::Error => "error",
},
None => "info",
};
let stderr_filter = match std::env::var("RUST_LOG").ok() {
Some(rust_log) => EnvFilter::builder()
.parse(rust_log)
.expect("can't parse RUST_LOG"),
None => EnvFilter::builder()
.parse(default_rust_log)
.expect("can't parse default_rust_log"),
};
let (stderr_filter, reload_stderr_filter) =
tracing_subscriber::reload::Layer::new(stderr_filter);
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
let (line_sub, line_broadcast) = librqbit::log_subscriber::Subscriber::new();
#[cfg(feature = "tokio-console")]
{
let (console_layer, server) = console_subscriber::Builder::default()
.with_default_env()
.build();
tracing_subscriber::registry()
.with(fmt::layer().with_filter(stderr_filter))
.with(console_layer)
.init();
spawn(
"console_subscriber server",
error_span!("console_subscriber server"),
async move {
server
.serve()
.await
.map_err(|e| anyhow::anyhow!("{:#?}", e))
.context("error running console subscriber server")
},
);
}
#[cfg(not(feature = "tokio-console"))]
{
let layered = tracing_subscriber::registry()
.with(fmt::layer().with_filter(stderr_filter))
.with(
fmt::layer()
.event_format(fmt::format().with_ansi(false).compact())
.with_ansi(false)
.with_writer(line_sub)
.with_filter(EnvFilter::builder().parse("info").unwrap()),
);
if let Some(log_file) = &opts.log_file {
let log_file = log_file.clone();
let log_file = move || {
LineWriter::new(
std::fs::OpenOptions::new()
.create(true)
.append(true)
.write(true)
.open(&log_file)
.with_context(|| format!("error opening log file {:?}", log_file))
.unwrap(),
)
};
layered
.with(
fmt::layer()
.with_ansi(false)
.with_writer(log_file)
.with_filter(EnvFilter::builder().parse(&opts.log_file_rust_log).unwrap()),
)
.init();
} else {
layered.init();
}
}
let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
librqbit_spawn(
"fmt_filter_reloader",
error_span!("fmt_filter_reloader"),
async move {
while let Some(rust_log) = reload_rx.recv().await {
let stderr_env_filter = match EnvFilter::builder().parse(&rust_log) {
Ok(f) => f,
Err(e) => {
eprintln!("can't parse env filter {:?}: {:#?}", rust_log, e);
continue;
}
};
eprintln!("setting RUST_LOG to {:?}", rust_log);
let _ = reload_stderr_filter.reload(stderr_env_filter);
}
Ok(())
},
);
InitLoggingResult {
rust_log_reload_tx: reload_tx,
line_broadcast,
}
}
fn _start_deadlock_detector_thread() { fn _start_deadlock_detector_thread() {
use parking_lot::deadlock; use parking_lot::deadlock;
use std::thread; use std::thread;
@ -350,7 +234,17 @@ fn main() -> anyhow::Result<()> {
} }
async fn async_main(opts: Opts) -> anyhow::Result<()> { async fn async_main(opts: Opts) -> anyhow::Result<()> {
let log_config = init_logging(&opts); let log_config = init_logging(InitLoggingOptions {
default_rust_log_value: Some(match opts.log_level.unwrap_or(LogLevel::Info) {
LogLevel::Trace => "trace",
LogLevel::Debug => "debug",
LogLevel::Info => "info",
LogLevel::Warn => "warn",
LogLevel::Error => "error",
}),
log_file: opts.log_file.as_deref(),
log_file_rust_log: Some(&opts.log_file_rust_log),
})?;
let mut sopts = SessionOptions { let mut sopts = SessionOptions {
disable_dht: opts.disable_dht, disable_dht: opts.disable_dht,
@ -445,10 +339,14 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
trace_span!("stats_printer"), trace_span!("stats_printer"),
stats_printer(session.clone()), stats_printer(session.clone()),
); );
let api = Api::new(session, Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast)); let api = Api::new(
session,
Some(log_config.rust_log_reload_tx),
Some(log_config.line_broadcast),
);
let http_api = HttpApi::new( let http_api = HttpApi::new(
api, api,
Some(HttpApiOptions{ Some(HttpApiOptions {
read_only: false, read_only: false,
cors_enable_all: false, cors_enable_all: false,
}), }),
@ -532,9 +430,17 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
trace_span!("stats_printer"), trace_span!("stats_printer"),
stats_printer(session.clone()), stats_printer(session.clone()),
); );
let api = Api::new(session.clone(), Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast)); let api = Api::new(
session.clone(),
Some(log_config.rust_log_reload_tx),
Some(log_config.line_broadcast),
);
let http_api = HttpApi::new( let http_api = HttpApi::new(
api, Some(HttpApiOptions { cors_enable_all: false, read_only: true }) api,
Some(HttpApiOptions {
cors_enable_all: false,
read_only: true,
}),
); );
let http_api_listen_addr = opts.http_api_listen_addr; let http_api_listen_addr = opts.http_api_listen_addr;
librqbit_spawn( librqbit_spawn(

View file

@ -105,11 +105,11 @@ async fn forward_port(
.await .await
.context("error reading response text")?; .context("error reading response text")?;
trace!("AddPortMapping response: {} {}", status, response_text); trace!(status = %status, text=response_text, "AddPortMapping response");
if !status.is_success() { if !status.is_success() {
bail!("failed port forwarding: {}", status); bail!("failed port forwarding: {}", status);
} else { } else {
debug!("successfully port forwarded {}:{}", local_ip, port); debug!(%local_ip, port, "successfully port forwarded");
} }
Ok(()) Ok(())
} }
@ -155,7 +155,7 @@ impl Device {
} }
pub fn span(&self, parent: tracing::Span) -> tracing::Span { pub fn span(&self, parent: tracing::Span) -> tracing::Span {
error_span!(parent: parent, "device", name = self.name()) error_span!(parent: parent, "device", device = self.name())
} }
} }
@ -355,11 +355,11 @@ impl UpnpPortForwarder {
let response = match std::str::from_utf8(&buffer[..len]) { let response = match std::str::from_utf8(&buffer[..len]) {
Ok(response) => response, Ok(response) => response,
Err(_) => { Err(_) => {
warn!("received invalid utf-8 from {addr}"); warn!(%addr, "received invalid utf-8");
continue; continue;
}, },
}; };
trace!("received response from {addr}: {response}"); trace!(%addr, response, "response");
match parse_upnp_discover_response(response, addr) { match parse_upnp_discover_response(response, addr) {
Ok(r) => { Ok(r) => {
tx.send(r)?; tx.send(r)?;

View file

@ -4126,6 +4126,16 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.18" version = "0.3.18"
@ -4136,12 +4146,15 @@ dependencies = [
"nu-ansi-term", "nu-ansi-term",
"once_cell", "once_cell",
"regex", "regex",
"serde",
"serde_json",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing", "tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
"tracing-serde",
] ]
[[package]] [[package]]

View file

@ -22,7 +22,7 @@ anyhow = "1.0.75"
base64 = "0.21.5" base64 = "0.21.5"
http = "1.0.0" http = "1.0.0"
directories = "5.0.1" directories = "5.0.1"
tracing-subscriber = {version = "0.3.18", features = ["env-filter"] } tracing-subscriber = {version = "0.3.18", features = ["env-filter", "json"] }
tracing = "0.1" tracing = "0.1"
serde_with = "3.4.0" serde_with = "3.4.0"
parking_lot = "0.12.1" parking_lot = "0.12.1"

View file

@ -19,8 +19,7 @@ use librqbit::{
TorrentStats, TorrentStats,
}, },
dht::PersistentDhtConfig, dht::PersistentDhtConfig,
librqbit_spawn, tracing_subscriber_config_utils::{init_logging, InitLoggingOptions, InitLoggingResult},
log_subscriber::LineBroadcast,
AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, SessionOptions, AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, SessionOptions,
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
@ -35,12 +34,10 @@ struct StateShared {
api: Option<Api>, api: Option<Api>,
} }
type RustLogReloadTx = tokio::sync::mpsc::UnboundedSender<String>;
struct State { struct State {
config_filename: String, config_filename: String,
shared: Arc<RwLock<Option<StateShared>>>, shared: Arc<RwLock<Option<StateShared>>>,
init_logging: InitLogging, init_logging: InitLoggingResult,
} }
fn read_config(path: &str) -> anyhow::Result<RqbitDesktopConfig> { fn read_config(path: &str) -> anyhow::Result<RqbitDesktopConfig> {
@ -65,7 +62,7 @@ fn write_config(path: &str, config: &RqbitDesktopConfig) -> anyhow::Result<()> {
} }
async fn api_from_config( async fn api_from_config(
init_logging: &InitLogging, init_logging: &InitLoggingResult,
config: &RqbitDesktopConfig, config: &RqbitDesktopConfig,
) -> anyhow::Result<Api> { ) -> anyhow::Result<Api> {
let session = Session::new_with_opts( let session = Session::new_with_opts(
@ -98,7 +95,7 @@ async fn api_from_config(
let api = Api::new( let api = Api::new(
session.clone(), session.clone(),
Some(init_logging.reload_stdout_tx.clone()), Some(init_logging.rust_log_reload_tx.clone()),
Some(init_logging.line_broadcast.clone()), Some(init_logging.line_broadcast.clone()),
); );
@ -118,7 +115,7 @@ async fn api_from_config(
} }
impl State { impl State {
async fn new(init_logging: InitLogging) -> Self { async fn new(init_logging: InitLoggingResult) -> Self {
let config_filename = directories::ProjectDirs::from("com", "rqbit", "desktop") let config_filename = directories::ProjectDirs::from("com", "rqbit", "desktop")
.expect("directories::ProjectDirs::from") .expect("directories::ProjectDirs::from")
.config_dir() .config_dir()
@ -302,64 +299,16 @@ fn get_version() -> &'static str {
env!("CARGO_PKG_VERSION") env!("CARGO_PKG_VERSION")
} }
struct InitLogging {
reload_stdout_tx: RustLogReloadTx,
line_broadcast: LineBroadcast,
}
fn init_logging() -> InitLogging {
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
let (stderr_filter, reload_stderr_filter) = tracing_subscriber::reload::Layer::new(
EnvFilter::builder()
.with_default_directive("info".parse().unwrap())
.from_env()
.unwrap(),
);
let (line_sub, line_broadcast) = librqbit::log_subscriber::Subscriber::new();
let layered = tracing_subscriber::registry()
.with(fmt::layer().with_filter(stderr_filter))
.with(
fmt::layer()
.with_ansi(false)
.fmt_fields(tracing_subscriber::fmt::format::DefaultFields::new().delimited(","))
.event_format(fmt::format().with_ansi(false))
.with_writer(line_sub)
.with_filter(EnvFilter::builder().parse("info,librqbit=debug").unwrap()),
);
layered.init();
let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
librqbit_spawn(
"fmt_filter_reloader",
error_span!("fmt_filter_reloader"),
async move {
while let Some(rust_log) = reload_rx.recv().await {
let stderr_env_filter = match EnvFilter::builder().parse(&rust_log) {
Ok(f) => f,
Err(e) => {
eprintln!("can't parse env filter {:?}: {:#?}", rust_log, e);
continue;
}
};
eprintln!("setting RUST_LOG to {:?}", rust_log);
let _ = reload_stderr_filter.reload(stderr_env_filter);
}
Ok(())
},
);
InitLogging {
reload_stdout_tx: reload_tx,
line_broadcast,
}
}
async fn start() { async fn start() {
tauri::async_runtime::set(tokio::runtime::Handle::current()); tauri::async_runtime::set(tokio::runtime::Handle::current());
let rust_log_reload_tx = init_logging(); let init_logging_result = init_logging(InitLoggingOptions {
default_rust_log_value: Some("info"),
log_file: None,
log_file_rust_log: None,
})
.unwrap();
let state = State::new(rust_log_reload_tx).await; let state = State::new(init_logging_result).await;
tauri::Builder::default() tauri::Builder::default()
.manage(state) .manage(state)

View file

@ -18,27 +18,25 @@ export const RqbitDesktop: React.FC<{
currentState.config ?? defaultConfig currentState.config ?? defaultConfig
); );
let [configurationOpened, setConfigurationOpened] = useState<boolean>(false); let [configurationOpened, setConfigurationOpened] = useState<boolean>(false);
let [logsOpened, setLogsOpened] = useState<boolean>(false);
const configButton = (
<IconButton
className="p-3 text-primary"
onClick={() => {
setConfigurationOpened(true);
}}
>
<BsSliders2 />
</IconButton>
);
return ( return (
<APIContext.Provider value={makeAPI(config)}> <APIContext.Provider value={makeAPI(config)}>
{configured && ( {configured && (
<RqbitWebUI title={`Rqbit Desktop v${version}`}></RqbitWebUI> <RqbitWebUI
)} title={`Rqbit Desktop v${version}`}
{configured && ( menuButtons={[configButton]}
<div className="position-absolute top-0 start-0"> ></RqbitWebUI>
<IconButton
className="p-3 text-primary"
onClick={() => {
setConfigurationOpened(true);
}}
>
<BsSliders2 />
</IconButton>
<IconButton onClick={() => setLogsOpened(true)}>
<BsBodyText />
</IconButton>
</div>
)} )}
<ConfigModal <ConfigModal
show={!configured || configurationOpened} show={!configured || configurationOpened}
@ -56,7 +54,6 @@ export const RqbitDesktop: React.FC<{
initialConfig={config} initialConfig={config}
defaultConfig={defaultConfig} defaultConfig={defaultConfig}
/> />
<LogStreamModal show={logsOpened} onClose={() => setLogsOpened(false)} />
</APIContext.Provider> </APIContext.Provider>
); );
}; };