From 34ea2255606f82d30f8e09f4e4c39b3cb1d9daed Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 26 Jun 2021 18:13:46 +0100 Subject: [PATCH] Updating --- Cargo.lock | 90 +++++++++++++ TODO.md | 2 - crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/chunk_tracker.rs | 23 +++- crates/librqbit/src/torrent_manager.rs | 170 +++++++++++++------------ 5 files changed, 202 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f271561..af9f801 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -298,6 +298,15 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" +dependencies = [ + "typenum", +] + [[package]] name = "getrandom" version = "0.2.3" @@ -509,6 +518,7 @@ dependencies = [ "reqwest", "serde", "sha1", + "size_format", "tokio", "urlencoding", "uuid", @@ -599,6 +609,70 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" +dependencies = [ + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59" 
+dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -1003,6 +1077,16 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" +[[package]] +name = "size_format" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed5f6ab2122c6dec69dca18c72fa4590a27e581ad20d44960fe74c032a0b23b" +dependencies = [ + "generic-array", + "num", +] + [[package]] name = "slab" version = "0.4.3" @@ -1179,6 +1263,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "typenum" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" + [[package]] name = "unicode-bidi" version = "0.3.5" diff --git a/TODO.md b/TODO.md index 6c7ca83..af2ee34 100644 --- a/TODO.md +++ b/TODO.md @@ -1,6 +1,4 @@ - [ ] Selective file downloading (mostly done) - - [ ] Seeking optimization - - If a file is not needed, no need to check its hash - [ ] Proper counting of how much is left, and how much is downloaded - [ ] Refactor "needed pieces" into a bitfield diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 4ef96de..8c0abb9 100644 --- 
a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -18,6 +18,7 @@ bincode = "1" bitvec = "0.22" parking_lot = "0.11" log = "0.4" +size_format = "1" uuid = {version = "0.8", features = ["v4"]} futures = "0.3" diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 3146181..2358592 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -8,11 +8,27 @@ use crate::{ }; pub struct ChunkTracker { + // This forms the basis of a "queue" to pull from. + // It's set to 1 if we need a piece, but the moment we start requesting a peer, + // it's set to 0. + + // Better to rename into piece_queue or smth, and maybe use some other form of a queue. needed_pieces: BF, + + // This has a bit set per each chunk (block) that we have written to the output file. + // It doesn't mean it's valid yet. Used to track how much is left in each piece. chunk_status: BF, + + // These are the pieces that we actually have, fully checked and downloaded. + have: BF, + lengths: Lengths, } +// TODO: this should be redone from "have" pieces, not from "needed" pieces. +// Needed pieces are the ones we need to download, not necessarily the ones we have. +// E.g. we might have more pieces, but the client asks to download only some files +// partially. 
fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF { let required_size = lengths.chunk_bitfield_bytes(); let vec = vec![0u8; required_size]; @@ -35,11 +51,12 @@ fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF { } impl ChunkTracker { - pub fn new(needed_pieces: BF, lengths: Lengths) -> Self { + pub fn new(needed_pieces: BF, have_pieces: BF, lengths: Lengths) -> Self { Self { chunk_status: compute_chunk_status(&lengths, &needed_pieces), needed_pieces, lengths, + have: have_pieces, } } pub fn get_needed_pieces(&self) -> &BF { @@ -60,6 +77,10 @@ impl ChunkTracker { .unwrap_or_default() } + pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) { + self.have.set(idx.get() as usize, true) + } + // return true if the whole piece is marked downloaded pub fn mark_chunk_downloaded(&mut self, piece: &Piece) -> Option { let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?; diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 942d8fe..deaa541 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -17,6 +17,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use log::{debug, error, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use reqwest::Url; +use size_format::SizeFormatterBinary as SF; use tokio::sync::{mpsc::Sender, Notify, Semaphore}; use crate::{ @@ -201,7 +202,10 @@ struct TorrentManagerInner { info_hash: [u8; 20], peer_id: [u8; 20], incoming_tx: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>, - downloaded: AtomicU64, + + have: AtomicU64, + downloaded_and_checked: AtomicU64, + needed: u64, uploaded: AtomicU64, fetched_bytes: AtomicU64, lengths: Lengths, @@ -280,14 +284,25 @@ fn update_hash_from_file( Ok(()) } -fn compute_needed_pieces( +struct InitialCheckResults { + needed_pieces: BF, + have_pieces: BF, + have_bytes: u64, + needed_bytes: u64, +} + +fn initial_check( torrent: &TorrentMetaV1Owned, 
files: &[Arc>], only_files: Option<&[usize]>, lengths: &Lengths, -) -> anyhow::Result { - let needed_pieces = vec![0u8; lengths.piece_bitfield_bytes()]; - let mut needed_pieces = BF::from_vec(needed_pieces); +) -> anyhow::Result { + let mut needed_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]); + let mut have_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]); + + let mut have_bytes = 0u64; + let mut needed_bytes = 0u64; + struct CurrentFile<'a> { index: usize, fd: &'a Arc>, @@ -333,33 +348,6 @@ fn compute_needed_pieces( let mut read_buffer = vec![0u8; 65536]; for piece_info in lengths.iter_piece_infos() { - // We need to compute the hash (and afterwards mark the piece as NOT needed) if ANY of the following are true - // - the file is required - // - the current piece is required (i.e. it's a part of some other file that is required) - - // This means, that for an easy implementation: - // - we ALWAYS try to compute the hash from existing files - // - after the whole piece was processed, we mark the piece needed if: - // - at least one file that the piece owns was required - // - and (there were errors OR the hash does not match) - // - // If there's an error, it's fine only if none of the files was required. 
- - // let mut seek: Option = None; - - // Optimization for a common case: if the piece is wholy in the file, and the file is not required, continue - - // if !current_file.full_file_required && current_file.remaining() >= piece_info.len as u64 { - // seek = match seek { - // None => Some(piece_info.len as u64), - // Some(s) => { - // current_file.mark_processed_bytes(piece_info.len as u64); - // Some(s + piece_info.len as u64) - // } - // }; - // continue; - // } - let mut computed_hash = sha1::Sha1::new(); let mut piece_remaining = piece_info.len as usize; let mut piece_is_needed = false; @@ -394,23 +382,6 @@ fn compute_needed_pieces( let mut fd = current_file.fd.lock(); - // if let Some(offset) = seek.take() { - // match fd.seek(SeekFrom::Start(offset)) { - // Ok(v) => { - // assert_eq!(v, offset) - // } - // Err(e) => { - // debug!( - // "error seeking in file {} to {}: {:#}", - // current_file.index, offset, &e - // ); - // piece_is_needed = true; - // current_file.is_broken = true; - // continue; - // } - // } - // } - fd.seek(SeekFrom::Start(pos)).unwrap(); if let Err(err) = update_hash_from_file( &mut fd, @@ -427,14 +398,6 @@ fn compute_needed_pieces( } } - if !at_least_one_file_required { - trace!( - "piece {} is not required by any of the requested files, ignoring", - piece_info.piece_index - ); - continue; - } - if piece_is_needed { trace!( "piece {} had errors, marking as needed", @@ -453,16 +416,31 @@ fn compute_needed_pieces( "piece {} is fine, not marking as needed", piece_info.piece_index ); + have_bytes += piece_info.len as u64; + have_pieces.set(piece_info.piece_index.get() as usize, true); } else { - trace!( - "piece {} hash does not match, marking as needed", - piece_info.piece_index - ); - needed_pieces.set(piece_info.piece_index.get() as usize, true); + if at_least_one_file_required { + trace!( + "piece {} hash does not match, marking as needed", + piece_info.piece_index + ); + needed_bytes += piece_info.len as u64; + 
needed_pieces.set(piece_info.piece_index.get() as usize, true); + } else { + trace!( + "piece {} is not required by any of the requested files, ignoring", + piece_info.piece_index + ); + } } } - Ok(needed_pieces) + Ok(InitialCheckResults { + needed_pieces, + have_pieces, + have_bytes, + needed_bytes, + }) } impl TorrentManager { @@ -509,10 +487,23 @@ impl TorrentManager { let peer_id = generate_peer_id(); let lengths = make_lengths(&torrent).context("unable to compute Lengths from torrent")?; - let needed_pieces = - compute_needed_pieces(&torrent, &files, only_files.as_deref(), &lengths)?; debug!("computed lengths: {:?}", &lengths); - let chunk_tracker = ChunkTracker::new(needed_pieces, lengths); + + info!("Doing initial checksum validation, this might take a while..."); + let initial_check_results = + initial_check(&torrent, &files, only_files.as_deref(), &lengths)?; + + info!( + "Initial check results: have {}, needed {}", + SF::new(initial_check_results.have_bytes), + SF::new(initial_check_results.needed_bytes) + ); + + let chunk_tracker = ChunkTracker::new( + initial_check_results.needed_pieces, + initial_check_results.have_pieces, + lengths, + ); let (incoming_tx, incoming_rx) = tokio::sync::mpsc::channel::<(PeerHandle, MessageOwned)>(1); @@ -528,7 +519,9 @@ impl TorrentManager { })), files, incoming_tx, - downloaded: Default::default(), + have: AtomicU64::new(initial_check_results.have_bytes), + needed: initial_check_results.needed_bytes, + downloaded_and_checked: Default::default(), fetched_bytes: Default::default(), uploaded: Default::default(), lengths, @@ -547,18 +540,25 @@ impl TorrentManager { async fn stats_printer(self) -> anyhow::Result<()> { loop { let live_peers = self.inner.locked.read().peers.states.len(); - let downloaded_bytes = self.inner.downloaded.load(Ordering::Relaxed); - let downloaded = self.inner.downloaded.load(Ordering::Relaxed) / 1024 / 1024; - let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed) / 1024 / 1024; - let 
total_length = self.inner.lengths.total_length(); - let pct = if total_length == downloaded { + let have = self.inner.have.load(Ordering::Relaxed); + let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed); + let needed = self.inner.needed; + let downloaded = self.inner.downloaded_and_checked.load(Ordering::Relaxed); + let remaining = needed - downloaded; + let downloaded_pct = if downloaded == needed { 100f64 } else { - (downloaded_bytes as f64 / self.inner.lengths.total_length() as f64) * 100f64 + (downloaded as f64 / needed as f64) * 100f64 }; info!( - "Total downloaded and checked {}MiB ({:.2}%), fetched {}MiB, live peers={}", - downloaded, pct, fetched, live_peers + "Stats: downloaded {:.2}% ({}), live peers {}, fetched {}, remaining {} out of {}, total have {}", + downloaded_pct, + SF::new(downloaded), + live_peers, + SF::new(fetched), + SF::new(remaining), + SF::new(needed), + SF::new(have) ); tokio::time::sleep(Duration::from_secs(1)).await; } @@ -1150,10 +1150,18 @@ impl TorrentManager { .with_context(|| format!("error checking piece={}", index))? 
{ true => { - this.inner.downloaded.fetch_add( - this.inner.lengths.piece_length(chunk_info.piece_index) as u64, - Ordering::Relaxed, - ); + let piece_len = + this.inner.lengths.piece_length(chunk_info.piece_index) as u64; + this.inner + .downloaded_and_checked + .fetch_add(piece_len, Ordering::Relaxed); + this.inner.have.fetch_add(piece_len, Ordering::Relaxed); + this.inner + .locked + .write() + .chunks + .mark_piece_downloaded(chunk_info.piece_index); + debug!( "piece={} successfully downloaded and verified from {}", index, handle @@ -1188,7 +1196,7 @@ impl TorrentManager { self.inner.uploaded.load(Ordering::Relaxed) } fn get_downloaded(&self) -> u64 { - self.inner.downloaded.load(Ordering::Relaxed) + self.inner.downloaded_and_checked.load(Ordering::Relaxed) } async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { let response: reqwest::Response = reqwest::get(tracker_url).await?; @@ -1213,7 +1221,7 @@ impl TorrentManager { .unwrap_or_default() } fn get_left_to_download(&self) -> u64 { - self.get_total() - self.get_downloaded() + self.inner.needed - self.get_downloaded() } async fn single_tracker_monitor(self, mut tracker_url: Url) -> anyhow::Result<()> {