From 25b309867b2083f1d6eb326aae5b38b2095bd386 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 7 Nov 2024 21:40:17 +0000 Subject: [PATCH 01/10] Initial impl of up/down ratelimits --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/limits.rs | 48 ++++++++++++++ crates/librqbit/src/session.rs | 10 +++ crates/librqbit/src/torrent_state/live/mod.rs | 62 +++++++++++++++++-- crates/librqbit/src/torrent_state/mod.rs | 2 + crates/rqbit/src/main.rs | 2 + 8 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 crates/librqbit/src/limits.rs diff --git a/Cargo.lock b/Cargo.lock index 3bdbe5c..13d5a3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2593,6 +2593,7 @@ dependencies = [ "hex 0.4.3", "http", "itertools 0.13.0", + "leaky-bucket", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 1639863..825d8d4 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -61,6 +61,7 @@ tokio = { version = "1", features = [ "fs", "io-util", ] } +leaky-bucket = "1.1" console-subscriber = { version = "0.4", optional = true } axum = { version = "0.7", optional = true } tower-http = { version = "0.5", features = ["cors", "trace"], optional = true } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 455381c..0eadf6f 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -51,6 +51,7 @@ mod file_ops; pub mod http_api; #[cfg(feature = "http-api")] pub mod http_api_client; +mod limits; mod merge_streams; mod peer_connection; mod peer_info_reader; diff --git a/crates/librqbit/src/limits.rs b/crates/librqbit/src/limits.rs new file mode 100644 index 0000000..5df402c --- /dev/null +++ b/crates/librqbit/src/limits.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use leaky_bucket::RateLimiter; +use peer_binary_protocol::PIECE_MESSAGE_DEFAULT_LEN; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Default, Serialize, Deserialize, Clone, Copy)] +pub struct LimitsConfig { + pub upload_bps: Option, + pub download_bps: Option, +} + +#[derive(Default)] +pub struct Limits { + down: Option, + up: Option, +} + +impl Limits { + pub fn new(config: LimitsConfig) -> Self { + let new = |bps: usize| -> RateLimiter { + let b_per_100_ms = bps.div_ceil(10); + RateLimiter::builder() + .interval(Duration::from_millis(100)) + .refill(b_per_100_ms) + // whatever the limit is, we need to be able to download / upload a chunk + .max(PIECE_MESSAGE_DEFAULT_LEN.max(bps)) + .build() + }; + Self { + down: config.download_bps.map(new), + up: config.upload_bps.map(new), + } + } + + pub async fn prepare_for_upload(&self, len: usize) { + if let Some(rl) = self.up.as_ref() { + rl.acquire(len).await; + } + } + + pub async fn prepare_for_download(&self, len: usize) { + if let Some(rl) = self.down.as_ref() { + rl.acquire(len).await; + } + } +} diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index edd24cd..ca9b923 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -13,6 +13,7 @@ use crate::{ bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, file_info::FileInfo, + limits::{Limits, LimitsConfig}, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, @@ -122,6 +123,8 @@ pub struct Session { root_span: Option, + pub(crate) ratelimits: Limits, + pub(crate) stats: SessionStats, #[cfg(feature = "disable-upload")] @@ -264,6 +267,9 @@ pub struct AddTorrentOptions { #[serde(default)] pub disable_trackers: bool, + #[serde(default)] + pub ratelimits: LimitsConfig, + /// Initial peers to start of with. pub initial_peers: Option>, @@ -421,6 +427,8 @@ pub struct SessionOptions { // the root span to use. If not set will be None. pub root_span: Option, + pub ratelimits: LimitsConfig, + #[cfg(feature = "disable-upload")] pub disable_upload: bool, } @@ -636,6 +644,7 @@ impl Session { concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new( opts.concurrent_init_limit.unwrap_or(3), )), + ratelimits: Limits::new(opts.ratelimits), #[cfg(feature = "disable-upload")] _disable_upload: opts.disable_upload, }); @@ -1179,6 +1188,7 @@ impl Session { allow_overwrite: opts.overwrite, output_folder, disk_write_queue: self.disk_write_tx.clone(), + ratelimits: opts.ratelimits, #[cfg(feature = "disable-upload")] _disable_upload: self._disable_upload, }, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 220c637..8ff5ebe 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -83,6 +83,7 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected}, file_ops::FileOps, + limits::Limits, peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, @@ -201,6 +202,12 @@ pub struct TorrentStateLive { pub(crate) streams: Arc, have_broadcast_tx: tokio::sync::broadcast::Sender, + + ratelimit_upload_tx: tokio::sync::mpsc::UnboundedSender<( + tokio::sync::mpsc::UnboundedSender, + ChunkInfo, + )>, + ratelimits: Limits, } impl TorrentStateLive { @@ -238,6 +245,12 @@ impl TorrentStateLive { let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128); + let (ratelimit_upload_tx, ratelimit_upload_rx) = tokio::sync::mpsc::unbounded_channel::<( + tokio::sync::mpsc::UnboundedSender, + ChunkInfo, + )>(); + let ratelimits = Limits::new(paused.shared.options.ratelimits); + let state = Arc::new(TorrentStateLive { torrent: paused.shared.clone(), peers: PeerStates { @@ -272,6 +285,8 @@ impl TorrentStateLive { per_piece_locks: (0..lengths.total_pieces()) .map(|_| RwLock::new(())) .collect(), + ratelimit_upload_tx, + ratelimits, }); state.spawn( @@ -304,6 +319,11 @@ impl TorrentStateLive { error_span!(parent: state.torrent.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); + + state.spawn( + error_span!(parent: state.torrent.span.clone(), "upload_scheduler"), + state.clone().task_upload_scheduler(ratelimit_upload_rx), + ); Ok(state) } @@ -388,6 +408,26 @@ impl TorrentStateLive { Ok(()) } + async fn task_upload_scheduler( + self: Arc, + mut rx: tokio::sync::mpsc::UnboundedReceiver<( + tokio::sync::mpsc::UnboundedSender, + ChunkInfo, + )>, + ) -> anyhow::Result<()> { + while let Some((tx, ci)) = rx.recv().await { + self.ratelimits.prepare_for_upload(ci.size as usize).await; + if let Some(session) = self.torrent.session.upgrade() { + session + .ratelimits + .prepare_for_upload(ci.size as usize) + .await; + } + let _ = tx.send(WriterRequest::ReadChunkRequest(ci)); + } + Ok(()) + } + async fn task_manage_incoming_peer( self: Arc, checked_peer: CheckedIncomingConnection, @@ -1227,12 +1267,10 @@ impl PeerHandler { ); } - // TODO: this is not super efficient as it does copying multiple times. - // Theoretically, this could be done in the sending code, so that it reads straight into - // the send buffer. - let request = WriterRequest::ReadChunkRequest(chunk_info); - trace!("sending {:?}", &request); - Ok::<_, anyhow::Error>(self.tx.send(request)?) + self.state + .ratelimit_upload_tx + .send((self.tx.clone(), chunk_info))?; + Ok(()) } fn on_have(&self, have: u32) { @@ -1406,6 +1444,18 @@ impl PeerHandler { None => return Ok(()), }; + self.state + .ratelimits + .prepare_for_download(request.length as usize) + .await; + + if let Some(session) = self.state.torrent().session.upgrade() { + session + .ratelimits + .prepare_for_download(request.length as usize) + .await; + } + loop { match aframe!(tokio::time::timeout( Duration::from_secs(5), diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index cf8940a..4c16106 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -36,6 +36,7 @@ use tracing::trace; use tracing::warn; use crate::chunk_tracker::ChunkTracker; +use crate::limits::LimitsConfig; use crate::session::TorrentId; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; @@ -103,6 +104,7 @@ pub(crate) struct ManagedTorrentOptions { pub allow_overwrite: bool, pub output_folder: PathBuf, pub disk_write_queue: Option, + pub ratelimits: LimitsConfig, #[cfg(feature = "disable-upload")] pub _disable_upload: bool, } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index a03c87c..9a0eb5b 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -480,6 +480,8 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> cancellation_token: Some(cancel.clone()), #[cfg(feature = "disable-upload")] disable_upload: opts.disable_upload, + // TODO: expose + ratelimits: Default::default(), }; let stats_printer = |session: Arc| async move { From 37810443df13550a018b9f853b77fff832322755 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 7 Nov 2024 21:55:31 +0000 Subject: [PATCH 02/10] Ability to set limits --- crates/librqbit/src/limits.rs | 70 +++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/crates/librqbit/src/limits.rs b/crates/librqbit/src/limits.rs index 5df402c..4fd9be8 100644 --- a/crates/librqbit/src/limits.rs +++ b/crates/librqbit/src/limits.rs @@ -1,6 +1,8 @@ +use std::sync::Arc; use std::time::Duration; use leaky_bucket::RateLimiter; +use parking_lot::RwLock; use peer_binary_protocol::PIECE_MESSAGE_DEFAULT_LEN; use serde::Deserialize; use serde::Serialize; @@ -11,38 +13,68 @@ pub struct LimitsConfig { pub download_bps: Option, } -#[derive(Default)] -pub struct Limits { - down: Option, - up: Option, -} +struct Limit(RwLock>>); -impl Limits { - pub fn new(config: LimitsConfig) -> Self { - let new = |bps: usize| -> RateLimiter { - let b_per_100_ms = bps.div_ceil(10); +impl Limit { + fn new_inner(bps: Option) -> Arc> { + let bps = match bps { + Some(bps) => bps, + None => return Arc::new(None), + }; + let b_per_100_ms = bps.div_ceil(10); + Arc::new(Some( RateLimiter::builder() .interval(Duration::from_millis(100)) .refill(b_per_100_ms) // whatever the limit is, we need to be able to download / upload a chunk .max(PIECE_MESSAGE_DEFAULT_LEN.max(bps)) - .build() - }; + .build(), + )) + } + + fn new(bps: Option) -> Self { + Self(RwLock::new(Self::new_inner(bps))) + } + + async fn acquire(&self, size: usize) { + let lim = self.0.read().clone(); + if let Some(rl) = lim.as_ref() { + rl.acquire(size).await + } + } + + fn set(&self, limit: Option) { + let new = Self::new_inner(limit); + *self.0.write() = new; + } +} + +pub struct Limits { + down: Limit, + up: Limit, +} + +impl Limits { + pub fn new(config: LimitsConfig) -> Self { Self { - down: config.download_bps.map(new), - up: config.upload_bps.map(new), + down: Limit::new(config.download_bps), + up: Limit::new(config.upload_bps), } } pub async fn prepare_for_upload(&self, len: usize) { - if let Some(rl) = self.up.as_ref() { - rl.acquire(len).await; - } + self.up.acquire(len).await } pub async fn prepare_for_download(&self, len: usize) { - if let Some(rl) = self.down.as_ref() { - rl.acquire(len).await; - } + self.down.acquire(len).await + } + + pub fn set_upload_bps(&self, bps: Option) { + self.up.set(bps); + } + + pub fn set_download_bps(&self, bps: Option) { + self.down.set(bps); } } From 39241974615231804a6ef77ae0db0738756920ba Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 7 Nov 2024 22:00:55 +0000 Subject: [PATCH 03/10] Ready to test session limits --- crates/librqbit/src/http_api.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index e7f302f..d89b080 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -21,9 +21,10 @@ use tokio::net::TcpListener; use tower_http::trace::{DefaultOnFailure, DefaultOnResponse, OnFailure}; use tracing::{debug, error_span, trace, Span}; -use axum::Router; +use axum::{Json, Router}; -use crate::api::{Api, ApiTorrentListOpts, TorrentIdOrHash}; +use crate::api::{Api, ApiTorrentListOpts, EmptyJsonResponse, TorrentIdOrHash}; +use crate::limits::LimitsConfig; use crate::peer_connection::PeerConnectionOptions; use crate::session::{AddTorrent, AddTorrentOptions, SUPPORTED_SCHEMES}; use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter; @@ -490,6 +491,18 @@ impl HttpApi { Ok(axum::body::Body::from_stream(s)) } + async fn update_session_ratelimits( + State(state): State, + Json(limits): Json, + ) -> Result { + state.session().ratelimits.set_upload_bps(limits.upload_bps); + state + .session() + .ratelimits + .set_download_bps(limits.download_bps); + Ok(Json(EmptyJsonResponse {})) + } + let mut app = Router::new() .route("/", get(api_root)) .route("/stream_logs", get(stream_logs)) @@ -515,6 +528,7 @@ impl HttpApi { if !self.opts.read_only { app = app .route("/torrents", post(torrents_post)) + .route("/torrents/limits", post(update_session_ratelimits)) .route("/torrents/:id/pause", post(torrent_action_pause)) .route("/torrents/:id/start", post(torrent_action_start)) .route("/torrents/:id/forget", post(torrent_action_forget)) From 1dbdeb5bbe94916445259e3f3049cac2660d275c Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 16 Nov 2024 10:55:33 +0000 Subject: [PATCH 04/10] Replace leaky_bucket with governor crate --- Cargo.lock | 102 +++++++++++++++--- crates/librqbit/Cargo.toml | 2 +- crates/librqbit/src/limits.rs | 45 ++++---- crates/librqbit/src/torrent_state/live/mod.rs | 17 +-- 4 files changed, 117 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 13d5a3e..037062c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1497,9 +1497,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1507,9 +1507,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -1535,15 +1535,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1552,21 +1552,27 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1854,6 +1860,27 @@ dependencies = [ "system-deps", ] +[[package]] +name = "governor" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0746aa765db78b521451ef74221663b57ba595bf83f75d0ce23cc09447c8139f" +dependencies = [ + "cfg-if", + "dashmap 6.1.0", + "futures-sink", + "futures-timer", + "futures-util", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "gtk" version = "0.18.1" @@ -2590,10 +2617,10 @@ dependencies = [ "console-subscriber", "dashmap 6.1.0", "futures", + "governor", "hex 0.4.3", "http", "itertools 0.13.0", - "leaky-bucket", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", @@ -3094,6 +3121,12 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nodrop" version = "0.1.14" @@ -3110,6 +3143,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "notify" version = "6.1.1" @@ -3944,6 +3983,21 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.32.0" @@ -4107,6 +4161,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "raw-window-handle" version = "0.6.2" @@ -4861,6 +4924,15 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 825d8d4..852b10f 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -61,7 +61,7 @@ tokio = { version = "1", features = [ "fs", "io-util", ] } -leaky-bucket = "1.1" +governor = "0.7" console-subscriber = { version = "0.4", optional = true } axum = { version = "0.7", optional = true } tower-http = { version = "0.5", features = ["cors", "trace"], optional = true } diff --git a/crates/librqbit/src/limits.rs b/crates/librqbit/src/limits.rs index 4fd9be8..e16a9a8 100644 --- a/crates/librqbit/src/limits.rs +++ b/crates/librqbit/src/limits.rs @@ -1,49 +1,42 @@ -use std::sync::Arc; -use std::time::Duration; - -use leaky_bucket::RateLimiter; +use governor::DefaultDirectRateLimiter as RateLimiter; +use governor::Quota; use parking_lot::RwLock; -use peer_binary_protocol::PIECE_MESSAGE_DEFAULT_LEN; use serde::Deserialize; use serde::Serialize; +use std::num::NonZero; +use std::num::NonZeroU32; +use std::sync::Arc; #[derive(Default, Serialize, Deserialize, Clone, Copy)] pub struct LimitsConfig { - pub upload_bps: Option, - pub download_bps: Option, + pub upload_bps: Option>, + pub download_bps: Option>, } -struct Limit(RwLock>>); +struct Limit(RwLock>>); impl Limit { - fn new_inner(bps: Option) -> Arc> { + fn new_inner(bps: Option>) -> Arc> { let bps = match bps { Some(bps) => bps, None => return Arc::new(None), }; - let b_per_100_ms = bps.div_ceil(10); - Arc::new(Some( - RateLimiter::builder() - .interval(Duration::from_millis(100)) - .refill(b_per_100_ms) - // whatever the limit is, we need to be able to download / upload a chunk - .max(PIECE_MESSAGE_DEFAULT_LEN.max(bps)) - .build(), - )) + Arc::new(Some(RateLimiter::direct(Quota::per_second(bps)))) } - fn new(bps: Option) -> Self { + fn new(bps: Option>) -> Self { Self(RwLock::new(Self::new_inner(bps))) } - async fn acquire(&self, size: usize) { + async fn acquire(&self, size: NonZero) -> anyhow::Result<()> { let lim = self.0.read().clone(); if let Some(rl) = lim.as_ref() { - rl.acquire(size).await + rl.until_n_ready(size).await?; } + Ok(()) } - fn set(&self, limit: Option) { + fn set(&self, limit: Option>) { let new = Self::new_inner(limit); *self.0.write() = new; } @@ -62,19 +55,19 @@ impl Limits { } } - pub async fn prepare_for_upload(&self, len: usize) { + pub async fn prepare_for_upload(&self, len: NonZero) -> anyhow::Result<()> { self.up.acquire(len).await } - pub async fn prepare_for_download(&self, len: usize) { + pub async fn prepare_for_download(&self, len: NonZero) -> anyhow::Result<()> { self.down.acquire(len).await } - pub fn set_upload_bps(&self, bps: Option) { + pub fn set_upload_bps(&self, bps: Option>) { self.up.set(bps); } - pub fn set_download_bps(&self, bps: Option) { + pub fn set_download_bps(&self, bps: Option) { self.down.set(bps); } } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 8ff5ebe..980455f 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -46,6 +46,7 @@ pub mod stats; use std::{ collections::{HashMap, HashSet}, net::SocketAddr, + num::NonZero, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -416,12 +417,14 @@ impl TorrentStateLive { )>, ) -> anyhow::Result<()> { while let Some((tx, ci)) = rx.recv().await { - self.ratelimits.prepare_for_upload(ci.size as usize).await; + self.ratelimits + .prepare_for_upload(NonZero::new(ci.size).unwrap()) + .await?; if let Some(session) = self.torrent.session.upgrade() { session .ratelimits - .prepare_for_upload(ci.size as usize) - .await; + .prepare_for_upload(NonZero::new(ci.size).unwrap()) + .await?; } let _ = tx.send(WriterRequest::ReadChunkRequest(ci)); } @@ -1446,14 +1449,14 @@ impl PeerHandler { self.state .ratelimits - .prepare_for_download(request.length as usize) - .await; + .prepare_for_download(NonZero::new(request.length).unwrap()) + .await?; if let Some(session) = self.state.torrent().session.upgrade() { session .ratelimits - .prepare_for_download(request.length as usize) - .await; + .prepare_for_download(NonZero::new(request.length).unwrap()) + .await?; } loop { From 855e7ccaeb33e422c24e37e701ddca49e418a9a9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 16 Nov 2024 10:58:26 +0000 Subject: [PATCH 05/10] Swap Arc