commit 87d6fe27ce21533dead7903d963e485f49e324a2 Author: Igor Katson Date: Fri Jun 25 13:47:51 2021 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..992e9c1 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,1403 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "bitvec" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5237f00a8c86130a0cc317830e558b966dd7850d48a953d998c813f01a41b527" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + +[[package]] +name = "bumpalo" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" + +[[package]] +name = "cc" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a72c244c1ff497a746a7e1fb3d14bd08420ecda70c8f25c7112f2781652d787" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "3.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd1061998a501ee7d4b6d449020df3266ca3124b941ec56cf2005c3779ca142" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "indexmap", + "lazy_static", + "os_str_bytes", + "strsim", + "termcolor", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "clap_derive" +version = "3.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "370f715b81112975b1b69db93e0b56ea4cd4e5002ac43b2da8474106a54096a1" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "core-foundation" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a89e2ae426ea83155dccf10c0fa6b1463ef6d5fcb44cee0b224a408fa640a62" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" + +[[package]] +name = "encoding_rs" +version = "0.8.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "form_urlencoded" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +dependencies = [ + "matches", + "percent-encoding", +] + +[[package]] +name = "funty" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1847abb9cb65d566acd5942e94aea9c8f547ad02c98e1649326fc0e8910b8b1e" + +[[package]] +name = "futures" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" + +[[package]] +name = "futures-executor" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" + +[[package]] +name = "futures-macro" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121" +dependencies = [ + "autocfg", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282" + +[[package]] +name = "futures-task" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae" + +[[package]] +name = "futures-util" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" +dependencies = [ + "autocfg", + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "h2" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" + +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "http" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" + +[[package]] +name = "httpdate" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" + +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + +[[package]] +name = "hyper" +version = "0.14.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07d6baa1b441335f3ce5098ac421fb6547c46dda735ca1bc6d0153c838f9dd83" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + +[[package]] +name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "indexmap" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "ipnet" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" + +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" + +[[package]] +name = "js-sys" +version = "0.3.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83bdfbace3a0e81a4253f73b49e960b053e396a11012cbd49b9b74d6a2b67062" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6" + +[[package]] +name = "librqbit" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "bitvec", + "byteorder", + "futures", + "log", + "parking_lot", + "reqwest", + "serde", + "sha1", + "tokio", + "urlencoding", + "uuid", +] + +[[package]] +name = "lock_api" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "matches" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" + +[[package]] +name = "memchr" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" + +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mio" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "native-tls" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8d96b2e1c8da3957d58100b09f102c6d9cfdfced01b7ec5a8974044bb09dbd4" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" + +[[package]] +name = "openssl" +version = "0.10.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "549430950c79ae24e6d02e0b7404534ecf311d94cc9f861e9e4020187d13d885" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-sys", +] + +[[package]] +name = "openssl-probe" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" + +[[package]] +name = "openssl-sys" +version = "0.9.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "os_str_bytes" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85" + +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pin-project-lite" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" + +[[package]] +name = "ppv-lite86" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" + +[[package]] +name = "pretty_env_logger" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d" +dependencies = [ + "env_logger", + "log", +] + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + +[[package]] +name = "proc-macro2" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quote" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "radium" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "643f8f41a8ebc4c5dc4515c82bb8abd397b527fc20fd681b7c011c2aee5d44fb" + +[[package]] +name = "rand" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" +dependencies = [ + "rand_core", +] + +[[package]] +name = "redox_syscall" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + +[[package]] +name = "reqwest" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + +[[package]] +name = "rqbit" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "futures", + "librqbit", + "log", + "pretty_env_logger", + "reqwest", + "tokio", +] + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "security-framework" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e4effb91b4b8b6fb7732e670b6cee160278ff8e6bf485c7805d9e319d76e284" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "serde" +version = "1.0.126" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.126" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sha1" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" + +[[package]] +name = "slab" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527" + +[[package]] +name = "smallvec" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" + +[[package]] +name = "socket2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dfc207c526015c632472a77be09cf1b6e46866581aecae5cc38fb4235dea2" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + +[[package]] +name = "syn" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + +[[package]] +name = "tempfile" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" +dependencies = [ + "cfg-if", + "libc", + "rand", + "redox_syscall", + "remove_dir_all", + "winapi", +] + +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "203008d98caf094106cfaba70acfed15e18ed3ddb7d94e49baec153a2b462789" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "tinyvec" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b5220f05bb7de7f3f53c7c065e1199b3172696fe2db9f9c4d8ad9b4ee74c342" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" + +[[package]] +name = "tokio" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fb2ed024293bb19f7a5dc54fe83bf86532a44c12a2bb8ba40d64a4509395ca2" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "pin-project-lite", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9ff14f98b1a4b289c6248a023c1c2fa1491062964e9fed67ab29c4e4da4a052" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "unicode-bidi" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeb8be209bb1c96b7c177c7420d26e04eccacb0eeae6b980e35fcb74678107e0" +dependencies = [ + "matches", +] + +[[package]] +name = "unicode-normalization" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-segmentation" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" + +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + +[[package]] +name = "unicode-xid" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "url" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +dependencies = [ + "form_urlencoded", + "idna", + "matches", + "percent-encoding", +] + +[[package]] +name = "urlencoding" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a1f0175e03a0973cf4afd476bef05c26e228520400eb1fd473ad417b1c00ffb" + +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + +[[package]] +name = "version_check" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" + +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + +[[package]] +name = "wasm-bindgen" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" +dependencies = [ + "cfg-if", + "serde", + "serde_json", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fba7978c679d53ce2d0ac80c8c175840feb849a161664365d1287b41f2e67f1" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" + +[[package]] +name = "web-sys" +version = "0.3.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e828417b379f3df7111d3a2a9e5753706cae29c41f7c4029ee9fd77f3e09e582" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi", +] + +[[package]] +name = "wyz" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "129e027ad65ce1453680623c3fb5163cbf7107bfe1aa32257e7d0e63f9ced188" +dependencies = [ + "tap", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..d10cb58 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "rqbit" +version = "0.1.0" +authors = ["Igor Katson "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +librqbit = {path="./crates/librqbit"} +tokio = {version = "1", features = ["macros", "rt-multi-thread"]} +anyhow = "1" +clap = "3.0.0-beta.2" +log = "0.4" +pretty_env_logger = "0.4" +reqwest = "0.11" + +[dev-dependencies] +futures = {version = "0.3"} + +[profile.dev] +panic = "abort" + +[profile.release] +panic = "abort" \ No newline at end of file diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml new file mode 100644 index 0000000..4ef96de --- /dev/null +++ b/crates/librqbit/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "librqbit" +version = "0.1.0" +authors = ["Igor Katson "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = {version = "1", features = ["macros", "rt-multi-thread"]} +serde = {version = "1", features=["derive"]} +anyhow = "1" +sha1 = "0.6" +reqwest = "0.11" +urlencoding = "1" +byteorder = "1" +bincode = "1" +bitvec = "0.22" +parking_lot = "0.11" +log = "0.4" + +uuid = {version = "0.8", features = ["v4"]} +futures = "0.3" + +[dev-dependencies] +futures = {version = "0.3"} +pretty_env_logger = "0.4" + +[profile.dev] +panic = "abort" + +[profile.release] +panic = "abort" \ No newline at end of file diff --git a/crates/librqbit/resources/ubuntu-21.04-desktop-amd64.iso.torrent b/crates/librqbit/resources/ubuntu-21.04-desktop-amd64.iso.torrent new file mode 100644 index 0000000..ba26dc9 Binary files /dev/null and b/crates/librqbit/resources/ubuntu-21.04-desktop-amd64.iso.torrent differ diff --git a/crates/librqbit/resources/ubuntu-21.04-live-server-amd64.iso.torrent b/crates/librqbit/resources/ubuntu-21.04-live-server-amd64.iso.torrent new file mode 100644 index 0000000..0e13b40 Binary files /dev/null and b/crates/librqbit/resources/ubuntu-21.04-live-server-amd64.iso.torrent differ diff --git a/crates/librqbit/src/buffers.rs b/crates/librqbit/src/buffers.rs new file mode 100644 index 0000000..1ffa1a4 --- /dev/null +++ b/crates/librqbit/src/buffers.rs @@ -0,0 +1,112 @@ +use serde::Deserialize; + +use crate::clone_to_owned::CloneToOwned; + +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct ByteString(pub Vec); + +impl std::fmt::Debug for ByteString { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.0.iter().all(|b| *b == 0) { + return write!(f, "<{} bytes, all zeroes>", self.0.len()); + } + match std::str::from_utf8(self.0.as_slice()) { + Ok(bytes) => bytes.fmt(f), + Err(_e) => write!(f, "<{} bytes>", self.0.len()), + } + } +} + +#[derive(Deserialize, PartialEq, Eq, Hash, Clone)] +#[serde(transparent)] +pub struct ByteBuf<'a>(pub &'a [u8]); + +impl<'a> ByteBuf<'a> { + pub fn as_bytes(&'a self) -> &'a [u8] { + self.0 + } +} + +fn debug_raw_bytes(b: &[u8], f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "<{} bytes>", b.len()) +} + +impl<'a> std::fmt::Debug for ByteBuf<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.0.iter().all(|b| *b == 0) { + return write!(f, "<{} bytes, all zeroes>", self.0.len()); + } + match std::str::from_utf8(self.0) { + Ok(bytes) => bytes.fmt(f), + Err(_e) => debug_raw_bytes(&self.0, f), + } + } +} + +impl<'a> std::fmt::Display for ByteBuf<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.0.iter().all(|b| *b == 0) { + return write!(f, "<{} bytes, all zeroes>", self.0.len()); + } + match std::str::from_utf8(self.0) { + Ok(bytes) => f.write_str(bytes), + Err(_e) => debug_raw_bytes(&self.0, f), + } + } +} + +impl<'a> CloneToOwned for ByteBuf<'a> { + type Target = ByteString; + + fn clone_to_owned(&self) -> Self::Target { + ByteString(self.0.into()) + } +} + +impl CloneToOwned for ByteString { + type Target = ByteString; + + fn clone_to_owned(&self) -> Self::Target { + self.clone() + } +} + +impl<'a> std::convert::AsRef<[u8]> for ByteBuf<'a> { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl std::convert::AsRef<[u8]> for ByteString { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl<'a> std::ops::Deref for ByteBuf<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::Deref for ByteString { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a> From<&'a [u8]> for ByteBuf<'a> { + fn from(b: &'a [u8]) -> Self { + Self(b) + } +} + +impl<'a> From<&'a [u8]> for ByteString { + fn from(b: &'a [u8]) -> Self { + Self(b.into()) + } +} diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs new file mode 100644 index 0000000..24e47a7 --- /dev/null +++ b/crates/librqbit/src/chunk_tracker.rs @@ -0,0 +1,71 @@ +use log::{debug, info}; + +use crate::{ + buffers::ByteString, + lengths::{Lengths, ValidPieceIndex}, + peer_comms::Piece, + type_aliases::BF, +}; + +pub struct ChunkTracker { + needed_pieces: BF, + chunk_status: BF, + lengths: Lengths, +} + +fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF { + let required_bits = lengths.total_chunks(); + let required_size = (required_bits as usize + 1) / 8; + let vec = vec![0u8; required_size]; + let mut chunk_bf = BF::from_vec(vec); + for bit in needed_pieces.iter_zeros() { + let offset = bit * 8; + for i in 0..8 { + chunk_bf.set(offset + i, true); + } + } + chunk_bf +} + +impl ChunkTracker { + pub fn new(needed_pieces: BF, lengths: Lengths) -> Self { + Self { + chunk_status: compute_chunk_status(&lengths, &needed_pieces), + needed_pieces, + lengths, + } + } + pub fn get_needed_pieces(&self) -> &BF { + &self.needed_pieces + } + pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) { + self.needed_pieces.set(index.get() as usize, false) + } + pub fn mark_piece_needed(&mut self, index: ValidPieceIndex) -> bool { + info!("remarking piece={} as needed", index); + self.needed_pieces.set(index.get() as usize, true); + self.chunk_status + .get_mut(self.lengths.chunk_range(index)) + .map(|s| { + s.set_all(false); + true + }) + .unwrap_or_default() + } + + // return true if the whole piece is marked downloaded + pub fn mark_chunk_downloaded(&mut self, piece: &Piece) -> Option { + let chunk_info = self.lengths.chunk_info_from_received_piece_data(piece)?; + self.chunk_status + .set(chunk_info.absolute_index as usize, true); + let chunk_range = self.lengths.chunk_range(chunk_info.piece_index); + let chunk_range = self.chunk_status.get(chunk_range).unwrap(); + let all = chunk_range.all(); + + debug!( + "piece={}, chunk_info={:?}, bits={:?}", + piece.index, chunk_info, chunk_range, + ); + Some(all) + } +} diff --git a/crates/librqbit/src/clone_to_owned.rs b/crates/librqbit/src/clone_to_owned.rs new file mode 100644 index 0000000..36b76c2 --- /dev/null +++ b/crates/librqbit/src/clone_to_owned.rs @@ -0,0 +1,27 @@ +pub trait CloneToOwned { + type Target; + + fn clone_to_owned(&self) -> Self::Target; +} + +impl CloneToOwned for Option +where + T: CloneToOwned, +{ + type Target = Option<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + self.as_ref().map(|i| i.clone_to_owned()) + } +} + +impl CloneToOwned for Vec +where + T: CloneToOwned, +{ + type Target = Vec<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + self.iter().map(|i| i.clone_to_owned()).collect() + } +} diff --git a/crates/librqbit/src/constants.rs b/crates/librqbit/src/constants.rs new file mode 100644 index 0000000..e0ad7b2 --- /dev/null +++ b/crates/librqbit/src/constants.rs @@ -0,0 +1 @@ +pub const CHUNK_SIZE: u32 = 16384; diff --git a/crates/librqbit/src/lengths.rs b/crates/librqbit/src/lengths.rs new file mode 100644 index 0000000..3e7c8dd --- /dev/null +++ b/crates/librqbit/src/lengths.rs @@ -0,0 +1,268 @@ +use crate::{buffers::ByteString, constants::CHUNK_SIZE, peer_comms::Piece}; + +const fn is_power_of_two(x: u64) -> bool { + (x != 0) && ((x & (x - 1)) == 0) +} + +pub const fn ceil_div_u64(a: u64, b: u64) -> u64 { + (a + b - 1) / b +} + +pub const fn last_element_size_u64(total: u64, chunk_size: u64) -> u64 { + let rem = total % chunk_size; + if rem == 0 { + return chunk_size; + } + rem +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ChunkInfo { + pub piece_index: ValidPieceIndex, + pub chunk_index: u32, + pub absolute_index: u32, + pub size: u32, + pub offset: u32, +} + +#[derive(Debug, Clone, Copy)] +pub struct Lengths { + chunk_length: u32, + total_length: u64, + piece_length: u32, + last_piece_id: u32, + last_piece_length: u32, + chunks_per_piece: u32, +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct ValidPieceIndex(u32); +impl std::fmt::Display for ValidPieceIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} +impl std::fmt::Debug for ValidPieceIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } +} + +impl ValidPieceIndex { + pub fn get(&self) -> u32 { + self.0 + } +} + +impl Lengths { + pub fn new( + total_length: u64, + piece_length: u32, + chunk_length: Option, + ) -> anyhow::Result { + let chunk_length = chunk_length.unwrap_or(CHUNK_SIZE); + if !(is_power_of_two(piece_length as u64)) { + anyhow::bail!("piece length {} is not a power of 2", piece_length); + } + if !(is_power_of_two(chunk_length as u64)) { + anyhow::bail!("chunk length {} is not a power of 2", chunk_length); + } + if chunk_length >= piece_length { + anyhow::bail!( + "chunk length {} should be smaller than pice length {}", + chunk_length, + piece_length + ); + } + Ok(Self { + chunk_length, + piece_length, + total_length, + chunks_per_piece: piece_length / chunk_length, + last_piece_id: ((total_length + 1) / piece_length as u64) as u32, + last_piece_length: last_element_size_u64(total_length, piece_length as u64) as u32, + }) + } + pub const fn piece_bitfield_bytes(&self) -> usize { + ceil_div_u64(self.total_pieces() as u64, 8) as usize + } + pub const fn total_length(&self) -> u64 { + self.total_length + } + pub const fn validate_piece_index(&self, index: u32) -> Option { + if index > self.last_piece_id { + return None; + } + Some(ValidPieceIndex(index)) + } + pub const fn default_piece_length(&self) -> u32 { + self.piece_length + } + pub const fn default_chunk_length(&self) -> u32 { + self.chunk_length + } + pub const fn total_chunks(&self) -> u32 { + ceil_div_u64(self.total_length, self.chunk_length as u64) as u32 + } + pub const fn total_pieces(&self) -> u32 { + self.last_piece_id + 1 + } + pub const fn piece_length(&self, index: ValidPieceIndex) -> u32 { + if index.0 == self.last_piece_id { + return self.last_piece_length; + } + self.piece_length + } + pub const fn piece_offset(&self, index: ValidPieceIndex) -> u64 { + index.0 as u64 * self.piece_length as u64 + } + pub fn iter_chunk_infos(&self, index: ValidPieceIndex) -> impl Iterator { + let mut remaining = self.piece_length(index); + let chunk_size = self.chunk_length; + let absolute_offset = index.0 * self.chunks_per_piece; + (0u32..).scan(0, move |offset, idx| { + if remaining == 0 { + return None; + } + let s = std::cmp::min(remaining, chunk_size); + let result = ChunkInfo { + piece_index: index, + chunk_index: idx, + absolute_index: absolute_offset + idx, + size: s, + offset: *offset, + }; + *offset += s; + remaining -= s; + Some(result) + }) + } + + pub fn chunk_info_from_received_piece_data( + &self, + piece: &Piece, + ) -> Option { + let piece_index = self.validate_piece_index(piece.index)?; + let index = piece.begin / self.chunk_length; + let chunk_size = self.chunk_size(piece_index, index)?; + let offset = self.chunk_offset_in_piece(piece_index, index)?; + if offset != piece.begin { + return None; + } + if chunk_size as usize != piece.block.len() { + return None; + } + let absolute_index = self.chunks_per_piece * piece_index.get() + index; + Some(ChunkInfo { + piece_index, + chunk_index: index, + size: chunk_size, + offset, + absolute_index, + }) + } + pub const fn chunk_range(&self, index: ValidPieceIndex) -> std::ops::Range { + let start = index.0 * self.chunks_per_piece; + let end = start + self.chunks_per_piece(index); + start as usize..end as usize + } + pub const fn chunks_per_piece(&self, index: ValidPieceIndex) -> u32 { + if index.0 == self.last_piece_id { + return (self.last_piece_length + self.chunk_length - 1) / self.chunk_length; + } + self.chunks_per_piece + } + pub const fn chunk_offset_in_piece( + &self, + piece_index: ValidPieceIndex, + chunk_index: u32, + ) -> Option { + if chunk_index >= self.chunks_per_piece(piece_index) { + return None; + } + Some(chunk_index * self.chunk_length) + } + pub fn chunk_size(&self, piece_index: ValidPieceIndex, chunk_index: u32) -> Option { + let chunks_per_piece = self.chunks_per_piece(piece_index); + let pl = self.piece_length(piece_index); + if chunk_index >= chunks_per_piece { + return None; + } + let offset = chunk_index * self.chunk_length; + Some(std::cmp::min(self.chunk_length, pl - offset)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_lengths() -> Lengths { + Lengths::new(1174243328, 262144, None).unwrap() + } + + #[test] + fn test_total_pieces() { + let l = make_lengths(); + assert_eq!(l.total_pieces(), 4480); + } + + #[test] + fn test_piece_length() { + let l = make_lengths(); + let p = l.validate_piece_index(4479).unwrap(); + + assert_eq!(l.piece_length(l.validate_piece_index(0).unwrap()), 262144); + assert_eq!(l.piece_length(p), 100352); + } + + #[test] + fn test_chunks_in_piece() { + let l = make_lengths(); + let p = l.validate_piece_index(4479).unwrap(); + + assert_eq!(l.chunks_per_piece(l.validate_piece_index(0).unwrap()), 16); + assert_eq!(l.chunks_per_piece(p), 7); + } + + #[test] + fn test_chunk_size() { + let l = make_lengths(); + let p = l.validate_piece_index(4479).unwrap(); + + assert_eq!(l.chunk_size(p, 0), Some(16384)); + assert_eq!(l.chunk_size(p, 6), Some(2048)); + } + + #[test] + fn test_chunk_infos() { + let l = make_lengths(); + let p = l.validate_piece_index(4479).unwrap(); + + let mut it = l.iter_chunk_infos(p); + let first = it.next().unwrap(); + let last = it.last().unwrap(); + + assert_eq!( + first, + ChunkInfo { + piece_index: p, + chunk_index: 0, + absolute_index: 71664, + size: 16384, + offset: 0, + } + ); + + assert_eq!( + last, + ChunkInfo { + piece_index: p, + chunk_index: 6, + absolute_index: 71670, + size: 2048, + offset: 98304, + } + ); + } +} diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs new file mode 100644 index 0000000..1267243 --- /dev/null +++ b/crates/librqbit/src/lib.rs @@ -0,0 +1,12 @@ +pub mod buffers; +pub mod chunk_tracker; +pub mod clone_to_owned; +pub mod constants; +pub mod lengths; +pub mod peer_comms; +pub mod peer_id; +pub mod serde_bencode; +pub mod torrent_manager; +pub mod torrent_metainfo; +pub mod tracker_comms; +pub mod type_aliases; diff --git a/crates/librqbit/src/peer_comms.rs b/crates/librqbit/src/peer_comms.rs new file mode 100644 index 0000000..bde0409 --- /dev/null +++ b/crates/librqbit/src/peer_comms.rs @@ -0,0 +1,454 @@ +use bincode::Options; +use byteorder::ByteOrder; +use serde::{Deserialize, Serialize}; + +use crate::{ + buffers::{ByteBuf, ByteString}, + clone_to_owned::CloneToOwned, +}; + +const PREAMBLE_LEN: usize = 5; +const NO_PAYLOAD_MSG_LEN: usize = PREAMBLE_LEN; + +const PSTR_BT1: &str = "BitTorrent protocol"; + +const LEN_PREFIX_KEEPALIVE: u32 = 0; +const LEN_PREFIX_CHOKE: u32 = 1; +const LEN_PREFIX_UNCHOKE: u32 = 1; +const LEN_PREFIX_INTERESTED: u32 = 1; +const LEN_PREFIX_NOT_INTERESTED: u32 = 1; +const LEN_PREFIX_HAVE: u32 = 5; +const LEN_PREFIX_REQUEST: u32 = 13; + +const MSGID_CHOKE: u8 = 0; +const MSGID_UNCHOKE: u8 = 1; +const MSGID_INTERESTED: u8 = 2; +const MSGID_NOT_INTERESTED: u8 = 3; +const MSGID_HAVE: u8 = 4; +const MSGID_BITFIELD: u8 = 5; +const MSGID_REQUEST: u8 = 6; +const MSGID_PIECE: u8 = 7; + +#[derive(Debug)] +pub enum MessageDeserializeError { + NotEnoughData(usize, &'static str), + UnsupportedMessageId(u8), + IncorrectLenPrefix { + received: u32, + expected: u32, + msg_id: u8, + }, + OtherBincode { + error: bincode::Error, + msg_id: u8, + len_prefix: u32, + name: &'static str, + }, +} + +#[derive(Debug)] +pub struct Piece { + pub index: u32, + pub begin: u32, + pub block: ByteBuf, +} + +impl Piece +where + ByteBuf: AsRef<[u8]>, +{ + pub fn serialize(&self, buf: &mut [u8]) -> usize { + byteorder::BigEndian::write_u32(&mut buf[0..4], self.index); + byteorder::BigEndian::write_u32(&mut buf[4..8], self.begin); + (&mut buf[8..8 + self.block.as_ref().len()]).copy_from_slice(self.block.as_ref()); + self.block.as_ref().len() + 8 + } + pub fn deserialize<'a>(buf: &'a [u8]) -> Piece + where + ByteBuf: From<&'a [u8]> + 'a, + { + let index = byteorder::BigEndian::read_u32(&buf[0..4]); + let begin = byteorder::BigEndian::read_u32(&buf[4..8]); + let block = ByteBuf::from(&buf[8..]); + Piece { + index, + begin, + block, + } + } +} + +impl std::fmt::Display for MessageDeserializeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MessageDeserializeError::NotEnoughData(b, name) => { + write!( + f, + "not enough data to deserialize {}: expected at least {} more bytes", + name, b + ) + } + MessageDeserializeError::UnsupportedMessageId(msg_id) => { + write!(f, "unsupported message id {}", msg_id) + } + MessageDeserializeError::IncorrectLenPrefix { + received, + expected, + msg_id, + } => write!( + f, + "incorrect len prefix for message id {}, expected {}, received {}", + msg_id, expected, received + ), + MessageDeserializeError::OtherBincode { + error, + msg_id, + name, + len_prefix, + } => write!( + f, + "error deserializing {} (msg_id={}, len_prefix={}): {:?}", + name, msg_id, len_prefix, error + ), + } + } +} + +impl std::error::Error for MessageDeserializeError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + MessageDeserializeError::OtherBincode { error, .. } => Some(error), + _ => None, + } + } +} + +#[derive(Debug)] +pub enum Message { + Request(Request), + Bitfield(ByteBuf), + KeepAlive, + Have(u32), + Choke, + Unchoke, + Interested, + NotInterested, + Piece(Piece), +} + +pub type MessageBorrowed<'a> = Message>; +pub type MessageOwned = Message; + +pub type BitfieldBorrowed<'a> = &'a bitvec::slice::BitSlice; +pub type BitfieldOwned = bitvec::vec::BitVec; + +pub struct Bitfield<'a> { + pub data: BitfieldBorrowed<'a>, +} + +impl CloneToOwned for Message { + type Target = Message<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + match self { + Message::Request(req) => Message::Request(*req), + Message::Bitfield(b) => Message::Bitfield(b.clone_to_owned()), + Message::Choke => Message::Choke, + Message::Unchoke => Message::Unchoke, + Message::Interested => Message::Interested, + Message::Piece(piece) => Message::Piece(Piece { + index: piece.index, + begin: piece.begin, + block: piece.block.clone_to_owned(), + }), + Message::KeepAlive => Message::KeepAlive, + Message::Have(v) => Message::Have(*v), + Message::NotInterested => Message::NotInterested, + } + } +} + +impl<'a> Bitfield<'a> { + pub fn new_from_slice(buf: &'a [u8]) -> anyhow::Result { + Ok(Self { + data: bitvec::slice::BitSlice::from_slice(buf)?, + }) + } +} + +impl<'a> std::fmt::Debug for Bitfield<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Bitfield") + .field("_ones", &self.data.count_ones()) + .field("_len", &self.data.len()) + .finish() + } +} + +impl Message +where + ByteBuf: AsRef<[u8]>, +{ + pub fn len_prefix_and_msg_id(&self) -> (u32, u8) { + match self { + Message::Request(_) => (LEN_PREFIX_REQUEST, MSGID_REQUEST), + Message::Bitfield(b) => (1 + b.as_ref().len() as u32, MSGID_BITFIELD), + Message::Choke => (LEN_PREFIX_CHOKE, MSGID_CHOKE), + Message::Unchoke => (LEN_PREFIX_UNCHOKE, MSGID_UNCHOKE), + Message::Interested => (LEN_PREFIX_INTERESTED, MSGID_INTERESTED), + Message::NotInterested => (LEN_PREFIX_NOT_INTERESTED, MSGID_NOT_INTERESTED), + Message::Piece(p) => (9 + p.block.as_ref().len() as u32, MSGID_PIECE), + Message::KeepAlive => (LEN_PREFIX_KEEPALIVE, 0), + Message::Have(_) => (LEN_PREFIX_HAVE, MSGID_HAVE), + } + } + pub fn serialize(&self, out: &mut Vec) -> usize { + let (lp, msg_id) = self.len_prefix_and_msg_id(); + + out.resize(PREAMBLE_LEN, 0); + + byteorder::BigEndian::write_u32(&mut out[..4], lp); + out[4] = msg_id; + + let ser = bopts(); + + match self { + Message::Request(request) => { + const MSG_LEN: usize = PREAMBLE_LEN + 12; + out.resize(MSG_LEN, 0); + debug_assert_eq!((&out[PREAMBLE_LEN..]).len(), 12); + ser.serialize_into(&mut out[PREAMBLE_LEN..], request) + .unwrap(); + MSG_LEN + } + Message::Bitfield(_) => todo!(), + Message::Choke | Message::Unchoke | Message::Interested => PREAMBLE_LEN, + Message::Piece(p) => { + let msg_len = PREAMBLE_LEN + 8 + p.block.as_ref().len(); + out.resize(msg_len, 0); + p.serialize(&mut out[PREAMBLE_LEN..(8 + p.block.as_ref().len())]); + msg_len + } + Message::KeepAlive => 4, + Message::Have(v) => { + let msg_len = PREAMBLE_LEN + 4; + out.resize(msg_len, 0); + byteorder::BE::write_u32(&mut out[PREAMBLE_LEN..], *v); + msg_len + } + Message::NotInterested => todo!(), + } + } + pub fn deserialize<'a>( + buf: &'a [u8], + ) -> Result<(Message, usize), MessageDeserializeError> + where + ByteBuf: From<&'a [u8]> + 'a, + { + let len_prefix = match buf.get(0..4) { + Some(bytes) => byteorder::BigEndian::read_u32(bytes), + None => return Err(MessageDeserializeError::NotEnoughData(4, "message")), + }; + if len_prefix == 0 { + return Ok((Message::KeepAlive, 4)); + } + + let msg_id = match buf.get(4) { + Some(msg_id) => *msg_id, + None => return Err(MessageDeserializeError::NotEnoughData(1, "message")), + }; + let rest = &buf[5..]; + let decoder_config = bincode::DefaultOptions::new() + .with_fixint_encoding() + .with_big_endian(); + + match msg_id { + MSGID_CHOKE => { + if len_prefix != LEN_PREFIX_CHOKE { + return Err(MessageDeserializeError::IncorrectLenPrefix { + received: len_prefix, + expected: LEN_PREFIX_CHOKE, + msg_id, + }); + } + Ok((Message::Choke, NO_PAYLOAD_MSG_LEN)) + } + MSGID_UNCHOKE => { + if len_prefix != LEN_PREFIX_UNCHOKE { + return Err(MessageDeserializeError::IncorrectLenPrefix { + received: len_prefix, + expected: LEN_PREFIX_UNCHOKE, + msg_id, + }); + } + Ok((Message::Unchoke, NO_PAYLOAD_MSG_LEN)) + } + MSGID_INTERESTED => { + if len_prefix != LEN_PREFIX_INTERESTED { + return Err(MessageDeserializeError::IncorrectLenPrefix { + received: len_prefix, + expected: LEN_PREFIX_INTERESTED, + msg_id, + }); + } + Ok((Message::Interested, NO_PAYLOAD_MSG_LEN)) + } + MSGID_NOT_INTERESTED => { + if len_prefix != LEN_PREFIX_NOT_INTERESTED { + return Err(MessageDeserializeError::IncorrectLenPrefix { + received: len_prefix, + expected: LEN_PREFIX_NOT_INTERESTED, + msg_id, + }); + } + Ok((Message::NotInterested, NO_PAYLOAD_MSG_LEN)) + } + MSGID_HAVE => { + let expected_len = 4; + match rest.get(..expected_len as usize) { + Some(h) => Ok(( + Message::Have(byteorder::BE::read_u32(&h)), + PREAMBLE_LEN + expected_len, + )), + None => { + let missing = expected_len - rest.len(); + Err(MessageDeserializeError::NotEnoughData(missing, "have")) + } + } + } + MSGID_BITFIELD => { + if len_prefix <= 1 { + return Err(MessageDeserializeError::IncorrectLenPrefix { + expected: 2, + received: len_prefix, + msg_id, + }); + } + let expected_len = len_prefix as usize - 1; + match rest.get(..expected_len as usize) { + Some(bitfield) => Ok(( + Message::Bitfield(ByteBuf::from(bitfield)), + PREAMBLE_LEN + expected_len, + )), + None => { + let missing = expected_len - rest.len(); + Err(MessageDeserializeError::NotEnoughData(missing, "bitfield")) + } + } + } + MSGID_REQUEST => { + let expected_len = 12; + match rest.get(..expected_len as usize) { + Some(b) => { + let request = decoder_config.deserialize::(&b).unwrap(); + Ok((Message::Request(request), PREAMBLE_LEN + expected_len)) + } + None => { + let missing = expected_len - rest.len(); + Err(MessageDeserializeError::NotEnoughData(missing, "request")) + } + } + } + MSGID_PIECE => { + if len_prefix <= 9 { + return Err(MessageDeserializeError::IncorrectLenPrefix { + expected: 10, + received: len_prefix, + msg_id, + }); + } + // is for "9", "8" is for 2 integer fields in the piece. + let expected_len = len_prefix as usize - 9 + 8; + match rest.get(..expected_len) { + Some(b) => Ok(( + Message::Piece(Piece::deserialize(&b)), + PREAMBLE_LEN + expected_len, + )), + None => Err(MessageDeserializeError::NotEnoughData( + expected_len - rest.len(), + "piece", + )), + } + } + msg_id => Err(MessageDeserializeError::UnsupportedMessageId(msg_id)), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Handshake<'a> { + pub pstr: &'a str, + pub reserved: [u8; 8], + pub info_hash: [u8; 20], + pub peer_id: [u8; 20], +} + +fn bopts() -> impl bincode::Options { + bincode::DefaultOptions::new() + .with_fixint_encoding() + .with_big_endian() +} + +impl<'a> Handshake<'a> { + pub fn new(info_hash: [u8; 20], peer_id: [u8; 20]) -> Handshake<'static> { + debug_assert_eq!(PSTR_BT1.len(), 19); + Handshake { + pstr: PSTR_BT1, + reserved: [0; 8], + info_hash, + peer_id, + } + } + fn bopts() -> impl bincode::Options { + bincode::DefaultOptions::new() + } + pub fn deserialize(b: &[u8]) -> Result<(Handshake<'_>, usize), MessageDeserializeError> { + let pstr_len = *b + .get(0) + .ok_or(MessageDeserializeError::NotEnoughData(1, "handshake"))?; + let expected_len = 1usize + pstr_len as usize + 48; + let hbuf = b + .get(..expected_len) + .ok_or(MessageDeserializeError::NotEnoughData( + expected_len, + "handshake", + ))?; + Ok((Self::bopts().deserialize(&hbuf).unwrap(), expected_len)) + } + pub fn serialize(&self) -> Vec { + Self::bopts().serialize(&self).unwrap() + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub struct Request { + pub index: u32, + pub begin: u32, + pub length: u32, +} + +impl Request { + pub fn new(index: u32, begin: u32, length: u32) -> Self { + Self { + index, + begin, + length, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_handshake_serialize() { + let info_hash = [ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]; + let peer_id = [ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]; + let b = dbg!(Handshake::new(info_hash, peer_id).serialize()); + assert_eq!(b.len(), 20 + 20 + 8 + 19 + 1); + } +} diff --git a/crates/librqbit/src/peer_id.rs b/crates/librqbit/src/peer_id.rs new file mode 100644 index 0000000..a519965 --- /dev/null +++ b/crates/librqbit/src/peer_id.rs @@ -0,0 +1,45 @@ +#[derive(Debug)] +pub enum AzureusStyleKind { + Deluge, + LibTorrent, + Transmission, + Other([char; 2]), +} + +#[derive(Debug)] +pub struct AzureusStyle { + pub kind: AzureusStyleKind, + pub version: [char; 4], +} + +impl AzureusStyleKind { + pub const fn from_bytes(b1: u8, b2: u8) -> Self { + match &[b1, b2] { + b"DE" => AzureusStyleKind::Deluge, + b"lt" | b"LT" => AzureusStyleKind::LibTorrent, + b"TR" => AzureusStyleKind::Transmission, + _ => AzureusStyleKind::Other([b1 as char, b2 as char]), + } + } +} + +fn try_decode_azureus_style(p: &[u8; 20]) -> Option { + if !(p[0] == b'-' && p[7] == b'-') { + return None; + } + let mut version = ['0'; 4]; + for (i, c) in (&p[3..7]).iter().copied().enumerate() { + version[i] = c as char; + } + let kind = AzureusStyleKind::from_bytes(p[1], p[2]); + Some(AzureusStyle { kind, version }) +} + +#[derive(Debug)] +pub enum PeerId { + AzureusStyle(AzureusStyle), +} + +pub fn try_decode_peer_id(p: [u8; 20]) -> Option { + Some(PeerId::AzureusStyle(try_decode_azureus_style(&p)?)) +} diff --git a/crates/librqbit/src/serde_bencode.rs b/crates/librqbit/src/serde_bencode.rs new file mode 100644 index 0000000..48f01e1 --- /dev/null +++ b/crates/librqbit/src/serde_bencode.rs @@ -0,0 +1,663 @@ +use serde::de::Deserializer; +use serde::de::Error as DeError; +use std::collections::HashMap; + +use crate::buffers::ByteBuf; +use crate::buffers::ByteString; + +pub struct BencodeDeserializer<'de> { + buf: &'de [u8], + field_context: Vec>, + parsing_key: bool, + pub(crate) is_torrent_info: bool, + pub(crate) torrent_info_digest: Option<[u8; 20]>, +} + +impl<'de> BencodeDeserializer<'de> { + pub fn new_from_buf(buf: &'de [u8]) -> BencodeDeserializer<'de> { + Self { + buf, + field_context: Default::default(), + parsing_key: false, + is_torrent_info: false, + torrent_info_digest: None, + } + } + fn parse_integer(&mut self) -> Result { + match self.buf.iter().copied().position(|e| e == b'e') { + Some(end) => { + let intbytes = &self.buf[1..end]; + let value: i64 = std::str::from_utf8(intbytes) + .map_err(|e| Error::new_from_err(e).set_context(self))? + .parse() + .map_err(|e| Error::new_from_err(e).set_context(self))?; + let rem = self.buf.get(end + 1..).unwrap_or_default(); + self.buf = rem; + Ok(value) + } + None => Err(Error::custom("cannot parse integer, unexpected EOF").set_context(self)), + } + } + + fn parse_bytes(&mut self) -> Result<&'de [u8], Error> { + match self.buf.iter().copied().position(|e| e == b':') { + Some(length_delim) => { + let lenbytes = &self.buf[..length_delim]; + let length: usize = std::str::from_utf8(lenbytes) + .map_err(|e| Error::new_from_err(e).set_context(self))? + .parse() + .map_err(|e| Error::new_from_err(e).set_context(self))?; + let bytes_start = length_delim + 1; + let bytes_end = bytes_start + length; + let bytes = &self.buf[bytes_start..bytes_end]; + let rem = self.buf.get(bytes_end..).unwrap_or_default(); + self.buf = rem; + Ok(bytes) + } + None => Err(Error::custom("cannot parse bytes, unexpected EOF").set_context(self)), + } + } + + fn parse_bytes_checked(&mut self) -> Result<&'de [u8], Error> { + let first = match self.buf.first().copied() { + Some(first) => first, + None => return Err(Error::custom("expected bencode bytes, got EOF").set_context(self)), + }; + match first { + b'0'..=b'9' => {} + _ => return Err(Error::custom("expected bencode bytes").set_context(self)), + } + let b = self.parse_bytes()?; + if self.parsing_key { + self.field_context.push(ByteBuf(b)); + } + Ok(b) + } +} + +pub fn from_bytes<'a, T>(buf: &'a [u8]) -> anyhow::Result +where + T: serde::de::Deserialize<'a>, +{ + let mut de = BencodeDeserializer::new_from_buf(buf); + Ok(T::deserialize(&mut de)?) +} + +pub fn dyn_from_bytes(buf: &[u8]) -> anyhow::Result> { + from_bytes(buf) +} + +#[derive(Debug)] +enum ErrorKind { + Other(anyhow::Error), + NotSupported(&'static str), +} + +#[derive(Debug, Default)] +pub struct ErrorContext { + field_stack: Vec, +} + +impl std::fmt::Display for ErrorContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut it = self.field_stack.iter(); + if let Some(field) = it.next() { + write!(f, "\"{}\"", field)?; + } else { + return Ok(()); + } + for field in self.field_stack.iter().skip(1) { + write!(f, " -> \"{}\"", field)?; + } + f.write_str(": ") + } +} + +#[derive(Debug)] +pub struct Error { + kind: ErrorKind, + context: ErrorContext, +} + +impl std::fmt::Display for ErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ErrorKind::Other(err) => err.fmt(f), + ErrorKind::NotSupported(s) => write!(f, "{} is not supported by bencode", s), + } + } +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}{}", self.context, self.kind) + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match &self.kind { + ErrorKind::Other(err) => err.source(), + _ => None, + } + } +} + +impl Error { + fn new_from_err(e: E) -> Self + where + E: std::error::Error + Send + Sync + 'static, + { + Error { + kind: ErrorKind::Other(anyhow::Error::new(e)), + context: Default::default(), + } + } + + fn new_from_kind(kind: ErrorKind) -> Self { + Self { + kind, + context: Default::default(), + } + } + + fn new_from_anyhow(e: anyhow::Error) -> Self { + Error { + kind: ErrorKind::Other(e), + context: Default::default(), + } + } + fn custom_with_de(msg: M, de: &BencodeDeserializer<'_>) -> Self { + Self::custom(msg).set_context(de) + } + fn set_context(mut self, de: &BencodeDeserializer<'_>) -> Self { + self.context = ErrorContext { + field_stack: de.field_context.iter().map(|s| format!("{}", s)).collect(), + }; + self + } +} + +impl serde::de::Error for Error { + fn custom(msg: T) -> Self + where + T: std::fmt::Display, + { + Self { + kind: ErrorKind::Other(anyhow::anyhow!("{}", msg)), + context: Default::default(), + } + } +} + +impl<'de, 'a> serde::de::Deserializer<'de> for &'a mut BencodeDeserializer<'de> { + type Error = Error; + + fn deserialize_any(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self.buf.first().copied() { + Some(b'd') => self.deserialize_map(visitor), + Some(b'i') => self.deserialize_u64(visitor), + Some(b'l') => self.deserialize_seq(visitor), + Some(_) => self.deserialize_bytes(visitor), + None => Err(Error::custom_with_de("empty input", self)), + } + } + + fn deserialize_bool(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err( + Error::new_from_kind(ErrorKind::NotSupported("bencode doesn't support booleans")) + .set_context(self), + ) + } + + fn deserialize_i8(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_i64(visitor) + } + + fn deserialize_i16(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_i64(visitor) + } + + fn deserialize_i32(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_i64(visitor) + } + + fn deserialize_i64(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + if !self.buf.starts_with(b"i") { + return Err(Error::custom_with_de("expected bencode int", self)); + } + visitor + .visit_i64(self.parse_integer()?) + .map_err(|e: Self::Error| e.set_context(self)) + } + + fn deserialize_u8(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_i64(visitor) + } + + fn deserialize_u16(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_i64(visitor) + } + + fn deserialize_u32(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_i64(visitor) + } + + fn deserialize_u64(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_i64(visitor) + } + + fn deserialize_f32(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err( + Error::new_from_kind(ErrorKind::NotSupported("bencode doesn't support floats")) + .set_context(self), + ) + } + + fn deserialize_f64(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err( + Error::new_from_kind(ErrorKind::NotSupported("bencode doesn't support floats")) + .set_context(self), + ) + } + + fn deserialize_char(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err( + Error::new_from_kind(ErrorKind::NotSupported("bencode doesn't support chars")) + .set_context(self), + ) + } + + fn deserialize_str(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + let first = match self.buf.first().copied() { + Some(first) => first, + None => { + return Err(Error::custom_with_de( + "expected bencode string, got EOF", + self, + )) + } + }; + match first { + b'0'..=b'9' => {} + _ => return Err(Error::custom_with_de("expected bencode string", self)), + } + let b = self.parse_bytes()?; + let s = std::str::from_utf8(b).map_err(|e| { + Error::new_from_anyhow(anyhow::anyhow!("error reading utf-8: {}", e)).set_context(self) + })?; + visitor + .visit_borrowed_str(s) + .map_err(|e: Self::Error| e.set_context(self)) + } + + fn deserialize_string(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_str(visitor) + } + + fn deserialize_bytes(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + let b = self.parse_bytes_checked()?; + visitor + .visit_borrowed_bytes(b) + .map_err(|e: Self::Error| e.set_context(self)) + } + + fn deserialize_byte_buf(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_bytes(visitor) + } + + fn deserialize_option(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor + .visit_some(&mut *self) + .map_err(|e: Self::Error| e.set_context(self)) + } + + fn deserialize_unit(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::new_from_kind(ErrorKind::NotSupported( + "bencode doesn't support unit types", + )) + .set_context(self)) + } + + fn deserialize_unit_struct( + self, + _name: &'static str, + _visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::new_from_kind(ErrorKind::NotSupported( + "bencode doesn't support unit structs", + )) + .set_context(self)) + } + + fn deserialize_newtype_struct( + self, + _name: &'static str, + _visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + Err( + Error::new_from_kind(ErrorKind::NotSupported("bencode doesn't newtype structs")) + .set_context(self), + ) + } + + fn deserialize_seq(mut self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + if !self.buf.starts_with(b"l") { + return Err(Error::custom(format!( + "expected bencode list, but got {}", + self.buf[0] as char, + ))); + } + self.buf = self.buf.get(1..).unwrap_or_default(); + visitor + .visit_seq(SeqAccess { de: &mut self }) + .map_err(|e: Self::Error| e.set_context(self)) + } + + fn deserialize_tuple(self, _len: usize, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_seq(visitor) + } + + fn deserialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_seq(visitor) + } + + fn deserialize_map(mut self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + if !self.buf.starts_with(b"d") { + return Err(Error::custom("expected bencode dict")); + } + self.buf = self.buf.get(1..).unwrap_or_default(); + visitor + .visit_map(MapAccess { de: &mut self }) + .map_err(|e: Self::Error| e.set_context(self)) + } + + fn deserialize_struct( + self, + _name: &'static str, + _fields: &'static [&'static str], + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_map(visitor) + } + + fn deserialize_enum( + self, + _name: &'static str, + _variants: &'static [&'static str], + _visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + Err( + Error::new_from_kind(ErrorKind::NotSupported("deserializing enums not supported")) + .set_context(self), + ) + } + + fn deserialize_identifier(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + let name = self.parse_bytes_checked()?; + visitor + .visit_borrowed_bytes(name) + .map_err(|e: Self::Error| e.set_context(self)) + } + + fn deserialize_ignored_any(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_any(visitor) + } +} + +struct MapAccess<'a, 'de> { + de: &'a mut BencodeDeserializer<'de>, +} + +struct SeqAccess<'a, 'de> { + de: &'a mut BencodeDeserializer<'de>, +} + +impl<'a, 'de> serde::de::MapAccess<'de> for MapAccess<'a, 'de> { + type Error = Error; + + fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> + where + K: serde::de::DeserializeSeed<'de>, + { + if self.de.buf.starts_with(b"e") { + self.de.buf = self.de.buf.get(1..).unwrap_or_default(); + return Ok(None); + } + self.de.parsing_key = true; + let retval = seed.deserialize(&mut *self.de)?; + self.de.parsing_key = false; + Ok(Some(retval)) + } + + fn next_value_seed(&mut self, seed: V) -> Result + where + V: serde::de::DeserializeSeed<'de>, + { + let buf_before = self.de.buf; + let value = seed.deserialize(&mut *self.de)?; + if self.de.is_torrent_info && self.de.field_context.as_slice() == [ByteBuf(b"info")] { + let len = self.de.buf.as_ptr() as usize - buf_before.as_ptr() as usize; + let mut hash = sha1::Sha1::new(); + hash.update(&buf_before[..len]); + let digest = hash.digest().bytes(); + self.de.torrent_info_digest = Some(digest) + } + self.de.field_context.pop(); + Ok(value) + } +} + +impl<'a, 'de> serde::de::SeqAccess<'de> for SeqAccess<'a, 'de> { + type Error = Error; + + fn next_element_seed(&mut self, seed: T) -> Result, Self::Error> + where + T: serde::de::DeserializeSeed<'de>, + { + if self.de.buf.starts_with(b"e") { + self.de.buf = self.de.buf.get(1..).unwrap_or_default(); + return Ok(None); + } + Ok(Some(seed.deserialize(&mut *self.de)?)) + } +} + +impl<'de> serde::de::Deserialize<'de> for DynBencodeNode<'de> { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor; + + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = DynBencodeNode<'de>; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "a bencode value") + } + + fn visit_i64(self, v: i64) -> Result + where + E: serde::de::Error, + { + Ok(DynBencodeNode::Integer(v)) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut v = Vec::new(); + while let Some(value) = seq.next_element()? { + v.push(value); + } + Ok(DynBencodeNode::List(v)) + } + + fn visit_borrowed_bytes(self, v: &'de [u8]) -> Result + where + E: serde::de::Error, + { + Ok(DynBencodeNode::Bytes(ByteBuf(v))) + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut hmap = HashMap::new(); + while let Some(key) = map.next_key()? { + let value = map.next_value()?; + hmap.insert(key, value); + } + Ok(DynBencodeNode::Dict(hmap)) + } + } + + deserializer.deserialize_any(Visitor {}) + } +} + +impl<'de> serde::de::Deserialize<'de> for ByteString { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor; + + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = Vec; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("bencode byte string") + } + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(v.to_owned()) + } + } + Ok(ByteString(deserializer.deserialize_byte_buf(Visitor {})?)) + } +} + +#[derive(Debug)] +pub enum DynBencodeNode<'a> { + Bytes(ByteBuf<'a>), + Integer(i64), + List(Vec>), + Dict(HashMap, DynBencodeNode<'a>>), +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Read; + + #[test] + fn test_deserialize_torrent_dyn() { + let mut buf = Vec::new(); + let filename = "resources/ubuntu-21.04-desktop-amd64.iso.torrent"; + std::fs::File::open(filename) + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + + let torrent: DynBencodeNode = from_bytes(&buf).unwrap(); + dbg!(torrent); + } +} diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs new file mode 100644 index 0000000..1462297 --- /dev/null +++ b/crates/librqbit/src/torrent_manager.rs @@ -0,0 +1,1112 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::Display, + fs::{File, OpenOptions}, + io::{Read, Seek, Write}, + net::SocketAddr, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; + +use anyhow::Context; +use futures::{stream::FuturesUnordered, StreamExt}; +use log::{debug, error, info, trace, warn}; +use parking_lot::{Mutex, RwLock}; +use reqwest::Url; +use tokio::sync::{mpsc::Sender, Notify, Semaphore}; + +use crate::{ + buffers::ByteString, + chunk_tracker::ChunkTracker, + clone_to_owned::CloneToOwned, + lengths::{Lengths, ValidPieceIndex}, + peer_comms::{ + Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request, + }, + peer_id::try_decode_peer_id, + torrent_metainfo::TorrentMetaV1Owned, + tracker_comms::{CompactTrackerResponse, TrackerRequest, TrackerRequestEvent}, +}; +pub struct TorrentManagerBuilder { + torrent: TorrentMetaV1Owned, + overwrite: bool, + output_folder: PathBuf, +} + +impl TorrentManagerBuilder { + pub fn new>(torrent: TorrentMetaV1Owned, output_folder: P) -> Self { + Self { + torrent, + overwrite: false, + output_folder: output_folder.as_ref().into(), + } + } + + pub fn overwrite(mut self, overwrite: bool) -> Self { + self.overwrite = overwrite; + self + } + + pub async fn start_manager(self) -> anyhow::Result { + TorrentManager::start(self.torrent, self.output_folder, self.overwrite) + } +} + +#[derive(Clone)] +pub struct TorrentManagerHandle { + manager: TorrentManager, +} + +impl TorrentManagerHandle { + pub async fn cancel(&self) -> anyhow::Result<()> { + todo!() + } + pub async fn wait_until_completed(&self) -> anyhow::Result<()> { + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + } + } +} + +type PeerHandle = SocketAddr; + +enum PeerState { + Connecting(SocketAddr), + Live(LivePeerState), +} + +type BF = bitvec::vec::BitVec; + +struct LivePeerState { + #[allow(unused)] + peer_id: [u8; 20], + i_am_choked: bool, + #[allow(unused)] + peer_choked: bool, + #[allow(unused)] + peer_interested: bool, + outstanding_requests: Arc, + have_notify: Arc, + bitfield: Option, + requested_pieces: HashSet, +} + +#[derive(Default)] +struct PeerStates { + states: HashMap, + seen_peers: HashSet, + tx: HashMap>>, +} + +impl PeerStates { + fn add_if_not_seen( + &mut self, + addr: SocketAddr, + tx: tokio::sync::mpsc::Sender, + ) -> Option { + if self.seen_peers.contains(&addr) { + return None; + } + let handle = self.add(addr, tx)?; + self.seen_peers.insert(addr); + Some(handle) + } + fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> { + if let PeerState::Live(ref l) = self.states.get(&handle)? { + return Some(l); + } + None + } + fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> { + if let PeerState::Live(ref mut l) = self.states.get_mut(&handle)? { + return Some(l); + } + None + } + fn add( + &mut self, + addr: SocketAddr, + tx: tokio::sync::mpsc::Sender, + ) -> Option { + let handle = addr; + if self.states.contains_key(&addr) { + return None; + } + self.states.insert(handle, PeerState::Connecting(addr)); + self.tx.insert(handle, Arc::new(tx)); + Some(handle) + } + fn drop_peer(&mut self, handle: PeerHandle) -> Option { + let result = self.states.remove(&handle); + self.tx.remove(&handle); + result + } + fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { + match self.states.get_mut(&handle) { + Some(PeerState::Live(live)) => { + let prev = live.i_am_choked; + live.i_am_choked = is_choked; + return Some(prev); + } + _ => return None, + } + } + fn mark_peer_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { + match self.states.get_mut(&handle) { + Some(PeerState::Live(live)) => { + let prev = live.peer_choked; + live.peer_choked = is_choked; + return Some(prev); + } + _ => return None, + } + } + fn update_bitfield_from_vec( + &mut self, + handle: PeerHandle, + bitfield: Vec, + ) -> Option> { + match self.states.get_mut(&handle) { + Some(PeerState::Live(live)) => { + let bitfield = BF::from_vec(bitfield); + let prev = live.bitfield.take(); + live.bitfield = Some(bitfield); + Some(prev) + } + _ => None, + } + } + fn get_tx(&self, handle: PeerHandle) -> Option<&Sender> { + self.tx.get(&handle).map(|v| v.as_ref()) + } + fn clone_tx(&self, handle: PeerHandle) -> Option>> { + Some(self.tx.get(&handle)?.clone()) + } +} + +struct TorrentManagerInnerLocked { + peers: PeerStates, + chunks: ChunkTracker, +} + +impl TorrentManagerInnerLocked {} + +struct TorrentManagerInner { + torrent: TorrentMetaV1Owned, + locked: Arc>, + files: Vec>>, + info_hash: [u8; 20], + peer_id: [u8; 20], + incoming_tx: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>, + downloaded: AtomicU64, + uploaded: AtomicU64, + fetched_bytes: AtomicU64, + lengths: Lengths, +} + +#[derive(Clone)] +struct TorrentManager { + inner: Arc, +} + +fn generate_peer_id() -> [u8; 20] { + let mut peer_id = [0u8; 20]; + let u = uuid::Uuid::new_v4(); + (&mut peer_id[..16]).copy_from_slice(&u.as_bytes()[..]); + peer_id +} + +fn spawn( + name: N, + fut: impl std::future::Future> + Send + 'static, +) { + debug!("starting task \"{}\"", name); + tokio::spawn(async move { + match fut.await { + Ok(_) => { + debug!("task \"{}\" finished", name); + } + Err(e) => { + error!("error in task \"{}\": {:#}", name, e) + } + } + }); +} + +fn spawn_blocking( + name: N, + f: impl FnOnce() -> anyhow::Result<()> + Send + 'static, +) { + debug!("starting blocking task \"{}\"", name); + tokio::task::spawn_blocking(move || match f() { + Ok(_) => { + debug!("blocking task \"{}\" finished", name); + } + Err(e) => { + error!("error in blocking task \"{}\": {:#}", name, e) + } + }); +} + +fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result { + let total_length = torrent.info.iter_file_lengths().sum(); + Lengths::new(total_length, torrent.info.piece_length, None) +} + +fn compute_needed_pieces( + torrent: &TorrentMetaV1Owned, + files: &mut [Arc>], + lengths: &Lengths, +) -> anyhow::Result { + let needed_pieces = vec![u8::MAX; lengths.piece_bitfield_bytes()]; + let needed_pieces = BF::from_vec(needed_pieces); + + // TODO: read and validate existing files + Ok(needed_pieces) +} + +impl TorrentManager { + pub fn start>( + torrent: TorrentMetaV1Owned, + out: P, + overwrite: bool, + ) -> anyhow::Result { + let mut files = { + let mut files = + Vec::>>::with_capacity(torrent.info.iter_file_lengths().count()); + + for (path_bits, _) in torrent.info.iter_filenames_and_lengths() { + let mut full_path = out.as_ref().to_owned(); + for bit in path_bits.iter_components() { + full_path.push( + bit.as_ref() + .map(|b| std::str::from_utf8(b.as_ref())) + .unwrap_or(Ok("output"))?, + ); + } + + std::fs::create_dir_all(full_path.parent().unwrap())?; + let file = if overwrite { + OpenOptions::new() + .create(true) + .read(true) + .write(true) + .open(&full_path)? + } else { + // TODO: create_new does not seem to work with read(true), so calling this twice. + OpenOptions::new() + .create_new(true) + .write(true) + .open(&full_path) + .with_context(|| format!("error creating {:?}", &full_path))?; + OpenOptions::new().read(true).write(true).open(&full_path)? + }; + files.push(Arc::new(Mutex::new(file))) + } + files + }; + + let peer_id = generate_peer_id(); + let lengths = make_lengths(&torrent).context("unable to compute Lengths from torrent")?; + let needed_pieces = compute_needed_pieces(&torrent, &mut files, &lengths)?; + debug!("computed lengths: {:?}", &lengths); + let chunk_tracker = ChunkTracker::new(needed_pieces, lengths); + + let (incoming_tx, incoming_rx) = + tokio::sync::mpsc::channel::<(PeerHandle, MessageOwned)>(1); + + let mgr = Self { + inner: Arc::new(TorrentManagerInner { + info_hash: torrent.info_hash, + torrent, + peer_id, + locked: Arc::new(RwLock::new(TorrentManagerInnerLocked { + peers: Default::default(), + chunks: chunk_tracker, + })), + files, + incoming_tx, + downloaded: Default::default(), + fetched_bytes: Default::default(), + uploaded: Default::default(), + lengths, + }), + }; + + spawn("tracker_monitor", mgr.clone().task_tracker_monitor()); + spawn( + "incoming_rx_handler", + mgr.clone().task_incoming_rx_handler(incoming_rx), + ); + spawn("Stats printer", mgr.clone().stats_printer()); + Ok(mgr.into_handle()) + } + + async fn stats_printer(self) -> anyhow::Result<()> { + loop { + let live_peers = self.inner.locked.read().peers.states.len(); + let downloaded_bytes = self.inner.downloaded.load(Ordering::Relaxed); + let downloaded = self.inner.downloaded.load(Ordering::Relaxed) / 1024 / 1024; + let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed) / 1024 / 1024; + let total_length = self.inner.lengths.total_length(); + let pct = if total_length == downloaded { + 100f64 + } else { + (downloaded_bytes as f64 / self.inner.lengths.total_length() as f64) * 100f64 + }; + info!( + "Total downloaded and checked {}MiB ({:.2}%), fetched {}MiB, live peers={}", + downloaded, pct, fetched, live_peers + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + + async fn task_tracker_monitor(self) -> anyhow::Result<()> { + let mut seen_trackers = HashSet::new(); + let mut tracker_futures = FuturesUnordered::new(); + let parse_url = |url: &[u8]| -> anyhow::Result { + let url = std::str::from_utf8(url).context("error parsing tracker URL")?; + let url = Url::parse(url).context("error parsing tracker URL")?; + Ok(url) + }; + for tracker in self.inner.torrent.iter_announce() { + if seen_trackers.contains(&tracker) { + continue; + } + seen_trackers.insert(tracker); + let tracker_url = match parse_url(tracker) { + Ok(url) => url, + Err(e) => { + warn!("ignoring tracker: {:#}", e); + continue; + } + }; + tracker_futures.push(self.clone().single_tracker_monitor(tracker_url)); + } + + while tracker_futures.next().await.is_some() {} + Ok(()) + } + async fn task_incoming_rx_handler( + self, + mut incoming_tx: tokio::sync::mpsc::Receiver<(PeerHandle, MessageOwned)>, + ) -> anyhow::Result<()> { + loop { + let (peer_handle, message): (PeerHandle, MessageOwned) = match incoming_tx.recv().await + { + Some(msg) => msg, + None => { + return Ok(()); + } + }; + + match message { + Message::Request(request) => { + warn!( + "{}: received {:?} , but download requests not implemented", + peer_handle, request + ) + } + Message::Bitfield(b) => self.on_bitfield(peer_handle, b), + Message::Choke => self.on_i_am_choked(peer_handle), + Message::Unchoke => self.on_i_am_unchoked(peer_handle), + Message::Interested => { + warn!( + "{} is interested, but support for interested messages not implemented", + peer_handle + ) + } + Message::Piece(piece) => { + self.on_received_piece(peer_handle, piece); + } + Message::KeepAlive => { + debug!("keepalive received from {}", peer_handle); + } + Message::Have(h) => self.on_have(peer_handle, h), + Message::NotInterested => { + info!("received \"not interested\", but we don't care yet") + } + } + } + } + fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { + self.get_next_needed_piece(handle).is_some() + } + + fn on_have(&self, handle: PeerHandle, have: u32) { + if let Some(bitfield) = self + .inner + .locked + .write() + .peers + .get_live_mut(handle) + .and_then(|l| l.bitfield.as_mut()) + { + debug!("{}: updated bitfield with have={}", handle, have); + bitfield.set(have as usize, true) + } + } + + fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) { + if bitfield.len() != self.inner.lengths.piece_bitfield_bytes() as usize { + warn!( + "dropping {} as its bitfield has unexpected size. Got {}, expected {}", + handle, + bitfield.len(), + self.inner.lengths.piece_bitfield_bytes(), + ); + self.inner.locked.write().peers.drop_peer(handle); + return; + } + self.inner + .locked + .write() + .peers + .update_bitfield_from_vec(handle, bitfield.0); + if !self.am_i_interested_in_peer(handle) { + self.inner.locked.write().peers.drop_peer(handle); + return; + } + + // Additional spawn per peer. + spawn( + format!("peer_chunk_requester({})", handle), + self.clone().task_peer_chunk_requester(handle), + ); + } + + async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { + let tx = match self.inner.locked.read().peers.clone_tx(handle) { + Some(tx) => tx, + None => return Ok(()), + }; + tx.send(MessageOwned::Unchoke) + .await + .context("peer dropped")?; + tx.send(MessageOwned::Interested) + .await + .context("peer dropped")?; + + self.requester(handle).await?; + Ok::<_, anyhow::Error>(()) + } + + fn on_i_am_choked(&self, handle: PeerHandle) { + warn!("we are choked by {}", handle); + self.inner + .locked + .write() + .peers + .mark_i_am_choked(handle, true); + } + fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { + self.inner + .locked + .read() + .peers + .states + .get(&peer_handle) + .and_then(|s| match s { + PeerState::Live(l) => Some(l.i_am_choked), + _ => None, + }) + } + + async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { + let notify = match self.inner.locked.read().peers.get_live(handle) { + Some(l) => l.have_notify.clone(), + None => return Ok(()), + }; + // TODO: this might dangle + tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + + loop { + let next = match self.reserve_next_needed_piece(handle) { + Some(next) => next, + None => { + info!("no pieces to request from {}", handle); + let notify = match self.inner.locked.read().peers.get_live(handle) { + Some(l) => l.have_notify.clone(), + None => return Ok(()), + }; + // TODO: this might dangle + tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; + continue; + } + }; + let tx = match self.inner.locked.read().peers.clone_tx(handle) { + Some(tx) => tx, + None => return Ok(()), + }; + let sem = match self.inner.locked.read().peers.get_live(handle) { + Some(live) => live.outstanding_requests.clone(), + None => return Ok(()), + }; + for chunk in self.inner.lengths.iter_chunk_infos(next) { + let request = Request { + index: next.get(), + begin: chunk.offset, + length: chunk.size, + }; + sem.acquire().await?.forget(); + tx.send(MessageOwned::Request(request)) + .await + .context("peer dropped")?; + } + } + } + fn on_i_am_unchoked(&self, handle: PeerHandle) { + debug!("we are unchoked by {}", handle); + let mut g = self.inner.locked.write(); + let live = match g.peers.get_live_mut(handle) { + Some(live) => live, + None => return, + }; + live.i_am_choked = false; + live.have_notify.notify_waiters(); + live.outstanding_requests.add_permits(16); + } + fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { + let g = self.inner.locked.read(); + let bf = match g.peers.states.get(&peer_handle)? { + PeerState::Live(l) => l.bitfield.as_ref()?, + _ => return None, + }; + for n in g.chunks.get_needed_pieces().iter_ones() { + if bf.get(n).map(|v| *v) == Some(true) { + // in theory it should be safe without validation, but whatever. + return self.inner.lengths.validate_piece_index(n as u32); + } + } + None + } + + fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { + if self.am_i_choked(peer_handle)? { + warn!("we are choked by {}, can't reserve next piece", peer_handle); + return None; + } + let mut g = self.inner.locked.write(); + let n = { + let mut n_opt = None; + let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; + for n in g.chunks.get_needed_pieces().iter_ones() { + if bf.get(n).map(|v| *v) == Some(true) { + n_opt = Some(n); + break; + } + } + self.inner.lengths.validate_piece_index(n_opt? as u32)? + }; + + g.peers + .get_live_mut(peer_handle)? + .requested_pieces + .insert(n); + g.chunks.reserve_needed_piece(n); + Some(n) + } + + fn check_piece_blocking( + &self, + who_sent: PeerHandle, + index: ValidPieceIndex, + ) -> anyhow::Result { + let mut h = sha1::Sha1::new(); + let piece_length = self.inner.lengths.piece_length(index); + let mut absolute_offset = self.inner.lengths.piece_offset(index); + let mut buf = vec![0; std::cmp::min(8192, piece_length as usize)]; + + let mut left_to_read = piece_length as usize; + + for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { + if absolute_offset > file_len { + absolute_offset -= file_len; + continue; + } + let file_remaining_len = file_len - absolute_offset; + + let mut left_to_read_in_file = + std::cmp::min(file_remaining_len, left_to_read as u64) as usize; + let mut file_g = self.inner.files[file_idx].lock(); + trace!("piece={}, seeking to {}", index, absolute_offset); + file_g + .seek(std::io::SeekFrom::Start(absolute_offset)) + .with_context(|| { + format!( + "error seeking to {}, file id: {}", + absolute_offset, file_idx + ) + })?; + while left_to_read_in_file > 0 { + let chunk_length = std::cmp::min(buf.len(), left_to_read_in_file); + file_g + .read_exact(&mut buf[..chunk_length]) + .with_context(|| { + format!( + "error reading {} bytes, file_id: {}, left_to_read_in_file: {}", + chunk_length, file_idx, left_to_read_in_file + ) + })?; + h.update(&buf[..chunk_length]); + left_to_read_in_file -= chunk_length; + } + + match self.inner.torrent.info.compare_hash(index.get(), &h) { + Some(true) => { + debug!("piece={} hash matches", index); + } + Some(false) => { + warn!("the piece={} hash does not match", index); + return Ok(false); + } + None => { + // this is probably a bug? + warn!("compare_hash() did not find the piece"); + anyhow::bail!("compare_hash() did not find the piece"); + } + } + + left_to_read -= left_to_read_in_file; + + if left_to_read == 0 { + return Ok(true); + } + + absolute_offset = 0; + } + Ok(true) + } + + // TODO: this is a task per chunk, not good + async fn task_transmit_haves(self, index: u32) -> anyhow::Result<()> { + let mut unordered = FuturesUnordered::new(); + + for weak in self + .inner + .locked + .read() + .peers + .tx + .values() + .map(|v| Arc::downgrade(v)) + { + unordered.push(async move { + if let Some(tx) = weak.upgrade() { + if tx.send(Message::Have(index)).await.is_err() { + // whatever + } + } + }); + } + + while unordered.next().await.is_some() {} + Ok(()) + } + + fn write_chunk_blocking( + &self, + who_sent: PeerHandle, + chunk: &Piece, + ) -> anyhow::Result<()> { + let mut absolute_offset = + self.inner.torrent.info.piece_length as u64 * chunk.index as u64 + chunk.begin as u64; + + let mut buf = chunk.block.as_ref(); + + for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { + if absolute_offset > file_len { + absolute_offset -= file_len; + continue; + } + + let remaining_len = file_len - absolute_offset; + let to_write = std::cmp::min(buf.len(), remaining_len as usize); + + let mut file_g = self.inner.files[file_idx].lock(); + debug!( + "piece={}, handle={}, writing {} bytes to file {} at offset {}", + chunk.index, who_sent, to_write, file_idx, absolute_offset + ); + debug!("piece={}, seeking to {}", chunk.index, absolute_offset); + file_g.seek(std::io::SeekFrom::Start(absolute_offset))?; + file_g.write_all(&buf[..to_write])?; + buf = &buf[to_write..]; + if buf.is_empty() { + break; + } + + absolute_offset = 0; + } + + Ok(()) + } + + fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> Option<()> { + let chunk_info = match self + .inner + .lengths + .chunk_info_from_received_piece_data(&piece) + { + Some(i) => i, + None => { + warn!( + "peer {} sent us a piece that is invalid {:?}, dropping", + handle, &piece, + ); + self.drop_peer(handle); + return None; + } + }; + + let mut g = self.inner.locked.write(); + let h = g.peers.get_live_mut(handle)?; + h.outstanding_requests.add_permits(1); + + self.inner + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + + if !h.requested_pieces.contains(&chunk_info.piece_index) { + warn!( + "peer {} sent us a piece that we did not ask for, dropping it. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece, + ); + self.drop_peer(handle); + return None; + } + + let this = self.clone(); + spawn_blocking( + format!("write_and_check(piece={}, block={:?})", piece.index, &piece), + move || { + let index = piece.index; + this.write_chunk_blocking(handle, &piece)?; + + let piece_done = match this + .inner + .locked + .write() + .chunks + .mark_chunk_downloaded(&piece) + { + Some(true) => { + debug!( + "piece={} done, requesting a piece from {}", + piece.index, handle + ); + true + } + Some(false) => false, + None => { + warn!( + "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", + handle, piece + ); + this.drop_peer(handle); + return Ok(()); + } + }; + + if !piece_done { + return Ok(()); + } + // Ignore responses about this piece from now on. + this.inner + .locked + .write() + .peers + .get_live_mut(handle) + .map(|l| l.requested_pieces.remove(&chunk_info.piece_index)); + + let clone = this.clone(); + match clone + .check_piece_blocking(handle, chunk_info.piece_index) + .with_context(|| format!("error checking piece={}", index))? + { + true => { + this.inner.downloaded.fetch_add( + this.inner.lengths.piece_length(chunk_info.piece_index) as u64, + Ordering::Relaxed, + ); + debug!( + "piece={} successfully downloaded and verified from {}", + index, handle + ); + spawn( + "transmit haves", + this.clone().task_transmit_haves(piece.index), + ); + } + false => { + warn!( + "checksum for piece={} did not validate, came from {}", + index, handle + ); + this.inner + .locked + .write() + .chunks + .mark_piece_needed(chunk_info.piece_index); + // this.drop_peer(handle); + } + }; + Ok::<_, anyhow::Error>(()) + }, + ); + Some(()) + } + fn into_handle(self) -> TorrentManagerHandle { + TorrentManagerHandle { manager: self } + } + fn get_uploaded(&self) -> u64 { + self.inner.uploaded.load(Ordering::Relaxed) + } + fn get_downloaded(&self) -> u64 { + self.inner.downloaded.load(Ordering::Relaxed) + } + async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { + let response: reqwest::Response = reqwest::get(tracker_url).await?; + let bytes = response.bytes().await?; + let response = crate::serde_bencode::from_bytes::(&bytes)?; + + for peer in response.peers.iter_sockaddrs() { + self.add_peer(peer); + } + Ok(response.interval) + } + fn get_total(&self) -> u64 { + if let Some(length) = self.inner.torrent.info.length { + return length; + } + self.inner + .torrent + .info + .files + .as_ref() + .map(|files| files.iter().map(|f| f.length).sum()) + .unwrap_or_default() + } + fn get_left_to_download(&self) -> u64 { + self.get_total() - self.get_downloaded() + } + + async fn single_tracker_monitor(self, mut tracker_url: Url) -> anyhow::Result<()> { + let mut event = Some(TrackerRequestEvent::Started); + loop { + let request = TrackerRequest { + info_hash: self.inner.torrent.info_hash, + peer_id: self.inner.peer_id, + port: 6778, + uploaded: self.get_uploaded(), + downloaded: self.get_downloaded(), + left: self.get_left_to_download(), + compact: true, + no_peer_id: false, + event, + ip: None, + numwant: None, + key: None, + trackerid: None, + }; + + let request_query = request.as_querystring(); + tracker_url.set_query(Some(&request_query)); + + let this = self.clone(); + match this.tracker_one_request(tracker_url.clone()).await { + Ok(interval) => { + event = None; + let duration = Duration::from_secs(interval); + debug!( + "sleeping for {:?} after calling tracker {}", + duration, + tracker_url.host().unwrap() + ); + tokio::time::sleep(duration).await; + } + Err(e) => { + error!("error calling the tracker {}: {:#}", tracker_url, e); + tokio::time::sleep(Duration::from_secs(60)).await; + } + }; + } + } + fn set_peer_live(&self, handle: PeerHandle, addr: SocketAddr, h: Handshake) { + let mut g = self.inner.locked.write(); + match g.peers.states.get_mut(&handle) { + Some(s @ &mut PeerState::Connecting(_)) => { + *s = PeerState::Live(LivePeerState { + peer_id: h.peer_id, + i_am_choked: true, + peer_choked: true, + peer_interested: false, + bitfield: None, + have_notify: Arc::new(Notify::new()), + outstanding_requests: Arc::new(Semaphore::new(0)), + requested_pieces: Default::default(), + }); + } + _ => { + warn!("peer {} was in wrong state", handle); + } + } + } + async fn manage_peer( + &self, + addr: SocketAddr, + handle: PeerHandle, + incoming_chan: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>, + // outgoing_chan_tx: tokio::sync::mpsc::Sender, + mut outgoing_chan: tokio::sync::mpsc::Receiver, + ) -> anyhow::Result<()> { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + let mut conn = tokio::net::TcpStream::connect(addr).await?; + let handshake = Handshake::new(self.inner.info_hash, self.inner.peer_id); + conn.write_all(&handshake.serialize()).await?; + let mut read_buf = vec![0u8; 16384 * 2]; + let read_bytes = conn.read(&mut read_buf).await?; + if read_bytes == 0 { + anyhow::bail!("bad handshake"); + } + let (h, hlen) = Handshake::deserialize(&read_buf[..read_bytes]) + .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?; + + let mut read_so_far = 0usize; + debug!( + "connected peer {}: {:?}", + addr, + try_decode_peer_id(h.peer_id) + ); + if h.info_hash != self.inner.info_hash { + anyhow::bail!("info hash does not match"); + } + + self.set_peer_live(handle, addr, h); + + if read_bytes > hlen { + read_buf.copy_within(hlen..read_bytes, 0); + read_so_far = read_bytes - hlen; + } + + let (mut read_half, mut write_half) = tokio::io::split(conn); + + let writer = async move { + let mut buf = vec![0u8; 1024]; + let keep_alive_interval = Duration::from_secs(120); + loop { + let msg = + match tokio::time::timeout(keep_alive_interval, outgoing_chan.recv()).await { + Ok(Some(msg)) => msg, + Ok(None) => return Err(anyhow::anyhow!("torrent manager closed")), + Err(_) => MessageOwned::KeepAlive, + }; + + let len = msg.serialize(&mut buf); + debug!("sending to {}: {:?}, length={}", handle, &msg, len); + + write_half + .write_all(&buf[..len]) + .await + .context("error writing")?; + } + + // For type inference. + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }; + + let reader = async move { + loop { + let (message, size) = loop { + match MessageBorrowed::deserialize(&read_buf[..read_so_far]) { + Ok((msg, size)) => break (msg.clone_to_owned(), size), + Err(MessageDeserializeError::NotEnoughData(d, _)) => { + if read_buf.len() < read_so_far + d { + read_buf.reserve(d); + read_buf.resize(read_buf.capacity(), 0); + } + } + Err(e) => return Err(e.into()), + } + + let size = read_half + .read(&mut read_buf[read_so_far..]) + .await + .context("error reading from peer")?; + if size == 0 { + anyhow::bail!("disconnected while reading, read so far: {}", read_so_far) + } + read_so_far += size; + }; + + if read_so_far > size { + read_buf.copy_within(size..read_so_far, 0); + } + read_so_far -= size; + + incoming_chan + .send((handle, message)) + .await + .context("error sending received message")?; + } + + // For type inference. + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }; + + let r = tokio::select! { + r = reader => {r} + r = writer => {r} + }; + debug!("{}: either reader or writer are done, exiting", handle); + r + } + fn drop_peer(&self, handle: PeerHandle) -> bool { + let mut g = self.inner.locked.write(); + let peer = match g.peers.drop_peer(handle) { + Some(peer) => peer, + None => return false, + }; + match peer { + PeerState::Connecting(_) => {} + PeerState::Live(l) => { + for piece in l.requested_pieces { + g.chunks.mark_piece_needed(piece); + } + } + } + true + } + fn add_peer(&self, addr: SocketAddr) { + let (out_tx, out_rx) = tokio::sync::mpsc::channel::(1); + let handle = match self + .inner + .locked + .write() + .peers + .add_if_not_seen(addr, out_tx) + { + Some(handle) => handle, + None => return, + }; + + let this = self.clone(); + spawn(format!("manage_peer({})", handle), async move { + if let Err(e) = this + .manage_peer(addr, handle, this.inner.incoming_tx.clone(), out_rx) + .await + { + error!("error managing peer, will drop {}: {:#}", handle, e) + }; + this.drop_peer(handle); + Ok::<_, anyhow::Error>(()) + }); + } +} diff --git a/crates/librqbit/src/torrent_metainfo.rs b/crates/librqbit/src/torrent_metainfo.rs new file mode 100644 index 0000000..c348dbd --- /dev/null +++ b/crates/librqbit/src/torrent_metainfo.rs @@ -0,0 +1,253 @@ +use std::{fs::File, ops::Deref, path::PathBuf}; + +use serde::Deserialize; + +use crate::{ + buffers::{ByteBuf, ByteString}, + clone_to_owned::CloneToOwned, + serde_bencode::BencodeDeserializer, +}; + +pub type TorrentMetaV1Borrowed<'a> = TorrentMetaV1>; +pub type TorrentMetaV1Owned = TorrentMetaV1; + +pub fn torrent_from_bytes(buf: &[u8]) -> anyhow::Result> { + let mut de = BencodeDeserializer::new_from_buf(buf); + de.is_torrent_info = true; + let mut t = TorrentMetaV1::deserialize(&mut de)?; + t.info_hash = de.torrent_info_digest.unwrap(); + Ok(t) +} + +pub fn torrent_from_bytes_owned(buf: &[u8]) -> anyhow::Result { + let mut de = BencodeDeserializer::new_from_buf(buf); + de.is_torrent_info = true; + let mut t = TorrentMetaV1Owned::deserialize(&mut de)?; + t.info_hash = de.torrent_info_digest.unwrap(); + Ok(t) +} + +#[derive(Deserialize, Debug, Clone)] +pub struct TorrentMetaV1 { + pub announce: BufType, + #[serde(rename = "announce-list")] + pub announce_list: Vec>, + pub info: TorrentMetaV1Info, + pub comment: Option, + #[serde(rename = "created by")] + pub created_by: Option, + pub encoding: Option, + pub publisher: Option, + #[serde(rename = "publisher-url")] + pub publisher_url: Option, + #[serde(rename = "creation date")] + pub creation_date: Option, + + #[serde(skip)] + pub info_hash: [u8; 20], +} + +impl TorrentMetaV1 { + pub fn iter_announce(&self) -> impl Iterator { + std::iter::once(&self.announce).chain(self.announce_list.iter().flatten()) + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct TorrentMetaV1Info { + pub name: Option, + pub pieces: BufType, + #[serde(rename = "piece length")] + pub piece_length: u32, + + // Single-file mode + pub length: Option, + pub md5sum: Option, + + // Multi-file mode + pub files: Option>>, +} + +pub enum FileIteratorName<'a, ByteBuf> { + Single(Option<&'a ByteBuf>), + Tree(&'a [ByteBuf]), +} + +impl<'a, ByteBuf> FileIteratorName<'a, ByteBuf> { + pub fn iter_components(&self) -> impl Iterator> { + let single_it = std::iter::once(match self { + FileIteratorName::Single(n) => Some(*n), + FileIteratorName::Tree(_) => None, + }); + let multi_it = match self { + FileIteratorName::Single(_) => &[], + FileIteratorName::Tree(t) => *t, + } + .iter() + .map(|p| Some(Some(p))); + + single_it.chain(multi_it).flatten() + } +} + +impl> TorrentMetaV1Info { + pub fn compare_hash(&self, piece: u32, hash: &sha1::Sha1) -> Option { + let start = piece as usize * 20; + let end = start + 20; + let expected_hash = self.pieces.deref().get(start..end)?; + Some(expected_hash == hash.digest().bytes()) + } + pub fn iter_filenames_and_lengths( + &self, + ) -> impl Iterator, u64)> { + let single_it = std::iter::once(match (self.name.as_ref(), self.length) { + (Some(n), Some(l)) => Some((FileIteratorName::Single(Some(n)), l)), + _ => None, + }); + let multi_it = self + .files + .as_deref() + .unwrap_or_default() + .iter() + .map(|f| Some((FileIteratorName::Tree(&f.path), f.length))); + single_it.chain(multi_it).flatten() + } + pub fn iter_file_lengths(&self) -> impl Iterator + '_ { + std::iter::once(self.length) + .chain( + self.files + .as_deref() + .unwrap_or_default() + .iter() + .map(|f| Some(f.length)), + ) + .flatten() + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct TorrentMetaV1File { + pub length: u64, + pub path: Vec, +} + +impl TorrentMetaV1File +where + BufType: Clone + AsRef<[u8]>, +{ + pub fn full_path(&self, parent: &mut PathBuf) -> anyhow::Result<()> { + for p in self.path.iter() { + let bit = std::str::from_utf8(p.as_ref())?; + parent.push(bit); + } + Ok(()) + } +} + +impl CloneToOwned for TorrentMetaV1File +where + ByteBuf: CloneToOwned + Clone, + ::Target: Clone, +{ + type Target = TorrentMetaV1File<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + TorrentMetaV1File { + length: self.length, + path: self.path.clone_to_owned(), + } + } +} + +impl CloneToOwned for TorrentMetaV1Info +where + ByteBuf: CloneToOwned + Clone, + ::Target: Clone, +{ + type Target = TorrentMetaV1Info<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + TorrentMetaV1Info { + name: self.name.clone_to_owned(), + pieces: self.pieces.clone_to_owned(), + piece_length: self.piece_length, + length: self.length, + md5sum: self.md5sum.clone_to_owned(), + files: self.files.clone_to_owned(), + } + } +} + +impl CloneToOwned for TorrentMetaV1 +where + ByteBuf: CloneToOwned + Clone, + ::Target: Clone, +{ + type Target = TorrentMetaV1<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + TorrentMetaV1 { + announce: self.announce.clone_to_owned(), + announce_list: self.announce_list.clone_to_owned(), + info: self.info.clone_to_owned(), + comment: self.comment.clone_to_owned(), + created_by: self.created_by.clone_to_owned(), + encoding: self.encoding.clone_to_owned(), + publisher: self.publisher.clone_to_owned(), + publisher_url: self.publisher_url.clone_to_owned(), + creation_date: self.creation_date, + info_hash: self.info_hash, + } + } +} + +#[cfg(test)] +mod tests { + use std::io::Read; + + use crate::serde_bencode::from_bytes; + + use super::*; + + #[test] + fn test_deserialize_torrent_owned() { + let mut buf = Vec::new(); + let filename = "resources/ubuntu-21.04-desktop-amd64.iso.torrent"; + std::fs::File::open(filename) + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + + let torrent: TorrentMetaV1Owned = from_bytes(&buf).unwrap(); + dbg!(torrent); + } + + #[test] + fn test_deserialize_torrent_borrowed() { + let mut buf = Vec::new(); + let filename = "resources/ubuntu-21.04-desktop-amd64.iso.torrent"; + std::fs::File::open(filename) + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + + let torrent: TorrentMetaV1Borrowed = from_bytes(&buf).unwrap(); + dbg!(torrent); + } + + #[test] + fn test_deserialize_torrent_with_info_hash() { + let mut buf = Vec::new(); + let filename = "resources/ubuntu-21.04-desktop-amd64.iso.torrent"; + std::fs::File::open(filename) + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + + let torrent = torrent_from_bytes(&buf).unwrap(); + assert_eq!( + torrent.info_hash, + *b"\x64\xa9\x80\xab\xe6\xe4\x48\x22\x6b\xb9\x30\xba\x06\x15\x92\xe4\x4c\x37\x81\xa1" + ); + } +} diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs new file mode 100644 index 0000000..18de7a9 --- /dev/null +++ b/crates/librqbit/src/tracker_comms.rs @@ -0,0 +1,228 @@ +use byteorder::ByteOrder; +use serde::{Deserialize, Deserializer}; +use std::{ + fmt::Write, + marker::PhantomData, + net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, + str::FromStr, +}; + +use crate::buffers::ByteBuf; + +#[derive(Clone, Copy)] +pub enum TrackerRequestEvent { + Started, + Stopped, + Completed, +} + +pub struct TrackerRequest { + pub info_hash: [u8; 20], + pub peer_id: [u8; 20], + pub event: Option, + pub port: u16, + pub uploaded: u64, + pub downloaded: u64, + pub left: u64, + pub compact: bool, + pub no_peer_id: bool, + + pub ip: Option, + pub numwant: Option, + pub key: Option, + pub trackerid: Option, +} + +#[derive(Deserialize, Debug)] +pub struct TrackerError<'a> { + #[serde(rename = "failure reason", borrow)] + failure_reason: ByteBuf<'a>, +} + +#[derive(Deserialize, Debug)] +pub struct DictPeer<'a> { + #[serde(deserialize_with = "deserialize_ip_string")] + ip: IpAddr, + #[serde(borrow)] + peer_id: Option>, + port: u16, +} + +impl<'a> DictPeer<'a> { + fn as_sockaddr(&self) -> SocketAddr { + SocketAddr::new(self.ip, self.port) + } +} + +#[derive(Debug)] +pub enum Peers<'a> { + Full(Vec>), + Compact(Vec), +} + +impl<'a> Peers<'a> { + pub fn iter_sockaddrs(&self) -> Box + '_> { + match self { + Peers::Full(d) => Box::new(d.iter().map(DictPeer::as_sockaddr)), + Peers::Compact(c) => Box::new(c.iter().copied().map(SocketAddr::V4)), + } + } +} + +impl<'de: 'a, 'a> serde::de::Deserialize<'de> for Peers<'a> { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor<'de> { + phantom: std::marker::PhantomData<&'de ()>, + } + impl<'de> serde::de::Visitor<'de> for Visitor<'de> { + type Value = Peers<'de>; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a list of peers in dict or binary format") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut peers = Vec::new(); + while let Some(peer) = seq.next_element::()? { + peers.push(peer) + } + Ok(Peers::Full(peers)) + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(Peers::Compact(parse_compact_peers(v))) + } + } + deserializer.deserialize_any(Visitor { + phantom: PhantomData, + }) + } +} + +fn deserialize_ip_string<'de, D>(de: D) -> Result +where + D: Deserializer<'de>, +{ + struct Visitor; + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = IpAddr; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expecting an IPv4 address") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + IpAddr::from_str(v).map_err(|e| E::custom(format!("cannot parse ip: {}", e))) + } + } + de.deserialize_str(Visitor {}) +} + +fn parse_compact_peers(b: &[u8]) -> Vec { + let mut ips = Vec::new(); + for chunk in b.chunks_exact(6) { + let ip_chunk = &chunk[..4]; + let port_chunk = &chunk[4..6]; + let ipaddr = Ipv4Addr::new(ip_chunk[0], ip_chunk[1], ip_chunk[2], ip_chunk[3]); + let port = byteorder::BigEndian::read_u16(port_chunk); + ips.push(SocketAddrV4::new(ipaddr, port)); + } + ips +} + +#[derive(Deserialize, Debug)] +pub struct CompactTrackerResponse<'a> { + #[serde(rename = "warning message", borrow)] + pub warning_message: Option>, + pub complete: u64, + pub interval: u64, + #[serde(rename = "min interval")] + pub min_interval: Option, + pub tracker_id: Option>, + pub incomplete: u64, + pub peers: Peers<'a>, +} + +impl TrackerRequest { + pub fn as_querystring(&self) -> String { + use urlencoding as u; + let mut s = String::new(); + s.push_str("info_hash="); + s.push_str(u::encode_binary(&self.info_hash).as_ref()); + s.push_str("&peer_id="); + s.push_str(u::encode_binary(&self.peer_id).as_ref()); + if let Some(event) = self.event { + write!( + s, + "&event={}", + match event { + TrackerRequestEvent::Started => "started", + TrackerRequestEvent::Stopped => "stopped", + TrackerRequestEvent::Completed => "completed", + } + ) + .unwrap(); + } + write!(s, "&port={}", self.port).unwrap(); + write!(s, "&uploaded={}", self.uploaded).unwrap(); + write!(s, "&downloaded={}", self.downloaded).unwrap(); + write!(s, "&left={}", self.left).unwrap(); + write!(s, "&compact={}", if self.compact { 1 } else { 0 }).unwrap(); + write!(s, "&no_peer_id={}", if self.no_peer_id { 1 } else { 0 }).unwrap(); + if let Some(ip) = &self.ip { + write!(s, "&ip={}", ip).unwrap(); + } + if let Some(numwant) = &self.numwant { + write!(s, "&numwant={}", numwant).unwrap(); + } + if let Some(key) = &self.key { + write!(s, "&key={}", key).unwrap(); + } + if let Some(trackerid) = &self.trackerid { + write!(s, "&trackerid={}", trackerid).unwrap(); + } + s + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_serialize() { + let info_hash = [ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]; + let peer_id = [ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]; + let request = TrackerRequest { + info_hash, + peer_id, + port: 6881, + uploaded: 0, + downloaded: 0, + left: 1024 * 1024, + compact: true, + no_peer_id: false, + event: Some(TrackerRequestEvent::Started), + ip: Some("127.0.0.1".parse().unwrap()), + numwant: None, + key: None, + trackerid: None, + }; + dbg!(request.as_querystring()); + } +} diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs new file mode 100644 index 0000000..2dfc5b9 --- /dev/null +++ b/crates/librqbit/src/type_aliases.rs @@ -0,0 +1 @@ +pub type BF = bitvec::vec::BitVec; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..fea5d8a --- /dev/null +++ b/src/main.rs @@ -0,0 +1,71 @@ +use std::{fs::File, io::Read}; + +use anyhow::Context; +use clap::Clap; +use librqbit::{ + clone_to_owned::CloneToOwned, torrent_manager::TorrentManagerBuilder, + torrent_metainfo::torrent_from_bytes, +}; +use log::info; + +#[derive(Clap)] +#[clap(version = "1.0", author = "Igor Katson ")] +struct Opts { + /// The filename or URL of the .torrent file. + torrent_path: String, + + /// The filename of the .torrent file. + output_folder: String, + + /// Set if you are ok to write on top of existing files + #[clap(long)] + overwrite: bool, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + pretty_env_logger::init(); + + let opts = Opts::parse(); + + let torrent = + if opts.torrent_path.starts_with("http://") || opts.torrent_path.starts_with("https://") { + let response = reqwest::get(&opts.torrent_path).await.with_context(|| { + format!( + "error downloading torrent metadata from {}", + &opts.torrent_path + ) + })?; + if !response.status().is_success() { + anyhow::bail!("GET {} returned {}", &opts.torrent_path, response.status()) + } + let b = response.bytes().await.with_context(|| { + format!("error reading repsonse body from {}", &opts.torrent_path) + })?; + torrent_from_bytes(&b) + .context("error decoding torrent")? + .clone_to_owned() + } else { + let mut buf = Vec::new(); + if opts.torrent_path == "-" { + std::io::stdin() + .read_to_end(&mut buf) + .context("error reading stdin")?; + } else { + File::open(&opts.torrent_path) + .with_context(|| format!("error opening {}", &opts.torrent_path))? + .read_to_end(&mut buf) + .with_context(|| format!("error reading {}", &opts.torrent_path))?; + } + torrent_from_bytes(&buf) + .context("error decoding torrent")? + .clone_to_owned() + }; + + info!("Torrent metadata: {:#?}", &torrent); + + let builder = TorrentManagerBuilder::new(torrent, opts.output_folder).overwrite(opts.overwrite); + let manager_handle = builder.start_manager().await?; + manager_handle.wait_until_completed().await?; + Ok(()) +}