Report size to trackers

This commit is contained in:
Igor Katson 2024-03-01 07:54:27 +00:00
parent 5488e1d40f
commit a6ebecee97
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
8 changed files with 183 additions and 42 deletions

View file

@ -47,7 +47,7 @@ pub use session::{
SUPPORTED_SCHEMES,
};
pub use spawn_utils::spawn as librqbit_spawn;
pub use torrent_state::{ManagedTorrent, ManagedTorrentState};
pub use torrent_state::{ManagedTorrent, ManagedTorrentState, TorrentStats, TorrentStatsState};
pub use buffers::*;
pub use clone_to_owned::CloneToOwned;

View file

@ -783,7 +783,7 @@ impl Session {
/// Add a torrent to the session.
#[inline(never)]
pub fn add_torrent<'a>(
&'a self,
self: &'a Arc<Self>,
add: AddTorrent<'a>,
opts: Option<AddTorrentOptions>,
) -> BoxFuture<'a, anyhow::Result<AddTorrentResponse>> {
@ -1077,7 +1077,7 @@ impl Session {
// Get a peer stream from both DHT and trackers.
fn make_peer_rx(
&self,
self: &Arc<Self>,
info_hash: Id20,
trackers: Vec<String>,
announce_port: Option<u16>,
@ -1089,12 +1089,16 @@ impl Session {
.as_ref()
.map(|dht| dht.get_peers(info_hash, announce_port))
.transpose()?;
let peer_rx_stats = PeerRxTorrentInfo {
info_hash,
session: self.clone(),
};
let peer_rx = TrackerComms::start(
info_hash,
self.peer_id,
trackers,
// TODO: report actual bytes, not zeroes.
Box::new(()),
Box::new(peer_rx_stats),
force_tracker_interval,
announce_port,
);
@ -1108,7 +1112,7 @@ impl Session {
}
}
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
pub fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let peer_rx = self.make_peer_rx(
handle.info_hash(),
handle.info().trackers.clone().into_iter().collect(),
@ -1124,3 +1128,45 @@ impl Session {
Ok(())
}
}
// Ad adapter for converting stats into the format that tracker_comms accepts.
struct PeerRxTorrentInfo {
info_hash: Id20,
session: Arc<Session>,
}
impl tracker_comms::TorrentStatsProvider for PeerRxTorrentInfo {
fn get(&self) -> tracker_comms::TrackerCommsStats {
let mt = self.session.with_torrents(|torrents| {
for (_, mt) in torrents {
if mt.info_hash() == self.info_hash {
return Some(mt.clone());
}
}
None
});
let mt = match mt {
Some(mt) => mt,
None => {
warn!(info_hash=?self.info_hash, "can't find torrent in the session");
return Default::default();
}
};
let stats = mt.stats();
use crate::torrent_state::stats::TorrentStatsState as TS;
use tracker_comms::TrackerCommsStatsState as S;
tracker_comms::TrackerCommsStats {
downloaded_bytes: stats.progress_bytes,
total_bytes: stats.total_bytes,
uploaded_bytes: stats.uploaded_bytes,
torrent_state: match stats.state {
TS::Initializing => S::Initializing,
TS::Live => S::Live,
TS::Paused => S::Paused,
TS::Error => S::None,
},
}
}
}

View file

@ -42,7 +42,7 @@ use crate::type_aliases::PeerStream;
use initializing::TorrentStateInitializing;
use self::paused::TorrentStatePaused;
use self::stats::TorrentStats;
pub use self::stats::{TorrentStats, TorrentStatsState};
pub enum ManagedTorrentState {
Initializing(Arc<TorrentStateInitializing>),
@ -351,11 +351,13 @@ impl ManagedTorrent {
/// Get stats.
pub fn stats(&self) -> TorrentStats {
use stats::TorrentStatsState as S;
let mut resp = TorrentStats {
total_bytes: self.info().lengths.total_length(),
state: "",
state: S::Error,
error: None,
progress_bytes: 0,
uploaded_bytes: 0,
finished: false,
live: None,
};
@ -363,17 +365,17 @@ impl ManagedTorrent {
self.with_state(|s| {
match s {
ManagedTorrentState::Initializing(i) => {
resp.state = "initializing";
resp.state = S::Initializing;
resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed);
}
ManagedTorrentState::Paused(p) => {
resp.state = "paused";
resp.state = S::Paused;
resp.total_bytes = p.chunk_tracker.get_total_selected_bytes();
resp.progress_bytes = resp.total_bytes - p.needed_bytes;
resp.finished = resp.progress_bytes == resp.total_bytes;
}
ManagedTorrentState::Live(l) => {
resp.state = "live";
resp.state = S::Live;
let live_stats = LiveStats::from(l.as_ref());
let total = l.get_total_selected_bytes();
let remaining = l.get_left_to_download_bytes();
@ -382,14 +384,15 @@ impl ManagedTorrent {
resp.progress_bytes = progress;
resp.total_bytes = total;
resp.finished = remaining == 0;
resp.uploaded_bytes = l.get_uploaded_bytes();
resp.live = Some(live_stats);
}
ManagedTorrentState::Error(e) => {
resp.state = "error";
resp.state = S::Error;
resp.error = Some(format!("{:?}", e))
}
ManagedTorrentState::None => {
resp.state = "error";
resp.state = S::Error;
resp.error = Some("bug: torrent in broken \"None\" state".to_string());
}
}

View file

@ -43,11 +43,35 @@ impl From<&TorrentStateLive> for LiveStats {
}
}
#[derive(Clone, Copy, Serialize, Debug)]
pub enum TorrentStatsState {
#[serde(rename = "initializing")]
Initializing,
#[serde(rename = "live")]
Live,
#[serde(rename = "paused")]
Paused,
#[serde(rename = "error")]
Error,
}
impl std::fmt::Display for TorrentStatsState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TorrentStatsState::Initializing => f.write_str("initializing"),
TorrentStatsState::Live => f.write_str("live"),
TorrentStatsState::Paused => f.write_str("paused"),
TorrentStatsState::Error => f.write_str("error"),
}
}
}
#[derive(Serialize, Debug)]
pub struct TorrentStats {
pub state: &'static str,
pub state: TorrentStatsState,
pub error: Option<String>,
pub progress_bytes: u64,
pub uploaded_bytes: u64,
pub total_bytes: u64,
pub finished: bool,
pub live: Option<LiveStats>,

View file

@ -8,7 +8,7 @@ use librqbit::{
http_api_client, librqbit_spawn,
tracing_subscriber_config_utils::{init_logging, InitLoggingOptions},
AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse,
PeerConnectionOptions, Session, SessionOptions,
PeerConnectionOptions, Session, SessionOptions, TorrentStatsState,
};
use size_format::SizeFormatterBinary as SF;
use tracing::{error, error_span, info, trace_span, warn};
@ -277,7 +277,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
session.with_torrents(|torrents| {
for (idx, torrent) in torrents {
let stats = torrent.stats();
if stats.state == "initializing" {
if let TorrentStatsState::Initializing = stats.state {
let total = stats.total_bytes;
let progress = stats.progress_bytes;
let pct = (progress as f64 / total as f64) * 100f64;

View file

@ -28,11 +28,21 @@ pub struct TrackerComms {
tcp_listen_port: Option<u16>,
}
#[derive(Default)]
pub enum TrackerCommsStatsState {
#[default]
None,
Initializing,
Paused,
Live,
}
#[derive(Default)]
pub struct TrackerCommsStats {
pub uploaded_bytes: u64,
pub downloaded_bytes: u64,
pub total_bytes: u64,
pub torrent_state: TrackerCommsStatsState,
}
impl TrackerCommsStats {
@ -44,6 +54,10 @@ impl TrackerCommsStats {
}
0
}
pub fn is_completed(&self) -> bool {
self.downloaded_bytes >= self.total_bytes
}
}
pub trait TorrentStatsProvider: Send + Sync {
@ -164,7 +178,7 @@ impl TrackerComms {
let request = tracker_comms_http::TrackerRequest {
info_hash: self.info_hash,
peer_id: self.peer_id,
port: 6778,
port: self.tcp_listen_port.unwrap_or(0),
uploaded: stats.uploaded_bytes,
downloaded: stats.downloaded_bytes,
left: stats.get_left_to_download_bytes(),
@ -249,7 +263,18 @@ impl TrackerComms {
downloaded: stats.downloaded_bytes,
left: stats.get_left_to_download_bytes(),
uploaded: stats.uploaded_bytes,
event: EVENT_NONE,
event: match stats.torrent_state {
TrackerCommsStatsState::None => EVENT_NONE,
TrackerCommsStatsState::Initializing => EVENT_STARTED,
TrackerCommsStatsState::Paused => EVENT_STOPPED,
TrackerCommsStatsState::Live => {
if stats.is_completed() {
EVENT_COMPLETED
} else {
EVENT_STARTED
}
}
},
key: 0, // whatever that is?
port: self.tcp_listen_port.unwrap_or(0),
};

View file

@ -12,9 +12,9 @@ const ACTION_ANNOUNCE: u32 = 1;
// const ACTION_ERROR: u32 = 3;
pub const EVENT_NONE: u32 = 0;
// pub const EVENT_COMPLETED: u32 = 1;
// pub const EVENT_STARTED: u32 = 2;
// pub const EVENT_STOPPED: u32 = 3;
pub const EVENT_COMPLETED: u32 = 1;
pub const EVENT_STARTED: u32 = 2;
pub const EVENT_STOPPED: u32 = 3;
pub type ConnectionId = u64;
const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980;