diff --git a/Cargo.lock b/Cargo.lock index 13d5a3e..037062c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1497,9 +1497,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1507,9 +1507,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -1535,15 +1535,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1552,21 +1552,27 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1854,6 +1860,27 @@ dependencies = [ "system-deps", ] +[[package]] +name = "governor" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0746aa765db78b521451ef74221663b57ba595bf83f75d0ce23cc09447c8139f" +dependencies = [ + "cfg-if", + "dashmap 6.1.0", + "futures-sink", + "futures-timer", + "futures-util", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "gtk" version = "0.18.1" @@ -2590,10 +2617,10 @@ dependencies = [ "console-subscriber", "dashmap 6.1.0", "futures", + "governor", "hex 0.4.3", "http", "itertools 0.13.0", - "leaky-bucket", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", @@ -3094,6 +3121,12 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nodrop" version = "0.1.14" @@ -3110,6 +3143,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "notify" version = "6.1.1" @@ -3944,6 +3983,21 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.32.0" @@ -4107,6 +4161,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "raw-window-handle" version = "0.6.2" @@ -4861,6 +4924,15 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 825d8d4..852b10f 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -61,7 +61,7 @@ tokio = { version = "1", features = [ "fs", "io-util", ] } -leaky-bucket = "1.1" +governor = "0.7" console-subscriber = { version = "0.4", optional = true } axum = { version = "0.7", optional = true } tower-http = { version = "0.5", features = ["cors", "trace"], optional = true } diff --git a/crates/librqbit/src/limits.rs b/crates/librqbit/src/limits.rs index 4fd9be8..e16a9a8 100644 --- a/crates/librqbit/src/limits.rs +++ b/crates/librqbit/src/limits.rs @@ -1,49 +1,42 @@ -use std::sync::Arc; -use std::time::Duration; - -use leaky_bucket::RateLimiter; +use governor::DefaultDirectRateLimiter as RateLimiter; +use governor::Quota; use parking_lot::RwLock; -use peer_binary_protocol::PIECE_MESSAGE_DEFAULT_LEN; use serde::Deserialize; use serde::Serialize; +use std::num::NonZero; +use std::num::NonZeroU32; +use std::sync::Arc; #[derive(Default, Serialize, Deserialize, Clone, Copy)] pub struct LimitsConfig { - pub upload_bps: Option, - pub download_bps: Option, + pub upload_bps: Option>, + pub download_bps: Option>, } -struct Limit(RwLock>>); +struct Limit(RwLock>>); impl Limit { - fn new_inner(bps: Option) -> Arc> { + fn new_inner(bps: Option>) -> Arc> { let bps = match bps { Some(bps) => bps, None => return Arc::new(None), }; - let b_per_100_ms = bps.div_ceil(10); - Arc::new(Some( - RateLimiter::builder() - .interval(Duration::from_millis(100)) - .refill(b_per_100_ms) - // whatever the limit is, we need to be able to download / upload a chunk - .max(PIECE_MESSAGE_DEFAULT_LEN.max(bps)) - .build(), - )) + Arc::new(Some(RateLimiter::direct(Quota::per_second(bps)))) } - fn new(bps: Option) -> Self { + fn new(bps: Option>) -> Self { Self(RwLock::new(Self::new_inner(bps))) } - async fn acquire(&self, size: usize) { + async fn acquire(&self, size: NonZero) -> anyhow::Result<()> { let lim = self.0.read().clone(); if let Some(rl) = lim.as_ref() { - rl.acquire(size).await + rl.until_n_ready(size).await?; } + Ok(()) } - fn set(&self, limit: Option) { + fn set(&self, limit: Option>) { let new = Self::new_inner(limit); *self.0.write() = new; } @@ -62,19 +55,19 @@ impl Limits { } } - pub async fn prepare_for_upload(&self, len: usize) { + pub async fn prepare_for_upload(&self, len: NonZero) -> anyhow::Result<()> { self.up.acquire(len).await } - pub async fn prepare_for_download(&self, len: usize) { + pub async fn prepare_for_download(&self, len: NonZero) -> anyhow::Result<()> { self.down.acquire(len).await } - pub fn set_upload_bps(&self, bps: Option) { + pub fn set_upload_bps(&self, bps: Option>) { self.up.set(bps); } - pub fn set_download_bps(&self, bps: Option) { + pub fn set_download_bps(&self, bps: Option) { self.down.set(bps); } } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 8ff5ebe..980455f 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -46,6 +46,7 @@ pub mod stats; use std::{ collections::{HashMap, HashSet}, net::SocketAddr, + num::NonZero, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -416,12 +417,14 @@ impl TorrentStateLive { )>, ) -> anyhow::Result<()> { while let Some((tx, ci)) = rx.recv().await { - self.ratelimits.prepare_for_upload(ci.size as usize).await; + self.ratelimits + .prepare_for_upload(NonZero::new(ci.size).unwrap()) + .await?; if let Some(session) = self.torrent.session.upgrade() { session .ratelimits - .prepare_for_upload(ci.size as usize) - .await; + .prepare_for_upload(NonZero::new(ci.size).unwrap()) + .await?; } let _ = tx.send(WriterRequest::ReadChunkRequest(ci)); } @@ -1446,14 +1449,14 @@ impl PeerHandler { self.state .ratelimits - .prepare_for_download(request.length as usize) - .await; + .prepare_for_download(NonZero::new(request.length).unwrap()) + .await?; if let Some(session) = self.state.torrent().session.upgrade() { session .ratelimits - .prepare_for_download(request.length as usize) - .await; + .prepare_for_download(NonZero::new(request.length).unwrap()) + .await?; } loop {