Ability to upload torrents + other stuff for Web UI to work

This commit is contained in:
Igor Katson 2023-11-20 19:52:48 +00:00
parent e557d76660
commit 1585a7e04a
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 188 additions and 90 deletions

26
Cargo.lock generated
View file

@ -683,6 +683,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-range-header"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f"
[[package]]
name = "httparse"
version = "1.8.0"
@ -862,6 +868,7 @@ dependencies = [
"size_format",
"tokio",
"tokio-stream",
"tower-http",
"tracing",
"tracing-subscriber",
"url",
@ -1944,6 +1951,25 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
dependencies = [
"bitflags 2.4.1",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.2"

View file

@ -31,6 +31,7 @@ dht = {path = "../dht", package="librqbit-dht", version="3.0.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
axum = {version = "0.6"}
tower-http = {version = "0.4", features = ["cors", "trace"]}
tokio-stream = "0.1"
serde = {version = "1", features=["derive"]}
serde_json = "1"

View file

@ -1,4 +1,5 @@
use anyhow::Context;
use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::routing::get;
@ -12,13 +13,16 @@ use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tower_http::cors::{AllowHeaders, AllowOrigin};
use tracing::{info, warn};
use axum::Router;
use crate::http_api_error::{ApiError, ApiErrorExt};
use crate::peer_state::PeerStatsFilter;
use crate::session::{AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session};
use crate::session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session,
};
use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::StatsSnapshot;
@ -75,10 +79,14 @@ impl HttpApi {
async fn torrents_post(
State(state): State<ApiState>,
Query(params): Query<TorrentAddQueryParams>,
url: String,
data: Bytes,
) -> Result<impl IntoResponse> {
let opts = params.into_add_torrent_options();
state.api_add_torrent(url, Some(opts)).await.map(axum::Json)
let add = match String::from_utf8(data.to_vec()) {
Ok(s) => AddTorrent::from(s),
Err(e) => AddTorrent::from(e.into_bytes()),
};
state.api_add_torrent(add, Some(opts)).await.map(axum::Json)
}
async fn torrent_details(
@ -119,6 +127,12 @@ impl HttpApi {
.route("/torrents/:id/haves", get(torrent_haves))
.route("/torrents/:id/stats", get(torrent_stats))
.route("/torrents/:id/peer_stats", get(peer_stats))
.layer(
tower_http::cors::CorsLayer::default()
.allow_origin(AllowOrigin::predicate(|_, _| true))
.allow_headers(AllowHeaders::any()),
)
.layer(tower_http::trace::TraceLayer::new_for_http())
.with_state(state);
info!("starting HTTP server on {}", addr);
@ -177,13 +191,33 @@ pub struct TorrentDetailsResponse {
pub files: Vec<TorrentDetailsResponseFile>,
}
struct DurationWithHumanReadable(Duration);
impl Serialize for DurationWithHumanReadable {
fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(Serialize)]
struct Tmp {
duration: Duration,
human_readable: String,
}
Tmp {
duration: self.0,
human_readable: format!("{:?}", self.0),
}
.serialize(serializer)
}
}
#[derive(Serialize)]
struct StatsResponse {
snapshot: StatsSnapshot,
average_piece_download_time: Option<Duration>,
download_speed: Speed,
all_time_download_speed: Speed,
time_remaining: Option<Duration>,
time_remaining: Option<DurationWithHumanReadable>,
}
#[derive(Serialize, Deserialize)]
@ -282,12 +316,12 @@ impl ApiInternal {
pub async fn api_add_torrent(
&self,
url: String,
add: AddTorrent<'_>,
opts: Option<AddTorrentOptions>,
) -> Result<ApiAddTorrentResponse> {
let response = match self
.session
.add_torrent(&url, opts)
.add_torrent(add, opts)
.await
.context("error adding torrent")
.with_error_status_code(StatusCode::BAD_REQUEST)?
@ -353,7 +387,7 @@ impl ApiInternal {
snapshot,
all_time_download_speed: (downloaded_mb / elapsed.as_secs_f64()).into(),
download_speed: estimator.download_mbps().into(),
time_remaining: estimator.time_remaining(),
time_remaining: estimator.time_remaining().map(DurationWithHumanReadable),
})
}

View file

@ -1,4 +1,4 @@
use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf, time::Duration};
use std::{borrow::Cow, fs::File, io::Read, net::SocketAddr, path::PathBuf, time::Duration};
use anyhow::Context;
use buffers::ByteString;
@ -141,6 +141,29 @@ pub enum AddTorrentResponse {
Added(TorrentManagerHandle),
}
pub enum AddTorrent<'a> {
Url(Cow<'a, str>),
TorrentFileBytes(Vec<u8>),
}
impl<'a> From<&'a str> for AddTorrent<'a> {
fn from(s: &'a str) -> Self {
Self::Url(Cow::Borrowed(s))
}
}
impl<'a> From<String> for AddTorrent<'a> {
fn from(s: String) -> Self {
Self::Url(Cow::Owned(s))
}
}
impl<'a> From<Vec<u8>> for AddTorrent<'a> {
fn from(b: Vec<u8>) -> Self {
Self::TorrentFileBytes(b)
}
}
#[derive(Default)]
pub struct SessionOptions {
pub disable_dht: bool,
@ -193,99 +216,110 @@ impl Session {
}
pub async fn add_torrent(
&self,
url: &str,
add: impl Into<AddTorrent<'_>>,
opts: Option<AddTorrentOptions>,
) -> anyhow::Result<AddTorrentResponse> {
// Magnet links are different in that we first need to discover the metadata.
let opts = opts.unwrap_or_default();
if url.starts_with("magnet:") {
let Magnet {
info_hash,
trackers,
} = Magnet::parse(url).context("provided path is not a valid magnet URL")?;
let dht_rx = self
.dht
.as_ref()
.context("magnet links without DHT are not supported")?
.get_peers(info_hash)
.await?;
let (info_hash, info, dht_rx, trackers, initial_peers) = match add.into() {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let Magnet {
info_hash,
trackers,
} = Magnet::parse(&*magnet).context("provided path is not a valid magnet URL")?;
let trackers = trackers
.into_iter()
.filter_map(|url| match reqwest::Url::parse(&url) {
Ok(url) => Some(url),
Err(e) => {
warn!("error parsing tracker {} as url: {}", url, e);
None
}
})
.collect();
let dht_rx = self
.dht
.as_ref()
.context("magnet links without DHT are not supported")?
.get_peers(info_hash)
.await?;
let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
dht_rx,
Some(self.peer_opts),
)
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::ChannelClosed { .. } => {
anyhow::bail!("DHT died, no way to discover torrent metainfo")
}
};
self.main_torrent_info(
info_hash,
info,
Some(dht_rx),
initial_peers.into_iter().collect(),
trackers,
opts,
)
.await
} else {
let torrent = if url.starts_with("http://") || url.starts_with("https://") {
torrent_from_url(url).await?
} else {
torrent_from_file(url)?
};
let dht_rx = match self.dht.as_ref() {
Some(dht) => {
debug!("reading peers for {:?} from DHT", torrent.info_hash);
Some(dht.get_peers(torrent.info_hash).await?)
}
None => None,
};
let trackers = torrent
.iter_announce()
.filter_map(|tracker| {
let url = match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => url,
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
return None;
}
};
match Url::parse(url) {
let trackers = trackers
.into_iter()
.filter_map(|url| match reqwest::Url::parse(&url) {
Ok(url) => Some(url),
Err(e) => {
warn!("cannot parse tracker URL {}: {}", url, e);
warn!("error parsing tracker {} as url: {}", url, e);
None
}
})
.collect();
let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
dht_rx,
Some(self.peer_opts),
)
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::ChannelClosed { .. } => {
anyhow::bail!("DHT died, no way to discover torrent metainfo")
}
})
.collect::<Vec<_>>();
self.main_torrent_info(
torrent.info_hash,
torrent.info,
dht_rx,
Vec::new(),
trackers,
opts,
)
.await
}
};
(info_hash, info, Some(dht_rx), trackers, initial_peers)
}
other => {
let torrent = match other {
AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") =>
{
torrent_from_url(&*url).await?
}
AddTorrent::Url(filename) => torrent_from_file(&*filename)?,
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
}
};
let dht_rx = match self.dht.as_ref() {
Some(dht) => {
debug!("reading peers for {:?} from DHT", torrent.info_hash);
Some(dht.get_peers(torrent.info_hash).await?)
}
None => None,
};
let trackers = torrent
.iter_announce()
.filter_map(|tracker| {
let url = match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => url,
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
return None;
}
};
match Url::parse(url) {
Ok(url) => Some(url),
Err(e) => {
warn!("cannot parse tracker URL {}: {}", url, e);
None
}
}
})
.collect::<Vec<_>>();
(
torrent.info_hash,
torrent.info,
dht_rx,
trackers,
Default::default(),
)
}
};
self.main_torrent_info(
info_hash,
info,
dht_rx,
initial_peers.into_iter().collect(),
trackers,
opts,
)
.await
}
#[allow(clippy::too_many_arguments)]

View file

@ -381,7 +381,10 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
let mut handles = Vec::new();
for path in &download_opts.torrent_path {
let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await {
let handle = match session
.add_torrent(path.as_str(), Some(torrent_opts.clone()))
.await
{
Ok(v) => match v {
AddTorrentResponse::AlreadyManaged(handle) => {
info!(