Deentangled classes a bit

This commit is contained in:
Igor Katson 2021-07-03 12:40:59 +01:00
parent 9038630622
commit 85e33741b7
2 changed files with 84 additions and 59 deletions

View file

@ -1,6 +1,7 @@
use std::{
collections::HashSet,
fs::{File, OpenOptions},
ops::Deref,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU64, Ordering},
@ -17,6 +18,7 @@ use reqwest::Url;
use size_format::SizeFormatterBinary as SF;
use crate::{
buffers::{ByteBuf, ByteString},
chunk_tracker::ChunkTracker,
file_ops::FileOps,
http_api::make_and_run_http_api,
@ -24,13 +26,14 @@ use crate::{
peer_id::generate_peer_id,
spawn_utils::{spawn, BlockingSpawner},
speed_estimator::SpeedEstimator,
torrent_metainfo::TorrentMetaV1Owned,
torrent_metainfo::{TorrentMetaV1Info, TorrentMetaV1Owned},
torrent_state::{AtomicStats, TorrentState, TorrentStateLocked},
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
type_aliases::Sha1,
};
pub struct TorrentManagerBuilder {
torrent: TorrentMetaV1Owned,
info: TorrentMetaV1Info<ByteString>,
info_hash: [u8; 20],
overwrite: bool,
output_folder: PathBuf,
only_files: Option<Vec<usize>>,
@ -39,9 +42,14 @@ pub struct TorrentManagerBuilder {
}
impl TorrentManagerBuilder {
pub fn new<P: AsRef<Path>>(torrent: TorrentMetaV1Owned, output_folder: P) -> Self {
pub fn new<P: AsRef<Path>>(
info: TorrentMetaV1Info<ByteString>,
info_hash: [u8; 20],
output_folder: P,
) -> Self {
Self {
torrent,
info,
info_hash,
overwrite: false,
output_folder: output_folder.as_ref().into(),
only_files: None,
@ -70,9 +78,10 @@ impl TorrentManagerBuilder {
self
}
pub async fn start_manager(self) -> anyhow::Result<TorrentManagerHandle> {
pub fn start_manager(self) -> anyhow::Result<TorrentManagerHandle> {
TorrentManager::start(
self.torrent,
self.info,
self.info_hash,
self.output_folder,
self.overwrite,
self.only_files,
@ -84,10 +93,21 @@ impl TorrentManagerBuilder {
#[derive(Clone)]
pub struct TorrentManagerHandle {
manager: TorrentManager,
manager: Arc<TorrentManager>,
}
impl TorrentManagerHandle {
pub fn add_tracker(&self, url: Url) -> bool {
let mgr = self.manager.clone();
if mgr.trackers.lock().insert(url.clone()) {
spawn(format!("tracker monitor {}", url), async move {
mgr.single_tracker_monitor(url).await
});
true
} else {
false
}
}
pub async fn cancel(&self) -> anyhow::Result<()> {
todo!()
}
@ -98,21 +118,24 @@ impl TorrentManagerHandle {
}
}
#[derive(Clone)]
struct TorrentManager {
state: Arc<TorrentState>,
speed_estimator: Arc<SpeedEstimator>,
trackers: Mutex<HashSet<Url>>,
force_tracker_interval: Option<Duration>,
}
fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result<Lengths> {
let total_length = torrent.info.iter_file_lengths().sum();
Lengths::new(total_length, torrent.info.piece_length, None)
fn make_lengths<ByteBuf: Clone + Deref<Target = [u8]>>(
torrent: &TorrentMetaV1Info<ByteBuf>,
) -> anyhow::Result<Lengths> {
let total_length = torrent.iter_file_lengths().sum();
Lengths::new(total_length, torrent.piece_length, None)
}
impl TorrentManager {
pub fn start<P: AsRef<Path>>(
torrent: TorrentMetaV1Owned,
info: TorrentMetaV1Info<ByteString>,
info_hash: [u8; 20],
out: P,
overwrite: bool,
only_files: Option<Vec<usize>>,
@ -121,9 +144,9 @@ impl TorrentManager {
) -> anyhow::Result<TorrentManagerHandle> {
let files = {
let mut files =
Vec::<Arc<Mutex<File>>>::with_capacity(torrent.info.iter_file_lengths().count());
Vec::<Arc<Mutex<File>>>::with_capacity(info.iter_file_lengths().count());
for (path_bits, _) in torrent.info.iter_filenames_and_lengths() {
for (path_bits, _) in info.iter_filenames_and_lengths() {
let mut full_path = out.as_ref().to_owned();
for bit in path_bits.iter_components() {
full_path.push(
@ -155,12 +178,12 @@ impl TorrentManager {
};
let peer_id = generate_peer_id();
let lengths = make_lengths(&torrent).context("unable to compute Lengths from torrent")?;
let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?;
debug!("computed lengths: {:?}", &lengths);
info!("Doing initial checksum validation, this might take a while...");
let initial_check_results = FileOps::<Sha1>::new(&torrent.info, &files, &lengths)
.initial_check(only_files.as_deref())?;
let initial_check_results =
FileOps::<Sha1>::new(&info, &files, &lengths).initial_check(only_files.as_deref())?;
info!(
"Initial check results: have {}, needed {}",
@ -175,8 +198,8 @@ impl TorrentManager {
);
let state = Arc::new(TorrentState {
info_hash: torrent.info_hash,
torrent: torrent.info,
info_hash,
torrent: info,
peer_id,
locked: Arc::new(RwLock::new(TorrentStateLocked {
peers: Default::default(),
@ -193,14 +216,17 @@ impl TorrentManager {
});
let estimator = Arc::new(SpeedEstimator::new(5));
let mgr = Self {
let mgr = Arc::new(Self {
state,
speed_estimator: estimator.clone(),
trackers: Mutex::new(HashSet::new()),
force_tracker_interval,
};
});
spawn("tracker monitor", mgr.clone().task_tracker_monitor());
spawn("stats printer", mgr.clone().stats_printer());
spawn("stats printer", {
let this = mgr.clone();
async move { this.stats_printer().await }
});
spawn(
"http api",
make_and_run_http_api(mgr.state.clone(), estimator.clone()),
@ -221,7 +247,7 @@ impl TorrentManager {
Ok(mgr.into_handle())
}
async fn stats_printer(self) -> anyhow::Result<()> {
async fn stats_printer(&self) -> anyhow::Result<()> {
loop {
let live_peer_stats = self.state.locked.read().peers.stats();
let seen_peers_count = self.state.locked.read().peers.seen().len();
@ -257,34 +283,7 @@ impl TorrentManager {
}
}
async fn task_tracker_monitor(self) -> anyhow::Result<()> {
let mut seen_trackers = HashSet::new();
let mut tracker_futures = FuturesUnordered::new();
let parse_url = |url: &[u8]| -> anyhow::Result<Url> {
let url = std::str::from_utf8(url).context("error parsing tracker URL")?;
let url = Url::parse(url).context("error parsing tracker URL")?;
Ok(url)
};
for tracker in self.state.torrent.iter_announce() {
if seen_trackers.contains(&tracker) {
continue;
}
seen_trackers.insert(tracker);
let tracker_url = match parse_url(tracker) {
Ok(url) => url,
Err(e) => {
warn!("ignoring tracker: {:#}", e);
continue;
}
};
tracker_futures.push(self.clone().single_tracker_monitor(tracker_url));
}
while tracker_futures.next().await.is_some() {}
Ok(())
}
fn into_handle(self) -> TorrentManagerHandle {
fn into_handle(self: Arc<Self>) -> TorrentManagerHandle {
TorrentManagerHandle { manager: self }
}
@ -308,7 +307,7 @@ impl TorrentManager {
Ok(response.interval)
}
async fn single_tracker_monitor(self, mut tracker_url: Url) -> anyhow::Result<()> {
async fn single_tracker_monitor(&self, mut tracker_url: Url) -> anyhow::Result<()> {
let mut event = Some(TrackerRequestEvent::Started);
loop {
let request = TrackerRequest {
@ -330,8 +329,7 @@ impl TorrentManager {
let request_query = request.as_querystring();
tracker_url.set_query(Some(&request_query));
let this = self.clone();
match this.tracker_one_request(tracker_url.clone()).await {
match self.tracker_one_request(tracker_url.clone()).await {
Ok(interval) => {
event = None;
let interval = self

View file

@ -7,7 +7,8 @@ use librqbit::{
torrent_manager::TorrentManagerBuilder,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned},
};
use log::info;
use log::{info, warn};
use reqwest::Url;
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
let response = reqwest::get(url)
@ -178,7 +179,28 @@ fn main() -> anyhow::Result<()> {
None
};
let mut builder = TorrentManagerBuilder::new(torrent, opts.output_folder);
let trackers = torrent
.iter_announce()
.filter_map(|tracker| {
let url = match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => url,
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
return None;
}
};
match Url::parse(url) {
Ok(url) => Some(url),
Err(e) => {
warn!("cannot parse tracker URL {}: {}", url, e);
None
}
}
})
.collect::<Vec<_>>();
let mut builder =
TorrentManagerBuilder::new(torrent.info, torrent.info_hash, opts.output_folder);
builder.overwrite(opts.overwrite).spawner(spawner);
if let Some(only_files) = only_files {
builder.only_files(only_files);
@ -188,8 +210,13 @@ fn main() -> anyhow::Result<()> {
builder.force_tracker_interval(Duration::from_secs(interval));
}
let manager_handle = builder.start_manager().await?;
manager_handle.wait_until_completed().await?;
let handle = builder.start_manager()?;
for url in trackers {
handle.add_tracker(url);
}
handle.wait_until_completed().await?;
Ok(())
})
}