Continuing refactor
This commit is contained in:
parent
73e41ba7d5
commit
17b243921d
11 changed files with 395 additions and 64 deletions
|
|
@ -447,12 +447,16 @@ impl ApiInternal {
|
|||
}
|
||||
|
||||
fn api_torrent_action_forget(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
|
||||
self.session.delete(idx, false)?;
|
||||
self.session
|
||||
.delete(idx, false)
|
||||
.context("error forgetting torrent")?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
fn api_torrent_action_delete(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
|
||||
self.session.delete(idx, true)?;
|
||||
self.session
|
||||
.delete(idx, true)
|
||||
.context("error deleting torrent with files")?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ impl ApiError {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn not_implemented(msg: &str) -> Self {
|
||||
Self {
|
||||
status: Some(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ use crate::{
|
|||
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
|
||||
peer_connection::PeerConnectionOptions,
|
||||
spawn_utils::BlockingSpawner,
|
||||
torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle},
|
||||
torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState},
|
||||
};
|
||||
|
||||
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
|
||||
|
|
@ -441,14 +441,34 @@ impl Session {
|
|||
.remove(&id)
|
||||
.with_context(|| format!("torrent with id {} did not exist", id))?;
|
||||
|
||||
if let Some(live) = removed.live() {
|
||||
let _ = live.pause()?;
|
||||
}
|
||||
let paused = removed
|
||||
.with_state_mut(|s| {
|
||||
let paused = match s.take() {
|
||||
ManagedTorrentState::Paused(p) => p,
|
||||
ManagedTorrentState::Live(l) => l.pause()?,
|
||||
_ => return Ok(None),
|
||||
};
|
||||
Ok::<_, anyhow::Error>(Some(paused))
|
||||
})
|
||||
.context("error pausing torrent");
|
||||
|
||||
if delete_files {
|
||||
bail!("torrent deleted, but deleting files not implemented")
|
||||
match (paused, delete_files) {
|
||||
(Err(e), true) => Err(e).context("torrent deleted, but could not delete files"),
|
||||
(Err(e), false) => {
|
||||
warn!("could not delete torrent files: {:?}", e);
|
||||
Ok(())
|
||||
}
|
||||
(Ok(Some(paused)), true) => {
|
||||
drop(paused.files);
|
||||
for file in paused.filenames {
|
||||
if let Err(e) = std::fs::remove_file(&file) {
|
||||
warn!("could not delete file {:?}: {:?}", file, e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use tracing::{debug, trace, Instrument};
|
||||
use tracing::{debug, trace, warn, Instrument};
|
||||
|
||||
pub fn spawn(
|
||||
name: &str,
|
||||
span: tracing::Span,
|
||||
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
|
|
@ -11,12 +12,12 @@ pub fn spawn(
|
|||
debug!("finished");
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("finished with error: {:#}", e)
|
||||
warn!("finished with error: {:#}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(span.or_current());
|
||||
tokio::spawn(fut)
|
||||
tokio::task::Builder::new().name(name).spawn(fut).unwrap()
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
|
|
|
|||
|
|
@ -226,12 +226,14 @@ impl TorrentStateLive {
|
|||
|
||||
for tracker in state.meta.trackers.iter() {
|
||||
state.spawn(
|
||||
"tracker_monitor",
|
||||
error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()),
|
||||
state.clone().task_single_tracker_monitor(tracker.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
state.spawn(
|
||||
"speed_estimator_updater",
|
||||
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
|
||||
{
|
||||
let state = Arc::downgrade(&state);
|
||||
|
|
@ -258,6 +260,7 @@ impl TorrentStateLive {
|
|||
);
|
||||
|
||||
state.spawn(
|
||||
"peer_adder",
|
||||
error_span!(parent: state.meta.span.clone(), "peer_adder"),
|
||||
state.clone().task_peer_adder(peer_queue_rx),
|
||||
);
|
||||
|
|
@ -266,15 +269,17 @@ impl TorrentStateLive {
|
|||
|
||||
fn spawn(
|
||||
&self,
|
||||
name: &str,
|
||||
span: tracing::Span,
|
||||
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
) {
|
||||
let mut cancel_rx = self.cancel_rx.clone();
|
||||
spawn(span, async move {
|
||||
spawn(name, span, async move {
|
||||
tokio::select! {
|
||||
r = fut => r,
|
||||
_ = cancel_rx.changed() => {
|
||||
bail!("canceled")
|
||||
debug!("task canceled");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
@ -429,6 +434,7 @@ impl TorrentStateLive {
|
|||
let permit = state.peer_semaphore.acquire().await?;
|
||||
permit.forget();
|
||||
state.spawn(
|
||||
"manage_peer",
|
||||
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
|
||||
state.clone().task_manage_peer(addr),
|
||||
);
|
||||
|
|
@ -568,6 +574,7 @@ impl TorrentStateLive {
|
|||
|
||||
// We don't want to remember this task as there may be too many.
|
||||
self.spawn(
|
||||
"transmit_haves",
|
||||
error_span!(
|
||||
parent: self.meta.span.clone(),
|
||||
"transmit_haves",
|
||||
|
|
@ -830,6 +837,7 @@ impl PeerHandler {
|
|||
|
||||
if let Some(dur) = backoff {
|
||||
self.state.clone().spawn(
|
||||
"wait_for_peer",
|
||||
error_span!(
|
||||
parent: self.state.meta.span.clone(),
|
||||
"wait_for_peer",
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ pub mod live;
|
|||
pub mod paused;
|
||||
pub mod utils;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
|
@ -22,6 +23,7 @@ pub use live::*;
|
|||
use parking_lot::RwLock;
|
||||
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::error_span;
|
||||
use url::Url;
|
||||
|
|
@ -52,7 +54,7 @@ impl ManagedTorrentState {
|
|||
}
|
||||
}
|
||||
|
||||
fn take(&mut self) -> Self {
|
||||
pub(crate) fn take(&mut self) -> Self {
|
||||
std::mem::replace(self, Self::None)
|
||||
}
|
||||
}
|
||||
|
|
@ -74,7 +76,7 @@ pub struct ManagedTorrentInfo {
|
|||
pub info_hash: Id20,
|
||||
pub out_dir: PathBuf,
|
||||
pub spawner: BlockingSpawner,
|
||||
pub trackers: Vec<Url>,
|
||||
pub trackers: HashSet<Url>,
|
||||
pub peer_id: Id20,
|
||||
pub lengths: Lengths,
|
||||
pub span: tracing::Span,
|
||||
|
|
@ -108,6 +110,10 @@ impl ManagedTorrent {
|
|||
f(&self.locked.read().state)
|
||||
}
|
||||
|
||||
pub(crate) fn with_state_mut<R>(&self, f: impl FnOnce(&mut ManagedTorrentState) -> R) -> R {
|
||||
f(&mut self.locked.write().state)
|
||||
}
|
||||
|
||||
pub fn with_chunk_tracker<R>(&self, f: impl FnOnce(&ChunkTracker) -> R) -> anyhow::Result<R> {
|
||||
let g = self.locked.read();
|
||||
match &g.state {
|
||||
|
|
@ -167,14 +173,23 @@ impl ManagedTorrent {
|
|||
let init = init.clone();
|
||||
let t = self.clone();
|
||||
spawn(
|
||||
"initialize_and_start",
|
||||
error_span!(parent: span.clone(), "initialize_and_start"),
|
||||
async move {
|
||||
match init.check().await {
|
||||
Ok(paused) => {
|
||||
let mut g = t.locked.write();
|
||||
if let ManagedTorrentState::Initializing(_) = &g.state {
|
||||
} else {
|
||||
debug!("no need to start torrent anymore, as it switched state from initilizing");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let live = TorrentStateLive::new(paused);
|
||||
t.locked.write().state = ManagedTorrentState::Live(live.clone());
|
||||
g.state = ManagedTorrentState::Live(live.clone());
|
||||
|
||||
spawn(
|
||||
"external_peer_adder",
|
||||
error_span!(parent: span.clone(), "external_peer_adder"),
|
||||
peer_adder(Arc::downgrade(&live)),
|
||||
);
|
||||
|
|
@ -196,6 +211,7 @@ impl ManagedTorrent {
|
|||
let live = TorrentStateLive::new(paused);
|
||||
g.state = ManagedTorrentState::Live(live.clone());
|
||||
spawn(
|
||||
"external_peer_adder",
|
||||
error_span!(parent: span.clone(), "external_peer_adder"),
|
||||
peer_adder(Arc::downgrade(&live)),
|
||||
);
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ readme = "README.md"
|
|||
|
||||
[features]
|
||||
default = ["sha1-system", "default-tls", "webui"]
|
||||
tokio-console = ["console-subscriber"]
|
||||
webui = ["librqbit/webui"]
|
||||
timed_existence = ["librqbit/timed_existence"]
|
||||
sha1-system = ["librqbit/sha1-system"]
|
||||
|
|
@ -24,7 +25,8 @@ rust-tls = ["librqbit/rust-tls"]
|
|||
[dependencies]
|
||||
librqbit = {path="../librqbit", default-features=false, version = "3.3.0"}
|
||||
dht = {path="../dht", package="librqbit-dht", version="3.1.0"}
|
||||
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
|
||||
tokio = {version = "1", features = ["macros", "rt-multi-thread", "tracing"]}
|
||||
console-subscriber = {version = "0.2", optional = true}
|
||||
anyhow = "1"
|
||||
clap = {version = "4", features = ["derive", "deprecated"]}
|
||||
tracing = "0.1"
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ use librqbit::{
|
|||
torrent_state::ManagedTorrentState,
|
||||
};
|
||||
use size_format::SizeFormatterBinary as SF;
|
||||
use tracing::{error, info, span, warn, Level};
|
||||
use tracing::{error, error_span, info, trace_span, warn};
|
||||
|
||||
#[derive(Debug, Clone, Copy, ValueEnum)]
|
||||
enum LogLevel {
|
||||
|
|
@ -135,30 +135,58 @@ enum SubCommand {
|
|||
}
|
||||
|
||||
fn init_logging(opts: &Opts) {
|
||||
if std::env::var_os("RUST_LOG").is_none() {
|
||||
match opts.log_level.as_ref() {
|
||||
Some(level) => {
|
||||
let level_str = match level {
|
||||
LogLevel::Trace => "trace",
|
||||
LogLevel::Debug => "debug",
|
||||
LogLevel::Info => "info",
|
||||
LogLevel::Warn => "warn",
|
||||
LogLevel::Error => "error",
|
||||
};
|
||||
std::env::set_var("RUST_LOG", level_str);
|
||||
}
|
||||
None => {
|
||||
std::env::set_var("RUST_LOG", "info");
|
||||
}
|
||||
};
|
||||
}
|
||||
let default_rust_log = match opts.log_level.as_ref() {
|
||||
Some(level) => match level {
|
||||
LogLevel::Trace => "trace",
|
||||
LogLevel::Debug => "debug",
|
||||
LogLevel::Info => "info",
|
||||
LogLevel::Warn => "warn",
|
||||
LogLevel::Error => "error",
|
||||
},
|
||||
None => "info",
|
||||
};
|
||||
let stderr_filter = match std::env::var("RUST_LOG").ok() {
|
||||
Some(rust_log) => EnvFilter::builder()
|
||||
.parse(&rust_log)
|
||||
.expect("can't parse RUST_LOG"),
|
||||
None => EnvFilter::builder()
|
||||
.parse(default_rust_log)
|
||||
.expect("can't parse default_rust_log"),
|
||||
};
|
||||
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(fmt::layer())
|
||||
.with(EnvFilter::from_default_env())
|
||||
.init();
|
||||
#[cfg(feature = "tokio-console")]
|
||||
{
|
||||
let (console_layer, server) = console_subscriber::Builder::default()
|
||||
.with_default_env()
|
||||
.build();
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(fmt::layer().with_filter(stderr_filter))
|
||||
.with(console_layer)
|
||||
.init();
|
||||
|
||||
spawn(
|
||||
"console_subscriber server",
|
||||
error_span!("console_subscriber server"),
|
||||
async move {
|
||||
server
|
||||
.serve()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{:#?}", e))
|
||||
.context("error running console subscriber server")
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tokio-console"))]
|
||||
{
|
||||
tracing_subscriber::registry()
|
||||
.with(fmt::layer())
|
||||
.with(stderr_filter)
|
||||
.init();
|
||||
}
|
||||
}
|
||||
|
||||
fn _start_deadlock_detector_thread() {
|
||||
|
|
@ -188,9 +216,6 @@ fn _start_deadlock_detector_thread() {
|
|||
fn main() -> anyhow::Result<()> {
|
||||
let opts = Opts::parse();
|
||||
|
||||
init_logging(&opts);
|
||||
// start_deadlock_detector_thread();
|
||||
|
||||
let (mut rt_builder, spawner) = match opts.single_thread_runtime {
|
||||
true => (
|
||||
tokio::runtime::Builder::new_current_thread(),
|
||||
|
|
@ -223,6 +248,8 @@ fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> {
|
||||
init_logging(&opts);
|
||||
|
||||
let sopts = SessionOptions {
|
||||
disable_dht: opts.disable_dht,
|
||||
disable_dht_persistence: opts.disable_dht_persistence,
|
||||
|
|
@ -300,7 +327,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
|
|||
.context("error initializing rqbit session")?,
|
||||
);
|
||||
spawn(
|
||||
span!(Level::TRACE, "stats_printer"),
|
||||
"stats_printer",
|
||||
trace_span!("stats_printer"),
|
||||
stats_printer(session.clone()),
|
||||
);
|
||||
let http_api = HttpApi::new(session);
|
||||
|
|
@ -379,13 +407,15 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
|
|||
.context("error initializing rqbit session")?,
|
||||
);
|
||||
spawn(
|
||||
span!(Level::TRACE, "stats_printer"),
|
||||
"stats_printer",
|
||||
trace_span!("stats_printer"),
|
||||
stats_printer(session.clone()),
|
||||
);
|
||||
let http_api = HttpApi::new(session.clone());
|
||||
let http_api_listen_addr = opts.http_api_listen_addr;
|
||||
spawn(
|
||||
span!(Level::ERROR, "http_api"),
|
||||
"http_api",
|
||||
error_span!("http_api"),
|
||||
http_api.clone().make_http_api_and_run(http_api_listen_addr),
|
||||
);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue