Merge pull request #35 from ikatson/refactor-peer-system

Major refactoring
This commit is contained in:
Igor Katson 2023-11-20 10:39:09 +00:00 committed by GitHub
commit b06ef71cf3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 1434 additions and 669 deletions

263
Cargo.lock generated
View file

@ -146,6 +146,17 @@ dependencies = [
"tower-service",
]
[[package]]
name = "backoff"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
dependencies = [
"getrandom",
"instant",
"rand",
]
[[package]]
name = "backtrace"
version = "0.3.69"
@ -353,6 +364,19 @@ dependencies = [
"winapi",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -399,19 +423,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "env_logger"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece"
dependencies = [
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@ -434,6 +445,12 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "fnv"
version = "1.0.7"
@ -678,12 +695,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.27"
@ -755,23 +766,21 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "ipnet"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "is-terminal"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi",
"rustix",
"windows-sys",
]
[[package]]
name = "itertools"
version = "0.12.0"
@ -821,14 +830,16 @@ dependencies = [
[[package]]
name = "librqbit"
version = "2.2.2"
version = "3.0.0-beta.0"
dependencies = [
"anyhow",
"axum",
"backoff",
"bincode",
"bitvec",
"byteorder",
"crypto-hash",
"dashmap",
"futures",
"hex 0.4.3",
"http",
@ -839,10 +850,8 @@ dependencies = [
"librqbit-dht",
"librqbit-peer-protocol",
"librqbit-sha1-wrapper",
"log",
"openssl",
"parking_lot",
"pretty_env_logger",
"rand",
"regex",
"reqwest",
@ -853,6 +862,8 @@ dependencies = [
"size_format",
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"url",
"urlencoding",
"uuid",
@ -883,7 +894,7 @@ version = "2.2.1"
[[package]]
name = "librqbit-core"
version = "2.2.2"
version = "3.0.0"
dependencies = [
"anyhow",
"hex 0.4.3",
@ -891,7 +902,6 @@ dependencies = [
"librqbit-bencode",
"librqbit-buffers",
"librqbit-clone-to-owned",
"log",
"parking_lot",
"serde",
"url",
@ -900,7 +910,7 @@ dependencies = [
[[package]]
name = "librqbit-dht"
version = "2.2.2"
version = "3.0.0"
dependencies = [
"anyhow",
"directories",
@ -910,19 +920,19 @@ dependencies = [
"librqbit-bencode",
"librqbit-clone-to-owned",
"librqbit-core",
"log",
"parking_lot",
"pretty_env_logger",
"rand",
"serde",
"serde_json",
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "librqbit-peer-protocol"
version = "2.2.2"
version = "3.0.0"
dependencies = [
"anyhow",
"bincode",
@ -966,6 +976,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
@ -1022,6 +1041,16 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.2.1"
@ -1174,6 +1203,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1190,10 +1225,13 @@ version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"backtrace",
"cfg-if",
"libc",
"petgraph",
"redox_syscall",
"smallvec",
"thread-id",
"windows-targets",
]
@ -1214,6 +1252,16 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "petgraph"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]]
name = "pin-project"
version = "1.1.3"
@ -1258,16 +1306,6 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pretty_env_logger"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c"
dependencies = [
"env_logger",
"log",
]
[[package]]
name = "proc-macro2"
version = "1.0.69"
@ -1350,8 +1388,17 @@ checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
@ -1362,9 +1409,15 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax 0.8.2",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.2"
@ -1430,22 +1483,23 @@ dependencies = [
[[package]]
name = "rqbit"
version = "2.2.2"
version = "3.0.0-beta.0"
dependencies = [
"anyhow",
"clap",
"futures",
"librqbit",
"librqbit-dht",
"log",
"parking_lot",
"parse_duration",
"pretty_env_logger",
"regex",
"reqwest",
"serde",
"serde_json",
"size_format",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -1469,9 +1523,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.8"
version = "0.21.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c"
checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
dependencies = [
"log",
"ring",
@ -1622,6 +1676,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "size_format"
version = "1.0.2"
@ -1736,15 +1799,6 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "termcolor"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff1bc3d3f05aff0403e8ac0d92ced918ec05b666a43f83297ccef5bea8a3d449"
dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.50"
@ -1765,6 +1819,26 @@ dependencies = [
"syn",
]
[[package]]
name = "thread-id"
version = "4.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0ec81c46e9eb50deaa257be2f148adf052d1fb7701cfd55ccfab2525280b70b"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -1890,9 +1964,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.32"
@ -1900,6 +1986,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -1973,6 +2089,12 @@ dependencies = [
"getrandom",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -2098,15 +2220,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View file

@ -15,4 +15,5 @@ members = [
panic = "abort"
[profile.release]
panic = "abort"
panic = "abort"
debug = true

19
TODO.md
View file

@ -1,16 +1,11 @@
- [x] Selective file downloading (mostly done)
- [x] Proper counting of how much is left, and how much is downloaded
- [x] Send bitfield at the start if I have something
- [x] use the "update_hash" function in piece checking
- [ ] signaling when file is done
- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent
- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them.
- [ ] per-file stats
- [ ] per-peer stats
- [x] slow peers cause slowness in the end, need the "end of game" algorithm
- [x (partial)] per-peer stats
- [x] use some concurrent hashmap e.g. flurry or dashmap
- [x] tracing instead of logging. Debugging peers: RUST_LOG=[{peer=.*}]=debug
test-log for tests
- [ ] reopen read only is bugged:
expected to be able to write to disk: error writing to file 0 (""The.Creator.2023.D.AMZN.WEB-DLRip.1.46Gb.MegaPeer.avi"")
someday:
- [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager)

View file

@ -1,3 +1,8 @@
// This crate used for making working with &[u8] or Vec<u8> generic in other parts of librqbit,
// for nicer display of binary data etc.
//
// Not useful outside of librqbit.
use serde::{Deserialize, Deserializer};
use clone_to_owned::CloneToOwned;

View file

@ -1,3 +1,11 @@
// These are helpers for objects that can be borrowed, but can be made owned while changing the type.
// The difference between e.g. Cow and CloneToOwned, is that we can implement it recursively for owned types.
//
// E.g. HashMap<&str, &str> can be converted to HashMap<String, String>.
//
// This lets us express types like TorrentMetaInfo<&[u8]> for zero-copy metadata about a bencode buffer in memory,
// but to have one-line conversion for it into TorrentMetaInfo<Vec<u8>> so that we can store it later somewhere.
use std::collections::HashMap;
pub trait CloneToOwned {

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit-dht"
version = "2.2.2"
version = "3.0.0"
edition = "2018"
description = "DHT implementation, used in rqbit torrent client."
license = "Apache-2.0"
@ -25,14 +25,14 @@ hex = "0.4"
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
anyhow = "1"
parking_lot = "0.12"
log = "0.4"
pretty_env_logger = "0.5"
tracing = "0.1"
futures = "0.3"
rand = "0.8"
indexmap = "2"
directories = "5"
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.0.0"}
[dev-dependencies]
tracing-subscriber = "0.3"

View file

@ -2,14 +2,14 @@ use std::{str::FromStr, time::Duration};
use anyhow::Context;
use librqbit_dht::{Dht, Id20};
use log::info;
use tokio_stream::StreamExt;
use tracing::info;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::init();
let info_hash = Id20::from_str("64a980abe6e448226bb930ba061592e44c3781a1").unwrap();
tracing_subscriber::fmt::init();
let dht = Dht::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash).await?;
@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
let peer_printer = async {
while let Some(peer) = stream.next().await {
log::info!("peer found: {}", peer)
info!("peer found: {}", peer)
}
Ok(())
};

View file

@ -17,7 +17,6 @@ use bencode::ByteString;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use indexmap::IndexSet;
use librqbit_core::{id20::Id20, peer_id::generate_peer_id};
use log::{debug, info, trace, warn};
use parking_lot::RwLock;
use rand::Rng;
use serde::Serialize;
@ -26,6 +25,7 @@ use tokio::{
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::{debug, info, trace, warn};
#[derive(Debug, Serialize)]
pub struct DhtStats {
@ -449,7 +449,7 @@ async fn run_framer(
Err(_) => break,
}
}
Err(e) => log::debug!("{}: error deserializing incoming message: {}", addr, e),
Err(e) => debug!("{}: error deserializing incoming message: {}", addr, e),
}
}
Err::<(), _>(anyhow::anyhow!(

View file

@ -8,8 +8,8 @@ use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::Context;
use log::{debug, error, info, trace, warn};
use tokio::spawn;
use tracing::{debug, error, info, trace, warn};
use crate::dht::{Dht, DhtConfig};
use crate::routing_table::RoutingTable;

View file

@ -4,8 +4,8 @@ use std::{
};
use librqbit_core::id20::Id20;
use log::debug;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tracing::debug;
#[derive(Debug, Clone, Serialize, Deserialize)]
enum BucketTreeNodeData {

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit"
version = "2.2.2"
version = "3.0.0-beta.0"
authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2018"
description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it."
@ -13,6 +13,7 @@ readme = "README.md"
[features]
default = ["sha1-system", "default-tls"]
timed_existence = []
sha1-system = ["sha1w/sha1-system"]
sha1-openssl = ["sha1w/sha1-openssl"]
sha1-rust = ["sha1w/sha1-rust"]
@ -22,11 +23,11 @@ rust-tls = ["reqwest/rustls-tls"]
[dependencies]
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"}
librqbit-core = {path = "../librqbit_core", version = "2.2.2"}
librqbit-core = {path = "../librqbit_core", version = "3.0.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "2.2.2"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.0.0"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="2.2.2"}
dht = {path = "../dht", package="librqbit-dht", version="3.0.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
axum = {version = "0.6"}
@ -43,7 +44,7 @@ byteorder = "1"
bincode = "1"
bitvec = "1"
parking_lot = "0.12"
log = "0.4"
tracing = "0.1.40"
size_format = "1"
rand = "0.8"
@ -55,7 +56,9 @@ uuid = {version = "1.2", features = ["v4"]}
futures = "0.3"
url = "2"
hex = "0.4"
backoff = "0.4.0"
dashmap = "5.5.3"
[dev-dependencies]
futures = {version = "0.3"}
pretty_env_logger = "0.5"
tracing-subscriber = "0.3"

View file

@ -3,38 +3,10 @@
A torrent library 100% written in rust
## Basic example
See ```examples``` folder.
This is a simple program on how to use this library
This program will just download a simple torrent file with a Magnet link
```rust
use std::error::Error;
use std::path::PathBuf;
use librqbit::session::{AddTorrentResponse, Session};
use librqbit::spawn_utils::BlockingSpawner;
const MAGNET_LINK: &str = "magnet:?..."; // Put your magnet link here
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>>{
// Create the session
let session = Session::new("C:\\Anime".parse().unwrap(), BlockingSpawner::new(false)).await?;
// Add the torrent to the session
let handle = match session.add_torrent(MAGNET_LINK, None).await {
Ok(AddTorrentResponse::Added(handle)) => {
Ok(handle)
},
Err(e) => {
eprintln!("Something goes wrong when downloading torrent : {:?}", e);
Err(())
}
_ => Err(())
}.expect("Failed to add torrent to the session");
// Wait until the download is completed
handle.wait_until_completed().await?;
Ok(())
}
```

View file

@ -0,0 +1,69 @@
// For production-grade code look at rqbit::main(), which does the same but has more options.
//
// Usage:
// cargo run --release --example ubuntu /tmp/ubuntu/
use std::time::Duration;
use anyhow::Context;
use librqbit::session::{AddTorrentOptions, AddTorrentResponse, Session};
use tracing::info;
// This is ubuntu-21.04-live-server-amd64.iso.torrent
// You can also pass filenames and URLs to add_torrent().
const MAGNET_LINK: &str = "magnet:?xt=urn:btih:cab507494d02ebb1178b38f2e9d7be299c86b862";
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// Output logs to console.
tracing_subscriber::fmt::init();
let output_dir = std::env::args()
.nth(1)
.expect("the first argument should be the output directory");
// Create the session
let session = Session::new(output_dir.into(), Default::default())
.await
.context("error creating session")?;
// Add the torrent to the session
let handle = match session
.add_torrent(
MAGNET_LINK,
Some(AddTorrentOptions {
// Set this to true to allow writing on top of existing files.
// If the file is partially downloaded, librqbit will only download the
// missing pieces.
//
// Otherwise it will throw an error that the file exists.
overwrite: false,
..Default::default()
}),
)
.await
.context("error adding torrent")?
{
AddTorrentResponse::Added(handle) => handle,
// For a brand new session other variants won't happen.
_ => unreachable!(),
};
// Print stats periodically.
tokio::spawn({
let handle = handle.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let stats = handle.torrent_state().stats_snapshot();
info!("stats: {stats:?}");
}
}
});
// Wait until the download is completed
handle.wait_until_completed().await?;
info!("torrent downloaded");
Ok(())
}

View file

@ -1,6 +1,6 @@
use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex};
use log::{debug, info};
use peer_binary_protocol::Piece;
use tracing::{debug, info};
use crate::type_aliases::BF;
@ -8,8 +8,9 @@ pub struct ChunkTracker {
// This forms the basis of a "queue" to pull from.
// It's set to 1 if we need a piece, but the moment we start requesting a peer,
// it's set to 0.
// Better to rename into piece_queue or smth, and maybe use some other form of a queue.
//
// Initially this is the opposite of "have", until we start making requests.
// An in-flight request is not in "needed", and not in "have".
needed_pieces: BF,
// This has a bit set per each chunk (block) that we have written to the output file.
@ -21,6 +22,7 @@ pub struct ChunkTracker {
lengths: Lengths,
// What pieces to download first.
priority_piece_ids: Vec<usize>,
}
@ -168,17 +170,6 @@ impl ChunkTracker {
piece.index, chunk_info, chunk_range,
);
// TODO: remove me, it's for debugging
// {
// use std::io::Write;
// let mut f = std::fs::OpenOptions::new()
// .write(true)
// .create(true)
// .open("/tmp/chunks")
// .unwrap();
// write!(f, "{:?}", &self.have).unwrap();
// }
if chunk_range.all() {
return Some(ChunkMarkingResult::Completed);
}

View file

@ -4,7 +4,7 @@ use anyhow::Context;
use buffers::ByteString;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use log::debug;
use tracing::debug;
use crate::{
peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner,
@ -97,7 +97,7 @@ mod tests {
fn init_logging() {
#[allow(unused_must_use)]
LOG_INIT.call_once(|| {
pretty_env_logger::try_init();
// pretty_env_logger::try_init();
})
}

View file

@ -11,7 +11,7 @@ use librqbit_core::{
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
torrent_metainfo::{FileIteratorName, TorrentMetaV1Info},
};
use log::{debug, trace, warn};
use tracing::{debug, trace, warn};
use parking_lot::Mutex;
use peer_binary_protocol::Piece;
use sha1w::ISha1;

View file

@ -7,12 +7,12 @@ use dht::{Dht, DhtStats};
use http::StatusCode;
use librqbit_core::id20::Id20;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use log::warn;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{info, warn};
use axum::Router;
@ -110,7 +110,7 @@ impl HttpApi {
.route("/torrents/:id/stats", get(torrent_stats))
.with_state(state);
log::info!("starting HTTP server on {}", addr);
info!("starting HTTP server on {}", addr);
axum::Server::try_bind(&addr)
.with_context(|| format!("error binding to {addr}"))?
.serve(app.into_make_service())
@ -341,7 +341,10 @@ impl ApiInternal {
let mgr = self.mgr_handle(idx)?;
Ok(format!(
"{:?}",
mgr.torrent_state().lock_read().chunks.get_have_pieces(),
mgr.torrent_state()
.lock_read("api_dump_haves")
.chunks
.get_have_pieces(),
))
}
}

View file

@ -5,7 +5,6 @@ pub mod http_api;
pub mod http_api_client;
mod http_api_error;
pub mod peer_connection;
pub mod peer_handler;
pub mod peer_info_reader;
pub mod peer_state;
pub mod session;

View file

@ -4,13 +4,13 @@ use anyhow::Context;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use librqbit_core::{id20::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id};
use log::{debug, trace};
use peer_binary_protocol::{
extended::{handshake::ExtendedHandshake, ExtendedMessage},
serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError,
MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
};
use tokio::time::timeout;
use tracing::{debug, trace};
use crate::spawn_utils::BlockingSpawner;
@ -31,6 +31,7 @@ pub trait PeerConnectionHandler {
pub enum WriterRequest {
Message(MessageOwned),
ReadChunkRequest(ChunkInfo),
Disconnect,
}
#[derive(Default, Copy, Clone)]
@ -56,10 +57,10 @@ async fn with_timeout<T, E>(
where
E: Into<anyhow::Error>,
{
timeout(timeout_value, fut)
.await
.with_context(|| format!("timeout at {timeout_value:?}"))?
.map_err(|e| e.into())
match timeout(timeout_value, fut).await {
Ok(v) => v.map_err(Into::into),
Err(_) => anyhow::bail!("timeout at {timeout_value:?}"),
}
}
macro_rules! read_one {
@ -148,11 +149,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
debug!(
"connected peer {}: {:?}",
self.addr,
try_decode_peer_id(Id20(h.peer_id))
);
debug!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
if h.info_hash != self.info_hash.0 {
anyhow::bail!("info hash does not match");
}
@ -169,11 +166,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
if supports_extended {
let my_extended =
Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
trace!(
"sending extended handshake to {}: {:?}",
self.addr,
&my_extended
);
trace!("sending extended handshake: {:?}", &my_extended);
my_extended.serialize(&mut write_buf, None).unwrap();
with_timeout(rwtimeout, conn.write_all(&write_buf))
.await
@ -183,7 +176,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let (extended, size) = read_one!(conn, read_buf, read_so_far, rwtimeout);
match extended {
Message::Extended(ExtendedMessage::Handshake(h)) => {
trace!("received from {}: {:?}", self.addr, &h);
trace!("received: {:?}", &h);
self.handler.on_extended_handshake(&h)?;
extended_handshake = Some(h.clone_to_owned())
}
@ -212,7 +205,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
.await
.context("error writing bitfield to peer")?;
debug!("sent bitfield to {}", self.addr);
debug!("sent bitfield");
}
}
@ -247,9 +240,12 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
uploaded_add = Some(chunk.size);
full_len
}
WriterRequest::Disconnect => {
return Ok(());
}
};
debug!("sending to {}: {:?}, length={}", self.addr, &req, len);
debug!("sending: {:?}, length={}", &req, len);
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
.await
@ -269,7 +265,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let reader = async move {
loop {
let (message, size) = read_one!(read_half, read_buf, read_so_far, rwtimeout);
trace!("received from {}: {:?}", self.addr, &message);
trace!("received: {:?}", &message);
self.handler
.on_received_message(message)
@ -290,7 +286,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
r = reader => {r}
r = writer => {r}
};
debug!("{}: either reader or writer are done, exiting", self.addr);
debug!("either reader or writer are done, exiting");
r
}
}

View file

@ -1 +0,0 @@

View file

@ -8,7 +8,6 @@ use librqbit_core::{
lengths::{ceil_div_u64, last_element_size_u64, ChunkInfo},
torrent_metainfo::TorrentMetaV1Info,
};
use log::debug;
use parking_lot::{Mutex, RwLock};
use peer_binary_protocol::{
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
@ -16,6 +15,7 @@ use peer_binary_protocol::{
};
use sha1w::{ISha1, Sha1};
use tokio::sync::mpsc::UnboundedSender;
use tracing::debug;
use crate::{
peer_connection::{
@ -238,7 +238,7 @@ mod tests {
fn init_logging() {
#[allow(unused_must_use)]
LOG_INIT.call_once(|| {
pretty_env_logger::try_init();
// pretty_env_logger::try_init();
})
}

View file

@ -1,9 +1,16 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use std::{collections::HashSet, sync::Arc};
use anyhow::Context;
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use serde::Serialize;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::{Notify, Semaphore};
use crate::peer_connection::WriterRequest;
use crate::type_aliases::BF;
#[derive(Debug, Hash, PartialEq, Eq)]
@ -21,11 +28,194 @@ impl From<&ChunkInfo> for InflightRequest {
}
}
// TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak.
pub type PeerRx = UnboundedReceiver<WriterRequest>;
pub type PeerTx = UnboundedSender<WriterRequest>;
pub trait SendMany {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()>;
}
impl SendMany for PeerTx {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()> {
requests
.into_iter()
.try_for_each(|r| self.send(r))
.context("peer dropped")
}
}
#[derive(Debug)]
pub struct PeerStats {
pub backoff: ExponentialBackoff,
}
impl Default for PeerStats {
fn default() -> Self {
Self {
backoff: ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(10))
.with_multiplier(6.)
.with_max_interval(Duration::from_secs(3600))
.with_max_elapsed_time(Some(Duration::from_secs(86400)))
.build(),
}
}
}
#[derive(Debug, Default)]
pub struct Peer {
pub state: PeerStateNoMut,
pub stats: PeerStats,
}
#[derive(Debug, Default, Serialize)]
pub struct AggregatePeerStatsAtomic {
pub queued: AtomicU32,
pub connecting: AtomicU32,
pub live: AtomicU32,
pub seen: AtomicU32,
pub dead: AtomicU32,
pub not_needed: AtomicU32,
}
pub fn atomic_inc(c: &AtomicU32) -> u32 {
c.fetch_add(1, Ordering::Relaxed)
}
pub fn atomic_dec(c: &AtomicU32) -> u32 {
c.fetch_sub(1, Ordering::Relaxed)
}
impl AggregatePeerStatsAtomic {
pub fn counter(&self, state: &PeerState) -> &AtomicU32 {
match state {
PeerState::Connecting(_) => &self.connecting,
PeerState::Live(_) => &self.live,
PeerState::Queued => &self.queued,
PeerState::Dead => &self.dead,
PeerState::NotNeeded => &self.not_needed,
}
}
pub fn inc(&self, state: &PeerState) {
atomic_inc(self.counter(state));
}
pub fn dec(&self, state: &PeerState) {
atomic_dec(self.counter(state));
}
pub fn incdec(&self, old: &PeerState, new: &PeerState) {
self.dec(old);
self.inc(new);
}
}
#[derive(Debug, Default)]
pub enum PeerState {
#[default]
// Will be tried to be connected as soon as possible.
Queued,
Connecting,
Connecting(PeerTx),
Live(LivePeerState),
// There was an error, and it's waiting for exponential backoff.
Dead,
// We don't need to do anything with the peer any longer.
// The peer has the full torrent, and we have the full torrent, so no need
// to keep talking to it.
NotNeeded,
}
impl std::fmt::Display for PeerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())
}
}
impl PeerState {
pub fn name(&self) -> &'static str {
match self {
PeerState::Queued => "queued",
PeerState::Connecting(_) => "connecting",
PeerState::Live(_) => "live",
PeerState::Dead => "dead",
PeerState::NotNeeded => "not needed",
}
}
pub fn take_live_no_counters(self) -> Option<LivePeerState> {
match self {
PeerState::Live(l) => Some(l),
_ => None,
}
}
}
#[derive(Debug, Default)]
pub struct PeerStateNoMut(PeerState);
impl PeerStateNoMut {
pub fn get(&self) -> &PeerState {
&self.0
}
pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
self.set(Default::default(), counters)
}
pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState {
counters.incdec(&self.0, &new);
std::mem::replace(&mut self.0, new)
}
pub fn get_live(&self) -> Option<&LivePeerState> {
match &self.0 {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match &mut self.0 {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn queued_to_connecting(&mut self, counters: &AggregatePeerStatsAtomic) -> Option<PeerRx> {
if let PeerState::Queued = &self.0 {
let (tx, rx) = unbounded_channel();
self.set(PeerState::Connecting(tx), counters);
Some(rx)
} else {
None
}
}
pub fn connecting_to_live(
&mut self,
peer_id: Id20,
counters: &AggregatePeerStatsAtomic,
) -> Option<&mut LivePeerState> {
if let PeerState::Connecting(_) = &self.0 {
let tx = match self.take(counters) {
PeerState::Connecting(tx) => tx,
_ => unreachable!(),
};
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.get_live_mut()
} else {
None
}
}
pub fn to_dead(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
self.set(PeerState::Dead, counters)
}
pub fn to_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState {
self.set(PeerState::NotNeeded, counters)
}
}
#[derive(Debug)]
@ -33,22 +223,45 @@ pub struct LivePeerState {
pub peer_id: Id20,
pub i_am_choked: bool,
pub peer_interested: bool,
// This is used to limit the number of chunk requests we send to a peer at a time.
pub requests_sem: Arc<Semaphore>,
// This is used to unpause processes after we were choked.
pub have_notify: Arc<Notify>,
pub bitfield: Option<BF>,
// This is used to track the pieces the peer has.
pub bitfield: BF,
// This is used to only request a piece from a peer once when stealing from others.
// So that you don't steal then re-steal the same piece in a loop.
pub previously_requested_pieces: BF,
// When the peer sends us data this is used to track if we asked for it.
pub inflight_requests: HashSet<InflightRequest>,
// The main channel to send requests to peer.
pub tx: PeerTx,
}
impl LivePeerState {
pub fn new(peer_id: Id20) -> Self {
pub fn new(peer_id: Id20, tx: PeerTx) -> Self {
LivePeerState {
peer_id,
i_am_choked: true,
peer_interested: false,
bitfield: None,
bitfield: BF::new(),
previously_requested_pieces: BF::new(),
have_notify: Arc::new(Notify::new()),
requests_sem: Arc::new(Semaphore::new(0)),
inflight_requests: Default::default(),
tx,
}
}
pub fn has_full_torrent(&self, total_pieces: usize) -> bool {
self.bitfield
.get(0..total_pieces)
.map_or(false, |s| s.all())
}
}

View file

@ -8,10 +8,10 @@ use librqbit_core::{
peer_id::generate_peer_id,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
};
use log::{debug, info, warn};
use parking_lot::RwLock;
use reqwest::Url;
use tokio_stream::StreamExt;
use tracing::{debug, info, span, warn, Level};
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
@ -251,7 +251,10 @@ impl Session {
torrent_from_file(url)?
};
let dht_rx = match self.dht.as_ref() {
Some(dht) => Some(dht.get_peers(torrent.info_hash).await?),
Some(dht) => {
debug!("reading peers for {:?} from DHT", torrent.info_hash);
Some(dht.get_peers(torrent.info_hash).await?)
}
None => None,
};
let trackers = torrent
@ -402,7 +405,7 @@ impl Session {
}
if let Some(mut dht_peer_rx) = dht_peer_rx {
spawn("DHT peer adder", {
spawn(span!(Level::INFO, "dht_peer_adder"), {
let handle = handle.clone();
async move {
while let Some(peer) = dht_peer_rx.next().await {

View file

@ -1,22 +1,22 @@
use std::fmt::Display;
use tracing::{debug, error, trace, Instrument};
use log::{debug, error};
pub fn spawn<N: Display + 'static + Send>(
name: N,
pub fn spawn(
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
debug!("starting task \"{}\"", &name);
tokio::spawn(async move {
let fut = async move {
trace!("started");
match fut.await {
Ok(_) => {
debug!("task \"{}\" finished", &name);
debug!("finished");
}
Err(e) => {
error!("error in task \"{}\": {:#}", &name, e)
error!("{:#}", e)
}
}
});
}
.instrument(span.or_current());
tokio::spawn(fut);
}
#[derive(Clone, Copy, Debug)]
@ -38,3 +38,14 @@ impl BlockingSpawner {
f()
}
}
impl Default for BlockingSpawner {
fn default() -> Self {
let allow_block_in_place = match tokio::runtime::Handle::current().runtime_flavor() {
tokio::runtime::RuntimeFlavor::CurrentThread => false,
tokio::runtime::RuntimeFlavor::MultiThread => true,
_ => true,
};
Self::new(allow_block_in_place)
}
}

View file

@ -14,11 +14,11 @@ use librqbit_core::{
id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator,
torrent_metainfo::TorrentMetaV1Info,
};
use log::{debug, info, warn};
use parking_lot::Mutex;
use reqwest::Url;
use sha1w::Sha1;
use size_format::SizeFormatterBinary as SF;
use tracing::{debug, info, span, warn, Level};
use crate::{
chunk_tracker::ChunkTracker,
@ -116,9 +116,10 @@ impl TorrentManagerHandle {
pub fn add_tracker(&self, url: Url) -> bool {
let mgr = self.manager.clone();
if mgr.trackers.lock().insert(url.clone()) {
spawn(format!("tracker monitor {url}"), async move {
mgr.single_tracker_monitor(url).await
});
spawn(
span!(Level::ERROR, "tracker_monitor", url = url.to_string()),
async move { mgr.single_tracker_monitor(url).await },
);
true
} else {
false
@ -289,12 +290,12 @@ impl TorrentManager {
options,
});
spawn("speed estimator updater", {
spawn(span!(Level::ERROR, "speed_estimator_updater"), {
let state = mgr.state.clone();
async move {
loop {
let stats = state.stats_snapshot();
let fetched = state.stats_snapshot().fetched_bytes;
let fetched = stats.fetched_bytes;
let needed = state.initially_needed();
// fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64.
let remaining = needed

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit-core"
version = "2.2.2"
version = "3.0.0"
edition = "2018"
description = "Important utilities used throughout librqbit useful for working with torrents."
license = "Apache-2.0"
@ -21,7 +21,6 @@ hex = "0.4"
anyhow = "1"
url = "2"
uuid = {version = "1", features = ["v4"]}
log = "0.4"
parking_lot = "0.12"
serde = {version = "1", features=["derive"]}
buffers = {path="../buffers", package="librqbit-buffers", version = "2.2.1"}

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit-peer-protocol"
version = "2.2.2"
version = "3.0.0"
edition = "2018"
description = "Protocol for working with torrent peers. Used in rqbit torrent client."
license = "Apache-2.0"
@ -23,6 +23,6 @@ byteorder = "1"
buffers = {path="../buffers", package="librqbit-buffers", version = "2.2.1"}
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "2.2.2"}
librqbit-core = {path="../librqbit_core", version = "3.0.0"}
bitvec = "1"
anyhow = "1"

View file

@ -1,3 +1,7 @@
// BitTorrent peer protocol implementation: parsing, serialization etc.
//
// Can be used outside of librqbit.
pub mod extended;
use bincode::Options;

View file

@ -1,6 +1,6 @@
[package]
name = "rqbit"
version = "2.2.2"
version = "3.0.0-beta.0"
authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2018"
description = "A bittorrent command line client and server."
@ -13,6 +13,7 @@ readme = "README.md"
[features]
default = ["sha1-system", "default-tls"]
timed_existence = ["librqbit/timed_existence"]
sha1-system = ["librqbit/sha1-system"]
sha1-openssl = ["librqbit/sha1-openssl"]
sha1-rust = ["librqbit/sha1-rust"]
@ -20,16 +21,17 @@ default-tls = ["librqbit/default-tls"]
rust-tls = ["librqbit/rust-tls"]
[dependencies]
librqbit = {path="../librqbit", default-features=false, version = "2.2.2"}
dht = {path="../dht", package="librqbit-dht", version="2.2.2"}
librqbit = {path="../librqbit", default-features=false, version = "3.0.0-beta.0"}
dht = {path="../dht", package="librqbit-dht", version="3.0.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
anyhow = "1"
clap = {version = "4", features = ["derive", "deprecated"]}
log = "0.4"
pretty_env_logger = "0.5"
tracing = "0.1"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
regex = "1"
futures = "0.3"
parse_duration = "2"
parking_lot = {version = "0.12", features = ["deadlock_detection"]}
reqwest = "0.11"
serde = {version = "1", features=["derive"]}
serde_json = "1"

View file

@ -11,9 +11,10 @@ use librqbit::{
SessionOptions,
},
spawn_utils::{spawn, BlockingSpawner},
torrent_state::timeit,
};
use log::{error, info, warn};
use size_format::SizeFormatterBinary as SF;
use tracing::{error, info, span, warn, Level};
#[derive(Debug, Clone, Copy, ValueEnum)]
enum LogLevel {
@ -57,12 +58,12 @@ struct Opts {
disable_dht_persistence: bool,
/// The connect timeout, e.g. 1s, 1.5s, 100ms etc.
#[arg(long = "peer-connect-timeout", value_parser = parse_duration::parse)]
peer_connect_timeout: Option<Duration>,
#[arg(long = "peer-connect-timeout", value_parser = parse_duration::parse, default_value="2s")]
peer_connect_timeout: Duration,
/// The connect timeout, e.g. 1s, 1.5s, 100ms etc.
#[arg(long = "peer-read-write-timeout" , value_parser = parse_duration::parse)]
peer_read_write_timeout: Option<Duration>,
#[arg(long = "peer-read-write-timeout" , value_parser = parse_duration::parse, default_value="10s")]
peer_read_write_timeout: Duration,
/// How many threads to spawn for the executor.
#[arg(short = 't', long)]
@ -151,13 +152,44 @@ fn init_logging(opts: &Opts) {
}
};
}
pretty_env_logger::init();
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
}
fn _start_deadlock_detector_thread() {
use parking_lot::deadlock;
use std::thread;
// Create a background thread which checks for deadlocks every 10s
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(10));
let deadlocks = deadlock::check_deadlock();
if deadlocks.is_empty() {
continue;
}
println!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
println!("Deadlock #{}", i);
for t in threads {
println!("Thread Id {:#?}", t.thread_id());
println!("{:#?}", t.backtrace());
}
}
std::process::exit(42);
});
}
fn main() -> anyhow::Result<()> {
let opts = Opts::parse();
init_logging(&opts);
// start_deadlock_detector_thread();
let (mut rt_builder, spawner) = match opts.single_thread_runtime {
true => (
@ -197,8 +229,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
dht_config: None,
peer_id: None,
peer_opts: Some(PeerConnectionOptions {
connect_timeout: opts.peer_connect_timeout,
read_write_timeout: opts.peer_read_write_timeout,
connect_timeout: Some(opts.peer_connect_timeout),
read_write_timeout: Some(opts.peer_read_write_timeout),
..Default::default()
}),
};
@ -212,8 +244,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
info!("[{}] initializing", idx);
},
ManagedTorrentState::Running(handle) => {
let peer_stats = handle.torrent_state().peer_stats_snapshot();
let stats = handle.torrent_state().stats_snapshot();
let stats = timeit("stats_snapshot", || handle.torrent_state().stats_snapshot());
let speed = handle.speed_estimator();
let total = stats.total_bytes;
let progress = stats.total_bytes - stats.remaining_bytes;
@ -223,7 +254,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
(progress as f64 / total as f64) * 100f64
};
info!(
"[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}",
"[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}",
idx,
downloaded_pct,
SF::new(progress),
@ -232,10 +263,11 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
SF::new(stats.remaining_bytes),
SF::new(total),
SF::new(stats.uploaded_bytes),
peer_stats.live,
peer_stats.connecting,
peer_stats.queued,
peer_stats.seen,
stats.peer_stats.live,
stats.peer_stats.connecting,
stats.peer_stats.queued,
stats.peer_stats.seen,
stats.peer_stats.dead,
);
},
}
@ -257,7 +289,10 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
.await
.context("error initializing rqbit session")?,
);
spawn("Stats printer", stats_printer(session.clone()));
spawn(
span!(Level::TRACE, "stats_printer"),
stats_printer(session.clone()),
);
let http_api = HttpApi::new(session);
let http_api_listen_addr = opts.http_api_listen_addr;
http_api
@ -330,11 +365,14 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
.await
.context("error initializing rqbit session")?,
);
spawn("Stats printer", stats_printer(session.clone()));
spawn(
span!(Level::TRACE, "stats_printer"),
stats_printer(session.clone()),
);
let http_api = HttpApi::new(session.clone());
let http_api_listen_addr = opts.http_api_listen_addr;
spawn(
"HTTP API",
span!(Level::ERROR, "http_api"),
http_api.clone().make_http_api_and_run(http_api_listen_addr),
);

View file

@ -1,8 +1,9 @@
// Wrapper for sha1 libraries.
// Sha1 computation is the majority of CPU usage of this library.
// openssl seems 2-3x faster, so using it for now, but
// leaving the pure-rust impl here too. Maybe someday make them
// runtime swappable or enabled with a feature.
// Wrapper for sha1 libraries to be able to swap them easily,
// e.g. to measure performance, or change implementations depending on platform.
//
// Sha1 computation is the majority of CPU usage of librqbit.
// openssl is 2-3x faster than rust's sha1.
// system library is the best choice probably (it's the default anyway).
#[cfg(feature = "sha1-openssl")]
pub type Sha1 = Sha1Openssl;