diff --git a/Cargo.lock b/Cargo.lock index 3bdbe5c..13d5a3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2593,6 +2593,7 @@ dependencies = [ "hex 0.4.3", "http", "itertools 0.13.0", + "leaky-bucket", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 1639863..825d8d4 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -61,6 +61,7 @@ tokio = { version = "1", features = [ "fs", "io-util", ] } +leaky-bucket = "1.1" console-subscriber = { version = "0.4", optional = true } axum = { version = "0.7", optional = true } tower-http = { version = "0.5", features = ["cors", "trace"], optional = true } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 455381c..0eadf6f 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -51,6 +51,7 @@ mod file_ops; pub mod http_api; #[cfg(feature = "http-api")] pub mod http_api_client; +mod limits; mod merge_streams; mod peer_connection; mod peer_info_reader; diff --git a/crates/librqbit/src/limits.rs b/crates/librqbit/src/limits.rs new file mode 100644 index 0000000..5df402c --- /dev/null +++ b/crates/librqbit/src/limits.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use leaky_bucket::RateLimiter; +use peer_binary_protocol::PIECE_MESSAGE_DEFAULT_LEN; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Default, Serialize, Deserialize, Clone, Copy)] +pub struct LimitsConfig { + pub upload_bps: Option, + pub download_bps: Option, +} + +#[derive(Default)] +pub struct Limits { + down: Option, + up: Option, +} + +impl Limits { + pub fn new(config: LimitsConfig) -> Self { + let new = |bps: usize| -> RateLimiter { + let b_per_100_ms = bps.div_ceil(10); + RateLimiter::builder() + .interval(Duration::from_millis(100)) + .refill(b_per_100_ms) + // whatever the limit is, we need to be able to download / upload a chunk + .max(PIECE_MESSAGE_DEFAULT_LEN.max(bps)) + .build() + }; + Self { + down: config.download_bps.map(new), + up: config.upload_bps.map(new), + } + } + + pub async fn prepare_for_upload(&self, len: usize) { + if let Some(rl) = self.up.as_ref() { + rl.acquire(len).await; + } + } + + pub async fn prepare_for_download(&self, len: usize) { + if let Some(rl) = self.down.as_ref() { + rl.acquire(len).await; + } + } +} diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index edd24cd..ca9b923 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -13,6 +13,7 @@ use crate::{ bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, file_info::FileInfo, + limits::{Limits, LimitsConfig}, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, @@ -122,6 +123,8 @@ pub struct Session { root_span: Option, + pub(crate) ratelimits: Limits, + pub(crate) stats: SessionStats, #[cfg(feature = "disable-upload")] @@ -264,6 +267,9 @@ pub struct AddTorrentOptions { #[serde(default)] pub disable_trackers: bool, + #[serde(default)] + pub ratelimits: LimitsConfig, + /// Initial peers to start of with. pub initial_peers: Option>, @@ -421,6 +427,8 @@ pub struct SessionOptions { // the root span to use. If not set will be None. pub root_span: Option, + pub ratelimits: LimitsConfig, + #[cfg(feature = "disable-upload")] pub disable_upload: bool, } @@ -636,6 +644,7 @@ impl Session { concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new( opts.concurrent_init_limit.unwrap_or(3), )), + ratelimits: Limits::new(opts.ratelimits), #[cfg(feature = "disable-upload")] _disable_upload: opts.disable_upload, }); @@ -1179,6 +1188,7 @@ impl Session { allow_overwrite: opts.overwrite, output_folder, disk_write_queue: self.disk_write_tx.clone(), + ratelimits: opts.ratelimits, #[cfg(feature = "disable-upload")] _disable_upload: self._disable_upload, }, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 220c637..8ff5ebe 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -83,6 +83,7 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected}, file_ops::FileOps, + limits::Limits, peer_connection::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, @@ -201,6 +202,12 @@ pub struct TorrentStateLive { pub(crate) streams: Arc, have_broadcast_tx: tokio::sync::broadcast::Sender, + + ratelimit_upload_tx: tokio::sync::mpsc::UnboundedSender<( + tokio::sync::mpsc::UnboundedSender, + ChunkInfo, + )>, + ratelimits: Limits, } impl TorrentStateLive { @@ -238,6 +245,12 @@ impl TorrentStateLive { let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128); + let (ratelimit_upload_tx, ratelimit_upload_rx) = tokio::sync::mpsc::unbounded_channel::<( + tokio::sync::mpsc::UnboundedSender, + ChunkInfo, + )>(); + let ratelimits = Limits::new(paused.shared.options.ratelimits); + let state = Arc::new(TorrentStateLive { torrent: paused.shared.clone(), peers: PeerStates { @@ -272,6 +285,8 @@ impl TorrentStateLive { per_piece_locks: (0..lengths.total_pieces()) .map(|_| RwLock::new(())) .collect(), + ratelimit_upload_tx, + ratelimits, }); state.spawn( @@ -304,6 +319,11 @@ impl TorrentStateLive { error_span!(parent: state.torrent.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); + + state.spawn( + error_span!(parent: state.torrent.span.clone(), "upload_scheduler"), + state.clone().task_upload_scheduler(ratelimit_upload_rx), + ); Ok(state) } @@ -388,6 +408,26 @@ impl TorrentStateLive { Ok(()) } + async fn task_upload_scheduler( + self: Arc, + mut rx: tokio::sync::mpsc::UnboundedReceiver<( + tokio::sync::mpsc::UnboundedSender, + ChunkInfo, + )>, + ) -> anyhow::Result<()> { + while let Some((tx, ci)) = rx.recv().await { + self.ratelimits.prepare_for_upload(ci.size as usize).await; + if let Some(session) = self.torrent.session.upgrade() { + session + .ratelimits + .prepare_for_upload(ci.size as usize) + .await; + } + let _ = tx.send(WriterRequest::ReadChunkRequest(ci)); + } + Ok(()) + } + async fn task_manage_incoming_peer( self: Arc, checked_peer: CheckedIncomingConnection, @@ -1227,12 +1267,10 @@ impl PeerHandler { ); } - // TODO: this is not super efficient as it does copying multiple times. - // Theoretically, this could be done in the sending code, so that it reads straight into - // the send buffer. - let request = WriterRequest::ReadChunkRequest(chunk_info); - trace!("sending {:?}", &request); - Ok::<_, anyhow::Error>(self.tx.send(request)?) + self.state + .ratelimit_upload_tx + .send((self.tx.clone(), chunk_info))?; + Ok(()) } fn on_have(&self, have: u32) { @@ -1406,6 +1444,18 @@ impl PeerHandler { None => return Ok(()), }; + self.state + .ratelimits + .prepare_for_download(request.length as usize) + .await; + + if let Some(session) = self.state.torrent().session.upgrade() { + session + .ratelimits + .prepare_for_download(request.length as usize) + .await; + } + loop { match aframe!(tokio::time::timeout( Duration::from_secs(5), diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index cf8940a..4c16106 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -36,6 +36,7 @@ use tracing::trace; use tracing::warn; use crate::chunk_tracker::ChunkTracker; +use crate::limits::LimitsConfig; use crate::session::TorrentId; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; @@ -103,6 +104,7 @@ pub(crate) struct ManagedTorrentOptions { pub allow_overwrite: bool, pub output_folder: PathBuf, pub disk_write_queue: Option, + pub ratelimits: LimitsConfig, #[cfg(feature = "disable-upload")] pub _disable_upload: bool, } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index a03c87c..9a0eb5b 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -480,6 +480,8 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> cancellation_token: Some(cancel.clone()), #[cfg(feature = "disable-upload")] disable_upload: opts.disable_upload, + // TODO: expose + ratelimits: Default::default(), }; let stats_printer = |session: Arc| async move {