Completely change the CLI so that we have a server and a client.
This commit is contained in:
parent
b834bb20b3
commit
a8efcfdd26
9 changed files with 309 additions and 108 deletions
|
|
@ -27,6 +27,7 @@ tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
|
|||
tokio-stream = "0.1"
|
||||
serde = {version = "1", features=["derive"]}
|
||||
serde_json = "1"
|
||||
serde_urlencoded = "*"
|
||||
anyhow = "1"
|
||||
|
||||
regex = "1"
|
||||
|
|
|
|||
|
|
@ -134,7 +134,7 @@ impl ApiInternal {
|
|||
) -> anyhow::Result<usize> {
|
||||
let handle = self
|
||||
.session
|
||||
.add_torrent(url, opts)
|
||||
.add_torrent(&url, opts)
|
||||
.await
|
||||
.context("error adding torrent")?
|
||||
.context("expected session.add_torrent() to return a handle")?;
|
||||
|
|
@ -209,6 +209,13 @@ fn json_or_404<T: Serialize>(idx: usize, v: Option<T>) -> warp::reply::Response
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TorrentAddQueryParams {
|
||||
pub overwrite: Option<bool>,
|
||||
pub output_folder: Option<String>,
|
||||
pub only_files_regex: Option<String>,
|
||||
}
|
||||
|
||||
impl HttpApi {
|
||||
pub fn new(session: Arc<Session>) -> Self {
|
||||
Self {
|
||||
|
|
@ -235,7 +242,8 @@ impl HttpApi {
|
|||
// This is kind of not secure as it just reads any local file that it has access to,
|
||||
// or any URL, but whatever, ok for our purposes / thread model.
|
||||
"POST /torrents/": "Add a torrent here. magnet: or http:// or a local file."
|
||||
}
|
||||
},
|
||||
"server": "rqbit",
|
||||
});
|
||||
move || json_response(&api_list)
|
||||
});
|
||||
|
|
@ -264,11 +272,6 @@ impl HttpApi {
|
|||
move || json_response(inner.api_torrent_list())
|
||||
});
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct TorrentAddQueryParams {
|
||||
overwrite: Option<bool>,
|
||||
}
|
||||
|
||||
let torrent_add = warp::post()
|
||||
.and(warp::path("torrents"))
|
||||
.and(warp::body::bytes())
|
||||
|
|
@ -293,6 +296,8 @@ impl HttpApi {
|
|||
};
|
||||
let opts = AddTorrentOptions {
|
||||
overwrite: params.overwrite.unwrap_or(false),
|
||||
only_files_regex: params.only_files_regex,
|
||||
output_folder: params.output_folder,
|
||||
..Default::default()
|
||||
};
|
||||
let idx = inner
|
||||
|
|
|
|||
92
crates/librqbit/src/http_api_client.rs
Normal file
92
crates/librqbit/src/http_api_client.rs
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
use anyhow::Context;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{http_api::TorrentAddQueryParams, session::AddTorrentOptions};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HttpApiClient {
|
||||
client: reqwest::Client,
|
||||
base_url: reqwest::Url,
|
||||
}
|
||||
|
||||
async fn check_response(r: reqwest::Response) -> anyhow::Result<reqwest::Response> {
|
||||
if r.status().is_success() {
|
||||
return Ok(r);
|
||||
}
|
||||
let status = r.status();
|
||||
let url = r.url().clone();
|
||||
let body = r.text().await.with_context(|| {
|
||||
format!(
|
||||
"cannot read response body for request to {} ({})",
|
||||
url, status,
|
||||
)
|
||||
})?;
|
||||
anyhow::bail!("{} -> {}: {}", url, status, body)
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ApiRoot {
|
||||
server: String,
|
||||
}
|
||||
|
||||
async fn json_response<T: serde::de::DeserializeOwned + std::any::Any>(
|
||||
url: &reqwest::Url,
|
||||
response: reqwest::Response,
|
||||
) -> anyhow::Result<T> {
|
||||
let response = check_response(response).await?;
|
||||
let body = response.bytes().await?;
|
||||
let response: T = serde_json::from_slice(&body).with_context(|| {
|
||||
format!(
|
||||
"error deserializing response from {:?} as {:?}",
|
||||
url,
|
||||
std::any::type_name::<T>(),
|
||||
)
|
||||
})?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
impl HttpApiClient {
|
||||
pub fn new(url: &str) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
base_url: reqwest::Url::parse(url)?,
|
||||
client: reqwest::ClientBuilder::new().build()?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn base_url(&self) -> &reqwest::Url {
|
||||
&self.base_url
|
||||
}
|
||||
|
||||
pub async fn validate_rqbit_server(&self) -> anyhow::Result<()> {
|
||||
let response = self.client.get(self.base_url.clone()).send().await?;
|
||||
let root: ApiRoot = json_response(&self.base_url, response).await?;
|
||||
if root.server == "rqbit" {
|
||||
return Ok(());
|
||||
}
|
||||
anyhow::bail!("not an rqbit server at {}", &self.base_url)
|
||||
}
|
||||
|
||||
pub async fn add_torrent(
|
||||
&self,
|
||||
torrent: &str,
|
||||
opts: Option<AddTorrentOptions>,
|
||||
) -> anyhow::Result<usize> {
|
||||
let opts = opts.unwrap_or_default();
|
||||
let params = TorrentAddQueryParams {
|
||||
overwrite: Some(opts.overwrite),
|
||||
only_files_regex: opts.only_files_regex,
|
||||
output_folder: opts.output_folder,
|
||||
};
|
||||
let qs = serde_urlencoded::to_string(¶ms).unwrap();
|
||||
let url = format!("{}torrents?{}", &self.base_url, qs);
|
||||
let response = check_response(
|
||||
self.client
|
||||
.post(&url)
|
||||
.body(torrent.to_owned())
|
||||
.send()
|
||||
.await?,
|
||||
)
|
||||
.await?;
|
||||
Ok(response.text().await?.parse::<usize>()?)
|
||||
}
|
||||
}
|
||||
|
|
@ -2,6 +2,7 @@ pub mod chunk_tracker;
|
|||
pub mod dht_utils;
|
||||
pub mod file_ops;
|
||||
pub mod http_api;
|
||||
pub mod http_api_client;
|
||||
pub mod peer_connection;
|
||||
pub mod peer_handler;
|
||||
pub mod peer_info_reader;
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ fn compute_only_files<ByteBuf: AsRef<[u8]>>(
|
|||
Ok(only_files)
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Clone)]
|
||||
pub struct AddTorrentOptions {
|
||||
pub only_files_regex: Option<String>,
|
||||
pub overwrite: bool,
|
||||
|
|
@ -177,7 +177,7 @@ impl Session {
|
|||
}
|
||||
pub async fn add_torrent(
|
||||
&self,
|
||||
url: String,
|
||||
url: &str,
|
||||
opts: Option<AddTorrentOptions>,
|
||||
) -> anyhow::Result<Option<TorrentManagerHandle>> {
|
||||
// Magnet links are different in that we first need to discover the metadata.
|
||||
|
|
@ -186,7 +186,7 @@ impl Session {
|
|||
let Magnet {
|
||||
info_hash,
|
||||
trackers,
|
||||
} = Magnet::parse(&url).context("provided path is not a valid magnet URL")?;
|
||||
} = Magnet::parse(url).context("provided path is not a valid magnet URL")?;
|
||||
|
||||
let dht_rx = self
|
||||
.dht
|
||||
|
|
@ -230,9 +230,9 @@ impl Session {
|
|||
.await
|
||||
} else {
|
||||
let torrent = if url.starts_with("http://") || url.starts_with("https://") {
|
||||
torrent_from_url(&url).await?
|
||||
torrent_from_url(url).await?
|
||||
} else {
|
||||
torrent_from_file(&url)?
|
||||
torrent_from_file(url)?
|
||||
};
|
||||
let dht_rx = match self.dht.as_ref() {
|
||||
Some(dht) => Some(dht.get_peers(torrent.info_hash).await?),
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "rqbit"
|
||||
description = "A bittorent client"
|
||||
version = "1.1.2"
|
||||
version = "2.0.0"
|
||||
authors = ["Igor Katson <igor.katson@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
|
@ -18,12 +18,15 @@ librqbit = {path="../librqbit", default-features=false}
|
|||
dht = {path="../dht"}
|
||||
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
|
||||
anyhow = "1"
|
||||
clap = "3.0.0-beta.2"
|
||||
clap = "3.0.0-beta.5"
|
||||
log = "0.4"
|
||||
pretty_env_logger = "0.4"
|
||||
regex = "1"
|
||||
futures = "0.3"
|
||||
parse_duration = "2"
|
||||
reqwest = "*"
|
||||
serde = {version = "1", features=["derive"]}
|
||||
serde_json = "1"
|
||||
size_format = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -1,17 +1,18 @@
|
|||
use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
|
||||
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::{ArgEnum, Clap};
|
||||
use clap::{ArgEnum, Parser};
|
||||
use librqbit::{
|
||||
http_api::HttpApi,
|
||||
http_api_client,
|
||||
peer_connection::PeerConnectionOptions,
|
||||
session::{AddTorrentOptions, ManagedTorrentState, Session, SessionOptions},
|
||||
spawn_utils::{spawn, BlockingSpawner},
|
||||
};
|
||||
use log::info;
|
||||
use log::{error, info, warn};
|
||||
use size_format::SizeFormatterBinary as SF;
|
||||
|
||||
#[derive(Debug, ArgEnum)]
|
||||
#[derive(Debug, Clone, Copy, ArgEnum)]
|
||||
enum LogLevel {
|
||||
Trace,
|
||||
Debug,
|
||||
|
|
@ -30,28 +31,9 @@ impl FromStr for ParsedDuration {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clap)]
|
||||
#[derive(Parser)]
|
||||
#[clap(version, author, about)]
|
||||
struct Opts {
|
||||
/// The filename or URL of the torrent. If URL, http/https/magnet are supported.
|
||||
torrent_path: String,
|
||||
|
||||
/// The output folder to write to. If not exists, it will be created.
|
||||
output_folder: String,
|
||||
|
||||
/// If set, only the file whose filename matching this regex will
|
||||
/// be downloaded
|
||||
#[clap(short = 'r', long = "filename-re")]
|
||||
only_files_matching_regex: Option<String>,
|
||||
|
||||
/// Set if you are ok to write on top of existing files
|
||||
#[clap(long)]
|
||||
overwrite: bool,
|
||||
|
||||
/// Only list the torrent metadata contents, don't do anything else.
|
||||
#[clap(short, long)]
|
||||
list: bool,
|
||||
|
||||
/// The loglevel
|
||||
#[clap(arg_enum, short = 'v')]
|
||||
log_level: Option<LogLevel>,
|
||||
|
|
@ -88,6 +70,58 @@ struct Opts {
|
|||
/// How many threads to spawn for the executor.
|
||||
#[clap(short = 't', long)]
|
||||
worker_threads: Option<usize>,
|
||||
|
||||
#[clap(subcommand)]
|
||||
subcommand: SubCommand,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
struct ServerStartOptions {
|
||||
/// The output folder to write to. If not exists, it will be created.
|
||||
output_folder: String,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
struct ServerOpts {
|
||||
#[clap(subcommand)]
|
||||
subcommand: ServerSubcommand,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
enum ServerSubcommand {
|
||||
Start(ServerStartOptions),
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
struct DownloadOpts {
|
||||
/// The filename or URL of the torrent. If URL, http/https/magnet are supported.
|
||||
torrent_path: Vec<String>,
|
||||
|
||||
/// The output folder to write to. If not exists, it will be created.
|
||||
#[clap(short = 'o', long)]
|
||||
output_folder: Option<String>,
|
||||
|
||||
/// If set, only the file whose filename matching this regex will
|
||||
/// be downloaded
|
||||
#[clap(short = 'r', long = "filename-re")]
|
||||
only_files_matching_regex: Option<String>,
|
||||
|
||||
/// Only list the torrent metadata contents, don't do anything else.
|
||||
#[clap(short, long)]
|
||||
list: bool,
|
||||
|
||||
/// Set if you are ok to write on top of existing files
|
||||
#[clap(long)]
|
||||
overwrite: bool,
|
||||
}
|
||||
|
||||
// server start
|
||||
// download [--connect-to-existing] --output-folder(required) [file1] [file2]
|
||||
|
||||
#[derive(Parser)]
|
||||
enum SubCommand {
|
||||
Server(ServerOpts),
|
||||
Download(DownloadOpts),
|
||||
}
|
||||
|
||||
fn init_logging(opts: &Opts) {
|
||||
|
|
@ -159,46 +193,9 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
|
|||
}),
|
||||
};
|
||||
|
||||
let session = Arc::new(
|
||||
Session::new_with_opts(opts.output_folder.into(), spawner, sopts)
|
||||
.await
|
||||
.context("error initializing rqbit session")?,
|
||||
);
|
||||
|
||||
let torrent_opts = AddTorrentOptions {
|
||||
only_files_regex: opts.only_files_matching_regex,
|
||||
overwrite: opts.overwrite,
|
||||
list_only: opts.list,
|
||||
force_tracker_interval: opts.force_tracker_interval.map(|d| d.0),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let http_api = {
|
||||
let http_api = HttpApi::new(session.clone());
|
||||
spawn("HTTP API", {
|
||||
let http_api_listen_addr = opts.http_api_listen_addr;
|
||||
let http_api = http_api.clone();
|
||||
async move { http_api.make_http_api_and_run(http_api_listen_addr).await }
|
||||
});
|
||||
http_api
|
||||
};
|
||||
|
||||
let handle = match session
|
||||
.add_torrent(opts.torrent_path, Some(torrent_opts))
|
||||
.await
|
||||
.context("error adding torrent to session")?
|
||||
{
|
||||
Some(handle) => handle,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
http_api.add_mgr(handle.clone());
|
||||
|
||||
spawn("Stats printer", {
|
||||
let session = session.clone();
|
||||
async move {
|
||||
loop {
|
||||
session.with_torrents(|torrents| {
|
||||
let stats_printer = |session: Arc<Session>| async move {
|
||||
loop {
|
||||
session.with_torrents(|torrents| {
|
||||
for (idx, torrent) in torrents.iter().enumerate() {
|
||||
match &torrent.state {
|
||||
ManagedTorrentState::Initializing => {
|
||||
|
|
@ -234,14 +231,114 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
|
|||
}
|
||||
}
|
||||
});
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
};
|
||||
|
||||
match &opts.subcommand {
|
||||
SubCommand::Server(server_opts) => match &server_opts.subcommand {
|
||||
ServerSubcommand::Start(start_opts) => {
|
||||
let session = Arc::new(
|
||||
Session::new_with_opts(
|
||||
PathBuf::from(&start_opts.output_folder),
|
||||
spawner,
|
||||
sopts,
|
||||
)
|
||||
.await
|
||||
.context("error initializing rqbit session")?,
|
||||
);
|
||||
spawn("Stats printer", stats_printer(session.clone()));
|
||||
let http_api = HttpApi::new(session);
|
||||
let http_api_listen_addr = opts.http_api_listen_addr;
|
||||
http_api.make_http_api_and_run(http_api_listen_addr).await
|
||||
}
|
||||
},
|
||||
SubCommand::Download(download_opts) => {
|
||||
if download_opts.torrent_path.is_empty() {
|
||||
anyhow::bail!("you must provide at least one URL to download")
|
||||
}
|
||||
let http_api_url = format!("http://{}", opts.http_api_listen_addr);
|
||||
let client = http_api_client::HttpApiClient::new(&http_api_url)?;
|
||||
let torrent_opts = AddTorrentOptions {
|
||||
only_files_regex: download_opts.only_files_matching_regex.clone(),
|
||||
overwrite: download_opts.overwrite,
|
||||
list_only: download_opts.list,
|
||||
force_tracker_interval: opts.force_tracker_interval.map(|d| d.0),
|
||||
output_folder: download_opts.output_folder.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
let connect_to_existing = match client.validate_rqbit_server().await {
|
||||
Ok(_) => {
|
||||
info!("Connected to HTTP API at {}, will call it instead of downloading within this process", client.base_url());
|
||||
true
|
||||
}
|
||||
Err(err) => {
|
||||
info!(
|
||||
"HTTP API at {} returned {:?}, will start the server within this process",
|
||||
client.base_url(),
|
||||
err
|
||||
);
|
||||
false
|
||||
}
|
||||
};
|
||||
if connect_to_existing {
|
||||
for torrent_url in &download_opts.torrent_path {
|
||||
match client.add_torrent(torrent_url, Some(torrent_opts.clone())).await {
|
||||
Ok(id) => info!("{} added to the server with index {}. Query {}/torrents/{}/(stats/haves) for details", torrent_url, id, http_api_url, id),
|
||||
Err(err) => warn!("error adding {}: {:?}", torrent_url, err),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
let session = Arc::new(
|
||||
Session::new_with_opts(
|
||||
download_opts
|
||||
.output_folder
|
||||
.as_ref()
|
||||
.map(PathBuf::from)
|
||||
.context(
|
||||
"output_folder is required if can't connect to an existing server",
|
||||
)?,
|
||||
spawner,
|
||||
sopts,
|
||||
)
|
||||
.await
|
||||
.context("error initializing rqbit session")?,
|
||||
);
|
||||
spawn("Stats printer", stats_printer(session.clone()));
|
||||
let http_api = HttpApi::new(session.clone());
|
||||
let http_api_listen_addr = opts.http_api_listen_addr;
|
||||
spawn(
|
||||
"HTTP API",
|
||||
http_api.clone().make_http_api_and_run(http_api_listen_addr),
|
||||
);
|
||||
|
||||
let mut added = false;
|
||||
|
||||
for path in &download_opts.torrent_path {
|
||||
let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await {
|
||||
Ok(Some(handle)) => {
|
||||
added = true;
|
||||
handle
|
||||
}
|
||||
Ok(None) => continue,
|
||||
Err(err) => {
|
||||
error!("error adding {:?}: {:?}", &path, err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
http_api.add_mgr(handle.clone());
|
||||
}
|
||||
|
||||
if added {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("no torrents were added")
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
handle
|
||||
.wait_until_completed()
|
||||
.await
|
||||
.context("error waiting for torrent completion")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue