362 lines
12 KiB
Rust
362 lines
12 KiB
Rust
use std::{
|
|
net::{Ipv4Addr, SocketAddr},
|
|
time::Duration,
|
|
};
|
|
|
|
use anyhow::{bail, Context};
|
|
use librqbit_core::magnet::Magnet;
|
|
use rand::Rng;
|
|
use tokio::{
|
|
spawn,
|
|
time::{interval, timeout},
|
|
};
|
|
use tracing::{error, error_span, info, Instrument};
|
|
|
|
use crate::{
|
|
create_torrent,
|
|
tests::test_util::{
|
|
create_default_random_dir_with_torrents, setup_test_logging, spawn_debug_server,
|
|
DropChecks, TestPeerMetadata,
|
|
},
|
|
AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, SessionPersistenceConfig,
|
|
};
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 64)]
|
|
async fn test_e2e_download() {
|
|
let timeout = std::env::var("E2E_TIMEOUT")
|
|
.ok()
|
|
.and_then(|v| v.parse().ok())
|
|
.unwrap_or(180);
|
|
|
|
let drop_checks = DropChecks::default();
|
|
tokio::time::timeout(
|
|
Duration::from_secs(timeout),
|
|
_test_e2e_download(&drop_checks),
|
|
)
|
|
.await
|
|
.context("test_e2e_download timed out")
|
|
.unwrap();
|
|
|
|
// Wait to ensure everything is dropped.
|
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
|
|
|
let metrics = tokio::runtime::Handle::current().metrics();
|
|
assert_eq!(metrics.num_alive_tasks(), 1);
|
|
|
|
drop_checks.check().unwrap();
|
|
}
|
|
|
|
async fn _test_e2e_download(drop_checks: &DropChecks) {
|
|
setup_test_logging();
|
|
match crate::try_increase_nofile_limit() {
|
|
Ok(limit) => info!(limit, "increased ulimit"),
|
|
Err(e) => error!(error=?e, "error increasing ulimit"),
|
|
};
|
|
|
|
spawn_debug_server();
|
|
|
|
// 1. Create a torrent
|
|
// Ideally (for a more complicated test) with N files, and at least N pieces that span 2 files.
|
|
|
|
let piece_length: u32 = 16384 * 2; // TODO: figure out if this should be multiple of chunk size or not
|
|
let file_length: usize = 1000 * 1000;
|
|
let num_files: usize = 64;
|
|
|
|
let tempdir =
|
|
create_default_random_dir_with_torrents(num_files, file_length, Some("rqbit_e2e"));
|
|
let torrent_file = create_torrent(
|
|
dbg!(tempdir.path()),
|
|
crate::CreateTorrentOptions {
|
|
piece_length: Some(piece_length),
|
|
..Default::default()
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let num_servers = std::env::var("E2E_NUM_SERVERS")
|
|
.ok()
|
|
.and_then(|v| v.parse().ok())
|
|
.unwrap_or(128u8);
|
|
|
|
let torrent_file_bytes = torrent_file.as_bytes().unwrap();
|
|
let mut futs = Vec::new();
|
|
|
|
// 2. Start N servers that are serving that torrent, and return their IP:port combos.
|
|
// Disable DHT on each.
|
|
for i in 0..num_servers {
|
|
let torrent_file_bytes = torrent_file_bytes.clone();
|
|
let tempdir = tempdir.path().to_owned();
|
|
let drop_checks = drop_checks.clone();
|
|
let fut = spawn(
|
|
async move {
|
|
let peer_id = TestPeerMetadata {
|
|
server_id: i,
|
|
max_random_sleep_ms: rand::thread_rng().gen_range(0u8..16),
|
|
}
|
|
.as_peer_id();
|
|
let listen_range_start = 15100u16 + i as u16;
|
|
let listen_range_end = listen_range_start + 1;
|
|
let listen_range = listen_range_start..listen_range_end;
|
|
let session = crate::Session::new_with_opts(
|
|
std::env::temp_dir().join("does_not_exist"),
|
|
SessionOptions {
|
|
disable_dht: true,
|
|
disable_dht_persistence: true,
|
|
dht_config: None,
|
|
peer_id: Some(peer_id),
|
|
peer_opts: None,
|
|
listen_port_range: Some(listen_range),
|
|
enable_upnp_port_forwarding: false,
|
|
default_storage_factory: None,
|
|
defer_writes_up_to: None,
|
|
root_span: Some(error_span!(parent: None, "server", id = i)),
|
|
..Default::default()
|
|
},
|
|
)
|
|
.await
|
|
.context("error starting session")?;
|
|
|
|
drop_checks.add(&session, format!("server session {i}"));
|
|
|
|
info!("started session");
|
|
|
|
let handle = session
|
|
.add_torrent(
|
|
crate::AddTorrent::TorrentFileBytes(torrent_file_bytes),
|
|
Some(AddTorrentOptions {
|
|
overwrite: true,
|
|
output_folder: Some(tempdir.to_str().unwrap().to_owned()),
|
|
..Default::default()
|
|
}),
|
|
)
|
|
.await
|
|
.context("error adding torrent")?;
|
|
let h = handle.into_handle().context("into_handle()")?;
|
|
let mut interval = interval(Duration::from_millis(100));
|
|
|
|
info!("added torrent");
|
|
loop {
|
|
interval.tick().await;
|
|
let is_live = h
|
|
.with_state(|s| match s {
|
|
crate::ManagedTorrentState::Initializing(_) => Ok(false),
|
|
crate::ManagedTorrentState::Live(l) => {
|
|
if !l.is_finished() {
|
|
bail!("torrent went live, but expected it to be finished");
|
|
}
|
|
drop_checks.add(l, format!("server {i} live"));
|
|
Ok(true)
|
|
}
|
|
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
|
|
_ => bail!("broken state"),
|
|
})
|
|
.context("error checking for torrent liveness")?;
|
|
if is_live {
|
|
break;
|
|
}
|
|
}
|
|
info!("torrent is live");
|
|
Ok::<_, anyhow::Error>(SocketAddr::new(
|
|
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
|
session
|
|
.tcp_listen_port()
|
|
.context("expected session.tcp_listen_port() to be set")?,
|
|
))
|
|
}
|
|
.instrument(error_span!("server", id = i)),
|
|
);
|
|
futs.push(timeout(Duration::from_secs(30), fut));
|
|
}
|
|
|
|
let mut peers = Vec::new();
|
|
for (id, peer) in futures::future::join_all(futs)
|
|
.await
|
|
.into_iter()
|
|
.enumerate()
|
|
{
|
|
let peer = peer
|
|
.with_context(|| format!("join error, server={id}"))
|
|
.unwrap()
|
|
.with_context(|| format!("timeout, server={id}"))
|
|
.unwrap()
|
|
.with_context(|| format!("server couldn't start, server={id}"))
|
|
.unwrap();
|
|
peers.push(peer);
|
|
}
|
|
|
|
info!("started all servers, starting client");
|
|
|
|
let client_iters = std::env::var("E2E_CLIENT_ITERS")
|
|
.ok()
|
|
.and_then(|v| v.parse().ok())
|
|
.unwrap_or(1usize);
|
|
|
|
let magnet = Magnet::from_id20(torrent_file.info_hash(), Vec::new()).to_string();
|
|
|
|
// 3. Start a client with the initial peers, and download the file.
|
|
for _ in 0..client_iters {
|
|
let root = tempfile::TempDir::with_prefix("rqbit_e2e_client").unwrap();
|
|
let outdir = root.path().join("out");
|
|
let session_persistence = root.path().join("session");
|
|
let session = Session::new_with_opts(
|
|
outdir.to_owned(),
|
|
SessionOptions {
|
|
disable_dht: true,
|
|
disable_dht_persistence: true,
|
|
dht_config: None,
|
|
persistence: Some(SessionPersistenceConfig::Json {
|
|
folder: Some(session_persistence),
|
|
}),
|
|
fastresume: true,
|
|
listen_port_range: None,
|
|
enable_upnp_port_forwarding: false,
|
|
root_span: Some(error_span!("client")),
|
|
..Default::default()
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
drop_checks.add(&session, "client session");
|
|
|
|
info!("started client session");
|
|
|
|
let (id, handle) = {
|
|
let r = session
|
|
.add_torrent(
|
|
crate::AddTorrent::Url((&magnet).into()),
|
|
Some(AddTorrentOptions {
|
|
initial_peers: Some(peers.clone()),
|
|
// only_files: Some(vec![0]),
|
|
overwrite: false,
|
|
..Default::default()
|
|
}),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
match r {
|
|
AddTorrentResponse::AlreadyManaged(_, _) => todo!(),
|
|
AddTorrentResponse::ListOnly(_) => todo!(),
|
|
AddTorrentResponse::Added(id, h) => (id, h),
|
|
}
|
|
};
|
|
|
|
info!("added handle");
|
|
|
|
{
|
|
let stats_printer = {
|
|
let handle = handle.clone();
|
|
async move {
|
|
let mut interval = interval(Duration::from_millis(100));
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
let stats = handle.stats();
|
|
let live = match &stats.live {
|
|
Some(live) => live,
|
|
None => continue,
|
|
};
|
|
let pstats = &live.snapshot.peer_stats;
|
|
|
|
info!(
|
|
progress_percent =
|
|
format!("{}", stats.progress_percent_human_readable()),
|
|
peers = format!("{:?}", pstats),
|
|
);
|
|
}
|
|
}
|
|
}
|
|
.instrument(error_span!("stats_printer"));
|
|
|
|
let timeout = timeout(Duration::from_secs(180), handle.wait_until_completed());
|
|
|
|
tokio::pin!(stats_printer);
|
|
tokio::pin!(timeout);
|
|
|
|
let mut stats_finished = false;
|
|
loop {
|
|
tokio::select! {
|
|
r = &mut timeout => {
|
|
r.unwrap().unwrap();
|
|
break;
|
|
}
|
|
_ = &mut stats_printer, if !stats_finished => {
|
|
stats_finished = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("handle is completed");
|
|
tokio::time::timeout(Duration::from_secs(5), session.delete(id.into(), false))
|
|
.await
|
|
.context("timeout deleting torrent")
|
|
.unwrap()
|
|
.context("error deleting")
|
|
.unwrap();
|
|
|
|
info!("deleted handle");
|
|
|
|
// 4. After downloading, recheck its integrity.
|
|
let handle = session
|
|
.add_torrent(
|
|
crate::AddTorrent::TorrentFileBytes(torrent_file_bytes.clone()),
|
|
Some(AddTorrentOptions {
|
|
paused: true,
|
|
overwrite: true,
|
|
..Default::default()
|
|
}),
|
|
)
|
|
.await
|
|
.unwrap()
|
|
.into_handle()
|
|
.unwrap();
|
|
|
|
info!("re-added handle");
|
|
|
|
timeout(Duration::from_secs(30), async {
|
|
let mut interval = interval(Duration::from_millis(100));
|
|
loop {
|
|
interval.tick().await;
|
|
let b = handle
|
|
.with_state(|s| match s {
|
|
crate::ManagedTorrentState::Initializing(_) => Ok(false),
|
|
crate::ManagedTorrentState::Paused(p) => {
|
|
assert_eq!(p.chunk_tracker.get_hns().needed_bytes, 0);
|
|
Ok(true)
|
|
}
|
|
_ => bail!("bugged state"),
|
|
})
|
|
.unwrap();
|
|
if b {
|
|
break;
|
|
}
|
|
}
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
tokio::time::timeout(
|
|
Duration::from_secs(5),
|
|
session.delete(handle.id().into(), true),
|
|
)
|
|
.await
|
|
.context("timeout")
|
|
.unwrap()
|
|
.context("error deleting")
|
|
.unwrap();
|
|
|
|
// Ensure the files were deleted from the filesystem.
|
|
let d = std::fs::read_dir(&outdir)
|
|
.context("read_dir outdir")
|
|
.unwrap();
|
|
assert_eq!(
|
|
d.into_iter().count(),
|
|
0,
|
|
"{outdir:?} was not empty after deletion"
|
|
);
|
|
|
|
info!("all good");
|
|
}
|
|
}
|