diff --git a/crates/librqbit/src/tests/e2e_stream.rs b/crates/librqbit/src/tests/e2e_stream.rs new file mode 100644 index 0000000..05c2aeb --- /dev/null +++ b/crates/librqbit/src/tests/e2e_stream.rs @@ -0,0 +1,118 @@ +use std::{net::SocketAddr, time::Duration}; + +use anyhow::Context; +use tokio::{io::AsyncReadExt, time::timeout}; +use tracing::info; + +use crate::{ + create_torrent, storage::example::InMemoryExampleStorageFactory, AddTorrent, + CreateTorrentOptions, Session, +}; + +use super::test_util::create_default_random_dir_with_torrents; + +async fn e2e_stream() -> anyhow::Result<()> { + let files = create_default_random_dir_with_torrents(1, 8192, Some("test_e2e_stream")); + let torrent = create_torrent( + files.path(), + CreateTorrentOptions { + name: None, + piece_length: Some(1024), + }, + ) + .await?; + + let orig_content = std::fs::read(files.path().join("0.data")).unwrap(); + + let server_session = Session::new_with_opts( + "/does-not-matter".into(), + crate::SessionOptions { + disable_dht: true, + persistence: false, + listen_port_range: Some(16001..16100), + enable_upnp_port_forwarding: false, + ..Default::default() + }, + ) + .await + .context("error creating server session")?; + + info!("created server session"); + + timeout( + Duration::from_secs(5), + server_session + .add_torrent( + AddTorrent::from_bytes(torrent.as_bytes()?), + Some(crate::AddTorrentOptions { + paused: false, + output_folder: Some(files.path().to_str().unwrap().to_owned()), + overwrite: true, + ..Default::default() + }), + ) + .await? + .into_handle() + .unwrap() + .wait_until_completed(), + ) + .await? + .context("error adding torrent")?; + + info!("server torrent was completed"); + + let peer = SocketAddr::new( + "127.0.0.1".parse().unwrap(), + server_session.tcp_listen_port().unwrap(), + ); + + let client_session = Session::new_with_opts( + "/does-not-matter".into(), + crate::SessionOptions { + disable_dht: true, + persistence: false, + listen_port_range: None, + enable_upnp_port_forwarding: false, + ..Default::default() + }, + ) + .await?; + + info!("created client session"); + + let client_handle = client_session + .add_torrent( + AddTorrent::from_bytes(torrent.as_bytes()?), + Some(crate::AddTorrentOptions { + paused: false, + storage_factory: Some(Box::new(InMemoryExampleStorageFactory {})), + initial_peers: Some(vec![peer]), + // Download no files automatically. + only_files: Some(vec![]), + ..Default::default() + }), + ) + .await? + .into_handle() + .unwrap(); + + client_handle.wait_until_initialized().await?; + + info!("client torrent initialized, starting stream"); + + let mut stream = client_handle.stream(0)?; + let mut buf = Vec::::with_capacity(8192); + stream.read_to_end(&mut buf).await?; + + if buf != orig_content { + panic!("contents differ") + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_e2e_stream() -> anyhow::Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + timeout(Duration::from_secs(10), e2e_stream()).await? +} diff --git a/crates/librqbit/src/tests/mod.rs b/crates/librqbit/src/tests/mod.rs index 8a9bd01..4631428 100644 --- a/crates/librqbit/src/tests/mod.rs +++ b/crates/librqbit/src/tests/mod.rs @@ -1,2 +1,3 @@ mod e2e; +mod e2e_stream; pub mod test_util;