From a6ebecee97857193cbfdddf158b37a6c0ada7217 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 1 Mar 2024 07:54:27 +0000 Subject: [PATCH] Report size to trackers --- crates/librqbit/src/lib.rs | 2 +- crates/librqbit/src/session.rs | 56 ++++++++++-- crates/librqbit/src/torrent_state/mod.rs | 17 ++-- crates/librqbit/src/torrent_state/stats.rs | 26 +++++- crates/rqbit/src/main.rs | 4 +- crates/tracker_comms/src/tracker_comms.rs | 29 ++++++- crates/tracker_comms/src/tracker_comms_udp.rs | 6 +- desktop/src-tauri/Cargo.lock | 85 ++++++++++++++----- 8 files changed, 183 insertions(+), 42 deletions(-) diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index d16c427..4c59346 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -47,7 +47,7 @@ pub use session::{ SUPPORTED_SCHEMES, }; pub use spawn_utils::spawn as librqbit_spawn; -pub use torrent_state::{ManagedTorrent, ManagedTorrentState}; +pub use torrent_state::{ManagedTorrent, ManagedTorrentState, TorrentStats, TorrentStatsState}; pub use buffers::*; pub use clone_to_owned::CloneToOwned; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 1ebc426..98ef416 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -783,7 +783,7 @@ impl Session { /// Add a torrent to the session. #[inline(never)] pub fn add_torrent<'a>( - &'a self, + self: &'a Arc, add: AddTorrent<'a>, opts: Option, ) -> BoxFuture<'a, anyhow::Result> { @@ -1077,7 +1077,7 @@ impl Session { // Get a peer stream from both DHT and trackers. fn make_peer_rx( - &self, + self: &Arc, info_hash: Id20, trackers: Vec, announce_port: Option, @@ -1089,12 +1089,16 @@ impl Session { .as_ref() .map(|dht| dht.get_peers(info_hash, announce_port)) .transpose()?; + + let peer_rx_stats = PeerRxTorrentInfo { + info_hash, + session: self.clone(), + }; let peer_rx = TrackerComms::start( info_hash, self.peer_id, trackers, - // TODO: report actual bytes, not zeroes. - Box::new(()), + Box::new(peer_rx_stats), force_tracker_interval, announce_port, ); @@ -1108,7 +1112,7 @@ impl Session { } } - pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + pub fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { let peer_rx = self.make_peer_rx( handle.info_hash(), handle.info().trackers.clone().into_iter().collect(), @@ -1124,3 +1128,45 @@ impl Session { Ok(()) } } + +// Ad adapter for converting stats into the format that tracker_comms accepts. +struct PeerRxTorrentInfo { + info_hash: Id20, + session: Arc, +} + +impl tracker_comms::TorrentStatsProvider for PeerRxTorrentInfo { + fn get(&self) -> tracker_comms::TrackerCommsStats { + let mt = self.session.with_torrents(|torrents| { + for (_, mt) in torrents { + if mt.info_hash() == self.info_hash { + return Some(mt.clone()); + } + } + None + }); + let mt = match mt { + Some(mt) => mt, + None => { + warn!(info_hash=?self.info_hash, "can't find torrent in the session"); + return Default::default(); + } + }; + let stats = mt.stats(); + + use crate::torrent_state::stats::TorrentStatsState as TS; + use tracker_comms::TrackerCommsStatsState as S; + + tracker_comms::TrackerCommsStats { + downloaded_bytes: stats.progress_bytes, + total_bytes: stats.total_bytes, + uploaded_bytes: stats.uploaded_bytes, + torrent_state: match stats.state { + TS::Initializing => S::Initializing, + TS::Live => S::Live, + TS::Paused => S::Paused, + TS::Error => S::None, + }, + } + } +} diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index a3fe61f..8678abd 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -42,7 +42,7 @@ use crate::type_aliases::PeerStream; use initializing::TorrentStateInitializing; use self::paused::TorrentStatePaused; -use self::stats::TorrentStats; +pub use self::stats::{TorrentStats, TorrentStatsState}; pub enum ManagedTorrentState { Initializing(Arc), @@ -351,11 +351,13 @@ impl ManagedTorrent { /// Get stats. pub fn stats(&self) -> TorrentStats { + use stats::TorrentStatsState as S; let mut resp = TorrentStats { total_bytes: self.info().lengths.total_length(), - state: "", + state: S::Error, error: None, progress_bytes: 0, + uploaded_bytes: 0, finished: false, live: None, }; @@ -363,17 +365,17 @@ impl ManagedTorrent { self.with_state(|s| { match s { ManagedTorrentState::Initializing(i) => { - resp.state = "initializing"; + resp.state = S::Initializing; resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed); } ManagedTorrentState::Paused(p) => { - resp.state = "paused"; + resp.state = S::Paused; resp.total_bytes = p.chunk_tracker.get_total_selected_bytes(); resp.progress_bytes = resp.total_bytes - p.needed_bytes; resp.finished = resp.progress_bytes == resp.total_bytes; } ManagedTorrentState::Live(l) => { - resp.state = "live"; + resp.state = S::Live; let live_stats = LiveStats::from(l.as_ref()); let total = l.get_total_selected_bytes(); let remaining = l.get_left_to_download_bytes(); @@ -382,14 +384,15 @@ impl ManagedTorrent { resp.progress_bytes = progress; resp.total_bytes = total; resp.finished = remaining == 0; + resp.uploaded_bytes = l.get_uploaded_bytes(); resp.live = Some(live_stats); } ManagedTorrentState::Error(e) => { - resp.state = "error"; + resp.state = S::Error; resp.error = Some(format!("{:?}", e)) } ManagedTorrentState::None => { - resp.state = "error"; + resp.state = S::Error; resp.error = Some("bug: torrent in broken \"None\" state".to_string()); } } diff --git a/crates/librqbit/src/torrent_state/stats.rs b/crates/librqbit/src/torrent_state/stats.rs index 222dba9..0b85e59 100644 --- a/crates/librqbit/src/torrent_state/stats.rs +++ b/crates/librqbit/src/torrent_state/stats.rs @@ -43,11 +43,35 @@ impl From<&TorrentStateLive> for LiveStats { } } +#[derive(Clone, Copy, Serialize, Debug)] +pub enum TorrentStatsState { + #[serde(rename = "initializing")] + Initializing, + #[serde(rename = "live")] + Live, + #[serde(rename = "paused")] + Paused, + #[serde(rename = "error")] + Error, +} + +impl std::fmt::Display for TorrentStatsState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TorrentStatsState::Initializing => f.write_str("initializing"), + TorrentStatsState::Live => f.write_str("live"), + TorrentStatsState::Paused => f.write_str("paused"), + TorrentStatsState::Error => f.write_str("error"), + } + } +} + #[derive(Serialize, Debug)] pub struct TorrentStats { - pub state: &'static str, + pub state: TorrentStatsState, pub error: Option, pub progress_bytes: u64, + pub uploaded_bytes: u64, pub total_bytes: u64, pub finished: bool, pub live: Option, diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4fdfb1d..2242e8e 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -8,7 +8,7 @@ use librqbit::{ http_api_client, librqbit_spawn, tracing_subscriber_config_utils::{init_logging, InitLoggingOptions}, AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse, - PeerConnectionOptions, Session, SessionOptions, + PeerConnectionOptions, Session, SessionOptions, TorrentStatsState, }; use size_format::SizeFormatterBinary as SF; use tracing::{error, error_span, info, trace_span, warn}; @@ -277,7 +277,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { session.with_torrents(|torrents| { for (idx, torrent) in torrents { let stats = torrent.stats(); - if stats.state == "initializing" { + if let TorrentStatsState::Initializing = stats.state { let total = stats.total_bytes; let progress = stats.progress_bytes; let pct = (progress as f64 / total as f64) * 100f64; diff --git a/crates/tracker_comms/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs index 3816131..5ab2554 100644 --- a/crates/tracker_comms/src/tracker_comms.rs +++ b/crates/tracker_comms/src/tracker_comms.rs @@ -28,11 +28,21 @@ pub struct TrackerComms { tcp_listen_port: Option, } +#[derive(Default)] +pub enum TrackerCommsStatsState { + #[default] + None, + Initializing, + Paused, + Live, +} + #[derive(Default)] pub struct TrackerCommsStats { pub uploaded_bytes: u64, pub downloaded_bytes: u64, pub total_bytes: u64, + pub torrent_state: TrackerCommsStatsState, } impl TrackerCommsStats { @@ -44,6 +54,10 @@ impl TrackerCommsStats { } 0 } + + pub fn is_completed(&self) -> bool { + self.downloaded_bytes >= self.total_bytes + } } pub trait TorrentStatsProvider: Send + Sync { @@ -164,7 +178,7 @@ impl TrackerComms { let request = tracker_comms_http::TrackerRequest { info_hash: self.info_hash, peer_id: self.peer_id, - port: 6778, + port: self.tcp_listen_port.unwrap_or(0), uploaded: stats.uploaded_bytes, downloaded: stats.downloaded_bytes, left: stats.get_left_to_download_bytes(), @@ -249,7 +263,18 @@ impl TrackerComms { downloaded: stats.downloaded_bytes, left: stats.get_left_to_download_bytes(), uploaded: stats.uploaded_bytes, - event: EVENT_NONE, + event: match stats.torrent_state { + TrackerCommsStatsState::None => EVENT_NONE, + TrackerCommsStatsState::Initializing => EVENT_STARTED, + TrackerCommsStatsState::Paused => EVENT_STOPPED, + TrackerCommsStatsState::Live => { + if stats.is_completed() { + EVENT_COMPLETED + } else { + EVENT_STARTED + } + } + }, key: 0, // whatever that is? port: self.tcp_listen_port.unwrap_or(0), }; diff --git a/crates/tracker_comms/src/tracker_comms_udp.rs b/crates/tracker_comms/src/tracker_comms_udp.rs index 35af733..b62127c 100644 --- a/crates/tracker_comms/src/tracker_comms_udp.rs +++ b/crates/tracker_comms/src/tracker_comms_udp.rs @@ -12,9 +12,9 @@ const ACTION_ANNOUNCE: u32 = 1; // const ACTION_ERROR: u32 = 3; pub const EVENT_NONE: u32 = 0; -// pub const EVENT_COMPLETED: u32 = 1; -// pub const EVENT_STARTED: u32 = 2; -// pub const EVENT_STOPPED: u32 = 3; +pub const EVENT_COMPLETED: u32 = 1; +pub const EVENT_STARTED: u32 = 2; +pub const EVENT_STOPPED: u32 = 3; pub type ConnectionId = u64; const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980; diff --git a/desktop/src-tauri/Cargo.lock b/desktop/src-tauri/Cargo.lock index 12ac960..f49fc67 100644 --- a/desktop/src-tauri/Cargo.lock +++ b/desktop/src-tauri/Cargo.lock @@ -73,6 +73,28 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -116,9 +138,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.7.1" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "810a80b128d70e6ed2bdf3fe8ed72c0ae56f5f5948d01c2753282dd92a84fce8" +checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" dependencies = [ "async-trait", "axum-core", @@ -127,7 +149,7 @@ dependencies = [ "http 1.0.0", "http-body 1.0.0", "http-body-util", - "hyper 1.0.1", + "hyper 1.2.0", "hyper-util", "itoa 1.0.9", "matchit", @@ -145,13 +167,14 @@ dependencies = [ "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-core" -version = "0.4.0" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de0ddc355eab88f4955090a823715df47acf0b7660aab7a69ad5ce6301ee3b73" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" dependencies = [ "async-trait", "bytes", @@ -165,6 +188,7 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -1357,9 +1381,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" dependencies = [ "bytes", "fnv", @@ -1547,20 +1571,21 @@ dependencies = [ [[package]] name = "hyper" -version = "1.0.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.0", + "h2 0.4.2", "http 1.0.0", "http-body 1.0.0", "httparse", "httpdate", "itoa 1.0.9", "pin-project-lite", + "smallvec", "tokio", ] @@ -1579,22 +1604,18 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.1" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca339002caeb0d159cc6e023dff48e199f081e42fa039895c7c6f38b37f2e9d" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", - "futures-channel", "futures-util", "http 1.0.0", "http-body 1.0.0", - "hyper 1.0.1", + "hyper 1.2.0", "pin-project-lite", "socket2 0.5.5", "tokio", - "tower", - "tower-service", - "tracing", ] [[package]] @@ -1867,9 +1888,10 @@ dependencies = [ [[package]] name = "librqbit" -version = "5.4.1" +version = "5.5.1" dependencies = [ "anyhow", + "async-stream", "axum", "backoff", "base64 0.21.5", @@ -1889,6 +1911,7 @@ dependencies = [ "librqbit-dht", "librqbit-peer-protocol", "librqbit-sha1-wrapper", + "librqbit-tracker-comms", "librqbit-upnp", "parking_lot", "rand 0.8.5", @@ -2001,6 +2024,26 @@ dependencies = [ "crypto-hash", ] +[[package]] +name = "librqbit-tracker-comms" +version = "1.0.0" +dependencies = [ + "anyhow", + "async-stream", + "byteorder", + "futures", + "librqbit-bencode", + "librqbit-buffers", + "librqbit-core", + "rand 0.8.5", + "reqwest", + "serde", + "tokio", + "tracing", + "url", + "urlencoding", +] + [[package]] name = "librqbit-upnp" version = "0.1.0" @@ -3022,7 +3065,7 @@ dependencies = [ [[package]] name = "rqbit-desktop" -version = "5.4.1" +version = "5.5.1" dependencies = [ "anyhow", "base64 0.21.5", @@ -3367,9 +3410,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "socket2"