Initial impl of up/down ratelimits

This commit is contained in:
Igor Katson 2024-11-07 21:40:17 +00:00
parent 616498f500
commit 25b309867b
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
8 changed files with 121 additions and 6 deletions

1
Cargo.lock generated
View file

@ -2593,6 +2593,7 @@ dependencies = [
"hex 0.4.3",
"http",
"itertools 0.13.0",
"leaky-bucket",
"librqbit-bencode",
"librqbit-buffers",
"librqbit-clone-to-owned",

View file

@ -61,6 +61,7 @@ tokio = { version = "1", features = [
"fs",
"io-util",
] }
leaky-bucket = "1.1"
console-subscriber = { version = "0.4", optional = true }
axum = { version = "0.7", optional = true }
tower-http = { version = "0.5", features = ["cors", "trace"], optional = true }

View file

@ -51,6 +51,7 @@ mod file_ops;
pub mod http_api;
#[cfg(feature = "http-api")]
pub mod http_api_client;
mod limits;
mod merge_streams;
mod peer_connection;
mod peer_info_reader;

View file

@ -0,0 +1,48 @@
use std::time::Duration;
use leaky_bucket::RateLimiter;
use peer_binary_protocol::PIECE_MESSAGE_DEFAULT_LEN;
use serde::Deserialize;
use serde::Serialize;
#[derive(Default, Serialize, Deserialize, Clone, Copy)]
pub struct LimitsConfig {
pub upload_bps: Option<usize>,
pub download_bps: Option<usize>,
}
#[derive(Default)]
pub struct Limits {
down: Option<leaky_bucket::RateLimiter>,
up: Option<leaky_bucket::RateLimiter>,
}
impl Limits {
pub fn new(config: LimitsConfig) -> Self {
let new = |bps: usize| -> RateLimiter {
let b_per_100_ms = bps.div_ceil(10);
RateLimiter::builder()
.interval(Duration::from_millis(100))
.refill(b_per_100_ms)
// whatever the limit is, we need to be able to download / upload a chunk
.max(PIECE_MESSAGE_DEFAULT_LEN.max(bps))
.build()
};
Self {
down: config.download_bps.map(new),
up: config.upload_bps.map(new),
}
}
pub async fn prepare_for_upload(&self, len: usize) {
if let Some(rl) = self.up.as_ref() {
rl.acquire(len).await;
}
}
pub async fn prepare_for_download(&self, len: usize) {
if let Some(rl) = self.down.as_ref() {
rl.acquire(len).await;
}
}
}

View file

@ -13,6 +13,7 @@ use crate::{
bitv_factory::{BitVFactory, NonPersistentBitVFactory},
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
file_info::FileInfo,
limits::{Limits, LimitsConfig},
merge_streams::merge_streams,
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
@ -122,6 +123,8 @@ pub struct Session {
root_span: Option<Span>,
pub(crate) ratelimits: Limits,
pub(crate) stats: SessionStats,
#[cfg(feature = "disable-upload")]
@ -264,6 +267,9 @@ pub struct AddTorrentOptions {
#[serde(default)]
pub disable_trackers: bool,
#[serde(default)]
pub ratelimits: LimitsConfig,
/// Initial peers to start of with.
pub initial_peers: Option<Vec<SocketAddr>>,
@ -421,6 +427,8 @@ pub struct SessionOptions {
// the root span to use. If not set will be None.
pub root_span: Option<Span>,
pub ratelimits: LimitsConfig,
#[cfg(feature = "disable-upload")]
pub disable_upload: bool,
}
@ -636,6 +644,7 @@ impl Session {
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(
opts.concurrent_init_limit.unwrap_or(3),
)),
ratelimits: Limits::new(opts.ratelimits),
#[cfg(feature = "disable-upload")]
_disable_upload: opts.disable_upload,
});
@ -1179,6 +1188,7 @@ impl Session {
allow_overwrite: opts.overwrite,
output_folder,
disk_write_queue: self.disk_write_tx.clone(),
ratelimits: opts.ratelimits,
#[cfg(feature = "disable-upload")]
_disable_upload: self._disable_upload,
},

View file

@ -83,6 +83,7 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected},
file_ops::FileOps,
limits::Limits,
peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
@ -201,6 +202,12 @@ pub struct TorrentStateLive {
pub(crate) streams: Arc<TorrentStreams>,
have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>,
ratelimit_upload_tx: tokio::sync::mpsc::UnboundedSender<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>,
ratelimits: Limits,
}
impl TorrentStateLive {
@ -238,6 +245,12 @@ impl TorrentStateLive {
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
let (ratelimit_upload_tx, ratelimit_upload_rx) = tokio::sync::mpsc::unbounded_channel::<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>();
let ratelimits = Limits::new(paused.shared.options.ratelimits);
let state = Arc::new(TorrentStateLive {
torrent: paused.shared.clone(),
peers: PeerStates {
@ -272,6 +285,8 @@ impl TorrentStateLive {
per_piece_locks: (0..lengths.total_pieces())
.map(|_| RwLock::new(()))
.collect(),
ratelimit_upload_tx,
ratelimits,
});
state.spawn(
@ -304,6 +319,11 @@ impl TorrentStateLive {
error_span!(parent: state.torrent.span.clone(), "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx),
);
state.spawn(
error_span!(parent: state.torrent.span.clone(), "upload_scheduler"),
state.clone().task_upload_scheduler(ratelimit_upload_rx),
);
Ok(state)
}
@ -388,6 +408,26 @@ impl TorrentStateLive {
Ok(())
}
async fn task_upload_scheduler(
self: Arc<Self>,
mut rx: tokio::sync::mpsc::UnboundedReceiver<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>,
) -> anyhow::Result<()> {
while let Some((tx, ci)) = rx.recv().await {
self.ratelimits.prepare_for_upload(ci.size as usize).await;
if let Some(session) = self.torrent.session.upgrade() {
session
.ratelimits
.prepare_for_upload(ci.size as usize)
.await;
}
let _ = tx.send(WriterRequest::ReadChunkRequest(ci));
}
Ok(())
}
async fn task_manage_incoming_peer(
self: Arc<Self>,
checked_peer: CheckedIncomingConnection,
@ -1227,12 +1267,10 @@ impl PeerHandler {
);
}
// TODO: this is not super efficient as it does copying multiple times.
// Theoretically, this could be done in the sending code, so that it reads straight into
// the send buffer.
let request = WriterRequest::ReadChunkRequest(chunk_info);
trace!("sending {:?}", &request);
Ok::<_, anyhow::Error>(self.tx.send(request)?)
self.state
.ratelimit_upload_tx
.send((self.tx.clone(), chunk_info))?;
Ok(())
}
fn on_have(&self, have: u32) {
@ -1406,6 +1444,18 @@ impl PeerHandler {
None => return Ok(()),
};
self.state
.ratelimits
.prepare_for_download(request.length as usize)
.await;
if let Some(session) = self.state.torrent().session.upgrade() {
session
.ratelimits
.prepare_for_download(request.length as usize)
.await;
}
loop {
match aframe!(tokio::time::timeout(
Duration::from_secs(5),

View file

@ -36,6 +36,7 @@ use tracing::trace;
use tracing::warn;
use crate::chunk_tracker::ChunkTracker;
use crate::limits::LimitsConfig;
use crate::session::TorrentId;
use crate::spawn_utils::BlockingSpawner;
use crate::storage::BoxStorageFactory;
@ -103,6 +104,7 @@ pub(crate) struct ManagedTorrentOptions {
pub allow_overwrite: bool,
pub output_folder: PathBuf,
pub disk_write_queue: Option<DiskWorkQueueSender>,
pub ratelimits: LimitsConfig,
#[cfg(feature = "disable-upload")]
pub _disable_upload: bool,
}

View file

@ -480,6 +480,8 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
cancellation_token: Some(cancel.clone()),
#[cfg(feature = "disable-upload")]
disable_upload: opts.disable_upload,
// TODO: expose
ratelimits: Default::default(),
};
let stats_printer = |session: Arc<Session>| async move {