Add tracker refresh interval parameter

This commit is contained in:
Igor Katson 2021-06-29 00:17:10 +01:00
parent c2affa8865
commit 2fc225cfa2
3 changed files with 66 additions and 49 deletions

View file

@ -1,7 +1,6 @@
use std::{
collections::HashSet,
fs::{File, OpenOptions},
net::SocketAddr,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU64, Ordering},
@ -21,7 +20,6 @@ use crate::{
chunk_tracker::ChunkTracker,
file_ops::FileOps,
lengths::Lengths,
peer_connection::{PeerConnection, WriterRequest},
spawn_utils::spawn,
torrent_metainfo::TorrentMetaV1Owned,
torrent_state::{AtomicStats, TorrentState, TorrentStateLocked},
@ -32,6 +30,7 @@ pub struct TorrentManagerBuilder {
overwrite: bool,
output_folder: PathBuf,
only_files: Option<Vec<usize>>,
force_tracker_interval: Option<Duration>,
}
impl TorrentManagerBuilder {
@ -41,6 +40,7 @@ impl TorrentManagerBuilder {
overwrite: false,
output_folder: output_folder.as_ref().into(),
only_files: None,
force_tracker_interval: None,
}
}
@ -54,12 +54,18 @@ impl TorrentManagerBuilder {
self
}
pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self {
self.force_tracker_interval = Some(force_tracker_interval);
self
}
pub async fn start_manager(self) -> anyhow::Result<TorrentManagerHandle> {
TorrentManager::start(
self.torrent,
self.output_folder,
self.overwrite,
self.only_files,
self.force_tracker_interval,
)
}
}
@ -82,7 +88,8 @@ impl TorrentManagerHandle {
#[derive(Clone)]
struct TorrentManager {
inner: Arc<TorrentState>,
state: Arc<TorrentState>,
force_tracker_interval: Option<Duration>,
}
fn generate_peer_id() -> [u8; 20] {
@ -103,6 +110,7 @@ impl TorrentManager {
out: P,
overwrite: bool,
only_files: Option<Vec<usize>>,
force_tracker_interval: Option<Duration>,
) -> anyhow::Result<TorrentManagerHandle> {
let files = {
let mut files =
@ -160,7 +168,7 @@ impl TorrentManager {
);
let mgr = Self {
inner: Arc::new(TorrentState {
state: Arc::new(TorrentState {
info_hash: torrent.info_hash,
torrent,
peer_id,
@ -178,6 +186,7 @@ impl TorrentManager {
needed: initial_check_results.needed_bytes,
lengths,
}),
force_tracker_interval,
};
spawn("tracker monitor", mgr.clone().task_tracker_monitor());
@ -187,18 +196,18 @@ impl TorrentManager {
async fn stats_printer(self) -> anyhow::Result<()> {
loop {
let live_peer_stats = self.inner.locked.read().peers.stats();
let seen_peers_count = self.inner.locked.read().peers.seen().len();
let have = self.inner.stats.have.load(Ordering::Relaxed);
let fetched = self.inner.stats.fetched_bytes.load(Ordering::Relaxed);
let needed = self.inner.needed;
let live_peer_stats = self.state.locked.read().peers.stats();
let seen_peers_count = self.state.locked.read().peers.seen().len();
let have = self.state.stats.have.load(Ordering::Relaxed);
let fetched = self.state.stats.fetched_bytes.load(Ordering::Relaxed);
let needed = self.state.needed;
let downloaded = self
.inner
.state
.stats
.downloaded_and_checked
.load(Ordering::Relaxed);
let remaining = needed - downloaded;
let uploaded = self.inner.stats.uploaded.load(Ordering::Relaxed);
let uploaded = self.state.stats.uploaded.load(Ordering::Relaxed);
let downloaded_pct = if downloaded == needed {
100f64
} else {
@ -229,7 +238,7 @@ impl TorrentManager {
let url = Url::parse(url).context("error parsing tracker URL")?;
Ok(url)
};
for tracker in self.inner.torrent.iter_announce() {
for tracker in self.state.torrent.iter_announce() {
if seen_trackers.contains(&tracker) {
continue;
}
@ -258,7 +267,7 @@ impl TorrentManager {
let response = crate::serde_bencode::from_bytes::<CompactTrackerResponse>(&bytes)?;
for peer in response.peers.iter_sockaddrs() {
self.add_peer(peer);
self.state.add_peer(peer);
}
Ok(response.interval)
}
@ -267,12 +276,12 @@ impl TorrentManager {
let mut event = Some(TrackerRequestEvent::Started);
loop {
let request = TrackerRequest {
info_hash: self.inner.torrent.info_hash,
peer_id: self.inner.peer_id,
info_hash: self.state.torrent.info_hash,
peer_id: self.state.peer_id,
port: 6778,
uploaded: self.inner.get_uploaded(),
downloaded: self.inner.get_downloaded(),
left: self.inner.get_left_to_download(),
uploaded: self.state.get_uploaded(),
downloaded: self.state.get_downloaded(),
left: self.state.get_left_to_download(),
compact: true,
no_peer_id: false,
event,
@ -289,14 +298,15 @@ impl TorrentManager {
match this.tracker_one_request(tracker_url.clone()).await {
Ok(interval) => {
event = None;
let interval = 30;
let duration = Duration::from_secs(interval);
let interval = self
.force_tracker_interval
.unwrap_or(Duration::from_secs(interval));
debug!(
"sleeping for {:?} after calling tracker {}",
duration,
interval,
tracker_url.host().unwrap()
);
tokio::time::sleep(duration).await;
tokio::time::sleep(interval).await;
}
Err(e) => {
debug!("error calling the tracker {}: {:#}", tracker_url, e);
@ -305,27 +315,4 @@ impl TorrentManager {
};
}
}
fn add_peer(&self, addr: SocketAddr) {
let (out_tx, out_rx) = tokio::sync::mpsc::channel::<WriterRequest>(1);
let handle = match self
.inner
.locked
.write()
.peers
.add_if_not_seen(addr, out_tx)
{
Some(handle) => handle,
None => return,
};
let peer_connection = PeerConnection::new(self.inner.clone());
spawn(format!("manage_peer({})", handle), async move {
if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await {
debug!("error managing peer {}: {:#}", handle, e)
};
peer_connection.into_state().drop_peer(handle);
Ok::<_, anyhow::Error>(())
});
}
}

View file

@ -9,16 +9,16 @@ use std::{
};
use futures::{stream::FuturesUnordered, StreamExt};
use log::{trace, warn};
use log::{debug, trace, warn};
use parking_lot::{Mutex, RwLock};
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{channel, Sender};
use crate::{
chunk_tracker::ChunkTracker,
file_ops::FileOps,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
peer_binary_protocol::{Handshake, Message},
peer_connection::WriterRequest,
peer_connection::{PeerConnection, WriterRequest},
peer_state::{LivePeerState, PeerState},
spawn_utils::spawn,
torrent_metainfo::TorrentMetaV1Owned,
@ -333,4 +333,21 @@ impl TorrentState {
},
);
}
pub fn add_peer(self: &Arc<Self>, addr: SocketAddr) {
let (out_tx, out_rx) = channel::<WriterRequest>(1);
let handle = match self.locked.write().peers.add_if_not_seen(addr, out_tx) {
Some(handle) => handle,
None => return,
};
let peer_connection = PeerConnection::new(self.clone());
spawn(format!("manage_peer({})", handle), async move {
if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await {
debug!("error managing peer {}: {:#}", handle, e)
};
peer_connection.into_state().drop_peer(handle);
Ok::<_, anyhow::Error>(())
});
}
}

View file

@ -1,4 +1,4 @@
use std::{fs::File, io::Read};
use std::{fs::File, io::Read, time::Duration};
use anyhow::Context;
use clap::Clap;
@ -60,6 +60,8 @@ struct Opts {
/// The filename of the .torrent file.
output_folder: String,
/// If set, only the file whose filename matching this regex will
/// be downloaded
#[clap(short = 'r', long = "filename-re")]
only_files_matching_regex: Option<String>,
@ -71,8 +73,15 @@ struct Opts {
#[clap(short, long)]
list: bool,
/// The loglevel
#[clap(arg_enum, short = 'v')]
log_level: Option<LogLevel>,
/// The interval in seconds to poll trackers.
/// Trackers send the refresh interval when we connect to them. Often this is
/// pretty big, e.g. 30 minutes. This can force a certain value.
#[clap(short = 'i', long = "tracker-refresh-interval")]
force_tracker_interval: Option<u64>,
}
fn compute_only_files(
@ -162,6 +171,10 @@ fn main() -> anyhow::Result<()> {
builder.only_files(only_files);
}
if let Some(interval) = opts.force_tracker_interval {
builder.force_tracker_interval(Duration::from_secs(interval));
}
let manager_handle = builder.start_manager().await?;
manager_handle.wait_until_completed().await?;
Ok(())