From 79bd41a552289c8c7b2fd43fa9408cfc8565451b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 25 Nov 2023 01:24:57 +0000 Subject: [PATCH] Can now reload RUST_LOG at runtime from HTTP API --- crates/librqbit/src/chunk_tracker.rs | 10 +++-- crates/librqbit/src/http_api.rs | 38 +++++++++++++++---- crates/librqbit/src/torrent_state/live/mod.rs | 2 +- crates/rqbit/Cargo.toml | 2 +- crates/rqbit/src/main.rs | 35 ++++++++++++++--- 5 files changed, 69 insertions(+), 18 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 008e13e..6f0ec5e 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -1,6 +1,6 @@ use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex}; use peer_binary_protocol::Piece; -use tracing::{debug, info}; +use tracing::{debug, trace}; use crate::type_aliases::BF; @@ -129,7 +129,7 @@ impl ChunkTracker { } pub fn mark_piece_broken(&mut self, index: ValidPieceIndex) -> bool { - info!("remarking piece={} as broken", index); + debug!("remarking piece={} as broken", index); self.needed_pieces.set(index.get() as usize, true); self.chunk_status .get_mut(self.lengths.chunk_range(index)) @@ -170,9 +170,11 @@ impl ChunkTracker { return Some(ChunkMarkingResult::PreviouslyCompleted); } chunk_range.set(chunk_info.chunk_index as usize, true); - debug!( + trace!( "piece={}, chunk_info={:?}, bits={:?}", - piece.index, chunk_info, chunk_range, + piece.index, + chunk_info, + chunk_range, ); if chunk_range.all() { diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 0b112ba..4266c6a 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -14,6 +14,7 @@ use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; +use tokio::sync::mpsc::UnboundedSender; use tracing::{info, warn}; use axum::Router; @@ -33,9 +34,9 @@ pub struct HttpApi { } impl HttpApi { - pub fn new(session: Arc) -> Self { + pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { Self { - inner: Arc::new(ApiInternal::new(session)), + inner: Arc::new(ApiInternal::new(session, rust_log_reload_tx)), } } @@ -51,11 +52,14 @@ impl HttpApi { "GET /torrents": "List torrents (default torrent is 0)", "GET /torrents/{index}": "Torrent details", "GET /torrents/{index}/haves": "The bitfield of have pieces", - "GET /torrents/{index}/stats": "Torrent stats", + "GET /torrents/{index}/stats/v1": "Torrent stats", "GET /torrents/{index}/peer_stats": "Per peer stats", - // This is kind of not secure as it just reads any local file that it has access to, - // or any URL, but whatever, ok for our purposes / threat model. + "POST /torrents/{index}/pause": "Pause torrent", + "POST /torrents/{index}/start": "Resume torrent", + "POST /torrents/{index}/forget": "Forget about the torrent, keep the files", + "POST /torrents/{index}/delete": "Forget about the torrent, remove the files", "POST /torrents": "Add a torrent here. magnet: or http:// or a local file.", + "POST /rust_log": "Set RUST_LOG to this post launch (for debugging)", "GET /web/": "Web UI", }, "server": "rqbit", @@ -151,6 +155,13 @@ impl HttpApi { state.api_torrent_action_delete(idx).map(axum::Json) } + async fn set_rust_log( + State(state): State, + new_value: String, + ) -> Result { + state.api_set_rust_log(new_value).map(axum::Json) + } + #[allow(unused_mut)] let mut app = Router::new() .route("/", get(api_root)) @@ -165,7 +176,8 @@ impl HttpApi { .route("/torrents/:id/pause", post(torrent_action_pause)) .route("/torrents/:id/start", post(torrent_action_start)) .route("/torrents/:id/forget", post(torrent_action_forget)) - .route("/torrents/:id/delete", post(torrent_action_delete)); + .route("/torrents/:id/delete", post(torrent_action_delete)) + .route("/rust_log", post(set_rust_log)); #[cfg(feature = "webui")] { @@ -383,15 +395,17 @@ impl TorrentAddQueryParams { struct ApiInternal { startup_time: Instant, session: Arc, + rust_log_reload_tx: Option>, } type ApiState = Arc; impl ApiInternal { - pub fn new(session: Arc) -> Self { + pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { Self { startup_time: Instant::now(), session, + rust_log_reload_tx, } } @@ -460,6 +474,16 @@ impl ApiInternal { Ok(Default::default()) } + fn api_set_rust_log(&self, new_value: String) -> Result { + let tx = self + .rust_log_reload_tx + .as_ref() + .context("rust_log_reload_tx was not set")?; + tx.send(new_value) + .context("noone is listening to RUST_LOG changes")?; + Ok(Default::default()) + } + pub async fn api_add_torrent( &self, add: AddTorrent<'_>, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 68f035d..4045d85 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -730,7 +730,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } Message::Have(h) => self.on_have(h), Message::NotInterested => { - info!("received \"not interested\", but we don't care yet") + debug!("received \"not interested\", but we don't care yet") } message => { warn!("received unsupported message {:?}, ignoring", message); diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 4e73d12..a0c5410 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -13,7 +13,7 @@ readme = "README.md" [features] default = ["sha1-system", "default-tls", "webui"] -tokio-console = ["console-subscriber"] +tokio-console = ["console-subscriber",] webui = ["librqbit/webui"] timed_existence = ["librqbit/timed_existence"] sha1-system = ["librqbit/sha1-system"] diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 87becb3..4e79124 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -134,7 +134,8 @@ enum SubCommand { Download(DownloadOpts), } -fn init_logging(opts: &Opts) { +// Iint logging and make a channel to send new RUST_LOG values to. +fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender { let default_rust_log = match opts.log_level.as_ref() { Some(level) => match level { LogLevel::Trace => "trace", @@ -147,13 +148,16 @@ fn init_logging(opts: &Opts) { }; let stderr_filter = match std::env::var("RUST_LOG").ok() { Some(rust_log) => EnvFilter::builder() - .parse(&rust_log) + .parse(rust_log) .expect("can't parse RUST_LOG"), None => EnvFilter::builder() .parse(default_rust_log) .expect("can't parse default_rust_log"), }; + let (stderr_filter, reload_stderr_filter) = + tracing_subscriber::reload::Layer::new(stderr_filter); + use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[cfg(feature = "tokio-console")] @@ -187,6 +191,27 @@ fn init_logging(opts: &Opts) { .with(stderr_filter) .init(); } + + let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::(); + spawn( + "fmt_filter_reloader", + error_span!("fmt_filter_reloader"), + async move { + while let Some(rust_log) = reload_rx.recv().await { + let stderr_env_filter = match EnvFilter::builder().parse(&rust_log) { + Ok(f) => f, + Err(e) => { + eprintln!("can't parse env filter {:?}: {:#?}", rust_log, e); + continue; + } + }; + eprintln!("setting RUST_LOG to {:?}", rust_log); + let _ = reload_stderr_filter.reload(stderr_env_filter); + } + Ok(()) + }, + ); + reload_tx } fn _start_deadlock_detector_thread() { @@ -248,7 +273,7 @@ fn main() -> anyhow::Result<()> { } async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { - init_logging(&opts); + let logging_reload_tx = init_logging(&opts); let sopts = SessionOptions { disable_dht: opts.disable_dht, @@ -331,7 +356,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> trace_span!("stats_printer"), stats_printer(session.clone()), ); - let http_api = HttpApi::new(session); + let http_api = HttpApi::new(session, Some(logging_reload_tx)); let http_api_listen_addr = opts.http_api_listen_addr; http_api .make_http_api_and_run(http_api_listen_addr) @@ -411,7 +436,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> trace_span!("stats_printer"), stats_printer(session.clone()), ); - let http_api = HttpApi::new(session.clone()); + let http_api = HttpApi::new(session.clone(), Some(logging_reload_tx)); let http_api_listen_addr = opts.http_api_listen_addr; spawn( "http_api",