Merge pull request #208 from ikatson/upnp-serve

[Major feature] UPNP server integrated into rqbit
This commit is contained in:
Igor Katson 2024-08-23 19:58:30 +01:00 committed by GitHub
commit 3110f68f36
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 2435 additions and 243 deletions

132
Cargo.lock generated
View file

@ -200,12 +200,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "autocfg"
version = "1.3.0"
@ -401,6 +395,17 @@ dependencies = [
"generic-array 0.14.7",
]
[[package]]
name = "bstr"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c"
dependencies = [
"memchr",
"regex-automata 0.4.7",
"serde",
]
[[package]]
name = "bumpalo"
version = "3.16.0"
@ -793,15 +798,6 @@ dependencies = [
"serde",
]
[[package]]
name = "encoding_rs"
version = "0.8.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59"
dependencies = [
"cfg-if",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@ -1041,6 +1037,16 @@ dependencies = [
"version_check",
]
[[package]]
name = "gethostname"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc3655aa6818d65bc620d6911f05aa7b6aeb596291e1e9f79e52df85583d1e30"
dependencies = [
"rustix",
"windows-targets 0.52.6",
]
[[package]]
name = "getrandom"
version = "0.2.15"
@ -1077,25 +1083,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "h2"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"http 1.1.0",
"indexmap 2.4.0",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -1275,7 +1262,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2 0.3.26",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
@ -1298,7 +1285,6 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.6",
"http 1.1.0",
"http-body 1.0.1",
"httparse",
@ -1576,6 +1562,7 @@ dependencies = [
"tower-http",
"tracing",
"tracing-subscriber",
"upnp-serve",
"url",
"urlencoding",
"uuid",
@ -2284,6 +2271,15 @@ dependencies = [
"prost",
]
[[package]]
name = "quick-xml"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
dependencies = [
"memchr",
]
[[package]]
name = "quinn"
version = "0.11.3"
@ -2458,10 +2454,8 @@ checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63"
dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.4.6",
"http 1.1.0",
"http-body 1.0.1",
"http-body-util",
@ -2485,7 +2479,6 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"sync_wrapper 1.0.1",
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-rustls",
@ -2544,6 +2537,7 @@ dependencies = [
"tokio",
"tracing",
"tracing-subscriber",
"upnp-serve",
]
[[package]]
@ -2827,6 +2821,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "signature"
version = "2.2.0"
@ -3159,27 +3162,6 @@ dependencies = [
"futures-core",
]
[[package]]
name = "system-configuration"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42"
dependencies = [
"bitflags 2.6.0",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "tap"
version = "1.0.1"
@ -3295,7 +3277,9 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
@ -3405,7 +3389,7 @@ dependencies = [
"axum 0.6.20",
"base64 0.21.7",
"bytes",
"h2 0.3.26",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.30",
@ -3605,6 +3589,32 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "upnp-serve"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.5",
"bstr",
"gethostname",
"http 1.1.0",
"httparse",
"librqbit-core",
"librqbit-sha1-wrapper",
"librqbit-upnp",
"mime_guess",
"parking_lot",
"quick-xml",
"reqwest",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"url",
"uuid",
]
[[package]]
name = "url"
version = "2.5.2"

View file

@ -11,7 +11,7 @@ members = [
"crates/peer_binary_protocol",
"crates/dht",
"crates/upnp",
"crates/tracker_comms",
"crates/tracker_comms", "crates/upnp-serve",
]
[profile.dev]

View file

@ -20,8 +20,10 @@ devserver:
echo -n '' > /tmp/rqbit-log && CORS_ALLOW_REGEXP=".*" \
cargo run -- \
--log-file /tmp/rqbit-log \
--log-file-rust-log=debug,librqbit=trace \
--log-file-rust-log=debug,librqbit=trace,upnp_serve=trace \
--http-api-listen-addr 0.0.0.0:3030 \
--upnp-server-hostname 192.168.0.112 \
--upnp-server-friendly-name rqbit-dev \
server start /tmp/scratch/
@PHONY: devserver

View file

@ -14,6 +14,7 @@ readme = "README.md"
[features]
default = ["default-tls"]
http-api = ["axum", "tower-http"]
upnp-serve-adapter = ["upnp-serve"]
webui = []
timed_existence = []
default-tls = ["reqwest/default-tls"]
@ -38,6 +39,7 @@ peer_binary_protocol = { path = "../peer_binary_protocol", package = "librqbit-p
sha1w = { path = "../sha1w", default-features = false, package = "librqbit-sha1-wrapper", version = "3.0.0" }
dht = { path = "../dht", package = "librqbit-dht", version = "5.1.0" }
librqbit-upnp = { path = "../upnp", version = "0.1.1" }
upnp-serve = { path = "../upnp-serve", version = "0.1.0", optional = true }
tokio = { version = "1", features = [
"macros",
@ -80,7 +82,7 @@ backoff = "0.4.0"
dashmap = "5.5.3"
base64 = "0.21.5"
serde_with = "3.4.0"
tokio-util = "0.7.10"
tokio-util = { version = "0.7.10", features = ["io"] }
bytes = "1.5.0"
rlimit = "0.10.1"
async-stream = "0.3.5"

View file

@ -1,6 +1,6 @@
use anyhow::Context;
use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::extract::{ConnectInfo, Path, Query, Request, State};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use bencode::AsDisplay;
@ -16,7 +16,8 @@ use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use tokio::io::AsyncSeekExt;
use tracing::{debug, info, trace};
use tokio::net::TcpListener;
use tracing::{debug, error_span, trace};
use axum::Router;
@ -52,7 +53,11 @@ impl HttpApi {
/// Run the HTTP server forever on the given address.
/// If read_only is passed, no state-modifying methods will be exposed.
#[inline(never)]
pub fn make_http_api_and_run(self, addr: SocketAddr) -> BoxFuture<'static, anyhow::Result<()>> {
pub fn make_http_api_and_run(
self,
listener: TcpListener,
upnp_router: Option<Router>,
) -> BoxFuture<'static, anyhow::Result<()>> {
let state = self.inner;
async fn api_root() -> impl IntoResponse {
@ -558,22 +563,33 @@ impl HttpApi {
.allow_headers(AllowHeaders::any())
};
let mut app = app.with_state(state);
if let Some(upnp_router) = upnp_router {
app = app.nest("/upnp", upnp_router);
}
let app = app
.layer(cors_layer)
.layer(tower_http::trace::TraceLayer::new_for_http())
.with_state(state)
.into_make_service();
info!(%addr, "starting HTTP server");
use tokio::net::TcpListener;
.layer(
tower_http::trace::TraceLayer::new_for_http().make_span_with(|req: &Request| {
let method = req.method();
let uri = req.uri();
if let Some(ConnectInfo(addr)) =
req.extensions().get::<ConnectInfo<SocketAddr>>()
{
error_span!("request", %method, %uri, %addr)
} else {
error_span!("request", %method, %uri)
}
}),
)
.into_make_service_with_connect_info::<SocketAddr>();
async move {
let listener = TcpListener::bind(&addr)
axum::serve(listener, app)
.await
.with_context(|| format!("error binding to {addr}"))?;
axum::serve(listener, app).await?;
Ok(())
.context("error running HTTP API")
}
.boxed()
}

View file

@ -65,6 +65,8 @@ mod torrent_state;
#[cfg(feature = "tracing-subscriber-utils")]
pub mod tracing_subscriber_config_utils;
mod type_aliases;
#[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))]
pub mod upnp_server_adapter;
pub use api::Api;
pub use api_error::ApiError;

View file

@ -208,19 +208,19 @@ fn merge_two_optional_streams<T>(
) -> Option<BoxStream<'static, T>> {
match (s1, s2) {
(Some(s1), None) => {
debug!("merge_two_optional_streams: using first");
trace!("merge_two_optional_streams: using first");
Some(Box::pin(s1))
}
(None, Some(s2)) => {
debug!("merge_two_optional_streams: using second");
trace!("merge_two_optional_streams: using second");
Some(Box::pin(s2))
}
(Some(s1), Some(s2)) => {
debug!("merge_two_optional_streams: using both");
trace!("merge_two_optional_streams: using both");
Some(Box::pin(merge_streams(s1, s2)))
}
(None, None) => {
debug!("merge_two_optional_streams: using none");
trace!("merge_two_optional_streams: using none");
None
}
}
@ -909,7 +909,7 @@ impl Session {
rx,
seen,
} => {
debug!(?info, "received result from DHT");
trace!(?info, "received result from DHT");
let trackers = magnet.trackers.into_iter().unique().collect_vec();
InternalAddResult {
info_hash,
@ -924,7 +924,7 @@ impl Session {
initial_peers: {
let seen = seen.into_iter().collect_vec();
for peer in &seen {
debug!(?peer, "seen")
trace!(?peer, "seen")
}
seen
},
@ -1045,7 +1045,7 @@ impl Session {
info_bytes,
} = add_res;
debug!("Torrent info: {:#?}", &info);
trace!("Torrent info: {:#?}", &info);
let only_files = compute_only_files(
&info,

View file

@ -6,7 +6,6 @@ use std::{
};
use anyhow::{bail, Context};
use axum::{response::IntoResponse, routing::get, Router};
use librqbit_core::Id20;
use parking_lot::RwLock;
use rand::{thread_rng, Rng, RngCore, SeedableRng};
@ -96,7 +95,9 @@ impl TestPeerMetadata {
}
}
#[cfg(feature = "http-api")]
async fn debug_server() -> anyhow::Result<()> {
use axum::{response::IntoResponse, routing::get, Router};
async fn backtraces() -> impl IntoResponse {
#[cfg(feature = "async-bt")]
{
@ -127,6 +128,11 @@ async fn debug_server() -> anyhow::Result<()> {
Ok(())
}
#[cfg(not(feature = "http-api"))]
async fn debug_server() -> anyhow::Result<()> {
Ok(())
}
pub fn spawn_debug_server() {
tokio::spawn(debug_server());
}

View file

@ -7,7 +7,7 @@ use anyhow::Context;
use librqbit_core::lengths::Lengths;
use size_format::SizeFormatterBinary as SF;
use tracing::{debug, info, warn};
use tracing::{info, trace, warn};
use crate::{
api::TorrentIdOrHash,
@ -159,7 +159,7 @@ impl TorrentStateInitializing {
fi.relative_filename, fi.len, err
);
} else {
debug!(
trace!(
"Set length for file {:?} to {} in {:?}",
fi.relative_filename,
SF::new(fi.len),

View file

@ -0,0 +1,595 @@
use std::{
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap,
},
sync::Arc,
};
use crate::{session::TorrentId, ManagedTorrent, Session};
#[derive(Clone)]
pub struct UpnpServerSessionAdapter {
session: Arc<Session>,
hostname: String,
port: u16,
}
use anyhow::Context;
use buffers::ByteBufOwned;
use itertools::Itertools;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use tracing::{debug, trace, warn};
use upnp_serve::{
upnp_types::content_directory::{
response::{Container, Item, ItemOrContainer},
ContentDirectoryBrowseProvider,
},
UpnpServer, UpnpServerOptions,
};
#[derive(Debug, PartialEq, Eq)]
struct TorrentFileTreeNode {
title: String,
parent_id: Option<usize>,
children: Vec<usize>,
real_torrent_file_id: Option<usize>,
}
fn encode_id(local_id: usize, torrent_id: usize) -> usize {
(local_id << 16) | (torrent_id + 1)
}
fn decode_id(id: usize) -> anyhow::Result<(usize, usize)> {
let torrent_id = id & 0xffff;
if torrent_id == 0 {
anyhow::bail!("invalid id")
}
let torrent_id = torrent_id - 1;
Ok((id >> 16, torrent_id))
}
impl TorrentFileTreeNode {
fn as_item_or_container(
&self,
id: usize,
torrent: &ManagedTorrent,
adapter: &UpnpServerSessionAdapter,
) -> ItemOrContainer {
let encoded_id = encode_id(id, torrent.id());
let encoded_parent_id = self.parent_id.map(|p| encode_id(p, torrent.id()));
match self.real_torrent_file_id {
Some(fid) => {
let filename = &torrent.shared().file_infos[fid].relative_filename;
// Torrent path joined with "/"
let last_url_bit = torrent
.shared()
.info
.iter_filenames_and_lengths()
.ok()
.and_then(|mut it| it.nth(fid))
.and_then(|(fi, _)| fi.to_vec().ok())
.map(|components| components.join("/"))
.unwrap_or_else(|| self.title.clone());
ItemOrContainer::Item(Item {
id: encoded_id,
parent_id: encoded_parent_id,
title: self.title.clone(),
mime_type: mime_guess::from_path(filename).first(),
url: format!(
"http://{}:{}/torrents/{}/stream/{}/{}",
adapter.hostname,
adapter.port,
torrent.id(),
fid,
last_url_bit
),
})
}
None => ItemOrContainer::Container(Container {
id: encoded_id,
parent_id: encoded_parent_id,
title: self.title.clone(),
children_count: Some(self.children.len()),
}),
}
}
}
struct TorrentFileTree {
// root id is 0
nodes: Vec<TorrentFileTreeNode>,
}
fn is_single_file_at_root(info: &TorrentMetaV1Info<ByteBufOwned>) -> bool {
info.iter_filenames_and_lengths()
.into_iter()
.flatten()
.flat_map(|(f, _)| f.iter_components())
.nth(1)
.is_none()
}
impl TorrentFileTree {
fn build(torent_id: TorrentId, info: &TorrentMetaV1Info<ByteBufOwned>) -> anyhow::Result<Self> {
if is_single_file_at_root(info) {
let filename = info
.iter_filenames_and_lengths()?
.next()
.context("bug")?
.0
.iter_components()
.last()
.context("bug")??;
let root_node = TorrentFileTreeNode {
title: filename.to_owned(),
parent_id: None,
children: vec![],
real_torrent_file_id: Some(0),
};
return Ok(TorrentFileTree {
nodes: vec![root_node],
});
}
let root_node = TorrentFileTreeNode {
title: match info.name.as_ref() {
Some(n) => std::str::from_utf8(n)?.to_owned(),
None => {
format!("torrent {}", torent_id)
}
},
parent_id: None,
children: vec![],
real_torrent_file_id: None,
};
let mut tree = TorrentFileTree {
nodes: vec![root_node],
};
let mut name_cache = HashMap::new();
for (fid, (fi, _)) in info.iter_filenames_and_lengths()?.enumerate() {
let components = match fi.to_vec() {
Ok(v) => v,
Err(_) => continue,
};
let mut parent_id = 0;
let mut it = components.iter().peekable();
while let Some(component) = it.next() {
let is_last = it.peek().is_none();
if is_last {
let current_id = tree.nodes.len();
let node = TorrentFileTreeNode {
title: component.clone(),
parent_id: Some(parent_id),
children: vec![],
real_torrent_file_id: Some(fid),
};
tree.nodes.push(node);
tree.nodes[parent_id].children.push(current_id);
break;
}
parent_id = match name_cache.entry((parent_id, component.clone())) {
Occupied(occ) => *occ.get(),
Vacant(vac) => {
let id = tree.nodes.len();
let node = TorrentFileTreeNode {
title: component.clone(),
parent_id: Some(parent_id),
children: vec![],
real_torrent_file_id: None,
};
tree.nodes.push(node);
tree.nodes[parent_id].children.push(id);
vac.insert(id);
id
}
};
}
}
Ok(tree)
}
}
impl UpnpServerSessionAdapter {
fn build_root(&self) -> Vec<ItemOrContainer> {
let mut all = self
.session
.with_torrents(|torrents| torrents.map(|(_, t)| t.clone()).collect_vec());
all.sort_unstable_by_key(|t| t.id());
all.iter()
.filter_map(|t| {
let real_id = t.id();
let upnp_id = real_id + 1;
if is_single_file_at_root(&t.shared().info) {
// Just add the file directly
let rf = &t.shared().file_infos[0].relative_filename;
let title = rf.file_name()?.to_str()?.to_owned();
let mime_type = mime_guess::from_path(rf).first();
let url = format!(
"http://{}:{}/torrents/{real_id}/stream/0/{title}",
self.hostname, self.port
);
Some(ItemOrContainer::Item(Item {
id: upnp_id,
parent_id: None,
title,
mime_type,
url,
}))
} else {
let title = t
.shared()
.info
.name
.as_ref()
.and_then(|b| std::str::from_utf8(&b.0).ok())
.map(|n| n.to_owned())
.unwrap_or_else(|| format!("torrent {real_id}"));
// Create a folder
Some(ItemOrContainer::Container(Container {
id: upnp_id,
parent_id: None,
title,
children_count: None,
}))
}
})
.collect_vec()
}
}
impl ContentDirectoryBrowseProvider for UpnpServerSessionAdapter {
fn browse_direct_children(&self, parent_id: usize) -> Vec<ItemOrContainer> {
if parent_id == 0 {
return self.build_root();
}
let (node_id, torrent_id) = match decode_id(parent_id) {
Ok((node_id, torrent_id)) => (node_id, torrent_id),
Err(_) => {
debug!(id=?parent_id, "invalid id");
return vec![];
}
};
trace!(parent_id, node_id, torrent_id);
let torrent = match self.session.get(torrent_id.into()) {
Some(t) => t,
None => {
warn!(torrent_id, "no such torrent");
return vec![];
}
};
let tree = match TorrentFileTree::build(torrent.id(), &torrent.shared().info) {
Ok(tree) => tree,
Err(e) => {
warn!(parent_id, error=?e, "error building torrent file tree");
return vec![];
}
};
let node = match tree.nodes.get(node_id) {
Some(n) => n,
None => {
warn!(torrent_id, node_id, "no such internal ID in torrent");
return vec![];
}
};
trace!(node_id, torrent_id, ?node);
let mut result = Vec::new();
if node.real_torrent_file_id.is_some() {
result.push(node.as_item_or_container(node_id, &torrent, self))
} else {
for (child_node_id, child_node) in node
.children
.iter()
.filter_map(|id| Some((*id, tree.nodes.get(*id)?)))
{
result.push(child_node.as_item_or_container(child_node_id, &torrent, self));
}
};
result
}
}
impl Session {
pub async fn make_upnp_adapter(
self: &Arc<Self>,
friendly_name: String,
http_hostname: String,
http_listen_port: u16,
) -> anyhow::Result<UpnpServer> {
UpnpServer::new(UpnpServerOptions {
friendly_name,
http_hostname: http_hostname.clone(),
http_listen_port,
http_prefix: "/upnp".to_owned(),
browse_provider: Box::new(UpnpServerSessionAdapter {
session: self.clone(),
hostname: http_hostname,
port: http_listen_port,
}),
cancellation_token: self.cancellation_token().child_token(),
})
.await
.context("error creating upnp adapter")
}
}
#[cfg(test)]
mod tests {
use bencode::bencode_serialize_to_writer;
use bytes::Bytes;
use dht::Id20;
use librqbit_core::torrent_metainfo::{
TorrentMetaV1File, TorrentMetaV1Info, TorrentMetaV1Owned,
};
use tempfile::TempDir;
use upnp_serve::upnp_types::content_directory::{
response::{Container, Item, ItemOrContainer},
ContentDirectoryBrowseProvider,
};
use crate::{
tests::test_util::setup_test_logging,
upnp_server_adapter::{
decode_id, encode_id, TorrentFileTree, TorrentFileTreeNode, UpnpServerSessionAdapter,
},
AddTorrent, AddTorrentOptions, Session, SessionOptions,
};
fn create_torrent(name: Option<&str>, files: &[&str]) -> TorrentMetaV1Owned {
TorrentMetaV1Owned {
announce: None,
announce_list: vec![],
info: TorrentMetaV1Info {
name: name.map(|n| n.as_bytes().into()),
pieces: b""[..].into(),
piece_length: 1,
length: None,
md5sum: None,
files: Some(
files
.iter()
.map(|f| TorrentMetaV1File {
length: 1,
path: f.split("/").map(|f| f.as_bytes().into()).collect(),
})
.collect(),
),
},
comment: None,
created_by: None,
encoding: None,
publisher: None,
publisher_url: None,
creation_date: None,
info_hash: Id20::default(),
}
}
#[test]
fn test_torrent_file_tree_single() -> anyhow::Result<()> {
let t = create_torrent(Some("test t"), &["file0"]);
let tree = TorrentFileTree::build(0, &t.info)?;
assert_eq!(
&tree.nodes,
&[TorrentFileTreeNode {
children: vec![],
parent_id: None,
real_torrent_file_id: Some(0),
title: "file0".into()
}]
);
Ok(())
}
#[test]
fn test_torrent_file_tree_flat() -> anyhow::Result<()> {
let t = create_torrent(Some("test t"), &["file0", "file1"]);
let tree = TorrentFileTree::build(0, &t.info)?;
assert_eq!(
&tree.nodes,
&[
TorrentFileTreeNode {
children: vec![1, 2],
parent_id: None,
real_torrent_file_id: None,
title: "test t".into()
},
TorrentFileTreeNode {
children: vec![],
parent_id: Some(0),
real_torrent_file_id: Some(0),
title: "file0".into()
},
TorrentFileTreeNode {
children: vec![],
parent_id: Some(0),
real_torrent_file_id: Some(1),
title: "file1".into()
}
]
);
Ok(())
}
#[test]
fn test_torrent_file_tree_nested() -> anyhow::Result<()> {
let t = create_torrent(
Some("test t"),
&["file0", "file1", "dir0/file2", "dir0/dir1/file3"],
);
let tree = TorrentFileTree::build(0, &t.info)?;
assert_eq!(
&tree.nodes,
&[
TorrentFileTreeNode {
children: vec![1, 2, 3],
parent_id: None,
real_torrent_file_id: None,
title: "test t".into()
},
TorrentFileTreeNode {
children: vec![],
parent_id: Some(0),
real_torrent_file_id: Some(0),
title: "file0".into()
},
TorrentFileTreeNode {
children: vec![],
parent_id: Some(0),
real_torrent_file_id: Some(1),
title: "file1".into()
},
TorrentFileTreeNode {
children: vec![4, 5],
parent_id: Some(0),
real_torrent_file_id: None,
title: "dir0".into()
},
TorrentFileTreeNode {
children: vec![],
parent_id: Some(3),
real_torrent_file_id: Some(2),
title: "file2".into()
},
TorrentFileTreeNode {
children: vec![6],
parent_id: Some(3),
real_torrent_file_id: None,
title: "dir1".into()
},
TorrentFileTreeNode {
children: vec![],
parent_id: Some(5),
real_torrent_file_id: Some(3),
title: "file3".into()
},
]
);
Ok(())
}
#[tokio::test]
async fn test_browse_direct_children() {
setup_test_logging();
let t1 = create_torrent(Some("t1"), &["f1"]);
let t2 = create_torrent(Some("t2"), &["d1/f2"]);
fn as_bytes(t: &TorrentMetaV1Owned) -> Bytes {
let mut b = Vec::new();
bencode_serialize_to_writer(t, &mut b).unwrap();
b.into()
}
let td = TempDir::new().unwrap();
let session = Session::new_with_opts(
td.path().to_owned(),
SessionOptions {
disable_dht: true,
..Default::default()
},
)
.await
.unwrap();
session
.add_torrent(
AddTorrent::from_bytes(as_bytes(&t1)),
Some(AddTorrentOptions {
paused: true,
..Default::default()
}),
)
.await
.unwrap();
session
.add_torrent(
AddTorrent::from_bytes(as_bytes(&t2)),
Some(AddTorrentOptions {
paused: true,
..Default::default()
}),
)
.await
.unwrap();
let adapter = UpnpServerSessionAdapter {
session,
hostname: "127.0.0.1".into(),
port: 9005,
};
assert_eq!(
adapter.browse_direct_children(0),
vec![
ItemOrContainer::Item(Item {
id: encode_id(0, 0),
parent_id: None,
title: "f1".into(),
mime_type: None,
url: "http://127.0.0.1:9005/torrents/0/stream/0/f1".into()
}),
ItemOrContainer::Container(Container {
id: encode_id(0, 1),
parent_id: None,
children_count: None,
title: "t2".into()
})
]
);
assert_eq!(
adapter.browse_direct_children(encode_id(0, 1)),
vec![ItemOrContainer::Container(Container {
id: encode_id(1, 1),
parent_id: Some(encode_id(0, 1)),
children_count: Some(1),
title: "d1".into()
}),]
);
assert_eq!(
adapter.browse_direct_children(encode_id(1, 1)),
vec![ItemOrContainer::Item(Item {
id: encode_id(2, 1),
parent_id: Some(encode_id(1, 1)),
title: "f2".into(),
mime_type: None,
url: "http://127.0.0.1:9005/torrents/1/stream/0/d1/f2".into()
})]
);
}
#[test]
fn test_encode_id() {
for local_id in 0..5 {
for torrent_id in 0..5 {
let encoded = encode_id(local_id, torrent_id);
let (decoded_local_id, decoded_torrent_id) = decode_id(encoded).unwrap();
assert_eq!(local_id, decoded_local_id);
assert_eq!(torrent_id, decoded_torrent_id);
}
}
}
}

View file

@ -26,6 +26,7 @@ postgres = ["librqbit/postgres"]
librqbit = { path = "../librqbit", default-features = false, features = [
"http-api",
"tracing-subscriber-utils",
"upnp-serve-adapter",
], version = "7.0.0-beta.2" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
console-subscriber = { version = "0.2", optional = true }
@ -43,6 +44,7 @@ serde_json = "1"
size_format = "1"
bytes = "1.5.0"
openssl = { version = "0.10", features = ["vendored"], optional = true }
upnp-serve = { path = "../upnp-serve" }
[dev-dependencies]
futures = { version = "0.3" }

View file

@ -1,6 +1,6 @@
use std::{io, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use anyhow::Context;
use anyhow::{bail, Context};
use clap::{CommandFactory, Parser, ValueEnum};
use clap_complete::Shell;
use librqbit::{
@ -16,6 +16,7 @@ use librqbit::{
PeerConnectionOptions, Session, SessionOptions, SessionPersistenceConfig, TorrentStatsState,
};
use size_format::SizeFormatterBinary as SF;
use tokio::net::TcpListener;
use tracing::{error, error_span, info, trace_span, warn};
#[derive(Debug, Clone, Copy, ValueEnum)]
@ -95,6 +96,15 @@ struct Opts {
#[arg(long = "disable-upnp")]
disable_upnp: bool,
/// If set, will run a UPNP Media server and stream all the torrents through it.
/// Should be set to your hostname/IP as seen by your LAN neighbors.
#[arg(long = "upnp-server-hostname")]
upnp_server_hostname: Option<String>,
/// UPNP server name that would be displayed on devices in your network.
#[arg(long = "upnp-server-friendly-name")]
upnp_server_friendly_name: Option<String>,
#[command(subcommand)]
subcommand: SubCommand,
@ -437,6 +447,28 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
trace_span!("stats_printer"),
stats_printer(session.clone()),
);
let mut upnp_server = {
match opts.upnp_server_hostname {
Some(hn) => {
if opts.http_api_listen_addr.ip().is_loopback() {
bail!("cannot enable UPNP server as HTTP API listen addr is localhost. Change --http-api-listen-addr to start with 0.0.0.0");
}
let server = session
.make_upnp_adapter(
opts.upnp_server_friendly_name
.unwrap_or_else(|| format!("rqbit at {hn}")),
hn,
opts.http_api_listen_addr.port(),
)
.await
.context("error starting UPNP server")?;
Some(server)
}
None => None,
}
};
let api = Api::new(
session,
Some(log_config.rust_log_reload_tx),
@ -444,10 +476,31 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
);
let http_api = HttpApi::new(api, Some(HttpApiOptions { read_only: false }));
let http_api_listen_addr = opts.http_api_listen_addr;
http_api
.make_http_api_and_run(http_api_listen_addr)
info!("starting HTTP API at http://{http_api_listen_addr}");
let tcp_listener = TcpListener::bind(http_api_listen_addr)
.await
.context("error running HTTP API")
.with_context(|| format!("error binding to {http_api_listen_addr}"))?;
let upnp_router = upnp_server.as_mut().and_then(|s| s.take_router().ok());
let http_api_fut = http_api.make_http_api_and_run(tcp_listener, upnp_router);
let res = match upnp_server {
Some(srv) => {
let upnp_fut = srv.run_ssdp_forever();
tokio::pin!(http_api_fut);
tokio::pin!(upnp_fut);
tokio::select! {
r = &mut http_api_fut => r,
r = &mut upnp_fut => r
}
}
None => http_api_fut.await,
};
res.context("error running rqbit server")
}
},
SubCommand::Download(download_opts) => {
@ -534,10 +587,16 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
);
let http_api = HttpApi::new(api, Some(HttpApiOptions { read_only: true }));
let http_api_listen_addr = opts.http_api_listen_addr;
info!("starting HTTP API at http://{http_api_listen_addr}");
let listener = tokio::net::TcpListener::bind(opts.http_api_listen_addr)
.await
.with_context(|| format!("error binding to {http_api_listen_addr}"))?;
librqbit_spawn(
"http_api",
error_span!("http_api"),
http_api.make_http_api_and_run(http_api_listen_addr),
http_api.make_http_api_and_run(listener, None),
);
let mut added = false;

View file

@ -0,0 +1,31 @@
[package]
name = "upnp-serve"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.86"
axum = { version = "0.7.5", features = ["tokio"] }
tokio = { version = "1.39.3", features = ["full"] }
tracing = "0.1.40"
bstr = "1.10.0"
quick-xml = "0.36.1"
http = "1.1.0"
httparse = "1.9.4"
uuid = { version = "1.10.0", features = ["v4"] }
librqbit-upnp = { path = "../upnp" }
gethostname = "0.5.0"
librqbit-sha1-wrapper = { path = "../sha1w" }
librqbit-core = { path = "../librqbit_core" }
mime_guess = "2.0.5"
url = "2.5.2"
parking_lot = "0.12.3"
tokio-util = "0.7.11"
reqwest = { version = "0.12.7", default-features = false }
[dev-dependencies]
tracing-subscriber = "0.3.18"
tower-http = { version = "0.5", features = ["trace"] }
[[example]]
name = "upnp-stub-server"

View file

@ -0,0 +1,17 @@
all: release-linux-armv7-musl
@PHONY: release-linux-current-target
release-linux-current-target:
CC_$(TARGET_SNAKE_CASE)=$(CROSS_COMPILE_PREFIX)-gcc \
CXX_$(TARGET_SNAKE_CASE)=$(CROSS_COMPILE_PREFIX)-g++ \
AR_$(TARGET_SNAKE_CASE)=$(CROSS_COMPILE_PREFIX)-ar \
CARGO_TARGET_$(TARGET_SNAKE_UPPER_CASE)_LINKER=$(CROSS_COMPILE_PREFIX)-gcc \
cargo build --target=$(TARGET)
@PHONY: release-linux-armv7-musl
release-linux-armv7-musl:
TARGET=armv7-unknown-linux-musleabihf \
TARGET_SNAKE_CASE=armv7_unknown_linux_musleabihf \
TARGET_SNAKE_UPPER_CASE=ARMV7_UNKNOWN_LINUX_MUSLEABIHF \
CROSS_COMPILE_PREFIX=armv7-linux-musleabihf \
$(MAKE) release-linux-current-target

View file

@ -0,0 +1,74 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
str::FromStr,
};
use anyhow::Context;
use axum::routing::get;
use mime_guess::Mime;
use tracing::{error, info};
use upnp_serve::{
upnp_types::content_directory::response::{Item, ItemOrContainer},
UpnpServer, UpnpServerOptions,
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "trace");
}
tracing_subscriber::fmt::init();
let items: Vec<ItemOrContainer> = vec![ItemOrContainer::Item(Item {
title: "Example".to_owned(),
mime_type: Some(Mime::from_str("video/x-matroska")?),
url: "http://192.168.0.165:3030/torrents/4/stream/0/file.mkv".to_owned(),
id: 1,
parent_id: Some(0),
})];
const HTTP_PORT: u16 = 9005;
const HTTP_PREFIX: &str = "/upnp";
info!("Creating UpnpServer");
let mut server = UpnpServer::new(UpnpServerOptions {
friendly_name: "demo upnp server".to_owned(),
http_hostname: std::env::var("UPNP_HOSTNAME")
.context("you need to set UPNP_HOSTNAME to your IP visible from LAN")?,
http_listen_port: HTTP_PORT,
http_prefix: HTTP_PREFIX.to_owned(),
browse_provider: Box::new(items),
cancellation_token: Default::default(),
})
.await?;
let app = axum::Router::new()
.route("/", get(|| async { "hello world" }))
.nest(HTTP_PREFIX, server.take_router()?)
.layer(tower_http::trace::TraceLayer::new_for_http())
.into_make_service_with_connect_info::<SocketAddr>();
use tokio::net::TcpListener;
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, HTTP_PORT);
info!(?addr, "Binding TcpListener");
let listener = TcpListener::bind(addr)
.await
.with_context(|| format!("error binding to {addr}"))?;
tokio::spawn(async move {
let res = axum::serve(listener, app).await;
error!(error=?res, "error running HTTP server");
});
info!("Running SSDP");
server
.run_ssdp_forever()
.await
.context("error running SSDP")?;
error!("Unreachable");
Ok(())
}

View file

@ -0,0 +1,9 @@
pub const UPNP_KIND_ROOT_DEVICE: &str = "upnp:rootdevice";
pub const UPNP_KIND_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1";
pub const SOAP_ACTION_CONTENT_DIRECTORY_BROWSE: &[u8] =
b"\"urn:schemas-upnp-org:service:ContentDirectory:1#Browse\"";
pub const SOAP_ACTION_GET_SYSTEM_UPDATE_ID: &[u8] =
b"\"urn:schemas-upnp-org:service:ContentDirectory:1#GetSystemUpdateID\"";
pub const CONTENT_TYPE_XML_UTF8: &str = "text/xml; charset=\"utf-8\"";

View file

@ -0,0 +1,214 @@
use std::{sync::atomic::Ordering, time::Duration};
use anyhow::Context;
use axum::{
body::Bytes,
extract::State,
handler::HandlerWithoutStateExt,
response::IntoResponse,
routing::{get, post},
};
use bstr::BStr;
use http::{header::CONTENT_TYPE, HeaderMap, HeaderName, StatusCode};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use crate::{
constants::{
CONTENT_TYPE_XML_UTF8, SOAP_ACTION_CONTENT_DIRECTORY_BROWSE,
SOAP_ACTION_GET_SYSTEM_UPDATE_ID,
},
state::{UnpnServerState, UpnpServerStateInner},
templates::{
render_content_directory_browse, render_content_directory_control_get_system_update_id,
render_root_description_xml, RootDescriptionInputs,
},
upnp_types::content_directory::{
request::ContentDirectoryControlRequest, ContentDirectoryBrowseProvider,
},
};
async fn description_xml(State(state): State<UnpnServerState>) -> impl IntoResponse {
state.rendered_root_description.clone()
}
async fn generate_content_directory_control_response(
headers: HeaderMap,
State(state): State<UnpnServerState>,
body: Bytes,
) -> impl IntoResponse {
let body = BStr::new(&body);
let action = headers.get("soapaction").map(|v| BStr::new(v.as_bytes()));
trace!(?body, ?action, "received control request");
let action = match action {
Some(action) => action,
None => {
debug!("missing SOAPACTION header");
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
match action.as_ref() {
SOAP_ACTION_CONTENT_DIRECTORY_BROWSE => {
let body = match std::str::from_utf8(body) {
Ok(body) => body,
Err(_) => return (StatusCode::BAD_REQUEST, "cannot parse request").into_response(),
};
let request = match ContentDirectoryControlRequest::parse(body) {
Ok(req) => req,
Err(e) => {
debug!(error=?e, "error parsing XML");
return (StatusCode::BAD_REQUEST, "cannot parse request").into_response();
}
};
(
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
render_content_directory_browse(
state.provider.browse_direct_children(request.object_id),
),
)
.into_response()
}
SOAP_ACTION_GET_SYSTEM_UPDATE_ID => {
let update_id = state.system_update_id.load(Ordering::Relaxed);
(
[(CONTENT_TYPE, CONTENT_TYPE_XML_UTF8)],
render_content_directory_control_get_system_update_id(update_id),
)
.into_response()
}
_ => {
debug!(?action, "unsupported ContentDirectory action");
(StatusCode::NOT_IMPLEMENTED, "").into_response()
}
}
}
async fn subscription(
State(state): State<UnpnServerState>,
request: axum::extract::Request,
) -> impl IntoResponse {
if request.method().as_str() != "SUBSCRIBE" {
return (StatusCode::METHOD_NOT_ALLOWED, "").into_response();
}
let (parts, _body) = request.into_parts();
trace!(?parts.headers, "subscription request");
let is_event = parts
.headers
.get(HeaderName::from_static("nt"))
.map(|v| v.as_bytes() == b"upnp:event")
.unwrap_or_default();
if !is_event {
return (StatusCode::BAD_REQUEST, "expected NT: upnp:event header").into_response();
}
let callback = parts
.headers
.get(HeaderName::from_static("callback"))
.and_then(|v| v.to_str().ok())
.map(|s| s.trim_matches(|c| c == '>' || c == '<'))
.and_then(|u| url::Url::parse(u).ok());
let callback = match callback {
Some(c) => c,
None => return (StatusCode::BAD_REQUEST, "callback not provided").into_response(),
};
let subscription_id = parts
.headers
.get(HeaderName::from_static("sid"))
.and_then(|v| v.to_str().ok());
let timeout = parts
.headers
.get(HeaderName::from_static("timeout"))
.and_then(|v| v.to_str().ok())
.and_then(|t| t.strip_prefix("Second-"))
.and_then(|t| t.parse::<u16>().ok())
.map(|t| Duration::from_secs(t as u64));
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1800);
let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT);
if let Some(sid) = subscription_id {
match state.renew_subscription(sid, timeout) {
Ok(()) => (
StatusCode::OK,
[
("SID", sid.to_owned()),
("TIMEOUT", format!("Second-{}", timeout.as_secs())),
],
)
.into_response(),
Err(e) => {
warn!(sid, error=?e, "error renewing subscription");
StatusCode::NOT_FOUND.into_response()
}
}
} else {
match state.new_subscription(callback, timeout) {
Ok(sid) => (
StatusCode::OK,
[
("SID", sid),
("TIMEOUT", format!("Second-{}", timeout.as_secs())),
],
)
.into_response(),
Err(e) => {
warn!(error=?e, "error creating subscription");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
}
pub fn make_router(
friendly_name: String,
http_prefix: String,
upnp_usn: String,
browse_provider: Box<dyn ContentDirectoryBrowseProvider>,
cancellation_token: CancellationToken,
) -> anyhow::Result<axum::Router> {
let root_desc = render_root_description_xml(&RootDescriptionInputs {
friendly_name: &friendly_name,
manufacturer: "rqbit developers",
model_name: "1.0.0",
unique_id: &upnp_usn,
http_prefix: &http_prefix,
});
let state = UpnpServerStateInner::new(root_desc.into(), browse_provider, cancellation_token)
.context("error creating UPNP server")?;
let sub_handler = {
let state = state.clone();
move |request: axum::extract::Request| async move {
subscription(State(state.clone()), request).await
}
};
let app = axum::Router::new()
.route("/description.xml", get(description_xml))
.route(
"/scpd/ContentDirectory.xml",
get(|| async { include_str!("resources/scpd_content_directory.xml") }),
)
.route(
"/scpd/ConnectionManager.xml",
get(|| async { include_str!("resources/scpd_connection_manager.xml") }),
)
.route(
"/control/ContentDirectory",
post(generate_content_directory_control_response),
)
.route(
"/control/ConnectionManager",
post(|| async { (StatusCode::NOT_IMPLEMENTED, "") }),
)
.route_service("/subscribe", sub_handler.into_service())
.with_state(state);
Ok(app)
}

View file

@ -0,0 +1,102 @@
use std::{io::Write, time::Duration};
use anyhow::Context;
use gethostname::gethostname;
use http_handlers::make_router;
use librqbit_sha1_wrapper::ISha1;
use ssdp::SsdpRunner;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use upnp_types::content_directory::ContentDirectoryBrowseProvider;
mod constants;
mod http_handlers;
mod ssdp;
pub mod state;
mod subscriptions;
mod templates;
pub mod upnp_types;
pub struct UpnpServerOptions {
pub friendly_name: String,
pub http_hostname: String,
pub http_listen_port: u16,
pub http_prefix: String,
pub browse_provider: Box<dyn ContentDirectoryBrowseProvider>,
pub cancellation_token: CancellationToken,
}
pub struct UpnpServer {
axum_router: Option<axum::Router>,
ssdp_runner: SsdpRunner,
}
fn create_usn(opts: &UpnpServerOptions) -> anyhow::Result<String> {
let mut buf = Vec::new();
buf.write_all(gethostname().as_encoded_bytes())?;
write!(
&mut buf,
"{}{}{}",
opts.friendly_name, opts.http_listen_port, opts.http_prefix
)?;
let mut sha1 = librqbit_sha1_wrapper::Sha1::new();
sha1.update(&buf);
let hash = sha1.finish();
let uuid = uuid::Builder::from_slice(&hash[..16])
.context("error generating UUID")?
.into_uuid();
Ok(format!("uuid:{}", uuid))
}
impl UpnpServer {
pub async fn new(opts: UpnpServerOptions) -> anyhow::Result<Self> {
let usn = create_usn(&opts).context("error generating USN")?;
let description_http_location = {
let hostname = &opts.http_hostname;
let port = opts.http_listen_port;
let http_prefix = &opts.http_prefix;
format!("http://{hostname}:{port}{http_prefix}/description.xml")
};
let ssdp_runner = crate::ssdp::SsdpRunner::new(ssdp::SsdpRunnerOptions {
usn: usn.clone(),
description_http_location,
server_string: "Linux/3.4 UPnP/1.0 rqbit/1".to_owned(),
notify_interval: Duration::from_secs(60),
})
.await
.context("error initializing SsdpRunner")?;
let router = make_router(
opts.friendly_name,
opts.http_prefix,
usn,
opts.browse_provider,
opts.cancellation_token,
)?;
Ok(Self {
axum_router: Some(router),
ssdp_runner,
})
}
pub fn take_router(&mut self) -> anyhow::Result<axum::Router> {
self.axum_router
.take()
.context("programming error: router already taken")
}
pub async fn run_ssdp_forever(&self) -> anyhow::Result<()> {
debug!("starting SSDP");
self.ssdp_runner
.run_forever()
.await
.context("error running SSDP loop")
}
}

View file

@ -0,0 +1,182 @@
<?xml version="1.0"?>
<scpd xmlns="urn:schemas-upnp-org:service-1-0">
<specVersion>
<major>1</major>
<minor>0</minor>
</specVersion>
<actionList>
<action>
<name>GetProtocolInfo</name>
<argumentList>
<argument>
<name>Source</name>
<direction>out</direction>
<relatedStateVariable>SourceProtocolInfo</relatedStateVariable>
</argument>
<argument>
<name>Sink</name>
<direction>out</direction>
<relatedStateVariable>SinkProtocolInfo</relatedStateVariable>
</argument>
</argumentList>
</action>
<action>
<name>PrepareForConnection</name>
<argumentList>
<argument>
<name>RemoteProtocolInfo</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_ProtocolInfo</relatedStateVariable>
</argument>
<argument>
<name>PeerConnectionManager</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionManager</relatedStateVariable>
</argument>
<argument>
<name>PeerConnectionID</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable>
</argument>
<argument>
<name>Direction</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_Direction</relatedStateVariable>
</argument>
<argument>
<name>ConnectionID</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable>
</argument>
<argument>
<name>AVTransportID</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_AVTransportID</relatedStateVariable>
</argument>
<argument>
<name>RcsID</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_RcsID</relatedStateVariable>
</argument>
</argumentList>
</action>
<action>
<name>ConnectionComplete</name>
<argumentList>
<argument>
<name>ConnectionID</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable>
</argument>
</argumentList>
</action>
<action>
<name>GetCurrentConnectionIDs</name>
<argumentList>
<argument>
<name>ConnectionIDs</name>
<direction>out</direction>
<relatedStateVariable>CurrentConnectionIDs</relatedStateVariable>
</argument>
</argumentList>
</action>
<action>
<name>GetCurrentConnectionInfo</name>
<argumentList>
<argument>
<name>ConnectionID</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable>
</argument>
<argument>
<name>RcsID</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_RcsID</relatedStateVariable>
</argument>
<argument>
<name>AVTransportID</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_AVTransportID</relatedStateVariable>
</argument>
<argument>
<name>ProtocolInfo</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_ProtocolInfo</relatedStateVariable>
</argument>
<argument>
<name>PeerConnectionManager</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionManager</relatedStateVariable>
</argument>
<argument>
<name>PeerConnectionID</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable>
</argument>
<argument>
<name>Direction</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_Direction</relatedStateVariable>
</argument>
<argument>
<name>Status</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_ConnectionStatus</relatedStateVariable>
</argument>
</argumentList>
</action>
</actionList>
<serviceStateTable>
<stateVariable sendEvents="yes">
<name>SourceProtocolInfo</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="yes">
<name>SinkProtocolInfo</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="yes">
<name>CurrentConnectionIDs</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_ConnectionStatus</name>
<dataType>string</dataType>
<allowedValueList>
<allowedValue>OK</allowedValue>
<allowedValue>ContentFormatMismatch</allowedValue>
<allowedValue>InsufficientBandwidth</allowedValue>
<allowedValue>UnreliableChannel</allowedValue>
<allowedValue>Unknown</allowedValue>
</allowedValueList>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_ConnectionManager</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_Direction</name>
<dataType>string</dataType>
<allowedValueList>
<allowedValue>Input</allowedValue>
<allowedValue>Output</allowedValue>
</allowedValueList>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_ProtocolInfo</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_ConnectionID</name>
<dataType>i4</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_AVTransportID</name>
<dataType>i4</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_RcsID</name>
<dataType>i4</dataType>
</stateVariable>
</serviceStateTable>
</scpd>

View file

@ -0,0 +1,184 @@
<?xml version="1.0"?>
<scpd xmlns="urn:schemas-upnp-org:service-1-0">
<specVersion>
<major>1</major>
<minor>0</minor>
</specVersion>
<actionList>
<action>
<name>Browse</name>
<argumentList>
<argument>
<name>ObjectID</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_ObjectID</relatedStateVariable>
</argument>
<argument>
<name>BrowseFlag</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_BrowseFlag</relatedStateVariable>
</argument>
<argument>
<name>Filter</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_Filter</relatedStateVariable>
</argument>
<argument>
<name>StartingIndex</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_Index</relatedStateVariable>
</argument>
<argument>
<name>RequestedCount</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_Count</relatedStateVariable>
</argument>
<argument>
<name>SortCriteria</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_SortCriteria</relatedStateVariable>
</argument>
<argument>
<name>Result</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_Result</relatedStateVariable>
</argument>
<argument>
<name>NumberReturned</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_Count</relatedStateVariable>
</argument>
<argument>
<name>TotalMatches</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_Count</relatedStateVariable>
</argument>
<argument>
<name>UpdateID</name>
<direction>out</direction>
<relatedStateVariable>A_ARG_TYPE_UpdateID</relatedStateVariable>
</argument>
</argumentList>
</action>
</actionList>
<serviceStateTable>
<stateVariable sendEvents="no">
<name>SearchCapabilities</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>SortCapabilities</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>SortExtensionCapabilities</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="yes">
<name>SystemUpdateID</name>
<dataType>ui4</dataType>
</stateVariable>
<stateVariable sendEvents="yes">
<name>ContainerUpdateIDs</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="yes">
<name>TransferIDs</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>FeatureList</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_ObjectID</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_Result</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_SearchCriteria</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_BrowseFlag</name>
<dataType>string</dataType>
<allowedValueList>
<allowedValue>BrowseMetadata</allowedValue>
<allowedValue>BrowseDirectChildren</allowedValue>
</allowedValueList>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_Filter</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_SortCriteria</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_Index</name>
<dataType>ui4</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_Count</name>
<dataType>ui4</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_UpdateID</name>
<dataType>ui4</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_TransferID</name>
<dataType>ui4</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_TransferStatus</name>
<dataType>string</dataType>
<allowedValueList>
<allowedValue>COMPLETED</allowedValue>
<allowedValue>ERROR</allowedValue>
<allowedValue>IN_PROGRESS</allowedValue>
<allowedValue>STOPPED</allowedValue>
</allowedValueList>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_TransferLength</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_TransferTotal</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_TagValueList</name>
<dataType>string</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_URI</name>
<dataType>uri</dataType>
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_CategoryType</name>
<dataType>ui4</dataType>
<defaultValue />
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_RID</name>
<dataType>ui4</dataType>
<defaultValue />
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_PosSec</name>
<dataType>ui4</dataType>
<defaultValue />
</stateVariable>
<stateVariable sendEvents="no">
<name>A_ARG_TYPE_Featurelist</name>
<dataType>string</dataType>
<defaultValue />
</stateVariable>
</serviceStateTable>
</scpd>

View file

@ -0,0 +1,4 @@
<container id="{id}" parentID="{parent_id}" restricted="true" {childCountTag}>
<dc:title>{title}</dc:title>
<upnp:class>object.container.storageFolder</upnp:class>
</container>

View file

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:BrowseResponse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">
<Result><![CDATA[{result}]]></Result>
<NumberReturned>{number_returned}</NumberReturned>
<TotalMatches>{total_matches}</TotalMatches>
<UpdateID>{update_id}</UpdateID>
</u:BrowseResponse>
</s:Body>
</s:Envelope>

View file

@ -0,0 +1,5 @@
<item id="{id}" parentID="{parent_id}" restricted="true">
<dc:title>{title}</dc:title>
<upnp:class>{upnp_class}</upnp:class>
<res protocolInfo="http-get:*:{mime_type}:DLNA.ORG_OP=01">{url}</res>
</item>

View file

@ -0,0 +1,5 @@
<DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/">
{items}
</DIDL-Lite>

View file

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:GetSystemUpdateIDResponse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">
<Id>{id}</Id>
</u:GetSystemUpdateIDResponse>
</s:Body>
</s:Envelope>

View file

@ -0,0 +1,5 @@
<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">
<e:property>
<SystemUpdateID>{system_update_id}</SystemUpdateID>
</e:property>
</e:propertyset>

View file

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<root xmlns="urn:schemas-upnp-org:device-1-0">
<specVersion>
<major>1</major>
<minor>0</minor>
</specVersion>
<device>
<deviceType>urn:schemas-upnp-org:device:MediaServer:1</deviceType>
<friendlyName>{friendly_name}</friendlyName>
<manufacturer>{manufacturer}</manufacturer>
<modelName>{model_name}</modelName>
<UDN>{unique_id}</UDN>
<serviceList>
<service>
<serviceType>urn:schemas-upnp-org:service:ContentDirectory:1</serviceType>
<serviceId>urn:upnp-org:serviceId:ContentDirectory</serviceId>
<SCPDURL>{http_prefix}/scpd/ContentDirectory.xml</SCPDURL>
<controlURL>{http_prefix}/control/ContentDirectory</controlURL>
<eventSubURL>{http_prefix}/subscribe</eventSubURL>
</service>
<service>
<serviceType>urn:schemas-upnp-org:service:ConnectionManager:1</serviceType>
<serviceId>urn:upnp-org:serviceId:ConnectionManager</serviceId>
<SCPDURL>{http_prefix}/scpd/ConnectionManager.xml</SCPDURL>
<controlURL>{http_prefix}/control/ConnectionManager</controlURL>
</service>
</serviceList>
<presentationURL>/</presentationURL>
</device>
</root>

View file

@ -0,0 +1,12 @@
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:Browse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">
<ObjectID>5</ObjectID>
<BrowseFlag>BrowseDirectChildren</BrowseFlag>
<Filter>*</Filter>
<StartingIndex>0</StartingIndex>
<RequestedCount>5000</RequestedCount>
<SortCriteria></SortCriteria>
</u:Browse>
</s:Body>
</s:Envelope>

View file

@ -0,0 +1,255 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
time::Duration,
};
use anyhow::{bail, Context};
use bstr::BStr;
use tokio::net::UdpSocket;
use tracing::{debug, trace, warn};
use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE};
const UPNP_PORT: u16 = 1900;
const UPNP_BROADCAST_IP: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250);
const UPNP_BROADCAST_ADDR: SocketAddrV4 = SocketAddrV4::new(UPNP_BROADCAST_IP, UPNP_PORT);
#[derive(Debug)]
pub enum SsdpMessage<'a, 'h> {
MSearch(SsdpMSearchRequest<'a>),
#[allow(dead_code)]
OtherRequest(httparse::Request<'h, 'a>),
#[allow(dead_code)]
Response(httparse::Response<'h, 'a>),
}
#[derive(Debug)]
pub struct SsdpMSearchRequest<'a> {
pub host: &'a BStr,
pub man: &'a BStr,
pub st: &'a BStr,
}
impl<'a> SsdpMSearchRequest<'a> {
fn matches_media_server(&self) -> bool {
if self.host != "239.255.255.250:1900" {
return false;
}
if self.man != "\"ssdp:discover\"" {
return false;
}
if self.st == UPNP_KIND_ROOT_DEVICE || self.st == UPNP_KIND_MEDIASERVER {
return true;
}
false
}
}
pub fn try_parse_ssdp<'a, 'h>(
buf: &'a [u8],
headers: &'h mut [httparse::Header<'a>],
) -> anyhow::Result<SsdpMessage<'a, 'h>> {
if buf.starts_with(b"HTTP/") {
let mut resp = httparse::Response::new(headers);
resp.parse(buf).context("error parsing response")?;
return Ok(SsdpMessage::Response(resp));
}
let mut req = httparse::Request::new(headers);
req.parse(buf).context("error parsing request")?;
match req.method {
Some("M-SEARCH") => {
let mut host = None;
let mut man = None;
let mut st = None;
for header in req.headers.iter() {
match header.name {
"HOST" | "Host" | "host" => host = Some(header.value),
"MAN" | "Man" | "man" => man = Some(header.value),
"ST" | "St" | "st" => st = Some(header.value),
other => trace!(header=?BStr::new(other), "ignoring SSDP header"),
}
}
match (host, man, st) {
(Some(host), Some(man), Some(st)) => {
return Ok(SsdpMessage::MSearch(SsdpMSearchRequest {
host: BStr::new(host),
man: BStr::new(man),
st: BStr::new(st),
}))
}
_ => bail!("not all of host, man and st are set"),
}
}
_ => return Ok(SsdpMessage::OtherRequest(req)),
}
}
pub struct SsdpRunnerOptions {
pub usn: String,
pub description_http_location: String,
pub server_string: String,
pub notify_interval: Duration,
}
pub struct SsdpRunner {
opts: SsdpRunnerOptions,
socket: UdpSocket,
}
impl SsdpRunner {
pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result<Self> {
let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT);
trace!(addr=?bind_addr, "binding UDP");
let socket =
tokio::net::UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT))
.await
.context("error binding")?;
trace!(multiaddr=?UPNP_BROADCAST_IP, interface=?Ipv4Addr::UNSPECIFIED, "joining multicast v4 group");
socket
.join_multicast_v4(UPNP_BROADCAST_IP, Ipv4Addr::UNSPECIFIED)
.context("error joining multicast group")?;
Ok(Self { opts, socket })
}
fn generate_notify_message(&self, kind: &str) -> String {
let usn: &str = &self.opts.usn;
let description_http_location = &self.opts.description_http_location;
let server: &str = &self.opts.server_string;
let bcast_addr = UPNP_BROADCAST_ADDR;
format!(
"NOTIFY * HTTP/1.1\r
Host: {bcast_addr}\r
Cache-Control: max-age=75\r
Location: {description_http_location}\r
NT: {kind}\r
NTS: ssdp:alive\r
Server: {server}\r
USN: {usn}::{kind}\r
\r
"
)
}
fn generate_ssdp_discover_response(&self) -> String {
let location = &self.opts.description_http_location;
let usn = &self.opts.usn;
let media_server = UPNP_KIND_MEDIASERVER;
let server = &self.opts.server_string;
format!(
"HTTP/1.1 200 OK\r
Cache-Control: max-age=75\r
Ext: \r
Location: {location}\r
Server: {server}\r
St: {media_server}\r
Usn: {usn}::{media_server}\r
Content-Length: 0\r\n\r\n"
)
}
async fn try_send_notifies(&self) {
for kind in [UPNP_KIND_ROOT_DEVICE, UPNP_KIND_MEDIASERVER] {
let msg = self.generate_notify_message(kind);
trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY");
if let Err(e) = self
.socket
.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR)
.await
{
warn!(error=?e, "error sending SSDP NOTIFY")
}
}
}
async fn task_send_notifies_periodically(&self) -> anyhow::Result<()> {
let mut interval = tokio::time::interval(self.opts.notify_interval);
loop {
interval.tick().await;
self.try_send_notifies().await;
}
}
async fn process_incoming_message(&self, msg: &[u8], addr: SocketAddr) -> anyhow::Result<()> {
let mut headers = [httparse::EMPTY_HEADER; 16];
trace!(content = ?BStr::new(msg), ?addr, "received message");
let parsed = try_parse_ssdp(msg, &mut headers);
let msg = match parsed {
Ok(SsdpMessage::MSearch(msg)) => msg,
Ok(m) => {
trace!("ignoring {m:?}");
return Ok(());
}
Err(e) => {
debug!(error=?e, "error parsing SSDP message");
return Ok(());
}
};
if !msg.matches_media_server() {
trace!("not a media server request, ignoring");
return Ok(());
}
let response = self.generate_ssdp_discover_response();
trace!(content = response, ?addr, "sending SSDP discover response");
self.socket
.send_to(response.as_bytes(), addr)
.await
.context("error sending")?;
Ok(())
}
async fn task_respond_on_msearches(&self) -> anyhow::Result<()> {
let mut buf = vec![0u8; 16184];
loop {
let (sz, addr) = self
.socket
.recv_from(&mut buf)
.await
.context("error receiving")?;
let msg = &buf[..sz];
if let Err(e) = self.process_incoming_message(msg, addr).await {
warn!(error=?e, ?addr, "error processing incoming SSDP message")
}
}
}
async fn send_msearch(&self) -> anyhow::Result<()> {
let msearch_msg = "M-SEARCH * HTTP/1.1\r
HOST: 239.255.255.250:1900\r
ST: urn:schemas-upnp-org:device:MediaServer:1\r
MAN: \"ssdp:discover\"\r
MX: 2\r\n\r\n";
trace!(content = msearch_msg, "multicasting M-SEARCH");
self.socket
.send_to(msearch_msg.as_bytes(), UPNP_BROADCAST_ADDR)
.await
.context("error sending msearch")?;
Ok(())
}
pub async fn run_forever(&self) -> anyhow::Result<()> {
// This isn't necessary, but would show that it works.
self.send_msearch().await?;
let t1 = self.task_respond_on_msearches();
let t2 = self.task_send_notifies_periodically();
tokio::pin!(t1);
tokio::pin!(t2);
tokio::select! {
r = &mut t1 => r,
r = &mut t2 => r,
}
}
}

View file

@ -0,0 +1,78 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use anyhow::Context;
use axum::body::Bytes;
use librqbit_core::spawn_utils::spawn_with_cancel;
use tokio_util::sync::CancellationToken;
use tracing::{error_span, Span};
use crate::{
subscriptions::Subscriptions, upnp_types::content_directory::ContentDirectoryBrowseProvider,
};
pub struct UpnpServerStateInner {
pub rendered_root_description: Bytes,
pub provider: Box<dyn ContentDirectoryBrowseProvider>,
pub system_update_id: AtomicU64,
pub subscriptions: Subscriptions,
pub span: Span,
pub system_update_bcast_tx: tokio::sync::broadcast::Sender<u64>,
pub cancel_token: tokio_util::sync::CancellationToken,
_drop_guard: tokio_util::sync::DropGuard,
}
fn new_system_update_id() -> anyhow::Result<u64> {
Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
}
impl UpnpServerStateInner {
pub fn new(
rendered_root_description: Bytes,
provider: Box<dyn ContentDirectoryBrowseProvider>,
cancellation_token: CancellationToken,
) -> anyhow::Result<Arc<Self>> {
let cancel_token = cancellation_token.child_token();
let drop_guard = cancel_token.clone().drop_guard();
let (btx, _) = tokio::sync::broadcast::channel(32);
let span = error_span!(parent: None, "upnp-server");
let state = Arc::new(Self {
rendered_root_description,
provider,
system_update_id: AtomicU64::new(new_system_update_id()?),
subscriptions: Default::default(),
system_update_bcast_tx: btx,
_drop_guard: drop_guard,
span: span.clone(),
cancel_token: cancel_token.clone(),
});
spawn_with_cancel(
error_span!(parent: span, "system_update_id_updater"),
cancel_token,
{
let state = Arc::downgrade(&state);
async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
let new_value = new_system_update_id()?;
let state = state.upgrade().context("upnp server is dead")?;
state.system_update_id.store(new_value, Ordering::Relaxed);
let _ = state.system_update_bcast_tx.send(new_value);
}
}
},
);
Ok(state)
}
}
pub type UnpnServerState = Arc<UpnpServerStateInner>;

View file

@ -0,0 +1,200 @@
use crate::state::UpnpServerStateInner;
use crate::templates::render_notify_subscription_system_update_id;
use anyhow::Context;
use http::Method;
use librqbit_core::spawn_utils::spawn_with_cancel;
use parking_lot::RwLock;
use std::{
collections::HashMap,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use tokio::sync::{broadcast::error::RecvError, Notify};
use tracing::{error_span, warn, Instrument};
pub struct Subscription {
pub url: url::Url,
pub seq: u64,
pub timeout: Duration,
pub refresh_notify: Arc<Notify>,
}
#[derive(Default)]
pub struct Subscriptions {
subs: RwLock<HashMap<String, Subscription>>,
}
impl Subscriptions {
pub fn add(&self, url: url::Url, timeout: Duration) -> (String, Arc<Notify>) {
let sid = format!("uuid:{}", uuid::Uuid::new_v4());
let notify = Arc::new(Notify::default());
self.subs.write().insert(
sid.clone(),
Subscription {
url,
seq: 0,
timeout,
refresh_notify: notify.clone(),
},
);
(sid, notify)
}
pub fn update_timeout(&self, sid: &str, timeout: Duration) -> anyhow::Result<()> {
let mut g = self.subs.write();
let s = g.get_mut(sid).context("no such subscription")?;
s.timeout = timeout;
s.refresh_notify.notify_waiters();
Ok(())
}
pub fn next_seq(&self, sid: &str) -> anyhow::Result<u64> {
let mut g = self.subs.write();
let s = g.get_mut(sid).context("no such subscription")?;
let id = s.seq;
s.seq += 1;
Ok(id)
}
pub fn get_timeout(&self, sid: &str) -> anyhow::Result<Duration> {
let mut g = self.subs.write();
let s = g.get_mut(sid).context("no such subscription")?;
Ok(s.timeout)
}
pub fn remove(&self, sid: &str) -> anyhow::Result<Subscription> {
let mut g = self.subs.write();
let s = g.remove(sid).context("no such subscription")?;
Ok(s)
}
}
pub async fn notify_subscription_system_update(
url: &url::Url,
sid: &str,
seq: u64,
system_update_id: u64,
) -> anyhow::Result<()> {
// NOTIFY /callback_path HTTP/1.1
// CONTENT-TYPE: text/xml; charset="utf-8"
// NT: upnp:event
// NTS: upnp:propchange
// SID: uuid:<Subscription ID>
// SEQ: <sequence number>
//
let body = render_notify_subscription_system_update_id(system_update_id);
let resp = reqwest::Client::builder()
.build()?
.request(Method::from_bytes(b"NOTIFY")?, url.clone())
.header("Content-Type", r#"text/xml; charset="utf-8""#)
.header("NT", "upnp:event")
.header("NTS", "upnp:propchange")
.header("SID", sid)
.header("SEQ", seq.to_string())
.body(body)
.send()
.await?;
if !resp.status().is_success() {
anyhow::bail!("{:?}", resp.status())
}
Ok(())
}
impl UpnpServerStateInner {
pub fn renew_subscription(&self, sid: &str, new_timeout: Duration) -> anyhow::Result<()> {
self.subscriptions.update_timeout(sid, new_timeout)
}
pub fn new_subscription(
self: &Arc<Self>,
url: url::Url,
timeout: Duration,
) -> anyhow::Result<String> {
let (sid, refresh_notify) = self.subscriptions.add(url.clone(), timeout);
let token = self.cancel_token.child_token();
// Spawn a task that will notify it of system id changes.
// Spawn a task that will wait for timeout or subscription refreshes.
// When it times out, kill all of them.
let pspan = self.span.clone();
let subscription_manager = {
let mut brx = self.system_update_bcast_tx.subscribe();
let state = Arc::downgrade(self);
let sid = sid.clone();
let url = url.clone();
async move {
let system_update_id_notifier = async {
loop {
let res = brx.recv().await;
let state = state.upgrade().context("upnp server dead")?;
let seq = state.subscriptions.next_seq(&sid)?;
match res {
Ok(system_update_id) => {
if let Err(e) = notify_subscription_system_update(
&url,
&sid,
seq,
system_update_id,
)
.await
{
warn!(error=?e, "error updating UPNP subscription");
}
}
Err(RecvError::Lagged(by)) => {
warn!(by, "UPNP subscription lagged");
let seq = state.subscriptions.next_seq(&sid)?;
let system_update_id =
state.system_update_id.load(Ordering::Relaxed);
if let Err(e) = notify_subscription_system_update(
&url,
&sid,
seq,
system_update_id,
)
.await
{
warn!(error=?e, "error updating UPNP subscription");
}
}
Err(RecvError::Closed) => return Ok(()),
}
}
}
.instrument(error_span!("system-update-id-notifier"));
let timeout_notifier = async {
let mut timeout = timeout;
loop {
tokio::select! {
_ = refresh_notify.notified() => {
timeout = state.upgrade().context("upnp server dead")?.subscriptions.get_timeout(&sid)?;
},
_ = tokio::time::sleep(timeout) => {
state.upgrade().context("upnp server dead")?.subscriptions.remove(&sid)?;
return Ok(())
}
}
}
}.instrument(error_span!("timeout-killer"));
tokio::select! {
r = system_update_id_notifier => r,
r = timeout_notifier => r,
}
}
};
spawn_with_cancel(
error_span!(parent: pspan, "subscription-manager", %url),
token,
subscription_manager,
);
Ok(sid)
}
}

View file

@ -0,0 +1,124 @@
use crate::upnp_types::content_directory::response::{Container, Item, ItemOrContainer};
pub struct RootDescriptionInputs<'a> {
pub friendly_name: &'a str,
pub manufacturer: &'a str,
pub model_name: &'a str,
pub unique_id: &'a str,
pub http_prefix: &'a str,
}
pub fn render_root_description_xml(input: &RootDescriptionInputs<'_>) -> String {
let tmpl = include_str!("resources/templates/root_desc.tmpl.xml").trim();
// This isn't great perf-wise but whatever.
tmpl.replace("{friendly_name}", input.friendly_name)
.replace("{manufacturer}", input.manufacturer)
.replace("{model_name}", input.model_name)
.replace("{unique_id}", input.unique_id)
.replace("{http_prefix}", input.http_prefix)
}
pub fn render_content_directory_browse(items: impl IntoIterator<Item = ItemOrContainer>) -> String {
fn item_or_container(item_or_container: &ItemOrContainer) -> Option<String> {
fn item(item: &Item) -> Option<String> {
let tmpl =
include_str!("resources/templates/content_directory_control_browse_item.tmpl.xml")
.trim();
let mime = item.mime_type.as_ref()?;
let upnp_class = match mime.type_().as_str() {
"video" => "object.item.videoItem",
_ => return None,
};
let mime = mime.to_string();
Some(
tmpl.replace("{id}", &item.id.to_string())
.replace("{parent_id}", &item.parent_id.unwrap_or(0).to_string())
.replace("{mime_type}", &mime)
.replace("{url}", &item.url)
.replace("{upnp_class}", upnp_class)
.replace("{title}", &item.title),
)
}
fn container(item: &Container) -> String {
let tmpl = include_str!(
"resources/templates/content_directory_control_browse_container.tmpl.xml"
)
.trim();
tmpl.replace("{id}", &format!("{}", item.id))
.replace("{parent_id}", &item.parent_id.unwrap_or(0).to_string())
.replace("{title}", &item.title)
.replace(
"{childCountTag}",
&match item.children_count {
Some(cc) => format!("childCount=\"{}\"", cc),
None => String::new(),
},
)
}
match item_or_container {
ItemOrContainer::Container(c) => Some(container(c)),
ItemOrContainer::Item(i) => item(i),
}
}
struct Envelope<'a> {
result: &'a str,
number_returned: usize,
total_matches: usize,
update_id: u64,
}
fn render_content_directory_envelope(envelope: &Envelope<'_>) -> String {
let tmpl =
include_str!("resources/templates/content_directory_control_browse_envelope.tmpl.xml")
.trim();
tmpl.replace("{result}", envelope.result)
.replace("{number_returned}", &envelope.number_returned.to_string())
.replace("{total_matches}", &envelope.total_matches.to_string())
.replace("{update_id}", &envelope.update_id.to_string())
}
fn render_content_directory_browse_result(items: &str) -> String {
let tmpl =
include_str!("resources/templates/content_directory_control_browse_result.tmpl.xml")
.trim();
tmpl.replace("{items}", items)
}
let all_items = items
.into_iter()
.filter_map(|item| item_or_container(&item))
.collect::<Vec<_>>();
let total = all_items.len();
let all_items = all_items.join("");
let result = render_content_directory_browse_result(&all_items);
use std::time::{SystemTime, UNIX_EPOCH};
let update_id = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
render_content_directory_envelope(&Envelope {
result: &result,
number_returned: total,
total_matches: total,
update_id,
})
}
pub fn render_notify_subscription_system_update_id(update_id: u64) -> String {
include_str!("resources/templates/notify_subscription.tmpl.xml")
.replace("{system_update_id}", &update_id.to_string())
}
pub fn render_content_directory_control_get_system_update_id(update_id: u64) -> String {
include_str!("resources/templates/content_directory_control_get_system_update_id.tmpl.xml")
.replace("{id}", &update_id.to_string())
}

View file

@ -0,0 +1,81 @@
pub mod content_directory {
use response::ItemOrContainer;
pub mod request {
pub struct ContentDirectoryControlRequest {
pub object_id: usize,
}
impl ContentDirectoryControlRequest {
pub fn parse(s: &str) -> anyhow::Result<Self> {
let mut reader = quick_xml::Reader::from_str(s);
use quick_xml::events::Event::{Eof, Start};
let mut object_id: Option<usize> = None;
loop {
match reader.read_event()? {
Eof => break,
Start(e) if e.name().as_ref() == b"ObjectID" => {
let t = reader.read_text(e.to_end().name())?;
object_id = t.trim().parse().ok();
}
_ => continue,
}
}
Ok(ContentDirectoryControlRequest {
object_id: object_id.unwrap_or(0),
})
}
}
}
pub mod response {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Container {
pub id: usize,
pub parent_id: Option<usize>,
pub children_count: Option<usize>,
pub title: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Item {
pub id: usize,
pub parent_id: Option<usize>,
pub title: String,
pub mime_type: Option<mime_guess::Mime>,
pub url: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ItemOrContainer {
Container(Container),
Item(Item),
}
}
pub trait ContentDirectoryBrowseProvider: Send + Sync {
fn browse_direct_children(&self, parent_id: usize) -> Vec<ItemOrContainer>;
}
impl ContentDirectoryBrowseProvider for Vec<ItemOrContainer> {
fn browse_direct_children(&self, _parent_id: usize) -> Vec<ItemOrContainer> {
self.clone()
}
}
}
#[cfg(test)]
mod tests {
use crate::upnp_types::content_directory::request::ContentDirectoryControlRequest;
#[test]
fn test_parse_content_directory_request() {
let s = include_str!("resources/test/ContentDirectoryControlExampleRequest.xml");
let req = ContentDirectoryControlRequest::parse(s).unwrap();
assert_eq!(req.object_id, 5);
}
}

View file

@ -14,7 +14,7 @@ readme = "README.md"
[dependencies]
tracing = "0.1"
anyhow = "1"
reqwest = { version = "0.12" }
reqwest = { version = "0.12", default-features = false }
serde = { version = "1", features = ["derive"] }
serde-xml-rs = "0.6.0"
tokio = { version = "1", features = ["macros"] }

View file

@ -136,12 +136,6 @@ dependencies = [
"system-deps 6.2.2",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "autocfg"
version = "1.3.0"
@ -1376,25 +1370,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "h2"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"http 1.1.0",
"indexmap 2.4.0",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -1532,7 +1507,6 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2",
"http 1.1.0",
"http-body",
"httparse",
@ -1544,23 +1518,6 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper",
"hyper-util",
"rustls",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
@ -2984,15 +2941,12 @@ checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63"
dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http 1.1.0",
"http-body",
"http-body-util",
"hyper",
"hyper-rustls",
"hyper-tls",
"hyper-util",
"ipnet",
@ -3008,7 +2962,6 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"sync_wrapper 1.0.1",
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-socks",
@ -3020,21 +2973,6 @@ dependencies = [
"windows-registry",
]
[[package]]
name = "ring"
version = "0.17.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.15",
"libc",
"spin",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rlimit"
version = "0.10.1"
@ -3092,19 +3030,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustls"
version = "0.23.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044"
dependencies = [
"once_cell",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pemfile"
version = "2.1.3"
@ -3121,17 +3046,6 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
[[package]]
name = "rustls-webpki"
version = "0.102.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.17"
@ -3476,12 +3390,6 @@ dependencies = [
"system-deps 5.0.0",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@ -3529,12 +3437,6 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "subtle"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "1.0.109"
@ -3572,27 +3474,6 @@ dependencies = [
"futures-core",
]
[[package]]
name = "system-configuration"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42"
dependencies = [
"bitflags 2.6.0",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "system-deps"
version = "5.0.0"
@ -4032,17 +3913,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls",
"rustls-pki-types",
"tokio",
]
[[package]]
name = "tokio-socks"
version = "0.5.2"
@ -4316,12 +4186,6 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.2"
@ -5100,9 +4964,3 @@ dependencies = [
"quote",
"syn 2.0.75",
]
[[package]]
name = "zeroize"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"

View file

@ -115,13 +115,20 @@ async fn api_from_config(
);
if !config.http_api.disable {
let http_api_task = librqbit::http_api::HttpApi::new(
api.clone(),
Some(librqbit::http_api::HttpApiOptions {
read_only: config.http_api.read_only,
}),
)
.make_http_api_and_run(config.http_api.listen_addr);
let listen_addr = config.http_api.listen_addr;
let api = api.clone();
let read_only = config.http_api.read_only;
let http_api_task = async move {
let listener = tokio::net::TcpListener::bind(listen_addr)
.await
.with_context(|| format!("error listening on {}", listen_addr))?;
librqbit::http_api::HttpApi::new(
api.clone(),
Some(librqbit::http_api::HttpApiOptions { read_only }),
)
.make_http_api_and_run(listener, None)
.await
};
session.spawn(error_span!("http_api"), http_api_task);
}