From 17b243921d3e127d5635f4bf8b2d8d020b16a82d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 25 Nov 2023 00:24:32 +0000 Subject: [PATCH] Continuing refactor --- .cargo/config | 5 +- Cargo.lock | 258 +++++++++++++++++- TODO.md | 22 +- crates/librqbit/src/http_api.rs | 8 +- crates/librqbit/src/http_api_error.rs | 1 + crates/librqbit/src/session.rs | 34 ++- crates/librqbit/src/spawn_utils.rs | 7 +- crates/librqbit/src/torrent_state/live/mod.rs | 12 +- crates/librqbit/src/torrent_state/mod.rs | 22 +- crates/rqbit/Cargo.toml | 4 +- crates/rqbit/src/main.rs | 86 ++++-- 11 files changed, 395 insertions(+), 64 deletions(-) diff --git a/.cargo/config b/.cargo/config index 0aca9d9..1b76467 100644 --- a/.cargo/config +++ b/.cargo/config @@ -1,2 +1,5 @@ [target.arm-unknown-linux-gnueabihf] -rustflags = ["-l", "atomic"] \ No newline at end of file +rustflags = ["-l", "atomic"] + +[build] +rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index ff0e522..60c88b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,28 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -317,6 +339,43 @@ dependencies = [ "libc", ] +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -342,6 +401,34 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -371,7 +458,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown", + "hashbrown 0.14.2", "lock_api", "once_cell", "parking_lot_core", @@ -451,6 +538,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flate2" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -624,19 +721,38 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.4.1" @@ -701,6 +817,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -739,6 +861,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -762,6 +896,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.1.0" @@ -769,7 +913,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.2", ] [[package]] @@ -787,6 +931,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.0" @@ -860,7 +1013,7 @@ dependencies = [ "futures", "hex 0.4.3", "http", - "itertools", + "itertools 0.12.0", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", @@ -917,7 +1070,7 @@ version = "3.0.0" dependencies = [ "anyhow", "hex 0.4.3", - "itertools", + "itertools 0.12.0", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", @@ -935,7 +1088,7 @@ dependencies = [ "directories", "futures", "hex 0.4.3", - "indexmap", + "indexmap 2.1.0", "leaky-bucket", "librqbit-bencode", "librqbit-clone-to-owned", @@ -1023,6 +1176,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1061,6 +1220,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1279,7 +1448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 2.1.0", ] [[package]] @@ -1335,6 +1504,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.33" @@ -1507,6 +1708,7 @@ version = "3.3.0" dependencies = [ "anyhow", "clap", + "console-subscriber", "futures", "librqbit", "librqbit-dht", @@ -1888,9 +2090,20 @@ dependencies = [ "pin-project-lite", "socket2 0.5.5", "tokio-macros", + "tracing", "windows-sys", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -1948,6 +2161,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -1956,9 +2196,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/TODO.md b/TODO.md index d91f504..118a23c 100644 --- a/TODO.md +++ b/TODO.md @@ -5,23 +5,25 @@ - [x] tracing instead of logging. Debugging peers: RUST_LOG=[{peer=.*}]=debug test-log for tests - [x] reopen read only is bugged -- [ ] initializing/checking - - [ ] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while - - [ ] checking torrents should be visible right away +- [x] initializing/checking + - [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while + - [x] checking torrents should be visible right away - [ ] server persistence - [ ] it would be nice to restart the server and keep the state -- [ ] torrent actions - - [ ] pause/unpause - - [ ] remove including from disk +- [x] torrent actions + - [x] pause/unpause + - [x] remove including from disk - [ ] DHT - [ ] for torrents with a few seeds might be cool to re-query DHT once in a while - - [ ] it's sending many requests now way too fast, locks up Mac OS UI annoyingly + - [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly someday: - [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager) refactor: -- [ ] where are peers stored -- [ ] http api pause/unpause etc -- [ ] when a live torrent fails writing to disk, it should transition to error state \ No newline at end of file +- [x] where are peers stored +- [x] http api pause/unpause etc +- [ ] when a live torrent fails writing to disk, it should transition to error state +- [ ] something is wrong when unpausing - can't finish. Recalculate needed/have from chunk tracker. +- [ ] silence this: WARN torrent{id=0}:external_peer_adder: librqbit::spawn_utils: finished with error: no longer live diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index f461384..0b112ba 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -447,12 +447,16 @@ impl ApiInternal { } fn api_torrent_action_forget(&self, idx: TorrentId) -> Result { - self.session.delete(idx, false)?; + self.session + .delete(idx, false) + .context("error forgetting torrent")?; Ok(Default::default()) } fn api_torrent_action_delete(&self, idx: TorrentId) -> Result { - self.session.delete(idx, true)?; + self.session + .delete(idx, true) + .context("error deleting torrent with files")?; Ok(Default::default()) } diff --git a/crates/librqbit/src/http_api_error.rs b/crates/librqbit/src/http_api_error.rs index 628eb34..46a04ae 100644 --- a/crates/librqbit/src/http_api_error.rs +++ b/crates/librqbit/src/http_api_error.rs @@ -19,6 +19,7 @@ impl ApiError { } } + #[allow(dead_code)] pub fn not_implemented(msg: &str) -> Self { Self { status: Some(StatusCode::INTERNAL_SERVER_ERROR), diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 143e8b0..6733eb9 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -19,7 +19,7 @@ use crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, peer_connection::PeerConnectionOptions, spawn_utils::BlockingSpawner, - torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle}, + torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState}, }; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; @@ -441,14 +441,34 @@ impl Session { .remove(&id) .with_context(|| format!("torrent with id {} did not exist", id))?; - if let Some(live) = removed.live() { - let _ = live.pause()?; - } + let paused = removed + .with_state_mut(|s| { + let paused = match s.take() { + ManagedTorrentState::Paused(p) => p, + ManagedTorrentState::Live(l) => l.pause()?, + _ => return Ok(None), + }; + Ok::<_, anyhow::Error>(Some(paused)) + }) + .context("error pausing torrent"); - if delete_files { - bail!("torrent deleted, but deleting files not implemented") + match (paused, delete_files) { + (Err(e), true) => Err(e).context("torrent deleted, but could not delete files"), + (Err(e), false) => { + warn!("could not delete torrent files: {:?}", e); + Ok(()) + } + (Ok(Some(paused)), true) => { + drop(paused.files); + for file in paused.filenames { + if let Err(e) = std::fs::remove_file(&file) { + warn!("could not delete file {:?}: {:?}", file, e); + } + } + Ok(()) + } + _ => Ok(()), } - Ok(()) } pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { diff --git a/crates/librqbit/src/spawn_utils.rs b/crates/librqbit/src/spawn_utils.rs index 3b54967..957a837 100644 --- a/crates/librqbit/src/spawn_utils.rs +++ b/crates/librqbit/src/spawn_utils.rs @@ -1,6 +1,7 @@ -use tracing::{debug, trace, Instrument}; +use tracing::{debug, trace, warn, Instrument}; pub fn spawn( + name: &str, span: tracing::Span, fut: impl std::future::Future> + Send + 'static, ) -> tokio::task::JoinHandle<()> { @@ -11,12 +12,12 @@ pub fn spawn( debug!("finished"); } Err(e) => { - debug!("finished with error: {:#}", e) + warn!("finished with error: {:#}", e) } } } .instrument(span.or_current()); - tokio::spawn(fut) + tokio::task::Builder::new().name(name).spawn(fut).unwrap() } #[derive(Clone, Copy, Debug)] diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index faa2955..48171b5 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -226,12 +226,14 @@ impl TorrentStateLive { for tracker in state.meta.trackers.iter() { state.spawn( + "tracker_monitor", error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()), state.clone().task_single_tracker_monitor(tracker.clone()), ); } state.spawn( + "speed_estimator_updater", error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), { let state = Arc::downgrade(&state); @@ -258,6 +260,7 @@ impl TorrentStateLive { ); state.spawn( + "peer_adder", error_span!(parent: state.meta.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); @@ -266,15 +269,17 @@ impl TorrentStateLive { fn spawn( &self, + name: &str, span: tracing::Span, fut: impl std::future::Future> + Send + 'static, ) { let mut cancel_rx = self.cancel_rx.clone(); - spawn(span, async move { + spawn(name, span, async move { tokio::select! { r = fut => r, _ = cancel_rx.changed() => { - bail!("canceled") + debug!("task canceled"); + Ok(()) } } }); @@ -429,6 +434,7 @@ impl TorrentStateLive { let permit = state.peer_semaphore.acquire().await?; permit.forget(); state.spawn( + "manage_peer", error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), state.clone().task_manage_peer(addr), ); @@ -568,6 +574,7 @@ impl TorrentStateLive { // We don't want to remember this task as there may be too many. self.spawn( + "transmit_haves", error_span!( parent: self.meta.span.clone(), "transmit_haves", @@ -830,6 +837,7 @@ impl PeerHandler { if let Some(dur) = backoff { self.state.clone().spawn( + "wait_for_peer", error_span!( parent: self.state.meta.span.clone(), "wait_for_peer", diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 793e8f3..502a2dd 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -3,6 +3,7 @@ pub mod live; pub mod paused; pub mod utils; +use std::collections::HashSet; use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; @@ -22,6 +23,7 @@ pub use live::*; use parking_lot::RwLock; use tokio_stream::StreamExt; +use tracing::debug; use tracing::error; use tracing::error_span; use url::Url; @@ -52,7 +54,7 @@ impl ManagedTorrentState { } } - fn take(&mut self) -> Self { + pub(crate) fn take(&mut self) -> Self { std::mem::replace(self, Self::None) } } @@ -74,7 +76,7 @@ pub struct ManagedTorrentInfo { pub info_hash: Id20, pub out_dir: PathBuf, pub spawner: BlockingSpawner, - pub trackers: Vec, + pub trackers: HashSet, pub peer_id: Id20, pub lengths: Lengths, pub span: tracing::Span, @@ -108,6 +110,10 @@ impl ManagedTorrent { f(&self.locked.read().state) } + pub(crate) fn with_state_mut(&self, f: impl FnOnce(&mut ManagedTorrentState) -> R) -> R { + f(&mut self.locked.write().state) + } + pub fn with_chunk_tracker(&self, f: impl FnOnce(&ChunkTracker) -> R) -> anyhow::Result { let g = self.locked.read(); match &g.state { @@ -167,14 +173,23 @@ impl ManagedTorrent { let init = init.clone(); let t = self.clone(); spawn( + "initialize_and_start", error_span!(parent: span.clone(), "initialize_and_start"), async move { match init.check().await { Ok(paused) => { + let mut g = t.locked.write(); + if let ManagedTorrentState::Initializing(_) = &g.state { + } else { + debug!("no need to start torrent anymore, as it switched state from initilizing"); + return Ok(()); + } + let live = TorrentStateLive::new(paused); - t.locked.write().state = ManagedTorrentState::Live(live.clone()); + g.state = ManagedTorrentState::Live(live.clone()); spawn( + "external_peer_adder", error_span!(parent: span.clone(), "external_peer_adder"), peer_adder(Arc::downgrade(&live)), ); @@ -196,6 +211,7 @@ impl ManagedTorrent { let live = TorrentStateLive::new(paused); g.state = ManagedTorrentState::Live(live.clone()); spawn( + "external_peer_adder", error_span!(parent: span.clone(), "external_peer_adder"), peer_adder(Arc::downgrade(&live)), ); diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 18efde5..4e73d12 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" [features] default = ["sha1-system", "default-tls", "webui"] +tokio-console = ["console-subscriber"] webui = ["librqbit/webui"] timed_existence = ["librqbit/timed_existence"] sha1-system = ["librqbit/sha1-system"] @@ -24,7 +25,8 @@ rust-tls = ["librqbit/rust-tls"] [dependencies] librqbit = {path="../librqbit", default-features=false, version = "3.3.0"} dht = {path="../dht", package="librqbit-dht", version="3.1.0"} -tokio = {version = "1", features = ["macros", "rt-multi-thread"]} +tokio = {version = "1", features = ["macros", "rt-multi-thread", "tracing"]} +console-subscriber = {version = "0.2", optional = true} anyhow = "1" clap = {version = "4", features = ["derive", "deprecated"]} tracing = "0.1" diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index be85289..87becb3 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -14,7 +14,7 @@ use librqbit::{ torrent_state::ManagedTorrentState, }; use size_format::SizeFormatterBinary as SF; -use tracing::{error, info, span, warn, Level}; +use tracing::{error, error_span, info, trace_span, warn}; #[derive(Debug, Clone, Copy, ValueEnum)] enum LogLevel { @@ -135,30 +135,58 @@ enum SubCommand { } fn init_logging(opts: &Opts) { - if std::env::var_os("RUST_LOG").is_none() { - match opts.log_level.as_ref() { - Some(level) => { - let level_str = match level { - LogLevel::Trace => "trace", - LogLevel::Debug => "debug", - LogLevel::Info => "info", - LogLevel::Warn => "warn", - LogLevel::Error => "error", - }; - std::env::set_var("RUST_LOG", level_str); - } - None => { - std::env::set_var("RUST_LOG", "info"); - } - }; - } + let default_rust_log = match opts.log_level.as_ref() { + Some(level) => match level { + LogLevel::Trace => "trace", + LogLevel::Debug => "debug", + LogLevel::Info => "info", + LogLevel::Warn => "warn", + LogLevel::Error => "error", + }, + None => "info", + }; + let stderr_filter = match std::env::var("RUST_LOG").ok() { + Some(rust_log) => EnvFilter::builder() + .parse(&rust_log) + .expect("can't parse RUST_LOG"), + None => EnvFilter::builder() + .parse(default_rust_log) + .expect("can't parse default_rust_log"), + }; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - tracing_subscriber::registry() - .with(fmt::layer()) - .with(EnvFilter::from_default_env()) - .init(); + #[cfg(feature = "tokio-console")] + { + let (console_layer, server) = console_subscriber::Builder::default() + .with_default_env() + .build(); + + tracing_subscriber::registry() + .with(fmt::layer().with_filter(stderr_filter)) + .with(console_layer) + .init(); + + spawn( + "console_subscriber server", + error_span!("console_subscriber server"), + async move { + server + .serve() + .await + .map_err(|e| anyhow::anyhow!("{:#?}", e)) + .context("error running console subscriber server") + }, + ); + } + + #[cfg(not(feature = "tokio-console"))] + { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(stderr_filter) + .init(); + } } fn _start_deadlock_detector_thread() { @@ -188,9 +216,6 @@ fn _start_deadlock_detector_thread() { fn main() -> anyhow::Result<()> { let opts = Opts::parse(); - init_logging(&opts); - // start_deadlock_detector_thread(); - let (mut rt_builder, spawner) = match opts.single_thread_runtime { true => ( tokio::runtime::Builder::new_current_thread(), @@ -223,6 +248,8 @@ fn main() -> anyhow::Result<()> { } async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { + init_logging(&opts); + let sopts = SessionOptions { disable_dht: opts.disable_dht, disable_dht_persistence: opts.disable_dht_persistence, @@ -300,7 +327,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> .context("error initializing rqbit session")?, ); spawn( - span!(Level::TRACE, "stats_printer"), + "stats_printer", + trace_span!("stats_printer"), stats_printer(session.clone()), ); let http_api = HttpApi::new(session); @@ -379,13 +407,15 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> .context("error initializing rqbit session")?, ); spawn( - span!(Level::TRACE, "stats_printer"), + "stats_printer", + trace_span!("stats_printer"), stats_printer(session.clone()), ); let http_api = HttpApi::new(session.clone()); let http_api_listen_addr = opts.http_api_listen_addr; spawn( - span!(Level::ERROR, "http_api"), + "http_api", + error_span!("http_api"), http_api.clone().make_http_api_and_run(http_api_listen_addr), );