Can now reload RUST_LOG at runtime from HTTP API

This commit is contained in:
Igor Katson 2023-11-25 01:24:57 +00:00
parent fa97dedb98
commit 79bd41a552
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 69 additions and 18 deletions

View file

@ -1,6 +1,6 @@
use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex}; use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex};
use peer_binary_protocol::Piece; use peer_binary_protocol::Piece;
use tracing::{debug, info}; use tracing::{debug, trace};
use crate::type_aliases::BF; use crate::type_aliases::BF;
@ -129,7 +129,7 @@ impl ChunkTracker {
} }
pub fn mark_piece_broken(&mut self, index: ValidPieceIndex) -> bool { pub fn mark_piece_broken(&mut self, index: ValidPieceIndex) -> bool {
info!("remarking piece={} as broken", index); debug!("remarking piece={} as broken", index);
self.needed_pieces.set(index.get() as usize, true); self.needed_pieces.set(index.get() as usize, true);
self.chunk_status self.chunk_status
.get_mut(self.lengths.chunk_range(index)) .get_mut(self.lengths.chunk_range(index))
@ -170,9 +170,11 @@ impl ChunkTracker {
return Some(ChunkMarkingResult::PreviouslyCompleted); return Some(ChunkMarkingResult::PreviouslyCompleted);
} }
chunk_range.set(chunk_info.chunk_index as usize, true); chunk_range.set(chunk_info.chunk_index as usize, true);
debug!( trace!(
"piece={}, chunk_info={:?}, bits={:?}", "piece={}, chunk_info={:?}, bits={:?}",
piece.index, chunk_info, chunk_range, piece.index,
chunk_info,
chunk_range,
); );
if chunk_range.all() { if chunk_range.all() {

View file

@ -14,6 +14,7 @@ use std::net::SocketAddr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn}; use tracing::{info, warn};
use axum::Router; use axum::Router;
@ -33,9 +34,9 @@ pub struct HttpApi {
} }
impl HttpApi { impl HttpApi {
pub fn new(session: Arc<Session>) -> Self { pub fn new(session: Arc<Session>, rust_log_reload_tx: Option<UnboundedSender<String>>) -> Self {
Self { Self {
inner: Arc::new(ApiInternal::new(session)), inner: Arc::new(ApiInternal::new(session, rust_log_reload_tx)),
} }
} }
@ -51,11 +52,14 @@ impl HttpApi {
"GET /torrents": "List torrents (default torrent is 0)", "GET /torrents": "List torrents (default torrent is 0)",
"GET /torrents/{index}": "Torrent details", "GET /torrents/{index}": "Torrent details",
"GET /torrents/{index}/haves": "The bitfield of have pieces", "GET /torrents/{index}/haves": "The bitfield of have pieces",
"GET /torrents/{index}/stats": "Torrent stats", "GET /torrents/{index}/stats/v1": "Torrent stats",
"GET /torrents/{index}/peer_stats": "Per peer stats", "GET /torrents/{index}/peer_stats": "Per peer stats",
// This is kind of not secure as it just reads any local file that it has access to, "POST /torrents/{index}/pause": "Pause torrent",
// or any URL, but whatever, ok for our purposes / threat model. "POST /torrents/{index}/start": "Resume torrent",
"POST /torrents/{index}/forget": "Forget about the torrent, keep the files",
"POST /torrents/{index}/delete": "Forget about the torrent, remove the files",
"POST /torrents": "Add a torrent here. magnet: or http:// or a local file.", "POST /torrents": "Add a torrent here. magnet: or http:// or a local file.",
"POST /rust_log": "Set RUST_LOG to this post launch (for debugging)",
"GET /web/": "Web UI", "GET /web/": "Web UI",
}, },
"server": "rqbit", "server": "rqbit",
@ -151,6 +155,13 @@ impl HttpApi {
state.api_torrent_action_delete(idx).map(axum::Json) state.api_torrent_action_delete(idx).map(axum::Json)
} }
async fn set_rust_log(
State(state): State<ApiState>,
new_value: String,
) -> Result<impl IntoResponse> {
state.api_set_rust_log(new_value).map(axum::Json)
}
#[allow(unused_mut)] #[allow(unused_mut)]
let mut app = Router::new() let mut app = Router::new()
.route("/", get(api_root)) .route("/", get(api_root))
@ -165,7 +176,8 @@ impl HttpApi {
.route("/torrents/:id/pause", post(torrent_action_pause)) .route("/torrents/:id/pause", post(torrent_action_pause))
.route("/torrents/:id/start", post(torrent_action_start)) .route("/torrents/:id/start", post(torrent_action_start))
.route("/torrents/:id/forget", post(torrent_action_forget)) .route("/torrents/:id/forget", post(torrent_action_forget))
.route("/torrents/:id/delete", post(torrent_action_delete)); .route("/torrents/:id/delete", post(torrent_action_delete))
.route("/rust_log", post(set_rust_log));
#[cfg(feature = "webui")] #[cfg(feature = "webui")]
{ {
@ -383,15 +395,17 @@ impl TorrentAddQueryParams {
struct ApiInternal { struct ApiInternal {
startup_time: Instant, startup_time: Instant,
session: Arc<Session>, session: Arc<Session>,
rust_log_reload_tx: Option<UnboundedSender<String>>,
} }
type ApiState = Arc<ApiInternal>; type ApiState = Arc<ApiInternal>;
impl ApiInternal { impl ApiInternal {
pub fn new(session: Arc<Session>) -> Self { pub fn new(session: Arc<Session>, rust_log_reload_tx: Option<UnboundedSender<String>>) -> Self {
Self { Self {
startup_time: Instant::now(), startup_time: Instant::now(),
session, session,
rust_log_reload_tx,
} }
} }
@ -460,6 +474,16 @@ impl ApiInternal {
Ok(Default::default()) Ok(Default::default())
} }
fn api_set_rust_log(&self, new_value: String) -> Result<EmptyJsonResponse> {
let tx = self
.rust_log_reload_tx
.as_ref()
.context("rust_log_reload_tx was not set")?;
tx.send(new_value)
.context("noone is listening to RUST_LOG changes")?;
Ok(Default::default())
}
pub async fn api_add_torrent( pub async fn api_add_torrent(
&self, &self,
add: AddTorrent<'_>, add: AddTorrent<'_>,

View file

@ -730,7 +730,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
} }
Message::Have(h) => self.on_have(h), Message::Have(h) => self.on_have(h),
Message::NotInterested => { Message::NotInterested => {
info!("received \"not interested\", but we don't care yet") debug!("received \"not interested\", but we don't care yet")
} }
message => { message => {
warn!("received unsupported message {:?}, ignoring", message); warn!("received unsupported message {:?}, ignoring", message);

View file

@ -13,7 +13,7 @@ readme = "README.md"
[features] [features]
default = ["sha1-system", "default-tls", "webui"] default = ["sha1-system", "default-tls", "webui"]
tokio-console = ["console-subscriber"] tokio-console = ["console-subscriber",]
webui = ["librqbit/webui"] webui = ["librqbit/webui"]
timed_existence = ["librqbit/timed_existence"] timed_existence = ["librqbit/timed_existence"]
sha1-system = ["librqbit/sha1-system"] sha1-system = ["librqbit/sha1-system"]

View file

@ -134,7 +134,8 @@ enum SubCommand {
Download(DownloadOpts), Download(DownloadOpts),
} }
fn init_logging(opts: &Opts) { // Iint logging and make a channel to send new RUST_LOG values to.
fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender<String> {
let default_rust_log = match opts.log_level.as_ref() { let default_rust_log = match opts.log_level.as_ref() {
Some(level) => match level { Some(level) => match level {
LogLevel::Trace => "trace", LogLevel::Trace => "trace",
@ -147,13 +148,16 @@ fn init_logging(opts: &Opts) {
}; };
let stderr_filter = match std::env::var("RUST_LOG").ok() { let stderr_filter = match std::env::var("RUST_LOG").ok() {
Some(rust_log) => EnvFilter::builder() Some(rust_log) => EnvFilter::builder()
.parse(&rust_log) .parse(rust_log)
.expect("can't parse RUST_LOG"), .expect("can't parse RUST_LOG"),
None => EnvFilter::builder() None => EnvFilter::builder()
.parse(default_rust_log) .parse(default_rust_log)
.expect("can't parse default_rust_log"), .expect("can't parse default_rust_log"),
}; };
let (stderr_filter, reload_stderr_filter) =
tracing_subscriber::reload::Layer::new(stderr_filter);
use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use tracing_subscriber::{fmt, prelude::*, EnvFilter};
#[cfg(feature = "tokio-console")] #[cfg(feature = "tokio-console")]
@ -187,6 +191,27 @@ fn init_logging(opts: &Opts) {
.with(stderr_filter) .with(stderr_filter)
.init(); .init();
} }
let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
spawn(
"fmt_filter_reloader",
error_span!("fmt_filter_reloader"),
async move {
while let Some(rust_log) = reload_rx.recv().await {
let stderr_env_filter = match EnvFilter::builder().parse(&rust_log) {
Ok(f) => f,
Err(e) => {
eprintln!("can't parse env filter {:?}: {:#?}", rust_log, e);
continue;
}
};
eprintln!("setting RUST_LOG to {:?}", rust_log);
let _ = reload_stderr_filter.reload(stderr_env_filter);
}
Ok(())
},
);
reload_tx
} }
fn _start_deadlock_detector_thread() { fn _start_deadlock_detector_thread() {
@ -248,7 +273,7 @@ fn main() -> anyhow::Result<()> {
} }
async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> {
init_logging(&opts); let logging_reload_tx = init_logging(&opts);
let sopts = SessionOptions { let sopts = SessionOptions {
disable_dht: opts.disable_dht, disable_dht: opts.disable_dht,
@ -331,7 +356,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
trace_span!("stats_printer"), trace_span!("stats_printer"),
stats_printer(session.clone()), stats_printer(session.clone()),
); );
let http_api = HttpApi::new(session); let http_api = HttpApi::new(session, Some(logging_reload_tx));
let http_api_listen_addr = opts.http_api_listen_addr; let http_api_listen_addr = opts.http_api_listen_addr;
http_api http_api
.make_http_api_and_run(http_api_listen_addr) .make_http_api_and_run(http_api_listen_addr)
@ -411,7 +436,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
trace_span!("stats_printer"), trace_span!("stats_printer"),
stats_printer(session.clone()), stats_printer(session.clone()),
); );
let http_api = HttpApi::new(session.clone()); let http_api = HttpApi::new(session.clone(), Some(logging_reload_tx));
let http_api_listen_addr = opts.http_api_listen_addr; let http_api_listen_addr = opts.http_api_listen_addr;
spawn( spawn(
"http_api", "http_api",