Merge pull request #275 from ikatson/updown-ratelimits

undefined
This commit is contained in:
Igor Katson 2024-11-20 16:26:57 +00:00 committed by GitHub
commit 09c9659b88
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 310 additions and 23 deletions

108
Cargo.lock generated
View file

@ -129,6 +129,12 @@ version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "assert_cfg"
version = "0.1.0"
@ -1497,9 +1503,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
@ -1507,9 +1513,9 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
@ -1535,15 +1541,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
@ -1552,21 +1558,27 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
@ -1854,6 +1866,27 @@ dependencies = [
"system-deps",
]
[[package]]
name = "governor"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0746aa765db78b521451ef74221663b57ba595bf83f75d0ce23cc09447c8139f"
dependencies = [
"cfg-if",
"dashmap 6.1.0",
"futures-sink",
"futures-timer",
"futures-util",
"no-std-compat",
"nonzero_ext",
"parking_lot",
"portable-atomic",
"quanta",
"rand 0.8.5",
"smallvec",
"spinning_top",
]
[[package]]
name = "gtk"
version = "0.18.1"
@ -2577,6 +2610,7 @@ name = "librqbit"
version = "7.1.0-beta.1"
dependencies = [
"anyhow",
"arc-swap",
"async-backtrace",
"async-stream",
"async-trait",
@ -2590,6 +2624,7 @@ dependencies = [
"console-subscriber",
"dashmap 6.1.0",
"futures",
"governor",
"hex 0.4.3",
"http",
"itertools 0.13.0",
@ -3093,6 +3128,12 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086"
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nodrop"
version = "0.1.14"
@ -3109,6 +3150,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "notify"
version = "6.1.1"
@ -3943,6 +3990,21 @@ dependencies = [
"prost",
]
[[package]]
name = "quanta"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-xml"
version = "0.32.0"
@ -4106,6 +4168,15 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "raw-cpuid"
version = "11.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "raw-window-handle"
version = "0.6.2"
@ -4860,6 +4931,15 @@ dependencies = [
"lock_api",
]
[[package]]
name = "spinning_top"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.7.3"

View file

@ -61,6 +61,7 @@ tokio = { version = "1", features = [
"fs",
"io-util",
] }
governor = "0.7"
console-subscriber = { version = "0.4", optional = true }
axum = { version = "0.7", optional = true }
tower-http = { version = "0.5", features = ["cors", "trace"], optional = true }
@ -109,6 +110,7 @@ async-trait = "0.1.81"
async-backtrace = { version = "0.2", optional = true }
notify = { version = "6.1.1", optional = true }
walkdir = "2.5.0"
arc-swap = "1.7.1"
[build-dependencies]
anyhow = "1"

View file

@ -21,9 +21,10 @@ use tokio::net::TcpListener;
use tower_http::trace::{DefaultOnFailure, DefaultOnResponse, OnFailure};
use tracing::{debug, error_span, trace, Span};
use axum::Router;
use axum::{Json, Router};
use crate::api::{Api, ApiTorrentListOpts, TorrentIdOrHash};
use crate::api::{Api, ApiTorrentListOpts, EmptyJsonResponse, TorrentIdOrHash};
use crate::limits::LimitsConfig;
use crate::peer_connection::PeerConnectionOptions;
use crate::session::{AddTorrent, AddTorrentOptions, SUPPORTED_SCHEMES};
use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter;
@ -490,6 +491,18 @@ impl HttpApi {
Ok(axum::body::Body::from_stream(s))
}
async fn update_session_ratelimits(
State(state): State<ApiState>,
Json(limits): Json<LimitsConfig>,
) -> Result<impl IntoResponse> {
state.session().ratelimits.set_upload_bps(limits.upload_bps);
state
.session()
.ratelimits
.set_download_bps(limits.download_bps);
Ok(Json(EmptyJsonResponse {}))
}
let mut app = Router::new()
.route("/", get(api_root))
.route("/stream_logs", get(stream_logs))
@ -515,6 +528,7 @@ impl HttpApi {
if !self.opts.read_only {
app = app
.route("/torrents", post(torrents_post))
.route("/torrents/limits", post(update_session_ratelimits))
.route("/torrents/:id/pause", post(torrent_action_pause))
.route("/torrents/:id/start", post(torrent_action_start))
.route("/torrents/:id/forget", post(torrent_action_forget))

View file

@ -51,6 +51,7 @@ mod file_ops;
pub mod http_api;
#[cfg(feature = "http-api")]
pub mod http_api_client;
pub mod limits;
mod merge_streams;
mod peer_connection;
mod peer_info_reader;

View file

@ -0,0 +1,69 @@
use arc_swap::ArcSwapOption;
use governor::DefaultDirectRateLimiter as RateLimiter;
use governor::Quota;
use serde::Deserialize;
use serde::Serialize;
use std::num::NonZeroU32;
use std::sync::Arc;
#[derive(Default, Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
pub struct LimitsConfig {
pub upload_bps: Option<NonZeroU32>,
pub download_bps: Option<NonZeroU32>,
}
struct Limit(ArcSwapOption<RateLimiter>);
impl Limit {
fn new_inner(bps: Option<NonZeroU32>) -> Option<Arc<RateLimiter>> {
let bps = bps?;
Some(Arc::new(RateLimiter::direct(Quota::per_second(bps))))
}
fn new(bps: Option<NonZeroU32>) -> Self {
Self(ArcSwapOption::new(Self::new_inner(bps)))
}
async fn acquire(&self, size: NonZeroU32) -> anyhow::Result<()> {
let lim = self.0.load().clone();
if let Some(rl) = lim.as_ref() {
rl.until_n_ready(size).await?;
}
Ok(())
}
fn set(&self, limit: Option<NonZeroU32>) {
let new = Self::new_inner(limit);
self.0.swap(new);
}
}
pub struct Limits {
down: Limit,
up: Limit,
}
impl Limits {
pub fn new(config: LimitsConfig) -> Self {
Self {
down: Limit::new(config.download_bps),
up: Limit::new(config.upload_bps),
}
}
pub async fn prepare_for_upload(&self, len: NonZeroU32) -> anyhow::Result<()> {
self.up.acquire(len).await
}
pub async fn prepare_for_download(&self, len: NonZeroU32) -> anyhow::Result<()> {
self.down.acquire(len).await
}
pub fn set_upload_bps(&self, bps: Option<NonZeroU32>) {
self.up.set(bps);
}
pub fn set_download_bps(&self, bps: Option<NonZeroU32>) {
self.down.set(bps);
}
}

View file

@ -13,6 +13,7 @@ use crate::{
bitv_factory::{BitVFactory, NonPersistentBitVFactory},
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
file_info::FileInfo,
limits::{Limits, LimitsConfig},
merge_streams::merge_streams,
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
@ -122,6 +123,8 @@ pub struct Session {
root_span: Option<Span>,
pub(crate) ratelimits: Limits,
pub(crate) stats: SessionStats,
#[cfg(feature = "disable-upload")]
@ -264,6 +267,9 @@ pub struct AddTorrentOptions {
#[serde(default)]
pub disable_trackers: bool,
#[serde(default)]
pub ratelimits: LimitsConfig,
/// Initial peers to start of with.
pub initial_peers: Option<Vec<SocketAddr>>,
@ -421,6 +427,8 @@ pub struct SessionOptions {
// the root span to use. If not set will be None.
pub root_span: Option<Span>,
pub ratelimits: LimitsConfig,
#[cfg(feature = "disable-upload")]
pub disable_upload: bool,
}
@ -636,6 +644,7 @@ impl Session {
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(
opts.concurrent_init_limit.unwrap_or(3),
)),
ratelimits: Limits::new(opts.ratelimits),
#[cfg(feature = "disable-upload")]
_disable_upload: opts.disable_upload,
});
@ -1179,6 +1188,7 @@ impl Session {
allow_overwrite: opts.overwrite,
output_folder,
disk_write_queue: self.disk_write_tx.clone(),
ratelimits: opts.ratelimits,
#[cfg(feature = "disable-upload")]
_disable_upload: self._disable_upload,
},

View file

@ -46,6 +46,7 @@ pub mod stats;
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroU32,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
@ -83,6 +84,7 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected},
file_ops::FileOps,
limits::Limits,
peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
@ -201,6 +203,12 @@ pub struct TorrentStateLive {
pub(crate) streams: Arc<TorrentStreams>,
have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>,
ratelimit_upload_tx: tokio::sync::mpsc::UnboundedSender<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>,
ratelimits: Limits,
}
impl TorrentStateLive {
@ -238,6 +246,12 @@ impl TorrentStateLive {
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
let (ratelimit_upload_tx, ratelimit_upload_rx) = tokio::sync::mpsc::unbounded_channel::<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>();
let ratelimits = Limits::new(paused.shared.options.ratelimits);
let state = Arc::new(TorrentStateLive {
torrent: paused.shared.clone(),
peers: PeerStates {
@ -272,6 +286,8 @@ impl TorrentStateLive {
per_piece_locks: (0..lengths.total_pieces())
.map(|_| RwLock::new(()))
.collect(),
ratelimit_upload_tx,
ratelimits,
});
state.spawn(
@ -304,6 +320,11 @@ impl TorrentStateLive {
error_span!(parent: state.torrent.span.clone(), "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx),
);
state.spawn(
error_span!(parent: state.torrent.span.clone(), "upload_scheduler"),
state.clone().task_upload_scheduler(ratelimit_upload_rx),
);
Ok(state)
}
@ -388,6 +409,28 @@ impl TorrentStateLive {
Ok(())
}
async fn task_upload_scheduler(
self: Arc<Self>,
mut rx: tokio::sync::mpsc::UnboundedReceiver<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>,
) -> anyhow::Result<()> {
while let Some((tx, ci)) = rx.recv().await {
self.ratelimits
.prepare_for_upload(NonZeroU32::new(ci.size).unwrap())
.await?;
if let Some(session) = self.torrent.session.upgrade() {
session
.ratelimits
.prepare_for_upload(NonZeroU32::new(ci.size).unwrap())
.await?;
}
let _ = tx.send(WriterRequest::ReadChunkRequest(ci));
}
Ok(())
}
async fn task_manage_incoming_peer(
self: Arc<Self>,
checked_peer: CheckedIncomingConnection,
@ -1227,12 +1270,10 @@ impl PeerHandler {
);
}
// TODO: this is not super efficient as it does copying multiple times.
// Theoretically, this could be done in the sending code, so that it reads straight into
// the send buffer.
let request = WriterRequest::ReadChunkRequest(chunk_info);
trace!("sending {:?}", &request);
Ok::<_, anyhow::Error>(self.tx.send(request)?)
self.state
.ratelimit_upload_tx
.send((self.tx.clone(), chunk_info))?;
Ok(())
}
fn on_have(&self, have: u32) {
@ -1406,6 +1447,18 @@ impl PeerHandler {
None => return Ok(()),
};
self.state
.ratelimits
.prepare_for_download(NonZeroU32::new(request.length).unwrap())
.await?;
if let Some(session) = self.state.torrent().session.upgrade() {
session
.ratelimits
.prepare_for_download(NonZeroU32::new(request.length).unwrap())
.await?;
}
loop {
match aframe!(tokio::time::timeout(
Duration::from_secs(5),

View file

@ -36,6 +36,7 @@ use tracing::trace;
use tracing::warn;
use crate::chunk_tracker::ChunkTracker;
use crate::limits::LimitsConfig;
use crate::session::TorrentId;
use crate::spawn_utils::BlockingSpawner;
use crate::storage::BoxStorageFactory;
@ -103,6 +104,7 @@ pub(crate) struct ManagedTorrentOptions {
pub allow_overwrite: bool,
pub output_folder: PathBuf,
pub disk_write_queue: Option<DiskWorkQueueSender>,
pub ratelimits: LimitsConfig,
#[cfg(feature = "disable-upload")]
pub _disable_upload: bool,
}

View file

@ -1,6 +1,7 @@
use std::{
io,
net::SocketAddr,
num::NonZeroU32,
path::{Path, PathBuf},
sync::Arc,
thread,
@ -14,6 +15,7 @@ use librqbit::{
api::ApiAddTorrentResponse,
http_api::{HttpApi, HttpApiOptions},
http_api_client, librqbit_spawn,
limits::LimitsConfig,
storage::{
filesystem::{FilesystemStorageFactory, MmapFilesystemStorageFactory},
StorageFactory, StorageFactoryExt,
@ -218,6 +220,14 @@ struct Opts {
#[cfg(feature = "disable-upload")]
#[arg(long, env = "RQBIT_DISABLE_UPLOAD")]
disable_upload: bool,
/// Limit download to bytes-per-second.
#[arg(long = "ratelimit-download", env = "RQBIT_RATELIMIT_DOWNLOAD")]
ratelimit_download_bps: Option<NonZeroU32>,
/// Limit upload to bytes-per-second.
#[arg(long = "ratelimit-upload", env = "RQBIT_RATELIMIT_UPLOAD")]
ratelimit_upload_bps: Option<NonZeroU32>,
}
#[derive(Parser)]
@ -480,6 +490,10 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
cancellation_token: Some(cancel.clone()),
#[cfg(feature = "disable-upload")]
disable_upload: opts.disable_upload,
ratelimits: LimitsConfig {
upload_bps: opts.ratelimit_upload_bps,
download_bps: opts.ratelimit_download_bps,
},
};
let stats_printer = |session: Arc<Session>| async move {

View file

@ -4,7 +4,7 @@ use std::{
time::Duration,
};
use librqbit::dht::PersistentDht;
use librqbit::{dht::PersistentDht, limits::LimitsConfig};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@ -154,6 +154,9 @@ pub struct RqbitDesktopConfig {
pub persistence: RqbitDesktopConfigPersistence,
pub peer_opts: RqbitDesktopConfigPeerOpts,
pub http_api: RqbitDesktopConfigHttpApi,
#[serde(default)]
pub ratelimits: LimitsConfig,
}
impl Default for RqbitDesktopConfig {
@ -172,6 +175,7 @@ impl Default for RqbitDesktopConfig {
persistence: Default::default(),
peer_opts: Default::default(),
http_api: Default::default(),
ratelimits: Default::default(),
#[cfg(feature = "disable-upload")]
disable_upload: false,
}

View file

@ -105,6 +105,7 @@ async fn api_from_config(
},
enable_upnp_port_forwarding: !config.upnp.disable_tcp_port_forward,
fastresume: config.persistence.fastresume,
ratelimits: config.ratelimits,
#[cfg(feature = "disable-upload")]
disable_upload: config.disable_upload,
..Default::default()

View file

@ -39,6 +39,11 @@ interface RqbitDesktopConfigUpnp {
server_friendly_name: string;
}
export interface LimitsConfig {
upload_bps?: number | null;
download_bps?: number | null;
}
export interface RqbitDesktopConfig {
default_download_location: PathLike;
disable_upload?: boolean;
@ -48,6 +53,7 @@ export interface RqbitDesktopConfig {
persistence: RqbitDesktopConfigPersistence;
peer_opts: RqbitDesktopConfigPeerOpts;
http_api: RqbitDesktopConfigHttpApi;
ratelimits: LimitsConfig;
}
export interface CurrentDesktopState {

View file

@ -10,6 +10,7 @@ import { Modal } from "rqbit-webui/src/components/modal/Modal";
import { Fieldset } from "rqbit-webui/src/components/forms/Fieldset";
import { ModalFooter } from "rqbit-webui/src/components/modal/ModalFooter";
import { Button } from "rqbit-webui/src/components/buttons/Button";
import { formatBytes } from "rqbit-webui/src/helper/formatBytes";
const FormCheck: React.FC<{
label: string;
@ -344,6 +345,36 @@ Might be useful e.g. if rqbit upload consumes all your upload bandwidth and inte
onChange={handleToggleChange}
help="If enabled, restarting will not rehash torrents, and thus will be faster. You should not modify the downloaded files in any way if you use that."
/>
<FormInput
label="Download rate limit"
name="ratelimits.download_bps"
inputType="number"
value={config.ratelimits.download_bps ?? ""}
onChange={handleInputChange}
help={`Limit total download speed to this number of bytes per second (${
(config.ratelimits.download_bps ?? 0) > 0
? "current " +
formatBytes(config.ratelimits.download_bps ?? 0) +
" per second"
: "currently disabled"
})`}
/>
<FormInput
label="Upload rate limit"
name="ratelimits.upload_bps"
inputType="number"
value={config.ratelimits.upload_bps ?? ""}
onChange={handleInputChange}
help={`Limit total upload speed to this number of bytes per second (${
(config.ratelimits.upload_bps ?? 0) > 0
? "current " +
formatBytes(config.ratelimits.upload_bps ?? 0) +
" per second"
: "currently disabled"
})`}
/>
</Fieldset>
</Tab>