From 56311fb4df7640b091904ecf3394988c65dccdd4 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 22 Nov 2023 21:56:00 +0000 Subject: [PATCH] DHT Rate limiting --- Cargo.lock | 18 +++++++++++++++--- crates/dht/Cargo.toml | 3 ++- crates/dht/examples/dht.rs | 12 +++++++++--- crates/dht/src/dht.rs | 21 +++++++++++++++++++++ crates/librqbit/Cargo.toml | 4 ++-- crates/rqbit/Cargo.toml | 6 +++--- 6 files changed, 52 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b87610..ff0e522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -817,6 +817,17 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "leaky-bucket" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853" +dependencies = [ + "parking_lot", + "tokio", + "tracing", +] + [[package]] name = "libc" version = "0.2.150" @@ -836,7 +847,7 @@ dependencies = [ [[package]] name = "librqbit" -version = "3.2.0" +version = "3.3.0" dependencies = [ "anyhow", "axum", @@ -918,13 +929,14 @@ dependencies = [ [[package]] name = "librqbit-dht" -version = "3.0.0" +version = "3.1.0" dependencies = [ "anyhow", "directories", "futures", "hex 0.4.3", "indexmap", + "leaky-bucket", "librqbit-bencode", "librqbit-clone-to-owned", "librqbit-core", @@ -1491,7 +1503,7 @@ dependencies = [ [[package]] name = "rqbit" -version = "3.2.0" +version = "3.3.0" dependencies = [ "anyhow", "clap", diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index b2f2240..3504526 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-dht" -version = "3.0.0" +version = "3.1.0" edition = "2018" description = "DHT implementation, used in rqbit torrent client." license = "Apache-2.0" @@ -20,6 +20,7 @@ sha1-rust = ["bencode/sha1-rust", "librqbit-core/sha1-rust"] tokio = {version = "1", features = ["macros", "rt-multi-thread", "net", "sync"]} tokio-stream = {version = "0.1", features = ["sync"]} serde = {version = "1", features = ["derive"]} +leaky-bucket = "1" serde_json = "1" hex = "0.4" bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} diff --git a/crates/dht/examples/dht.rs b/crates/dht/examples/dht.rs index 469e0f6..cac7bd6 100644 --- a/crates/dht/examples/dht.rs +++ b/crates/dht/examples/dht.rs @@ -1,13 +1,19 @@ -use std::{str::FromStr, time::Duration}; +use std::time::Duration; use anyhow::Context; -use librqbit_dht::{Dht, Id20}; +use librqbit_core::magnet::Magnet; +use librqbit_dht::Dht; use tokio_stream::StreamExt; use tracing::info; #[tokio::main] async fn main() -> anyhow::Result<()> { - let info_hash = Id20::from_str("64a980abe6e448226bb930ba061592e44c3781a1").unwrap(); + let magnet = std::env::args() + .nth(1) + .expect("first argument should be a magnet link"); + let magnet = Magnet::parse(&magnet).unwrap(); + let info_hash = magnet.info_hash; + tracing_subscriber::fmt::init(); let dht = Dht::new().await.context("error initializing DHT")?; diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 970d78c..3808ecd 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -3,6 +3,7 @@ use std::{ net::SocketAddr, sync::Arc, task::Poll, + time::Duration, }; use crate::{ @@ -16,6 +17,7 @@ use anyhow::Context; use bencode::ByteString; use futures::{stream::FuturesUnordered, Stream, StreamExt}; use indexmap::IndexSet; +use leaky_bucket::RateLimiter; use librqbit_core::{id20::Id20, peer_id::generate_peer_id}; use parking_lot::RwLock; use rand::Rng; @@ -404,6 +406,23 @@ impl DhtState { } } +fn make_rate_limiter() -> RateLimiter { + // TODO: move to configuration, i'm lazy. + let dht_queries_per_second = std::env::var("DHT_QUERIES_PER_SECOND") + .map(|v| v.parse().expect("couldn't parse DHT_QUERIES_PER_SECOND")) + .unwrap_or(250usize); + + let per_100_ms = dht_queries_per_second / 10; + + RateLimiter::builder() + .initial(per_100_ms) + .max(dht_queries_per_second) + .interval(Duration::from_millis(100)) + .fair(false) + .refill(per_100_ms) + .build() +} + async fn run_framer( socket: &UdpSocket, mut input_rx: UnboundedReceiver<(Message, SocketAddr)>, @@ -411,11 +430,13 @@ async fn run_framer( ) -> anyhow::Result<()> { let writer = async { let mut buf = Vec::new(); + let rate_limiter = make_rate_limiter(); while let Some((msg, addr)) = input_rx.recv().await { let addr = match addr { SocketAddr::V4(v4) => v4, SocketAddr::V6(_) => continue, }; + rate_limiter.acquire_one().await; trace!("{}: sending {:?}", addr, &msg); buf.clear(); bprotocol::serialize_message( diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 37435f9..6f92138 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit" -version = "3.2.0" +version = "3.3.0" authors = ["Igor Katson "] edition = "2018" description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it." @@ -28,7 +28,7 @@ librqbit-core = {path = "../librqbit_core", version = "3.0.0"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.0.0"} sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"} -dht = {path = "../dht", package="librqbit-dht", version="3.0.0"} +dht = {path = "../dht", package="librqbit-dht", version="3.1.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} axum = {version = "0.6"} diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index b240b92..d568fe2 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rqbit" -version = "3.2.0" +version = "3.3.0" authors = ["Igor Katson "] edition = "2018" description = "A bittorrent command line client and server." @@ -22,8 +22,8 @@ default-tls = ["librqbit/default-tls"] rust-tls = ["librqbit/rust-tls"] [dependencies] -librqbit = {path="../librqbit", default-features=false, version = "3.1.0"} -dht = {path="../dht", package="librqbit-dht", version="3.0.0"} +librqbit = {path="../librqbit", default-features=false, version = "3.3.0"} +dht = {path="../dht", package="librqbit-dht", version="3.1.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} anyhow = "1" clap = {version = "4", features = ["derive", "deprecated"]}