This commit is contained in:
Igor Katson 2021-06-26 18:13:46 +01:00
parent 2b768a5505
commit 34ea225560
5 changed files with 202 additions and 84 deletions

90
Cargo.lock generated
View file

@ -298,6 +298,15 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd"
dependencies = [
"typenum",
]
[[package]]
name = "getrandom"
version = "0.2.3"
@ -509,6 +518,7 @@ dependencies = [
"reqwest",
"serde",
"sha1",
"size_format",
"tokio",
"urlencoding",
"uuid",
@ -599,6 +609,70 @@ dependencies = [
"winapi",
]
[[package]]
name = "num"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
dependencies = [
"num-complex",
"num-integer",
"num-iter",
"num-rational",
"num-traits",
]
[[package]]
name = "num-complex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-iter"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@ -1003,6 +1077,16 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
[[package]]
name = "size_format"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ed5f6ab2122c6dec69dca18c72fa4590a27e581ad20d44960fe74c032a0b23b"
dependencies = [
"generic-array",
"num",
]
[[package]]
name = "slab"
version = "0.4.3"
@ -1179,6 +1263,12 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typenum"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
[[package]]
name = "unicode-bidi"
version = "0.3.5"

View file

@ -1,6 +1,4 @@
- [ ] Selective file downloading (mostly done)
- [ ] Seeking optimization
- If a file is not needed, no need to check its hash
- [ ] Proper counting of how much is left, and how much is downloaded
- [ ] Refactor "needed pieces" into a bitfield

View file

@ -18,6 +18,7 @@ bincode = "1"
bitvec = "0.22"
parking_lot = "0.11"
log = "0.4"
size_format = "1"
uuid = {version = "0.8", features = ["v4"]}
futures = "0.3"

View file

@ -8,11 +8,27 @@ use crate::{
};
pub struct ChunkTracker {
// This forms the basis of a "queue" to pull from.
// It's set to 1 if we need a piece, but the moment we start requesting a peer,
// it's set to 0.
// Better to rename into piece_queue or smth, and maybe use some other form of a queue.
needed_pieces: BF,
// This has a bit set per each chunk (block) that we have written to the output file.
// It doesn't mean it's valid yet. Used to track how much is left in each piece.
chunk_status: BF,
// These are the pieces that we actually have, fully checked and downloaded.
have: BF,
lengths: Lengths,
}
// TODO: this should be redone from "have" pieces, not from "needed" pieces.
// Needed pieces are the ones we need to download, not necessarily the ones we have.
// E.g. we might have more pieces, but the client asks to download only some files
// partially.
fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF {
let required_size = lengths.chunk_bitfield_bytes();
let vec = vec![0u8; required_size];
@ -35,11 +51,12 @@ fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF {
}
impl ChunkTracker {
pub fn new(needed_pieces: BF, lengths: Lengths) -> Self {
pub fn new(needed_pieces: BF, have_pieces: BF, lengths: Lengths) -> Self {
Self {
chunk_status: compute_chunk_status(&lengths, &needed_pieces),
needed_pieces,
lengths,
have: have_pieces,
}
}
pub fn get_needed_pieces(&self) -> &BF {
@ -60,6 +77,10 @@ impl ChunkTracker {
.unwrap_or_default()
}
pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) {
self.have.set(idx.get() as usize, true)
}
// return true if the whole piece is marked downloaded
pub fn mark_chunk_downloaded(&mut self, piece: &Piece<ByteString>) -> Option<bool> {
let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?;

View file

@ -17,6 +17,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use reqwest::Url;
use size_format::SizeFormatterBinary as SF;
use tokio::sync::{mpsc::Sender, Notify, Semaphore};
use crate::{
@ -201,7 +202,10 @@ struct TorrentManagerInner {
info_hash: [u8; 20],
peer_id: [u8; 20],
incoming_tx: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>,
downloaded: AtomicU64,
have: AtomicU64,
downloaded_and_checked: AtomicU64,
needed: u64,
uploaded: AtomicU64,
fetched_bytes: AtomicU64,
lengths: Lengths,
@ -280,14 +284,25 @@ fn update_hash_from_file(
Ok(())
}
fn compute_needed_pieces(
struct InitialCheckResults {
needed_pieces: BF,
have_pieces: BF,
have_bytes: u64,
needed_bytes: u64,
}
fn initial_check(
torrent: &TorrentMetaV1Owned,
files: &[Arc<Mutex<File>>],
only_files: Option<&[usize]>,
lengths: &Lengths,
) -> anyhow::Result<BF> {
let needed_pieces = vec![0u8; lengths.piece_bitfield_bytes()];
let mut needed_pieces = BF::from_vec(needed_pieces);
) -> anyhow::Result<InitialCheckResults> {
let mut needed_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
let mut have_pieces = BF::from_vec(vec![0u8; lengths.piece_bitfield_bytes()]);
let mut have_bytes = 0u64;
let mut needed_bytes = 0u64;
struct CurrentFile<'a> {
index: usize,
fd: &'a Arc<Mutex<File>>,
@ -333,33 +348,6 @@ fn compute_needed_pieces(
let mut read_buffer = vec![0u8; 65536];
for piece_info in lengths.iter_piece_infos() {
// We need to compute the hash (and afterwards mark the piece as NOT needed) if ANY of the following are true
// - the file is required
// - the current piece is required (i.e. it's a part of some other file that is required)
// This means, that for an easy implementation:
// - we ALWAYS try to compute the hash from existing files
// - after the whole piece was processed, we mark the piece needed if:
// - at least one file that the piece owns was required
// - and (there were errors OR the hash does not match)
//
// If there's an error, it's fine only if none of the files was required.
// let mut seek: Option<u64> = None;
// Optimization for a common case: if the piece is wholy in the file, and the file is not required, continue
// if !current_file.full_file_required && current_file.remaining() >= piece_info.len as u64 {
// seek = match seek {
// None => Some(piece_info.len as u64),
// Some(s) => {
// current_file.mark_processed_bytes(piece_info.len as u64);
// Some(s + piece_info.len as u64)
// }
// };
// continue;
// }
let mut computed_hash = sha1::Sha1::new();
let mut piece_remaining = piece_info.len as usize;
let mut piece_is_needed = false;
@ -394,23 +382,6 @@ fn compute_needed_pieces(
let mut fd = current_file.fd.lock();
// if let Some(offset) = seek.take() {
// match fd.seek(SeekFrom::Start(offset)) {
// Ok(v) => {
// assert_eq!(v, offset)
// }
// Err(e) => {
// debug!(
// "error seeking in file {} to {}: {:#}",
// current_file.index, offset, &e
// );
// piece_is_needed = true;
// current_file.is_broken = true;
// continue;
// }
// }
// }
fd.seek(SeekFrom::Start(pos)).unwrap();
if let Err(err) = update_hash_from_file(
&mut fd,
@ -427,14 +398,6 @@ fn compute_needed_pieces(
}
}
if !at_least_one_file_required {
trace!(
"piece {} is not required by any of the requested files, ignoring",
piece_info.piece_index
);
continue;
}
if piece_is_needed {
trace!(
"piece {} had errors, marking as needed",
@ -453,16 +416,31 @@ fn compute_needed_pieces(
"piece {} is fine, not marking as needed",
piece_info.piece_index
);
have_bytes += piece_info.len as u64;
have_pieces.set(piece_info.piece_index.get() as usize, true);
} else {
trace!(
"piece {} hash does not match, marking as needed",
piece_info.piece_index
);
needed_pieces.set(piece_info.piece_index.get() as usize, true);
if !at_least_one_file_required {
trace!(
"piece {} hash does not match, marking as needed",
piece_info.piece_index
);
needed_bytes += piece_info.len as u64;
needed_pieces.set(piece_info.piece_index.get() as usize, true);
} else {
trace!(
"piece {} is not required by any of the requested files, ignoring",
piece_info.piece_index
);
}
}
}
Ok(needed_pieces)
Ok(InitialCheckResults {
needed_pieces,
have_pieces,
have_bytes,
needed_bytes,
})
}
impl TorrentManager {
@ -509,10 +487,23 @@ impl TorrentManager {
let peer_id = generate_peer_id();
let lengths = make_lengths(&torrent).context("unable to compute Lengths from torrent")?;
let needed_pieces =
compute_needed_pieces(&torrent, &files, only_files.as_deref(), &lengths)?;
debug!("computed lengths: {:?}", &lengths);
let chunk_tracker = ChunkTracker::new(needed_pieces, lengths);
info!("Doing initial checksum validation, this might take a while...");
let initial_check_results =
initial_check(&torrent, &files, only_files.as_deref(), &lengths)?;
info!(
"Initial check results: have {}, needed {}",
SF::new(initial_check_results.have_bytes),
SF::new(initial_check_results.needed_bytes)
);
let chunk_tracker = ChunkTracker::new(
initial_check_results.needed_pieces,
initial_check_results.have_pieces,
lengths,
);
let (incoming_tx, incoming_rx) =
tokio::sync::mpsc::channel::<(PeerHandle, MessageOwned)>(1);
@ -528,7 +519,9 @@ impl TorrentManager {
})),
files,
incoming_tx,
downloaded: Default::default(),
have: AtomicU64::new(initial_check_results.have_bytes),
needed: initial_check_results.needed_bytes,
downloaded_and_checked: Default::default(),
fetched_bytes: Default::default(),
uploaded: Default::default(),
lengths,
@ -547,18 +540,25 @@ impl TorrentManager {
async fn stats_printer(self) -> anyhow::Result<()> {
loop {
let live_peers = self.inner.locked.read().peers.states.len();
let downloaded_bytes = self.inner.downloaded.load(Ordering::Relaxed);
let downloaded = self.inner.downloaded.load(Ordering::Relaxed) / 1024 / 1024;
let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed) / 1024 / 1024;
let total_length = self.inner.lengths.total_length();
let pct = if total_length == downloaded {
let have = self.inner.have.load(Ordering::Relaxed);
let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed);
let needed = self.inner.needed;
let downloaded = self.inner.downloaded_and_checked.load(Ordering::Relaxed);
let remaining = needed - downloaded;
let downloaded_pct = if downloaded == needed {
100f64
} else {
(downloaded_bytes as f64 / self.inner.lengths.total_length() as f64) * 100f64
(downloaded as f64 / needed as f64) * 100f64
};
info!(
"Total downloaded and checked {}MiB ({:.2}%), fetched {}MiB, live peers={}",
downloaded, pct, fetched, live_peers
"Stats: downloaded {:.2}% ({}), live peers {}, fetched {}, remaining {} out of {}, total have {}",
downloaded_pct,
SF::new(downloaded),
live_peers,
SF::new(fetched),
SF::new(remaining),
SF::new(needed),
SF::new(have)
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
@ -1150,10 +1150,18 @@ impl TorrentManager {
.with_context(|| format!("error checking piece={}", index))?
{
true => {
this.inner.downloaded.fetch_add(
this.inner.lengths.piece_length(chunk_info.piece_index) as u64,
Ordering::Relaxed,
);
let piece_len =
this.inner.lengths.piece_length(chunk_info.piece_index) as u64;
this.inner
.downloaded_and_checked
.fetch_add(piece_len, Ordering::Relaxed);
this.inner.have.fetch_add(piece_len, Ordering::Relaxed);
this.inner
.locked
.write()
.chunks
.mark_piece_downloaded(chunk_info.piece_index);
debug!(
"piece={} successfully downloaded and verified from {}",
index, handle
@ -1188,7 +1196,7 @@ impl TorrentManager {
self.inner.uploaded.load(Ordering::Relaxed)
}
fn get_downloaded(&self) -> u64 {
self.inner.downloaded.load(Ordering::Relaxed)
self.inner.downloaded_and_checked.load(Ordering::Relaxed)
}
async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result<u64> {
let response: reqwest::Response = reqwest::get(tracker_url).await?;
@ -1213,7 +1221,7 @@ impl TorrentManager {
.unwrap_or_default()
}
fn get_left_to_download(&self) -> u64 {
self.get_total() - self.get_downloaded()
self.inner.needed - self.get_downloaded()
}
async fn single_tracker_monitor(self, mut tracker_url: Url) -> anyhow::Result<()> {