Merge pull request #42 from ikatson/external-listen

[Work in progress] Listen on TCP and publish ports over Upnp
This commit is contained in:
Igor Katson 2023-12-06 01:18:50 +00:00 committed by GitHub
commit 075084ae51
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1614 additions and 294 deletions

69
Cargo.lock generated
View file

@ -95,6 +95,17 @@ version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "async-recursion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -1242,7 +1253,7 @@ dependencies = [
[[package]]
name = "librqbit"
version = "4.0.0"
version = "5.0.0-beta.0"
dependencies = [
"anyhow",
"axum 0.7.1",
@ -1264,6 +1275,7 @@ dependencies = [
"librqbit-dht",
"librqbit-peer-protocol",
"librqbit-sha1-wrapper",
"librqbit-upnp",
"openssl",
"parking_lot",
"rand",
@ -1311,7 +1323,7 @@ version = "2.2.1"
[[package]]
name = "librqbit-core"
version = "3.2.1"
version = "3.3.0"
dependencies = [
"anyhow",
"directories",
@ -1331,7 +1343,7 @@ dependencies = [
[[package]]
name = "librqbit-dht"
version = "4.0.0"
version = "4.1.0"
dependencies = [
"anyhow",
"backoff",
@ -1356,7 +1368,7 @@ dependencies = [
[[package]]
name = "librqbit-peer-protocol"
version = "3.2.1"
version = "3.3.0"
dependencies = [
"anyhow",
"bincode",
@ -1378,6 +1390,23 @@ dependencies = [
"sha1",
]
[[package]]
name = "librqbit-upnp"
version = "0.1.0"
dependencies = [
"anyhow",
"async-recursion",
"futures",
"network-interface",
"reqwest",
"serde",
"serde-xml-rs",
"tokio",
"tracing",
"tracing-subscriber",
"url",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.12"
@ -1471,6 +1500,18 @@ dependencies = [
"tempfile",
]
[[package]]
name = "network-interface"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68759ef97fe9c9e46f79ea8736c19f1d28992e24c8dc8ce86752918bfeaae7"
dependencies = [
"cc",
"libc",
"thiserror",
"winapi",
]
[[package]]
name = "nom"
version = "7.1.3"
@ -1961,7 +2002,7 @@ dependencies = [
[[package]]
name = "rqbit"
version = "4.0.0"
version = "5.0.0-beta.0"
dependencies = [
"anyhow",
"clap",
@ -2099,6 +2140,18 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-xml-rs"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb3aa78ecda1ebc9ec9847d5d3aba7d618823446a049ba2491940506da6e2782"
dependencies = [
"log",
"serde",
"thiserror",
"xml-rs",
]
[[package]]
name = "serde_derive"
version = "1.0.193"
@ -2994,3 +3047,9 @@ checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed"
dependencies = [
"tap",
]
[[package]]
name = "xml-rs"
version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fcb9cbac069e033553e8bb871be2fbdffcab578eb25bd0f7c508cedc6dcd75a"

View file

@ -9,7 +9,8 @@ members = [
"crates/sha1w",
"crates/librqbit_core",
"crates/peer_binary_protocol",
"crates/dht"
"crates/dht",
"crates/upnp"
]
[profile.dev]

View file

@ -32,6 +32,10 @@
To do this, a
- [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was.
incoming peers:
- [ ] error managing peer: expected extended handshake, but got Bitfield(<94 bytes>)
- [ ] do not announce when merely listing the torrent
someday:
- [x] cancellation from the client-side for the lib (i.e. stop the torrent manager)

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit-dht"
version = "4.0.0"
version = "4.1.0"
edition = "2021"
description = "DHT implementation, used in rqbit torrent client."
license = "Apache-2.0"
@ -34,7 +34,7 @@ indexmap = "2"
dashmap = {version = "5.5.3", features = ["serde"]}
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.3.0"}
chrono = {version = "0.4.31", features = ["serde"]}
[dev-dependencies]

View file

@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let dht = DhtBuilder::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash)?;
let mut stream = dht.get_peers(info_hash, None)?;
let stats_printer = async {
loop {

View file

@ -1,6 +1,7 @@
use std::{
cmp::Reverse,
net::SocketAddr,
str::FromStr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
@ -11,8 +12,8 @@ use std::{
use crate::{
bprotocol::{
self, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, Message,
MessageKind, Node, PingRequest, Response,
self, AnnouncePeer, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest,
Message, MessageKind, Node, PingRequest, Response,
},
peer_store::PeerStore,
routing_table::{InsertResult, NodeStatus, RoutingTable},
@ -93,17 +94,53 @@ trait RecursiveRequestCallbacks: Sized + Send + Sync + 'static {
);
}
struct RecursiveRequestCallbacksGetPeers {}
struct RecursiveRequestCallbacksGetPeers {
// Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap()
min_distance_to_announce: Id20,
announce_port: Option<u16>,
}
impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers {
fn on_request_start(&self, _: &RecursiveRequest<Self>, _: Id20, _: SocketAddr) {}
fn on_request_end(
&self,
_: &RecursiveRequest<Self>,
_: Id20,
_: SocketAddr,
_: &anyhow::Result<ResponseOrError>,
req: &RecursiveRequest<Self>,
target_node: Id20,
addr: SocketAddr,
resp: &anyhow::Result<ResponseOrError>,
) {
let announce_port = match self.announce_port {
Some(a) => a,
None => return,
};
let resp = match resp {
Ok(ResponseOrError::Response(resp)) => resp,
_ => return,
};
let token = match &resp.token {
Some(token) => token,
None => return,
};
if req.info_hash.distance(&target_node) > self.min_distance_to_announce {
trace!(
"not announcing, {:?} is too far from {:?}",
target_node,
req.info_hash
);
return;
}
let (tid, message) = req.dht.create_request(Request::Announce {
info_hash: req.info_hash,
token: token.clone(),
port: announce_port,
});
let _ = req.dht.worker_sender.send(WorkerSendRequest {
our_tid: Some(tid),
message,
addr,
});
}
}
@ -153,7 +190,7 @@ pub struct RequestPeersStream {
}
impl RequestPeersStream {
fn new(dht: Arc<DhtState>, info_hash: Id20) -> Self {
fn new(dht: Arc<DhtState>, info_hash: Id20, announce_port: Option<u16>) -> Self {
let (peer_tx, peer_rx) = unbounded_channel();
let (node_tx, node_rx) = unbounded_channel();
let rp = Arc::new(RecursiveRequest {
@ -165,7 +202,13 @@ impl RequestPeersStream {
useful_nodes: RwLock::new(Vec::new()),
peer_tx,
node_tx,
callbacks: RecursiveRequestCallbacksGetPeers {},
callbacks: RecursiveRequestCallbacksGetPeers {
min_distance_to_announce: Id20::from_str(
"0000ffffffffffffffffffffffffffffffffffff",
)
.unwrap(),
announce_port,
},
});
let join_handle = rp.request_peers_forever(node_rx);
Self {
@ -351,7 +394,7 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
self.callbacks.on_request_start(self, id, addr);
}
let response = self.dht.request(self.request, addr).await.map(|r| {
let response = self.dht.request(self.request.clone(), addr).await.map(|r| {
self.mark_node_responded(addr, &r);
r
});
@ -359,7 +402,7 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
self.callbacks.on_request_end(self, id, addr, &response);
}
let response = match self.dht.request(self.request, addr).await {
let response = match self.dht.request(self.request.clone(), addr).await {
Ok(ResponseOrError::Response(r)) => r,
Ok(ResponseOrError::Error(e)) => bail!("error response: {:?}", e),
Err(e) => {
@ -581,6 +624,22 @@ impl DhtState {
ip: None,
kind: MessageKind::PingRequest(PingRequest { id: self.id }),
},
Request::Announce {
info_hash,
token,
port,
} => Message {
kind: MessageKind::AnnouncePeer(AnnouncePeer {
id: self.id,
implied_port: 0,
info_hash,
port,
token,
}),
transaction_id: ByteString::from(transaction_id_buf.as_ref()),
version: None,
ip: None,
},
};
(transaction_id, message)
}
@ -744,10 +803,15 @@ impl DhtState {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Request {
GetPeers(Id20),
FindNode(Id20),
Announce {
info_hash: Id20,
token: ByteString,
port: u16,
},
Ping,
}
@ -1059,7 +1123,7 @@ pub struct DhtConfig {
pub bootstrap_addrs: Option<Vec<String>>,
pub routing_table: Option<RoutingTable>,
pub listen_addr: Option<SocketAddr>,
pub(crate) peer_store: Option<PeerStore>,
pub peer_store: Option<PeerStore>,
}
impl DhtState {
@ -1107,8 +1171,16 @@ impl DhtState {
Ok(state)
}
pub fn get_peers(self: &Arc<Self>, info_hash: Id20) -> anyhow::Result<RequestPeersStream> {
Ok(RequestPeersStream::new(self.clone(), info_hash))
pub fn get_peers(
self: &Arc<Self>,
info_hash: Id20,
announce_port: Option<u16>,
) -> anyhow::Result<RequestPeersStream> {
Ok(RequestPeersStream::new(
self.clone(),
info_hash,
announce_port,
))
}
pub fn listen_addr(&self) -> SocketAddr {

View file

@ -16,7 +16,7 @@ use crate::peer_store::PeerStore;
use crate::routing_table::RoutingTable;
use crate::{Dht, DhtConfig, DhtState};
#[derive(Default, Clone)]
#[derive(Default)]
pub struct PersistentDhtConfig {
pub dump_interval: Option<Duration>,
pub config_filename: Option<PathBuf>,
@ -111,6 +111,7 @@ impl PersistentDht {
.map(|de| (Some(de.addr), Some(de.table), de.peer_store))
.unwrap_or((None, None, None));
let peer_id = routing_table.as_ref().map(|r| r.id());
let dht_config = DhtConfig {
peer_id,
routing_table,

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit"
version = "4.0.0"
version = "5.0.0-beta.0"
authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021"
description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it."
@ -24,11 +24,12 @@ rust-tls = ["reqwest/rustls-tls"]
[dependencies]
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"}
librqbit-core = {path = "../librqbit_core", version = "3.2.1"}
librqbit-core = {path = "../librqbit_core", version = "3.3.0"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="4.0.0"}
dht = {path = "../dht", package="librqbit-dht", version="4.1.0"}
librqbit-upnp = {path = "../upnp", version = "0.1.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
axum = {version = "0.7"}
@ -41,7 +42,7 @@ anyhow = "1"
itertools = "0.12"
http = "1"
regex = "1"
reqwest = {version="0.11.22", default-features=false}
reqwest = {version="0.11.22", default-features=false, features = ["json"]}
urlencoding = "2"
byteorder = "1"
bincode = "1"

View file

@ -108,7 +108,7 @@ mod tests {
let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash).unwrap();
let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await
{

View file

@ -3,10 +3,11 @@ use std::{
time::{Duration, Instant},
};
use anyhow::Context;
use anyhow::{bail, Context};
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use librqbit_core::{id20::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id};
use parking_lot::RwLock;
use peer_binary_protocol::{
extended::{handshake::ExtendedHandshake, ExtendedMessage},
serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError,
@ -23,7 +24,7 @@ pub trait PeerConnectionHandler {
fn on_connected(&self, _connection_time: Duration) {}
fn get_have_bytes(&self) -> u64;
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize>;
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>;
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()>;
fn on_extended_handshake(
&self,
extended_handshake: &ExtendedHandshake<ByteBuf>,
@ -62,7 +63,7 @@ pub(crate) struct PeerConnection<H> {
spawner: BlockingSpawner,
}
async fn with_timeout<T, E>(
pub(crate) async fn with_timeout<T, E>(
timeout_value: Duration,
fut: impl std::future::Future<Output = Result<T, E>>,
) -> anyhow::Result<T>
@ -120,9 +121,63 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
}
}
pub async fn manage_peer(
// By the time this is called:
// read_buf should start with valuable data. The handshake should be removed from it.
pub async fn manage_peer_incoming(
&self,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
// How many bytes into read buffer have we read already.
read_so_far: usize,
read_buf: Vec<u8>,
handshake: Handshake<ByteString>,
mut conn: tokio::net::TcpStream,
) -> anyhow::Result<()> {
use tokio::io::AsyncWriteExt;
let rwtimeout = self
.options
.read_write_timeout
.unwrap_or_else(|| Duration::from_secs(10));
if handshake.info_hash != self.info_hash.0 {
anyhow::bail!("wrong info hash");
}
if handshake.peer_id == self.peer_id.0 {
bail!("looks like we are connecting to ourselves");
}
trace!(
"incoming connection: id={:?}",
try_decode_peer_id(Id20(handshake.peer_id))
);
let mut write_buf = Vec::<u8>::with_capacity(PIECE_MESSAGE_DEFAULT_LEN);
let handshake = Handshake::new(self.info_hash, self.peer_id);
handshake.serialize(&mut write_buf);
with_timeout(rwtimeout, conn.write_all(&write_buf))
.await
.context("error writing handshake")?;
write_buf.clear();
let h_supports_extended = handshake.supports_extended();
self.handler.on_handshake(handshake)?;
self.manage_peer(
h_supports_extended,
read_so_far,
read_buf,
write_buf,
conn,
outgoing_chan,
)
.await
}
pub async fn manage_peer_outgoing(
&self,
outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@ -161,44 +216,65 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
let h_supports_extended = h.supports_extended();
trace!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
if h.info_hash != self.info_hash.0 {
anyhow::bail!("info hash does not match");
}
let mut extended_handshake: Option<ExtendedHandshake<ByteString>> = None;
let supports_extended = h.supports_extended();
if h.peer_id == self.peer_id.0 {
bail!("looks like we are connecting to ourselves");
}
self.handler.on_handshake(h)?;
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;
self.manage_peer(
h_supports_extended,
read_so_far,
read_buf,
write_buf,
conn,
outgoing_chan,
)
.await
}
async fn manage_peer(
&self,
handshake_supports_extended: bool,
// How many bytes into read_buf is there of peer-sent-data.
mut read_so_far: usize,
mut read_buf: Vec<u8>,
mut write_buf: Vec<u8>,
mut conn: tokio::net::TcpStream,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let rwtimeout = self
.options
.read_write_timeout
.unwrap_or_else(|| Duration::from_secs(10));
let extended_handshake: RwLock<Option<ExtendedHandshake<ByteString>>> = RwLock::new(None);
let extended_handshake_ref = &extended_handshake;
let supports_extended = handshake_supports_extended;
if supports_extended {
let my_extended =
Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
trace!("sending extended handshake: {:?}", &my_extended);
my_extended.serialize(&mut write_buf, None).unwrap();
my_extended.serialize(&mut write_buf, &|| None).unwrap();
with_timeout(rwtimeout, conn.write_all(&write_buf))
.await
.context("error writing extended handshake")?;
write_buf.clear();
let (extended, size) = read_one!(conn, read_buf, read_so_far, rwtimeout);
match extended {
Message::Extended(ExtendedMessage::Handshake(h)) => {
trace!("received: {:?}", &h);
self.handler.on_extended_handshake(&h)?;
extended_handshake = Some(h.clone_to_owned())
}
other => anyhow::bail!("expected extended handshake, but got {:?}", other),
};
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;
}
let (mut read_half, mut write_half) = tokio::io::split(conn);
@ -231,9 +307,12 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let mut uploaded_add = None;
let len = match &req {
WriterRequest::Message(msg) => {
msg.serialize(&mut write_buf, extended_handshake.as_ref())?
}
WriterRequest::Message(msg) => msg.serialize(&mut write_buf, &|| {
extended_handshake_ref
.read()
.as_ref()
.and_then(|e| e.ut_metadata())
})?,
WriterRequest::ReadChunkRequest(chunk) => {
// this whole section is an optimization
write_buf.resize(PIECE_MESSAGE_DEFAULT_LEN, 0);
@ -251,6 +330,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
full_len
}
WriterRequest::Disconnect => {
trace!("disconnect requested, closing writer");
return Ok(());
}
};
@ -277,9 +357,15 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let (message, size) = read_one!(read_half, read_buf, read_so_far, rwtimeout);
trace!("received: {:?}", &message);
self.handler
.on_received_message(message)
.context("error in handler.on_received_message()")?;
if let Message::Extended(ExtendedMessage::Handshake(h)) = &message {
*extended_handshake_ref.write() = Some(h.clone_to_owned());
self.handler.on_extended_handshake(h)?;
trace!("remembered extended handshake for future serializing");
} else {
self.handler
.on_received_message(message)
.context("error in handler.on_received_message()")?;
}
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
@ -293,10 +379,15 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
};
let r = tokio::select! {
r = reader => {r}
r = writer => {r}
r = reader => {
trace!("reader is done, exiting");
r
}
r = writer => {
trace!("writer is done, exiting");
r
}
};
trace!("either reader or writer are done, exiting");
r
}
}

View file

@ -51,7 +51,7 @@ pub(crate) async fn read_metainfo_from_peer(
);
let result_reader = async move { result_rx.await? };
let connection_runner = async move { connection.manage_peer(writer_rx).await };
let connection_runner = async move { connection.manage_peer_outgoing(writer_rx).await };
tokio::select! {
result = result_reader => result,
@ -145,7 +145,7 @@ impl PeerConnectionHandler for Handler {
Ok(0)
}
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> {
if !handshake.supports_extended() {
anyhow::bail!("this peer does not support extended handshaking, which is a prerequisite to download metadata")
}

View file

@ -12,7 +12,11 @@ use std::{
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBufT, ByteString};
use dht::{Dht, DhtBuilder, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream};
use clone_to_owned::CloneToOwned;
use dht::{
Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream,
};
use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
use librqbit_core::{
directories::get_configuration_directory,
magnet::Magnet,
@ -20,16 +24,23 @@ use librqbit_core::{
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
};
use parking_lot::RwLock;
use peer_binary_protocol::{Handshake, PIECE_MESSAGE_DEFAULT_LEN};
use reqwest::Url;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use tracing::{debug, error, error_span, info, warn};
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::PeerConnectionOptions,
peer_connection::{with_timeout, PeerConnectionOptions},
spawn_utils::{spawn, BlockingSpawner},
torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState},
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
};
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
@ -147,6 +158,11 @@ pub struct Session {
spawner: BlockingSpawner,
db: RwLock<SessionDatabase>,
output_folder: PathBuf,
tcp_listen_port: Option<u16>,
cancel_tx: tokio::sync::watch::Sender<()>,
cancel_rx: tokio::sync::watch::Receiver<()>,
}
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
@ -214,6 +230,8 @@ pub struct AddTorrentOptions {
#[serde_as(as = "Option<serde_with::DurationSeconds>")]
pub force_tracker_interval: Option<Duration>,
pub disable_trackers: bool,
/// Initial peers to start of with.
pub initial_peers: Option<Vec<SocketAddr>>,
@ -322,6 +340,31 @@ pub struct SessionOptions {
pub peer_id: Option<Id20>,
/// Configure default peer connection options. Can be overriden per torrent.
pub peer_opts: Option<PeerConnectionOptions>,
pub listen_port_range: Option<std::ops::Range<u16>>,
pub enable_upnp_port_forwarding: bool,
}
async fn create_tcp_listener(
port_range: std::ops::Range<u16>,
) -> anyhow::Result<(TcpListener, u16)> {
for port in port_range.clone() {
match TcpListener::bind(("0.0.0.0", port)).await {
Ok(l) => return Ok((l, port)),
Err(e) => {
debug!("error listening on port {port}: {e:#}")
}
}
}
bail!("no free TCP ports in range {port_range:?}");
}
pub(crate) struct CheckedIncomingConnection {
pub addr: SocketAddr,
pub stream: tokio::net::TcpStream,
pub read_buf: Vec<u8>,
pub handshake: Handshake<ByteString>,
pub read_so_far: usize,
}
impl Session {
@ -333,16 +376,28 @@ impl Session {
/// Create a new session with options.
pub async fn new_with_opts(
output_folder: PathBuf,
opts: SessionOptions,
mut opts: SessionOptions,
) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range)
.await
.context("error listening on TCP")?;
info!("Listening on 0.0.0.0:{p} for incoming peer connections");
(Some(l), Some(p))
} else {
(None, None)
};
let dht = if opts.disable_dht {
None
} else {
let dht = if opts.disable_dht_persistence {
DhtBuilder::new().await
DhtBuilder::with_config(DhtConfig::default()).await
} else {
PersistentDht::create(opts.dht_config).await
let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config)).await
}
.context("error initializing DHT")?;
Some(dht)
@ -355,6 +410,9 @@ impl Session {
.join("session.json"),
};
let spawner = BlockingSpawner::default();
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let session = Arc::new(Self {
persistence_filename,
peer_id,
@ -363,8 +421,29 @@ impl Session {
spawner,
output_folder,
db: RwLock::new(Default::default()),
cancel_rx,
cancel_tx,
tcp_listen_port,
});
if let Some(tcp_listener) = tcp_listener {
session.spawn(
"tcp listener",
error_span!("tcp_listen", port = tcp_listen_port),
session.clone().task_tcp_listener(tcp_listener),
);
}
if let Some(listen_port) = tcp_listen_port {
if opts.enable_upnp_port_forwarding {
session.spawn(
"upnp_forward",
error_span!("upnp_forward", port = listen_port),
session.clone().task_upnp_port_forwarder(listen_port),
);
}
}
if opts.persistence {
info!(
"will use {:?} for session persistence",
@ -375,36 +454,140 @@ impl Session {
format!("couldn't create directory {:?} for session storage", parent)
})?;
}
let session = session.clone();
spawn(
let persistence_task = session.clone().task_persistence();
session.spawn(
"session persistene",
error_span!("session_persistence"),
async move {
// Populate initial from the state filename
if let Err(e) = session.populate_from_stored().await {
error!("could not populate session from stored file: {:?}", e);
}
let session = Arc::downgrade(&session);
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let session = match session.upgrade() {
Some(s) => s,
None => break,
};
if let Err(e) = session.dump_to_disk() {
error!("error dumping session to disk: {:?}", e);
}
}
Ok(())
},
persistence_task,
);
}
Ok(session)
}
async fn task_persistence(self: Arc<Self>) -> anyhow::Result<()> {
// Populate initial from the state filename
if let Err(e) = self.populate_from_stored().await {
error!("could not populate session from stored file: {:?}", e);
}
let session = Arc::downgrade(&self);
drop(self);
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let session = match session.upgrade() {
Some(s) => s,
None => break,
};
if let Err(e) = session.dump_to_disk() {
error!("error dumping session to disk: {:?}", e);
}
}
Ok(())
}
async fn check_incoming_connection(
&self,
addr: SocketAddr,
mut stream: TcpStream,
) -> anyhow::Result<(Arc<TorrentStateLive>, CheckedIncomingConnection)> {
// TODO: move buffer handling to peer_connection
let rwtimeout = self
.peer_opts
.read_write_timeout
.unwrap_or_else(|| Duration::from_secs(10));
let mut read_buf = vec![0u8; PIECE_MESSAGE_DEFAULT_LEN * 2];
let mut read_so_far = with_timeout(rwtimeout, stream.read(&mut read_buf))
.await
.context("error reading handshake")?;
if read_so_far == 0 {
anyhow::bail!("bad handshake");
}
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
trace!("received handshake from {addr}: {:?}", h);
if h.peer_id == self.peer_id.0 {
bail!("seems like we are connecting to ourselves, ignoring");
}
for (id, torrent) in self.db.read().torrents.iter() {
if torrent.info_hash().0 != h.info_hash {
continue;
}
let live = match torrent.live() {
Some(live) => live,
None => {
bail!("torrent {id} is not live, ignoring connection");
}
};
let handshake = h.clone_to_owned();
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;
return Ok((
live,
CheckedIncomingConnection {
addr,
stream,
handshake,
read_buf,
read_so_far,
},
));
}
bail!("didn't find a matching torrent for {:?}", Id20(h.info_hash))
}
async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
let mut futs = FuturesUnordered::new();
loop {
tokio::select! {
r = l.accept() => {
match r {
Ok((stream, addr)) => {
trace!("accepted connection from {addr}");
futs.push(
self.check_incoming_connection(addr, stream)
.map_err(|e| {
debug!("error checking incoming connection: {e:#}");
e
})
.instrument(error_span!("incoming", addr=%addr))
);
}
Err(e) => {
error!("error accepting: {e:#}");
continue;
}
}
},
Some(Ok((live, checked))) = futs.next(), if !futs.is_empty() => {
if let Err(e) = live.add_incoming_peer(checked) {
warn!("error handing over incoming connection: {e:#}");
}
},
}
}
}
async fn task_upnp_port_forwarder(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
let pf = librqbit_upnp::UpnpPortForwarder::new(vec![port], None)?;
pf.run_forever().await
}
pub fn get_dht(&self) -> Option<&Dht> {
self.dht.as_ref()
}
@ -425,6 +608,28 @@ impl Session {
}
}
fn spawn(
&self,
name: &str,
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
let mut cancel_rx = self.cancel_rx.clone();
spawn(name, span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
debug!("task canceled");
Ok(())
}
}
});
}
pub fn stop(&self) {
let _ = self.cancel_tx.send(());
}
async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> {
let mut rdr = match std::fs::File::open(&self.persistence_filename) {
Ok(f) => BufReader::new(f),
@ -533,6 +738,12 @@ impl Session {
let opts = opts.unwrap_or_default();
let announce_port = if opts.list_only {
None
} else {
self.tcp_listen_port
};
let (info_hash, info, dht_rx, trackers, initial_peers) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let Magnet {
@ -544,7 +755,7 @@ impl Session {
.dht
.as_ref()
.context("magnet links without DHT are not supported")?
.get_peers(info_hash)?;
.get_peers(info_hash, announce_port)?;
let trackers = trackers
.into_iter()
@ -607,7 +818,7 @@ impl Session {
let dht_rx = match self.dht.as_ref() {
Some(dht) if !opts.paused && !opts.list_only => {
debug!("reading peers for {:?} from DHT", torrent.info_hash);
Some(dht.get_peers(torrent.info_hash)?)
Some(dht.get_peers(torrent.info_hash, announce_port)?)
}
_ => None,
};
@ -635,7 +846,11 @@ impl Session {
torrent.info,
dht_rx,
trackers,
Default::default(),
opts.initial_peers
.clone()
.unwrap_or_default()
.into_iter()
.collect(),
)
}
};
@ -743,8 +958,11 @@ impl Session {
builder
.overwrite(opts.overwrite)
.spawner(self.spawner)
.peer_id(self.peer_id)
.trackers(trackers);
.peer_id(self.peer_id);
if opts.disable_trackers {
builder.trackers(trackers);
}
if let Some(only_files) = only_files {
builder.only_files(only_files);
@ -833,7 +1051,7 @@ impl Session {
let peer_rx = self
.dht
.as_ref()
.map(|dht| dht.get_peers(handle.info_hash()))
.map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port))
.transpose()?;
handle.start(Default::default(), peer_rx, false)?;
Ok(())

View file

@ -76,7 +76,7 @@ use sha1w::Sha1;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, Semaphore,
Notify, OwnedSemaphorePermit, Semaphore,
},
time::timeout,
};
@ -89,7 +89,9 @@ use crate::{
peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
session::CheckedIncomingConnection,
spawn_utils::spawn,
torrent_state::{peer::Peer, utils::atomic_inc},
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
type_aliases::{PeerHandle, BF},
};
@ -100,7 +102,7 @@ use self::{
atomic::PeerCountersAtomic as AtomicPeerCounters,
snapshot::{PeerStatsFilter, PeerStatsSnapshot},
},
InflightRequest, PeerState, PeerTx, SendMany,
InflightRequest, PeerRx, PeerState, PeerTx,
},
peers::PeerStates,
stats::{atomic::AtomicStats, snapshot::StatsSnapshot},
@ -176,7 +178,7 @@ pub struct TorrentStateLive {
lengths: Lengths,
// Limits how many active (occupying network resources) peers there are at a moment in time.
peer_semaphore: Semaphore,
peer_semaphore: Arc<Semaphore>,
// The queue for peer manager to connect to them.
peer_queue_tx: UnboundedSender<SocketAddr>,
@ -186,7 +188,8 @@ pub struct TorrentStateLive {
cancel_tx: tokio::sync::watch::Sender<()>,
cancel_rx: tokio::sync::watch::Receiver<()>,
speed_estimator: SpeedEstimator,
down_speed_estimator: SpeedEstimator,
up_speed_estimator: SpeedEstimator,
}
impl TorrentStateLive {
@ -196,7 +199,8 @@ impl TorrentStateLive {
) -> Arc<Self> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let speed_estimator = SpeedEstimator::new(5);
let down_speed_estimator = SpeedEstimator::new(5);
let up_speed_estimator = SpeedEstimator::new(5);
let have_bytes = paused.have_bytes;
let needed_bytes = paused.info.lengths.total_length() - have_bytes;
@ -220,10 +224,11 @@ impl TorrentStateLive {
},
initially_needed_bytes: needed_bytes,
lengths,
peer_semaphore: Semaphore::new(128),
peer_semaphore: Arc::new(Semaphore::new(128)),
peer_queue_tx,
finished_notify: Notify::new(),
speed_estimator,
down_speed_estimator,
up_speed_estimator,
cancel_rx,
cancel_tx,
});
@ -247,6 +252,7 @@ impl TorrentStateLive {
Some(state) => state,
None => return Ok(()),
};
let now = Instant::now();
let stats = state.stats_snapshot();
let fetched = stats.fetched_bytes;
let needed = state.initially_needed();
@ -255,8 +261,11 @@ impl TorrentStateLive {
.wrapping_sub(fetched)
.min(needed - stats.downloaded_and_checked_bytes);
state
.speed_estimator
.add_snapshot(fetched, remaining, Instant::now());
.down_speed_estimator
.add_snapshot(fetched, Some(remaining), now);
state
.up_speed_estimator
.add_snapshot(stats.uploaded_bytes, None, now);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
@ -289,8 +298,12 @@ impl TorrentStateLive {
});
}
pub fn speed_estimator(&self) -> &SpeedEstimator {
&self.speed_estimator
pub fn down_speed_estimator(&self) -> &SpeedEstimator {
&self.down_speed_estimator
}
pub fn up_speed_estimator(&self) -> &SpeedEstimator {
&self.up_speed_estimator
}
async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result<u64> {
@ -361,10 +374,127 @@ impl TorrentStateLive {
}
}
async fn task_manage_peer(self: Arc<Self>, addr: SocketAddr) -> anyhow::Result<()> {
pub(crate) fn add_incoming_peer(
self: &Arc<Self>,
checked_peer: CheckedIncomingConnection,
) -> anyhow::Result<()> {
use dashmap::mapref::entry::Entry;
let (tx, rx) = unbounded_channel();
let permit = match self.peer_semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
warn!("limit of live peers reached, dropping incoming peer");
self.peers.with_peer(checked_peer.addr, |p| {
atomic_inc(&p.stats.counters.incoming_connections);
});
return Ok(());
}
};
let counters = match self.peers.states.entry(checked_peer.addr) {
Entry::Occupied(mut occ) => {
let peer = occ.get_mut();
peer.state
.incoming_connection(
Id20(checked_peer.handshake.peer_id),
tx.clone(),
&self.peers.stats,
)
.context("peer already existed")?;
peer.stats.counters.clone()
}
Entry::Vacant(vac) => {
atomic_inc(&self.peers.stats.seen);
let peer = Peer::new_live_for_incoming_connection(
Id20(checked_peer.handshake.peer_id),
tx.clone(),
&self.peers.stats,
);
let counters = peer.stats.counters.clone();
vac.insert(peer);
counters
}
};
atomic_inc(&counters.incoming_connections);
self.spawn(
"incoming peer",
error_span!("manage_incoming_peer", addr = %checked_peer.addr),
self.clone()
.task_manage_incoming_peer(checked_peer, counters, tx, rx, permit),
);
Ok(())
}
async fn task_manage_incoming_peer(
self: Arc<Self>,
checked_peer: CheckedIncomingConnection,
counters: Arc<AtomicPeerCounters>,
tx: PeerTx,
rx: PeerRx,
permit: OwnedSemaphorePermit,
) -> anyhow::Result<()> {
// TODO: bump counters for incoming
let handler = PeerHandler {
addr: checked_peer.addr,
on_bitfield_notify: Default::default(),
unchoke_notify: Default::default(),
locked: RwLock::new(PeerHandlerLocked {
i_am_choked: true,
previously_requested_pieces: BF::new(),
}),
requests_sem: Semaphore::new(0),
state: self.clone(),
tx,
counters,
};
let options = PeerConnectionOptions {
connect_timeout: self.meta.options.peer_connect_timeout,
read_write_timeout: self.meta.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
checked_peer.addr,
self.meta.info_hash,
self.meta.peer_id,
&handler,
Some(options),
self.meta.spawner,
);
let requester = handler.task_peer_chunk_requester();
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer_incoming(
rx,
checked_peer.read_so_far,
checked_peer.read_buf,
checked_peer.handshake,
checked_peer.stream
) => {r}
};
match res {
// We disconnected the peer ourselves as we don't need it
Ok(()) => {
handler.on_peer_died(None)?;
}
Err(e) => {
debug!("error managing peer: {:#}", e);
handler.on_peer_died(Some(e))?;
}
};
drop(permit);
Ok(())
}
async fn task_manage_outgoing_peer(
self: Arc<Self>,
addr: SocketAddr,
permit: OwnedSemaphorePermit,
) -> anyhow::Result<()> {
let state = self;
let (rx, tx) = state.peers.mark_peer_connecting(addr)?;
let counters = state
.peers
.with_peer(addr, |p| p.stats.counters.clone())
@ -396,19 +526,17 @@ impl TorrentStateLive {
Some(options),
state.meta.spawner,
);
let requester = handler.task_peer_chunk_requester(addr);
let requester = handler.task_peer_chunk_requester();
handler
.counters
.connection_attempts
.outgoing_connection_attempts
.fetch_add(1, Ordering::Relaxed);
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer(rx) => {r}
r = peer_connection.manage_peer_outgoing(rx) => {r}
};
handler.state.peer_semaphore.add_permits(1);
match res {
// We disconnected the peer ourselves as we don't need it
Ok(()) => {
@ -419,6 +547,7 @@ impl TorrentStateLive {
handler.on_peer_died(Some(e))?;
}
}
drop(permit);
Ok::<_, anyhow::Error>(())
}
@ -435,12 +564,11 @@ impl TorrentStateLive {
continue;
}
let permit = state.peer_semaphore.acquire().await?;
permit.forget();
let permit = state.peer_semaphore.clone().acquire_owned().await?;
state.spawn(
"manage_peer",
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
state.clone().task_manage_peer(addr),
state.clone().task_manage_outgoing_peer(addr, permit),
);
}
}
@ -478,43 +606,11 @@ impl TorrentStateLive {
TimedExistence::new(timeit(reason, || self.locked.write()), reason)
}
fn get_next_needed_piece(
&self,
peer_handle: PeerHandle,
) -> anyhow::Result<Option<ValidPieceIndex>> {
self.peers
.with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| {
let g = self.lock_read("g(get_next_needed_piece)");
let bf = &live.bitfield;
for n in g.get_chunks()?.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever.
return Ok(self.lengths.validate_piece_index(n as u32));
}
}
Ok(None)
})
.transpose()
.map(|r| r.flatten())
}
fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
matches!(self.get_next_needed_piece(handle), Ok(Some(_)))
}
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| {
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state
.connecting_to_live(Id20(h.peer_id), &self.peers.stats)
.is_some()
.connecting_to_live(Id20(h.peer_id), &self.peers.stats);
});
match result {
Some(true) => {
trace!("set peer to live")
}
Some(false) => debug!("can't set peer live, it was in wrong state"),
None => debug!("can't set peer live, it disappeared"),
}
}
pub fn get_uploaded_bytes(&self) -> u64 {
@ -731,7 +827,9 @@ struct PeerHandler {
impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_connected(&self, connection_time: Duration) {
self.counters.connections.fetch_add(1, Ordering::Relaxed);
self.counters
.outgoing_connections
.fetch_add(1, Ordering::Relaxed);
self.counters
.total_time_connecting_ms
.fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed);
@ -766,13 +864,15 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize> {
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice()));
let len = msg.serialize(buf, None)?;
let len = msg.serialize(buf, &|| None)?;
trace!("sending: {:?}, length={}", &msg, len);
Ok(len)
}
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> {
self.state.set_peer_live(self.addr, handshake);
self.tx
.send(WriterRequest::Message(MessageOwned::Unchoke))?;
Ok(())
}
@ -1022,7 +1122,8 @@ impl PeerHandler {
self.state
.peers
.with_live_mut(self.addr, "on_have", |live| {
// If bitfield wasn't allocated yet, let's do it. Some clients send haves before bitfield.
// If bitfield wasn't allocated yet, let's do it. Some clients start empty so they never
// send bitfields.
if live.bitfield.is_empty() {
live.bitfield =
BF::from_vec(vec![0; self.state.lengths.piece_bitfield_bytes()]);
@ -1035,6 +1136,7 @@ impl PeerHandler {
}
};
trace!("updated bitfield with have={}", have);
self.on_bitfield_notify.notify_waiters();
});
}
@ -1050,43 +1152,66 @@ impl PeerHandler {
self.state
.peers
.update_bitfield_from_vec(self.addr, bitfield.0);
if !self.state.am_i_interested_in_peer(self.addr) {
self.tx
.send(WriterRequest::Message(MessageOwned::Unchoke))?;
self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self.state.is_finished() {
self.tx.send(WriterRequest::Disconnect)?;
}
return Ok(());
}
self.on_bitfield_notify.notify_waiters();
Ok(())
}
async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> {
self.on_bitfield_notify.notified().await;
self.tx.send_many([
WriterRequest::Message(MessageOwned::Unchoke),
WriterRequest::Message(MessageOwned::Interested),
])?;
async fn wait_for_any_notify(&self, notify: &Notify, check: impl Fn() -> bool) {
// To remove possibility of races, we first grab a token, then check
// if we need it, and only if so, await.
let notified = notify.notified();
if check() {
return;
}
notified.await;
}
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await;
async fn wait_for_bitfield(&self) {
self.wait_for_any_notify(&self.on_bitfield_notify, || {
self.state
.peers
.with_live(self.addr, |live| !live.bitfield.is_empty())
.unwrap_or_default()
})
.await;
}
async fn wait_for_unchoke(&self) {
self.wait_for_any_notify(&self.unchoke_notify, || !self.locked.read().i_am_choked)
.await;
}
async fn task_peer_chunk_requester(&self) -> anyhow::Result<()> {
let handle = self.addr;
self.wait_for_bitfield().await;
// TODO: this check needs to happen more often, we need to update our
// interested state with the other side, for now we send it only once.
if self.state.is_finished() {
self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self
.state
.peers
.with_live(self.addr, |l| {
l.has_full_torrent(self.state.lengths.total_pieces() as usize)
})
.unwrap_or_default()
{
debug!("both peer and us have full torrent, disconnecting");
self.tx.send(WriterRequest::Disconnect)?;
// Sleep a bit to ensure this gets written to the network by manage_peer
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
}
} else {
self.tx
.send(WriterRequest::Message(MessageOwned::Interested))?;
}
loop {
if self.locked.read().i_am_choked {
debug!("we are choked, can't reserve next piece");
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await;
}
continue;
}
self.wait_for_unchoke().await;
if self.state.is_finished() {
debug!("nothing left to download, looping forever until manage_peer quits");

View file

@ -2,8 +2,6 @@ pub mod stats;
use std::collections::HashSet;
use anyhow::Context;
use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
@ -29,29 +27,30 @@ impl From<&ChunkInfo> for InflightRequest {
}
}
// TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak.
pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>;
pub(crate) type PeerTx = UnboundedSender<WriterRequest>;
pub trait SendMany {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()>;
}
impl SendMany for PeerTx {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()> {
requests
.into_iter()
.try_for_each(|r| self.send(r))
.context("peer dropped")
}
}
#[derive(Debug, Default)]
pub(crate) struct Peer {
pub state: PeerStateNoMut,
pub stats: stats::atomic::PeerStats,
}
impl Peer {
pub fn new_live_for_incoming_connection(
peer_id: Id20,
tx: PeerTx,
counters: &AggregatePeerStatsAtomic,
) -> Self {
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx)));
counters.inc(&state.0);
Self {
state,
stats: Default::default(),
}
}
}
#[derive(Debug, Default)]
pub(crate) enum PeerState {
#[default]
@ -109,6 +108,13 @@ impl PeerStateNoMut {
std::mem::replace(&mut self.0, new)
}
pub fn get_live(&self) -> Option<&LivePeerState> {
match &self.0 {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match &mut self.0 {
PeerState::Live(l) => Some(l),
@ -129,6 +135,25 @@ impl PeerStateNoMut {
None
}
}
pub fn incoming_connection(
&mut self,
peer_id: Id20,
tx: PeerTx,
counters: &AggregatePeerStatsAtomic,
) -> anyhow::Result<()> {
if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) {
anyhow::bail!("peer already active");
}
match self.take(counters) {
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
}
PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(),
}
Ok(())
}
pub fn connecting_to_live(
&mut self,
peer_id: Id20,

View file

@ -12,8 +12,9 @@ use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
pub(crate) struct PeerCountersAtomic {
pub fetched_bytes: AtomicU64,
pub total_time_connecting_ms: AtomicU64,
pub connection_attempts: AtomicU32,
pub connections: AtomicU32,
pub incoming_connections: AtomicU32,
pub outgoing_connection_attempts: AtomicU32,
pub outgoing_connections: AtomicU32,
pub errors: AtomicU32,
pub fetched_chunks: AtomicU32,
pub downloaded_and_checked_pieces: AtomicU32,

View file

@ -6,6 +6,7 @@ use crate::torrent_state::live::peer::{Peer, PeerState};
#[derive(Serialize, Deserialize)]
pub struct PeerCounters {
pub incoming_connections: u32,
pub fetched_bytes: u64,
pub total_time_connecting_ms: u64,
pub connection_attempts: u32,
@ -24,10 +25,13 @@ pub struct PeerStats {
impl From<&super::atomic::PeerCountersAtomic> for PeerCounters {
fn from(counters: &super::atomic::PeerCountersAtomic) -> Self {
Self {
incoming_connections: counters.incoming_connections.load(Ordering::Relaxed),
fetched_bytes: counters.fetched_bytes.load(Ordering::Relaxed),
total_time_connecting_ms: counters.total_time_connecting_ms.load(Ordering::Relaxed),
connection_attempts: counters.connection_attempts.load(Ordering::Relaxed),
connections: counters.connections.load(Ordering::Relaxed),
connection_attempts: counters
.outgoing_connection_attempts
.load(Ordering::Relaxed),
connections: counters.outgoing_connections.load(Ordering::Relaxed),
errors: counters.errors.load(Ordering::Relaxed),
fetched_chunks: counters.fetched_chunks.load(Ordering::Relaxed),
downloaded_and_checked_pieces: counters

View file

@ -53,6 +53,11 @@ impl PeerStates {
.map(|e| f(TimedExistence::new(e, reason).value_mut()))
}
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.with_peer(addr, |peer| peer.state.get_live().map(f))
.flatten()
}
pub fn with_live_mut<R>(
&self,
addr: PeerHandle,

View file

@ -10,6 +10,7 @@ pub struct LiveStats {
pub snapshot: StatsSnapshot,
pub average_piece_download_time: Option<Duration>,
pub download_speed: Speed,
pub upload_speed: Speed,
pub time_remaining: Option<DurationWithHumanReadable>,
}
@ -17,8 +18,9 @@ impl std::fmt::Display for LiveStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "down speed: {}", self.download_speed)?;
if let Some(time_remaining) = &self.time_remaining {
write!(f, " eta: {time_remaining}")?;
write!(f, ", eta: {time_remaining}")?;
}
write!(f, ", up speed: {}", self.upload_speed)?;
Ok(())
}
}
@ -26,13 +28,17 @@ impl std::fmt::Display for LiveStats {
impl From<&TorrentStateLive> for LiveStats {
fn from(live: &TorrentStateLive) -> Self {
let snapshot = live.stats_snapshot();
let estimator = live.speed_estimator();
let down_estimator = live.down_speed_estimator();
let up_estimator = live.up_speed_estimator();
Self {
average_piece_download_time: snapshot.average_piece_download_time(),
snapshot,
download_speed: estimator.download_mbps().into(),
time_remaining: estimator.time_remaining().map(DurationWithHumanReadable),
download_speed: down_estimator.mbps().into(),
upload_speed: up_estimator.mbps().into(),
time_remaining: down_estimator
.time_remaining()
.map(DurationWithHumanReadable),
}
}
}

File diff suppressed because one or more lines are too long

View file

@ -4,7 +4,7 @@
"src": "assets/logo.svg"
},
"index.html": {
"file": "assets/index-6d4556f3.js",
"file": "assets/index-713a95fc.js",
"isEntry": true,
"src": "index.html"
}

View file

@ -27,6 +27,11 @@ export interface ListTorrentsResponse {
torrents: Array<TorrentId>;
}
export interface Speed {
mbps: number;
human_readable: string;
}
// Interface for the Torrent Stats API response
export interface LiveTorrentStats {
snapshot: {
@ -52,10 +57,8 @@ export interface LiveTorrentStats {
secs: number;
nanos: number;
};
download_speed: {
mbps: number;
human_readable: string;
};
download_speed: Speed;
upload_speed: Speed;
all_time_download_speed: {
mbps: number;
human_readable: string;

View file

@ -185,6 +185,27 @@ const TorrentActions: React.FC<{
</Row>
}
const Speed: React.FC<{ statsResponse: TorrentStats }> = ({ statsResponse }) => {
switch (statsResponse.state) {
case STATE_PAUSED: return 'Paused';
case STATE_INITIALIZING: return 'Checking files';
case STATE_ERROR: return 'Error';
}
// Unknown state
if (statsResponse.state != 'live' || statsResponse.live === null) {
return statsResponse.state;
}
return <>
{!statsResponse.finished &&
<div className='download-speed'> {statsResponse.live.download_speed.human_readable}</div>}
<div className='upload-speed'>
{statsResponse.live.upload_speed.human_readable}
{statsResponse.live.snapshot.uploaded_bytes > 0 &&
<span> ({formatBytes(statsResponse.live.snapshot.uploaded_bytes)})</span>}</div>
</>
}
const TorrentRow: React.FC<{
id: number,
detailsResponse: TorrentDetails | null,
@ -208,19 +229,6 @@ const TorrentRow: React.FC<{
return `${peer_stats.live} / ${peer_stats.seen}`;
}
const formatDownloadSpeed = () => {
if (finished) {
return 'Completed';
}
switch (state) {
case STATE_PAUSED: return 'Paused';
case STATE_INITIALIZING: return 'Checking files';
case STATE_ERROR: return 'Error';
}
return statsResponse?.live?.download_speed.human_readable ?? "N/A";
}
let classNames = [];
if (error) {
@ -253,7 +261,9 @@ const TorrentRow: React.FC<{
animated={isAnimated}
variant={progressBarVariant} />
</Column>
<Column size={2} label="Down Speed">{formatDownloadSpeed()}</Column>
<Column size={2} label="Speed">
<Speed statsResponse={statsResponse} />
</Column>
<Column label="ETA">{getCompletionETA(statsResponse)}</Column>
<Column size={2} label="Peers">{formatPeersString()}</Column >
<Column label="Actions">

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit-core"
version = "3.2.1"
version = "3.3.0"
edition = "2021"
description = "Important utilities used throughout librqbit useful for working with torrents."
license = "Apache-2.0"

View file

@ -8,14 +8,14 @@ use parking_lot::Mutex;
#[derive(Clone, Copy)]
struct ProgressSnapshot {
downloaded_bytes: u64,
progress_bytes: u64,
instant: Instant,
}
/// Estimates download speed in a sliding time window.
/// Estimates download/upload speed in a sliding time window.
pub struct SpeedEstimator {
latest_per_second_snapshots: Mutex<VecDeque<ProgressSnapshot>>,
download_bytes_per_second: AtomicU64,
bytes_per_second: AtomicU64,
time_remaining_millis: AtomicU64,
}
@ -24,7 +24,7 @@ impl SpeedEstimator {
assert!(window_seconds > 1);
Self {
latest_per_second_snapshots: Mutex::new(VecDeque::with_capacity(window_seconds)),
download_bytes_per_second: Default::default(),
bytes_per_second: Default::default(),
time_remaining_millis: Default::default(),
}
}
@ -37,20 +37,25 @@ impl SpeedEstimator {
Some(Duration::from_millis(tr))
}
pub fn download_bps(&self) -> u64 {
self.download_bytes_per_second.load(Ordering::Relaxed)
pub fn bps(&self) -> u64 {
self.bytes_per_second.load(Ordering::Relaxed)
}
pub fn download_mbps(&self) -> f64 {
self.download_bps() as f64 / 1024f64 / 1024f64
pub fn mbps(&self) -> f64 {
self.bps() as f64 / 1024f64 / 1024f64
}
pub fn add_snapshot(&self, downloaded_bytes: u64, remaining_bytes: u64, instant: Instant) {
pub fn add_snapshot(
&self,
progress_bytes: u64,
remaining_bytes: Option<u64>,
instant: Instant,
) {
let first = {
let mut g = self.latest_per_second_snapshots.lock();
let current = ProgressSnapshot {
downloaded_bytes,
progress_bytes,
instant,
};
@ -67,19 +72,18 @@ impl SpeedEstimator {
}
};
let downloaded_bytes_diff = downloaded_bytes - first.downloaded_bytes;
let downloaded_bytes_diff = progress_bytes - first.progress_bytes;
let elapsed = instant - first.instant;
let bps = downloaded_bytes_diff as f64 / elapsed.as_secs_f64();
let time_remaining_millis_rounded: u64 = if downloaded_bytes_diff > 0 {
let time_remaining_secs = remaining_bytes as f64 / bps;
let time_remaining_secs = remaining_bytes.unwrap_or_default() as f64 / bps;
(time_remaining_secs * 1000f64) as u64
} else {
0
};
self.time_remaining_millis
.store(time_remaining_millis_rounded, Ordering::Relaxed);
self.download_bytes_per_second
.store(bps as u64, Ordering::Relaxed);
self.bytes_per_second.store(bps as u64, Ordering::Relaxed);
}
}

View file

@ -1,6 +1,6 @@
[package]
name = "librqbit-peer-protocol"
version = "3.2.1"
version = "3.3.0"
edition = "2021"
description = "Protocol for working with torrent peers. Used in rqbit torrent client."
license = "Apache-2.0"
@ -23,6 +23,6 @@ byteorder = "1"
buffers = {path="../buffers", package="librqbit-buffers", version = "2.2.1"}
bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"}
clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.2.1"}
librqbit-core = {path="../librqbit_core", version = "3.3.0"}
bitvec = "1"
anyhow = "1"

View file

@ -59,6 +59,13 @@ impl<ByteBuf: Eq + std::hash::Hash> ExtendedHandshake<ByteBuf> {
}
})
}
pub fn ut_metadata(&self) -> Option<u8>
where
ByteBuf: AsRef<[u8]>,
{
self.get_msgid(b"ut_metadata")
}
}
impl<ByteBuf> CloneToOwned for ExtendedHandshake<ByteBuf>

View file

@ -1,7 +1,6 @@
use bencode::bencode_serialize_to_writer;
use bencode::from_bytes;
use bencode::BencodeValue;
use buffers::ByteString;
use clone_to_owned::CloneToOwned;
use serde::{Deserialize, Serialize};
@ -41,7 +40,7 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf
pub fn serialize(
&self,
out: &mut Vec<u8>,
extended_handshake: Option<&ExtendedHandshake<ByteString>>,
extended_handshake_ut_metadata: &dyn Fn() -> Option<u8>,
) -> anyhow::Result<()>
where
ByteBuf: AsRef<[u8]>,
@ -56,12 +55,9 @@ impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf
bencode_serialize_to_writer(h, out)?;
}
ExtendedMessage::UtMetadata(u) => {
let h = extended_handshake.ok_or_else(|| {
let emsg_id = extended_handshake_ut_metadata().ok_or_else(|| {
anyhow::anyhow!("need peer's handshake to serialize ut_metadata")
})?;
let emsg_id = h
.get_msgid(b"ut_metadata")
.ok_or_else(|| anyhow::anyhow!("peer doesn't support ut_metadata"))?;
out.push(emsg_id);
u.serialize(out);
}

View file

@ -11,7 +11,7 @@ use clone_to_owned::CloneToOwned;
use librqbit_core::{constants::CHUNK_SIZE, id20::Id20, lengths::ChunkInfo};
use serde::{Deserialize, Serialize};
use self::extended::{handshake::ExtendedHandshake, ExtendedMessage};
use self::extended::ExtendedMessage;
const INTEGER_LEN: usize = 4;
const MSGID_LEN: usize = 1;
@ -258,7 +258,7 @@ where
pub fn serialize(
&self,
out: &mut Vec<u8>,
peer_extended_handshake: Option<&ExtendedHandshake<ByteString>>,
extended_handshake_ut_metadata: &dyn Fn() -> Option<u8>,
) -> anyhow::Result<usize> {
let (lp, msg_id) = self.len_prefix_and_msg_id();
@ -308,7 +308,7 @@ where
Ok(msg_len)
}
Message::Extended(e) => {
e.serialize(out, peer_extended_handshake)?;
e.serialize(out, extended_handshake_ut_metadata)?;
let msg_size = out.len();
// no fucking idea why +1, but I tweaked that for it all to match up
// with real messages.
@ -472,8 +472,8 @@ where
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Handshake<'a> {
pub pstr: &'a str,
pub struct Handshake<ByteBuf> {
pub pstr: ByteBuf,
pub reserved: [u8; 8],
pub info_hash: [u8; 20],
pub peer_id: [u8; 20],
@ -485,8 +485,8 @@ fn bopts() -> impl bincode::Options {
.with_big_endian()
}
impl<'a> Handshake<'a> {
pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<'static> {
impl Handshake<ByteBuf<'static>> {
pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<ByteBuf<'static>> {
debug_assert_eq!(PSTR_BT1.len(), 19);
let mut reserved: u64 = 0;
@ -496,19 +496,16 @@ impl<'a> Handshake<'a> {
BE::write_u64(&mut reserved_arr, reserved);
Handshake {
pstr: PSTR_BT1,
pstr: ByteBuf(PSTR_BT1.as_bytes()),
reserved: reserved_arr,
info_hash: info_hash.0,
peer_id: peer_id.0,
}
}
pub fn supports_extended(&self) -> bool {
self.reserved[5] & 0x10 > 0
}
fn bopts() -> impl bincode::Options {
bincode::DefaultOptions::new()
}
pub fn deserialize(b: &[u8]) -> Result<(Handshake<'_>, usize), MessageDeserializeError> {
pub fn deserialize(
b: &[u8],
) -> Result<(Handshake<ByteBuf<'_>>, usize), MessageDeserializeError> {
let pstr_len = *b
.first()
.ok_or(MessageDeserializeError::NotEnoughData(1, "handshake"))?;
@ -526,11 +523,40 @@ impl<'a> Handshake<'a> {
expected_len,
))
}
pub fn serialize(&self, buf: &mut Vec<u8>) {
}
impl<B> Handshake<B> {
pub fn supports_extended(&self) -> bool {
self.reserved[5] & 0x10 > 0
}
fn bopts() -> impl bincode::Options {
bincode::DefaultOptions::new()
}
pub fn serialize(&self, buf: &mut Vec<u8>)
where
B: Serialize,
{
Self::bopts().serialize_into(buf, &self).unwrap()
}
}
impl<B> CloneToOwned for Handshake<B>
where
B: CloneToOwned,
{
type Target = Handshake<<B as CloneToOwned>::Target>;
fn clone_to_owned(&self) -> Self::Target {
Handshake {
pstr: self.pstr.clone_to_owned(),
reserved: self.reserved,
info_hash: self.info_hash,
peer_id: self.peer_id,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct Request {
pub index: u32,
@ -550,6 +576,8 @@ impl Request {
#[cfg(test)]
mod tests {
use crate::extended::handshake::ExtendedHandshake;
use super::*;
#[test]
fn test_handshake_serialize() {
@ -568,7 +596,7 @@ mod tests {
fn test_extended_serialize() {
let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
let mut out = Vec::new();
msg.serialize(&mut out, None).unwrap();
msg.serialize(&mut out, &|| None).unwrap();
dbg!(out);
}
@ -584,7 +612,7 @@ mod tests {
let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap();
assert_eq!(size, buf.len());
let mut write_buf = Vec::new();
msg.serialize(&mut write_buf, None).unwrap();
msg.serialize(&mut write_buf, &|| None).unwrap();
if buf != write_buf {
{
use std::io::Write;

View file

@ -1,6 +1,6 @@
[package]
name = "rqbit"
version = "4.0.0"
version = "5.0.0-beta.0"
authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021"
description = "A bittorrent command line client and server."
@ -23,7 +23,7 @@ default-tls = ["librqbit/default-tls"]
rust-tls = ["librqbit/rust-tls"]
[dependencies]
librqbit = {path="../librqbit", default-features=false, version = "4.0.0"}
librqbit = {path="../librqbit", default-features=false, version = "5.0.0-beta.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
console-subscriber = {version = "0.2", optional = true}
anyhow = "1"

View file

@ -71,6 +71,22 @@ struct Opts {
#[arg(short = 't', long)]
worker_threads: Option<usize>,
// Enable to listen on 0.0.0.0 on TCP for torrent requests.
#[arg(long = "disable-tcp-listen")]
disable_tcp_listen: bool,
/// The minimal port to listen for incoming connections.
#[arg(long = "tcp-min-port", default_value = "4240")]
tcp_listen_min_port: u16,
/// The maximal port to listen for incoming connections.
#[arg(long = "tcp-max-port", default_value = "4260")]
tcp_listen_max_port: u16,
/// If set, will try to publish the chosen port through upnp on your router.
#[arg(long = "disable-upnp")]
disable_upnp: bool,
#[command(subcommand)]
subcommand: SubCommand,
}
@ -132,6 +148,25 @@ struct DownloadOpts {
/// Exit the program once the torrents complete download.
#[arg(short = 'e', long)]
exit_on_finish: bool,
#[arg(long = "disable-trackers")]
disable_trackers: bool,
#[arg(long = "initial-peers")]
initial_peers: Option<InitialPeers>,
}
#[derive(Clone)]
struct InitialPeers(Vec<SocketAddr>);
impl From<&str> for InitialPeers {
fn from(s: &str) -> Self {
let mut v = Vec::new();
for addr in s.split(',') {
v.push(addr.parse().unwrap());
}
Self(v)
}
}
// server start
@ -303,6 +338,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
disable_dht: opts.disable_dht,
disable_dht_persistence: opts.disable_dht_persistence,
dht_config: None,
// This will be overriden by "server start" below if needed.
persistence: false,
persistence_filename: None,
peer_id: None,
@ -311,6 +347,12 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
read_write_timeout: Some(opts.peer_read_write_timeout),
..Default::default()
}),
listen_port_range: if !opts.disable_tcp_listen {
Some(opts.tcp_listen_min_port..opts.tcp_listen_max_port)
} else {
None
},
enable_upnp_port_forwarding: !opts.disable_upnp,
};
let stats_printer = |session: Arc<Session>| async move {
@ -335,7 +377,8 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
None => continue
};
let stats = handle.stats_snapshot();
let speed = handle.speed_estimator();
let down_speed = handle.down_speed_estimator();
let up_speed = handle.up_speed_estimator();
let total = stats.total_bytes;
let progress = stats.total_bytes - stats.remaining_bytes;
let downloaded_pct = if stats.remaining_bytes == 0 {
@ -343,20 +386,23 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
} else {
(progress as f64 / total as f64) * 100f64
};
let time_remaining = down_speed.time_remaining();
let eta = match &time_remaining {
Some(d) => format!(", ETA: {:?}", d),
None => String::new()
};
info!(
"[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}",
"[{}]: {:.2}% ({:.2} / {:.2}), ↓{:.2} MiB/s, ↑{:.2} MiB/s ({:.2}){}, {{live: {}, queued: {}, dead: {}}}",
idx,
downloaded_pct,
SF::new(progress),
speed.download_mbps(),
SF::new(stats.fetched_bytes),
SF::new(stats.remaining_bytes),
SF::new(total),
down_speed.mbps(),
up_speed.mbps(),
SF::new(stats.uploaded_bytes),
stats.peer_stats.live,
stats.peer_stats.connecting,
eta,
stats.peer_stats.live + stats.peer_stats.connecting,
stats.peer_stats.queued,
stats.peer_stats.seen,
stats.peer_stats.dead,
);
}
@ -371,6 +417,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
sopts.persistence = !start_opts.disable_persistence;
sopts.persistence_filename =
start_opts.persistence_filename.clone().map(PathBuf::from);
let session =
Session::new_with_opts(PathBuf::from(&start_opts.output_folder), sopts)
.await
@ -385,7 +432,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
http_api
.make_http_api_and_run(http_api_listen_addr, false)
.await
.context("error starting HTTP API")
.context("error running HTTP API")
}
},
SubCommand::Download(download_opts) => {
@ -401,6 +448,8 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
force_tracker_interval: opts.force_tracker_interval,
output_folder: download_opts.output_folder.clone(),
sub_folder: download_opts.sub_folder.clone(),
initial_peers: download_opts.initial_peers.clone().map(|p| p.0),
disable_trackers: download_opts.disable_trackers,
..Default::default()
};
let connect_to_existing = match client.validate_rqbit_server().await {

28
crates/upnp/Cargo.toml Normal file
View file

@ -0,0 +1,28 @@
[package]
name = "librqbit-upnp"
version = "0.1.0"
authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021"
description = "Library used by rqbit torrent client to lease port forwards on the router."
license = "Apache-2.0"
documentation = "https://docs.rs/librqbit-upnp"
repository = "https://github.com/ikatson/rqbit"
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tracing = "0.1"
anyhow = "1"
reqwest = {version = "0.11"}
serde = {version = "1", features = ["derive"]}
serde-xml-rs = "0.6.0"
tokio = {version = "1"}
futures = "0.3"
url = "2"
async-recursion = "1"
network-interface = "1"
[dev-dependencies]
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
tracing-subscriber = "0.3"

1
crates/upnp/README.md Symbolic link
View file

@ -0,0 +1 @@
../README.md

View file

@ -0,0 +1,24 @@
use librqbit_upnp::UpnpPortForwarder;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args: Vec<String> = std::env::args().collect();
if args.len() != 2 {
eprintln!("Usage: {} <port>", args[0]);
return Ok(());
}
let port: u16 = match args[1].parse() {
Ok(p) => p,
Err(_) => {
eprintln!("Invalid port number: {}", args[1]);
return Ok(());
}
};
let port_forwarder = UpnpPortForwarder::new(vec![port], None)?;
port_forwarder.run_forever().await;
Ok(())
}

480
crates/upnp/src/lib.rs Normal file
View file

@ -0,0 +1,480 @@
use anyhow::{bail, Context};
use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
use network_interface::NetworkInterfaceConfig;
use reqwest::Client;
use serde::Deserialize;
use serde_xml_rs::from_str;
use std::{
collections::{HashMap, HashSet},
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
time::Duration,
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tracing::{debug, error, error_span, trace, warn, Instrument, Span};
use url::Url;
const SERVICE_TYPE_WAN_IP_CONNECTION: &str = "urn:schemas-upnp-org:service:WANIPConnection:1";
const SSDP_MULTICAST_IP: SocketAddr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(239, 255, 255, 250), 1900));
const SSDP_SEARCH_REQUEST: &str = "M-SEARCH * HTTP/1.1\r\n\
Host: 239.255.255.250:1900\r\n\
Man: \"ssdp:discover\"\r\n\
MX: 3\r\n\
ST: upnp:rootdevice\r\n\
\r\n";
fn get_local_ip_relative_to(local_dest: Ipv4Addr) -> anyhow::Result<Ipv4Addr> {
// Ipv4Addr.to_bits() is only there in nightly rust, so copying here for now.
fn ip_bits(addr: Ipv4Addr) -> u32 {
u32::from_be_bytes(addr.octets())
}
fn masked(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 {
ip_bits(ip) & ip_bits(mask)
}
let interfaces =
network_interface::NetworkInterface::show().context("error listing network interfaces")?;
for i in interfaces {
for addr in i.addr {
if let network_interface::Addr::V4(v4) = addr {
let ip = v4.ip;
let mask = match v4.netmask {
Some(mask) => mask,
None => continue,
};
trace!("found local addr {ip}/{mask}");
// If the masked address is the same, means we are on the same network, return
// the ip address
if masked(ip, mask) == masked(local_dest, mask) {
return Ok(ip);
}
}
}
}
bail!("couldn't find a local ip address")
}
async fn forward_port(
control_url: Url,
local_ip: Ipv4Addr,
port: u16,
lease_duration: Duration,
) -> anyhow::Result<()> {
let request_body = format!(
r#"
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:AddPortMapping xmlns:u="{}">
<NewRemoteHost></NewRemoteHost>
<NewExternalPort>{}</NewExternalPort>
<NewProtocol>TCP</NewProtocol>
<NewInternalPort>{}</NewInternalPort>
<NewInternalClient>{}</NewInternalClient>
<NewEnabled>1</NewEnabled>
<NewPortMappingDescription>rust UPnP</NewPortMappingDescription>
<NewLeaseDuration>{}</NewLeaseDuration>
</u:AddPortMapping>
</s:Body>
</s:Envelope>
"#,
SERVICE_TYPE_WAN_IP_CONNECTION,
port,
port,
local_ip,
lease_duration.as_secs()
);
let url = control_url;
let client = reqwest::Client::new();
let response = client
.post(url.clone())
.header("Content-Type", "text/xml")
.header(
"SOAPAction",
format!("\"{}#AddPortMapping\"", SERVICE_TYPE_WAN_IP_CONNECTION),
)
.body(request_body)
.send()
.await
.context("error sending")?;
let status = response.status();
let response_text = response
.text()
.await
.context("error reading response text")?;
trace!("AddPortMapping response: {} {}", status, response_text);
if !status.is_success() {
bail!("failed port forwarding: {}", status);
} else {
debug!("successfully port forwarded {}:{}", local_ip, port);
}
Ok(())
}
#[derive(Clone, Debug, Deserialize)]
struct RootDesc {
#[serde(rename = "device")]
devices: Vec<Device>,
}
#[derive(Default, Clone, Debug, Deserialize)]
pub struct DeviceList {
#[serde(rename = "device")]
devices: Vec<Device>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct Device {
#[serde(rename = "deviceType")]
pub device_type: String,
#[serde(rename = "friendlyName", default)]
pub friendly_name: String,
#[serde(rename = "serviceList", default)]
pub service_list: ServiceList,
#[serde(rename = "deviceList", default)]
pub device_list: DeviceList,
}
impl Device {
pub fn iter_services(
&self,
parent: Span,
) -> Box<dyn Iterator<Item = (tracing::Span, &Service)> + '_> {
let self_span = self.span(parent);
let services = self.service_list.services.iter().map({
let self_span = self_span.clone();
move |s| (s.span(self_span.clone()), s)
});
Box::new(services.chain(self.device_list.devices.iter().flat_map({
let self_span = self_span.clone();
move |d| d.iter_services(self_span.clone())
})))
}
pub fn span(&self, parent: tracing::Span) -> tracing::Span {
error_span!(parent: parent, "device", name = self.name())
}
}
impl Device {
pub fn name(&self) -> &str {
if self.friendly_name.is_empty() {
return &self.device_type;
}
&self.friendly_name
}
}
#[derive(Clone, Debug, Deserialize, Default)]
pub struct ServiceList {
#[serde(rename = "service", default)]
pub services: Vec<Service>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct Service {
#[serde(rename = "serviceType")]
pub service_type: String,
#[serde(rename = "controlURL")]
pub control_url: String,
#[serde(rename = "SCPDURL")]
pub scpd_url: String,
}
impl Service {
pub fn span(&self, parent: tracing::Span) -> tracing::Span {
error_span!(parent: parent, "service", url = self.control_url)
}
}
#[derive(Debug)]
struct UpnpEndpoint {
discover_response: UpnpDiscoverResponse,
data: RootDesc,
}
impl UpnpEndpoint {
fn location(&self) -> &Url {
&self.discover_response.location
}
fn span(&self) -> tracing::Span {
error_span!("upnp_endpoint", location = %self.location())
}
fn iter_services(&self) -> impl Iterator<Item = (tracing::Span, &Service)> + '_ {
let self_span = self.span();
self.data
.devices
.iter()
.flat_map(move |d| d.iter_services(self_span.clone()))
}
fn my_local_ip(&self) -> anyhow::Result<Ipv4Addr> {
let dest_ipv4 = match self.discover_response.received_from {
SocketAddr::V4(v4) => *v4.ip(),
SocketAddr::V6(v6) => {
bail!("don't support IPv6, but remote ip is {}", v6.ip())
}
};
let local_ip = get_local_ip_relative_to(dest_ipv4)
.with_context(|| format!("can't determine local IP relative to {dest_ipv4}"))?;
Ok(local_ip)
}
fn get_wan_ip_control_urls(&self) -> impl Iterator<Item = (tracing::Span, Url)> + '_ {
self.iter_services()
.filter(|(_, s)| s.service_type == SERVICE_TYPE_WAN_IP_CONNECTION)
.map(|(span, s)| (span, self.discover_response.location.join(&s.control_url)))
.filter_map(|(span, url)| match url {
Ok(url) => Some((span, url)),
Err(e) => {
error!("bad control url: {e:#}");
None
}
})
}
}
#[derive(Debug)]
struct UpnpDiscoverResponse {
pub received_from: SocketAddr,
pub location: Url,
}
async fn discover_services(location: Url) -> anyhow::Result<RootDesc> {
let response = Client::new()
.get(location.clone())
.send()
.await
.context("failed to send GET request")?
.text()
.await
.context("failed to read response body")?;
trace!("received from {location}: {response}");
let root_desc: RootDesc = from_str(&response)
.context("failed to parse response body as xml")
.map_err(|e| {
error!("failed to parse this XML: {response}");
e
})?;
Ok(root_desc)
}
fn parse_upnp_discover_response(
response: &str,
received_from: SocketAddr,
) -> anyhow::Result<UpnpDiscoverResponse> {
let mut headers = HashMap::new();
for line in response.lines() {
if let Some((key, value)) = line.split_once(": ") {
headers.insert(key.to_lowercase(), value.trim_end().to_string());
}
}
let location = headers.get("location").context("missing location header")?;
let location =
Url::parse(location).with_context(|| format!("failed parsing location {location}"))?;
Ok(UpnpDiscoverResponse {
location,
received_from,
})
}
pub struct UpnpPortForwarderOptions {
pub lease_duration: Duration,
pub discover_interval: Duration,
pub discover_timeout: Duration,
}
impl Default for UpnpPortForwarderOptions {
fn default() -> Self {
Self {
discover_interval: Duration::from_secs(60),
discover_timeout: Duration::from_secs(10),
lease_duration: Duration::from_secs(60),
}
}
}
pub struct UpnpPortForwarder {
ports: Vec<u16>,
opts: UpnpPortForwarderOptions,
}
impl UpnpPortForwarder {
pub fn new(ports: Vec<u16>, opts: Option<UpnpPortForwarderOptions>) -> anyhow::Result<Self> {
if ports.is_empty() {
bail!("empty ports")
}
Ok(Self {
ports,
opts: opts.unwrap_or_default(),
})
}
async fn parse_endpoint(
&self,
discover_response: UpnpDiscoverResponse,
) -> anyhow::Result<UpnpEndpoint> {
let services = discover_services(discover_response.location.clone()).await?;
Ok(UpnpEndpoint {
discover_response,
data: services,
})
}
async fn discover_once(
&self,
tx: &UnboundedSender<UpnpDiscoverResponse>,
) -> anyhow::Result<()> {
let socket = tokio::net::UdpSocket::bind("0.0.0.0:0")
.await
.context("failed to bind UDP socket")?;
socket
.send_to(SSDP_SEARCH_REQUEST.as_bytes(), SSDP_MULTICAST_IP)
.await
.context("failed to send SSDP search request")?;
let mut buffer = [0; 2048];
let timeout = tokio::time::sleep(self.opts.discover_timeout);
let mut timed_out = false;
tokio::pin!(timeout);
let mut discovered = 0;
while !timed_out {
tokio::select! {
_ = &mut timeout, if !timed_out => {
timed_out = true;
}
Ok((len, addr)) = socket.recv_from(&mut buffer), if !timed_out => {
let response = match std::str::from_utf8(&buffer[..len]) {
Ok(response) => response,
Err(_) => {
warn!("received invalid utf-8 from {addr}");
continue;
},
};
trace!("received response from {addr}: {response}");
match parse_upnp_discover_response(response, addr) {
Ok(r) => {
tx.send(r)?;
discovered += 1;
},
Err(e) => warn!("failed to parse response: {e:#}"),
};
},
}
}
debug!("discovered {discovered} endpoints");
Ok(())
}
async fn discovery(&self, tx: UnboundedSender<UpnpDiscoverResponse>) -> anyhow::Result<()> {
let mut discover_interval = tokio::time::interval(self.opts.discover_interval);
loop {
discover_interval.tick().await;
if let Err(e) = self.discover_once(&tx).await {
warn!("failed to run discovery: {e:#}");
}
}
}
async fn manage_port(&self, control_url: Url, local_ip: Ipv4Addr, port: u16) -> ! {
let lease_duration = self.opts.lease_duration;
let mut interval = tokio::time::interval(lease_duration / 2);
loop {
interval.tick().await;
if let Err(e) = forward_port(control_url.clone(), local_ip, port, lease_duration).await
{
warn!("failed to forward port: {e:#}");
}
}
}
async fn manage_service(&self, control_url: Url, local_ip: Ipv4Addr) -> anyhow::Result<()> {
futures::future::join_all(self.ports.iter().cloned().map(|port| {
self.manage_port(control_url.clone(), local_ip, port)
.instrument(error_span!("manage_port", port = port))
}))
.await;
Ok(())
}
pub async fn run_forever(self) -> ! {
let (discover_tx, mut discover_rx) = unbounded_channel();
let discovery = self.discovery(discover_tx);
let mut spawned_tasks = HashSet::<Url>::new();
let mut endpoints = FuturesUnordered::new();
let mut service_managers = FuturesUnordered::new();
tokio::pin!(discovery);
loop {
tokio::select! {
_ = &mut discovery => {},
r = discover_rx.recv() => {
let r = r.unwrap();
let location = r.location.clone();
endpoints.push(self.parse_endpoint(r).map_err(|e| {
error!("failed to parse endpoint: {e:#}");
e
}).instrument(error_span!("parse endpoint", location=location.to_string())));
},
Some(Ok(endpoint)) = endpoints.next(), if !endpoints.is_empty() => {
let mut local_ip = None;
for (span, control_url) in endpoint.get_wan_ip_control_urls() {
if spawned_tasks.contains(&control_url) {
debug!("already spawned for {}", control_url);
continue;
}
let ip = match local_ip {
Some(ip) => ip,
None => {
match endpoint.my_local_ip() {
Ok(ip) => {
local_ip = Some(ip);
ip
},
Err(e) => {
warn!("failed to determine local IP for endpoint at {}: {:#}", endpoint.location(), e);
break;
}
}
}
};
spawned_tasks.insert(control_url.clone());
service_managers.push(self.manage_service(control_url, ip).instrument(span))
}
},
_ = service_managers.next(), if !service_managers.is_empty() => {
},
}
}
}
}
#[cfg(test)]
mod tests {
use serde_xml_rs::from_str;
use crate::RootDesc;
#[test]
fn test_parse() {
dbg!(from_str::<RootDesc>(include_str!("resources/test/devices-0.xml")).unwrap());
}
}

View file

@ -0,0 +1,77 @@
<root xmlns="urn:schemas-upnp-org:device-1-0">
<specVersion>
<major>1</major>
<minor>0</minor>
</specVersion>
<device>
<deviceType>urn:schemas-upnp-org:device:InternetGatewayDevice:1</deviceType>
<friendlyName>ARRIS TG3492LG</friendlyName>
<manufacturer>Arris Group, Inc</manufacturer>
<manufacturerURL>http://www.arris.com/</manufacturerURL>
<modelDescription>DOCSIS 3.1 Cable Modem Gateway Device</modelDescription>
<modelName>TG3492LG</modelName>
<modelNumber>TG3492LG</modelNumber>
<modelURL>http://www.arris.com</modelURL>
<serialNumber>ABAP02974423</serialNumber>
<UDN>uuid:ebf5a0a0-1dd1-11b2-a90f-acf8cc3de6b6</UDN>
<UPC>TG3492LG</UPC>
<serviceList>
<service>
<serviceType>urn:schemas-upnp-org:service:Layer3Forwarding:1</serviceType>
<serviceId>urn:upnp-org:serviceId:L3Forwarding1</serviceId>
<SCPDURL>/Layer3ForwardingSCPD.xml</SCPDURL>
<controlURL>/upnp/control/Layer3Forwarding</controlURL>
<eventSubURL>/upnp/event/Layer3Forwarding</eventSubURL>
</service>
</serviceList>
<deviceList>
<device>
<deviceType>urn:schemas-upnp-org:device:WANDevice:1</deviceType>
<friendlyName>WANDevice:1</friendlyName>
<manufacturer>Arris Group, Inc</manufacturer>
<manufacturerURL>http://www.arris.com/</manufacturerURL>
<modelDescription>DOCSIS 3.1 Cable Modem Gateway Device</modelDescription>
<modelName>TG3492LG</modelName>
<modelNumber>TG3492LG</modelNumber>
<modelURL>http://www.arris.com</modelURL>
<serialNumber>ABAP02974423</serialNumber>
<UDN>uuid:ebf5a0a0-1dd1-11b2-a92f-acf8cc3de6b6</UDN>
<UPC>TG3492LG</UPC>
<serviceList>
<service>
<serviceType>urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1</serviceType>
<serviceId>urn:upnp-org:serviceId:WANCommonIFC1</serviceId>
<SCPDURL>/WANCommonInterfaceConfigSCPD.xml</SCPDURL>
<controlURL>/upnp/control/WANCommonInterfaceConfig0</controlURL>
<eventSubURL>/upnp/event/WANCommonInterfaceConfig0</eventSubURL>
</service>
</serviceList>
<deviceList>
<device>
<deviceType>urn:schemas-upnp-org:device:WANConnectionDevice:1</deviceType>
<friendlyName>WANConnectionDevice:1</friendlyName>
<manufacturer>Arris Group, Inc</manufacturer>
<manufacturerURL>http://www.arris.com/</manufacturerURL>
<modelDescription>DOCSIS 3.1 Cable Modem Gateway Device</modelDescription>
<modelName>TG3492LG</modelName>
<modelNumber>TG3492LG</modelNumber>
<modelURL>http://www.arris.com</modelURL>
<serialNumber>ABAP02974423</serialNumber>
<UDN>uuid:ebf5a0a0-1dd1-11b2-a93f-acf8cc3de6b6</UDN>
<UPC>TG3492LG</UPC>
<serviceList>
<service>
<serviceType>urn:schemas-upnp-org:service:WANIPConnection:1</serviceType>
<serviceId>urn:upnp-org:serviceId:WANIPConn1</serviceId>
<SCPDURL>/WANIPConnectionServiceSCPD.xml</SCPDURL>
<controlURL>/upnp/control/WANIPConnection0</controlURL>
<eventSubURL>/upnp/event/WANIPConnection0</eventSubURL>
</service>
</serviceList>
</device>
</deviceList>
</device>
</deviceList>
<presentationURL>http://192.168.0.1/</presentationURL>
</device>
</root>