diff --git a/Cargo.lock b/Cargo.lock index ae04cc1..af959e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,12 +200,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "atomic-waker" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" - [[package]] name = "autocfg" version = "1.3.0" @@ -401,6 +395,17 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "bstr" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c" +dependencies = [ + "memchr", + "regex-automata 0.4.7", + "serde", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -793,15 +798,6 @@ dependencies = [ "serde", ] -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -1041,6 +1037,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "gethostname" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc3655aa6818d65bc620d6911f05aa7b6aeb596291e1e9f79e52df85583d1e30" +dependencies = [ + "rustix", + "windows-targets 0.52.6", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -1077,25 +1083,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "h2" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" -dependencies = [ - "atomic-waker", - "bytes", - "fnv", - "futures-core", - "futures-sink", - "http 1.1.0", - "indexmap 2.4.0", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1275,7 +1262,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.26", + "h2", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -1298,7 +1285,6 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -1576,6 +1562,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "upnp-serve", "url", "urlencoding", "uuid", @@ -2284,6 +2271,15 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" +dependencies = [ + "memchr", +] + [[package]] name = "quinn" version = "0.11.3" @@ -2458,10 +2454,8 @@ checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64 0.22.1", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -2485,7 +2479,6 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", - "system-configuration", "tokio", "tokio-native-tls", "tokio-rustls", @@ -2544,6 +2537,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "upnp-serve", ] [[package]] @@ -2827,6 +2821,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -3159,27 +3162,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "system-configuration" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" -dependencies = [ - "bitflags 2.6.0", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tap" version = "1.0.1" @@ -3295,7 +3277,9 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "tracing", @@ -3405,7 +3389,7 @@ dependencies = [ "axum 0.6.20", "base64 0.21.7", "bytes", - "h2 0.3.26", + "h2", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", @@ -3605,6 +3589,32 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "upnp-serve" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum 0.7.5", + "bstr", + "gethostname", + "http 1.1.0", + "httparse", + "librqbit-core", + "librqbit-sha1-wrapper", + "librqbit-upnp", + "mime_guess", + "parking_lot", + "quick-xml", + "reqwest", + "tokio", + "tokio-util", + "tower-http", + "tracing", + "tracing-subscriber", + "url", + "uuid", +] + [[package]] name = "url" version = "2.5.2" diff --git a/Cargo.toml b/Cargo.toml index 70f255f..33f5d8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ "crates/peer_binary_protocol", "crates/dht", "crates/upnp", - "crates/tracker_comms", + "crates/tracker_comms", "crates/upnp-serve", ] [profile.dev] diff --git a/Makefile b/Makefile index 34b7dc8..8849f30 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,10 @@ devserver: echo -n '' > /tmp/rqbit-log && CORS_ALLOW_REGEXP=".*" \ cargo run -- \ --log-file /tmp/rqbit-log \ - --log-file-rust-log=debug,librqbit=trace \ + --log-file-rust-log=debug,librqbit=trace,upnp_serve=trace \ --http-api-listen-addr 0.0.0.0:3030 \ + --upnp-server-hostname 192.168.0.112 \ + --upnp-server-friendly-name rqbit-dev \ server start /tmp/scratch/ @PHONY: devserver diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index dabfe66..8c1d329 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -14,6 +14,7 @@ readme = "README.md" [features] default = ["default-tls"] http-api = ["axum", "tower-http"] +upnp-serve-adapter = ["upnp-serve"] webui = [] timed_existence = [] default-tls = ["reqwest/default-tls"] @@ -38,6 +39,7 @@ peer_binary_protocol = { path = "../peer_binary_protocol", package = "librqbit-p sha1w = { path = "../sha1w", default-features = false, package = "librqbit-sha1-wrapper", version = "3.0.0" } dht = { path = "../dht", package = "librqbit-dht", version = "5.1.0" } librqbit-upnp = { path = "../upnp", version = "0.1.1" } +upnp-serve = { path = "../upnp-serve", version = "0.1.0", optional = true } tokio = { version = "1", features = [ "macros", @@ -80,7 +82,7 @@ backoff = "0.4.0" dashmap = "5.5.3" base64 = "0.21.5" serde_with = "3.4.0" -tokio-util = "0.7.10" +tokio-util = { version = "0.7.10", features = ["io"] } bytes = "1.5.0" rlimit = "0.10.1" async-stream = "0.3.5" diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 7e17445..1c5eb85 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,6 +1,6 @@ use anyhow::Context; use axum::body::Bytes; -use axum::extract::{Path, Query, State}; +use axum::extract::{ConnectInfo, Path, Query, Request, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; use bencode::AsDisplay; @@ -16,7 +16,8 @@ use std::net::SocketAddr; use std::str::FromStr; use std::time::Duration; use tokio::io::AsyncSeekExt; -use tracing::{debug, info, trace}; +use tokio::net::TcpListener; +use tracing::{debug, error_span, trace}; use axum::Router; @@ -52,7 +53,11 @@ impl HttpApi { /// Run the HTTP server forever on the given address. /// If read_only is passed, no state-modifying methods will be exposed. #[inline(never)] - pub fn make_http_api_and_run(self, addr: SocketAddr) -> BoxFuture<'static, anyhow::Result<()>> { + pub fn make_http_api_and_run( + self, + listener: TcpListener, + upnp_router: Option, + ) -> BoxFuture<'static, anyhow::Result<()>> { let state = self.inner; async fn api_root() -> impl IntoResponse { @@ -558,22 +563,33 @@ impl HttpApi { .allow_headers(AllowHeaders::any()) }; + let mut app = app.with_state(state); + + if let Some(upnp_router) = upnp_router { + app = app.nest("/upnp", upnp_router); + } + let app = app .layer(cors_layer) - .layer(tower_http::trace::TraceLayer::new_for_http()) - .with_state(state) - .into_make_service(); - - info!(%addr, "starting HTTP server"); - - use tokio::net::TcpListener; + .layer( + tower_http::trace::TraceLayer::new_for_http().make_span_with(|req: &Request| { + let method = req.method(); + let uri = req.uri(); + if let Some(ConnectInfo(addr)) = + req.extensions().get::>() + { + error_span!("request", %method, %uri, %addr) + } else { + error_span!("request", %method, %uri) + } + }), + ) + .into_make_service_with_connect_info::(); async move { - let listener = TcpListener::bind(&addr) + axum::serve(listener, app) .await - .with_context(|| format!("error binding to {addr}"))?; - axum::serve(listener, app).await?; - Ok(()) + .context("error running HTTP API") } .boxed() } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 6b67b4e..110781d 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -65,6 +65,8 @@ mod torrent_state; #[cfg(feature = "tracing-subscriber-utils")] pub mod tracing_subscriber_config_utils; mod type_aliases; +#[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))] +pub mod upnp_server_adapter; pub use api::Api; pub use api_error::ApiError; diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 051d773..35a3ae6 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -6,7 +6,6 @@ use std::{ }; use anyhow::{bail, Context}; -use axum::{response::IntoResponse, routing::get, Router}; use librqbit_core::Id20; use parking_lot::RwLock; use rand::{thread_rng, Rng, RngCore, SeedableRng}; @@ -96,7 +95,9 @@ impl TestPeerMetadata { } } +#[cfg(feature = "http-api")] async fn debug_server() -> anyhow::Result<()> { + use axum::{response::IntoResponse, routing::get, Router}; async fn backtraces() -> impl IntoResponse { #[cfg(feature = "async-bt")] { @@ -127,6 +128,11 @@ async fn debug_server() -> anyhow::Result<()> { Ok(()) } +#[cfg(not(feature = "http-api"))] +async fn debug_server() -> anyhow::Result<()> { + Ok(()) +} + pub fn spawn_debug_server() { tokio::spawn(debug_server()); } diff --git a/crates/librqbit/src/upnp_server_adapter.rs b/crates/librqbit/src/upnp_server_adapter.rs new file mode 100644 index 0000000..cf188e0 --- /dev/null +++ b/crates/librqbit/src/upnp_server_adapter.rs @@ -0,0 +1,589 @@ +use std::{ + collections::{ + hash_map::Entry::{Occupied, Vacant}, + HashMap, + }, + sync::Arc, +}; + +use crate::{session::TorrentId, ManagedTorrent, Session}; + +#[derive(Clone)] +pub struct UpnpServerSessionAdapter { + session: Arc, + hostname: String, + port: u16, +} + +use anyhow::Context; +use buffers::ByteBufOwned; +use itertools::Itertools; +use librqbit_core::torrent_metainfo::TorrentMetaV1Info; +use tracing::{debug, trace, warn}; +use upnp_serve::{ + upnp_types::content_directory::{ + response::{Container, Item, ItemOrContainer}, + ContentDirectoryBrowseProvider, + }, + UpnpServer, UpnpServerOptions, +}; + +#[derive(Debug, PartialEq, Eq)] +struct TorrentFileTreeNode { + title: String, + parent_id: Option, + children: Vec, + + real_torrent_file_id: Option, +} + +fn encode_id(local_id: usize, torrent_id: usize) -> usize { + (local_id << 16) | (torrent_id + 1) +} + +fn decode_id(id: usize) -> anyhow::Result<(usize, usize)> { + let torrent_id = id & 0xffff; + if torrent_id == 0 { + anyhow::bail!("invalid id") + } + let torrent_id = torrent_id - 1; + Ok((id >> 16, torrent_id)) +} + +impl TorrentFileTreeNode { + fn as_item_or_container( + &self, + id: usize, + torrent: &ManagedTorrent, + adapter: &UpnpServerSessionAdapter, + ) -> ItemOrContainer { + let encoded_id = encode_id(id, torrent.id()); + let encoded_parent_id = self.parent_id.map(|p| encode_id(p, torrent.id())); + match self.real_torrent_file_id { + Some(fid) => { + let filename = &torrent.shared().file_infos[fid].relative_filename; + let last_url_bit = filename.to_str().unwrap_or(&self.title); + return ItemOrContainer::Item(Item { + id: encoded_id, + parent_id: encoded_parent_id, + title: self.title.clone(), + mime_type: mime_guess::from_path( + &torrent.shared().file_infos[fid].relative_filename, + ) + .first(), + url: format!( + "http://{}:{}/torrents/{}/stream/{}/{}", + adapter.hostname, + adapter.port, + torrent.id(), + fid, + last_url_bit + ), + }); + } + None => ItemOrContainer::Container(Container { + id: encoded_id, + parent_id: encoded_parent_id, + title: self.title.clone(), + children_count: Some(self.children.len()), + }), + } + } +} + +struct TorrentFileTree { + // root id is 0 + nodes: Vec, +} + +fn is_single_file_at_root(info: &TorrentMetaV1Info) -> bool { + info.iter_filenames_and_lengths() + .into_iter() + .flatten() + .flat_map(|(f, _)| f.iter_components()) + .nth(1) + .is_none() +} + +impl TorrentFileTree { + fn build(torent_id: TorrentId, info: &TorrentMetaV1Info) -> anyhow::Result { + if is_single_file_at_root(info) { + let filename = info + .iter_filenames_and_lengths()? + .next() + .context("bug")? + .0 + .iter_components() + .last() + .context("bug")??; + let root_node = TorrentFileTreeNode { + title: filename.to_owned(), + parent_id: None, + children: vec![], + real_torrent_file_id: Some(0), + }; + return Ok(TorrentFileTree { + nodes: vec![root_node], + }); + } + + let root_node = TorrentFileTreeNode { + title: match info.name.as_ref() { + Some(n) => std::str::from_utf8(n)?.to_owned(), + None => { + format!("torrent {}", torent_id) + } + }, + parent_id: None, + children: vec![], + real_torrent_file_id: None, + }; + + let mut tree = TorrentFileTree { + nodes: vec![root_node], + }; + + let mut name_cache = HashMap::new(); + + for (fid, (fi, _)) in info.iter_filenames_and_lengths()?.enumerate() { + let components = match fi.to_vec() { + Ok(v) => v, + Err(_) => continue, + }; + let mut parent_id = 0; + let mut it = components.iter().peekable(); + while let Some(component) = it.next() { + let is_last = it.peek().is_none(); + if is_last { + let current_id = tree.nodes.len(); + let node = TorrentFileTreeNode { + title: component.clone(), + parent_id: Some(parent_id), + children: vec![], + real_torrent_file_id: Some(fid), + }; + tree.nodes.push(node); + tree.nodes[parent_id].children.push(current_id); + break; + } + + parent_id = match name_cache.entry((parent_id, component.clone())) { + Occupied(occ) => *occ.get(), + Vacant(vac) => { + let id = tree.nodes.len(); + let node = TorrentFileTreeNode { + title: component.clone(), + parent_id: Some(parent_id), + children: vec![], + real_torrent_file_id: None, + }; + tree.nodes.push(node); + tree.nodes[parent_id].children.push(id); + vac.insert(id); + id + } + }; + } + } + + Ok(tree) + } +} + +impl UpnpServerSessionAdapter { + fn build_root(&self) -> Vec { + let mut all = self + .session + .with_torrents(|torrents| torrents.map(|(_, t)| t.clone()).collect_vec()); + + all.sort_unstable_by_key(|t| t.id()); + + all.iter() + .filter_map(|t| { + let real_id = t.id(); + let upnp_id = real_id + 1; + + if is_single_file_at_root(&t.shared().info) { + // Just add the file directly + let rf = &t.shared().file_infos[0].relative_filename; + let title = rf.file_name()?.to_str()?.to_owned(); + let mime_type = mime_guess::from_path(rf).first(); + let url = format!( + "http://{}:{}/torrents/{real_id}/stream/0/{title}", + self.hostname, self.port + ); + Some(ItemOrContainer::Item(Item { + id: upnp_id, + parent_id: None, + title, + mime_type, + url, + })) + } else { + let title = t + .shared() + .info + .name + .as_ref() + .and_then(|b| std::str::from_utf8(&b.0).ok()) + .map(|n| n.to_owned()) + .unwrap_or_else(|| format!("torrent {real_id}")); + + // Create a folder + Some(ItemOrContainer::Container(Container { + id: upnp_id, + parent_id: None, + title, + children_count: None, + })) + } + }) + .collect_vec() + } +} + +impl ContentDirectoryBrowseProvider for UpnpServerSessionAdapter { + fn browse_direct_children(&self, parent_id: usize) -> Vec { + if parent_id == 0 { + return self.build_root(); + } + + let (node_id, torrent_id) = match decode_id(parent_id) { + Ok((node_id, torrent_id)) => (node_id, torrent_id), + Err(_) => { + debug!(id=?parent_id, "invalid id"); + return vec![]; + } + }; + trace!(parent_id, node_id, torrent_id); + + let torrent = match self.session.get(torrent_id.into()) { + Some(t) => t, + None => { + warn!(torrent_id, "no such torrent"); + return vec![]; + } + }; + + let tree = match TorrentFileTree::build(torrent.id(), &torrent.shared().info) { + Ok(tree) => tree, + Err(e) => { + warn!(parent_id, error=?e, "error building torrent file tree"); + return vec![]; + } + }; + + let node = match tree.nodes.get(node_id) { + Some(n) => n, + None => { + warn!(torrent_id, node_id, "no such internal ID in torrent"); + return vec![]; + } + }; + + trace!(node_id, torrent_id, ?node); + + let mut result = Vec::new(); + + if node.real_torrent_file_id.is_some() { + result.push(node.as_item_or_container(node_id, &torrent, self)) + } else { + for (child_node_id, child_node) in node + .children + .iter() + .filter_map(|id| Some((*id, tree.nodes.get(*id)?))) + { + result.push(child_node.as_item_or_container(child_node_id, &torrent, self)); + } + }; + + result + } +} + +impl Session { + pub async fn make_upnp_adapter( + self: &Arc, + friendly_name: String, + http_hostname: String, + http_listen_port: u16, + ) -> anyhow::Result { + UpnpServer::new(UpnpServerOptions { + friendly_name, + http_hostname: http_hostname.clone(), + http_listen_port, + http_prefix: "/upnp".to_owned(), + browse_provider: Box::new(UpnpServerSessionAdapter { + session: self.clone(), + hostname: http_hostname, + port: http_listen_port, + }), + cancellation_token: self.cancellation_token().child_token(), + }) + .await + .context("error creating upnp adapter") + } +} + +#[cfg(test)] +mod tests { + use bencode::bencode_serialize_to_writer; + use bytes::Bytes; + use dht::Id20; + use librqbit_core::torrent_metainfo::{ + TorrentMetaV1File, TorrentMetaV1Info, TorrentMetaV1Owned, + }; + use tempfile::TempDir; + use upnp_serve::upnp_types::content_directory::{ + response::{Container, Item, ItemOrContainer}, + ContentDirectoryBrowseProvider, + }; + + use crate::{ + tests::test_util::setup_test_logging, + upnp_server_adapter::{ + decode_id, encode_id, TorrentFileTree, TorrentFileTreeNode, UpnpServerSessionAdapter, + }, + AddTorrent, AddTorrentOptions, Session, SessionOptions, + }; + + fn create_torrent(name: Option<&str>, files: &[&str]) -> TorrentMetaV1Owned { + TorrentMetaV1Owned { + announce: None, + announce_list: vec![], + info: TorrentMetaV1Info { + name: name.map(|n| n.as_bytes().into()), + pieces: b""[..].into(), + piece_length: 1, + length: None, + md5sum: None, + files: Some( + files + .iter() + .map(|f| TorrentMetaV1File { + length: 1, + path: f.split("/").map(|f| f.as_bytes().into()).collect(), + }) + .collect(), + ), + }, + comment: None, + created_by: None, + encoding: None, + publisher: None, + publisher_url: None, + creation_date: None, + info_hash: Id20::default(), + } + } + + #[test] + fn test_torrent_file_tree_single() -> anyhow::Result<()> { + let t = create_torrent(Some("test t"), &["file0"]); + let tree = TorrentFileTree::build(0, &t.info)?; + assert_eq!( + &tree.nodes, + &[TorrentFileTreeNode { + children: vec![], + parent_id: None, + real_torrent_file_id: Some(0), + title: "file0".into() + }] + ); + + Ok(()) + } + + #[test] + fn test_torrent_file_tree_flat() -> anyhow::Result<()> { + let t = create_torrent(Some("test t"), &["file0", "file1"]); + let tree = TorrentFileTree::build(0, &t.info)?; + assert_eq!( + &tree.nodes, + &[ + TorrentFileTreeNode { + children: vec![1, 2], + parent_id: None, + real_torrent_file_id: None, + title: "test t".into() + }, + TorrentFileTreeNode { + children: vec![], + parent_id: Some(0), + real_torrent_file_id: Some(0), + title: "file0".into() + }, + TorrentFileTreeNode { + children: vec![], + parent_id: Some(0), + real_torrent_file_id: Some(1), + title: "file1".into() + } + ] + ); + + Ok(()) + } + + #[test] + fn test_torrent_file_tree_nested() -> anyhow::Result<()> { + let t = create_torrent( + Some("test t"), + &["file0", "file1", "dir0/file2", "dir0/dir1/file3"], + ); + let tree = TorrentFileTree::build(0, &t.info)?; + assert_eq!( + &tree.nodes, + &[ + TorrentFileTreeNode { + children: vec![1, 2, 3], + parent_id: None, + real_torrent_file_id: None, + title: "test t".into() + }, + TorrentFileTreeNode { + children: vec![], + parent_id: Some(0), + real_torrent_file_id: Some(0), + title: "file0".into() + }, + TorrentFileTreeNode { + children: vec![], + parent_id: Some(0), + real_torrent_file_id: Some(1), + title: "file1".into() + }, + TorrentFileTreeNode { + children: vec![4, 5], + parent_id: Some(0), + real_torrent_file_id: None, + title: "dir0".into() + }, + TorrentFileTreeNode { + children: vec![], + parent_id: Some(3), + real_torrent_file_id: Some(2), + title: "file2".into() + }, + TorrentFileTreeNode { + children: vec![6], + parent_id: Some(3), + real_torrent_file_id: None, + title: "dir1".into() + }, + TorrentFileTreeNode { + children: vec![], + parent_id: Some(5), + real_torrent_file_id: Some(3), + title: "file3".into() + }, + ] + ); + + Ok(()) + } + + #[tokio::test] + async fn test_browse_direct_children() { + setup_test_logging(); + + let t1 = create_torrent(Some("t1"), &["f1"]); + let t2 = create_torrent(Some("t2"), &["d1/f2"]); + + fn as_bytes(t: &TorrentMetaV1Owned) -> Bytes { + let mut b = Vec::new(); + bencode_serialize_to_writer(t, &mut b).unwrap(); + b.into() + } + + let td = TempDir::new().unwrap(); + let session = Session::new_with_opts( + td.path().to_owned(), + SessionOptions { + disable_dht: true, + ..Default::default() + }, + ) + .await + .unwrap(); + + session + .add_torrent( + AddTorrent::from_bytes(as_bytes(&t1)), + Some(AddTorrentOptions { + paused: true, + ..Default::default() + }), + ) + .await + .unwrap(); + session + .add_torrent( + AddTorrent::from_bytes(as_bytes(&t2)), + Some(AddTorrentOptions { + paused: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + let adapter = UpnpServerSessionAdapter { + session, + hostname: "127.0.0.1".into(), + port: 9005, + }; + + assert_eq!( + adapter.browse_direct_children(0), + vec![ + ItemOrContainer::Item(Item { + id: encode_id(0, 0), + parent_id: None, + title: "f1".into(), + mime_type: None, + url: "http://127.0.0.1:9005/torrents/0/stream/0/f1".into() + }), + ItemOrContainer::Container(Container { + id: encode_id(0, 1), + parent_id: None, + children_count: None, + title: "t2".into() + }) + ] + ); + + assert_eq!( + adapter.browse_direct_children(encode_id(0, 1)), + vec![ItemOrContainer::Container(Container { + id: encode_id(1, 1), + parent_id: Some(encode_id(0, 1)), + children_count: Some(1), + title: "d1".into() + }),] + ); + + assert_eq!( + adapter.browse_direct_children(encode_id(1, 1)), + vec![ItemOrContainer::Item(Item { + id: encode_id(2, 1), + parent_id: Some(encode_id(1, 1)), + title: "f2".into(), + mime_type: None, + url: "http://127.0.0.1:9005/torrents/1/stream/0/d1/f2".into() + })] + ); + } + + #[test] + fn test_encode_id() { + for local_id in 0..5 { + for torrent_id in 0..5 { + let encoded = encode_id(local_id, torrent_id); + let (decoded_local_id, decoded_torrent_id) = decode_id(encoded).unwrap(); + assert_eq!(local_id, decoded_local_id); + assert_eq!(torrent_id, decoded_torrent_id); + } + } + } +} diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index c8cdbfd..7495d7a 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -26,6 +26,7 @@ postgres = ["librqbit/postgres"] librqbit = { path = "../librqbit", default-features = false, features = [ "http-api", "tracing-subscriber-utils", + "upnp-serve-adapter", ], version = "7.0.0-beta.2" } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } console-subscriber = { version = "0.2", optional = true } @@ -43,6 +44,7 @@ serde_json = "1" size_format = "1" bytes = "1.5.0" openssl = { version = "0.10", features = ["vendored"], optional = true } +upnp-serve = { path = "../upnp-serve" } [dev-dependencies] futures = { version = "0.3" } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 90d50f3..d63aeec 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,6 +1,6 @@ use std::{io, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; -use anyhow::Context; +use anyhow::{bail, Context}; use clap::{CommandFactory, Parser, ValueEnum}; use clap_complete::Shell; use librqbit::{ @@ -16,6 +16,7 @@ use librqbit::{ PeerConnectionOptions, Session, SessionOptions, SessionPersistenceConfig, TorrentStatsState, }; use size_format::SizeFormatterBinary as SF; +use tokio::net::TcpListener; use tracing::{error, error_span, info, trace_span, warn}; #[derive(Debug, Clone, Copy, ValueEnum)] @@ -95,6 +96,15 @@ struct Opts { #[arg(long = "disable-upnp")] disable_upnp: bool, + /// If set, will run a UPNP Media server and stream all the torrents through it. + /// Should be set to your hostname/IP as seen by your LAN neighbors. + #[arg(long = "upnp-server-hostname")] + upnp_server_hostname: Option, + + /// UPNP server name that would be displayed on devices in your network. + #[arg(long = "upnp-server-friendly-name")] + upnp_server_friendly_name: Option, + #[command(subcommand)] subcommand: SubCommand, @@ -437,6 +447,28 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { trace_span!("stats_printer"), stats_printer(session.clone()), ); + + let mut upnp_server = { + match opts.upnp_server_hostname { + Some(hn) => { + if opts.http_api_listen_addr.ip().is_loopback() { + bail!("cannot enable UPNP server as HTTP API listen addr is localhost. Change --http-api-listen-addr to start with 0.0.0.0"); + } + let server = session + .make_upnp_adapter( + opts.upnp_server_friendly_name + .unwrap_or_else(|| format!("rqbit at {hn}")), + hn, + opts.http_api_listen_addr.port(), + ) + .await + .context("error starting UPNP server")?; + Some(server) + } + None => None, + } + }; + let api = Api::new( session, Some(log_config.rust_log_reload_tx), @@ -444,10 +476,31 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { ); let http_api = HttpApi::new(api, Some(HttpApiOptions { read_only: false })); let http_api_listen_addr = opts.http_api_listen_addr; - http_api - .make_http_api_and_run(http_api_listen_addr) + + info!("starting HTTP API at http://{http_api_listen_addr}"); + let tcp_listener = TcpListener::bind(http_api_listen_addr) .await - .context("error running HTTP API") + .with_context(|| format!("error binding to {http_api_listen_addr}"))?; + + let upnp_router = upnp_server.as_mut().and_then(|s| s.take_router().ok()); + let http_api_fut = http_api.make_http_api_and_run(tcp_listener, upnp_router); + + let res = match upnp_server { + Some(srv) => { + let upnp_fut = srv.run_ssdp_forever(); + + tokio::pin!(http_api_fut); + tokio::pin!(upnp_fut); + + tokio::select! { + r = &mut http_api_fut => r, + r = &mut upnp_fut => r + } + } + None => http_api_fut.await, + }; + + res.context("error running rqbit server") } }, SubCommand::Download(download_opts) => { @@ -534,10 +587,16 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { ); let http_api = HttpApi::new(api, Some(HttpApiOptions { read_only: true })); let http_api_listen_addr = opts.http_api_listen_addr; + + info!("starting HTTP API at http://{http_api_listen_addr}"); + let listener = tokio::net::TcpListener::bind(opts.http_api_listen_addr) + .await + .with_context(|| format!("error binding to {http_api_listen_addr}"))?; + librqbit_spawn( "http_api", error_span!("http_api"), - http_api.make_http_api_and_run(http_api_listen_addr), + http_api.make_http_api_and_run(listener, None), ); let mut added = false; diff --git a/crates/upnp-serve/Cargo.toml b/crates/upnp-serve/Cargo.toml new file mode 100644 index 0000000..67f6ea7 --- /dev/null +++ b/crates/upnp-serve/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "upnp-serve" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.86" +axum = { version = "0.7.5", features = ["tokio"] } +tokio = { version = "1.39.3", features = ["full"] } +tracing = "0.1.40" +bstr = "1.10.0" +quick-xml = "0.36.1" +http = "1.1.0" +httparse = "1.9.4" +uuid = { version = "1.10.0", features = ["v4"] } +librqbit-upnp = { path = "../upnp" } +gethostname = "0.5.0" +librqbit-sha1-wrapper = { path = "../sha1w" } +librqbit-core = { path = "../librqbit_core" } +mime_guess = "2.0.5" +url = "2.5.2" +parking_lot = "0.12.3" +tokio-util = "0.7.11" +reqwest = { version = "0.12.7", default-features = false } + +[dev-dependencies] +tracing-subscriber = "0.3.18" +tower-http = { version = "0.5", features = ["trace"] } + +[[example]] +name = "upnp-stub-server" diff --git a/crates/upnp-serve/Makefile b/crates/upnp-serve/Makefile new file mode 100644 index 0000000..0a4f40c --- /dev/null +++ b/crates/upnp-serve/Makefile @@ -0,0 +1,17 @@ +all: release-linux-armv7-musl + +@PHONY: release-linux-current-target +release-linux-current-target: + CC_$(TARGET_SNAKE_CASE)=$(CROSS_COMPILE_PREFIX)-gcc \ + CXX_$(TARGET_SNAKE_CASE)=$(CROSS_COMPILE_PREFIX)-g++ \ + AR_$(TARGET_SNAKE_CASE)=$(CROSS_COMPILE_PREFIX)-ar \ + CARGO_TARGET_$(TARGET_SNAKE_UPPER_CASE)_LINKER=$(CROSS_COMPILE_PREFIX)-gcc \ + cargo build --target=$(TARGET) + +@PHONY: release-linux-armv7-musl +release-linux-armv7-musl: + TARGET=armv7-unknown-linux-musleabihf \ + TARGET_SNAKE_CASE=armv7_unknown_linux_musleabihf \ + TARGET_SNAKE_UPPER_CASE=ARMV7_UNKNOWN_LINUX_MUSLEABIHF \ + CROSS_COMPILE_PREFIX=armv7-linux-musleabihf \ + $(MAKE) release-linux-current-target diff --git a/crates/upnp-serve/examples/upnp-stub-server.rs b/crates/upnp-serve/examples/upnp-stub-server.rs new file mode 100644 index 0000000..0663d68 --- /dev/null +++ b/crates/upnp-serve/examples/upnp-stub-server.rs @@ -0,0 +1,74 @@ +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + str::FromStr, +}; + +use anyhow::Context; +use axum::routing::get; +use mime_guess::Mime; +use tracing::{error, info}; +use upnp_serve::{ + upnp_types::content_directory::response::{Item, ItemOrContainer}, + UpnpServer, UpnpServerOptions, +}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "trace"); + } + + tracing_subscriber::fmt::init(); + + let items: Vec = vec![ItemOrContainer::Item(Item { + title: "Example".to_owned(), + mime_type: Some(Mime::from_str("video/x-matroska")?), + url: "http://192.168.0.165:3030/torrents/4/stream/0/file.mkv".to_owned(), + id: 1, + parent_id: Some(0), + })]; + + const HTTP_PORT: u16 = 9005; + const HTTP_PREFIX: &str = "/upnp"; + + info!("Creating UpnpServer"); + let mut server = UpnpServer::new(UpnpServerOptions { + friendly_name: "demo upnp server".to_owned(), + http_hostname: std::env::var("UPNP_HOSTNAME") + .context("you need to set UPNP_HOSTNAME to your IP visible from LAN")?, + http_listen_port: HTTP_PORT, + http_prefix: HTTP_PREFIX.to_owned(), + browse_provider: Box::new(items), + cancellation_token: Default::default(), + }) + .await?; + + let app = axum::Router::new() + .route("/", get(|| async { "hello world" })) + .nest(HTTP_PREFIX, server.take_router()?) + .layer(tower_http::trace::TraceLayer::new_for_http()) + .into_make_service_with_connect_info::(); + + use tokio::net::TcpListener; + + let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, HTTP_PORT); + + info!(?addr, "Binding TcpListener"); + let listener = TcpListener::bind(addr) + .await + .with_context(|| format!("error binding to {addr}"))?; + + tokio::spawn(async move { + let res = axum::serve(listener, app).await; + error!(error=?res, "error running HTTP server"); + }); + + info!("Running SSDP"); + server + .run_ssdp_forever() + .await + .context("error running SSDP")?; + + error!("Unreachable"); + Ok(()) +} diff --git a/crates/upnp-serve/src/constants.rs b/crates/upnp-serve/src/constants.rs new file mode 100644 index 0000000..ad43f98 --- /dev/null +++ b/crates/upnp-serve/src/constants.rs @@ -0,0 +1,9 @@ +pub const UPNP_KIND_ROOT_DEVICE: &str = "upnp:rootdevice"; +pub const UPNP_KIND_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1"; + +pub const SOAP_ACTION_CONTENT_DIRECTORY_BROWSE: &[u8] = + b"\"urn:schemas-upnp-org:service:ContentDirectory:1#Browse\""; +pub const SOAP_ACTION_GET_SYSTEM_UPDATE_ID: &[u8] = + b"\"urn:schemas-upnp-org:service:ContentDirectory:1#GetSystemUpdateID\""; + +pub const CONTENT_TYPE_XML_UTF8: &str = "text/xml; charset=\"utf-8\""; diff --git a/crates/upnp-serve/src/http_handlers.rs b/crates/upnp-serve/src/http_handlers.rs new file mode 100644 index 0000000..41078a2 --- /dev/null +++ b/crates/upnp-serve/src/http_handlers.rs @@ -0,0 +1,214 @@ +use std::{sync::atomic::Ordering, time::Duration}; + +use anyhow::Context; +use axum::{ + body::Bytes, + extract::State, + handler::HandlerWithoutStateExt, + response::IntoResponse, + routing::{get, post}, +}; +use bstr::BStr; +use http::{header::CONTENT_TYPE, HeaderMap, HeaderName, StatusCode}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, trace, warn}; + +use crate::{ + constants::{ + CONTENT_TYPE_XML_UTF8, SOAP_ACTION_CONTENT_DIRECTORY_BROWSE, + SOAP_ACTION_GET_SYSTEM_UPDATE_ID, + }, + state::{UnpnServerState, UpnpServerStateInner}, + templates::{ + render_content_directory_browse, render_content_directory_control_get_system_update_id, + render_root_description_xml, RootDescriptionInputs, + }, + upnp_types::content_directory::{ + request::ContentDirectoryControlRequest, ContentDirectoryBrowseProvider, + }, +}; + +async fn description_xml(State(state): State) -> impl IntoResponse { + state.rendered_root_description.clone() +} + +async fn generate_content_directory_control_response( + headers: HeaderMap, + State(state): State, + body: Bytes, +) -> impl IntoResponse { + let body = BStr::new(&body); + let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes())); + trace!(?body, ?action, "received control request"); + let action = match action { + Some(action) => action, + None => { + debug!("missing SOAPACTION header"); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + match action.as_ref() { + SOAP_ACTION_CONTENT_DIRECTORY_BROWSE => { + let body = match std::str::from_utf8(body) { + Ok(body) => body, + Err(_) => return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(), + }; + + let request = match ContentDirectoryControlRequest::parse(body) { + Ok(req) => req, + Err(e) => { + debug!(error=?e, "error parsing XML"); + return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(); + } + }; + + ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + render_content_directory_browse( + state.provider.browse_direct_children(request.object_id), + ), + ) + .into_response() + } + SOAP_ACTION_GET_SYSTEM_UPDATE_ID => { + let update_id = state.system_update_id.load(Ordering::Relaxed); + ( + [(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)], + render_content_directory_control_get_system_update_id(update_id), + ) + .into_response() + } + _ => { + debug!(?action, "unsupported ContentDirectory action"); + (StatusCode::NOT_IMPLEMENTED, "").into_response() + } + } +} + +async fn subscription( + State(state): State, + request: axum::extract::Request, +) -> impl IntoResponse { + if request.method().as_str() != "SUBSCRIBE" { + return (StatusCode::METHOD_NOT_ALLOWED, "").into_response(); + } + + let (parts, _body) = request.into_parts(); + trace!(?parts.headers, "subscription request"); + let is_event = parts + .headers + .get(HeaderName::from_static("nt")) + .map(|v| v.as_bytes() == b"upnp:event") + .unwrap_or_default(); + if !is_event { + return (StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response(); + } + + let callback = parts + .headers + .get(HeaderName::from_static("callback")) + .and_then(|v| v.to_str().ok()) + .map(|s| s.trim_matches(|c| c == '>' || c == '<')) + .and_then(|u| url::Url::parse(u).ok()); + let callback = match callback { + Some(c) => c, + None => return (StatusCode::BAD_REQUEST, "callback not provided").into_response(), + }; + let subscription_id = parts + .headers + .get(HeaderName::from_static("sid")) + .and_then(|v| v.to_str().ok()); + + let timeout = parts + .headers + .get(HeaderName::from_static("timeout")) + .and_then(|v| v.to_str().ok()) + .and_then(|t| t.strip_prefix("Second-")) + .and_then(|t| t.parse::().ok()) + .map(|t| Duration::from_secs(t as u64)); + + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800); + + let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); + + if let Some(sid) = subscription_id { + match state.renew_subscription(sid, timeout) { + Ok(()) => ( + StatusCode::OK, + [ + ("SID", sid.to_owned()), + ("TIMEOUT", format!("Second-{}", timeout.as_secs())), + ], + ) + .into_response(), + Err(e) => { + warn!(sid, error=?e, "error renewing subscription"); + StatusCode::NOT_FOUND.into_response() + } + } + } else { + match state.new_subscription(callback, timeout) { + Ok(sid) => ( + StatusCode::OK, + [ + ("SID", sid), + ("TIMEOUT", format!("Second-{}", timeout.as_secs())), + ], + ) + .into_response(), + Err(e) => { + warn!(error=?e, "error creating subscription"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } + } +} + +pub fn make_router( + friendly_name: String, + http_prefix: String, + upnp_usn: String, + browse_provider: Box, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let root_desc = render_root_description_xml(&RootDescriptionInputs { + friendly_name: &friendly_name, + manufacturer: "rqbit developers", + model_name: "1.0.0", + unique_id: &upnp_usn, + http_prefix: &http_prefix, + }); + + let state = UpnpServerStateInner::new(root_desc.into(), browse_provider, cancellation_token) + .context("error creating UPNP server")?; + + let sub_handler = { + let state = state.clone(); + move |request: axum::extract::Request| async move { + subscription(State(state.clone()), request).await + } + }; + + let app = axum::Router::new() + .route("/description.xml", get(description_xml)) + .route( + "/scpd/ContentDirectory.xml", + get(|| async { include_str!("resources/scpd_content_directory.xml") }), + ) + .route( + "/scpd/ConnectionManager.xml", + get(|| async { include_str!("resources/scpd_connection_manager.xml") }), + ) + .route( + "/control/ContentDirectory", + post(generate_content_directory_control_response), + ) + .route( + "/control/ConnectionManager", + post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }), + ) + .route_service("/subscribe", sub_handler.into_service()) + .with_state(state); + + Ok(app) +} diff --git a/crates/upnp-serve/src/lib.rs b/crates/upnp-serve/src/lib.rs new file mode 100644 index 0000000..c29b06f --- /dev/null +++ b/crates/upnp-serve/src/lib.rs @@ -0,0 +1,102 @@ +use std::{io::Write, time::Duration}; + +use anyhow::Context; +use gethostname::gethostname; +use http_handlers::make_router; +use librqbit_sha1_wrapper::ISha1; +use ssdp::SsdpRunner; + +use tokio_util::sync::CancellationToken; +use tracing::debug; +use upnp_types::content_directory::ContentDirectoryBrowseProvider; + +mod constants; +mod http_handlers; +mod ssdp; +pub mod state; +mod subscriptions; +mod templates; +pub mod upnp_types; + +pub struct UpnpServerOptions { + pub friendly_name: String, + pub http_hostname: String, + pub http_listen_port: u16, + pub http_prefix: String, + pub browse_provider: Box, + pub cancellation_token: CancellationToken, +} + +pub struct UpnpServer { + axum_router: Option, + ssdp_runner: SsdpRunner, +} + +fn create_usn(opts: &UpnpServerOptions) -> anyhow::Result { + let mut buf = Vec::new(); + + buf.write_all(gethostname().as_encoded_bytes())?; + write!( + &mut buf, + "{}{}{}", + opts.friendly_name, opts.http_listen_port, opts.http_prefix + )?; + + let mut sha1 = librqbit_sha1_wrapper::Sha1::new(); + sha1.update(&buf); + + let hash = sha1.finish(); + let uuid = uuid::Builder::from_slice(&hash[..16]) + .context("error generating UUID")? + .into_uuid(); + Ok(format!("uuid:{}", uuid)) +} + +impl UpnpServer { + pub async fn new(opts: UpnpServerOptions) -> anyhow::Result { + let usn = create_usn(&opts).context("error generating USN")?; + + let description_http_location = { + let hostname = &opts.http_hostname; + let port = opts.http_listen_port; + let http_prefix = &opts.http_prefix; + format!("http://{hostname}:{port}{http_prefix}/description.xml") + }; + + let ssdp_runner = crate::ssdp::SsdpRunner::new(ssdp::SsdpRunnerOptions { + usn: usn.clone(), + description_http_location, + server_string: "Linux/3.4 UPnP/1.0 rqbit/1".to_owned(), + notify_interval: Duration::from_secs(60), + }) + .await + .context("error initializing SsdpRunner")?; + + let router = make_router( + opts.friendly_name, + opts.http_prefix, + usn, + opts.browse_provider, + opts.cancellation_token, + )?; + + Ok(Self { + axum_router: Some(router), + ssdp_runner, + }) + } + + pub fn take_router(&mut self) -> anyhow::Result { + self.axum_router + .take() + .context("programming error: router already taken") + } + + pub async fn run_ssdp_forever(&self) -> anyhow::Result<()> { + debug!("starting SSDP"); + self.ssdp_runner + .run_forever() + .await + .context("error running SSDP loop") + } +} diff --git a/crates/upnp-serve/src/resources/scpd_connection_manager.xml b/crates/upnp-serve/src/resources/scpd_connection_manager.xml new file mode 100644 index 0000000..5304264 --- /dev/null +++ b/crates/upnp-serve/src/resources/scpd_connection_manager.xml @@ -0,0 +1,182 @@ + + + + 1 + 0 + + + + GetProtocolInfo + + + Source + out + SourceProtocolInfo + + + Sink + out + SinkProtocolInfo + + + + + PrepareForConnection + + + RemoteProtocolInfo + in + A_ARG_TYPE_ProtocolInfo + + + PeerConnectionManager + in + A_ARG_TYPE_ConnectionManager + + + PeerConnectionID + in + A_ARG_TYPE_ConnectionID + + + Direction + in + A_ARG_TYPE_Direction + + + ConnectionID + out + A_ARG_TYPE_ConnectionID + + + AVTransportID + out + A_ARG_TYPE_AVTransportID + + + RcsID + out + A_ARG_TYPE_RcsID + + + + + ConnectionComplete + + + ConnectionID + in + A_ARG_TYPE_ConnectionID + + + + + GetCurrentConnectionIDs + + + ConnectionIDs + out + CurrentConnectionIDs + + + + + GetCurrentConnectionInfo + + + ConnectionID + in + A_ARG_TYPE_ConnectionID + + + RcsID + out + A_ARG_TYPE_RcsID + + + AVTransportID + out + A_ARG_TYPE_AVTransportID + + + ProtocolInfo + out + A_ARG_TYPE_ProtocolInfo + + + PeerConnectionManager + out + A_ARG_TYPE_ConnectionManager + + + PeerConnectionID + out + A_ARG_TYPE_ConnectionID + + + Direction + out + A_ARG_TYPE_Direction + + + Status + out + A_ARG_TYPE_ConnectionStatus + + + + + + + SourceProtocolInfo + string + + + SinkProtocolInfo + string + + + CurrentConnectionIDs + string + + + A_ARG_TYPE_ConnectionStatus + string + + OK + ContentFormatMismatch + InsufficientBandwidth + UnreliableChannel + Unknown + + + + A_ARG_TYPE_ConnectionManager + string + + + A_ARG_TYPE_Direction + string + + Input + Output + + + + A_ARG_TYPE_ProtocolInfo + string + + + A_ARG_TYPE_ConnectionID + i4 + + + A_ARG_TYPE_AVTransportID + i4 + + + A_ARG_TYPE_RcsID + i4 + + + diff --git a/crates/upnp-serve/src/resources/scpd_content_directory.xml b/crates/upnp-serve/src/resources/scpd_content_directory.xml new file mode 100644 index 0000000..c792712 --- /dev/null +++ b/crates/upnp-serve/src/resources/scpd_content_directory.xml @@ -0,0 +1,184 @@ + + + + 1 + 0 + + + + Browse + + + ObjectID + in + A_ARG_TYPE_ObjectID + + + BrowseFlag + in + A_ARG_TYPE_BrowseFlag + + + Filter + in + A_ARG_TYPE_Filter + + + StartingIndex + in + A_ARG_TYPE_Index + + + RequestedCount + in + A_ARG_TYPE_Count + + + SortCriteria + in + A_ARG_TYPE_SortCriteria + + + Result + out + A_ARG_TYPE_Result + + + NumberReturned + out + A_ARG_TYPE_Count + + + TotalMatches + out + A_ARG_TYPE_Count + + + UpdateID + out + A_ARG_TYPE_UpdateID + + + + + + + SearchCapabilities + string + + + SortCapabilities + string + + + SortExtensionCapabilities + string + + + SystemUpdateID + ui4 + + + ContainerUpdateIDs + string + + + TransferIDs + string + + + FeatureList + string + + + A_ARG_TYPE_ObjectID + string + + + A_ARG_TYPE_Result + string + + + A_ARG_TYPE_SearchCriteria + string + + + A_ARG_TYPE_BrowseFlag + string + + BrowseMetadata + BrowseDirectChildren + + + + A_ARG_TYPE_Filter + string + + + A_ARG_TYPE_SortCriteria + string + + + A_ARG_TYPE_Index + ui4 + + + A_ARG_TYPE_Count + ui4 + + + A_ARG_TYPE_UpdateID + ui4 + + + A_ARG_TYPE_TransferID + ui4 + + + A_ARG_TYPE_TransferStatus + string + + COMPLETED + ERROR + IN_PROGRESS + STOPPED + + + + A_ARG_TYPE_TransferLength + string + + + A_ARG_TYPE_TransferTotal + string + + + A_ARG_TYPE_TagValueList + string + + + A_ARG_TYPE_URI + uri + + + A_ARG_TYPE_CategoryType + ui4 + + + + A_ARG_TYPE_RID + ui4 + + + + A_ARG_TYPE_PosSec + ui4 + + + + A_ARG_TYPE_Featurelist + string + + + + diff --git a/crates/upnp-serve/src/resources/templates/content_directory_control_browse_container.tmpl.xml b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_container.tmpl.xml new file mode 100644 index 0000000..cadb7a2 --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_container.tmpl.xml @@ -0,0 +1,4 @@ + + {title} + object.container.storageFolder + diff --git a/crates/upnp-serve/src/resources/templates/content_directory_control_browse_envelope.tmpl.xml b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_envelope.tmpl.xml new file mode 100644 index 0000000..a545fdf --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_envelope.tmpl.xml @@ -0,0 +1,11 @@ + + + + + + {number_returned} + {total_matches} + {update_id} + + + diff --git a/crates/upnp-serve/src/resources/templates/content_directory_control_browse_item.tmpl.xml b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_item.tmpl.xml new file mode 100644 index 0000000..1780249 --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_item.tmpl.xml @@ -0,0 +1,5 @@ + + {title} + {upnp_class} + {url} + diff --git a/crates/upnp-serve/src/resources/templates/content_directory_control_browse_result.tmpl.xml b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_result.tmpl.xml new file mode 100644 index 0000000..5323913 --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/content_directory_control_browse_result.tmpl.xml @@ -0,0 +1,5 @@ + + {items} + diff --git a/crates/upnp-serve/src/resources/templates/content_directory_control_get_system_update_id.tmpl.xml b/crates/upnp-serve/src/resources/templates/content_directory_control_get_system_update_id.tmpl.xml new file mode 100644 index 0000000..7a873f8 --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/content_directory_control_get_system_update_id.tmpl.xml @@ -0,0 +1,9 @@ + + + + + {id} + + + diff --git a/crates/upnp-serve/src/resources/templates/notify_subscription.tmpl.xml b/crates/upnp-serve/src/resources/templates/notify_subscription.tmpl.xml new file mode 100644 index 0000000..57930a1 --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/notify_subscription.tmpl.xml @@ -0,0 +1,5 @@ + + + {system_update_id} + + diff --git a/crates/upnp-serve/src/resources/templates/root_desc.tmpl.xml b/crates/upnp-serve/src/resources/templates/root_desc.tmpl.xml new file mode 100644 index 0000000..adcde22 --- /dev/null +++ b/crates/upnp-serve/src/resources/templates/root_desc.tmpl.xml @@ -0,0 +1,31 @@ + + + + 1 + 0 + + + urn:schemas-upnp-org:device:MediaServer:1 + {friendly_name} + {manufacturer} + {model_name} + {unique_id} + + + + urn:schemas-upnp-org:service:ContentDirectory:1 + urn:upnp-org:serviceId:ContentDirectory + {http_prefix}/scpd/ContentDirectory.xml + {http_prefix}/control/ContentDirectory + {http_prefix}/subscribe + + + urn:schemas-upnp-org:service:ConnectionManager:1 + urn:upnp-org:serviceId:ConnectionManager + {http_prefix}/scpd/ConnectionManager.xml + {http_prefix}/control/ConnectionManager + + + / + + diff --git a/crates/upnp-serve/src/resources/test/ContentDirectoryControlExampleRequest.xml b/crates/upnp-serve/src/resources/test/ContentDirectoryControlExampleRequest.xml new file mode 100644 index 0000000..d700b10 --- /dev/null +++ b/crates/upnp-serve/src/resources/test/ContentDirectoryControlExampleRequest.xml @@ -0,0 +1,12 @@ + + + + 5 + BrowseDirectChildren + * + 0 + 5000 + + + + diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs new file mode 100644 index 0000000..30facd9 --- /dev/null +++ b/crates/upnp-serve/src/ssdp.rs @@ -0,0 +1,255 @@ +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + time::Duration, +}; + +use anyhow::{bail, Context}; +use bstr::BStr; +use tokio::net::UdpSocket; +use tracing::{debug, trace, warn}; + +use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE}; + +const UPNP_PORT: u16 = 1900; +const UPNP_BROADCAST_IP: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); +const UPNP_BROADCAST_ADDR: SocketAddrV4 = SocketAddrV4::new(UPNP_BROADCAST_IP, UPNP_PORT); + +#[derive(Debug)] +pub enum SsdpMessage<'a, 'h> { + MSearch(SsdpMSearchRequest<'a>), + #[allow(dead_code)] + OtherRequest(httparse::Request<'h, 'a>), + #[allow(dead_code)] + Response(httparse::Response<'h, 'a>), +} + +#[derive(Debug)] +pub struct SsdpMSearchRequest<'a> { + pub host: &'a BStr, + pub man: &'a BStr, + pub st: &'a BStr, +} + +impl<'a> SsdpMSearchRequest<'a> { + fn matches_media_server(&self) -> bool { + if self.host != "239.255.255.250:1900" { + return false; + } + if self.man != "\"ssdp:discover\"" { + return false; + } + if self.st == UPNP_KIND_ROOT_DEVICE || self.st == UPNP_KIND_MEDIASERVER { + return true; + } + false + } +} + +pub fn try_parse_ssdp<'a, 'h>( + buf: &'a [u8], + headers: &'h mut [httparse::Header<'a>], +) -> anyhow::Result> { + if buf.starts_with(b"HTTP/") { + let mut resp = httparse::Response::new(headers); + resp.parse(buf).context("error parsing response")?; + return Ok(SsdpMessage::Response(resp)); + } + + let mut req = httparse::Request::new(headers); + req.parse(buf).context("error parsing request")?; + + match req.method { + Some("M-SEARCH") => { + let mut host = None; + let mut man = None; + let mut st = None; + + for header in req.headers.iter() { + match header.name { + "HOST" | "Host" | "host" => host = Some(header.value), + "MAN" | "Man" | "man" => man = Some(header.value), + "ST" | "St" | "st" => st = Some(header.value), + other => trace!(header=?BStr::new(other), "ignoring SSDP header"), + } + } + + match (host, man, st) { + (Some(host), Some(man), Some(st)) => { + return Ok(SsdpMessage::MSearch(SsdpMSearchRequest { + host: BStr::new(host), + man: BStr::new(man), + st: BStr::new(st), + })) + } + _ => bail!("not all of host, man and st are set"), + } + } + _ => return Ok(SsdpMessage::OtherRequest(req)), + } +} + +pub struct SsdpRunnerOptions { + pub usn: String, + pub description_http_location: String, + pub server_string: String, + pub notify_interval: Duration, +} + +pub struct SsdpRunner { + opts: SsdpRunnerOptions, + socket: UdpSocket, +} + +impl SsdpRunner { + pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result { + let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT); + trace!(addr=?bind_addr, "binding UDP"); + let socket = + tokio::net::UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT)) + .await + .context("error binding")?; + + trace!(multiaddr=?UPNP_BROADCAST_IP, interface=?Ipv4Addr::UNSPECIFIED, "joining multicast v4 group"); + socket + .join_multicast_v4(UPNP_BROADCAST_IP, Ipv4Addr::UNSPECIFIED) + .context("error joining multicast group")?; + + Ok(Self { opts, socket }) + } + + fn generate_notify_message(&self, kind: &str) -> String { + let usn: &str = &self.opts.usn; + let description_http_location = &self.opts.description_http_location; + let server: &str = &self.opts.server_string; + let bcast_addr = UPNP_BROADCAST_ADDR; + format!( + "NOTIFY * HTTP/1.1\r +Host: {bcast_addr}\r +Cache-Control: max-age=75\r +Location: {description_http_location}\r +NT: {kind}\r +NTS: ssdp:alive\r +Server: {server}\r +USN: {usn}::{kind}\r +\r +" + ) + } + + fn generate_ssdp_discover_response(&self) -> String { + let location = &self.opts.description_http_location; + let usn = &self.opts.usn; + let media_server = UPNP_KIND_MEDIASERVER; + let server = &self.opts.server_string; + format!( + "HTTP/1.1 200 OK\r +Cache-Control: max-age=75\r +Ext: \r +Location: {location}\r +Server: {server}\r +St: {media_server}\r +Usn: {usn}::{media_server}\r +Content-Length: 0\r\n\r\n" + ) + } + + async fn try_send_notifies(&self) { + for kind in [UPNP_KIND_ROOT_DEVICE, UPNP_KIND_MEDIASERVER] { + let msg = self.generate_notify_message(kind); + trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY"); + if let Err(e) = self + .socket + .send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR) + .await + { + warn!(error=?e, "error sending SSDP NOTIFY") + } + } + } + + async fn task_send_notifies_periodically(&self) -> anyhow::Result<()> { + let mut interval = tokio::time::interval(self.opts.notify_interval); + loop { + interval.tick().await; + self.try_send_notifies().await; + } + } + + async fn process_incoming_message(&self, msg: &[u8], addr: SocketAddr) -> anyhow::Result<()> { + let mut headers = [httparse::EMPTY_HEADER; 16]; + trace!(content = ?BStr::new(msg), ?addr, "received message"); + let parsed = try_parse_ssdp(msg, &mut headers); + let msg = match parsed { + Ok(SsdpMessage::MSearch(msg)) => msg, + Ok(m) => { + trace!("ignoring {m:?}"); + return Ok(()); + } + Err(e) => { + debug!(error=?e, "error parsing SSDP message"); + return Ok(()); + } + }; + if !msg.matches_media_server() { + trace!("not a media server request, ignoring"); + return Ok(()); + } + + let response = self.generate_ssdp_discover_response(); + trace!(content = response, ?addr, "sending SSDP discover response"); + self.socket + .send_to(response.as_bytes(), addr) + .await + .context("error sending")?; + + Ok(()) + } + + async fn task_respond_on_msearches(&self) -> anyhow::Result<()> { + let mut buf = vec![0u8; 16184]; + + loop { + let (sz, addr) = self + .socket + .recv_from(&mut buf) + .await + .context("error receiving")?; + let msg = &buf[..sz]; + if let Err(e) = self.process_incoming_message(msg, addr).await { + warn!(error=?e, ?addr, "error processing incoming SSDP message") + } + } + } + + async fn send_msearch(&self) -> anyhow::Result<()> { + let msearch_msg = "M-SEARCH * HTTP/1.1\r +HOST: 239.255.255.250:1900\r +ST: urn:schemas-upnp-org:device:MediaServer:1\r +MAN: \"ssdp:discover\"\r +MX: 2\r\n\r\n"; + + trace!(content = msearch_msg, "multicasting M-SEARCH"); + + self.socket + .send_to(msearch_msg.as_bytes(), UPNP_BROADCAST_ADDR) + .await + .context("error sending msearch")?; + Ok(()) + } + + pub async fn run_forever(&self) -> anyhow::Result<()> { + // This isn't necessary, but would show that it works. + self.send_msearch().await?; + + let t1 = self.task_respond_on_msearches(); + let t2 = self.task_send_notifies_periodically(); + + tokio::pin!(t1); + tokio::pin!(t2); + + tokio::select! { + r = &mut t1 => r, + r = &mut t2 => r, + } + } +} diff --git a/crates/upnp-serve/src/state.rs b/crates/upnp-serve/src/state.rs new file mode 100644 index 0000000..2a392f8 --- /dev/null +++ b/crates/upnp-serve/src/state.rs @@ -0,0 +1,78 @@ +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use anyhow::Context; +use axum::body::Bytes; +use librqbit_core::spawn_utils::spawn_with_cancel; +use tokio_util::sync::CancellationToken; +use tracing::{error_span, Span}; + +use crate::{ + subscriptions::Subscriptions, upnp_types::content_directory::ContentDirectoryBrowseProvider, +}; + +pub struct UpnpServerStateInner { + pub rendered_root_description: Bytes, + pub provider: Box, + pub system_update_id: AtomicU64, + pub subscriptions: Subscriptions, + + pub span: Span, + pub system_update_bcast_tx: tokio::sync::broadcast::Sender, + pub cancel_token: tokio_util::sync::CancellationToken, + _drop_guard: tokio_util::sync::DropGuard, +} + +fn new_system_update_id() -> anyhow::Result { + Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()) +} + +impl UpnpServerStateInner { + pub fn new( + rendered_root_description: Bytes, + provider: Box, + cancellation_token: CancellationToken, + ) -> anyhow::Result> { + let cancel_token = cancellation_token.child_token(); + let drop_guard = cancel_token.clone().drop_guard(); + let (btx, _) = tokio::sync::broadcast::channel(32); + let span = error_span!(parent: None, "upnp-server"); + let state = Arc::new(Self { + rendered_root_description, + provider, + system_update_id: AtomicU64::new(new_system_update_id()?), + subscriptions: Default::default(), + system_update_bcast_tx: btx, + _drop_guard: drop_guard, + span: span.clone(), + cancel_token: cancel_token.clone(), + }); + + spawn_with_cancel( + error_span!(parent: span, "system_update_id_updater"), + cancel_token, + { + let state = Arc::downgrade(&state); + async move { + let mut interval = tokio::time::interval(Duration::from_secs(10)); + loop { + interval.tick().await; + let new_value = new_system_update_id()?; + let state = state.upgrade().context("upnp server is dead")?; + state.system_update_id.store(new_value, Ordering::Relaxed); + let _ = state.system_update_bcast_tx.send(new_value); + } + } + }, + ); + + Ok(state) + } +} + +pub type UnpnServerState = Arc; diff --git a/crates/upnp-serve/src/subscriptions.rs b/crates/upnp-serve/src/subscriptions.rs new file mode 100644 index 0000000..7cb83e2 --- /dev/null +++ b/crates/upnp-serve/src/subscriptions.rs @@ -0,0 +1,200 @@ +use crate::state::UpnpServerStateInner; +use crate::templates::render_notify_subscription_system_update_id; +use anyhow::Context; +use http::Method; +use librqbit_core::spawn_utils::spawn_with_cancel; +use parking_lot::RwLock; +use std::{ + collections::HashMap, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; +use tokio::sync::{broadcast::error::RecvError, Notify}; +use tracing::{error_span, warn, Instrument}; + +pub struct Subscription { + pub url: url::Url, + pub seq: u64, + pub timeout: Duration, + pub refresh_notify: Arc, +} + +#[derive(Default)] +pub struct Subscriptions { + subs: RwLock>, +} + +impl Subscriptions { + pub fn add(&self, url: url::Url, timeout: Duration) -> (String, Arc) { + let sid = format!("uuid:{}", uuid::Uuid::new_v4()); + let notify = Arc::new(Notify::default()); + self.subs.write().insert( + sid.clone(), + Subscription { + url, + seq: 0, + timeout, + refresh_notify: notify.clone(), + }, + ); + (sid, notify) + } + + pub fn update_timeout(&self, sid: &str, timeout: Duration) -> anyhow::Result<()> { + let mut g = self.subs.write(); + let s = g.get_mut(sid).context("no such subscription")?; + s.timeout = timeout; + s.refresh_notify.notify_waiters(); + Ok(()) + } + + pub fn next_seq(&self, sid: &str) -> anyhow::Result { + let mut g = self.subs.write(); + let s = g.get_mut(sid).context("no such subscription")?; + let id = s.seq; + s.seq += 1; + Ok(id) + } + + pub fn get_timeout(&self, sid: &str) -> anyhow::Result { + let mut g = self.subs.write(); + let s = g.get_mut(sid).context("no such subscription")?; + Ok(s.timeout) + } + + pub fn remove(&self, sid: &str) -> anyhow::Result { + let mut g = self.subs.write(); + let s = g.remove(sid).context("no such subscription")?; + Ok(s) + } +} + +pub async fn notify_subscription_system_update( + url: &url::Url, + sid: &str, + seq: u64, + system_update_id: u64, +) -> anyhow::Result<()> { + // NOTIFY /callback_path HTTP/1.1 + // CONTENT-TYPE: text/xml; charset="utf-8" + // NT: upnp:event + // NTS: upnp:propchange + // SID: uuid: + // SEQ: + // + let body = render_notify_subscription_system_update_id(system_update_id); + + let resp = reqwest::Client::builder() + .build()? + .request(Method::from_bytes(b"NOTIFY")?, url.clone()) + .header("Content-Type", r#"text/xml; charset="utf-8""#) + .header("NT", "upnp:event") + .header("NTS", "upnp:propchange") + .header("SID", sid) + .header("SEQ", seq.to_string()) + .body(body) + .send() + .await?; + + if !resp.status().is_success() { + anyhow::bail!("{:?}", resp.status()) + } + Ok(()) +} + +impl UpnpServerStateInner { + pub fn renew_subscription(&self, sid: &str, new_timeout: Duration) -> anyhow::Result<()> { + self.subscriptions.update_timeout(sid, new_timeout) + } + + pub fn new_subscription( + self: &Arc, + url: url::Url, + timeout: Duration, + ) -> anyhow::Result { + let (sid, refresh_notify) = self.subscriptions.add(url.clone(), timeout); + let token = self.cancel_token.child_token(); + + // Spawn a task that will notify it of system id changes. + // Spawn a task that will wait for timeout or subscription refreshes. + // When it times out, kill all of them. + + let pspan = self.span.clone(); + let subscription_manager = { + let mut brx = self.system_update_bcast_tx.subscribe(); + let state = Arc::downgrade(self); + let sid = sid.clone(); + let url = url.clone(); + + async move { + let system_update_id_notifier = async { + loop { + let res = brx.recv().await; + let state = state.upgrade().context("upnp server dead")?; + let seq = state.subscriptions.next_seq(&sid)?; + match res { + Ok(system_update_id) => { + if let Err(e) = notify_subscription_system_update( + &url, + &sid, + seq, + system_update_id, + ) + .await + { + warn!(error=?e, "error updating UPNP subscription"); + } + } + Err(RecvError::Lagged(by)) => { + warn!(by, "UPNP subscription lagged"); + let seq = state.subscriptions.next_seq(&sid)?; + let system_update_id = + state.system_update_id.load(Ordering::Relaxed); + if let Err(e) = notify_subscription_system_update( + &url, + &sid, + seq, + system_update_id, + ) + .await + { + warn!(error=?e, "error updating UPNP subscription"); + } + } + Err(RecvError::Closed) => return Ok(()), + } + } + } + .instrument(error_span!("system-update-id-notifier")); + + let timeout_notifier = async { + let mut timeout = timeout; + loop { + tokio::select! { + _ = refresh_notify.notified() => { + timeout = state.upgrade().context("upnp server dead")?.subscriptions.get_timeout(&sid)?; + }, + _ = tokio::time::sleep(timeout) => { + state.upgrade().context("upnp server dead")?.subscriptions.remove(&sid)?; + return Ok(()) + } + } + } + }.instrument(error_span!("timeout-killer")); + + tokio::select! { + r = system_update_id_notifier => r, + r = timeout_notifier => r, + } + } + }; + + spawn_with_cancel( + error_span!(parent: pspan, "subscription-manager", %url), + token, + subscription_manager, + ); + + Ok(sid) + } +} diff --git a/crates/upnp-serve/src/templates.rs b/crates/upnp-serve/src/templates.rs new file mode 100644 index 0000000..cabbb95 --- /dev/null +++ b/crates/upnp-serve/src/templates.rs @@ -0,0 +1,124 @@ +use crate::upnp_types::content_directory::response::{Container, Item, ItemOrContainer}; + +pub struct RootDescriptionInputs<'a> { + pub friendly_name: &'a str, + pub manufacturer: &'a str, + pub model_name: &'a str, + pub unique_id: &'a str, + pub http_prefix: &'a str, +} + +pub fn render_root_description_xml(input: &RootDescriptionInputs<'_>) -> String { + let tmpl = include_str!("resources/templates/root_desc.tmpl.xml").trim(); + + // This isn't great perf-wise but whatever. + tmpl.replace("{friendly_name}", input.friendly_name) + .replace("{manufacturer}", input.manufacturer) + .replace("{model_name}", input.model_name) + .replace("{unique_id}", input.unique_id) + .replace("{http_prefix}", input.http_prefix) +} + +pub fn render_content_directory_browse(items: impl IntoIterator) -> String { + fn item_or_container(item_or_container: &ItemOrContainer) -> Option { + fn item(item: &Item) -> Option { + let tmpl = + include_str!("resources/templates/content_directory_control_browse_item.tmpl.xml") + .trim(); + + let mime = item.mime_type.as_ref()?; + let upnp_class = match mime.type_().as_str() { + "video" => "object.item.videoItem", + _ => return None, + }; + let mime = mime.to_string(); + + Some( + tmpl.replace("{id}", &item.id.to_string()) + .replace("{parent_id}", &item.parent_id.unwrap_or(0).to_string()) + .replace("{mime_type}", &mime) + .replace("{url}", &item.url) + .replace("{upnp_class}", upnp_class) + .replace("{title}", &item.title), + ) + } + + fn container(item: &Container) -> String { + let tmpl = include_str!( + "resources/templates/content_directory_control_browse_container.tmpl.xml" + ) + .trim(); + tmpl.replace("{id}", &format!("{}", item.id)) + .replace("{parent_id}", &item.parent_id.unwrap_or(0).to_string()) + .replace("{title}", &item.title) + .replace( + "{childCountTag}", + &match item.children_count { + Some(cc) => format!("childCount=\"{}\"", cc), + None => String::new(), + }, + ) + } + + match item_or_container { + ItemOrContainer::Container(c) => Some(container(c)), + ItemOrContainer::Item(i) => item(i), + } + } + + struct Envelope<'a> { + result: &'a str, + number_returned: usize, + total_matches: usize, + update_id: u64, + } + + fn render_content_directory_envelope(envelope: &Envelope<'_>) -> String { + let tmpl = + include_str!("resources/templates/content_directory_control_browse_envelope.tmpl.xml") + .trim(); + tmpl.replace("{result}", envelope.result) + .replace("{number_returned}", &envelope.number_returned.to_string()) + .replace("{total_matches}", &envelope.total_matches.to_string()) + .replace("{update_id}", &envelope.update_id.to_string()) + } + + fn render_content_directory_browse_result(items: &str) -> String { + let tmpl = + include_str!("resources/templates/content_directory_control_browse_result.tmpl.xml") + .trim(); + tmpl.replace("{items}", items) + } + + let all_items = items + .into_iter() + .filter_map(|item| item_or_container(&item)) + .collect::>(); + let total = all_items.len(); + let all_items = all_items.join(""); + + let result = render_content_directory_browse_result(&all_items); + + use std::time::{SystemTime, UNIX_EPOCH}; + let update_id = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + + render_content_directory_envelope(&Envelope { + result: &result, + number_returned: total, + total_matches: total, + update_id, + }) +} + +pub fn render_notify_subscription_system_update_id(update_id: u64) -> String { + include_str!("resources/templates/notify_subscription.tmpl.xml") + .replace("{system_update_id}", &update_id.to_string()) +} + +pub fn render_content_directory_control_get_system_update_id(update_id: u64) -> String { + include_str!("resources/templates/content_directory_control_get_system_update_id.tmpl.xml") + .replace("{id}", &update_id.to_string()) +} diff --git a/crates/upnp-serve/src/upnp_types.rs b/crates/upnp-serve/src/upnp_types.rs new file mode 100644 index 0000000..db6cf1a --- /dev/null +++ b/crates/upnp-serve/src/upnp_types.rs @@ -0,0 +1,81 @@ +pub mod content_directory { + use response::ItemOrContainer; + + pub mod request { + pub struct ContentDirectoryControlRequest { + pub object_id: usize, + } + + impl ContentDirectoryControlRequest { + pub fn parse(s: &str) -> anyhow::Result { + let mut reader = quick_xml::Reader::from_str(s); + + use quick_xml::events::Event::{Eof, Start}; + + let mut object_id: Option = None; + + loop { + match reader.read_event()? { + Eof => break, + Start(e) if e.name().as_ref() == b"ObjectID" => { + let t = reader.read_text(e.to_end().name())?; + object_id = t.trim().parse().ok(); + } + _ => continue, + } + } + + Ok(ContentDirectoryControlRequest { + object_id: object_id.unwrap_or(0), + }) + } + } + } + + pub mod response { + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct Container { + pub id: usize, + pub parent_id: Option, + pub children_count: Option, + pub title: String, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct Item { + pub id: usize, + pub parent_id: Option, + pub title: String, + pub mime_type: Option, + pub url: String, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub enum ItemOrContainer { + Container(Container), + Item(Item), + } + } + + pub trait ContentDirectoryBrowseProvider: Send + Sync { + fn browse_direct_children(&self, parent_id: usize) -> Vec; + } + + impl ContentDirectoryBrowseProvider for Vec { + fn browse_direct_children(&self, _parent_id: usize) -> Vec { + self.clone() + } + } +} + +#[cfg(test)] +mod tests { + use crate::upnp_types::content_directory::request::ContentDirectoryControlRequest; + + #[test] + fn test_parse_content_directory_request() { + let s = include_str!("resources/test/ContentDirectoryControlExampleRequest.xml"); + let req = ContentDirectoryControlRequest::parse(s).unwrap(); + assert_eq!(req.object_id, 5); + } +} diff --git a/crates/upnp/Cargo.toml b/crates/upnp/Cargo.toml index 81a30b2..f242e49 100644 --- a/crates/upnp/Cargo.toml +++ b/crates/upnp/Cargo.toml @@ -14,7 +14,7 @@ readme = "README.md" [dependencies] tracing = "0.1" anyhow = "1" -reqwest = { version = "0.12" } +reqwest = { version = "0.12", default-features = false } serde = { version = "1", features = ["derive"] } serde-xml-rs = "0.6.0" tokio = { version = "1", features = ["macros"] } diff --git a/desktop/src-tauri/Cargo.lock b/desktop/src-tauri/Cargo.lock index 5d39880..920c35c 100644 --- a/desktop/src-tauri/Cargo.lock +++ b/desktop/src-tauri/Cargo.lock @@ -136,12 +136,6 @@ dependencies = [ "system-deps 6.2.2", ] -[[package]] -name = "atomic-waker" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" - [[package]] name = "autocfg" version = "1.3.0" @@ -1376,25 +1370,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "h2" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" -dependencies = [ - "atomic-waker", - "bytes", - "fnv", - "futures-core", - "futures-sink", - "http 1.1.0", - "indexmap 2.4.0", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1532,7 +1507,6 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", "http 1.1.0", "http-body", "httparse", @@ -1544,23 +1518,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.27.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" -dependencies = [ - "futures-util", - "http 1.1.0", - "hyper", - "hyper-util", - "rustls", - "rustls-pki-types", - "tokio", - "tokio-rustls", - "tower-service", -] - [[package]] name = "hyper-tls" version = "0.6.0" @@ -2984,15 +2941,12 @@ checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64 0.22.1", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2", "http 1.1.0", "http-body", "http-body-util", "hyper", - "hyper-rustls", "hyper-tls", "hyper-util", "ipnet", @@ -3008,7 +2962,6 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", - "system-configuration", "tokio", "tokio-native-tls", "tokio-socks", @@ -3020,21 +2973,6 @@ dependencies = [ "windows-registry", ] -[[package]] -name = "ring" -version = "0.17.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" -dependencies = [ - "cc", - "cfg-if", - "getrandom 0.2.15", - "libc", - "spin", - "untrusted", - "windows-sys 0.52.0", -] - [[package]] name = "rlimit" version = "0.10.1" @@ -3092,19 +3030,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.23.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" -dependencies = [ - "once_cell", - "rustls-pki-types", - "rustls-webpki", - "subtle", - "zeroize", -] - [[package]] name = "rustls-pemfile" version = "2.1.3" @@ -3121,17 +3046,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" -[[package]] -name = "rustls-webpki" -version = "0.102.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" -dependencies = [ - "ring", - "rustls-pki-types", - "untrusted", -] - [[package]] name = "rustversion" version = "1.0.17" @@ -3476,12 +3390,6 @@ dependencies = [ "system-deps 5.0.0", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3529,12 +3437,6 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "subtle" -version = "2.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" - [[package]] name = "syn" version = "1.0.109" @@ -3572,27 +3474,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "system-configuration" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" -dependencies = [ - "bitflags 2.6.0", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "system-deps" version = "5.0.0" @@ -4032,17 +3913,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" -dependencies = [ - "rustls", - "rustls-pki-types", - "tokio", -] - [[package]] name = "tokio-socks" version = "0.5.2" @@ -4316,12 +4186,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" -[[package]] -name = "untrusted" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" - [[package]] name = "url" version = "2.5.2" @@ -5100,9 +4964,3 @@ dependencies = [ "quote", "syn 2.0.75", ] - -[[package]] -name = "zeroize" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" diff --git a/desktop/src-tauri/src/main.rs b/desktop/src-tauri/src/main.rs index 15ef621..6a49429 100644 --- a/desktop/src-tauri/src/main.rs +++ b/desktop/src-tauri/src/main.rs @@ -115,13 +115,20 @@ async fn api_from_config( ); if !config.http_api.disable { - let http_api_task = librqbit::http_api::HttpApi::new( - api.clone(), - Some(librqbit::http_api::HttpApiOptions { - read_only: config.http_api.read_only, - }), - ) - .make_http_api_and_run(config.http_api.listen_addr); + let listen_addr = config.http_api.listen_addr; + let api = api.clone(); + let read_only = config.http_api.read_only; + let http_api_task = async move { + let listener = tokio::net::TcpListener::bind(listen_addr) + .await + .with_context(|| format!("error listening on {}", listen_addr))?; + librqbit::http_api::HttpApi::new( + api.clone(), + Some(librqbit::http_api::HttpApiOptions { read_only }), + ) + .make_http_api_and_run(listener, None) + .await + }; session.spawn(error_span!("http_api"), http_api_task); }