diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index b33857f..106367f 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -105,6 +105,10 @@ impl PeerConnection { ) -> anyhow::Result<()> { use tokio::io::AsyncWriteExt; + if self.addr.ip().to_string() != "213.189.217.38" { + bail!("bad ip") + } + let rwtimeout = self .options .read_write_timeout @@ -149,6 +153,10 @@ impl PeerConnection { &self, outgoing_chan: tokio::sync::mpsc::UnboundedReceiver, ) -> anyhow::Result<()> { + if self.addr.ip().to_string() != "213.189.217.38" { + bail!("bad ip") + } + use tokio::io::AsyncWriteExt; let rwtimeout = self .options diff --git a/crates/librqbit/src/storage/middleware/slow.rs b/crates/librqbit/src/storage/middleware/slow.rs index 496d41e..61726f5 100644 --- a/crates/librqbit/src/storage/middleware/slow.rs +++ b/crates/librqbit/src/storage/middleware/slow.rs @@ -2,9 +2,13 @@ A storage middleware that slows down the underlying storage. */ -use std::time::Duration; +use std::{ + fs::File, + io::{BufRead, BufReader, Lines}, + time::Duration, +}; -use rand_distr::Distribution; +use parking_lot::Mutex; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; @@ -27,6 +31,19 @@ impl StorageFactory for SlowStorageFactory { fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { Ok(SlowStorage { underlying: self.underlying_factory.init_storage(info)?, + pwrite_all_bufread: Mutex::new( + BufReader::new( + File::open("/Users/igor/Downloads/rqbit-log-slow-disk.log-pwrite_all").unwrap(), + ) + .lines(), + ), + pread_exact_bufread: Mutex::new( + BufReader::new( + File::open("/Users/igor/Downloads/rqbit-log-slow-disk.log-pread_exact") + .unwrap(), + ) + .lines(), + ), }) } @@ -41,39 +58,25 @@ impl StorageFactory for SlowStorageFactory { pub struct SlowStorage { underlying: U, + pwrite_all_bufread: Mutex>>, + pread_exact_bufread: Mutex>>, } -fn random_duration() -> Duration { - use rand_distr::StandardNormal; - - let s = StandardNormal {}; - - let sl: f64 = s.sample(&mut rand::thread_rng()); - // let sl = Duration::from_secs_f64(sl); - // tracing::trace!(duration = ?sl, "sleeping"); - // std::thread::sleep(sl) - // - let micros = 340f64 + sl * 200.; - // 16 is max blocking threads - let micros = micros.max(0.001) * 4. * 16.; - #[allow(clippy::cast_possible_truncation)] - Duration::from_micros(micros as u64) -} - -fn random_sleep() { - let sl = random_duration(); - tracing::trace!(duration = ?sl, "sleeping"); +fn sleep_from_reader(r: &Mutex>>) { + let mut g = r.lock(); + let micros: u64 = g.next().unwrap().unwrap().parse().unwrap(); + let sl = Duration::from_micros(micros); std::thread::sleep(sl) } impl TorrentStorage for SlowStorage { fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { - random_sleep(); + sleep_from_reader(&self.pread_exact_bufread); self.underlying.pread_exact(file_id, offset, buf) } fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { - random_sleep(); + sleep_from_reader(&self.pwrite_all_bufread); self.underlying.pwrite_all(file_id, offset, buf) } @@ -86,8 +89,6 @@ impl TorrentStorage for SlowStorage { } fn take(&self) -> anyhow::Result> { - Ok(Box::new(SlowStorage { - underlying: self.underlying.take()?, - })) + anyhow::bail!("not implemented") } } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index ce1719a..5c907e8 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -9,7 +9,8 @@ use librqbit::{ http_api_client, librqbit_spawn, storage::{ filesystem::{FilesystemStorageFactory, MmapFilesystemStorageFactory}, - StorageFactoryExt, + middleware::{slow::SlowStorageFactory, timing::TimingStorageFactory}, + StorageFactory, StorageFactoryExt, }, tracing_subscriber_config_utils::{init_logging, InitLoggingOptions}, AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse, @@ -298,10 +299,14 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { enable_upnp_port_forwarding: !opts.disable_upnp, default_defer_writes: opts.defer_writes, default_storage_factory: Some({ + fn wrap(s: S) -> impl StorageFactory { + TimingStorageFactory::new("hdd".to_owned(), SlowStorageFactory::new(s)) + } + if opts.experimental_mmap_storage { - MmapFilesystemStorageFactory::default().boxed() + wrap(MmapFilesystemStorageFactory::default()).boxed() } else { - FilesystemStorageFactory::default().boxed() + wrap(FilesystemStorageFactory::default()).boxed() } }), };