Merge pull request #187 from ikatson/hash-based-api

Hash-based API in addition to integer based
This commit is contained in:
Igor Katson 2024-08-15 16:11:25 +01:00 committed by GitHub
commit 33554159bf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 187 additions and 56 deletions

View file

@ -119,18 +119,18 @@ By default it listens on http://127.0.0.1:3030.
"GET /dht/stats": "DHT stats",
"GET /dht/table": "DHT routing table",
"GET /torrents": "List torrents (default torrent is 0)",
"GET /torrents/{index}": "Torrent details",
"GET /torrents/{index}/haves": "The bitfield of have pieces",
"GET /torrents/{index}/peer_stats": "Per peer stats",
"GET /torrents/{index}/stats/v1": "Torrent stats",
"GET /torrents/{id_or_infohash}": "Torrent details",
"GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces",
"GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats",
"GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats",
"GET /web/": "Web UI",
"POST /rust_log": "Set RUST_LOG to this post launch (for debugging)",
"POST /torrents": "Add a torrent here. magnet: or http:// or a local file.",
"POST /torrents/{index}/delete": "Forget about the torrent, remove the files",
"POST /torrents/{index}/forget": "Forget about the torrent, keep the files",
"POST /torrents/{index}/pause": "Pause torrent",
"POST /torrents/{index}/start": "Resume torrent",
"POST /torrents/{index}/update_only_files": "Change the selection of files to download. You need to POST json of the following form {"only_files": [0, 1, 2]}"
"POST /torrents/{id_or_infohash}/delete": "Forget about the torrent, remove the files",
"POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files",
"POST /torrents/{id_or_infohash}/pause": "Pause torrent",
"POST /torrents/{id_or_infohash}/start": "Resume torrent",
"POST /torrents/{id_or_infohash}/update_only_files": "Change the selection of files to download. You need to POST json of the following form {"only_files": [0, 1, 2]}"
},
"server": "rqbit"
}

View file

@ -1,4 +1,4 @@
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use std::{collections::HashSet, marker::PhantomData, net::SocketAddr, str::FromStr, sync::Arc};
use anyhow::Context;
use buffers::ByteBufOwned;
@ -36,6 +36,93 @@ pub struct Api {
line_broadcast: Option<LineBroadcast>,
}
#[derive(Debug, Clone, Copy)]
pub enum TorrentIdOrHash {
Id(TorrentId),
Hash(Id20),
}
impl Serialize for TorrentIdOrHash {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
TorrentIdOrHash::Id(id) => id.serialize(serializer),
TorrentIdOrHash::Hash(h) => h.as_string().serialize(serializer),
}
}
}
impl<'de> Deserialize<'de> for TorrentIdOrHash {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Default)]
struct V<'de> {
p: PhantomData<&'de ()>,
}
impl<'de> serde::de::Visitor<'de> for V<'de> {
type Value = TorrentIdOrHash;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str("integer or 40 byte info hash")
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
TorrentIdOrHash::parse(v)
.map_err(|_| E::custom("expected integer or 40 byte info hash"))
}
}
deserializer.deserialize_str(V::default())
}
}
impl std::fmt::Display for TorrentIdOrHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TorrentIdOrHash::Id(id) => write!(f, "{}", id),
TorrentIdOrHash::Hash(h) => write!(f, "{:?}", h),
}
}
}
impl From<TorrentId> for TorrentIdOrHash {
fn from(value: TorrentId) -> Self {
TorrentIdOrHash::Id(value)
}
}
impl From<Id20> for TorrentIdOrHash {
fn from(value: Id20) -> Self {
TorrentIdOrHash::Hash(value)
}
}
impl<'a> TryFrom<&'a str> for TorrentIdOrHash {
type Error = anyhow::Error;
fn try_from(value: &'a str) -> std::result::Result<Self, Self::Error> {
Self::parse(value)
}
}
impl TorrentIdOrHash {
pub fn parse(s: &str) -> anyhow::Result<Self> {
if s.len() == 40 {
let id = Id20::from_str(s)?;
return Ok(id.into());
}
let id: TorrentId = s.parse()?;
Ok(id.into())
}
}
impl Api {
pub fn new(
session: Arc<Session>,
@ -53,7 +140,7 @@ impl Api {
&self.session
}
pub fn mgr_handle(&self, idx: TorrentId) -> Result<ManagedTorrentHandle> {
pub fn mgr_handle(&self, idx: TorrentIdOrHash) -> Result<ManagedTorrentHandle> {
self.session
.get(idx)
.ok_or(ApiError::torrent_not_found(idx))
@ -71,14 +158,18 @@ impl Api {
TorrentListResponse { torrents: items }
}
pub fn api_torrent_details(&self, idx: TorrentId) -> Result<TorrentDetailsResponse> {
pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result<TorrentDetailsResponse> {
let handle = self.mgr_handle(idx)?;
let info_hash = handle.info().info_hash;
let only_files = handle.only_files();
make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref())
}
pub fn torrent_file_mime_type(&self, idx: TorrentId, file_idx: usize) -> Result<&'static str> {
pub fn torrent_file_mime_type(
&self,
idx: TorrentIdOrHash,
file_idx: usize,
) -> Result<&'static str> {
let handle = self.mgr_handle(idx)?;
let info = &handle.info().info;
torrent_file_mime_type(info, file_idx)
@ -86,7 +177,7 @@ impl Api {
pub fn api_peer_stats(
&self,
idx: TorrentId,
idx: TorrentIdOrHash,
filter: PeerStatsFilter,
) -> Result<PeerStatsSnapshot> {
let handle = self.mgr_handle(idx)?;
@ -96,7 +187,10 @@ impl Api {
.per_peer_stats_snapshot(filter))
}
pub async fn api_torrent_action_pause(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
pub async fn api_torrent_action_pause(
&self,
idx: TorrentIdOrHash,
) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
self.session()
.pause(&handle)
@ -106,7 +200,10 @@ impl Api {
Ok(Default::default())
}
pub async fn api_torrent_action_start(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
pub async fn api_torrent_action_start(
&self,
idx: TorrentIdOrHash,
) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
self.session
.unpause(&handle)
@ -116,7 +213,10 @@ impl Api {
Ok(Default::default())
}
pub async fn api_torrent_action_forget(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
pub async fn api_torrent_action_forget(
&self,
idx: TorrentIdOrHash,
) -> Result<EmptyJsonResponse> {
self.session
.delete(idx, false)
.await
@ -124,7 +224,10 @@ impl Api {
Ok(Default::default())
}
pub async fn api_torrent_action_delete(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
pub async fn api_torrent_action_delete(
&self,
idx: TorrentIdOrHash,
) -> Result<EmptyJsonResponse> {
self.session
.delete(idx, true)
.await
@ -134,7 +237,7 @@ impl Api {
pub async fn api_torrent_action_update_only_files(
&self,
idx: TorrentId,
idx: TorrentIdOrHash,
only_files: &HashSet<usize>,
) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
@ -240,23 +343,23 @@ impl Api {
Ok(dht.with_routing_table(|r| r.clone()))
}
pub fn api_stats_v0(&self, idx: TorrentId) -> Result<LiveStats> {
pub fn api_stats_v0(&self, idx: TorrentIdOrHash) -> Result<LiveStats> {
let mgr = self.mgr_handle(idx)?;
let live = mgr.live().context("torrent not live")?;
Ok(LiveStats::from(&*live))
}
pub fn api_stats_v1(&self, idx: TorrentId) -> Result<TorrentStats> {
pub fn api_stats_v1(&self, idx: TorrentIdOrHash) -> Result<TorrentStats> {
let mgr = self.mgr_handle(idx)?;
Ok(mgr.stats())
}
pub fn api_dump_haves(&self, idx: usize) -> Result<String> {
pub fn api_dump_haves(&self, idx: TorrentIdOrHash) -> Result<String> {
let mgr = self.mgr_handle(idx)?;
Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?)
}
pub fn api_stream(&self, idx: TorrentId, file_id: usize) -> Result<FileStream> {
pub fn api_stream(&self, idx: TorrentIdOrHash, file_id: usize) -> Result<FileStream> {
let mgr = self.mgr_handle(idx)?;
Ok(mgr.stream(file_id)?)
}

View file

@ -2,6 +2,8 @@ use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::{Serialize, Serializer};
use crate::api::TorrentIdOrHash;
// Convenience error type.
#[derive(Debug)]
pub struct ApiError {
@ -19,7 +21,7 @@ impl ApiError {
}
}
pub const fn torrent_not_found(torrent_id: usize) -> Self {
pub const fn torrent_not_found(torrent_id: TorrentIdOrHash) -> Self {
Self {
status: Some(StatusCode::NOT_FOUND),
kind: ApiErrorKind::TorrentNotFound(torrent_id),
@ -75,7 +77,7 @@ impl ApiError {
#[derive(Debug)]
enum ApiErrorKind {
TorrentNotFound(usize),
TorrentNotFound(TorrentIdOrHash),
DhtDisabled,
Text(&'static str),
Other(anyhow::Error),
@ -93,7 +95,7 @@ impl Serialize for ApiError {
status: u16,
status_text: String,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<usize>,
id: Option<TorrentIdOrHash>,
}
let mut serr: SerializedError = SerializedError {
error_kind: match self.kind {

View file

@ -18,7 +18,7 @@ use tracing::{debug, info, trace};
use axum::Router;
use crate::api::Api;
use crate::api::{Api, TorrentIdOrHash};
use crate::peer_connection::PeerConnectionOptions;
use crate::session::{AddTorrent, AddTorrentOptions, SUPPORTED_SCHEMES};
use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter;
@ -59,16 +59,16 @@ impl HttpApi {
"GET /": "list all available APIs",
"GET /dht/stats": "DHT stats",
"GET /dht/table": "DHT routing table",
"GET /torrents": "List torrents (default torrent is 0)",
"GET /torrents/{index}": "Torrent details",
"GET /torrents/{index}/haves": "The bitfield of have pieces",
"GET /torrents/{index}/stats/v1": "Torrent stats",
"GET /torrents/{index}/peer_stats": "Per peer stats",
"POST /torrents/{index}/pause": "Pause torrent",
"POST /torrents/{index}/start": "Resume torrent",
"POST /torrents/{index}/forget": "Forget about the torrent, keep the files",
"POST /torrents/{index}/delete": "Forget about the torrent, remove the files",
"POST /torrents/{index}/update_only_files": "Change the selection of files to download. You need to POST json of the following form {\"only_files\": [0, 1, 2]}",
"GET /torrents": "List torrents",
"GET /torrents/{id_or_infohash}": "Torrent details",
"GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces",
"GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats",
"GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats",
"POST /torrents/{id_or_infohash}/pause": "Pause torrent",
"POST /torrents/{id_or_infohash}/start": "Resume torrent",
"POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files",
"POST /torrents/{id_or_infohash}/delete": "Forget about the torrent, remove the files",
"POST /torrents/{id_or_infohash}/update_only_files": "Change the selection of files to download. You need to POST json of the following form {\"only_files\": [0, 1, 2]}",
"POST /torrents": "Add a torrent here. magnet: or http:// or a local file.",
"POST /rust_log": "Set RUST_LOG to this post launch (for debugging)",
"GET /web/": "Web UI",
@ -124,7 +124,7 @@ impl HttpApi {
async fn torrent_details(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_torrent_details(idx).map(axum::Json)
}
@ -168,7 +168,7 @@ impl HttpApi {
fn build_playlist_content(
host: &str,
it: impl IntoIterator<Item = (usize, usize, String)>,
it: impl IntoIterator<Item = (TorrentIdOrHash, usize, String)>,
) -> impl IntoResponse {
let body = it
.into_iter()
@ -240,7 +240,7 @@ impl HttpApi {
async fn torrent_playlist(
State(state): State<ApiState>,
headers: HeaderMap,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
let host = get_host(&headers)?;
let playlist_items = torrent_playlist_items(&*state.mgr_handle(idx)?)?;
@ -263,7 +263,7 @@ impl HttpApi {
torrent_playlist_items(handle)
.map(move |items| {
items.into_iter().map(move |(file_idx, filename)| {
(torrent_idx, file_idx, filename)
(torrent_idx.into(), file_idx, filename)
})
})
.ok()
@ -276,28 +276,28 @@ impl HttpApi {
async fn torrent_haves(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_dump_haves(idx)
}
async fn torrent_stats_v0(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_stats_v0(idx).map(axum::Json)
}
async fn torrent_stats_v1(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_stats_v1(idx).map(axum::Json)
}
async fn peer_stats(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
Query(filter): Query<PeerStatsFilter>,
) -> Result<impl IntoResponse> {
state.api_peer_stats(idx, filter).map(axum::Json)
@ -305,7 +305,7 @@ impl HttpApi {
async fn torrent_stream_file(
State(state): State<ApiState>,
Path((idx, file_id)): Path<(usize, usize)>,
Path((idx, file_id)): Path<(TorrentIdOrHash, usize)>,
headers: http::HeaderMap,
) -> Result<impl IntoResponse> {
let mut stream = state.api_stream(idx, file_id)?;
@ -321,7 +321,7 @@ impl HttpApi {
}
let range_header = headers.get(http::header::RANGE);
trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream");
trace!(torrent_id=%idx, file_id=file_id, range=?range_header, "request for HTTP stream");
if let Some(range) = range_header {
let offset: Option<u64> = range
@ -366,28 +366,28 @@ impl HttpApi {
async fn torrent_action_pause(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_pause(idx).await.map(axum::Json)
}
async fn torrent_action_start(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_start(idx).await.map(axum::Json)
}
async fn torrent_action_forget(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_forget(idx).await.map(axum::Json)
}
async fn torrent_action_delete(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_delete(idx).await.map(axum::Json)
}
@ -399,7 +399,7 @@ impl HttpApi {
async fn torrent_action_update_only_files(
State(state): State<ApiState>,
Path(idx): Path<usize>,
Path(idx): Path<TorrentIdOrHash>,
axum::Json(req): axum::Json<UpdateOnlyFilesRequest>,
) -> Result<impl IntoResponse> {
state

View file

@ -9,6 +9,7 @@ use std::{
};
use crate::{
api::TorrentIdOrHash,
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
merge_streams::merge_streams,
peer_connection::PeerConnectionOptions,
@ -1091,11 +1092,36 @@ impl Session {
Ok(AddTorrentResponse::Added(id, managed_torrent))
}
pub fn get(&self, id: TorrentId) -> Option<ManagedTorrentHandle> {
self.db.read().torrents.get(&id).cloned()
pub fn get(&self, id: TorrentIdOrHash) -> Option<ManagedTorrentHandle> {
match id {
TorrentIdOrHash::Id(id) => self.db.read().torrents.get(&id).cloned(),
TorrentIdOrHash::Hash(id) => self.db.read().torrents.iter().find_map(|(_, v)| {
if v.info_hash() == id {
Some(v.clone())
} else {
None
}
}),
}
}
pub async fn delete(&self, id: TorrentId, delete_files: bool) -> anyhow::Result<()> {
pub async fn delete(&self, id: TorrentIdOrHash, delete_files: bool) -> anyhow::Result<()> {
let id = match id {
TorrentIdOrHash::Id(id) => id,
TorrentIdOrHash::Hash(h) => self
.db
.read()
.torrents
.values()
.find_map(|v| {
if v.info_hash() == h {
Some(v.id())
} else {
None
}
})
.context("no such torrent in db")?,
};
let removed = self
.db
.write()

View file

@ -228,7 +228,7 @@ async fn test_e2e_download() {
}
info!("handle is completed");
session.delete(id, false).await.unwrap();
session.delete(id.into(), false).await.unwrap();
info!("deleted handle");