diff --git a/Cargo.lock b/Cargo.lock index 180adc9..43dd12f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -514,6 +514,7 @@ dependencies = [ "byteorder", "futures", "log", + "openssl", "parking_lot", "rand", "reqwest", diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f7f1550 --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +all: sign-release sign-debug + +sign-debug: + codesign -f --entitlements resources/debugging.entitlements -s - target/debug/rqbit + +sign-release: + codesign -f --entitlements resources/debugging.entitlements -s - target/release/rqbit \ No newline at end of file diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 6c42f09..a4ded7b 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -20,6 +20,7 @@ parking_lot = "0.11" log = "0.4" size_format = "1" rand = "0.8" +openssl = "*" uuid = {version = "0.8", features = ["v4"]} futures = "0.3" diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index b407686..c75e77c 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -12,6 +12,7 @@ use crate::{ buffers::ByteString, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_binary_protocol::Piece, + sha1w::{self, ISha1}, torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned}, type_aliases::{PeerHandle, BF}, }; @@ -23,9 +24,9 @@ pub struct InitialCheckResults { pub needed_bytes: u64, } -pub fn update_hash_from_file( +pub fn update_hash_from_file( file: &mut File, - hash: &mut sha1::Sha1, + hash: &mut Sha1, buf: &mut [u8], mut bytes_to_read: usize, ) -> anyhow::Result<()> { @@ -120,7 +121,7 @@ impl<'a> FileOps<'a> { let mut read_buffer = vec![0u8; 65536]; for piece_info in self.lengths.iter_piece_infos() { - let mut computed_hash = sha1::Sha1::new(); + let mut computed_hash = sha1w::Sha1Openssl::new(); let mut piece_remaining = piece_info.len as usize; let mut some_files_broken = false; let mut at_least_one_file_required = current_file.full_file_required; @@ -180,7 +181,7 @@ impl<'a> FileOps<'a> { if self .torrent .info - .compare_hash(piece_info.piece_index.get(), &computed_hash) + .compare_hash(piece_info.piece_index.get(), computed_hash.finish()) .unwrap() { trace!( @@ -220,7 +221,7 @@ impl<'a> FileOps<'a> { piece_index: ValidPieceIndex, last_received_chunk: &ChunkInfo, ) -> anyhow::Result { - let mut h = sha1::Sha1::new(); + let mut h = sha1w::Sha1Openssl::new(); let piece_length = self.lengths.piece_length(piece_index); let mut absolute_offset = self.lengths.piece_offset(piece_index); let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)]; @@ -269,7 +270,11 @@ impl<'a> FileOps<'a> { absolute_offset = 0; } - match self.torrent.info.compare_hash(piece_index.get(), &h) { + match self + .torrent + .info + .compare_hash(piece_index.get(), h.finish()) + { Some(true) => { debug!("piece={} hash matches", piece_index); Ok(true) diff --git a/crates/librqbit/src/lengths.rs b/crates/librqbit/src/lengths.rs index 6b5a041..5ebd728 100644 --- a/crates/librqbit/src/lengths.rs +++ b/crates/librqbit/src/lengths.rs @@ -1,4 +1,4 @@ -use crate::{buffers::ByteString, constants::CHUNK_SIZE, peer_binary_protocol::Piece}; +use crate::{constants::CHUNK_SIZE, peer_binary_protocol::Piece}; const fn is_power_of_two(x: u64) -> bool { (x != 0) && ((x & (x - 1)) == 0) diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 53501eb..7991f7b 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -9,6 +9,7 @@ pub mod peer_connection; pub mod peer_id; pub mod peer_state; pub mod serde_bencode; +pub mod sha1w; pub mod spawn_utils; pub mod torrent_manager; pub mod torrent_metainfo; diff --git a/crates/librqbit/src/peer_binary_protocol.rs b/crates/librqbit/src/peer_binary_protocol.rs index a6755ff..9e844ce 100644 --- a/crates/librqbit/src/peer_binary_protocol.rs +++ b/crates/librqbit/src/peer_binary_protocol.rs @@ -5,10 +5,16 @@ use serde::{Deserialize, Serialize}; use crate::{ buffers::{ByteBuf, ByteString}, clone_to_owned::CloneToOwned, + constants::CHUNK_SIZE, lengths::ChunkInfo, }; -const PREAMBLE_LEN: usize = 5; +const INTEGER_LEN: usize = 4; +const MSGID_LEN: usize = 1; +const PREAMBLE_LEN: usize = INTEGER_LEN + MSGID_LEN; +const PIECE_MESSAGE_PREAMBLE_LEN: usize = PREAMBLE_LEN + INTEGER_LEN * 2; +pub const PIECE_MESSAGE_DEFAULT_LEN: usize = PIECE_MESSAGE_PREAMBLE_LEN + CHUNK_SIZE as usize; + const NO_PAYLOAD_MSG_LEN: usize = PREAMBLE_LEN; const PSTR_BT1: &str = "BitTorrent protocol"; @@ -56,7 +62,7 @@ pub fn serialize_piece_preamble(chunk: &ChunkInfo, mut buf: &mut [u8]) -> usize BE::write_u32(&mut buf[0..4], chunk.piece_index.get()); BE::write_u32(&mut buf[4..8], chunk.offset); - PREAMBLE_LEN + 8 + PIECE_MESSAGE_PREAMBLE_LEN } #[derive(Debug)] diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 0f59713..576f4d4 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -15,10 +15,10 @@ use crate::{ lengths::ChunkInfo, peer_binary_protocol::{ serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError, - MessageOwned, Piece, Request, + MessageOwned, Piece, Request, PIECE_MESSAGE_DEFAULT_LEN, }, peer_id::try_decode_peer_id, - spawn_utils::spawn, + spawn_utils::{spawn, spawn_block_in_place}, torrent_state::{InflightRequest, TorrentState}, type_aliases::PeerHandle, }; @@ -57,7 +57,7 @@ impl PeerConnection { conn.write_all(&handshake.serialize()) .await .context("error writing handshake")?; - let mut read_buf = vec![0u8; 16384 * 2]; + let mut read_buf = vec![0u8; PIECE_MESSAGE_DEFAULT_LEN * 2]; let read_bytes = conn .read(&mut read_buf) .await @@ -89,7 +89,7 @@ impl PeerConnection { let this = self.clone(); let writer = async move { - let mut buf = Vec::::new(); + let mut buf = Vec::::with_capacity(PIECE_MESSAGE_DEFAULT_LEN); let keep_alive_interval = Duration::from_secs(120); if this.state.stats.have.load(Ordering::Relaxed) > 0 { @@ -127,7 +127,7 @@ impl PeerConnection { let preamble_len = serialize_piece_preamble(&chunk, &mut buf); let full_len = preamble_len + chunk.size as usize; buf.resize(full_len, 0); - tokio::task::block_in_place(|| { + spawn_block_in_place(|| { this.state.file_ops().read_chunk( handle, &chunk, @@ -402,15 +402,15 @@ impl PeerConnection { Some(next) => next, None => { if self.state.get_left_to_download() == 0 { - info!("{}: nothing left to download, closing requester", handle); + debug!("{}: nothing left to download, closing requester", handle); return Ok(()); } if let Some(piece) = self.state.try_steal_piece(handle) { - info!("{}: stole a piece {}", handle, piece); + debug!("{}: stole a piece {}", handle, piece); piece } else { - info!("no pieces to request from {}", handle); + debug!("no pieces to request from {}", handle); #[allow(unused_must_use)] { timeout(Duration::from_secs(60), notify.notified()).await; @@ -534,7 +534,7 @@ impl PeerConnection { // to prevent deadlocks. drop(g); - tokio::task::block_in_place(move || { + spawn_block_in_place(move || { let index = piece.index; // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what diff --git a/crates/librqbit/src/sha1w.rs b/crates/librqbit/src/sha1w.rs new file mode 100644 index 0000000..c26a17d --- /dev/null +++ b/crates/librqbit/src/sha1w.rs @@ -0,0 +1,49 @@ +// Wrapper for sha1 libraries. +// Sha1 computation is the majority of CPU usage of this library. +// openssl seems 2-3x faster, so using it for now, but +// leaving the pure-rust impl here too. Maybe someday make them +// runtime swappable. + +pub trait ISha1 { + fn new() -> Self; + fn update(&mut self, buf: &[u8]); + fn finish(self) -> [u8; 20]; +} + +pub struct Sha1Rust { + inner: sha1::Sha1, +} + +impl ISha1 for Sha1Rust { + fn new() -> Self { + Sha1Rust { + inner: sha1::Sha1::new(), + } + } + + fn update(&mut self, buf: &[u8]) { + self.inner.update(buf) + } + + fn finish(self) -> [u8; 20] { + self.inner.digest().bytes() + } +} +pub struct Sha1Openssl { + inner: openssl::sha::Sha1, +} +impl ISha1 for Sha1Openssl { + fn new() -> Self { + Self { + inner: openssl::sha::Sha1::new(), + } + } + + fn update(&mut self, buf: &[u8]) { + self.inner.update(buf) + } + + fn finish(self) -> [u8; 20] { + self.inner.finish() + } +} diff --git a/crates/librqbit/src/spawn_utils.rs b/crates/librqbit/src/spawn_utils.rs index 35a717e..1679488 100644 --- a/crates/librqbit/src/spawn_utils.rs +++ b/crates/librqbit/src/spawn_utils.rs @@ -18,3 +18,10 @@ pub fn spawn( } }); } + +pub fn spawn_block_in_place R, R>(f: F) -> R { + // Have this wrapper so that it's easy to switch to just f() when + // using tokio's single-threaded runtime. Single-threaded runtime is + // easier to read with time profilers. + tokio::task::block_in_place(f) +} diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 5648aeb..f9f6138 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -318,7 +318,7 @@ impl TorrentManager { let peer_connection = PeerConnection::new(self.inner.clone()); spawn(format!("manage_peer({})", handle), async move { if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await { - error!("error managing peer {}: {:#}", handle, e) + debug!("error managing peer {}: {:#}", handle, e) }; peer_connection.into_state().drop_peer(handle); Ok::<_, anyhow::Error>(()) diff --git a/crates/librqbit/src/torrent_metainfo.rs b/crates/librqbit/src/torrent_metainfo.rs index f0f1d7d..69a8fec 100644 --- a/crates/librqbit/src/torrent_metainfo.rs +++ b/crates/librqbit/src/torrent_metainfo.rs @@ -132,11 +132,11 @@ impl> TorrentMetaV1Info { let expected_hash = self.pieces.deref().get(start..end)?; Some(expected_hash) } - pub fn compare_hash(&self, piece: u32, hash: &sha1::Sha1) -> Option { + pub fn compare_hash(&self, piece: u32, hash: [u8; 20]) -> Option { let start = piece as usize * 20; let end = start + 20; let expected_hash = self.pieces.deref().get(start..end)?; - Some(expected_hash == hash.digest().bytes()) + Some(expected_hash == hash) } pub fn iter_filenames_and_lengths( &self, diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 5782777..3112a91 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -276,7 +276,7 @@ impl TorrentState { } pub fn maybe_transmit_haves(&self, index: ValidPieceIndex) { - let mut unordered = FuturesUnordered::new(); + let mut futures = Vec::new(); let g = self.locked.read(); for (handle, peer_state) in g.peers.states.iter() { @@ -300,7 +300,7 @@ impl TorrentState { None => continue, }; let tx = Arc::downgrade(tx); - unordered.push(async move { + futures.push(async move { if let Some(tx) = tx.upgrade() { if tx .send(WriterRequest::Message(Message::Have(index.get()))) @@ -316,11 +316,12 @@ impl TorrentState { } } - if unordered.is_empty() { + if futures.is_empty() { trace!("no peers to transmit Have={} to, saving some work", index); return; } + let mut unordered: FuturesUnordered<_> = futures.into_iter().collect(); spawn( format!("transmit_haves(piece={}, count={})", index, unordered.len()), async move { diff --git a/resources/debugging.entitlements b/resources/debugging.entitlements new file mode 100644 index 0000000..f98654d --- /dev/null +++ b/resources/debugging.entitlements @@ -0,0 +1,10 @@ + + + + + com.apple.security.cs.debugger + + com.apple.security.get-task-allow + + + \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 101861e..85800a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -42,6 +42,15 @@ fn torrent_from_file(filename: &str) -> anyhow::Result { .clone_to_owned()) } +#[derive(Debug, Clap)] +enum LogLevel { + Trace, + Debug, + Info, + Warn, + Error, +} + #[derive(Clap)] #[clap(version = "1.0", author = "Igor Katson ")] struct Opts { @@ -61,6 +70,9 @@ struct Opts { /// Only list the torrent metadata contents, don't do anything else. #[clap(short, long)] list: bool, + + #[clap(arg_enum, short = 'v')] + log_level: Option, } fn compute_only_files( @@ -87,15 +99,40 @@ fn compute_only_files( Ok(only_files) } -fn main() -> anyhow::Result<()> { +fn init_logging(opts: &Opts) { + match opts.log_level.as_ref() { + Some(level) => { + let level_str = match level { + LogLevel::Trace => "trace", + LogLevel::Debug => "debug", + LogLevel::Info => "info", + LogLevel::Warn => "warn", + LogLevel::Error => "error", + }; + std::env::set_var("RUST_LOG", level_str); + } + None => { + if std::env::var_os("RUST_LOG").is_none() { + std::env::set_var("RUST_LOG", "info"); + }; + } + }; pretty_env_logger::init(); +} +fn main() -> anyhow::Result<()> { let opts = Opts::parse(); + init_logging(&opts); + let rt = tokio::runtime::Builder::new_multi_thread() .enable_time() .enable_io() - // the default is 512, it can get out of hand. + // the default is 512, it can get out of hand, as this program is CPU-bound on + // hash checking. + // note: we aren't using spawn_blocking() anymore, so this doesn't apply, + // however I'm still messing around, so in case we do, let's block the number of + // spawned threads. .max_blocking_threads(8) .build()?;