From 5942e6a9d53b575595794b1c2e4ddde19b099468 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 1 Jul 2021 19:17:44 +0100 Subject: [PATCH] Updates --- Cargo.lock | 44 ------- Cargo.toml | 3 +- crates/librqbit/Cargo.toml | 15 ++- crates/librqbit/src/file_ops.rs | 15 ++- crates/librqbit/src/peer_connection.rs | 162 +++++++++++++------------ crates/librqbit/src/serde_bencode.rs | 6 +- crates/librqbit/src/sha1w.rs | 8 ++ crates/librqbit/src/spawn_utils.rs | 23 +++- crates/librqbit/src/torrent_manager.rs | 17 ++- crates/librqbit/src/torrent_state.rs | 10 +- crates/librqbit/src/type_aliases.rs | 9 ++ src/main.rs | 22 +++- 12 files changed, 186 insertions(+), 148 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 534a12b..86ced0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,24 +148,6 @@ dependencies = [ "syn", ] -[[package]] -name = "commoncrypto" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d056a8586ba25a1e4d61cb090900e495952c7886786fc55f909ab2f819b69007" -dependencies = [ - "commoncrypto-sys", -] - -[[package]] -name = "commoncrypto-sys" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fed34f46747aa73dfaa578069fd8279d2818ade2b55f38f22a9401c7f4083e2" -dependencies = [ - "libc", -] - [[package]] name = "core-foundation" version = "0.9.1" @@ -191,18 +173,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crypto-hash" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a77162240fd97248d19a564a565eb563a3f592b386e4136fb300909e67dddca" -dependencies = [ - "commoncrypto", - "hex", - "openssl", - "winapi", -] - [[package]] name = "digest" version = "0.9.0" @@ -474,12 +444,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hex" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" - [[package]] name = "http" version = "0.2.4" @@ -640,7 +604,6 @@ dependencies = [ "bincode", "bitvec", "byteorder", - "crypto-hash", "futures", "log", "openssl", @@ -648,7 +611,6 @@ dependencies = [ "rand 0.8.4", "reqwest", "serde", - "sha1", "size_format", "tokio", "urlencoding", @@ -1323,12 +1285,6 @@ dependencies = [ "opaque-debug", ] -[[package]] -name = "sha1" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" - [[package]] name = "size_format" version = "1.0.2" diff --git a/Cargo.toml b/Cargo.toml index 53d26d7..942b6e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ version = "0.1.0" authors = ["Igor Katson "] edition = "2018" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +rt-single-thread = [] [dependencies] librqbit = {path="./crates/librqbit"} diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 55c5040..106bdca 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -3,6 +3,13 @@ name = "librqbit" version = "0.1.0" authors = ["Igor Katson "] edition = "2018" +default-features = ["sha1-system"] + +[features] +default = ["sha1-openssl"] +sha1-system = ["crypto-hash"] +sha1-openssl = ["openssl"] +sha1-rust = ["sha1"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -10,7 +17,7 @@ edition = "2018" tokio = {version = "1", features = ["macros", "rt-multi-thread"]} serde = {version = "1", features=["derive"]} anyhow = "1" -sha1 = "0.6" + reqwest = "0.11" urlencoding = "1" byteorder = "1" @@ -20,9 +27,11 @@ parking_lot = "0.11" log = "0.4" size_format = "1" rand = "0.8" -openssl = "0.10" warp = "0.3" -crypto-hash = "0.3" + +openssl = {version="0.10", optional=true} +crypto-hash = {version="0.3", optional=true} +sha1 = {version = "0.6", optional=true} uuid = {version = "0.8", features = ["v4"]} futures = "0.3" diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 7c52879..6531d39 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -1,6 +1,7 @@ use std::{ fs::File, io::{Read, Seek, SeekFrom, Write}, + marker::PhantomData, sync::Arc, }; @@ -12,7 +13,7 @@ use crate::{ buffers::ByteString, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_binary_protocol::Piece, - sha1w::{self, ISha1}, + sha1w::ISha1, torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned}, type_aliases::{PeerHandle, BF}, }; @@ -24,6 +25,7 @@ pub struct InitialCheckResults { pub needed_bytes: u64, } +#[inline(never)] pub fn update_hash_from_file( file: &mut File, hash: &mut Sha1, @@ -46,13 +48,14 @@ pub fn update_hash_from_file( Ok(()) } -pub struct FileOps<'a> { +pub struct FileOps<'a, Sha1> { torrent: &'a TorrentMetaV1Owned, files: &'a [Arc>], lengths: &'a Lengths, + phantom_data: PhantomData, } -impl<'a> FileOps<'a> { +impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { pub fn new( torrent: &'a TorrentMetaV1Owned, files: &'a [Arc>], @@ -62,6 +65,7 @@ impl<'a> FileOps<'a> { torrent, files, lengths, + phantom_data: PhantomData, } } @@ -121,7 +125,7 @@ impl<'a> FileOps<'a> { let mut read_buffer = vec![0u8; 65536]; for piece_info in self.lengths.iter_piece_infos() { - let mut computed_hash = sha1w::Sha1System::new(); + let mut computed_hash = Sha1Impl::new(); let mut piece_remaining = piece_info.len as usize; let mut some_files_broken = false; let mut at_least_one_file_required = current_file.full_file_required; @@ -215,13 +219,14 @@ impl<'a> FileOps<'a> { }) } + #[inline(never)] pub fn check_piece( &self, who_sent: PeerHandle, piece_index: ValidPieceIndex, last_received_chunk: &ChunkInfo, ) -> anyhow::Result { - let mut h = sha1w::Sha1System::new(); + let mut h = Sha1Impl::new(); let piece_length = self.lengths.piece_length(piece_index); let mut absolute_offset = self.lengths.piece_offset(piece_index); let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)]; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 34f0758..b68f244 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -19,7 +19,7 @@ use crate::{ }, peer_id::try_decode_peer_id, peer_state::InflightRequest, - spawn_utils::{spawn, spawn_block_in_place}, + spawn_utils::{spawn, BlockingSpawner}, torrent_state::TorrentState, type_aliases::PeerHandle, }; @@ -33,11 +33,12 @@ pub enum WriterRequest { #[derive(Clone)] pub struct PeerConnection { state: Arc, + spawner: BlockingSpawner, } impl PeerConnection { - pub fn new(state: Arc) -> Self { - PeerConnection { state } + pub fn new(state: Arc, spawner: BlockingSpawner) -> Self { + PeerConnection { state, spawner } } pub fn into_state(self) -> Arc { self.state @@ -128,14 +129,15 @@ impl PeerConnection { let preamble_len = serialize_piece_preamble(&chunk, &mut buf); let full_len = preamble_len + chunk.size as usize; buf.resize(full_len, 0); - spawn_block_in_place(|| { - this.state.file_ops().read_chunk( - handle, - &chunk, - &mut buf[preamble_len..], - ) - }) - .with_context(|| format!("error reading chunk {:?}", chunk))?; + this.spawner + .spawn_block_in_place(|| { + this.state.file_ops().read_chunk( + handle, + &chunk, + &mut buf[preamble_len..], + ) + }) + .with_context(|| format!("error reading chunk {:?}", chunk))?; uploaded_add = Some(chunk.size); full_len } @@ -544,79 +546,81 @@ impl PeerConnection { // to prevent deadlocks. drop(g); - spawn_block_in_place(move || { - let index = piece.index; + self.spawner + .spawn_block_in_place(move || { + let index = piece.index; - // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what - // should we really do? If we unmark it, it will get requested forever... - // - // So let's just unwrap and abort. - self.state - .file_ops() - .write_chunk(handle, &piece, &chunk_info) - .expect("expected to be able to write to disk"); + // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what + // should we really do? If we unmark it, it will get requested forever... + // + // So let's just unwrap and abort. + self.state + .file_ops() + .write_chunk(handle, &piece, &chunk_info) + .expect("expected to be able to write to disk"); - let full_piece_download_time = match full_piece_download_time { - Some(t) => t, - None => return Ok(()), - }; + let full_piece_download_time = match full_piece_download_time { + Some(t) => t, + None => return Ok(()), + }; - match self - .state - .file_ops() - .check_piece(handle, chunk_info.piece_index, &chunk_info) - .with_context(|| format!("error checking piece={}", index))? - { - true => { - let piece_len = self.state.lengths.piece_length(chunk_info.piece_index) as u64; - self.state - .stats - .downloaded_and_checked - .fetch_add(piece_len, Ordering::Relaxed); - self.state - .stats - .have - .fetch_add(piece_len, Ordering::Relaxed); - self.state - .stats - .downloaded_pieces - .fetch_add(1, Ordering::Relaxed); - self.state - .stats - .downloaded_pieces - .fetch_add(1, Ordering::Relaxed); - self.state.stats.total_piece_download_ms.fetch_add( - full_piece_download_time.as_millis() as u64, - Ordering::Relaxed, - ); - self.state - .locked - .write() - .chunks - .mark_piece_downloaded(chunk_info.piece_index); + match self + .state + .file_ops() + .check_piece(handle, chunk_info.piece_index, &chunk_info) + .with_context(|| format!("error checking piece={}", index))? + { + true => { + let piece_len = + self.state.lengths.piece_length(chunk_info.piece_index) as u64; + self.state + .stats + .downloaded_and_checked + .fetch_add(piece_len, Ordering::Relaxed); + self.state + .stats + .have + .fetch_add(piece_len, Ordering::Relaxed); + self.state + .stats + .downloaded_pieces + .fetch_add(1, Ordering::Relaxed); + self.state + .stats + .downloaded_pieces + .fetch_add(1, Ordering::Relaxed); + self.state.stats.total_piece_download_ms.fetch_add( + full_piece_download_time.as_millis() as u64, + Ordering::Relaxed, + ); + self.state + .locked + .write() + .chunks + .mark_piece_downloaded(chunk_info.piece_index); - debug!( - "piece={} successfully downloaded and verified from {}", - index, handle - ); + debug!( + "piece={} successfully downloaded and verified from {}", + index, handle + ); - self.state.maybe_transmit_haves(chunk_info.piece_index); - } - false => { - warn!( - "checksum for piece={} did not validate, came from {}", - index, handle - ); - self.state - .locked - .write() - .chunks - .mark_piece_broken(chunk_info.piece_index); - } - }; - Ok::<_, anyhow::Error>(()) - }) - .with_context(|| format!("error processing received chunk {:?}", chunk_info))?; + self.state.maybe_transmit_haves(chunk_info.piece_index); + } + false => { + warn!( + "checksum for piece={} did not validate, came from {}", + index, handle + ); + self.state + .locked + .write() + .chunks + .mark_piece_broken(chunk_info.piece_index); + } + }; + Ok::<_, anyhow::Error>(()) + }) + .with_context(|| format!("error processing received chunk {:?}", chunk_info))?; Ok(()) } } diff --git a/crates/librqbit/src/serde_bencode.rs b/crates/librqbit/src/serde_bencode.rs index 1f854d3..a770dae 100644 --- a/crates/librqbit/src/serde_bencode.rs +++ b/crates/librqbit/src/serde_bencode.rs @@ -7,6 +7,8 @@ use std::marker::PhantomData; use crate::buffers::ByteBuf; use crate::buffers::ByteString; use crate::clone_to_owned::CloneToOwned; +use crate::sha1w::ISha1; +use crate::type_aliases::Sha1; pub struct BencodeDeserializer<'de> { buf: &'de [u8], @@ -536,9 +538,9 @@ impl<'a, 'de> serde::de::MapAccess<'de> for MapAccess<'a, 'de> { let value = seed.deserialize(&mut *self.de)?; if self.de.is_torrent_info && self.de.field_context.as_slice() == [ByteBuf(b"info")] { let len = self.de.buf.as_ptr() as usize - buf_before.as_ptr() as usize; - let mut hash = sha1::Sha1::new(); + let mut hash = Sha1::new(); hash.update(&buf_before[..len]); - let digest = hash.digest().bytes(); + let digest = hash.finish(); self.de.torrent_info_digest = Some(digest) } self.de.field_context.pop(); diff --git a/crates/librqbit/src/sha1w.rs b/crates/librqbit/src/sha1w.rs index 976a1e3..e96ad99 100644 --- a/crates/librqbit/src/sha1w.rs +++ b/crates/librqbit/src/sha1w.rs @@ -12,10 +12,12 @@ pub trait ISha1 { fn finish(self) -> [u8; 20]; } +#[cfg(feature = "sha1-rust")] pub struct Sha1Rust { inner: sha1::Sha1, } +#[cfg(feature = "sha1-rust")] impl ISha1 for Sha1Rust { fn new() -> Self { Sha1Rust { @@ -31,9 +33,13 @@ impl ISha1 for Sha1Rust { self.inner.digest().bytes() } } + +#[cfg(feature = "sha1-openssl")] pub struct Sha1Openssl { inner: openssl::sha::Sha1, } + +#[cfg(feature = "sha1-openssl")] impl ISha1 for Sha1Openssl { fn new() -> Self { Self { @@ -50,10 +56,12 @@ impl ISha1 for Sha1Openssl { } } +#[cfg(feature = "sha1-system")] pub struct Sha1System { inner: crypto_hash::Hasher, } +#[cfg(feature = "sha1-system")] impl ISha1 for Sha1System { fn new() -> Self { Self { diff --git a/crates/librqbit/src/spawn_utils.rs b/crates/librqbit/src/spawn_utils.rs index 1679488..1b52b7c 100644 --- a/crates/librqbit/src/spawn_utils.rs +++ b/crates/librqbit/src/spawn_utils.rs @@ -19,9 +19,22 @@ pub fn spawn( }); } -pub fn spawn_block_in_place R, R>(f: F) -> R { - // Have this wrapper so that it's easy to switch to just f() when - // using tokio's single-threaded runtime. Single-threaded runtime is - // easier to read with time profilers. - tokio::task::block_in_place(f) +#[derive(Clone, Copy, Debug)] +pub struct BlockingSpawner { + allow_tokio_block_in_place: bool, +} + +impl BlockingSpawner { + pub fn new(allow_tokio_block_in_place: bool) -> Self { + Self { + allow_tokio_block_in_place, + } + } + pub fn spawn_block_in_place R, R>(&self, f: F) -> R { + if self.allow_tokio_block_in_place { + return tokio::task::block_in_place(f); + } + + return f(); + } } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index ea47a85..dd68da3 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -21,11 +21,12 @@ use crate::{ file_ops::FileOps, http_api::make_and_run_http_api, lengths::Lengths, - spawn_utils::spawn, + spawn_utils::{spawn, BlockingSpawner}, speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Owned, torrent_state::{AtomicStats, TorrentState, TorrentStateLocked}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, + type_aliases::Sha1, }; pub struct TorrentManagerBuilder { torrent: TorrentMetaV1Owned, @@ -33,6 +34,7 @@ pub struct TorrentManagerBuilder { output_folder: PathBuf, only_files: Option>, force_tracker_interval: Option, + spawner: Option, } impl TorrentManagerBuilder { @@ -43,6 +45,7 @@ impl TorrentManagerBuilder { output_folder: output_folder.as_ref().into(), only_files: None, force_tracker_interval: None, + spawner: None, } } @@ -61,6 +64,11 @@ impl TorrentManagerBuilder { self } + pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { + self.spawner = Some(spawner); + self + } + pub async fn start_manager(self) -> anyhow::Result { TorrentManager::start( self.torrent, @@ -68,6 +76,7 @@ impl TorrentManagerBuilder { self.overwrite, self.only_files, self.force_tracker_interval, + self.spawner.unwrap_or(BlockingSpawner::new(true)), ) } } @@ -114,6 +123,7 @@ impl TorrentManager { overwrite: bool, only_files: Option>, force_tracker_interval: Option, + spawner: BlockingSpawner, ) -> anyhow::Result { let files = { let mut files = @@ -155,8 +165,8 @@ impl TorrentManager { debug!("computed lengths: {:?}", &lengths); info!("Doing initial checksum validation, this might take a while..."); - let initial_check_results = - FileOps::new(&torrent, &files, &lengths).initial_check(only_files.as_deref())?; + let initial_check_results = FileOps::::new(&torrent, &files, &lengths) + .initial_check(only_files.as_deref())?; info!( "Initial check results: have {}, needed {}", @@ -185,6 +195,7 @@ impl TorrentManager { }, needed: initial_check_results.needed_bytes, lengths, + spawner, }); let estimator = Arc::new(SpeedEstimator::new(5)); diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 6e5ff9f..905e7f0 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -21,9 +21,9 @@ use crate::{ peer_binary_protocol::{Handshake, Message}, peer_connection::{PeerConnection, WriterRequest}, peer_state::{LivePeerState, PeerState}, - spawn_utils::spawn, + spawn_utils::{spawn, BlockingSpawner}, torrent_metainfo::TorrentMetaV1Owned, - type_aliases::{PeerHandle, BF}, + type_aliases::{PeerHandle, Sha1, BF}, }; pub struct InflightPiece { @@ -192,10 +192,12 @@ pub struct TorrentState { pub lengths: Lengths, pub needed: u64, pub stats: AtomicStats, + + pub spawner: BlockingSpawner, } impl TorrentState { - pub fn file_ops(&self) -> FileOps<'_> { + pub fn file_ops(&self) -> FileOps<'_, Sha1> { FileOps::new(&self.torrent, &self.files, &self.lengths) } @@ -400,7 +402,7 @@ impl TorrentState { None => return false, }; - let peer_connection = PeerConnection::new(self.clone()); + let peer_connection = PeerConnection::new(self.clone(), self.spawner.clone()); spawn(format!("manage_peer({})", handle), async move { if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await { debug!("error managing peer {}: {:#}", handle, e) diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index dc4de81..c01f88f 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -3,3 +3,12 @@ use std::net::SocketAddr; pub type BF = bitvec::vec::BitVec; pub type PeerHandle = SocketAddr; + +#[cfg(feature = "sha1-openssl")] +pub type Sha1 = crate::sha1w::Sha1Openssl; + +#[cfg(feature = "sha1-system")] +pub type Sha1 = crate::sha1w::Sha1System; + +#[cfg(feature = "sha1-rust")] +pub type Sha1 = crate::sha1w::Sha1Rust; diff --git a/src/main.rs b/src/main.rs index 31a356d..52ecab6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use std::{fs::File, io::Read, time::Duration}; use anyhow::Context; use clap::Clap; use librqbit::{ + spawn_utils::BlockingSpawner, torrent_manager::TorrentManagerBuilder, torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned}, }; @@ -77,6 +78,12 @@ struct Opts { /// pretty big, e.g. 30 minutes. This can force a certain value. #[clap(short = 'i', long = "tracker-refresh-interval")] force_tracker_interval: Option, + + /// Set this flag if you want to use tokio's single threaded runtime. + /// It MAY perform better, but the main purpose is easier debugging, as time + /// profilers work better with this one. + #[clap(short, long)] + single_thread_runtime: bool, } fn compute_only_files( @@ -129,7 +136,18 @@ fn main() -> anyhow::Result<()> { init_logging(&opts); - let rt = tokio::runtime::Builder::new_multi_thread() + let (mut rt_builder, spawner) = match opts.single_thread_runtime { + true => ( + tokio::runtime::Builder::new_current_thread(), + BlockingSpawner::new(false), + ), + false => ( + tokio::runtime::Builder::new_multi_thread(), + BlockingSpawner::new(true), + ), + }; + + let rt = rt_builder .enable_time() .enable_io() // the default is 512, it can get out of hand, as this program is CPU-bound on @@ -161,7 +179,7 @@ fn main() -> anyhow::Result<()> { }; let mut builder = TorrentManagerBuilder::new(torrent, opts.output_folder); - builder.overwrite(opts.overwrite); + builder.overwrite(opts.overwrite).spawner(spawner); if let Some(only_files) = only_files { builder.only_files(only_files); }