Merge pull request #197 from ikatson/fix-e2e-test

Other fixes found by e2e test
This commit is contained in:
Igor Katson 2024-08-19 14:32:08 +01:00 committed by GitHub
commit 6ed84ffcb3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 240 additions and 71 deletions

85
Cargo.lock generated
View file

@ -115,6 +115,33 @@ version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "async-backtrace"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcb391558246d27a13f195c1e3a53eda422270fdd452bd57a5aa9c1da1bb198"
dependencies = [
"async-backtrace-attributes",
"dashmap",
"futures",
"loom",
"once_cell",
"pin-project-lite",
"rustc-hash 1.1.0",
"static_assertions",
]
[[package]]
name = "async-backtrace-attributes"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "affbba0d438add06462a0371997575927bc05052f7ec486e7a4ca405c956c3d7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.74",
]
[[package]]
name = "async-recursion"
version = "1.1.1"
@ -963,6 +990,19 @@ dependencies = [
"slab",
]
[[package]]
name = "generator"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e"
dependencies = [
"cc",
"libc",
"log",
"rustversion",
"windows",
]
[[package]]
name = "generic-array"
version = "0.12.4"
@ -1467,6 +1507,7 @@ name = "librqbit"
version = "7.0.0-beta.0"
dependencies = [
"anyhow",
"async-backtrace",
"async-stream",
"async-trait",
"axum 0.7.5",
@ -1689,6 +1730,19 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "loom"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5"
dependencies = [
"cfg-if",
"generator",
"scoped-tls",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "lru"
version = "0.12.4"
@ -2203,7 +2257,7 @@ dependencies = [
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustc-hash 2.0.0",
"rustls",
"socket2",
"thiserror",
@ -2220,7 +2274,7 @@ dependencies = [
"bytes",
"rand",
"ring",
"rustc-hash",
"rustc-hash 2.0.0",
"rustls",
"slab",
"thiserror",
@ -2481,6 +2535,12 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.0.0"
@ -2562,6 +2622,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -2992,6 +3058,12 @@ dependencies = [
"urlencoding",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.5"
@ -3696,6 +3768,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-core"
version = "0.52.0"

View file

@ -22,6 +22,7 @@ storage_middleware = ["lru"]
storage_examples = []
tracing-subscriber-utils = ["tracing-subscriber"]
postgres = ["sqlx"]
async-bt = ["async-backtrace"]
[dependencies]
sqlx = { version = "0.7", features = [
@ -88,6 +89,7 @@ lru = { version = "0.12.3", optional = true }
mime_guess = { version = "2.0.5", default-features = false }
tokio-socks = "0.5.2"
async-trait = "0.1.81"
async-backtrace = { version = "0.2", optional = true }
[build-dependencies]
anyhow = "1"

View file

@ -222,25 +222,6 @@ impl ChunkTracker {
self.have[id.get() as usize]
}
// None if wrong chunk
// true if did something
// false if didn't do anything
pub fn mark_chunk_request_cancelled(
&mut self,
index: ValidPieceIndex,
_chunk: u32,
) -> Option<bool> {
if *self.have.get(index.get() as usize)? {
return Some(false);
}
// This will trigger the requesters to re-check each chunk in this piece.
let chunk_range = self.lengths.chunk_range(index);
if !self.chunk_status.get(chunk_range)?.all() {
self.queue_pieces.set(index.get() as usize, true);
}
Some(true)
}
pub fn mark_piece_broken_if_not_have(&mut self, index: ValidPieceIndex) {
if self
.have

View file

@ -25,6 +25,19 @@
#![warn(clippy::cast_possible_truncation)]
macro_rules! aframe {
($e:expr) => {{
#[cfg(feature = "async-bt")]
{
async_backtrace::frame!($e)
}
#[cfg(not(feature = "async-bt"))]
{
$e
}
}};
}
pub mod api;
mod api_error;
mod chunk_tracker;

View file

@ -199,7 +199,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
.await
.context("error reading handshake")?;
let h_supports_extended = h.supports_extended();
trace!(
debug!(
"connected: id={:?}",
try_decode_peer_id(Id20::new(h.peer_id))
);

View file

@ -54,14 +54,29 @@ impl TorrentStorage for FilesystemStorage {
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::FileExt;
Ok(of.file.read().read_exact_at(buf, offset)?)
Ok(of
.file
.read()
.as_ref()
.context("file is None")?
.read_exact_at(buf, offset)?)
}
#[cfg(not(target_family = "unix"))]
#[cfg(target_family = "windows")]
{
use std::os::windows::fs::FileExt;
let mut remaining = buf.len();
let g = of.file.read();
let f = g.as_ref().context("file is None")?;
f.seek_read(buf, offset)?;
Ok(())
}
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
{
use std::io::{Read, Seek, SeekFrom};
let mut g = of.file.write();
g.seek(SeekFrom::Start(offset))?;
Ok(g.read_exact(buf)?)
let mut f = g.as_ref().context("file is None")?;
f.seek(SeekFrom::Start(offset))?;
Ok(f.read_exact(buf)?)
}
}
@ -70,14 +85,21 @@ impl TorrentStorage for FilesystemStorage {
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::FileExt;
Ok(of.file.read().write_all_at(buf, offset)?)
Ok(of
.file
.read()
.as_ref()
.context("file is None")?
.write_all_at(buf, offset)?)
}
#[cfg(target_family = "windows")]
{
use std::os::windows::fs::FileExt;
let mut remaining = buf.len();
let g = of.file.read();
let f = g.as_ref().context("file is None")?;
while remaining > 0 {
remaining -= of.file.read().seek_write(buf, offset)?;
remaining -= f.seek_write(buf, offset)?;
}
Ok(())
}
@ -85,8 +107,9 @@ impl TorrentStorage for FilesystemStorage {
{
use std::io::{Read, Seek, SeekFrom, Write};
let mut g = of.file.write();
g.seek(SeekFrom::Start(offset))?;
Ok(g.write_all(buf)?)
let mut f = g.as_ref().context("file is None")?;
f.seek(SeekFrom::Start(offset))?;
Ok(f.write_all(buf)?)
}
}
@ -95,7 +118,12 @@ impl TorrentStorage for FilesystemStorage {
}
fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> {
Ok(self.opened_files[file_id].file.write().set_len(len)?)
Ok(self.opened_files[file_id]
.file
.write()
.as_ref()
.context("file is None")?
.set_len(len)?)
}
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {

View file

@ -102,9 +102,10 @@ impl TorrentStorage for MmapFilesystemStorage {
let mut mmaps = Vec::new();
for (idx, file) in self.fs.opened_files.iter().enumerate() {
let fg = file.file.write();
let fg = fg.as_ref().context("file is None")?;
fg.set_len(meta.file_infos[idx].len)
.context("mmap storage: error setting length")?;
let mmap = unsafe { MmapOptions::new().map_mut(&*fg) }.context("error mapping file")?;
let mmap = unsafe { MmapOptions::new().map_mut(fg) }.context("error mapping file")?;
mmaps.push(RwLock::new(mmap));
}

View file

@ -1,37 +1,22 @@
use std::fs::File;
use anyhow::Context;
use parking_lot::RwLock;
#[derive(Debug)]
pub(crate) struct OpenedFile {
pub file: RwLock<File>,
}
pub(crate) fn dummy_file() -> anyhow::Result<std::fs::File> {
#[cfg(target_os = "windows")]
const DEVNULL: &str = "NUL";
#[cfg(not(target_os = "windows"))]
const DEVNULL: &str = "/dev/null";
std::fs::OpenOptions::new()
.read(true)
.open(DEVNULL)
.with_context(|| format!("error opening {}", DEVNULL))
pub file: RwLock<Option<File>>,
}
impl OpenedFile {
pub fn new(f: File) -> Self {
Self {
file: RwLock::new(f),
file: RwLock::new(Some(f)),
}
}
pub fn take(&self) -> anyhow::Result<File> {
pub fn take(&self) -> anyhow::Result<Option<File>> {
let mut f = self.file.write();
let dummy = dummy_file()?;
let f = std::mem::replace(&mut *f, dummy);
Ok(f)
Ok(f.take())
}
pub fn take_clone(&self) -> anyhow::Result<Self> {

View file

@ -14,14 +14,30 @@ use tracing::{error_span, info, Instrument};
use crate::{
create_torrent,
tests::test_util::{create_default_random_dir_with_torrents, TestPeerMetadata},
tests::test_util::{
create_default_random_dir_with_torrents, spawn_debug_server, TestPeerMetadata,
},
AddTorrentOptions, AddTorrentResponse, Session, SessionOptions,
};
#[tokio::test(flavor = "multi_thread", worker_threads = 64)]
async fn test_e2e_download() {
let timeout = std::env::var("E2E_TIMEOUT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(180);
tokio::time::timeout(Duration::from_secs(timeout), _test_e2e_download())
.await
.context("test_e2e_download timed out")
.unwrap()
}
async fn _test_e2e_download() {
let _ = tracing_subscriber::fmt::try_init();
spawn_debug_server();
// 1. Create a torrent
// Ideally (for a more complicated test) with N files, and at least N pieces that span 2 files.
@ -41,14 +57,17 @@ async fn test_e2e_download() {
.await
.unwrap();
let num_servers = 128;
let num_servers = std::env::var("E2E_NUM_SERVERS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(128u8);
let torrent_file_bytes = torrent_file.as_bytes().unwrap();
let mut futs = Vec::new();
// 2. Start N servers that are serving that torrent, and return their IP:port combos.
// Disable DHT on each.
for i in 0u8..num_servers {
for i in 0..num_servers {
let torrent_file_bytes = torrent_file_bytes.clone();
let tempdir = tempdir.path().to_owned();
let fut = spawn(

View file

@ -1,8 +1,11 @@
use std::{io::Write, path::Path};
use anyhow::Context;
use axum::{response::IntoResponse, routing::get, Router};
use librqbit_core::Id20;
use rand::{thread_rng, Rng, RngCore, SeedableRng};
use tempfile::TempDir;
use tracing::info;
pub fn create_new_file_with_random_content(path: &Path, mut size: usize) {
let mut file = std::fs::OpenOptions::new()
@ -79,3 +82,38 @@ impl TestPeerMetadata {
0f64
}
}
async fn debug_server() -> anyhow::Result<()> {
async fn backtraces() -> impl IntoResponse {
#[cfg(feature = "async-bt")]
{
async_backtrace::taskdump_tree(true)
}
#[cfg(not(feature = "async-bt"))]
{
use crate::ApiError;
ApiError::from(anyhow::anyhow!(
"backtraces not enabled, enable async-bt feature"
))
}
}
let app = Router::new().route("/backtrace", get(backtraces));
let app = app.into_make_service();
let addr = "127.0.0.1:3032";
info!(%addr, "starting HTTP server");
use tokio::net::TcpListener;
let listener = TcpListener::bind(addr)
.await
.with_context(|| format!("error binding to {addr}"))?;
axum::serve(listener, app).await?;
Ok(())
}
pub fn spawn_debug_server() {
tokio::spawn(debug_server());
}

View file

@ -70,15 +70,12 @@ use peer_binary_protocol::{
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
Handshake, Message, MessageOwned, Piece, Request,
};
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, OwnedSemaphorePermit, Semaphore,
},
time::timeout,
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, OwnedSemaphorePermit, Semaphore,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, trace, warn};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected},
@ -177,6 +174,7 @@ pub struct TorrentStateLive {
peer_queue_tx: UnboundedSender<SocketAddr>,
finished_notify: Notify,
new_pieces_notify: Notify,
down_speed_estimator: SpeedEstimator,
up_speed_estimator: SpeedEstimator,
@ -233,6 +231,7 @@ impl TorrentStateLive {
},
lengths,
peer_semaphore: Arc::new(Semaphore::new(128)),
new_pieces_notify: Notify::new(),
peer_queue_tx,
finished_notify: Notify::new(),
down_speed_estimator,
@ -347,8 +346,9 @@ impl TorrentStateLive {
"manage_incoming_peer",
addr = %checked_peer.addr
),
self.clone()
.task_manage_incoming_peer(checked_peer, counters, tx, rx, permit),
aframe!(self
.clone()
.task_manage_incoming_peer(checked_peer, counters, tx, rx, permit)),
);
Ok(())
}
@ -449,7 +449,12 @@ impl TorrentStateLive {
state.meta.spawner,
state.meta.connector.clone(),
);
let requester = handler.task_peer_chunk_requester();
let requester = aframe!(handler
.task_peer_chunk_requester()
.instrument(error_span!("chunk_requester")));
let conn_manager = aframe!(peer_connection
.manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe())
.instrument(error_span!("peer_connection")));
handler
.counters
@ -457,7 +462,7 @@ impl TorrentStateLive {
.fetch_add(1, Ordering::Relaxed);
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe()) => {r}
r = conn_manager => {r}
};
match res {
@ -490,7 +495,7 @@ impl TorrentStateLive {
let permit = state.peer_semaphore.clone().acquire_owned().await?;
state.spawn(
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
state.clone().task_manage_outgoing_peer(addr, permit),
aframe!(state.clone().task_manage_outgoing_peer(addr, permit)),
);
}
}
@ -897,7 +902,8 @@ impl PeerHandler {
req.chunk_index
);
g.get_chunks_mut()?
.mark_chunk_request_cancelled(req.piece_index, req.chunk_index);
.mark_piece_broken_if_not_have(req.piece_index);
self.state.new_pieces_notify.notify_waiters();
}
}
PeerState::NotNeeded => {
@ -926,7 +932,7 @@ impl PeerHandler {
self.counters.errors.fetch_add(1, Ordering::Relaxed);
if self.state.is_finished_and_dont_need_peers() {
trace!("torrent finished, not re-queueing");
debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(());
}
@ -1208,7 +1214,7 @@ impl PeerHandler {
}
loop {
self.wait_for_unchoke().await;
aframe!(self.wait_for_unchoke()).await;
if self.state.is_finished_and_dont_need_peers() {
debug!("nothing left to do, disconnecting peer");
@ -1219,6 +1225,7 @@ impl PeerHandler {
// to download early pieces.
// Then try get the next one in queue.
// Afterwards means we are close to completion, try stealing more aggressively.
let new_piece_notify = self.state.new_pieces_notify.notified();
let next = match self
.try_steal_old_slow_piece(10.)
.map_or_else(|| self.reserve_next_needed_piece(), |v| Ok(Some(v)))?
@ -1227,7 +1234,15 @@ impl PeerHandler {
Some(next) => next,
None => {
debug!("no pieces to request");
tokio::time::sleep(Duration::from_secs(10)).await;
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
new_piece_notify
))
.await
{
Ok(()) => debug!("woken up, new pieces might be available"),
Err(_) => debug!("woken up by sleep timer"),
}
continue;
}
};
@ -1261,7 +1276,12 @@ impl PeerHandler {
};
loop {
match timeout(Duration::from_secs(10), self.requests_sem.acquire()).await {
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
aframe!(self.requests_sem.acquire())
))
.await
{
Ok(acq) => break acq?.forget(),
Err(_) => continue,
};
@ -1490,6 +1510,7 @@ impl PeerHandler {
.lock_write("mark_piece_broken")
.get_chunks_mut()?
.mark_piece_broken_if_not_have(chunk_info.piece_index);
state.new_pieces_notify.notify_waiters();
anyhow::bail!("i am probably a bogus peer. dying.")
}
};