Merge pull request #38 from ikatson/refactoring-2023-12

Refactoring DHT
This commit is contained in:
Igor Katson 2023-12-01 11:39:12 +00:00 committed by GitHub
commit 64d22577d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 2138 additions and 928 deletions

68
Cargo.lock generated
View file

@ -26,6 +26,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.4"
@ -275,6 +290,21 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "clap"
version = "4.4.8"
@ -462,6 +492,7 @@ dependencies = [
"lock_api",
"once_cell",
"parking_lot_core",
"serde",
]
[[package]]
@ -886,6 +917,29 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "idna"
version = "0.4.0"
@ -1005,6 +1059,7 @@ dependencies = [
"anyhow",
"axum",
"backoff",
"base64",
"bincode",
"bitvec",
"byteorder",
@ -1076,6 +1131,7 @@ dependencies = [
"librqbit-clone-to-owned",
"parking_lot",
"serde",
"serde_json",
"tokio",
"tracing",
"url",
@ -1087,6 +1143,9 @@ name = "librqbit-dht"
version = "3.2.0"
dependencies = [
"anyhow",
"backoff",
"chrono",
"dashmap",
"directories",
"futures",
"hex 0.4.3",
@ -2511,6 +2570,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.48.0"

29
TODO.md
View file

@ -1,4 +1,4 @@
- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them.
- [x] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them.
- [ ] per-file stats
- [x (partial)] per-peer stats
- [x] use some concurrent hashmap e.g. flurry or dashmap
@ -8,21 +8,37 @@
- [x] initializing/checking
- [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while
- [x] checking torrents should be visible right away
- [ ] server persistence
- [ ] it would be nice to restart the server and keep the state
- [x] server persistence
- [x] it would be nice to restart the server and keep the state
- [x] torrent actions
- [x] pause/unpause
- [x] remove including from disk
- [ ] DHT
- [ ] for torrents with a few seeds might be cool to re-query DHT once in a while.
- [x] bootstrapping is lame
- [x] many nodes in "Unknown" status, do smth about it
- [x] for torrents with a few seeds might be cool to re-query DHT once in a while.
- [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted)
- [x] Routing table - is it balanced properly?
- [ ]
- [x] Don't query Bad nodes
- [-] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC)
- [x] Did it, but it's flawed: starts repeating the same queries again as neighboring refreshes
don't know about the other ones, and DHT returns the same nodes again and again.
- [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly
- [x] store peers sent to us with "announce_peer"
- [ ] announced peers should be persisted
- [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent.
To do this, a
- [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was.
someday:
- [x] cancellation from the client-side for the lib (i.e. stop the torrent manager)
- [ ] favicons for Web UI
- [x] favicons for Web UI
refactor:
- [ ] session persistence: should add torrents even if we haven't resolved it yet
- [x] where are peers stored
- [x] http api pause/unpause etc
- [x] when a live torrent fails writing to disk, it should transition to error state
@ -30,7 +46,6 @@ refactor:
- [x] silence this: WARN torrent{id=0}:external_peer_adder: librqbit::spawn_utils: finished with error: no longer live
- [x] start from error state should be possible from UI
- [ ] if the torrent was completed, not need to re-check it
- [x] checking is very slow on raspberry
checked. nothing much can be done here. Even if raspberry's own libssl.so is used it's still super slow (sha1)
- [ ] .rqbit-session.json file has 0 bytes when disk full. I guess fs::rename does this when disk is full? at least on linux
- [ ] .rqbit-session.json file has 0 bytes when disk full. I guess fs::rename does this when disk is full? at least on linux. Couldn't repro on MacOS

View file

@ -27,13 +27,16 @@ bencode = {path = "../bencode", default-features=false, package="librqbit-bencod
anyhow = "1"
parking_lot = "0.12"
tracing = "0.1"
backoff = "0.4.0"
futures = "0.3"
rand = "0.8"
indexmap = "2"
directories = "5"
dashmap = {version = "5.5.3", features = ["serde"]}
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.1.0"}
chrono = {version = "0.4.31", features = ["serde"]}
[dev-dependencies]
tracing-subscriber = "0.3"
tracing-subscriber = "0.3"

View file

@ -2,7 +2,7 @@ use std::time::Duration;
use anyhow::Context;
use librqbit_core::magnet::Magnet;
use librqbit_dht::Dht;
use librqbit_dht::DhtBuilder;
use tokio_stream::StreamExt;
use tracing::info;
@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let dht = Dht::new().await.context("error initializing DHT")?;
let dht = DhtBuilder::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash)?;
let stats_printer = async {
@ -36,6 +36,7 @@ async fn main() -> anyhow::Result<()> {
let mut f = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(filename)
.unwrap();
serde_json::to_writer_pretty(&mut f, r).unwrap();

View file

@ -4,7 +4,7 @@ use std::{
net::{Ipv4Addr, SocketAddrV4},
};
use bencode::ByteBuf;
use bencode::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use librqbit_core::id20::Id20;
use serde::{
@ -163,17 +163,27 @@ struct RawMessage<BufT, Args = IgnoredAny, Resp = IgnoredAny> {
ip: Option<CompactPeerInfo>,
}
#[derive(Debug)]
pub struct Node {
pub id: Id20,
pub addr: SocketAddrV4,
}
#[derive(Debug)]
impl core::fmt::Debug for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}={:?}", self.addr, self.id)
}
}
pub struct CompactNodeInfo {
pub nodes: Vec<Node>,
}
impl core::fmt::Debug for CompactNodeInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.nodes)
}
}
impl Serialize for CompactNodeInfo {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@ -230,11 +240,16 @@ impl<'de> Deserialize<'de> for CompactNodeInfo {
}
}
#[derive(Debug)]
pub struct CompactPeerInfo {
pub addr: SocketAddrV4,
}
impl core::fmt::Debug for CompactPeerInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.addr)
}
}
impl Serialize for CompactPeerInfo {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@ -292,12 +307,12 @@ pub struct FindNodeRequest {
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Response<BufT> {
#[serde(skip_serializing_if = "Option::is_none")]
pub values: Option<Vec<CompactPeerInfo>>,
pub id: Id20,
#[serde(skip_serializing_if = "Option::is_none")]
pub nodes: Option<CompactNodeInfo>,
#[serde(skip_serializing_if = "Option::is_none")]
pub values: Option<Vec<CompactPeerInfo>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub token: Option<BufT>,
}
@ -309,7 +324,16 @@ pub struct GetPeersRequest {
#[derive(Debug, Serialize, Deserialize)]
pub struct PingRequest {
id: Id20,
pub id: Id20,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AnnouncePeer<BufT> {
pub id: Id20,
pub implied_port: u8,
pub info_hash: Id20,
pub port: u16,
pub token: BufT,
}
#[derive(Debug, Serialize, Deserialize)]
@ -326,19 +350,43 @@ pub struct GetPeersResponse<BufT> {
#[derive(Debug)]
pub struct Message<BufT> {
pub kind: MessageKind<BufT>,
pub transaction_id: BufT,
pub version: Option<BufT>,
pub ip: Option<SocketAddrV4>,
pub kind: MessageKind<BufT>,
}
#[derive(Debug)]
impl Message<ByteString> {
// This implies that the transaction id was generated by us.
pub fn get_our_transaction_id(&self) -> Option<u16> {
if self.transaction_id.len() != 2 {
return None;
}
let tid = ((self.transaction_id[0] as u16) << 8) + (self.transaction_id[1] as u16);
Some(tid)
}
}
pub enum MessageKind<BufT> {
Error(ErrorDescription<BufT>),
GetPeersRequest(GetPeersRequest),
FindNodeRequest(FindNodeRequest),
Response(Response<BufT>),
PingRequest(PingRequest),
AnnouncePeer(AnnouncePeer<BufT>),
}
impl<BufT: core::fmt::Debug> core::fmt::Debug for MessageKind<BufT> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Error(e) => write!(f, "{e:?}"),
Self::GetPeersRequest(r) => write!(f, "{r:?}"),
Self::FindNodeRequest(r) => write!(f, "{r:?}"),
Self::Response(r) => write!(f, "{r:?}"),
Self::PingRequest(r) => write!(f, "{r:?}"),
Self::AnnouncePeer(r) => write!(f, "{r:?}"),
}
}
}
pub fn serialize_message<'a, W: Write, BufT: Serialize + From<&'a [u8]>>(
@ -415,6 +463,19 @@ pub fn serialize_message<'a, W: Write, BufT: Serialize + From<&'a [u8]>>(
};
Ok(bencode::bencode_serialize_to_writer(msg, writer)?)
}
MessageKind::AnnouncePeer(announce) => {
let msg: RawMessage<BufT, _, ()> = RawMessage {
message_type: MessageType::Request,
transaction_id,
error: None,
response: None,
method_name: Some(BufT::from(b"announce_peer")),
arguments: Some(announce),
ip,
version,
};
Ok(bencode::bencode_serialize_to_writer(msg, writer)?)
}
}
}
@ -453,6 +514,15 @@ where
kind: MessageKind::PingRequest(de.arguments.unwrap()),
})
}
b"announce_peer" => {
let de: RawMessage<BufT, AnnouncePeer<BufT>> = bencode::from_bytes(buf)?;
Ok(Message {
transaction_id: de.transaction_id,
version: de.version,
ip: de.ip.map(|c| c.addr),
kind: MessageKind::AnnouncePeer(de.arguments.unwrap())
})
}
other => anyhow::bail!("unsupported method {:?}", ByteBuf(other)),
},
_ => anyhow::bail!(
@ -615,6 +685,22 @@ mod tests {
test_deserialize_then_serialize_hex(WHAT_IS_THAT, "what_is_that")
}
#[test]
fn test_announce() {
let ann = b"d1:ad2:id20:abcdefghij012345678912:implied_porti1e9:info_hash20:mnopqrstuvwxyz1234564:porti6881e5:token8:aoeusnthe1:q13:announce_peer1:t2:aa1:y1:qe";
let msg = bprotocol::deserialize_message::<ByteBuf>(ann).unwrap();
match &msg.kind {
bprotocol::MessageKind::AnnouncePeer(ann) => {
dbg!(&ann);
}
_ => panic!("wrong kind"),
}
let mut buf = Vec::new();
bprotocol::serialize_message(&mut buf, msg.transaction_id, msg.version, msg.ip, msg.kind)
.unwrap();
assert_eq!(ann[..], buf[..]);
}
#[test]
fn deserialize_bencode_packets_captured_from_wireshark() {
debug_hex_bencode("req: find_node", FIND_NODE_REQUEST);

File diff suppressed because it is too large Load diff

View file

@ -1,12 +1,38 @@
mod bprotocol;
mod dht;
mod peer_store;
mod persistence;
mod routing_table;
mod utils;
use std::sync::Arc;
use std::time::Duration;
pub use crate::dht::DhtStats;
pub use crate::dht::{Dht, DhtConfig};
pub use crate::dht::{DhtConfig, DhtState, RequestPeersStream};
pub use librqbit_core::id20::Id20;
pub use persistence::{PersistentDht, PersistentDhtConfig};
pub type Dht = Arc<DhtState>;
// How long do we wait for a response from a DHT node.
pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(60);
// TODO: Not sure if we should re-query tbh.
pub(crate) const REQUERY_INTERVAL: Duration = Duration::from_secs(60);
// After how long we consider a routing table node questionable.
pub(crate) const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60);
pub struct DhtBuilder {}
impl DhtBuilder {
#[allow(clippy::new_ret_no_self)]
pub async fn new() -> anyhow::Result<Dht> {
DhtState::new().await
}
pub async fn with_config(config: DhtConfig) -> anyhow::Result<Dht> {
DhtState::with_config(config).await
}
}
pub static DHT_BOOTSTRAP: &[&str] = &["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"];

View file

@ -0,0 +1,215 @@
use std::{
collections::VecDeque,
net::{SocketAddr, SocketAddrV4},
str::FromStr,
sync::atomic::AtomicU32,
};
use bencode::ByteString;
use chrono::{DateTime, Utc};
use librqbit_core::id20::Id20;
use parking_lot::RwLock;
use rand::RngCore;
use serde::{
ser::{SerializeMap, SerializeStruct},
Deserialize, Serialize,
};
use tracing::trace;
use crate::bprotocol::{AnnouncePeer, CompactPeerInfo};
#[derive(Serialize, Deserialize)]
struct StoredToken {
token: [u8; 4],
#[serde(serialize_with = "crate::utils::serialize_id20")]
node_id: Id20,
addr: SocketAddr,
}
#[derive(Serialize, Deserialize)]
struct StoredPeer {
addr: SocketAddrV4,
time: DateTime<Utc>,
}
pub struct PeerStore {
self_id: Id20,
max_remembered_tokens: u32,
max_remembered_peers: u32,
max_distance: Id20,
tokens: RwLock<VecDeque<StoredToken>>,
peers: dashmap::DashMap<Id20, Vec<StoredPeer>>,
peers_len: AtomicU32,
}
impl Serialize for PeerStore {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
struct SerializePeers<'a> {
peers: &'a dashmap::DashMap<Id20, Vec<StoredPeer>>,
}
impl<'a> Serialize for SerializePeers<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut m = serializer.serialize_map(None)?;
for entry in self.peers.iter() {
m.serialize_entry(&entry.key().as_string(), &entry.value())?;
}
m.end()
}
}
let mut s = serializer.serialize_struct("PeerStore", 7)?;
s.serialize_field("self_id", &self.self_id.as_string())?;
s.serialize_field("max_remembered_tokens", &self.max_remembered_tokens)?;
s.serialize_field("max_remembered_peers", &self.max_remembered_peers)?;
s.serialize_field("max_distance", &self.max_distance.as_string())?;
s.serialize_field("tokens", &*self.tokens.read())?;
s.serialize_field("peers", &SerializePeers { peers: &self.peers })?;
s.serialize_field(
"peers_len",
&self.peers_len.load(std::sync::atomic::Ordering::SeqCst),
)?;
s.end()
}
}
impl<'de> Deserialize<'de> for PeerStore {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct Tmp {
self_id: Id20,
max_remembered_tokens: u32,
max_remembered_peers: u32,
max_distance: Id20,
tokens: VecDeque<StoredToken>,
peers: dashmap::DashMap<Id20, Vec<StoredPeer>>,
}
Tmp::deserialize(deserializer).map(|tmp| Self {
self_id: tmp.self_id,
max_remembered_tokens: tmp.max_remembered_tokens,
max_remembered_peers: tmp.max_remembered_peers,
max_distance: tmp.max_distance,
tokens: RwLock::new(tmp.tokens),
peers_len: AtomicU32::new(tmp.peers.iter().map(|e| e.value().len() as u32).sum()),
peers: tmp.peers,
})
}
}
impl PeerStore {
pub fn new(self_id: Id20) -> Self {
Self {
self_id,
max_remembered_tokens: 1000,
max_remembered_peers: 1000,
max_distance: Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap(),
tokens: RwLock::new(VecDeque::new()),
peers: dashmap::DashMap::new(),
peers_len: AtomicU32::new(0),
}
}
pub fn gen_token_for(&self, node_id: Id20, addr: SocketAddr) -> [u8; 4] {
let mut token = [0u8; 4];
rand::thread_rng().fill_bytes(&mut token);
let mut tokens = self.tokens.write();
tokens.push_back(StoredToken {
token,
addr,
node_id,
});
if tokens.len() > self.max_remembered_tokens as usize {
tokens.pop_front();
}
token
}
pub fn store_peer(&self, announce: &AnnouncePeer<ByteString>, addr: SocketAddr) -> bool {
// If the info_hash in announce is too far away from us, don't store it.
// If the token doesn't match, don't store it.
// If we are out of capacity, don't store it.
// Otherwise, store it.
let mut addr = match addr {
SocketAddr::V4(addr) => addr,
SocketAddr::V6(_) => {
trace!("peer store: IPv6 not supported");
return false;
}
};
if announce.info_hash.distance(&self.self_id) > self.max_distance {
trace!("peer store: info_hash too far to store");
return false;
}
if !self.tokens.read().iter().any(|t| {
t.token[..] == announce.token[..]
&& t.addr == std::net::SocketAddr::V4(addr)
&& t.node_id == announce.id
}) {
trace!("peer store: can't find this token / addr combination");
return false;
}
if announce.implied_port == 0 {
addr.set_port(announce.port);
}
use dashmap::mapref::entry::Entry;
let peers_entry = self.peers.entry(announce.info_hash);
let peers_len = self.peers_len.load(std::sync::atomic::Ordering::SeqCst);
match peers_entry {
Entry::Occupied(mut occ) => {
if let Some(s) = occ.get_mut().iter_mut().find(|s| s.addr == addr) {
s.time = Utc::now();
return true;
}
if peers_len >= self.max_remembered_peers {
trace!("peer store: out of capacity");
return false;
}
occ.get_mut().push(StoredPeer {
addr,
time: Utc::now(),
});
}
Entry::Vacant(vac) => {
if peers_len >= self.max_remembered_peers {
trace!("peer store: out of capacity");
return false;
}
vac.insert(vec![StoredPeer {
addr,
time: Utc::now(),
}]);
}
}
self.peers_len
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
true
}
pub fn get_for_info_hash(&self, info_hash: Id20) -> Vec<CompactPeerInfo> {
if let Some(stored_peers) = self.peers.get(&info_hash) {
return stored_peers
.iter()
.map(|p| CompactPeerInfo { addr: p.addr })
.collect();
}
Vec::new()
}
pub fn garbage_collect_peers(&self) {
todo!()
}
}

View file

@ -11,8 +11,9 @@ use std::time::Duration;
use anyhow::Context;
use tracing::{debug, error, error_span, info, trace, warn};
use crate::dht::{Dht, DhtConfig};
use crate::peer_store::PeerStore;
use crate::routing_table::RoutingTable;
use crate::{Dht, DhtConfig, DhtState};
#[derive(Default, Clone)]
pub struct PersistentDhtConfig {
@ -21,9 +22,10 @@ pub struct PersistentDhtConfig {
}
#[derive(Serialize, Deserialize)]
struct DhtSerialize<Table> {
struct DhtSerialize<Table, PeerStore> {
addr: SocketAddr,
table: Table,
peer_store: Option<PeerStore>,
}
pub struct PersistentDht {
@ -40,11 +42,18 @@ fn dump_dht(dht: &Dht, filename: &Path, tempfile_name: &Path) -> anyhow::Result<
let mut file = BufWriter::new(file);
let addr = dht.listen_addr();
match dht
.with_routing_table(|r| serde_json::to_writer(&mut file, &DhtSerialize { addr, table: r }))
{
match dht.with_routing_table(|r| {
serde_json::to_writer(
&mut file,
&DhtSerialize {
addr,
table: r,
peer_store: Some(&dht.peer_store),
},
)
}) {
Ok(_) => {
debug!("dumped DHT to {:?}", &tempfile_name);
trace!("dumped DHT to {:?}", &tempfile_name);
}
Err(e) => {
return Err(e).with_context(|| {
@ -79,7 +88,7 @@ impl PersistentDht {
let de = match OpenOptions::new().read(true).open(&config_filename) {
Ok(dht_json) => {
let reader = BufReader::new(dht_json);
match serde_json::from_reader::<_, DhtSerialize<RoutingTable>>(reader) {
match serde_json::from_reader::<_, DhtSerialize<RoutingTable, PeerStore>>(reader) {
Ok(r) => {
info!("loaded DHT routing table from {:?}", &config_filename);
Some(r)
@ -98,17 +107,18 @@ impl PersistentDht {
_ => return Err(e).with_context(|| format!("error reading {config_filename:?}")),
},
};
let (listen_addr, routing_table) = de
.map(|de| (Some(de.addr), Some(de.table)))
.unwrap_or((None, None));
let (listen_addr, routing_table, peer_store) = de
.map(|de| (Some(de.addr), Some(de.table), de.peer_store))
.unwrap_or((None, None, None));
let peer_id = routing_table.as_ref().map(|r| r.id());
let dht_config = DhtConfig {
peer_id,
routing_table,
listen_addr,
peer_store,
..Default::default()
};
let dht = Dht::with_config(dht_config).await?;
let dht = DhtState::with_config(dht_config).await?;
spawn(error_span!("dht_persistence"), {
let dht = dht.clone();

View file

@ -1,16 +1,61 @@
use std::{
net::SocketAddr,
time::{Duration, Instant},
};
use std::{net::SocketAddr, time::Instant};
use librqbit_core::id20::Id20;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tracing::debug;
use rand::RngCore;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use tracing::{debug, trace};
use crate::INACTIVITY_TIMEOUT;
#[derive(Clone, Debug)]
pub struct LeafBucket {
pub nodes: Vec<RoutingTableNode>,
pub last_refreshed: Instant,
}
impl Serialize for LeafBucket {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut s = serializer.serialize_struct("LeafBucket", 2)?;
s.serialize_field("nodes", &self.nodes)?;
s.serialize_field(
"last_refreshed",
&format!("{:?}", self.last_refreshed.elapsed()),
)?;
s.end()
}
}
impl<'de> Deserialize<'de> for LeafBucket {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct Tmp {
nodes: Vec<RoutingTableNode>,
}
Tmp::deserialize(deserializer).map(|t| Self {
nodes: t.nodes,
last_refreshed: Instant::now(),
})
}
}
impl Default for LeafBucket {
fn default() -> Self {
Self {
nodes: Default::default(),
last_refreshed: Instant::now(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum BucketTreeNodeData {
// TODO: maybe replace that with SmallVec<8>?
Leaf(Vec<RoutingTableNode>),
Leaf(LeafBucket),
LeftRight(usize, usize),
}
@ -24,167 +69,77 @@ struct BucketTreeNode {
data: BucketTreeNodeData,
}
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BucketTree {
data: Vec<BucketTreeNode>,
size: usize,
max_size: usize,
}
impl<'de> Deserialize<'de> for BucketTree {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct Visitor;
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = BucketTree;
pub struct BucketTreeIteratorItem<'a> {
pub bits: u8,
pub start: &'a Id20,
pub end_inclusive: &'a Id20,
pub leaf: &'a LeafBucket,
}
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a map with key \"flat\"")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let mut data: Option<Vec<BucketTreeNode>> = None;
loop {
match map.next_key::<String>()?.as_deref() {
Some("flat") => {
let buckets = map.next_value::<Vec<BucketTreeNode>>()?;
data = Some(buckets)
}
Some(_) => {
map.next_value::<serde::de::IgnoredAny>()?;
}
None => {
use serde::de::Error;
match data.take() {
Some(data) => return Ok(BucketTree { data }),
None => return Err(A::Error::missing_field("flat")),
}
}
}
}
}
}
deserializer.deserialize_map(Visitor)
impl<'a> BucketTreeIteratorItem<'a> {
pub fn random_within(&self) -> Id20 {
generate_random_id(self.start, self.bits)
}
}
impl Serialize for BucketTree {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
struct Node<'a> {
tree: &'a BucketTree,
idx: usize,
}
impl<'a> Serialize for Node<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(None)?;
let node = &self.tree.data[self.idx];
map.serialize_entry("bits", &node.bits)?;
map.serialize_entry("start", &node.start.as_string())?;
map.serialize_entry("end", &node.end_inclusive.as_string())?;
match &node.data {
BucketTreeNodeData::Leaf(nodes) => {
map.serialize_entry("nodes", &nodes)?;
}
BucketTreeNodeData::LeftRight(l, r) => {
map.serialize_entry(
"left",
&(Node {
idx: *l,
tree: self.tree,
}),
)?;
map.serialize_entry(
"right",
&(Node {
idx: *r,
tree: self.tree,
}),
)?;
}
}
map.end()
}
}
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("nodes_len", &self.data.len())?;
map.serialize_entry("nodes_capacity", &self.data.capacity())?;
map.serialize_entry("node_memory_bytes", &std::mem::size_of::<BucketTreeNode>())?;
map.serialize_entry(
"nodes_memory_bytes",
&(std::mem::size_of::<BucketTreeNode>() * self.data.capacity()),
)?;
map.serialize_entry("tree", &Node { tree: self, idx: 0 })?;
map.serialize_entry("flat", &self.data)?;
map.end()
}
}
pub struct BucketTreeIterator<'a> {
struct BucketTreeIterator<'a> {
tree: &'a BucketTree,
current: std::slice::Iter<'a, RoutingTableNode>,
queue: Vec<usize>,
}
impl<'a> BucketTreeIterator<'a> {
fn new(tree: &'a BucketTree) -> Self {
let mut queue = Vec::new();
let mut current = 0;
let current_slice = loop {
match &tree.data[current].data {
BucketTreeNodeData::Leaf(nodes) => break nodes.iter(),
BucketTreeNodeData::LeftRight(left, right) => {
queue.push(*right);
current = *left;
}
}
};
BucketTreeIterator {
tree,
current: current_slice,
queue,
}
let queue = vec![0];
BucketTreeIterator { tree, queue }
}
}
impl<'a> Iterator for BucketTreeIterator<'a> {
type Item = &'a RoutingTableNode;
type Item = BucketTreeIteratorItem<'a>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(v) = self.current.next() {
return Some(v);
};
loop {
let idx = self.queue.pop()?;
match &self.tree.data[idx].data {
BucketTreeNodeData::Leaf(nodes) => {
self.current = nodes.iter();
match self.current.next() {
Some(v) => return Some(v),
None => continue,
match self.tree.data.get(idx) {
Some(node) => match &node.data {
BucketTreeNodeData::Leaf(leaf) => {
return Some(BucketTreeIteratorItem {
bits: node.bits,
start: &node.start,
end_inclusive: &node.end_inclusive,
leaf,
});
}
}
BucketTreeNodeData::LeftRight(left, right) => {
self.queue.push(*right);
self.queue.push(*left);
continue;
}
BucketTreeNodeData::LeftRight(left, right) => {
self.queue.push(*right);
self.queue.push(*left);
continue;
}
},
None => continue,
}
}
}
}
pub fn generate_random_id(start: &Id20, bits: u8) -> Id20 {
let mut data = [0u8; 20];
rand::thread_rng().fill_bytes(&mut data);
let mut data = Id20(data);
let remaining_bits = 160 - bits;
for bit in 0..remaining_bits {
data.set_bit(bit, start.get_bit(bit));
}
data
}
fn compute_split_start_end(
start: Id20,
end_inclusive: Id20,
@ -240,20 +195,27 @@ pub enum InsertResult {
}
impl BucketTree {
pub fn new() -> Self {
pub fn new(max_size: usize) -> Self {
BucketTree {
data: vec![BucketTreeNode {
bits: 160,
start: Id20([0u8; 20]),
end_inclusive: Id20([0xff; 20]),
data: BucketTreeNodeData::Leaf(Vec::new()),
data: BucketTreeNodeData::Leaf(Default::default()),
}],
size: 0,
max_size,
}
}
pub fn iter(&self) -> BucketTreeIterator<'_> {
fn iter_leaves(&self) -> BucketTreeIterator<'_> {
BucketTreeIterator::new(self)
}
fn iter(&self) -> impl Iterator<Item = &'_ RoutingTableNode> + '_ {
self.iter_leaves().flat_map(|l| l.leaf.nodes.iter())
}
fn get_leaf(&self, id: &Id20) -> usize {
let mut idx = 0;
loop {
@ -272,10 +234,16 @@ impl BucketTree {
}
}
pub fn get_mut(&mut self, id: &Id20) -> Option<&mut RoutingTableNode> {
pub fn get_mut(&mut self, id: &Id20, refresh: bool) -> Option<&mut RoutingTableNode> {
let idx = self.get_leaf(id);
match &mut self.data[idx].data {
BucketTreeNodeData::Leaf(nodes) => nodes.iter_mut().find(|b| b.id == *id),
BucketTreeNodeData::Leaf(leaf) => {
let r = leaf.nodes.iter_mut().find(|b| b.id == *id);
if r.is_some() && refresh {
leaf.last_refreshed = Instant::now()
}
r
}
BucketTreeNodeData::LeftRight(_, _) => unreachable!(),
}
}
@ -304,7 +272,7 @@ impl BucketTree {
BucketTreeNodeData::LeftRight(_, _) => unreachable!(),
};
// if already found, quit
if nodes.iter().any(|r| r.id == id) {
if nodes.nodes.iter().any(|r| r.id == id) {
return InsertResult::WasExisting;
}
@ -313,26 +281,40 @@ impl BucketTree {
addr,
last_request: None,
last_response: None,
outstanding_queries_in_a_row: 0,
last_query: None,
errors_in_a_row: 0,
};
if nodes.len() < 8 {
nodes.push(new_node);
nodes.sort_by_key(|n| n.id);
return InsertResult::Added;
}
// Try replace a bad node
if let Some(bad_node) = nodes
.nodes
.iter_mut()
.find(|r| matches!(r.status(), NodeStatus::Bad))
{
std::mem::swap(bad_node, &mut new_node);
nodes.sort_by_key(|n| n.id);
nodes.nodes.sort_by_key(|n| n.id);
debug!("replaced bad node {:?}", new_node);
nodes.last_refreshed = Instant::now();
return InsertResult::ReplacedBad(new_node);
}
// if max size reached, don't bother
if self.size == self.max_size {
trace!(
"can't add node to routing table, max size of {} reached",
self.max_size
);
return InsertResult::Ignored;
}
if nodes.nodes.len() < 8 {
nodes.nodes.push(new_node);
nodes.nodes.sort_by_key(|n| n.id);
nodes.last_refreshed = Instant::now();
self.size += 1;
return InsertResult::Added;
}
// if our id is not inside, don't bother.
if *self_id < leaf.start || *self_id > leaf.end_inclusive {
return InsertResult::Ignored;
@ -342,7 +324,7 @@ impl BucketTree {
let ((ls, le), (rs, re)) =
compute_split_start_end(leaf.start, leaf.end_inclusive, leaf.bits);
let (mut ld, mut rd) = (Vec::new(), Vec::new());
for node in nodes.drain(0..) {
for node in nodes.nodes.drain(0..) {
if node.id < rs {
ld.push(node);
} else {
@ -354,13 +336,19 @@ impl BucketTree {
bits: leaf.bits - 1,
start: ls,
end_inclusive: le,
data: BucketTreeNodeData::Leaf(ld),
data: BucketTreeNodeData::Leaf(LeafBucket {
nodes: ld,
..Default::default()
}),
};
let right = BucketTreeNode {
bits: leaf.bits - 1,
start: rs,
end_inclusive: re,
data: BucketTreeNodeData::Leaf(rd),
data: BucketTreeNodeData::Leaf(LeafBucket {
nodes: rd,
..Default::default()
}),
};
let left_idx = {
@ -384,13 +372,7 @@ impl BucketTree {
}
}
impl Default for BucketTree {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct RoutingTableNode {
#[serde(serialize_with = "crate::utils::serialize_id20")]
id: Id20,
@ -400,9 +382,35 @@ pub struct RoutingTableNode {
#[serde(skip)]
last_response: Option<Instant>,
#[serde(skip)]
outstanding_queries_in_a_row: usize,
last_query: Option<Instant>,
#[serde(skip)]
errors_in_a_row: usize,
}
impl Serialize for RoutingTableNode {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_struct("RoutingTableNode", 3)?;
s.serialize_field("id", &self.id.as_string())?;
s.serialize_field("addr", &self.addr)?;
s.serialize_field("status", &self.status())?;
if let Some(l) = self.last_request {
s.serialize_field("last_request_ago", &l.elapsed())?;
}
if let Some(l) = self.last_response {
s.serialize_field("last_response_ago", &l.elapsed())?;
}
if let Some(l) = self.last_query {
s.serialize_field("last_query_ago", &l.elapsed())?;
}
s.serialize_field("errors_in_a_row", &self.errors_in_a_row)?;
s.end()
}
}
#[derive(Serialize, Debug)]
pub enum NodeStatus {
Good,
Questionable,
@ -418,24 +426,39 @@ impl RoutingTableNode {
self.addr
}
pub fn status(&self) -> NodeStatus {
// TODO: this is just a stub with simpler logic
let last_request = match self.last_request {
Some(v) => v,
None => return NodeStatus::Unknown,
};
if self.outstanding_queries_in_a_row > 0 && last_request.elapsed() > Duration::from_secs(10)
{
return NodeStatus::Bad;
match (self.last_request, self.last_response, self.last_query) {
// Nodes become bad when they fail to respond to multiple queries in a row.
(Some(_), _, _) if self.errors_in_a_row >= 2 => NodeStatus::Bad,
// A good node is a node has responded to one of our queries within the last 15 minutes.
// A node is also good if it has ever responded to one of our queries and has sent
// us a query within the last 15 minutes.
(Some(_), Some(last_incoming), _) | (Some(_), Some(_), Some(last_incoming))
if last_incoming.elapsed() < INACTIVITY_TIMEOUT =>
{
NodeStatus::Good
}
// After 15 minutes of inactivity, a node becomes questionable.
// The moment we send a request to it, it stops becoming questionable and becomes Unknown / Bad.
(last_outgoing, _, Some(last_incoming)) | (last_outgoing, Some(last_incoming), _)
if last_incoming.elapsed() > INACTIVITY_TIMEOUT
&& last_outgoing
.map(|e| e.elapsed() > INACTIVITY_TIMEOUT)
.unwrap_or(true) =>
{
NodeStatus::Questionable
}
_ => NodeStatus::Unknown,
}
if self.last_response.is_some() {
return NodeStatus::Good;
}
NodeStatus::Questionable
}
pub fn mark_outgoing_request(&mut self) {
self.last_request = Some(Instant::now());
self.outstanding_queries_in_a_row += 1;
}
pub fn mark_last_query(&mut self) {
self.last_query = Some(Instant::now());
}
pub fn mark_response(&mut self) {
@ -444,7 +467,11 @@ impl RoutingTableNode {
if self.last_request.is_none() {
self.last_request = Some(now);
}
self.outstanding_queries_in_a_row = 0;
self.errors_in_a_row = 0;
}
pub fn mark_error(&mut self) {
self.errors_in_a_row += 1;
}
}
@ -457,10 +484,12 @@ pub struct RoutingTable {
}
impl RoutingTable {
pub fn new(id: Id20) -> Self {
const DEFAULT_MAX_SIZE: usize = 512;
pub fn new(id: Id20, max_size: Option<usize>) -> Self {
Self {
id,
buckets: BucketTree::new(),
buckets: BucketTree::new(max_size.unwrap_or(Self::DEFAULT_MAX_SIZE)),
size: 0,
}
}
@ -475,10 +504,27 @@ impl RoutingTable {
for node in self.buckets.iter() {
result.push(node);
}
result.sort_by_key(|n| id.distance(&n.id));
result.sort_by_key(|n| {
// Query decent nodes first.
let status = match n.status() {
NodeStatus::Good => 0,
NodeStatus::Questionable => 0,
NodeStatus::Unknown => 2,
NodeStatus::Bad => 3,
};
(status, id.distance(&n.id))
});
result
}
pub fn iter_buckets(&self) -> impl Iterator<Item = BucketTreeIteratorItem<'_>> + '_ {
self.buckets.iter_leaves()
}
pub fn iter(&self) -> impl Iterator<Item = &'_ RoutingTableNode> + '_ {
self.buckets.iter()
}
pub fn add_node(&mut self, id: Id20, addr: SocketAddr) -> InsertResult {
let res = self.buckets.add_node(&self.id, id, addr);
let replaced = match &res {
@ -493,7 +539,7 @@ impl RoutingTable {
res
}
pub fn mark_outgoing_request(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id) {
let r = match self.buckets.get_mut(id, false) {
Some(r) => r,
None => return false,
};
@ -502,13 +548,31 @@ impl RoutingTable {
}
pub fn mark_response(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id) {
let r = match self.buckets.get_mut(id, true) {
Some(r) => r,
None => return false,
};
r.mark_response();
true
}
pub fn mark_error(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id, false) {
Some(r) => r,
None => return false,
};
r.mark_error();
true
}
pub fn mark_last_query(&mut self, id: &Id20) -> bool {
let r = match self.buckets.get_mut(id, false) {
Some(r) => r,
None => return false,
};
r.mark_last_query();
true
}
}
#[cfg(test)]
@ -524,7 +588,7 @@ mod tests {
use crate::routing_table::compute_split_start_end;
use super::RoutingTable;
use super::{generate_random_id, RoutingTable};
#[test]
fn compute_split_start_end_root() {
@ -599,7 +663,7 @@ mod tests {
fn generate_table(length: Option<usize>) -> RoutingTable {
let my_id = random_id_20();
let mut rtable = RoutingTable::new(my_id);
let mut rtable = RoutingTable::new(my_id, None);
for _ in 0..length.unwrap_or(16536) {
let other_id = random_id_20();
let addr = generate_socket_addr();
@ -632,4 +696,15 @@ mod tests {
let v = serde_json::to_vec(&table).unwrap();
let _: RoutingTable = serde_json::from_reader(Cursor::new(v)).unwrap();
}
#[test]
fn test_generate_random_id() {
let start = Id20::from_str("3000000000000000000000000000000000000000").unwrap();
let end = Id20::from_str("3fffffffffffffffffffffffffffffffffffffff").unwrap();
let bits = 156;
for _ in 0..100 {
let id = dbg!(generate_random_id(&start, bits));
assert!(id >= start && id <= end, "{:?}", id);
}
}
}

View file

@ -61,6 +61,7 @@ url = "2"
hex = "0.4"
backoff = "0.4.0"
dashmap = "5.5.3"
base64 = "0.21.5"
[dev-dependencies]
futures = {version = "0.3"}

View file

@ -26,15 +26,12 @@ pub enum ReadMetainfoResult<Rx> {
pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unpin>(
peer_id: Id20,
info_hash: Id20,
mut addrs: A,
initial_addrs: Vec<SocketAddr>,
addrs_stream: A,
peer_connection_options: Option<PeerConnectionOptions>,
) -> ReadMetainfoResult<A> {
let mut seen = HashSet::<SocketAddr>::new();
let first_addr = match addrs.next().await {
Some(addr) => addr,
None => return ReadMetainfoResult::ChannelClosed { seen },
};
seen.insert(first_addr);
let mut addrs = addrs_stream;
let semaphore = tokio::sync::Semaphore::new(128);
@ -57,7 +54,11 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
};
let mut unordered = FuturesUnordered::new();
unordered.push(read_info_guarded(first_addr));
for a in initial_addrs {
seen.insert(a);
unordered.push(read_info_guarded(a));
}
loop {
tokio::select! {
@ -86,7 +87,7 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
#[cfg(test)]
mod tests {
use dht::{Dht, Id20};
use dht::{DhtBuilder, Id20};
use librqbit_core::peer_id::generate_peer_id;
use super::*;
@ -106,10 +107,11 @@ mod tests {
init_logging();
let info_hash = Id20::from_str("cf3ea75e2ebbd30e0da6e6e215e2226bf35f2e33").unwrap();
let dht = Dht::new().await.unwrap();
let dht = DhtBuilder::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash).unwrap();
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx, None).await {
match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await
{
ReadMetainfoResult::Found { info, .. } => dbg!(info),
ReadMetainfoResult::ChannelClosed { .. } => todo!("should not have happened"),
};

View file

@ -241,9 +241,13 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
let to_read_in_file =
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize;
let mut file_g = self.files[file_idx].lock();
debug!(
trace!(
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk
piece_index,
who_sent,
file_idx,
absolute_offset,
&last_received_chunk
);
file_g
.seek(SeekFrom::Start(absolute_offset))
@ -269,7 +273,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
match self.torrent.compare_hash(piece_index.get(), h.finish()) {
Some(true) => {
debug!("piece={} hash matches", piece_index);
trace!("piece={} hash matches", piece_index);
Ok(true)
}
Some(false) => {
@ -305,9 +309,13 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize;
let mut file_g = self.files[file_idx].lock();
debug!(
trace!(
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info
chunk_info.piece_index,
who_sent,
file_idx,
absolute_offset,
&chunk_info
);
file_g
.seek(SeekFrom::Start(absolute_offset))
@ -354,7 +362,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
let to_write = std::cmp::min(buf.len(), remaining_len as usize);
let mut file_g = self.files[file_idx].lock();
debug!(
trace!(
"piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
chunk_info.piece_index,
chunk_info,

View file

@ -11,15 +11,19 @@ use librqbit_core::id20::Id20;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn};
use axum::Router;
use crate::http_api_error::{ApiError, ApiErrorExt};
use crate::peer_connection::PeerConnectionOptions;
use crate::session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId,
SUPPORTED_SCHEMES,
};
use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot};
use crate::torrent_state::stats::{LiveStats, TorrentStats};
@ -85,10 +89,29 @@ impl HttpApi {
Query(params): Query<TorrentAddQueryParams>,
data: Bytes,
) -> Result<impl IntoResponse> {
let is_url = params.is_url;
let opts = params.into_add_torrent_options();
let add = match String::from_utf8(data.to_vec()) {
Ok(s) => AddTorrent::Url(s.into()),
Err(e) => AddTorrent::TorrentFileBytes(e.into_bytes().into()),
let data = data.to_vec();
let add = match is_url {
Some(true) => AddTorrent::Url(
String::from_utf8(data)
.context("invalid utf-8 for passed URL")?
.into(),
),
Some(false) => AddTorrent::TorrentFileBytes(data.into()),
// Guess the format.
None if SUPPORTED_SCHEMES
.iter()
.any(|s| data.starts_with(s.as_bytes())) =>
{
AddTorrent::Url(
String::from_utf8(data)
.context("invalid utf-8 for passed URL")?
.into(),
)
}
_ => AddTorrent::TorrentFileBytes(data.into()),
};
state.api_add_torrent(add, Some(opts)).await.map(axum::Json)
}
@ -279,6 +302,7 @@ pub struct TorrentDetailsResponse {
pub struct ApiAddTorrentResponse {
pub id: Option<usize>,
pub details: TorrentDetailsResponse,
pub seen_peers: Option<Vec<SocketAddr>>,
}
pub struct OnlyFiles(Vec<usize>);
@ -322,13 +346,48 @@ impl<'de> Deserialize<'de> for OnlyFiles {
}
}
#[derive(Serialize, Deserialize)]
pub struct InitialPeers(pub Vec<SocketAddr>);
impl<'de> Deserialize<'de> for InitialPeers {
fn deserialize<D>(deserializer: D) -> std::prelude::v1::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let string = String::deserialize(deserializer)?;
let mut addrs = Vec::new();
for addr_str in string.split(',').filter(|s| !s.is_empty()) {
addrs.push(SocketAddr::from_str(addr_str).map_err(D::Error::custom)?);
}
Ok(InitialPeers(addrs))
}
}
impl Serialize for InitialPeers {
fn serialize<S>(&self, serializer: S) -> std::prelude::v1::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0
.iter()
.map(|s| s.to_string())
.join(",")
.serialize(serializer)
}
}
#[derive(Serialize, Deserialize, Default)]
pub struct TorrentAddQueryParams {
pub overwrite: Option<bool>,
pub output_folder: Option<String>,
pub sub_folder: Option<String>,
pub only_files_regex: Option<String>,
pub only_files: Option<OnlyFiles>,
pub peer_connect_timeout: Option<u64>,
pub peer_read_write_timeout: Option<u64>,
pub initial_peers: Option<InitialPeers>,
// Will force interpreting the content as a URL.
pub is_url: Option<bool>,
pub list_only: Option<bool>,
}
@ -341,6 +400,12 @@ impl TorrentAddQueryParams {
output_folder: self.output_folder,
sub_folder: self.sub_folder,
list_only: self.list_only.unwrap_or(false),
initial_peers: self.initial_peers.map(|i| i.0),
peer_opts: Some(PeerConnectionOptions {
connect_timeout: self.peer_connect_timeout.map(Duration::from_secs),
read_write_timeout: self.peer_read_write_timeout.map(Duration::from_secs),
..Default::default()
}),
..Default::default()
}
}
@ -462,8 +527,10 @@ impl ApiInternal {
info_hash,
info,
only_files,
seen_peers,
}) => ApiAddTorrentResponse {
id: None,
seen_peers: Some(seen_peers),
details: make_torrent_details(&info_hash, &info, only_files.as_deref())
.context("error making torrent details")?,
},
@ -477,6 +544,7 @@ impl ApiInternal {
ApiAddTorrentResponse {
id: Some(id),
details,
seen_peers: None,
}
}
};

View file

@ -91,6 +91,7 @@ impl HttpApiClient {
output_folder: opts.output_folder,
sub_folder: opts.sub_folder,
list_only: Some(opts.list_only),
..Default::default()
};
let qs = serde_urlencoded::to_string(&params).unwrap();
let url = format!("{}torrents?{}", &self.base_url, qs);

View file

@ -118,7 +118,7 @@ impl std::fmt::Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind {
ApiErrorKind::TorrentNotFound(idx) => write!(f, "torrent {idx} not found"),
ApiErrorKind::Other(err) => write!(f, "{err:#}"),
ApiErrorKind::Other(err) => write!(f, "{err:?}"),
ApiErrorKind::DhtDisabled => write!(f, "DHT is disabled"),
}
}

View file

@ -13,7 +13,7 @@ use peer_binary_protocol::{
MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
};
use tokio::time::timeout;
use tracing::{debug, trace};
use tracing::trace;
use crate::spawn_utils::BlockingSpawner;
@ -38,7 +38,7 @@ pub enum WriterRequest {
Disconnect,
}
#[derive(Default, Copy, Clone)]
#[derive(Default, Debug, Copy, Clone)]
pub struct PeerConnectionOptions {
pub connect_timeout: Option<Duration>,
pub read_write_timeout: Option<Duration>,
@ -155,7 +155,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
debug!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
trace!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
if h.info_hash != self.info_hash.0 {
anyhow::bail!("info hash does not match");
}
@ -210,7 +210,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
.await
.context("error writing bitfield to peer")?;
debug!("sent bitfield");
trace!("sent bitfield");
}
loop {
@ -249,7 +249,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
}
};
debug!("sending: {:?}, length={}", &req, len);
trace!("sending: {:?}, length={}", &req, len);
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
.await
@ -290,7 +290,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
r = reader => {r}
r = writer => {r}
};
debug!("either reader or writer are done, exiting");
trace!("either reader or writer are done, exiting");
r
}
}

View file

@ -15,7 +15,7 @@ use peer_binary_protocol::{
};
use sha1w::{ISha1, Sha1};
use tokio::sync::mpsc::UnboundedSender;
use tracing::debug;
use tracing::trace;
use crate::{
peer_connection::{
@ -153,7 +153,7 @@ impl PeerConnectionHandler for Handler {
}
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
debug!("{}: received message: {:?}", self.addr, msg);
trace!("{}: received message: {:?}", self.addr, msg);
if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data {
piece,

View file

@ -10,8 +10,9 @@ use std::{
};
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::ByteString;
use dht::{Dht, Id20, PersistentDht, PersistentDhtConfig};
use dht::{Dht, DhtBuilder, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream};
use librqbit_core::{
magnet::Magnet,
peer_id::generate_peer_id,
@ -19,8 +20,7 @@ use librqbit_core::{
};
use parking_lot::RwLock;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing::{debug, error, error_span, info, warn};
use crate::{
@ -36,12 +36,27 @@ pub type TorrentId = usize;
#[derive(Default)]
pub struct SessionDatabase {
next_id: usize,
torrents: HashMap<usize, ManagedTorrentHandle>,
next_id: TorrentId,
torrents: HashMap<TorrentId, ManagedTorrentHandle>,
}
impl SessionDatabase {
fn add_torrent(&mut self, torrent: ManagedTorrentHandle) -> TorrentId {
fn add_torrent(
&mut self,
torrent: ManagedTorrentHandle,
preferred_id: Option<TorrentId>,
) -> TorrentId {
match preferred_id {
Some(id) if self.torrents.contains_key(&id) => {
warn!("id {id} already present in DB, ignoring \"preferred_id\" parameter");
}
Some(id) => {
self.torrents.insert(id, torrent);
self.next_id = id.max(self.next_id).wrapping_add(1);
return id;
}
_ => {}
}
let idx = self.next_id;
self.torrents.insert(idx, torrent);
self.next_id += 1;
@ -50,20 +65,27 @@ impl SessionDatabase {
fn serialize(&self) -> SerializedSessionDatabase {
SerializedSessionDatabase {
torrents: self
torrents_v2: self
.torrents
.values()
.map(|torrent| SerializedTorrent {
trackers: torrent
.info()
.trackers
.iter()
.map(|u| u.to_string())
.collect(),
info_hash: torrent.info_hash().as_string(),
only_files: torrent.only_files.clone(),
is_paused: torrent.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))),
output_folder: torrent.info().out_dir.clone(),
.iter()
.map(|(id, torrent)| {
(
*id,
SerializedTorrent {
trackers: torrent
.info()
.trackers
.iter()
.map(|u| u.to_string())
.collect(),
info_hash: torrent.info_hash().as_string(),
info: torrent.info().info.clone(),
only_files: torrent.only_files.clone(),
is_paused: torrent
.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))),
output_folder: torrent.info().out_dir.clone(),
},
)
})
.collect(),
}
@ -73,15 +95,46 @@ impl SessionDatabase {
#[derive(Serialize, Deserialize)]
struct SerializedTorrent {
info_hash: String,
#[serde(
serialize_with = "serialize_torrent",
deserialize_with = "deserialize_torrent"
)]
info: TorrentMetaV1Info<ByteString>,
trackers: HashSet<String>,
output_folder: PathBuf,
only_files: Option<Vec<usize>>,
is_paused: bool,
}
fn serialize_torrent<S>(t: &TorrentMetaV1Info<ByteString>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use base64::{engine::general_purpose, Engine as _};
use serde::ser::Error;
let mut writer = Vec::new();
bencode_serialize_to_writer(t, &mut writer).map_err(S::Error::custom)?;
let s = general_purpose::STANDARD_NO_PAD.encode(&writer);
s.serialize(serializer)
}
fn deserialize_torrent<'de, D>(deserializer: D) -> Result<TorrentMetaV1Info<ByteString>, D::Error>
where
D: Deserializer<'de>,
{
use base64::{engine::general_purpose, Engine as _};
use serde::de::Error;
let s = String::deserialize(deserializer)?;
let b = general_purpose::STANDARD_NO_PAD
.decode(s)
.map_err(D::Error::custom)?;
TorrentMetaV1Info::<ByteString>::deserialize(&mut BencodeDeserializer::new_from_buf(&b))
.map_err(D::Error::custom)
}
#[derive(Serialize, Deserialize)]
struct SerializedSessionDatabase {
torrents: Vec<SerializedTorrent>,
torrents_v2: HashMap<usize, SerializedTorrent>,
}
pub struct Session {
@ -97,14 +150,14 @@ pub struct Session {
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
let response = reqwest::get(url)
.await
.with_context(|| format!("error downloading torrent metadata from {url}"))?;
.context("error downloading torrent metadata")?;
if !response.status().is_success() {
anyhow::bail!("GET {} returned {}", url, response.status())
}
let b = response
.bytes()
.await
.with_context(|| format!("error reading repsonse body from {url}"))?;
.with_context(|| format!("error reading response body from {url}"))?;
torrent_from_bytes(&b).context("error decoding torrent")
}
@ -139,12 +192,16 @@ pub struct AddTorrentOptions {
pub sub_folder: Option<String>,
pub peer_opts: Option<PeerConnectionOptions>,
pub force_tracker_interval: Option<Duration>,
pub initial_peers: Option<Vec<SocketAddr>>,
// This is used to restore the session.
pub preferred_id: Option<usize>,
}
pub struct ListOnlyResponse {
pub info_hash: Id20,
pub info: TorrentMetaV1Info<ByteString>,
pub only_files: Option<Vec<usize>>,
pub seen_peers: Vec<SocketAddr>,
}
pub enum AddTorrentResponse {
@ -171,6 +228,7 @@ pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result<Vec<u8>
pub enum AddTorrent<'a> {
Url(Cow<'a, str>),
TorrentFileBytes(Cow<'a, [u8]>),
TorrentInfo(Box<TorrentMetaV1Owned>),
}
impl<'a> AddTorrent<'a> {
@ -201,6 +259,7 @@ impl<'a> AddTorrent<'a> {
match self {
Self::Url(s) => s.into_owned().into_bytes(),
Self::TorrentFileBytes(b) => b.into_owned(),
Self::TorrentInfo(_) => unimplemented!(),
}
}
}
@ -234,7 +293,7 @@ impl Session {
None
} else {
let dht = if opts.disable_dht_persistence {
Dht::new().await
DhtBuilder::new().await
} else {
PersistentDht::create(opts.dht_config).await
}
@ -295,6 +354,22 @@ impl Session {
self.dht.as_ref()
}
fn merge_peer_opts(&self, other: Option<PeerConnectionOptions>) -> PeerConnectionOptions {
let other = match other {
Some(o) => o,
None => self.peer_opts,
};
PeerConnectionOptions {
connect_timeout: other.connect_timeout.or(self.peer_opts.connect_timeout),
read_write_timeout: other
.read_write_timeout
.or(self.peer_opts.read_write_timeout),
keep_alive_interval: other
.keep_alive_interval
.or(self.peer_opts.keep_alive_interval),
}
}
async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> {
let mut rdr = match std::fs::File::open(&self.persistence_filename) {
Ok(f) => BufReader::new(f),
@ -309,18 +384,33 @@ impl Session {
let db: SerializedSessionDatabase =
serde_json::from_reader(&mut rdr).context("error deserializing session database")?;
let mut futures = Vec::new();
for storrent in db.torrents.into_iter() {
let magnet = Magnet {
info_hash: Id20::from_str(&storrent.info_hash)
.context("error deserializing info_hash")?,
trackers: storrent.trackers.into_iter().collect(),
for (id, storrent) in db.torrents_v2.into_iter() {
let trackers: Vec<ByteString> = storrent
.trackers
.into_iter()
.map(|t| ByteString(t.into_bytes()))
.collect();
let info = TorrentMetaV1Owned {
announce: trackers
.get(0)
.cloned()
.unwrap_or_else(|| ByteString(b"http://retracker.local/announce".into())),
announce_list: vec![trackers],
info: storrent.info,
comment: None,
created_by: None,
encoding: None,
publisher: None,
publisher_url: None,
creation_date: None,
info_hash: Id20::from_str(&storrent.info_hash)?,
};
futures.push({
let session = self.clone();
async move {
session
.add_torrent(
AddTorrent::Url(Cow::Owned(magnet.to_string())),
AddTorrent::TorrentInfo(Box::new(info)),
Some(AddTorrentOptions {
paused: storrent.is_paused,
output_folder: Some(
@ -332,6 +422,7 @@ impl Session {
),
only_files: storrent.only_files,
overwrite: true,
preferred_id: Some(id),
..Default::default()
}),
)
@ -376,7 +467,7 @@ impl Session {
pub async fn add_torrent(
&self,
add: impl Into<AddTorrent<'_>>,
add: AddTorrent<'_>,
opts: Option<AddTorrentOptions>,
) -> anyhow::Result<AddTorrentResponse> {
// Magnet links are different in that we first need to discover the metadata.
@ -385,7 +476,7 @@ impl Session {
let opts = opts.unwrap_or_default();
let (info_hash, info, dht_rx, trackers, initial_peers) = match add.into() {
let (info_hash, info, dht_rx, trackers, initial_peers) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let Magnet {
info_hash,
@ -413,8 +504,9 @@ impl Session {
let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
opts.initial_peers.clone().unwrap_or_default(),
dht_rx,
Some(self.peer_opts),
Some(self.merge_peer_opts(opts.peer_opts)),
)
.await
{
@ -424,7 +516,17 @@ impl Session {
}
};
debug!("received result from DHT: {:?}", info);
(info_hash, info, Some(dht_rx), trackers, initial_peers)
(
info_hash,
info,
if opts.paused || opts.list_only {
None
} else {
Some(dht_rx)
},
trackers,
initial_peers,
)
}
other => {
let torrent = match other {
@ -442,14 +544,15 @@ impl Session {
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
}
AddTorrent::TorrentInfo(t) => *t,
};
let dht_rx = match self.dht.as_ref() {
Some(dht) => {
Some(dht) if !opts.paused && !opts.list_only => {
debug!("reading peers for {:?} from DHT", torrent.info_hash);
Some(dht.get_peers(torrent.info_hash)?)
}
None => None,
_ => None,
};
let trackers = torrent
.iter_announce()
@ -496,7 +599,7 @@ impl Session {
&self,
info_hash: Id20,
info: TorrentMetaV1Info<ByteString>,
dht_peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
dht_peer_rx: Option<RequestPeersStream>,
initial_peers: Vec<SocketAddr>,
trackers: Vec<reqwest::Url>,
opts: AddTorrentOptions,
@ -541,6 +644,7 @@ impl Session {
info_hash,
info,
only_files,
seen_peers: initial_peers,
}));
}
@ -565,11 +669,13 @@ impl Session {
builder.force_tracker_interval(interval);
}
if let Some(t) = opts.peer_opts.unwrap_or(self.peer_opts).connect_timeout {
let peer_opts = self.merge_peer_opts(opts.peer_opts);
if let Some(t) = peer_opts.connect_timeout {
builder.peer_connect_timeout(t);
}
if let Some(t) = opts.peer_opts.unwrap_or(self.peer_opts).read_write_timeout {
if let Some(t) = peer_opts.read_write_timeout {
builder.peer_read_write_timeout(t);
}
@ -582,7 +688,7 @@ impl Session {
let next_id = g.torrents.len();
let managed_torrent =
builder.build(error_span!(parent: None, "torrent", id = next_id))?;
let id = g.add_torrent(managed_torrent.clone());
let id = g.add_torrent(managed_torrent.clone(), opts.preferred_id);
(managed_torrent, id)
};

View file

@ -510,7 +510,7 @@ impl TorrentStateLive {
});
match result {
Some(true) => {
debug!("set peer to live")
trace!("set peer to live")
}
Some(false) => debug!("can't set peer live, it was in wrong state"),
None => debug!("can't set peer live, it disappeared"),
@ -750,7 +750,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
Message::Interested => self.on_peer_interested(),
Message::Piece(piece) => self.on_received_piece(piece).context("on_received_piece")?,
Message::KeepAlive => {
debug!("keepalive received");
trace!("keepalive received");
}
Message::Have(h) => self.on_have(h),
Message::NotInterested => {
@ -767,7 +767,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice()));
let len = msg.serialize(buf, None)?;
debug!("sending: {:?}, length={}", &msg, len);
trace!("sending: {:?}, length={}", &msg, len);
Ok(len)
}
@ -841,7 +841,7 @@ impl PeerHandler {
let _error = match error {
Some(e) => e,
None => {
debug!("peer died without errors, not re-queueing");
trace!("peer died without errors, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(());
}
@ -850,7 +850,7 @@ impl PeerHandler {
self.counters.errors.fetch_add(1, Ordering::Relaxed);
if self.state.is_finished() {
debug!("torrent finished, not re-queueing");
trace!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(());
}
@ -1014,7 +1014,7 @@ impl PeerHandler {
// Theoretically, this could be done in the sending code, so that it reads straight into
// the send buffer.
let request = WriterRequest::ReadChunkRequest(chunk_info);
debug!("sending {:?}", &request);
trace!("sending {:?}", &request);
Ok::<_, anyhow::Error>(self.tx.send(request)?)
}
@ -1034,7 +1034,7 @@ impl PeerHandler {
return;
}
};
debug!("updated bitfield with have={}", have);
trace!("updated bitfield with have={}", have);
});
}
@ -1168,7 +1168,7 @@ impl PeerHandler {
}
fn on_peer_interested(&self) {
debug!("peer is interested");
trace!("peer is interested");
self.state.peers.mark_peer_interested(self.addr, true);
}
@ -1266,7 +1266,7 @@ impl PeerHandler {
match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => {
debug!("piece={} done, will write and checksum", piece.index,);
trace!("piece={} done, will write and checksum", piece.index,);
// This will prevent others from stealing it.
{
let piece = chunk_info.piece_index;

View file

@ -15,6 +15,7 @@ use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use buffers::ByteString;
use dht::RequestPeersStream;
use librqbit_core::id20::Id20;
use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id;
@ -28,6 +29,7 @@ use tracing::debug;
use tracing::error;
use tracing::error_span;
use tracing::warn;
use tracing::trace;
use url::Url;
use crate::chunk_tracker::ChunkTracker;
@ -165,7 +167,7 @@ impl ManagedTorrent {
pub fn start(
self: &Arc<Self>,
initial_peers: Vec<SocketAddr>,
peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
peer_rx: Option<RequestPeersStream>,
start_paused: bool,
) -> anyhow::Result<()> {
let mut g = self.locked.write();
@ -195,7 +197,7 @@ impl ManagedTorrent {
fn spawn_peer_adder(
live: &Arc<TorrentStateLive>,
initial_peers: Vec<SocketAddr>,
peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
peer_rx: Option<RequestPeersStream>,
) {
let span = live.meta().span.clone();
let live = Arc::downgrade(live);
@ -206,6 +208,7 @@ impl ManagedTorrent {
{
let live: Arc<TorrentStateLive> =
live.upgrade().context("no longer live")?;
trace!("adding {} initial peers", initial_peers.len());
for peer in initial_peers {
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}

File diff suppressed because one or more lines are too long

View file

@ -4,7 +4,7 @@
"src": "assets/logo.svg"
},
"index.html": {
"file": "assets/index-1c38bddb.js",
"file": "assets/index-3dee43e7.js",
"isEntry": true,
"src": "index.html"
}

View file

@ -22,6 +22,7 @@ export interface TorrentDetails {
export interface AddTorrentResponse {
id: number | null;
details: TorrentDetails;
seen_peers?: Array<string>;
}
export interface ListTorrentsResponse {
@ -147,7 +148,10 @@ export const API = {
},
uploadTorrent: (data: string | File, opts?: {
listOnly?: boolean, selectedFiles?: Array<number>
listOnly?: boolean,
selectedFiles?: Array<number>,
unpopularTorrent?: boolean,
initialPeers?: Array<string>,
}): Promise<AddTorrentResponse> => {
opts = opts || {};
let url = '/torrents?&overwrite=true';
@ -157,6 +161,15 @@ export const API = {
if (opts.selectedFiles != null) {
url += `&only_files=${opts.selectedFiles.join(',')}`;
}
if (opts.unpopularTorrent) {
url += '&peer_connect_timeout=20&peer_read_write_timeout=60';
}
if (opts.initialPeers) {
url += `&initial_peers=${opts.initialPeers.join(',')}`;
}
if (typeof data === 'string') {
url += '&is_url=true';
}
return makeRequest('POST', url, data)
},

View file

@ -356,8 +356,8 @@ const ErrorDetails = (props: { details: ErrorDetails }) => {
return null;
}
return <>
{details.status && <strong>{details.status} {details.statusText}: </strong>}
{details.text}
<p>{details.status && <strong>{details.status} {details.statusText}</strong>}</p>
<pre>{details.text}</pre>
</>
}
@ -378,11 +378,11 @@ const ErrorComponent = (props: { error: Error, remove?: () => void }) => {
const UploadButton = ({ buttonText, onClick, data, resetData, variant }) => {
const [loading, setLoading] = useState(false);
const [fileList, setFileList] = useState([]);
const [fileListError, setFileListError] = useState(null);
const [listTorrentResponse, setListTorrentResponse] = useState<AddTorrentResponse>(null);
const [listTorrentError, setListTorrentError] = useState<Error>(null);
const ctx = useContext(AppContext);
const showModal = data !== null || fileListError !== null;
const showModal = data !== null || listTorrentError !== null;
// Get the torrent file list if there's data.
useEffect(() => {
@ -394,9 +394,9 @@ const UploadButton = ({ buttonText, onClick, data, resetData, variant }) => {
setLoading(true);
try {
const response = await API.uploadTorrent(data, { listOnly: true });
setFileList(response.details.files);
setListTorrentResponse(response);
} catch (e) {
setFileListError({ text: 'Error uploading torrent', details: e });
setListTorrentError({ text: 'Error listing torrent files', details: e });
} finally {
setLoading(false);
}
@ -406,8 +406,8 @@ const UploadButton = ({ buttonText, onClick, data, resetData, variant }) => {
const clear = () => {
resetData();
setFileListError(null);
setFileList([]);
setListTorrentError(null);
setListTorrentResponse(null);
setLoading(false);
}
@ -420,10 +420,10 @@ const UploadButton = ({ buttonText, onClick, data, resetData, variant }) => {
<FileSelectionModal
show={showModal}
onHide={clear}
fileListError={fileListError}
fileList={fileList}
listTorrentError={listTorrentError}
listTorrentResponse={listTorrentResponse}
data={data}
fileListLoading={loading}
listTorrentLoading={loading}
/>
</>
);
@ -438,7 +438,13 @@ const MagnetInput = () => {
};
return (
<UploadButton variant='primary' buttonText="Add Torrent from Magnet Link" onClick={onClick} data={magnet} resetData={() => setMagnet(null)} />
<UploadButton
variant='primary'
buttonText="Add Torrent from Magnet / URL"
onClick={onClick}
data={magnet}
resetData={() => setMagnet(null)}
/>
);
};
@ -463,7 +469,13 @@ const FileInput = () => {
return (
<>
<input type="file" ref={inputRef} accept=".torrent" onChange={onFileChange} className='d-none' />
<UploadButton variant='secondary' buttonText="Upload .torrent File" onClick={onClick} data={file} resetData={reset} />
<UploadButton
variant='secondary'
buttonText="Upload .torrent File"
onClick={onClick}
data={file}
resetData={reset}
/>
</>
);
};
@ -471,21 +483,22 @@ const FileInput = () => {
const FileSelectionModal = (props: {
show: boolean,
onHide: () => void,
fileList: Array<TorrentFile>,
fileListError: Error,
fileListLoading: boolean,
data: string | File
listTorrentResponse: AddTorrentResponse,
listTorrentError: Error,
listTorrentLoading: boolean,
data: string | File,
}) => {
let { show, onHide, fileList, fileListError, fileListLoading, data } = props;
let { show, onHide, listTorrentResponse, listTorrentError, listTorrentLoading, data } = props;
const [selectedFiles, setSelectedFiles] = useState([]);
const [uploading, setUploading] = useState(false);
const [uploadError, setUploadError] = useState<Error>(null);
const [unpopularTorrent, setUnpopularTorrent] = useState(false);
const ctx = useContext(AppContext);
useEffect(() => {
setSelectedFiles(fileList.map((_, id) => id));
}, [fileList]);
setSelectedFiles(listTorrentResponse ? listTorrentResponse.details.files.map((_, id) => id) : []);
}, [listTorrentResponse]);
const clear = () => {
onHide();
@ -504,43 +517,66 @@ const FileSelectionModal = (props: {
const handleUpload = async () => {
setUploading(true);
API.uploadTorrent(data, { selectedFiles }).then(
() => {
onHide();
ctx.refreshTorrents();
},
let initialPeers = listTorrentResponse.seen_peers ? listTorrentResponse.seen_peers.slice(0, 32) : null;
API.uploadTorrent(data, { selectedFiles, unpopularTorrent, initialPeers }).then(() => {
onHide();
ctx.refreshTorrents();
},
(e) => {
setUploadError({ text: 'Error starting torrent', details: e });
}
).finally(() => setUploading(false));
};
const getBody = () => {
if (listTorrentLoading) {
return <Spinner />;
} else if (listTorrentError) {
return <ErrorComponent error={listTorrentError}></ErrorComponent>;
} else if (listTorrentResponse) {
return <Form>
<fieldset className='mb-5'>
<legend>Pick the files to download</legend>
{listTorrentResponse.details.files.map((file, index) => (
<Form.Group key={index} controlId={`check-${index}`}>
<Form.Check
type="checkbox"
label={`${file.name} (${formatBytes(file.length)})`}
checked={selectedFiles.includes(index)}
onChange={() => handleToggleFile(index)}>
</Form.Check>
</Form.Group>
))}
</fieldset>
<fieldset>
<legend>Other options</legend>
<Form.Group controlId='unpopular-torrent'>
<Form.Check
type="checkbox"
label="Increase timeouts"
checked={unpopularTorrent}
onChange={() => setUnpopularTorrent(!unpopularTorrent)}>
</Form.Check>
<small id="emailHelp" className="form-text text-muted">This might be useful for unpopular torrents with few peers. It will slow down fast torrents though.</small>
</Form.Group>
</fieldset>
</Form >
}
};
return (
<Modal show={show} onHide={clear} size='lg'>
<Modal.Header closeButton>
{!!fileListError || <Modal.Title>Select Files</Modal.Title>}
<Modal.Title>Add torrent</Modal.Title>
</Modal.Header>
<Modal.Body>
{fileListLoading ? <Spinner />
: fileListError ? <ErrorComponent error={fileListError}></ErrorComponent> :
<Form>
{fileList.map((file, index) => (
<Form.Group key={index} controlId={`check-${index}`}>
<Form.Check
type="checkbox"
label={`${file.name} (${formatBytes(file.length)})`}
checked={selectedFiles.includes(index)}
onChange={() => handleToggleFile(index)}>
</Form.Check>
</Form.Group>
))}
</Form>
}
{getBody()}
<ErrorComponent error={uploadError} />
</Modal.Body>
<Modal.Footer>
{uploading && <Spinner />}
<Button variant="primary" onClick={handleUpload} disabled={fileListLoading || uploading || selectedFiles.length == 0}>
<Button variant="primary" onClick={handleUpload} disabled={listTorrentLoading || uploading || selectedFiles.length == 0}>
OK
</Button>
<Button variant="secondary" onClick={clear}>

View file

@ -18,7 +18,7 @@ sha1-rust = ["bencode/sha1-rust"]
[dependencies]
tracing = "0.1.40"
tokio = "1"
tokio = {version = "1", features = ["rt-multi-thread"]}
hex = "0.4"
anyhow = "1"
url = "2"
@ -29,3 +29,6 @@ buffers = {path="../buffers", package="librqbit-buffers", version = "2.2.1"}
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
itertools = "0.12"
[dev-dependencies]
serde_json = "1"

View file

@ -20,11 +20,9 @@ impl FromStr for Id20 {
impl std::fmt::Debug for Id20 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<")?;
for byte in self.0 {
write!(f, "{byte:02x?}")?;
}
write!(f, ">")?;
Ok(())
}
}
@ -102,6 +100,12 @@ impl Id20 {
}
Id20(xor)
}
pub fn get_bit(&self, bit: u8) -> bool {
let n = self.0[(bit / 8) as usize];
let mask = 1 << (7 - bit % 8);
n & mask > 0
}
pub fn set_bit(&mut self, bit: u8, value: bool) {
let n = &mut self.0[(bit / 8) as usize];
if value {

View file

@ -1,4 +1,4 @@
use tracing::{debug, error, trace, Instrument};
use tracing::{error, trace, Instrument};
pub fn spawn(
span: tracing::Span,
@ -6,12 +6,25 @@ pub fn spawn(
) -> tokio::task::JoinHandle<()> {
let fut = async move {
trace!("started");
match fut.await {
Ok(_) => {
debug!("finished");
}
Err(e) => {
error!("finished with error: {:#}", e)
tokio::pin!(fut);
let mut trace_interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
tokio::select! {
_ = trace_interval.tick() => {
trace!("still running");
},
r = &mut fut => {
match r {
Ok(_) => {
trace!("finished");
}
Err(e) => {
error!("finished with error: {:#}", e)
}
}
return;
}
}
}
}

View file

@ -5,7 +5,7 @@ use bencode::BencodeDeserializer;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use itertools::Either;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use crate::id20::Id20;
@ -51,18 +51,23 @@ impl<BufType> TorrentMetaV1<BufType> {
}
}
#[derive(Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct TorrentMetaV1Info<BufType> {
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<BufType>,
pub pieces: BufType,
#[serde(rename = "piece length")]
pub piece_length: u32,
// Single-file mode
#[serde(skip_serializing_if = "Option::is_none")]
pub length: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub md5sum: Option<BufType>,
// Multi-file mode
#[serde(skip_serializing_if = "Option::is_none")]
pub files: Option<Vec<TorrentMetaV1File<BufType>>>,
}
@ -180,7 +185,7 @@ impl<BufType: AsRef<[u8]>> TorrentMetaV1Info<BufType> {
}
}
#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct TorrentMetaV1File<BufType> {
pub length: u64,
pub path: Vec<BufType>,
@ -299,4 +304,23 @@ mod tests {
"64a980abe6e448226bb930ba061592e44c3781a1"
);
}
#[test]
fn test_serialize_then_deserialize_bencode() {
let mut buf = Vec::new();
std::fs::File::open(TORRENT_FILENAME)
.unwrap()
.read_to_end(&mut buf)
.unwrap();
let torrent: TorrentMetaV1Info<ByteBuf> = torrent_from_bytes(&buf).unwrap().info;
let mut writer = Vec::new();
bencode::bencode_serialize_to_writer(&torrent, &mut writer).unwrap();
let deserialized = TorrentMetaV1Info::<ByteBuf>::deserialize(
&mut BencodeDeserializer::new_from_buf(&writer),
)
.unwrap();
assert_eq!(torrent, deserialized);
}
}

View file

@ -1,4 +1,4 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{io::LineWriter, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use anyhow::Context;
use clap::{Parser, ValueEnum};
@ -28,10 +28,18 @@ enum LogLevel {
#[derive(Parser)]
#[command(version, author, about)]
struct Opts {
/// The loglevel
/// The console loglevel
#[arg(value_enum, short = 'v')]
log_level: Option<LogLevel>,
/// The log filename to also write to in addition to the console.
#[arg(long = "log-file")]
log_file: Option<String>,
/// The value for RUST_LOG in the log file
#[arg(long = "log-file-rust-log", default_value = "librqbit=trace,info")]
log_file_rust_log: String,
/// The interval to poll trackers, e.g. 30s.
/// Trackers send the refresh interval when we connect to them. Often this is
/// pretty big, e.g. 30 minutes. This can force a certain value.
@ -193,10 +201,31 @@ fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender<String> {
#[cfg(not(feature = "tokio-console"))]
{
tracing_subscriber::registry()
.with(fmt::layer())
.with(stderr_filter)
.init();
let layered = tracing_subscriber::registry().with(fmt::layer().with_filter(stderr_filter));
if let Some(log_file) = &opts.log_file {
let log_file = log_file.clone();
let log_file = move || {
LineWriter::new(
std::fs::OpenOptions::new()
.create(true)
.append(true)
.write(true)
.open(&log_file)
.with_context(|| format!("error opening log file {:?}", log_file))
.unwrap(),
)
};
layered
.with(
fmt::layer()
.with_ansi(false)
.with_writer(log_file)
.with_filter(EnvFilter::builder().parse(&opts.log_file_rust_log).unwrap()),
)
.init();
} else {
layered.init();
}
}
let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
@ -408,7 +437,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
)
.await
{
Ok(ApiAddTorrentResponse { id, details }) => {
Ok(ApiAddTorrentResponse { id, details, .. }) => {
if let Some(id) = id {
info!("{} added to the server with index {}. Query {}/torrents/{}/(stats/haves) for details", details.info_hash, id, http_api_url, id)
}
@ -480,6 +509,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
info_hash: _,
info,
only_files,
..
}) => {
for (idx, (filename, len)) in
info.iter_filenames_and_lengths()?.enumerate()