Add API to add a torrent

This commit is contained in:
Igor Katson 2021-10-10 09:57:21 +01:00
parent 6e9e79a02e
commit 64900e1fd4
3 changed files with 86 additions and 46 deletions

View file

@ -1,13 +1,15 @@
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Context;
use dht::{Dht, DhtStats};
use parking_lot::RwLock;
use serde::Serialize;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use warp::hyper::body::Bytes;
use warp::hyper::Body;
use warp::Filter;
use crate::session::Session;
use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::StatsSnapshot;
@ -15,16 +17,25 @@ struct ApiInternal {
dht: Option<Dht>,
startup_time: Instant,
torrent_managers: RwLock<Vec<TorrentManagerHandle>>,
session: Arc<Session>,
}
impl ApiInternal {
fn new(dht: Option<Dht>) -> Self {
fn new(session: Arc<Session>) -> Self {
Self {
dht,
dht: session.get_dht(),
startup_time: Instant::now(),
torrent_managers: RwLock::new(Vec::new()),
session,
}
}
fn add_mgr(&self, handle: TorrentManagerHandle) -> usize {
let mut g = self.torrent_managers.write();
let idx = g.len();
g.push(handle);
idx
}
}
#[derive(Serialize)]
@ -116,6 +127,16 @@ impl ApiInternal {
Some(TorrentDetailsResponse { info_hash, files })
}
async fn api_add_torrent(&self, url: String) -> anyhow::Result<usize> {
let handle = self
.session
.add_torrent(url, None)
.await
.context("error adding torrent")?
.context("expected session.add_torrent() to return a handle")?;
Ok(self.add_mgr(handle))
}
fn api_dht_stats(&self) -> Option<DhtStats> {
self.dht.as_ref().map(|d| d.stats())
}
@ -185,16 +206,13 @@ fn json_or_404<T: Serialize>(idx: usize, v: Option<T>) -> warp::reply::Response
}
impl HttpApi {
pub fn new(dht: Option<Dht>) -> Self {
pub fn new(session: Arc<Session>) -> Self {
Self {
inner: Arc::new(ApiInternal::new(dht)),
inner: Arc::new(ApiInternal::new(session)),
}
}
pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize {
let mut g = self.inner.torrent_managers.write();
let idx = g.len();
g.push(handle);
idx
self.inner.add_mgr(handle)
}
pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> {
@ -235,11 +253,46 @@ impl HttpApi {
}
});
let torrent_list = warp::path!("torrents").map({
let torrent_list = warp::get().and(warp::path("torrents")).map({
let inner = inner.clone();
move || json_response(inner.api_torrent_list())
});
let torrent_add = warp::post()
.and(warp::path("torrents"))
.and(warp::body::bytes())
.and_then({
let inner = inner.clone();
use warp::http::Response;
fn make_response(status: u16, body: String) -> Response<String> {
Response::builder().status(status).body(body).unwrap()
}
move |body: Bytes| {
let inner = inner.clone();
async move {
let url = match String::from_utf8(body.to_vec()) {
Ok(str) => str,
Err(_) => {
return Ok::<_, warp::Rejection>(make_response(
400,
"invalid utf-8".into(),
))
}
};
let idx = inner
.api_add_torrent(url)
.await
.context("error calling HttpApi::api_add_torrent");
match idx {
Ok(idx) => {
return Ok(make_response(200, format!("{}", idx)));
}
Err(e) => return Ok(make_response(400, format!("{}", e))),
}
}
}
});
let torrent_details = warp::path!("torrents" / usize).map({
let inner = inner.clone();
move |idx| json_or_404(idx, inner.api_torrent_details(idx))
@ -264,7 +317,8 @@ impl HttpApi {
.or(dht_routing_table)
.or(torrent_details)
.or(torrent_dump_haves)
.or(torrent_dump_stats);
.or(torrent_dump_stats)
.or(torrent_add);
warp::serve(router).run(addr).await;
Ok(())

View file

@ -25,7 +25,6 @@ use crate::{
pub struct Session {
peer_id: Id20,
dht: Option<Dht>,
http_api: Option<HttpApi>,
peer_opts: PeerConnectionOptions,
spawner: BlockingSpawner,
output_folder: PathBuf,
@ -95,8 +94,6 @@ pub struct SessionOptions {
pub disable_dht: bool,
pub disable_dht_persistence: bool,
pub dht_config: Option<PersistentDhtConfig>,
pub disable_http_api: bool,
pub http_api_listen_addr: Option<SocketAddr>,
pub peer_id: Option<Id20>,
pub peer_opts: Option<PeerConnectionOptions>,
}
@ -124,29 +121,17 @@ impl Session {
};
let peer_opts = opts.peer_opts.unwrap_or_default();
let http_api = if opts.disable_http_api {
None
} else {
let http_api_listen_addr = opts
.http_api_listen_addr
.unwrap_or_else(|| "127.0.0.1:3001".parse().unwrap());
let http_api = HttpApi::new(dht.clone());
spawn("HTTP API", {
let http_api = http_api.clone();
async move { http_api.make_http_api_and_run(http_api_listen_addr).await }
});
Some(http_api)
};
Ok(Self {
peer_id,
dht,
http_api,
peer_opts,
spawner,
output_folder,
})
}
pub fn get_dht(&self) -> Option<Dht> {
self.dht.clone()
}
pub async fn add_torrent(
&self,
url: String,
@ -303,16 +288,7 @@ impl Session {
builder.peer_connect_timeout(t);
}
// let http_api = HttpApi::new(self.dht.clone());
// spawn("HTTP API", {
// let http_api = http_api.clone();
// async move { http_api.make_http_api_and_run(http_api_listen_addr).await }
// });
let handle = builder.start_manager()?;
if let Some(http_api) = self.http_api.as_ref() {
http_api.add_mgr(handle.clone());
}
for url in trackers {
handle.add_tracker(url);

View file

@ -1,8 +1,9 @@
use std::{net::SocketAddr, str::FromStr, time::Duration};
use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
use anyhow::Context;
use clap::Clap;
use librqbit::{
http_api::HttpApi,
peer_connection::PeerConnectionOptions,
session::{AddTorrentOptions, Session, SessionOptions},
spawn_utils::{spawn, BlockingSpawner},
@ -141,17 +142,26 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
disable_dht: opts.disable_dht,
disable_dht_persistence: opts.disable_dht_persistence,
dht_config: None,
disable_http_api: false,
http_api_listen_addr: Some(opts.http_api_listen_addr),
peer_id: None,
peer_opts: Some(PeerConnectionOptions {
connect_timeout: opts.peer_connect_timeout.map(|d| d.0),
..Default::default()
}),
};
let session = Session::new_with_opts(opts.output_folder.into(), spawner, sopts)
.await
.context("error initializing rqbit session")?;
let session = Arc::new(
Session::new_with_opts(opts.output_folder.into(), spawner, sopts)
.await
.context("error initializing rqbit session")?,
);
{
let http_api = HttpApi::new(session.clone());
spawn("HTTP API", {
let http_api_listen_addr = opts.http_api_listen_addr;
async move { http_api.make_http_api_and_run(http_api_listen_addr).await }
});
};
let torrent_opts = AddTorrentOptions {
only_files_regex: opts.only_files_matching_regex,