Replace leaky_bucket with governor crate

This commit is contained in:
Igor Katson 2024-11-16 10:55:33 +00:00
parent 3924197461
commit 1dbdeb5bbe
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 117 additions and 49 deletions

View file

@ -1,49 +1,42 @@
use std::sync::Arc;
use std::time::Duration;
use leaky_bucket::RateLimiter;
use governor::DefaultDirectRateLimiter as RateLimiter;
use governor::Quota;
use parking_lot::RwLock;
use peer_binary_protocol::PIECE_MESSAGE_DEFAULT_LEN;
use serde::Deserialize;
use serde::Serialize;
use std::num::NonZero;
use std::num::NonZeroU32;
use std::sync::Arc;
#[derive(Default, Serialize, Deserialize, Clone, Copy)]
pub struct LimitsConfig {
pub upload_bps: Option<usize>,
pub download_bps: Option<usize>,
pub upload_bps: Option<NonZero<u32>>,
pub download_bps: Option<NonZero<u32>>,
}
struct Limit(RwLock<Arc<Option<leaky_bucket::RateLimiter>>>);
struct Limit(RwLock<Arc<Option<RateLimiter>>>);
impl Limit {
fn new_inner(bps: Option<usize>) -> Arc<Option<leaky_bucket::RateLimiter>> {
fn new_inner(bps: Option<NonZero<u32>>) -> Arc<Option<RateLimiter>> {
let bps = match bps {
Some(bps) => bps,
None => return Arc::new(None),
};
let b_per_100_ms = bps.div_ceil(10);
Arc::new(Some(
RateLimiter::builder()
.interval(Duration::from_millis(100))
.refill(b_per_100_ms)
// whatever the limit is, we need to be able to download / upload a chunk
.max(PIECE_MESSAGE_DEFAULT_LEN.max(bps))
.build(),
))
Arc::new(Some(RateLimiter::direct(Quota::per_second(bps))))
}
fn new(bps: Option<usize>) -> Self {
fn new(bps: Option<NonZero<u32>>) -> Self {
Self(RwLock::new(Self::new_inner(bps)))
}
async fn acquire(&self, size: usize) {
async fn acquire(&self, size: NonZero<u32>) -> anyhow::Result<()> {
let lim = self.0.read().clone();
if let Some(rl) = lim.as_ref() {
rl.acquire(size).await
rl.until_n_ready(size).await?;
}
Ok(())
}
fn set(&self, limit: Option<usize>) {
fn set(&self, limit: Option<NonZero<u32>>) {
let new = Self::new_inner(limit);
*self.0.write() = new;
}
@ -62,19 +55,19 @@ impl Limits {
}
}
pub async fn prepare_for_upload(&self, len: usize) {
pub async fn prepare_for_upload(&self, len: NonZero<u32>) -> anyhow::Result<()> {
self.up.acquire(len).await
}
pub async fn prepare_for_download(&self, len: usize) {
pub async fn prepare_for_download(&self, len: NonZero<u32>) -> anyhow::Result<()> {
self.down.acquire(len).await
}
pub fn set_upload_bps(&self, bps: Option<usize>) {
pub fn set_upload_bps(&self, bps: Option<NonZero<u32>>) {
self.up.set(bps);
}
pub fn set_download_bps(&self, bps: Option<usize>) {
pub fn set_download_bps(&self, bps: Option<NonZeroU32>) {
self.down.set(bps);
}
}

View file

@ -46,6 +46,7 @@ pub mod stats;
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZero,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
@ -416,12 +417,14 @@ impl TorrentStateLive {
)>,
) -> anyhow::Result<()> {
while let Some((tx, ci)) = rx.recv().await {
self.ratelimits.prepare_for_upload(ci.size as usize).await;
self.ratelimits
.prepare_for_upload(NonZero::new(ci.size).unwrap())
.await?;
if let Some(session) = self.torrent.session.upgrade() {
session
.ratelimits
.prepare_for_upload(ci.size as usize)
.await;
.prepare_for_upload(NonZero::new(ci.size).unwrap())
.await?;
}
let _ = tx.send(WriterRequest::ReadChunkRequest(ci));
}
@ -1446,14 +1449,14 @@ impl PeerHandler {
self.state
.ratelimits
.prepare_for_download(request.length as usize)
.await;
.prepare_for_download(NonZero::new(request.length).unwrap())
.await?;
if let Some(session) = self.state.torrent().session.upgrade() {
session
.ratelimits
.prepare_for_download(request.length as usize)
.await;
.prepare_for_download(NonZero::new(request.length).unwrap())
.await?;
}
loop {