Perfect sim is working!

This commit is contained in:
Igor Katson 2024-05-03 09:09:16 +01:00
parent 4ac512121b
commit 438392da1d
3 changed files with 44 additions and 30 deletions

View file

@ -105,6 +105,10 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
if self.addr.ip().to_string() != "213.189.217.38" {
bail!("bad ip")
}
let rwtimeout = self let rwtimeout = self
.options .options
.read_write_timeout .read_write_timeout
@ -149,6 +153,10 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
&self, &self,
outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>, outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if self.addr.ip().to_string() != "213.189.217.38" {
bail!("bad ip")
}
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
let rwtimeout = self let rwtimeout = self
.options .options

View file

@ -2,9 +2,13 @@
A storage middleware that slows down the underlying storage. A storage middleware that slows down the underlying storage.
*/ */
use std::time::Duration; use std::{
fs::File,
io::{BufRead, BufReader, Lines},
time::Duration,
};
use rand_distr::Distribution; use parking_lot::Mutex;
use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage};
@ -27,6 +31,19 @@ impl<U: StorageFactory + Clone> StorageFactory for SlowStorageFactory<U> {
fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> { fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
Ok(SlowStorage { Ok(SlowStorage {
underlying: self.underlying_factory.init_storage(info)?, underlying: self.underlying_factory.init_storage(info)?,
pwrite_all_bufread: Mutex::new(
BufReader::new(
File::open("/Users/igor/Downloads/rqbit-log-slow-disk.log-pwrite_all").unwrap(),
)
.lines(),
),
pread_exact_bufread: Mutex::new(
BufReader::new(
File::open("/Users/igor/Downloads/rqbit-log-slow-disk.log-pread_exact")
.unwrap(),
)
.lines(),
),
}) })
} }
@ -41,39 +58,25 @@ impl<U: StorageFactory + Clone> StorageFactory for SlowStorageFactory<U> {
pub struct SlowStorage<U> { pub struct SlowStorage<U> {
underlying: U, underlying: U,
pwrite_all_bufread: Mutex<Lines<BufReader<File>>>,
pread_exact_bufread: Mutex<Lines<BufReader<File>>>,
} }
fn random_duration() -> Duration { fn sleep_from_reader(r: &Mutex<Lines<BufReader<File>>>) {
use rand_distr::StandardNormal; let mut g = r.lock();
let micros: u64 = g.next().unwrap().unwrap().parse().unwrap();
let s = StandardNormal {}; let sl = Duration::from_micros(micros);
let sl: f64 = s.sample(&mut rand::thread_rng());
// let sl = Duration::from_secs_f64(sl);
// tracing::trace!(duration = ?sl, "sleeping");
// std::thread::sleep(sl)
//
let micros = 340f64 + sl * 200.;
// 16 is max blocking threads
let micros = micros.max(0.001) * 4. * 16.;
#[allow(clippy::cast_possible_truncation)]
Duration::from_micros(micros as u64)
}
fn random_sleep() {
let sl = random_duration();
tracing::trace!(duration = ?sl, "sleeping");
std::thread::sleep(sl) std::thread::sleep(sl)
} }
impl<U: TorrentStorage> TorrentStorage for SlowStorage<U> { impl<U: TorrentStorage> TorrentStorage for SlowStorage<U> {
fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> {
random_sleep(); sleep_from_reader(&self.pread_exact_bufread);
self.underlying.pread_exact(file_id, offset, buf) self.underlying.pread_exact(file_id, offset, buf)
} }
fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> {
random_sleep(); sleep_from_reader(&self.pwrite_all_bufread);
self.underlying.pwrite_all(file_id, offset, buf) self.underlying.pwrite_all(file_id, offset, buf)
} }
@ -86,8 +89,6 @@ impl<U: TorrentStorage> TorrentStorage for SlowStorage<U> {
} }
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> { fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {
Ok(Box::new(SlowStorage { anyhow::bail!("not implemented")
underlying: self.underlying.take()?,
}))
} }
} }

View file

@ -9,7 +9,8 @@ use librqbit::{
http_api_client, librqbit_spawn, http_api_client, librqbit_spawn,
storage::{ storage::{
filesystem::{FilesystemStorageFactory, MmapFilesystemStorageFactory}, filesystem::{FilesystemStorageFactory, MmapFilesystemStorageFactory},
StorageFactoryExt, middleware::{slow::SlowStorageFactory, timing::TimingStorageFactory},
StorageFactory, StorageFactoryExt,
}, },
tracing_subscriber_config_utils::{init_logging, InitLoggingOptions}, tracing_subscriber_config_utils::{init_logging, InitLoggingOptions},
AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse, AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse,
@ -298,10 +299,14 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
enable_upnp_port_forwarding: !opts.disable_upnp, enable_upnp_port_forwarding: !opts.disable_upnp,
default_defer_writes: opts.defer_writes, default_defer_writes: opts.defer_writes,
default_storage_factory: Some({ default_storage_factory: Some({
fn wrap<S: StorageFactory + Clone>(s: S) -> impl StorageFactory {
TimingStorageFactory::new("hdd".to_owned(), SlowStorageFactory::new(s))
}
if opts.experimental_mmap_storage { if opts.experimental_mmap_storage {
MmapFilesystemStorageFactory::default().boxed() wrap(MmapFilesystemStorageFactory::default()).boxed()
} else { } else {
FilesystemStorageFactory::default().boxed() wrap(FilesystemStorageFactory::default()).boxed()
} }
}), }),
}; };