Enhance the HTTP API with more detail.

This commit is contained in:
Igor Katson 2021-10-23 09:37:37 +01:00
parent 0d41b1e689
commit 967a06a196
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 205 additions and 86 deletions

View file

@ -1,5 +1,9 @@
use anyhow::Context;
use buffers::ByteString;
use dht::{Dht, DhtStats};
use librqbit_core::id20::Id20;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use log::warn;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
@ -9,7 +13,7 @@ use warp::hyper::body::Bytes;
use warp::hyper::Body;
use warp::Filter;
use crate::session::{AddTorrentOptions, Session};
use crate::session::{AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session};
use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::StatsSnapshot;
@ -70,16 +74,17 @@ struct TorrentListResponse {
torrents: Vec<TorrentListResponseItem>,
}
#[derive(Serialize)]
struct TorrentDetailsResponseFile {
name: Option<String>,
length: u64,
#[derive(Serialize, Deserialize)]
pub struct TorrentDetailsResponseFile {
pub name: String,
pub length: u64,
pub included: bool,
}
#[derive(Serialize)]
struct TorrentDetailsResponse {
info_hash: String,
files: Vec<TorrentDetailsResponseFile>,
#[derive(Serialize, Deserialize)]
pub struct TorrentDetailsResponse {
pub info_hash: String,
pub files: Vec<TorrentDetailsResponseFile>,
}
#[derive(Serialize)]
@ -91,6 +96,43 @@ struct StatsResponse {
time_remaining: Option<Duration>,
}
#[derive(Serialize, Deserialize)]
pub struct ApiAddTorrentResponse {
pub id: Option<usize>,
pub details: TorrentDetailsResponse,
}
fn make_torrent_details(
info_hash: &Id20,
info: &TorrentMetaV1Info<ByteString>,
only_files: Option<&[usize]>,
) -> TorrentDetailsResponse {
let files = info
.iter_filenames_and_lengths()
.unwrap()
.enumerate()
.map(|(idx, (filename_it, length))| {
let name = match filename_it.to_string() {
Ok(s) => s,
Err(err) => {
warn!("error reading filename: {:?}", err);
"<INVALID NAME>".to_string()
}
};
let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true);
TorrentDetailsResponseFile {
name,
length,
included,
}
})
.collect();
TorrentDetailsResponse {
info_hash: info_hash.as_string(),
files,
}
}
impl ApiInternal {
fn mgr_handle(&self, idx: usize) -> Option<TorrentManagerHandle> {
self.torrent_managers.read().get(idx).cloned()
@ -113,32 +155,53 @@ impl ApiInternal {
fn api_torrent_details(&self, idx: usize) -> Option<TorrentDetailsResponse> {
let handle = self.mgr_handle(idx)?;
let info_hash = handle.torrent_state().info_hash().as_string();
let files = handle
.torrent_state()
.info()
.iter_filenames_and_lengths()
.unwrap()
.map(|(filename_it, length)| {
let name = filename_it.to_string().ok();
TorrentDetailsResponseFile { name, length }
})
.collect();
Some(TorrentDetailsResponse { info_hash, files })
let info_hash = handle.torrent_state().info_hash();
let only_files = handle.only_files();
Some(make_torrent_details(
&info_hash,
handle.torrent_state().info(),
only_files,
))
}
async fn api_add_torrent(
&self,
url: String,
opts: Option<AddTorrentOptions>,
) -> anyhow::Result<usize> {
let handle = self
) -> anyhow::Result<ApiAddTorrentResponse> {
let response = match self
.session
.add_torrent(&url, opts)
.await
.context("error adding torrent")?
.context("expected session.add_torrent() to return a handle")?;
Ok(self.add_mgr(handle))
{
AddTorrentResponse::AlreadyManaged(managed) => anyhow::bail!(
"{:?} is already managed, downloaded to {:?}",
managed.info_hash,
managed.output_folder
),
AddTorrentResponse::ListOnly(ListOnlyResponse {
info_hash,
info,
only_files,
}) => ApiAddTorrentResponse {
id: None,
details: make_torrent_details(&info_hash, &info, only_files.as_deref()),
},
AddTorrentResponse::Added(handle) => {
let details = make_torrent_details(
&handle.torrent_state().info_hash(),
handle.torrent_state().info(),
handle.only_files(),
);
let id = self.add_mgr(handle);
ApiAddTorrentResponse {
id: Some(id),
details,
}
}
};
Ok(response)
}
fn api_dht_stats(&self) -> Option<DhtStats> {
@ -214,6 +277,7 @@ pub struct TorrentAddQueryParams {
pub overwrite: Option<bool>,
pub output_folder: Option<String>,
pub only_files_regex: Option<String>,
pub list_only: Option<bool>,
}
impl HttpApi {
@ -279,7 +343,7 @@ impl HttpApi {
.and_then({
let inner = inner.clone();
use warp::http::Response;
fn make_response(status: u16, body: String) -> Response<String> {
fn make_response<T>(status: u16, body: T) -> Response<T> {
Response::builder().status(status).body(body).unwrap()
}
move |body: Bytes, params: TorrentAddQueryParams| {
@ -298,17 +362,16 @@ impl HttpApi {
overwrite: params.overwrite.unwrap_or(false),
only_files_regex: params.only_files_regex,
output_folder: params.output_folder,
list_only: params.list_only.unwrap_or(false),
..Default::default()
};
let idx = inner
match inner
.api_add_torrent(url, Some(opts))
.await
.context("error calling HttpApi::api_add_torrent");
match idx {
Ok(idx) => {
return Ok(make_response(200, format!("{}", idx)));
}
Err(e) => return Ok(make_response(400, format!("{:#}", e))),
.context("error calling HttpApi::api_add_torrent")
{
Ok(response) => Ok(json_response(response)),
Err(err) => Ok(make_response(400, format!("{:#?}", err).into())),
}
}
}

View file

@ -30,9 +30,9 @@ struct ApiRoot {
}
async fn json_response<T: serde::de::DeserializeOwned + std::any::Any>(
url: &reqwest::Url,
response: reqwest::Response,
) -> anyhow::Result<T> {
let url = response.url().clone();
let response = check_response(response).await?;
let body = response.bytes().await?;
let response: T = serde_json::from_slice(&body).with_context(|| {
@ -59,7 +59,7 @@ impl HttpApiClient {
pub async fn validate_rqbit_server(&self) -> anyhow::Result<()> {
let response = self.client.get(self.base_url.clone()).send().await?;
let root: ApiRoot = json_response(&self.base_url, response).await?;
let root: ApiRoot = json_response(response).await?;
if root.server == "rqbit" {
return Ok(());
}
@ -70,12 +70,13 @@ impl HttpApiClient {
&self,
torrent: &str,
opts: Option<AddTorrentOptions>,
) -> anyhow::Result<usize> {
) -> anyhow::Result<crate::http_api::ApiAddTorrentResponse> {
let opts = opts.unwrap_or_default();
let params = TorrentAddQueryParams {
overwrite: Some(opts.overwrite),
only_files_regex: opts.only_files_regex,
output_folder: opts.output_folder,
list_only: Some(opts.list_only),
};
let qs = serde_urlencoded::to_string(&params).unwrap();
let url = format!("{}torrents?{}", &self.base_url, qs);
@ -87,6 +88,6 @@ impl HttpApiClient {
.await?,
)
.await?;
Ok(response.text().await?.parse::<usize>()?)
json_response(response).await
}
}

View file

@ -8,13 +8,11 @@ use librqbit_core::{
peer_id::generate_peer_id,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
};
use log::{info, warn};
use log::{debug, info, warn};
use parking_lot::RwLock;
use reqwest::Url;
use tokio_stream::StreamExt;
use size_format::SizeFormatterBinary as SF;
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::PeerConnectionOptions,
@ -46,14 +44,19 @@ pub struct SessionLocked {
torrents: Vec<ManagedTorrent>,
}
enum SessionLockedAddTorrentResult {
AlreadyManaged(ManagedTorrent),
Added(usize),
}
impl SessionLocked {
fn add_torrent(&mut self, torrent: ManagedTorrent) -> Option<usize> {
if self.torrents.contains(&torrent) {
return None;
fn add_torrent(&mut self, torrent: ManagedTorrent) -> SessionLockedAddTorrentResult {
if let Some(handle) = self.torrents.iter().find(|t| **t == torrent) {
return SessionLockedAddTorrentResult::AlreadyManaged(handle.clone());
}
let idx = self.torrents.len();
self.torrents.push(torrent);
Some(idx)
SessionLockedAddTorrentResult::Added(idx)
}
}
@ -125,6 +128,18 @@ pub struct AddTorrentOptions {
pub force_tracker_interval: Option<Duration>,
}
pub struct ListOnlyResponse {
pub info_hash: Id20,
pub info: TorrentMetaV1Info<ByteString>,
pub only_files: Option<Vec<usize>>,
}
pub enum AddTorrentResponse {
AlreadyManaged(ManagedTorrent),
ListOnly(ListOnlyResponse),
Added(TorrentManagerHandle),
}
#[derive(Default)]
pub struct SessionOptions {
pub disable_dht: bool,
@ -179,7 +194,7 @@ impl Session {
&self,
url: &str,
opts: Option<AddTorrentOptions>,
) -> anyhow::Result<Option<TorrentManagerHandle>> {
) -> anyhow::Result<AddTorrentResponse> {
// Magnet links are different in that we first need to discover the metadata.
let opts = opts.unwrap_or_default();
if url.starts_with("magnet:") {
@ -278,15 +293,17 @@ impl Session {
initial_peers: Vec<SocketAddr>,
trackers: Vec<reqwest::Url>,
opts: AddTorrentOptions,
) -> anyhow::Result<Option<TorrentManagerHandle>> {
info!("Torrent info: {:#?}", &info);
) -> anyhow::Result<AddTorrentResponse> {
debug!("Torrent info: {:#?}", &info);
let only_files = if let Some(filename_re) = opts.only_files_regex {
let only_files = compute_only_files(&info, &filename_re)?;
for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() {
if !only_files.contains(&idx) {
continue;
}
info!("Will download {:?}", filename);
if !opts.list_only {
info!("Will download {:?}", filename);
}
}
Some(only_files)
} else {
@ -294,20 +311,11 @@ impl Session {
};
if opts.list_only {
for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() {
let included = match &only_files {
Some(files) => files.contains(&idx),
None => true,
};
info!(
"File {}, size {}{}",
filename.to_string()?,
SF::new(len),
if included { "" } else { "will skip" }
)
}
info!("--list was passed, nothing to do, exiting.");
return Ok(None);
return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse {
info_hash,
info,
only_files,
}));
}
let output_folder = opts
@ -321,13 +329,12 @@ impl Session {
state: ManagedTorrentState::Initializing,
};
if self.locked.write().add_torrent(managed_torrent).is_none() {
anyhow::bail!(
"torrent with info_hash {:?} that is downloaded to {:?} is already managed",
info_hash,
&output_folder
);
};
match self.locked.write().add_torrent(managed_torrent) {
SessionLockedAddTorrentResult::AlreadyManaged(managed) => {
return Ok(AddTorrentResponse::AlreadyManaged(managed))
}
SessionLockedAddTorrentResult::Added(_) => {}
}
let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder.clone());
builder
@ -400,6 +407,6 @@ impl Session {
});
}
Ok(Some(handle))
Ok(AddTorrentResponse::Added(handle))
}
}

View file

@ -118,6 +118,9 @@ impl TorrentManagerHandle {
false
}
}
pub fn only_files(&self) -> Option<&[usize]> {
self.manager.options.only_files.as_deref()
}
pub fn add_peer(&self, addr: SocketAddr) -> bool {
self.manager.state.add_peer_if_not_seen(addr)
}
@ -234,7 +237,7 @@ impl TorrentManager {
name, length, err
);
} else {
info!(
debug!(
"Set length for file {:?} to {} in {:?}",
name,
SF::new(length),

View file

@ -3,10 +3,13 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc, time::Duratio
use anyhow::Context;
use clap::{ArgEnum, Parser};
use librqbit::{
http_api::HttpApi,
http_api::{ApiAddTorrentResponse, HttpApi},
http_api_client,
peer_connection::PeerConnectionOptions,
session::{AddTorrentOptions, ManagedTorrentState, Session, SessionOptions},
session::{
AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, ManagedTorrentState, Session,
SessionOptions,
},
spawn_utils::{spawn, BlockingSpawner},
};
use log::{error, info, warn};
@ -273,18 +276,29 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
true
}
Err(err) => {
info!(
"HTTP API at {} returned {:?}, will start the server within this process",
client.base_url(),
err
);
warn!("Error checking HTTP API at {}: {:}", client.base_url(), err);
false
}
};
if connect_to_existing {
for torrent_url in &download_opts.torrent_path {
match client.add_torrent(torrent_url, Some(torrent_opts.clone())).await {
Ok(id) => info!("{} added to the server with index {}. Query {}/torrents/{}/(stats/haves) for details", torrent_url, id, http_api_url, id),
match client
.add_torrent(torrent_url, Some(torrent_opts.clone()))
.await
{
Ok(ApiAddTorrentResponse { id, details }) => {
if let Some(id) = id {
info!("{} added to the server with index {}. Query {}/torrents/{}/(stats/haves) for details", details.info_hash, id, http_api_url, id)
}
for file in details.files {
info!(
"file {:?}, size {}{}",
file.name,
SF::new(file.length),
if file.included { "" } else { ", will skip" }
)
}
}
Err(err) => warn!("error adding {}: {:?}", torrent_url, err),
}
}
@ -317,11 +331,40 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
for path in &download_opts.torrent_path {
let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await {
Ok(Some(handle)) => {
added = true;
handle
}
Ok(None) => continue,
Ok(v) => match v {
AddTorrentResponse::AlreadyManaged(handle) => {
info!(
"torrent {:?} is already managed, downloaded to {:?}",
handle.info_hash, handle.output_folder
);
continue;
}
AddTorrentResponse::ListOnly(ListOnlyResponse {
info_hash: _,
info,
only_files,
}) => {
for (idx, (filename, len)) in
info.iter_filenames_and_lengths()?.enumerate()
{
let included = match &only_files {
Some(files) => files.contains(&idx),
None => true,
};
info!(
"File {}, size {}{}",
filename.to_string()?,
SF::new(len),
if included { "" } else { ", will skip" }
)
}
continue;
}
AddTorrentResponse::Added(handle) => {
added = true;
handle
}
},
Err(err) => {
error!("error adding {:?}: {:?}", &path, err);
continue;
@ -331,7 +374,9 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
http_api.add_mgr(handle.clone());
}
if added {
if download_opts.list {
Ok(())
} else if added {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
}