Merge pull request #86 from ikatson/less-spawns-async-stream

Reduce incremental compile times a bit + small refactorings
This commit is contained in:
Igor Katson 2024-02-27 08:27:33 +00:00 committed by GitHub
commit cbbbce2ec4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 594 additions and 475 deletions

22
Cargo.lock generated
View file

@ -1257,6 +1257,7 @@ name = "librqbit"
version = "5.5.0"
dependencies = [
"anyhow",
"async-stream",
"axum 0.7.4",
"backoff",
"base64",
@ -1277,6 +1278,7 @@ dependencies = [
"librqbit-dht",
"librqbit-peer-protocol",
"librqbit-sha1-wrapper",
"librqbit-tracker-comms",
"librqbit-upnp",
"openssl",
"parking_lot",
@ -1396,6 +1398,26 @@ dependencies = [
"sha1",
]
[[package]]
name = "librqbit-tracker-comms"
version = "1.0.0"
dependencies = [
"anyhow",
"async-stream",
"byteorder",
"futures",
"librqbit-bencode",
"librqbit-buffers",
"librqbit-core",
"rand",
"reqwest",
"serde",
"tokio",
"tracing",
"url",
"urlencoding",
]
[[package]]
name = "librqbit-upnp"
version = "0.1.0"

View file

@ -10,7 +10,8 @@ members = [
"crates/librqbit_core",
"crates/peer_binary_protocol",
"crates/dht",
"crates/upnp"
"crates/upnp",
"crates/tracker_comms",
]
[profile.dev]
@ -22,4 +23,4 @@ debug = true
[profile.release-github]
inherits = "release"
debug = false
debug = false

View file

@ -23,7 +23,9 @@ use anyhow::{bail, Context};
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use bencode::ByteString;
use dashmap::DashMap;
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use futures::{
future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt,
};
use leaky_bucket::RateLimiter;
use librqbit_core::{
@ -232,6 +234,7 @@ impl Drop for RequestPeersStream {
impl Stream for RequestPeersStream {
type Item = SocketAddr;
#[inline(never)]
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
@ -1144,49 +1147,54 @@ impl DhtState {
&self.cancellation_token
}
pub async fn with_config(mut config: DhtConfig) -> anyhow::Result<Arc<Self>> {
let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr)
.await
.with_context(|| format!("error binding socket, address {addr}")),
None => UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding socket, address 0.0.0.0:0"),
}?;
#[inline(never)]
pub fn with_config(mut config: DhtConfig) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
async move {
let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr)
.await
.with_context(|| format!("error binding socket, address {addr}")),
None => UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding socket, address 0.0.0.0:0"),
}?;
let listen_addr = socket
.local_addr()
.context("cannot determine UDP listen addr")?;
info!("DHT listening on {:?}", listen_addr);
let listen_addr = socket
.local_addr()
.context("cannot determine UDP listen addr")?;
info!("DHT listening on {:?}", listen_addr);
let peer_id = config.peer_id.unwrap_or_else(generate_peer_id);
info!("starting up DHT with peer id {:?}", peer_id);
let bootstrap_addrs = config
.bootstrap_addrs
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());
let peer_id = config.peer_id.unwrap_or_else(generate_peer_id);
info!("starting up DHT with peer id {:?}", peer_id);
let bootstrap_addrs = config
.bootstrap_addrs
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());
let token = config.cancellation_token.take().unwrap_or_default();
let token = config.cancellation_token.take().unwrap_or_default();
let (in_tx, in_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal(
peer_id,
in_tx,
config.routing_table,
listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
token,
));
let (in_tx, in_rx) = unbounded_channel();
let state = Arc::new(Self::new_internal(
peer_id,
in_tx,
config.routing_table,
listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
token,
));
spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), {
let state = state.clone();
async move {
let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await
}
});
Ok(state)
spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), {
let state = state.clone();
async move {
let worker = DhtWorker { socket, dht: state };
worker.start(in_rx, &bootstrap_addrs).await
}
});
Ok(state)
}
.boxed()
}
#[inline(never)]
pub fn get_peers(
self: &Arc<Self>,
info_hash: Id20,

View file

@ -1,5 +1,7 @@
// TODO: this now stores only the routing table, but we also need AT LEAST the same socket address...
use futures::future::BoxFuture;
use futures::FutureExt;
use librqbit_core::directories::get_configuration_directory;
use librqbit_core::spawn_utils::spawn_with_cancel;
use serde::{Deserialize, Serialize};
@ -75,94 +77,102 @@ impl PersistentDht {
Ok(path)
}
pub async fn create(
#[inline(never)]
pub fn create(
config: Option<PersistentDhtConfig>,
cancellation_token: Option<CancellationToken>,
) -> anyhow::Result<Dht> {
let mut config = config.unwrap_or_default();
let config_filename = match config.config_filename.take() {
Some(config_filename) => config_filename,
None => Self::default_persistence_filename()?,
};
) -> BoxFuture<'static, anyhow::Result<Dht>> {
async move {
let mut config = config.unwrap_or_default();
let config_filename = match config.config_filename.take() {
Some(config_filename) => config_filename,
None => Self::default_persistence_filename()?,
};
info!(
filename=?config_filename,
"will store DHT routing table periodically",
);
info!(
filename=?config_filename,
"will store DHT routing table periodically",
);
if let Some(parent) = config_filename.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("error creating dir {:?}", &parent))?;
}
let de = match OpenOptions::new().read(true).open(&config_filename) {
Ok(dht_json) => {
let reader = BufReader::new(dht_json);
match serde_json::from_reader::<_, DhtSerialize<RoutingTable, PeerStore>>(reader) {
Ok(r) => {
info!(filename=?config_filename, "loaded DHT routing table from");
Some(r)
}
Err(e) => {
warn!(
filename=?config_filename,
"cannot deserialize routing table: {:#}",
e
);
None
}
}
if let Some(parent) = config_filename.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("error creating dir {:?}", &parent))?;
}
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => None,
_ => return Err(e).with_context(|| format!("error reading {config_filename:?}")),
},
};
let (listen_addr, routing_table, peer_store) = de
.map(|de| (Some(de.addr), Some(de.table), de.peer_store))
.unwrap_or((None, None, None));
let peer_id = routing_table.as_ref().map(|r| r.id());
let dht_config = DhtConfig {
peer_id,
routing_table,
listen_addr,
peer_store,
cancellation_token,
..Default::default()
};
let dht = DhtState::with_config(dht_config).await?;
spawn_with_cancel(
error_span!("dht_persistence"),
dht.cancellation_token().clone(),
{
let dht = dht.clone();
let dump_interval = config
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};
loop {
trace!("sleeping for {:?}", &dump_interval);
tokio::time::sleep(dump_interval).await;
match dump_dht(&dht, &config_filename, &tempfile_name) {
Ok(_) => trace!(filename=?config_filename, "dumped DHT"),
Err(e) => {
error!(filename=?config_filename, "error dumping DHT: {:#}", e)
}
let de = match OpenOptions::new().read(true).open(&config_filename) {
Ok(dht_json) => {
let reader = BufReader::new(dht_json);
match serde_json::from_reader::<_, DhtSerialize<RoutingTable, PeerStore>>(
reader,
) {
Ok(r) => {
info!(filename=?config_filename, "loaded DHT routing table from");
Some(r)
}
Err(e) => {
warn!(
filename=?config_filename,
"cannot deserialize routing table: {:#}",
e
);
None
}
}
}
},
);
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => None,
_ => {
return Err(e).with_context(|| format!("error reading {config_filename:?}"))
}
},
};
let (listen_addr, routing_table, peer_store) = de
.map(|de| (Some(de.addr), Some(de.table), de.peer_store))
.unwrap_or((None, None, None));
let peer_id = routing_table.as_ref().map(|r| r.id());
Ok(dht)
let dht_config = DhtConfig {
peer_id,
routing_table,
listen_addr,
peer_store,
cancellation_token,
..Default::default()
};
let dht = DhtState::with_config(dht_config).await?;
spawn_with_cancel(
error_span!("dht_persistence"),
dht.cancellation_token().clone(),
{
let dht = dht.clone();
let dump_interval = config
.dump_interval
.unwrap_or_else(|| Duration::from_secs(3));
async move {
let tempfile_name = {
let file_name = format!("dht.json.tmp.{}", std::process::id());
let mut tmp = config_filename.clone();
tmp.set_file_name(file_name);
tmp
};
loop {
trace!("sleeping for {:?}", &dump_interval);
tokio::time::sleep(dump_interval).await;
match dump_dht(&dht, &config_filename, &tempfile_name) {
Ok(_) => trace!(filename=?config_filename, "dumped DHT"),
Err(e) => {
error!(filename=?config_filename, "error dumping DHT: {:#}", e)
}
}
}
}
},
);
Ok(dht)
}
.boxed()
}
}

View file

@ -23,6 +23,7 @@ rust-tls = ["reqwest/rustls-tls"]
[dependencies]
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
tracker_comms = {path = "../tracker_comms", default-features=false, package="librqbit-tracker-comms", version="1.0.0"}
buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"}
librqbit-core = {path = "../librqbit_core", version = "3.5.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
@ -68,6 +69,7 @@ serde_with = "3.4.0"
tokio-util = "0.7.10"
bytes = "1.5.0"
rlimit = "0.10.1"
async-stream = "0.3.5"
[dev-dependencies]
futures = {version = "0.3"}

View file

@ -3,7 +3,8 @@ use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use futures::TryStreamExt;
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
@ -44,7 +45,8 @@ impl HttpApi {
/// Run the HTTP server forever on the given address.
/// If read_only is passed, no state-modifying methods will be exposed.
pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> {
#[inline(never)]
pub fn make_http_api_and_run(self, addr: SocketAddr) -> BoxFuture<'static, anyhow::Result<()>> {
let state = self.inner;
async fn api_root() -> impl IntoResponse {
@ -288,11 +290,15 @@ impl HttpApi {
info!(%addr, "starting HTTP server");
use tokio::net::TcpListener;
let listener = TcpListener::bind(&addr)
.await
.with_context(|| format!("error binding to {addr}"))?;
axum::serve(listener, app).await?;
Ok(())
async move {
let listener = TcpListener::bind(&addr)
.await
.with_context(|| format!("error binding to {addr}"))?;
axum::serve(listener, app).await?;
Ok(())
}
.boxed()
}
}

View file

@ -1,4 +1,5 @@
use anyhow::Context;
use futures::{future::BoxFuture, FutureExt};
use serde::Deserialize;
use crate::{
@ -59,6 +60,7 @@ async fn json_response<T: serde::de::DeserializeOwned + std::any::Any>(
}
impl HttpApiClient {
#[inline(never)]
pub fn new(url: &str) -> anyhow::Result<Self> {
Ok(Self {
base_url: reqwest::Url::parse(url)?,
@ -70,40 +72,47 @@ impl HttpApiClient {
&self.base_url
}
pub async fn validate_rqbit_server(&self) -> anyhow::Result<()> {
let response = self.client.get(self.base_url.clone()).send().await?;
let root: ApiRoot = json_response(response).await?;
if root.server == "rqbit" {
return Ok(());
#[inline(never)]
pub fn validate_rqbit_server(&self) -> BoxFuture<'_, anyhow::Result<()>> {
async move {
let response = self.client.get(self.base_url.clone()).send().await?;
let root: ApiRoot = json_response(response).await?;
if root.server == "rqbit" {
return Ok(());
}
anyhow::bail!("not an rqbit server at {}", &self.base_url)
}
anyhow::bail!("not an rqbit server at {}", &self.base_url)
.boxed()
}
pub async fn add_torrent(
&self,
torrent: AddTorrent<'_>,
pub fn add_torrent<'a>(
&'a self,
torrent: AddTorrent<'a>,
opts: Option<AddTorrentOptions>,
) -> anyhow::Result<ApiAddTorrentResponse> {
let opts = opts.unwrap_or_default();
let params = TorrentAddQueryParams {
overwrite: Some(opts.overwrite),
only_files_regex: opts.only_files_regex,
only_files: None,
output_folder: opts.output_folder,
sub_folder: opts.sub_folder,
list_only: Some(opts.list_only),
..Default::default()
};
let qs = serde_urlencoded::to_string(&params).unwrap();
let url = format!("{}torrents?{}", &self.base_url, qs);
let response = check_response(
self.client
.post(&url)
.body(torrent.into_bytes())
.send()
.await?,
)
.await?;
json_response(response).await
) -> BoxFuture<'a, anyhow::Result<ApiAddTorrentResponse>> {
async move {
let opts = opts.unwrap_or_default();
let params = TorrentAddQueryParams {
overwrite: Some(opts.overwrite),
only_files_regex: opts.only_files_regex,
only_files: None,
output_folder: opts.output_folder,
sub_folder: opts.sub_folder,
list_only: Some(opts.list_only),
..Default::default()
};
let qs = serde_urlencoded::to_string(&params).unwrap();
let url = format!("{}torrents?{}", &self.base_url, qs);
let response = check_response(
self.client
.post(&url)
.body(torrent.into_bytes())
.send()
.await?,
)
.await?;
json_response(response).await
}
.boxed()
}
}

View file

@ -36,9 +36,6 @@ mod session;
mod spawn_utils;
mod torrent_state;
pub mod tracing_subscriber_config_utils;
pub mod tracker_comms;
pub mod tracker_comms_http;
pub mod tracker_comms_udp;
mod type_aliases;
pub use api::Api;

View file

@ -9,12 +9,22 @@ use std::{
time::Duration,
};
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
spawn_utils::BlockingSpawner,
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
type_aliases::PeerStream,
};
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBuf, ByteBufT, ByteString};
use clone_to_owned::CloneToOwned;
use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig};
use futures::{stream::FuturesUnordered, TryFutureExt};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryFutureExt};
use librqbit_core::{
directories::get_configuration_directory,
magnet::Magnet,
@ -32,18 +42,7 @@ use tokio::net::{TcpListener, TcpStream};
use tokio_stream::StreamExt;
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
spawn_utils::BlockingSpawner,
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
tracker_comms::TrackerComms,
type_aliases::PeerStream,
};
use tracker_comms::TrackerComms;
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
@ -298,6 +297,7 @@ pub enum AddTorrent<'a> {
impl<'a> AddTorrent<'a> {
// Don't call this from HTTP API.
#[inline(never)]
pub fn from_cli_argument(path: &'a str) -> anyhow::Result<Self> {
if SUPPORTED_SCHEMES.iter().any(|s| path.starts_with(s)) {
return Ok(Self::Url(Cow::Borrowed(path)));
@ -314,6 +314,7 @@ impl<'a> AddTorrent<'a> {
}
// Don't call this from HTTP API.
#[inline(never)]
pub fn from_local_filename(filename: &str) -> anyhow::Result<Self> {
let file = read_local_file_including_stdin(filename)
.with_context(|| format!("error reading local file {filename:?}"))?;
@ -378,8 +379,9 @@ pub(crate) struct CheckedIncomingConnection {
impl Session {
/// Create a new session. The passed in folder will be used as a default unless overriden per torrent.
pub async fn new(output_folder: PathBuf) -> anyhow::Result<Arc<Self>> {
Self::new_with_opts(output_folder, SessionOptions::default()).await
#[inline(never)]
pub fn new(output_folder: PathBuf) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
Self::new_with_opts(output_folder, SessionOptions::default())
}
pub fn default_persistence_filename() -> anyhow::Result<PathBuf> {
@ -392,93 +394,97 @@ impl Session {
}
/// Create a new session with options.
pub async fn new_with_opts(
#[inline(never)]
pub fn new_with_opts(
output_folder: PathBuf,
mut opts: SessionOptions,
) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let token = CancellationToken::new();
) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
async move {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let token = CancellationToken::new();
let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range)
.await
.context("error listening on TCP")?;
info!("Listening on 0.0.0.0:{p} for incoming peer connections");
(Some(l), Some(p))
} else {
(None, None)
};
let dht = if opts.disable_dht {
None
} else {
let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig {
cancellation_token: Some(token.child_token()),
..Default::default()
})
.await
.context("error initializing DHT")?
} else {
let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config), Some(token.clone()))
let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range)
.await
.context("error initializing persistent DHT")?
.context("error listening on TCP")?;
info!("Listening on 0.0.0.0:{p} for incoming peer connections");
(Some(l), Some(p))
} else {
(None, None)
};
Some(dht)
};
let peer_opts = opts.peer_opts.unwrap_or_default();
let persistence_filename = match opts.persistence_filename {
Some(filename) => filename,
None => Self::default_persistence_filename()?,
};
let spawner = BlockingSpawner::default();
let dht = if opts.disable_dht {
None
} else {
let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig {
cancellation_token: Some(token.child_token()),
..Default::default()
})
.await
.context("error initializing DHT")?
} else {
let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config), Some(token.clone()))
.await
.context("error initializing persistent DHT")?
};
let session = Arc::new(Self {
persistence_filename,
peer_id,
dht,
peer_opts,
spawner,
output_folder,
db: RwLock::new(Default::default()),
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
tcp_listen_port,
});
Some(dht)
};
let peer_opts = opts.peer_opts.unwrap_or_default();
let persistence_filename = match opts.persistence_filename {
Some(filename) => filename,
None => Self::default_persistence_filename()?,
};
let spawner = BlockingSpawner::default();
if let Some(tcp_listener) = tcp_listener {
session.spawn(
error_span!("tcp_listen", port = tcp_listen_port),
session.clone().task_tcp_listener(tcp_listener),
);
}
let session = Arc::new(Self {
persistence_filename,
peer_id,
dht,
peer_opts,
spawner,
output_folder,
db: RwLock::new(Default::default()),
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
tcp_listen_port,
});
if let Some(listen_port) = tcp_listen_port {
if opts.enable_upnp_port_forwarding {
if let Some(tcp_listener) = tcp_listener {
session.spawn(
error_span!("upnp_forward", port = listen_port),
session.clone().task_upnp_port_forwarder(listen_port),
error_span!("tcp_listen", port = tcp_listen_port),
session.clone().task_tcp_listener(tcp_listener),
);
}
}
if opts.persistence {
info!(
"will use {:?} for session persistence",
session.persistence_filename
);
if let Some(parent) = session.persistence_filename.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("couldn't create directory {:?} for session storage", parent)
})?;
if let Some(listen_port) = tcp_listen_port {
if opts.enable_upnp_port_forwarding {
session.spawn(
error_span!("upnp_forward", port = listen_port),
session.clone().task_upnp_port_forwarder(listen_port),
);
}
}
let persistence_task = session.clone().task_persistence();
session.spawn(error_span!("session_persistence"), persistence_task);
}
Ok(session)
if opts.persistence {
info!(
"will use {:?} for session persistence",
session.persistence_filename
);
if let Some(parent) = session.persistence_filename.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("couldn't create directory {:?} for session storage", parent)
})?;
}
let persistence_task = session.clone().task_persistence();
session.spawn(error_span!("session_persistence"), persistence_task);
}
Ok(session)
}
.boxed()
}
async fn task_persistence(self: Arc<Self>) -> anyhow::Result<()> {
@ -739,149 +745,140 @@ impl Session {
}
/// Add a torrent to the session.
pub async fn add_torrent(
&self,
add: AddTorrent<'_>,
#[inline(never)]
pub fn add_torrent<'a>(
&'a self,
add: AddTorrent<'a>,
opts: Option<AddTorrentOptions>,
) -> anyhow::Result<AddTorrentResponse> {
// Magnet links are different in that we first need to discover the metadata.
let span = error_span!("add_torrent");
let _ = span.enter();
) -> BoxFuture<'a, anyhow::Result<AddTorrentResponse>> {
async move {
// Magnet links are different in that we first need to discover the metadata.
let span = error_span!("add_torrent");
let _ = span.enter();
let opts = opts.unwrap_or_default();
let opts = opts.unwrap_or_default();
let announce_port = if opts.list_only {
None
} else {
self.tcp_listen_port
};
let announce_port = if opts.list_only {
None
} else {
self.tcp_listen_port
};
let cancellation_token = self.cancellation_token.child_token();
let cancellation_token_drop_guard = cancellation_token.clone().drop_guard();
let paused = opts.list_only || opts.paused;
let paused = opts.list_only || opts.paused;
// The main difference between magnet link and torrent file, is that we need to resolve the magnet link
// into a torrent file by connecting to peers that support extended handshakes.
// So we must discover at least one peer and connect to it to be able to proceed further.
// The main difference between magnet link and torrent file, is that we need to resolve the magnet link
// into a torrent file by connecting to peers that support extended handshakes.
// So we must discover at least one peer and connect to it to be able to proceed further.
let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let magnet =
Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?;
let info_hash = magnet
.as_id20()
.context("magnet link didn't contain a BTv1 infohash")?;
let (info_hash, info, trackers, peer_rx, initial_peers) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let magnet = Magnet::parse(&magnet)
.context("provided path is not a valid magnet URL")?;
let info_hash = magnet
.as_id20()
.context("magnet link didn't contain a BTv1 infohash")?;
let peer_token = cancellation_token.child_token();
let peer_rx = self.make_peer_rx(
info_hash,
magnet.trackers.clone(),
peer_token.clone(),
announce_port,
opts.force_tracker_interval,
)?;
let peer_rx = match peer_rx {
Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"),
};
debug!(?info_hash, "querying DHT");
let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
opts.initial_peers.clone().unwrap_or_default(),
peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)),
)
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::ChannelClosed { .. } => {
anyhow::bail!("DHT died, no way to discover torrent metainfo")
}
};
if paused {
peer_token.cancel();
}
debug!(?info, "received result from DHT");
(
info_hash,
info,
magnet.trackers,
Some(peer_rx),
initial_peers,
cancellation_token,
)
}
other => {
let torrent = match other {
AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") =>
{
torrent_from_url(&url).await?
}
AddTorrent::Url(url) => {
bail!(
"unsupported URL {:?}. Supporting magnet:, http:, and https",
url
)
}
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
}
AddTorrent::TorrentInfo(t) => *t,
};
let trackers = torrent
.iter_announce()
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => Some(url.to_owned()),
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
None
}
})
.collect::<Vec<_>>();
let peer_rx = if paused {
None
} else {
self.make_peer_rx(
torrent.info_hash,
trackers.clone(),
cancellation_token.clone(),
let peer_rx = self.make_peer_rx(
info_hash,
magnet.trackers.clone(),
announce_port,
opts.force_tracker_interval,
)?
};
)?;
let peer_rx = match peer_rx {
Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"),
};
(
torrent.info_hash,
torrent.info,
trackers,
peer_rx,
opts.initial_peers
.clone()
.unwrap_or_default()
.into_iter()
.collect(),
cancellation_token,
)
}
};
debug!(?info_hash, "querying DHT");
let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
opts.initial_peers.clone().unwrap_or_default(),
peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)),
)
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::ChannelClosed { .. } => {
anyhow::bail!("DHT died, no way to discover torrent metainfo")
}
};
debug!(?info, "received result from DHT");
(
info_hash,
info,
magnet.trackers,
Some(peer_rx),
initial_peers,
)
}
other => {
let torrent = match other {
AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") =>
{
torrent_from_url(&url).await?
}
AddTorrent::Url(url) => {
bail!(
"unsupported URL {:?}. Supporting magnet:, http:, and https",
url
)
}
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
}
AddTorrent::TorrentInfo(t) => *t,
};
cancellation_token_drop_guard.disarm();
let trackers = torrent
.iter_announce()
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => Some(url.to_owned()),
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
None
}
})
.collect::<Vec<_>>();
self.main_torrent_info(
info_hash,
info,
trackers,
peer_rx,
initial_peers.into_iter().collect(),
opts,
cancellation_token,
)
.await
let peer_rx = if paused {
None
} else {
self.make_peer_rx(
torrent.info_hash,
trackers.clone(),
announce_port,
opts.force_tracker_interval,
)?
};
(
torrent.info_hash,
torrent.info,
trackers,
peer_rx,
opts.initial_peers
.clone()
.unwrap_or_default()
.into_iter()
.collect(),
)
}
};
self.main_torrent_info(
info_hash,
info,
trackers,
peer_rx,
initial_peers.into_iter().collect(),
opts,
)
.await
}
.boxed()
}
#[allow(clippy::too_many_arguments)]
@ -893,12 +890,9 @@ impl Session {
peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>,
opts: AddTorrentOptions,
cancellation_token: CancellationToken,
) -> anyhow::Result<AddTorrentResponse> {
debug!("Torrent info: {:#?}", &info);
let drop_guard = cancellation_token.clone().drop_guard();
let get_only_files =
|only_files: Option<Vec<usize>>, only_files_regex: Option<String>, list_only: bool| {
match (only_files, only_files_regex) {
@ -1016,20 +1010,16 @@ impl Session {
let span = managed_torrent.info.span.clone();
let _ = span.enter();
// Just in case, cancel all tasks started for this torrent so far.
// This is defensive, and not proven necessary.
let token = if opts.paused {
cancellation_token.cancel();
self.cancellation_token.child_token()
} else {
cancellation_token
};
managed_torrent
.start(initial_peers, peer_rx, opts.paused, token)
.start(
initial_peers,
peer_rx,
opts.paused,
self.cancellation_token.child_token(),
)
.context("error starting torrent")?;
}
drop_guard.disarm();
Ok(AddTorrentResponse::Added(id, managed_torrent))
}
@ -1080,7 +1070,6 @@ impl Session {
&self,
info_hash: Id20,
trackers: Vec<String>,
cancel: CancellationToken,
announce_port: Option<u16>,
force_tracker_interval: Option<Duration>,
) -> anyhow::Result<Option<PeerStream>> {
@ -1097,7 +1086,6 @@ impl Session {
// TODO: report actual bytes, not zeroes.
Box::new(()),
force_tracker_interval,
cancel,
announce_port,
);
@ -1111,15 +1099,18 @@ impl Session {
}
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let token = self.cancellation_token.child_token();
let peer_rx = self.make_peer_rx(
handle.info_hash(),
handle.info().trackers.clone().into_iter().collect(),
token.clone(),
self.tcp_listen_port,
handle.info().options.force_tracker_interval,
)?;
handle.start(Default::default(), peer_rx, false, token)?;
handle.start(
Default::default(),
peer_rx,
false,
self.cancellation_token.child_token(),
)?;
Ok(())
}
}

View file

@ -15,6 +15,8 @@ use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use buffers::ByteString;
use futures::future::BoxFuture;
use futures::FutureExt;
use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id;
@ -395,23 +397,29 @@ impl ManagedTorrent {
})
}
pub async fn wait_until_completed(&self) -> anyhow::Result<()> {
// TODO: rewrite, this polling is horrible
let live = loop {
let live = self.with_state(|s| match s {
ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => Ok(None),
ManagedTorrentState::Live(l) => Ok(Some(l.clone())),
ManagedTorrentState::Error(e) => bail!("{:?}", e),
ManagedTorrentState::None => bail!("bug: torrent state is None"),
})?;
if let Some(live) = live {
break live;
}
tokio::time::sleep(Duration::from_secs(1)).await;
};
#[inline(never)]
pub fn wait_until_completed(&self) -> BoxFuture<'_, anyhow::Result<()>> {
async move {
// TODO: rewrite, this polling is horrible
let live = loop {
let live = self.with_state(|s| match s {
ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => {
Ok(None)
}
ManagedTorrentState::Live(l) => Ok(Some(l.clone())),
ManagedTorrentState::Error(e) => bail!("{:?}", e),
ManagedTorrentState::None => bail!("bug: torrent state is None"),
})?;
if let Some(live) = live {
break live;
}
tokio::time::sleep(Duration::from_secs(1)).await;
};
live.wait_until_completed().await;
Ok(())
live.wait_until_completed().await;
Ok(())
}
.boxed()
}
}

View file

@ -60,6 +60,7 @@ pub struct InitLoggingResult {
pub line_broadcast: LineBroadcast,
}
#[inline(never)]
pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result<InitLoggingResult> {
let stderr_filter = EnvFilter::builder()
.with_default_directive(

View file

@ -5,4 +5,4 @@ use futures::Stream;
pub type BF = bitvec::vec::BitVec<u8, bitvec::order::Msb0>;
pub type PeerHandle = SocketAddr;
pub type PeerStream = Box<dyn Stream<Item = SocketAddr> + Unpin + Send + Sync + 'static>;
pub type PeerStream = Box<dyn Stream<Item = SocketAddr> + Unpin + Send + 'static>;

View file

@ -159,6 +159,7 @@ impl<BufType: AsRef<[u8]>> TorrentMetaV1Info<BufType> {
Some(expected_hash == hash)
}
#[inline(never)]
pub fn iter_filenames_and_lengths(
&self,
) -> anyhow::Result<impl Iterator<Item = (FileIteratorName<'_, BufType>, u64)>> {

View file

@ -0,0 +1,27 @@
[package]
name = "librqbit-tracker-comms"
version = "1.0.0"
edition = "2018"
description = "Common interface around various sha1 implementations used in rqbit torrent client."
license = "Apache-2.0"
documentation = "https://docs.rs/librqbit-tracker-comms"
repository = "https://github.com/ikatson/rqbit"
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = "1"
anyhow = "1"
futures = "0.3"
async-stream = "0.3.5"
buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"}
librqbit-core = {path = "../librqbit_core", version = "3.5.0"}
byteorder = "1.5"
serde = {version = "1", features=["derive"]}
urlencoding = "2"
rand = "0.8"
tracing = "0.1.40"
reqwest = {version="0.11.22", default-features=false, features = ["json"]}
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
url = "2"

View file

@ -0,0 +1,5 @@
mod tracker_comms;
mod tracker_comms_http;
mod tracker_comms_udp;
pub use tracker_comms::*;

View file

@ -4,13 +4,15 @@ use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use futures::Stream;
use librqbit_core::spawn_utils::spawn_with_cancel;
use tokio_util::sync::CancellationToken;
use futures::future::Either;
use futures::stream::BoxStream;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use tracing::debug;
use tracing::error_span;
use tracing::info;
use tracing::trace;
use tracing::Instrument;
use url::Url;
use crate::tracker_comms_http;
@ -22,7 +24,6 @@ pub struct TrackerComms {
peer_id: Id20,
stats: Box<dyn TorrentStatsProvider>,
force_tracker_interval: Option<Duration>,
cancellation_token: CancellationToken,
tx: Sender,
tcp_listen_port: Option<u16>,
}
@ -57,6 +58,11 @@ impl TorrentStatsProvider for () {
type Sender = tokio::sync::mpsc::Sender<SocketAddr>;
enum SupportedTracker {
Udp(Url),
Http(Url),
}
impl TrackerComms {
pub fn start(
info_hash: Id20,
@ -64,69 +70,94 @@ impl TrackerComms {
trackers: Vec<String>,
stats: Box<dyn TorrentStatsProvider>,
force_interval: Option<Duration>,
cancellation_token: CancellationToken,
tcp_listen_port: Option<u16>,
) -> Option<impl Stream<Item = SocketAddr> + Send + Sync + Unpin + 'static> {
let (tx, rx) = tokio::sync::mpsc::channel::<SocketAddr>(16);
let comms = Arc::new(Self {
info_hash,
peer_id,
stats,
force_tracker_interval: force_interval,
cancellation_token,
tx,
tcp_listen_port,
});
let mut added = false;
for tracker in trackers {
if let Err(e) = comms.clone().add_tracker(&tracker) {
info!(tracker = tracker, "error adding tracker: {:#}", e)
} else {
added = true;
}
}
if !added {
) -> Option<BoxStream<'static, SocketAddr>> {
let trackers = trackers
.into_iter()
.filter_map(|t| match Url::parse(&t) {
Ok(parsed) => match parsed.scheme() {
"http" | "https" => Some(SupportedTracker::Http(parsed)),
"udp" => Some(SupportedTracker::Udp(parsed)),
_ => {
debug!("unsuppoted tracker URL: {}", t);
None
}
},
Err(e) => {
debug!("error parsing tracker URL {}: {}", t, e);
None
}
})
.collect::<Vec<_>>();
if trackers.is_empty() {
return None;
}
Some(tokio_stream::wrappers::ReceiverStream::new(rx))
let (tx, mut rx) = tokio::sync::mpsc::channel::<SocketAddr>(16);
let s = async_stream::stream! {
use futures::StreamExt;
let comms = Arc::new(Self {
info_hash,
peer_id,
stats,
force_tracker_interval: force_interval,
tx,
tcp_listen_port,
});
let mut futures = FuturesUnordered::new();
for tracker in trackers {
futures.push(comms.add_tracker(tracker))
}
while !(futures.is_empty()) {
tokio::select! {
addr = rx.recv() => {
if let Some(addr) = addr {
yield addr;
}
}
e = futures.next(), if !futures.is_empty() => {
if let Some(Err(e)) = e {
debug!("error: {e}");
}
}
}
}
};
Some(s.boxed())
}
fn add_tracker(self: Arc<Self>, tracker: &str) -> anyhow::Result<()> {
if tracker.starts_with("http://") || tracker.starts_with("https://") {
spawn_with_cancel(
error_span!(
fn add_tracker(
&self,
url: SupportedTracker,
) -> Either<
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
> {
let info_hash = self.info_hash;
match url {
SupportedTracker::Udp(url) => {
let span = error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash);
self.task_single_tracker_monitor_udp(url)
.instrument(span)
.right_future()
}
SupportedTracker::Http(url) => {
let span = error_span!(
parent: None,
"http_tracker",
tracker = tracker,
info_hash = ?self.info_hash
),
self.cancellation_token.clone(),
{
let comms = self;
let url = Url::parse(tracker).context("can't parse URL")?;
async move { comms.task_single_tracker_monitor_http(url).await }
},
);
} else if tracker.starts_with("udp://") {
spawn_with_cancel(
error_span!(parent: None, "udp_tracker", tracker = tracker, info_hash = ?self.info_hash),
self.cancellation_token.clone(),
{
let comms = self;
let url = Url::parse(tracker).context("can't parse URL")?;
async move { comms.task_single_tracker_monitor_udp(url).await }
},
);
} else {
bail!("unsupported tracker url {}", tracker)
tracker = %url,
info_hash = ?info_hash
);
self.task_single_tracker_monitor_http(url)
.instrument(span)
.left_future()
}
}
Ok(())
}
async fn task_single_tracker_monitor_http(
self: Arc<Self>,
mut tracker_url: Url,
) -> anyhow::Result<()> {
async fn task_single_tracker_monitor_http(&self, mut tracker_url: Url) -> anyhow::Result<()> {
let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started);
loop {
let stats = self.stats.get();

View file

@ -12,9 +12,9 @@ const ACTION_ANNOUNCE: u32 = 1;
// const ACTION_ERROR: u32 = 3;
pub const EVENT_NONE: u32 = 0;
pub const EVENT_COMPLETED: u32 = 1;
pub const EVENT_STARTED: u32 = 2;
pub const EVENT_STOPPED: u32 = 3;
// pub const EVENT_COMPLETED: u32 = 1;
// pub const EVENT_STARTED: u32 = 2;
// pub const EVENT_STOPPED: u32 = 3;
pub type ConnectionId = u64;
const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980;
@ -293,7 +293,7 @@ mod tests {
Response::Connect(connection_id) => {
dbg!(connection_id)
}
other => panic!("unexpected response {other:?}"),
other => panic!("unexpected response {:?}", other),
};
let hash = Id20::from_str("775459190aa65566591634203f8d9f17d341f969").unwrap();
@ -335,7 +335,7 @@ mod tests {
Response::Announce(r) => {
dbg!(r);
}
other => panic!("unexpected response {other:?}"),
other => panic!("unexpected response {:?}", other),
}
}
}