Merge pull request #181 from ikatson/torrent-bytes

Add an HTTP endpoint to resolve a magnet URL to bytes (addresses #177)
This commit is contained in:
Igor Katson 2024-08-13 20:37:35 +01:00 committed by GitHub
commit 3cc9e444b1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 292 additions and 62 deletions

View file

@ -1,4 +1,5 @@
mod bencode_value;
pub mod raw_value;
mod serde_bencode_de;
mod serde_bencode_ser;

View file

@ -0,0 +1,28 @@
use serde::Serialize;
/// Wrapper around a byte-like value (`T: AsRef<[u8]>`) whose `Serialize` impl emits the
/// bytes inside a newtype struct named [`TAG`].
///
/// The bencode serializer in this crate special-cases that name (it sets its
/// `hack_no_bytestring_prefix` flag while serializing the inner value), so the bytes are
/// presumably written verbatim instead of as a length-prefixed byte string.
pub struct RawValue<T>(pub T);
/// Newtype-struct name that `RawValue` passes to `serialize_newtype_struct`; serializers
/// compare against it to recognize a raw value.
pub(crate) const TAG: &str = "::librqbit_bencode::RawValue";
impl<T> Serialize for RawValue<T>
where
T: AsRef<[u8]>,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
struct Wrapper<'a>(&'a [u8]);
impl<'a> Serialize for Wrapper<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_bytes(self.0)
}
}
serializer.serialize_newtype_struct(TAG, &Wrapper(self.0.as_ref()))
}
}

View file

@ -10,6 +10,7 @@ pub struct BencodeDeserializer<'de> {
// This is a f**ing hack
pub is_torrent_info: bool,
pub torrent_info_digest: Option<[u8; 20]>,
pub torrent_info_bytes: Option<&'de [u8]>,
}
impl<'de> BencodeDeserializer<'de> {
@ -20,6 +21,7 @@ impl<'de> BencodeDeserializer<'de> {
parsing_key: false,
is_torrent_info: false,
torrent_info_digest: None,
torrent_info_bytes: None,
}
}
pub fn into_remaining(self) -> &'de [u8] {
@ -542,9 +544,11 @@ impl<'a, 'de> serde::de::MapAccess<'de> for MapAccess<'a, 'de> {
if self.de.is_torrent_info && self.de.field_context.as_slice() == [ByteBuf(b"info")] {
let len = self.de.buf.as_ptr() as usize - buf_before.as_ptr() as usize;
let mut hash = Sha1::new();
hash.update(&buf_before[..len]);
let torrent_info_bytes = &buf_before[..len];
hash.update(torrent_info_bytes);
let digest = hash.finish();
self.de.torrent_info_digest = Some(digest)
self.de.torrent_info_digest = Some(digest);
self.de.torrent_info_bytes = Some(torrent_info_bytes);
}
self.de.field_context.pop();
Ok(value)

View file

@ -328,12 +328,18 @@ impl<'ser, W: std::io::Write> Serializer for &'ser mut BencodeSerializer<W> {
fn serialize_newtype_struct<T>(
self,
_name: &'static str,
_value: &T,
name: &'static str,
value: &T,
) -> Result<Self::Ok, Self::Error>
where
T: ?Sized + serde::Serialize,
{
if name == crate::raw_value::TAG {
self.hack_no_bytestring_prefix = true;
value.serialize(&mut *self)?;
self.hack_no_bytestring_prefix = false;
return Ok(());
}
Err(SerError::custom_with_ser(
"bencode doesn't support newtype structs",
self,

View file

@ -191,6 +191,7 @@ impl Api {
only_files,
seen_peers,
output_folder,
..
}) => ApiAddTorrentResponse {
id: None,
output_folder: output_folder.to_string_lossy().into_owned(),

View file

@ -16,6 +16,7 @@ use librqbit_core::hash_id::Id20;
pub enum ReadMetainfoResult<Rx> {
Found {
info: TorrentMetaV1Info<ByteBufOwned>,
info_bytes: ByteBufOwned,
rx: Rx,
seen: HashSet<SocketAddr>,
},
@ -80,7 +81,7 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
},
done = unordered.next(), if !unordered.is_empty() => {
match done {
Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs },
Some(Ok((info, info_bytes))) => return ReadMetainfoResult::Found { info, info_bytes, seen, rx: addrs },
Some(Err(e)) => {
debug!("{:#}", e);
},

View file

@ -26,7 +26,7 @@ use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter;
type ApiState = Api;
use crate::api::Result;
use crate::{ApiError, ManagedTorrent};
use crate::{ApiError, ListOnlyResponse, ManagedTorrent};
/// An HTTP server for the API.
pub struct HttpApi {
@ -188,6 +188,55 @@ impl HttpApi {
)
}
/// Resolves a torrent URL (e.g. a magnet link) into `.torrent` file bytes without adding
/// the torrent to the session.
///
/// `url` is taken from the request body. The torrent is requested with `list_only: true`;
/// if it is already managed, the bytes stored on the existing handle are returned instead.
///
/// The response carries `Content-Type: application/x-bittorrent` and, when the torrent
/// name is valid UTF-8 and forms a valid header value, a `Content-Disposition: attachment`
/// header suggesting `<name>.torrent` as the file name.
///
/// # Errors
/// Propagates any error from `add_torrent`, and returns an internal server error if the
/// session reports the torrent as newly added despite `list_only`.
async fn resolve_magnet(
    State(state): State<ApiState>,
    url: String,
) -> Result<impl IntoResponse> {
    let added = state
        .session()
        .add_torrent(
            AddTorrent::from_url(&url),
            Some(AddTorrentOptions {
                list_only: true,
                ..Default::default()
            }),
        )
        .await?;

    let (info, content) = match added {
        crate::AddTorrentResponse::AlreadyManaged(_, handle) => (
            handle.info().info.clone(),
            handle.info().torrent_bytes.clone(),
        ),
        crate::AddTorrentResponse::ListOnly(ListOnlyResponse {
            info,
            torrent_bytes,
            ..
        }) => (info, torrent_bytes),
        crate::AddTorrentResponse::Added(_, _) => {
            return Err(ApiError::new_from_text(
                StatusCode::INTERNAL_SERVER_ERROR,
                "bug: torrent was added to session, but shouldn't have been",
            ))
        }
    };

    let mut headers = HeaderMap::new();
    headers.insert(
        "Content-Type",
        HeaderValue::from_static("application/x-bittorrent"),
    );

    if let Some(name) = info.name.as_ref() {
        if let Ok(name) = std::str::from_utf8(name) {
            // The name comes from torrent metadata, i.e. it is untrusted. Inside an HTTP
            // quoted-string, `"` and `\` must be written as quoted-pairs; otherwise a name
            // containing a quote would terminate the value early and could smuggle extra
            // Content-Disposition parameters.
            let mut escaped = String::with_capacity(name.len());
            for c in name.chars() {
                if matches!(c, '"' | '\\') {
                    escaped.push('\\');
                }
                escaped.push(c);
            }
            // Names with characters that are not allowed in a header value (e.g. control
            // characters) are rejected by `from_str`; the header is then simply omitted.
            if let Ok(h) =
                HeaderValue::from_str(&format!("attachment; filename=\"{}.torrent\"", escaped))
            {
                headers.insert("Content-Disposition", h);
            }
        }
    }

    Ok((headers, content))
}
async fn torrent_playlist(
State(state): State<ApiState>,
headers: HeaderMap,
@ -388,6 +437,7 @@ impl HttpApi {
.route("/torrents/:id/stream/:file_id", get(torrent_stream_file))
.route("/torrents/:id/playlist", get(torrent_playlist))
.route("/torrents/playlist", get(global_playlist))
.route("/torrents/resolve_magnet", post(resolve_magnet))
.route(
"/torrents/:id/stream/:file_id/*filename",
get(torrent_stream_file),

View file

@ -32,9 +32,10 @@ pub(crate) async fn read_metainfo_from_peer(
peer_connection_options: Option<PeerConnectionOptions>,
spawner: BlockingSpawner,
connector: Arc<StreamConnector>,
) -> anyhow::Result<TorrentMetaV1Info<ByteBufOwned>> {
let (result_tx, result_rx) =
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteBufOwned>>>();
) -> anyhow::Result<TorrentAndInfoBytes> {
let (result_tx, result_rx) = tokio::sync::oneshot::channel::<
anyhow::Result<(TorrentMetaV1Info<ByteBufOwned>, ByteBufOwned)>,
>();
let (writer_tx, writer_rx) = tokio::sync::mpsc::unbounded_channel::<WriterRequest>();
let handler = Handler {
addr,
@ -135,13 +136,13 @@ impl HandlerLocked {
}
}
/// A parsed torrent "info" dict paired with the raw byte buffer it was deserialized from.
pub type TorrentAndInfoBytes = (TorrentMetaV1Info<ByteBufOwned>, ByteBufOwned);
struct Handler {
addr: SocketAddr,
info_hash: Id20,
writer_tx: UnboundedSender<WriterRequest>,
result_tx: Mutex<
Option<tokio::sync::oneshot::Sender<anyhow::Result<TorrentMetaV1Info<ByteBufOwned>>>>,
>,
result_tx: Mutex<Option<tokio::sync::oneshot::Sender<anyhow::Result<TorrentAndInfoBytes>>>>,
locked: RwLock<Option<HandlerLocked>>,
}
@ -179,6 +180,7 @@ impl PeerConnectionHandler for Handler {
if piece_ready {
let buf = self.locked.write().take().unwrap().buffer;
let info = from_bytes::<TorrentMetaV1Info<ByteBufOwned>>(&buf);
let info = info.map(|i| (i, ByteBufOwned(buf.into_boxed_slice())));
self.result_tx
.lock()
.take()

View file

@ -29,6 +29,7 @@ use crate::{
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBuf, ByteBufOwned, ByteBufT};
use bytes::Bytes;
use clone_to_owned::CloneToOwned;
use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig};
use futures::{
@ -119,6 +120,7 @@ impl SessionDatabase {
.map(|u| u.to_string())
.collect(),
info_hash: torrent.info_hash().as_string(),
torrent_bytes: torrent.info.torrent_bytes.clone(),
info: torrent.info().info.clone(),
only_files: torrent.only_files().clone(),
is_paused: torrent
@ -140,6 +142,12 @@ struct SerializedTorrent {
deserialize_with = "deserialize_torrent"
)]
info: TorrentMetaV1Info<ByteBufOwned>,
#[serde(
serialize_with = "serialize_torrent_bytes",
deserialize_with = "deserialize_torrent_bytes",
default
)]
torrent_bytes: Bytes,
trackers: HashSet<String>,
output_folder: PathBuf,
only_files: Option<Vec<usize>>,
@ -175,6 +183,28 @@ where
.map_err(D::Error::custom)
}
/// Serde `serialize_with` helper: writes the torrent bytes as an unpadded standard-alphabet
/// base64 string.
fn serialize_torrent_bytes<S>(t: &Bytes, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    use base64::{engine::general_purpose, Engine as _};

    let encoded: String = general_purpose::STANDARD_NO_PAD.encode(t);
    serializer.serialize_str(&encoded)
}
fn deserialize_torrent_bytes<'de, D>(deserializer: D) -> Result<Bytes, D::Error>
where
D: Deserializer<'de>,
{
use base64::{engine::general_purpose, Engine as _};
use serde::de::Error;
let s = String::deserialize(deserializer)?;
let b = general_purpose::STANDARD_NO_PAD
.decode(s)
.map_err(D::Error::custom)?;
Ok(b.into())
}
#[derive(Serialize, Deserialize)]
struct SerializedSessionDatabase {
torrents: HashMap<usize, SerializedTorrent>,
@ -207,7 +237,7 @@ pub struct Session {
async fn torrent_from_url(
reqwest_client: &reqwest::Client,
url: &str,
) -> anyhow::Result<TorrentMetaV1Owned> {
) -> anyhow::Result<(TorrentMetaV1Owned, ByteBufOwned)> {
let response = reqwest_client
.get(url)
.send()
@ -220,7 +250,10 @@ async fn torrent_from_url(
.bytes()
.await
.with_context(|| format!("error reading response body from {url}"))?;
torrent_from_bytes(&b).context("error decoding torrent")
Ok((
torrent_from_bytes(&b).context("error decoding torrent")?,
b.to_vec().into(),
))
}
fn compute_only_files_regex<ByteBuf: AsRef<[u8]>>(
@ -344,6 +377,7 @@ pub struct ListOnlyResponse {
pub only_files: Option<Vec<usize>>,
pub output_folder: PathBuf,
pub seen_peers: Vec<SocketAddr>,
pub torrent_bytes: Bytes,
}
#[allow(clippy::large_enum_variant)]
@ -468,6 +502,25 @@ async fn create_tcp_listener(
bail!("no free TCP ports in range {port_range:?}");
}
fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[String]) -> anyhow::Result<Bytes> {
#[derive(Serialize)]
struct Tmp<'a> {
announce: &'a str,
#[serde(rename = "announce-list")]
announce_list: &'a [&'a [String]],
info: bencode::raw_value::RawValue<&'a [u8]>,
}
let mut w = Vec::new();
let v = Tmp {
info: bencode::raw_value::RawValue(info_bytes),
announce: trackers.first().map(|s| s.as_str()).unwrap_or(""),
announce_list: &[trackers],
};
bencode_serialize_to_writer(&v, &mut w)?;
Ok(w.into())
}
pub(crate) struct CheckedIncomingConnection {
pub addr: SocketAddr,
pub stream: tokio::net::TcpStream,
@ -475,6 +528,15 @@ pub(crate) struct CheckedIncomingConnection {
pub handshake: Handshake<ByteBufOwned>,
}
/// Intermediate result of resolving an `AddTorrent` request, produced in `add_torrent` and
/// handed to `main_torrent_info`.
struct InternalAddResult {
// Info hash of the torrent.
info_hash: Id20,
// Parsed "info" dict of the torrent.
info: TorrentMetaV1Info<ByteBufOwned>,
// Bytes of the .torrent file: the bytes that were parsed for files / HTTP URLs, a file
// rebuilt by `torrent_file_from_info_bytes` for magnet links, and empty when the caller
// supplied an already-parsed `AddTorrent::TorrentInfo`.
torrent_bytes: Bytes,
// Tracker URLs collected for the torrent.
trackers: Vec<String>,
// Peer stream, when one is available (for magnet links this is the receiver returned by
// the metainfo lookup).
peer_rx: Option<PeerStream>,
// Peer addresses known up front: the peers seen during the metainfo lookup for magnet
// links, otherwise `opts.initial_peers`.
initial_peers: Vec<SocketAddr>,
}
impl Session {
/// Create a new session with default options.
/// The passed in folder will be used as a default unless overridden per torrent.
@ -897,9 +959,6 @@ impl Session {
) -> BoxFuture<'a, anyhow::Result<AddTorrentResponse>> {
async move {
// Magnet links are different in that we first need to discover the metadata.
let span = error_span!("add_torrent");
let _ = span.enter();
let opts = opts.unwrap_or_default();
let paused = opts.list_only || opts.paused;
@ -910,7 +969,7 @@ impl Session {
// into a torrent file by connecting to peers that support extended handshakes.
// So we must discover at least one peer and connect to it to be able to proceed further.
let (info_hash, info, trackers, peer_rx, initial_peers) = match add {
let add_res = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let magnet = Magnet::parse(&magnet)
.context("provided path is not a valid magnet URL")?;
@ -934,7 +993,7 @@ impl Session {
};
debug!(?info_hash, "querying DHT");
let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
opts.initial_peers.clone().unwrap_or_default(),
@ -944,22 +1003,33 @@ impl Session {
)
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::Found {
info,
info_bytes,
rx,
seen,
} => {
debug!(?info, "received result from DHT");
let trackers = magnet.trackers.into_iter().unique().collect_vec();
InternalAddResult {
info_hash,
torrent_bytes: torrent_file_from_info_bytes(
&info_bytes,
&trackers,
)?,
info,
trackers,
peer_rx: Some(rx),
initial_peers: seen.into_iter().collect(),
}
}
ReadMetainfoResult::ChannelClosed { .. } => {
bail!("DHT died, no way to discover torrent metainfo")
}
};
debug!(?info, "received result from DHT");
(
info_hash,
info,
magnet.trackers.into_iter().unique().collect(),
Some(peer_rx),
initial_peers,
)
}
}
other => {
let torrent = match other {
let (torrent, bytes) = match other {
AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") =>
{
@ -971,10 +1041,14 @@ impl Session {
url
)
}
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
AddTorrent::TorrentFileBytes(bytes) => (
torrent_from_bytes(&bytes).context("error decoding torrent")?,
ByteBufOwned::from(bytes.into_owned()),
),
AddTorrent::TorrentInfo(t) => {
// TODO: this is lossy, as we don't store the bytes.
(*t, ByteBufOwned(Vec::new().into_boxed_slice()))
}
AddTorrent::TorrentInfo(t) => *t,
};
let trackers = torrent
@ -1004,30 +1078,25 @@ impl Session {
)?
};
(
torrent.info_hash,
torrent.info,
InternalAddResult {
info_hash: torrent.info_hash,
info: torrent.info,
torrent_bytes: Bytes::from(bytes.0),
trackers,
peer_rx,
opts.initial_peers
initial_peers: opts
.initial_peers
.clone()
.unwrap_or_default()
.into_iter()
.collect(),
)
}
}
};
self.main_torrent_info(
info_hash,
info,
trackers,
peer_rx,
initial_peers.into_iter().collect(),
opts,
)
.await
self.main_torrent_info(add_res, opts).await
}
.instrument(error_span!("add_torrent"))
.boxed()
}
@ -1060,13 +1129,18 @@ impl Session {
async fn main_torrent_info(
&self,
info_hash: Id20,
info: TorrentMetaV1Info<ByteBufOwned>,
trackers: Vec<String>,
peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>,
add_res: InternalAddResult,
mut opts: AddTorrentOptions,
) -> anyhow::Result<AddTorrentResponse> {
let InternalAddResult {
info,
info_hash,
trackers,
peer_rx,
initial_peers,
torrent_bytes,
} = add_res;
debug!("Torrent info: {:#?}", &info);
let only_files = compute_only_files(
@ -1101,11 +1175,17 @@ impl Session {
only_files,
output_folder,
seen_peers: initial_peers,
torrent_bytes,
}));
}
let mut builder =
ManagedTorrentBuilder::new(info, info_hash, output_folder, storage_factory);
let mut builder = ManagedTorrentBuilder::new(
info,
info_hash,
torrent_bytes,
output_folder,
storage_factory,
);
builder
.allow_overwrite(opts.overwrite)
.spawner(self.spawner)
@ -1355,3 +1435,35 @@ impl tracker_comms::TorrentStatsProvider for PeerRxTorrentInfo {
}
}
}
#[cfg(test)]
mod tests {
    use buffers::ByteBuf;
    use itertools::Itertools;
    use librqbit_core::torrent_metainfo::{torrent_from_bytes_ext, TorrentMetaV1};

    use super::torrent_file_from_info_bytes;

    /// Round-trips a real torrent: rebuilding the file from its raw info bytes and its
    /// trackers must yield the same info hash, info dict, info bytes and tracker list.
    #[test]
    fn test_torrent_file_from_info_and_bytes() {
        /// Collects every announce URL of `meta` that is valid UTF-8.
        fn utf8_trackers(meta: &TorrentMetaV1<ByteBuf>) -> Vec<String> {
            meta.iter_announce()
                .filter_map(|raw| std::str::from_utf8(raw.as_ref()).ok().map(str::to_owned))
                .collect_vec()
        }

        let original_bytes =
            include_bytes!("../resources/ubuntu-21.04-desktop-amd64.iso.torrent");
        let original = torrent_from_bytes_ext::<ByteBuf>(&original_bytes[..]).unwrap();
        let original_trackers = utf8_trackers(&original.meta);

        let rebuilt_bytes =
            torrent_file_from_info_bytes(original.info_bytes.as_ref(), &original_trackers)
                .unwrap();
        let rebuilt = torrent_from_bytes_ext::<ByteBuf>(rebuilt_bytes.as_ref()).unwrap();

        assert_eq!(original.meta.info_hash, rebuilt.meta.info_hash);
        assert_eq!(original.meta.info, rebuilt.meta.info);
        assert_eq!(original.info_bytes, rebuilt.info_bytes);
        assert_eq!(original_trackers, utf8_trackers(&rebuilt.meta));
    }
}

View file

@ -14,6 +14,7 @@ use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use buffers::ByteBufOwned;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use librqbit_core::hash_id::Id20;
@ -99,6 +100,7 @@ pub(crate) struct ManagedTorrentOptions {
pub struct ManagedTorrentInfo {
pub info: TorrentMetaV1Info<ByteBufOwned>,
pub torrent_bytes: Bytes,
pub info_hash: Id20,
pub(crate) spawner: BlockingSpawner,
pub trackers: HashSet<String>,
@ -501,6 +503,7 @@ pub(crate) struct ManagedTorrentBuilder {
info: TorrentMetaV1Info<ByteBufOwned>,
output_folder: PathBuf,
info_hash: Id20,
torrent_bytes: Bytes,
force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>,
peer_read_write_timeout: Option<Duration>,
@ -518,12 +521,14 @@ impl ManagedTorrentBuilder {
pub fn new(
info: TorrentMetaV1Info<ByteBufOwned>,
info_hash: Id20,
torrent_bytes: Bytes,
output_folder: PathBuf,
storage_factory: BoxStorageFactory,
) -> Self {
Self {
info,
info_hash,
torrent_bytes,
spawner: None,
force_tracker_interval: None,
peer_connect_timeout: None,
@ -608,6 +613,7 @@ impl ManagedTorrentBuilder {
span,
file_infos,
info: self.info,
torrent_bytes: self.torrent_bytes,
info_hash: self.info_hash,
trackers: self.trackers.into_iter().collect(),
spawner: self.spawner.unwrap_or_default(),

View file

@ -12,18 +12,37 @@ use crate::{hash_id::Id20, lengths::Lengths};
pub type TorrentMetaV1Borrowed<'a> = TorrentMetaV1<ByteBuf<'a>>;
pub type TorrentMetaV1Owned = TorrentMetaV1<ByteBufOwned>;
/// Parse torrent metainfo from bytes.
pub fn torrent_from_bytes<'de, BufType: Deserialize<'de>>(
/// Result of `torrent_from_bytes_ext`: the parsed torrent together with the raw bytes of
/// its "info" dict as they appeared in the input.
pub struct ParsedTorrent<BufType> {
/// The parsed torrent.
pub meta: TorrentMetaV1<BufType>,
/// The raw bytes of the torrent's "info" dict.
pub info_bytes: BufType,
}
/// Parse torrent metainfo from bytes (includes additional fields).
pub fn torrent_from_bytes_ext<'de, BufType: Deserialize<'de> + From<&'de [u8]>>(
buf: &'de [u8],
) -> anyhow::Result<TorrentMetaV1<BufType>> {
) -> anyhow::Result<ParsedTorrent<BufType>> {
let mut de = BencodeDeserializer::new_from_buf(buf);
de.is_torrent_info = true;
let mut t = TorrentMetaV1::deserialize(&mut de)?;
t.info_hash = Id20::new(
de.torrent_info_digest
.ok_or_else(|| anyhow::anyhow!("programming error"))?,
);
Ok(t)
let (digest, info_bytes) = match (de.torrent_info_digest, de.torrent_info_bytes) {
(Some(digest), Some(info_bytes)) => (digest, info_bytes),
_ => anyhow::bail!("programming error"),
};
t.info_hash = Id20::new(digest);
Ok(ParsedTorrent {
meta: t,
info_bytes: BufType::from(info_bytes),
})
}
/// Parse torrent metainfo from bytes.
///
/// Convenience wrapper over `torrent_from_bytes_ext` that keeps only the parsed torrent
/// and discards the raw "info" bytes.
pub fn torrent_from_bytes<'de, BufType: Deserialize<'de> + From<&'de [u8]>>(
    buf: &'de [u8],
) -> anyhow::Result<TorrentMetaV1<BufType>> {
    let parsed = torrent_from_bytes_ext::<BufType>(buf)?;
    Ok(parsed.meta)
}
/// A parsed .torrent file.