[2/n] HUGE REFACTOR to suppor multiple states. Incomplete, broken
This commit is contained in:
parent
739666ff88
commit
d8538af25d
7 changed files with 394 additions and 369 deletions
|
|
@ -76,6 +76,18 @@ impl ChunkTracker {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn get_have_bytes(&self) -> u64 {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn get_needed_bytes(&self) -> u64 {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn get_lengths(&self) -> &Lengths {
|
||||
&self.lengths
|
||||
}
|
||||
|
||||
pub fn get_have_pieces(&self) -> &BF {
|
||||
&self.have
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,262 +0,0 @@
|
|||
use std::{
|
||||
collections::HashSet,
|
||||
fs::{File, OpenOptions},
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use bencode::from_bytes;
|
||||
use buffers::ByteString;
|
||||
use librqbit_core::{
|
||||
id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator,
|
||||
torrent_metainfo::TorrentMetaV1Info,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use reqwest::Url;
|
||||
use sha1w::Sha1;
|
||||
use size_format::SizeFormatterBinary as SF;
|
||||
use tracing::{debug, info, span, warn, Level};
|
||||
|
||||
use crate::{
|
||||
chunk_tracker::ChunkTracker,
|
||||
file_ops::FileOps,
|
||||
spawn_utils::{spawn, BlockingSpawner},
|
||||
torrent_state::{ManagedTorrent, ManagedTorrentHandle, TorrentStateLive, TorrentStateOptions},
|
||||
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
|
||||
};
|
||||
|
||||
struct TorrentManager {
|
||||
state: Arc<TorrentStateLive>,
|
||||
#[allow(dead_code)]
|
||||
speed_estimator: Arc<SpeedEstimator>,
|
||||
trackers: Mutex<HashSet<Url>>,
|
||||
options: TorrentManagerOptions,
|
||||
}
|
||||
|
||||
fn make_lengths<ByteBuf: AsRef<[u8]>>(
|
||||
torrent: &TorrentMetaV1Info<ByteBuf>,
|
||||
) -> anyhow::Result<Lengths> {
|
||||
let total_length = torrent.iter_file_lengths()?.sum();
|
||||
Lengths::new(total_length, torrent.piece_length, None)
|
||||
}
|
||||
|
||||
fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> {
|
||||
Ok(file.set_len(length)?)
|
||||
}
|
||||
|
||||
impl TorrentManager {
|
||||
fn start<P: AsRef<Path>>(
|
||||
info: TorrentMetaV1Info<ByteString>,
|
||||
info_hash: Id20,
|
||||
out: P,
|
||||
spawner: BlockingSpawner,
|
||||
options: Option<ManagedTorrentOptions>,
|
||||
) -> anyhow::Result<ManagedTorrentHandle> {
|
||||
let options = options.unwrap_or_default();
|
||||
let (files, filenames) = {
|
||||
let mut files =
|
||||
Vec::<Arc<Mutex<File>>>::with_capacity(info.iter_file_lengths()?.count());
|
||||
let mut filenames = Vec::new();
|
||||
for (path_bits, _) in info.iter_filenames_and_lengths()? {
|
||||
let mut full_path = out.as_ref().to_owned();
|
||||
let relative_path = path_bits
|
||||
.to_pathbuf()
|
||||
.context("error converting file to path")?;
|
||||
full_path.push(relative_path);
|
||||
|
||||
std::fs::create_dir_all(full_path.parent().unwrap())?;
|
||||
let file = if options.overwrite {
|
||||
OpenOptions::new()
|
||||
.create(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&full_path)?
|
||||
} else {
|
||||
// TODO: create_new does not seem to work with read(true), so calling this twice.
|
||||
OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(&full_path)
|
||||
.with_context(|| format!("error creating {:?}", &full_path))?;
|
||||
OpenOptions::new().read(true).write(true).open(&full_path)?
|
||||
};
|
||||
filenames.push(full_path);
|
||||
files.push(Arc::new(Mutex::new(file)))
|
||||
}
|
||||
(files, filenames)
|
||||
};
|
||||
|
||||
let peer_id = options.peer_id.unwrap_or_else(generate_peer_id);
|
||||
let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?;
|
||||
debug!("computed lengths: {:?}", &lengths);
|
||||
|
||||
info!("Doing initial checksum validation, this might take a while...");
|
||||
let initial_check_results = spawner.spawn_block_in_place(|| {
|
||||
FileOps::<Sha1>::new(&info, &files, &lengths)
|
||||
.initial_check(options.only_files.as_deref())
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"Initial check results: have {}, needed {}",
|
||||
SF::new(initial_check_results.have_bytes),
|
||||
SF::new(initial_check_results.needed_bytes)
|
||||
);
|
||||
|
||||
spawner.spawn_block_in_place(|| {
|
||||
for (idx, (file, (name, length))) in files
|
||||
.iter()
|
||||
.zip(info.iter_filenames_and_lengths().unwrap())
|
||||
.enumerate()
|
||||
{
|
||||
if options
|
||||
.only_files
|
||||
.as_ref()
|
||||
.map(|v| !v.contains(&idx))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let now = Instant::now();
|
||||
if let Err(err) = ensure_file_length(&file.lock(), length) {
|
||||
warn!(
|
||||
"Error setting length for file {:?} to {}: {:#?}",
|
||||
name, length, err
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Set length for file {:?} to {} in {:?}",
|
||||
name,
|
||||
SF::new(length),
|
||||
now.elapsed()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let chunk_tracker = ChunkTracker::new(
|
||||
initial_check_results.needed_pieces,
|
||||
initial_check_results.have_pieces,
|
||||
lengths,
|
||||
);
|
||||
|
||||
#[allow(clippy::needless_update)]
|
||||
let state_options = TorrentStateOptions {
|
||||
peer_connect_timeout: options.peer_connect_timeout,
|
||||
peer_read_write_timeout: options.peer_read_write_timeout,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let state = TorrentStateLive::new(
|
||||
info,
|
||||
info_hash,
|
||||
peer_id,
|
||||
files,
|
||||
filenames,
|
||||
chunk_tracker,
|
||||
lengths,
|
||||
initial_check_results.have_bytes,
|
||||
initial_check_results.needed_bytes,
|
||||
spawner,
|
||||
Some(state_options),
|
||||
);
|
||||
|
||||
let estimator = Arc::new(SpeedEstimator::new(5));
|
||||
|
||||
let mgr = Arc::new(Self {
|
||||
state,
|
||||
speed_estimator: estimator.clone(),
|
||||
trackers: Mutex::new(HashSet::new()),
|
||||
options,
|
||||
});
|
||||
|
||||
spawn(span!(Level::ERROR, "speed_estimator_updater"), {
|
||||
let state = mgr.state.clone();
|
||||
async move {
|
||||
loop {
|
||||
let stats = state.stats_snapshot();
|
||||
let fetched = stats.fetched_bytes;
|
||||
let needed = state.initially_needed();
|
||||
// fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64.
|
||||
let remaining = needed
|
||||
.wrapping_sub(fetched)
|
||||
.min(needed - stats.downloaded_and_checked_bytes);
|
||||
estimator.add_snapshot(fetched, remaining, Instant::now());
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(mgr.into_handle())
|
||||
}
|
||||
|
||||
fn into_handle(self: Arc<Self>) -> TorrentManagerHandle {
|
||||
TorrentManagerHandle { manager: self }
|
||||
}
|
||||
|
||||
async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result<u64> {
|
||||
let response: reqwest::Response = reqwest::get(tracker_url).await?;
|
||||
if !response.status().is_success() {
|
||||
anyhow::bail!("tracker responded with {:?}", response.status());
|
||||
}
|
||||
let bytes = response.bytes().await?;
|
||||
if let Ok(error) = from_bytes::<TrackerError>(&bytes) {
|
||||
anyhow::bail!(
|
||||
"tracker returned failure. Failure reason: {}",
|
||||
error.failure_reason
|
||||
)
|
||||
};
|
||||
let response = from_bytes::<TrackerResponse>(&bytes)?;
|
||||
|
||||
for peer in response.peers.iter_sockaddrs() {
|
||||
self.state.add_peer_if_not_seen(peer);
|
||||
}
|
||||
Ok(response.interval)
|
||||
}
|
||||
|
||||
async fn single_tracker_monitor(&self, mut tracker_url: Url) -> anyhow::Result<()> {
|
||||
let mut event = Some(TrackerRequestEvent::Started);
|
||||
loop {
|
||||
let request = TrackerRequest {
|
||||
info_hash: self.state.info_hash(),
|
||||
peer_id: self.state.peer_id(),
|
||||
port: 6778,
|
||||
uploaded: self.state.get_uploaded_bytes(),
|
||||
downloaded: self.state.get_downloaded_bytes(),
|
||||
left: self.state.get_left_to_download_bytes(),
|
||||
compact: true,
|
||||
no_peer_id: false,
|
||||
event,
|
||||
ip: None,
|
||||
numwant: None,
|
||||
key: None,
|
||||
trackerid: None,
|
||||
};
|
||||
|
||||
let request_query = request.as_querystring();
|
||||
tracker_url.set_query(Some(&request_query));
|
||||
|
||||
match self.tracker_one_request(tracker_url.clone()).await {
|
||||
Ok(interval) => {
|
||||
event = None;
|
||||
let interval = self
|
||||
.options
|
||||
.force_tracker_interval
|
||||
.unwrap_or_else(|| Duration::from_secs(interval));
|
||||
debug!(
|
||||
"sleeping for {:?} after calling tracker {}",
|
||||
interval,
|
||||
tracker_url.host().unwrap()
|
||||
);
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("error calling the tracker {}: {:#}", tracker_url, e);
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
157
crates/librqbit/src/torrent_state/initializing.rs
Normal file
157
crates/librqbit/src/torrent_state/initializing.rs
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
use std::{
|
||||
collections::HashSet,
|
||||
fs::{File, OpenOptions},
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use bencode::from_bytes;
|
||||
use buffers::ByteString;
|
||||
use librqbit_core::{
|
||||
id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator,
|
||||
torrent_metainfo::TorrentMetaV1Info,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use reqwest::Url;
|
||||
use sha1w::Sha1;
|
||||
use size_format::SizeFormatterBinary as SF;
|
||||
use tracing::{debug, info, span, warn, Level};
|
||||
|
||||
use crate::{
|
||||
chunk_tracker::ChunkTracker,
|
||||
file_ops::FileOps,
|
||||
spawn_utils::{spawn, BlockingSpawner},
|
||||
torrent_state::{ManagedTorrent, ManagedTorrentHandle, TorrentStateLive, TorrentStateOptions},
|
||||
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
|
||||
};
|
||||
|
||||
use super::{paused::TorrentStatePaused, ManagedTorrentInfo};
|
||||
|
||||
fn make_lengths<ByteBuf: AsRef<[u8]>>(
|
||||
torrent: &TorrentMetaV1Info<ByteBuf>,
|
||||
) -> anyhow::Result<Lengths> {
|
||||
let total_length = torrent.iter_file_lengths()?.sum();
|
||||
Lengths::new(total_length, torrent.piece_length, None)
|
||||
}
|
||||
|
||||
fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> {
|
||||
Ok(file.set_len(length)?)
|
||||
}
|
||||
|
||||
pub struct TorrentStateInitializing {
|
||||
info: Arc<ManagedTorrentInfo>,
|
||||
only_files: Option<Vec<usize>>,
|
||||
}
|
||||
|
||||
impl TorrentStateInitializing {
|
||||
pub fn new(info: Arc<ManagedTorrentInfo>, only_files: Option<Vec<usize>>) -> Self {
|
||||
Self { info, only_files }
|
||||
}
|
||||
|
||||
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
|
||||
let (files, filenames) = {
|
||||
let mut files = Vec::<Arc<Mutex<File>>>::with_capacity(
|
||||
(&self.info).info.iter_file_lengths()?.count(),
|
||||
);
|
||||
let mut filenames = Vec::new();
|
||||
for (path_bits, _) in (&self.info).info.iter_filenames_and_lengths()? {
|
||||
let mut full_path = (&self.info).out_dir.clone();
|
||||
let relative_path = path_bits
|
||||
.to_pathbuf()
|
||||
.context("error converting file to path")?;
|
||||
full_path.push(relative_path);
|
||||
|
||||
std::fs::create_dir_all(full_path.parent().unwrap())?;
|
||||
let file = if (&self.info).options.overwrite {
|
||||
OpenOptions::new()
|
||||
.create(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&full_path)?
|
||||
} else {
|
||||
// TODO: create_new does not seem to work with read(true), so calling this twice.
|
||||
OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(&full_path)
|
||||
.with_context(|| format!("error creating {:?}", &full_path))?;
|
||||
OpenOptions::new().read(true).write(true).open(&full_path)?
|
||||
};
|
||||
filenames.push(full_path);
|
||||
files.push(Arc::new(Mutex::new(file)))
|
||||
}
|
||||
(files, filenames)
|
||||
};
|
||||
|
||||
let lengths =
|
||||
make_lengths(&(&self.info).info).context("unable to compute Lengths from torrent")?;
|
||||
debug!("computed lengths: {:?}", &lengths);
|
||||
|
||||
info!("Doing initial checksum validation, this might take a while...");
|
||||
let initial_check_results = (&self.info).spawner.spawn_block_in_place(|| {
|
||||
FileOps::<Sha1>::new(&(&self.info).info, &files, &lengths)
|
||||
.initial_check(self.only_files.as_deref())
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"Initial check results: have {}, needed {}",
|
||||
SF::new(initial_check_results.have_bytes),
|
||||
SF::new(initial_check_results.needed_bytes)
|
||||
);
|
||||
|
||||
(&self.info).spawner.spawn_block_in_place(|| {
|
||||
for (idx, (file, (name, length))) in files
|
||||
.iter()
|
||||
.zip((&self.info).info.iter_filenames_and_lengths().unwrap())
|
||||
.enumerate()
|
||||
{
|
||||
if self
|
||||
.only_files
|
||||
.as_ref()
|
||||
.map(|v| !v.contains(&idx))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let now = Instant::now();
|
||||
if let Err(err) = ensure_file_length(&file.lock(), length) {
|
||||
warn!(
|
||||
"Error setting length for file {:?} to {}: {:#?}",
|
||||
name, length, err
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Set length for file {:?} to {} in {:?}",
|
||||
name,
|
||||
SF::new(length),
|
||||
now.elapsed()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let chunk_tracker = ChunkTracker::new(
|
||||
initial_check_results.needed_pieces,
|
||||
initial_check_results.have_pieces,
|
||||
lengths,
|
||||
);
|
||||
|
||||
#[allow(clippy::needless_update)]
|
||||
let state_options = TorrentStateOptions {
|
||||
peer_connect_timeout: (&self.info).options.peer_connect_timeout,
|
||||
peer_read_write_timeout: (&self.info).options.peer_read_write_timeout,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let paused = TorrentStatePaused {
|
||||
info: self.info.clone(),
|
||||
files,
|
||||
filenames,
|
||||
chunk_tracker,
|
||||
};
|
||||
Ok(paused)
|
||||
}
|
||||
}
|
||||
|
|
@ -57,12 +57,13 @@ use std::{
|
|||
|
||||
use anyhow::{bail, Context};
|
||||
use backoff::backoff::Backoff;
|
||||
use bencode::from_bytes;
|
||||
use buffers::{ByteBuf, ByteString};
|
||||
use clone_to_owned::CloneToOwned;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use librqbit_core::{
|
||||
id20::Id20,
|
||||
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||
lengths::{self, ChunkInfo, Lengths, ValidPieceIndex},
|
||||
speed_estimator::{self, SpeedEstimator},
|
||||
torrent_metainfo::TorrentMetaV1Info,
|
||||
};
|
||||
|
|
@ -78,7 +79,8 @@ use tokio::{
|
|||
},
|
||||
time::timeout,
|
||||
};
|
||||
use tracing::{debug, error, info, span, trace, warn, Level};
|
||||
use tracing::{debug, error, info, span, trace, trace_span, warn, Level};
|
||||
use url::Url;
|
||||
|
||||
use crate::{
|
||||
chunk_tracker::{ChunkMarkingResult, ChunkTracker},
|
||||
|
|
@ -87,6 +89,7 @@ use crate::{
|
|||
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
|
||||
},
|
||||
spawn_utils::{spawn, BlockingSpawner},
|
||||
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
|
||||
type_aliases::{PeerHandle, BF},
|
||||
};
|
||||
|
||||
|
|
@ -102,7 +105,11 @@ use self::{
|
|||
stats::{atomic::AtomicStats, snapshot::StatsSnapshot},
|
||||
};
|
||||
|
||||
use super::utils::{timeit, TimedExistence};
|
||||
use super::{
|
||||
paused::TorrentStatePaused,
|
||||
utils::{timeit, TimedExistence},
|
||||
ManagedTorrentInfo,
|
||||
};
|
||||
|
||||
struct InflightPiece {
|
||||
peer: PeerHandle,
|
||||
|
|
@ -126,17 +133,17 @@ pub struct TorrentStateOptions {
|
|||
|
||||
pub struct TorrentStateLive {
|
||||
peers: PeerStates,
|
||||
info: TorrentMetaV1Info<ByteString>,
|
||||
meta: Arc<ManagedTorrentInfo>,
|
||||
locked: Arc<RwLock<TorrentStateLocked>>,
|
||||
files: Vec<Arc<Mutex<File>>>,
|
||||
filenames: Vec<PathBuf>,
|
||||
info_hash: Id20,
|
||||
peer_id: Id20,
|
||||
lengths: Lengths,
|
||||
|
||||
// TODO: why the hell do we need these here, remove it.
|
||||
needed_bytes: u64,
|
||||
have_plus_needed_bytes: u64,
|
||||
|
||||
stats: AtomicStats,
|
||||
options: TorrentStateOptions,
|
||||
lengths: Lengths,
|
||||
|
||||
// Limits how many active (occupying network resources) peers there are at a moment in time.
|
||||
peer_semaphore: Semaphore,
|
||||
|
|
@ -151,35 +158,24 @@ pub struct TorrentStateLive {
|
|||
|
||||
impl TorrentStateLive {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
info: TorrentMetaV1Info<ByteString>,
|
||||
info_hash: Id20,
|
||||
peer_id: Id20,
|
||||
files: Vec<Arc<Mutex<File>>>,
|
||||
filenames: Vec<PathBuf>,
|
||||
chunk_tracker: ChunkTracker,
|
||||
lengths: Lengths,
|
||||
have_bytes: u64,
|
||||
needed_bytes: u64,
|
||||
spawner: BlockingSpawner,
|
||||
options: Option<TorrentStateOptions>,
|
||||
) -> Arc<Self> {
|
||||
let options = options.unwrap_or_default();
|
||||
pub(crate) fn new(paused: TorrentStatePaused) -> Arc<Self> {
|
||||
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
|
||||
|
||||
let speed_estimator = SpeedEstimator::new(5);
|
||||
|
||||
let have_bytes = paused.chunk_tracker.get_have_bytes();
|
||||
let needed_bytes = paused.chunk_tracker.get_needed_bytes();
|
||||
let lengths = paused.chunk_tracker.get_lengths().clone();
|
||||
|
||||
let state = Arc::new(TorrentStateLive {
|
||||
info_hash,
|
||||
info,
|
||||
peer_id,
|
||||
meta: paused.info.clone(),
|
||||
peers: Default::default(),
|
||||
locked: Arc::new(RwLock::new(TorrentStateLocked {
|
||||
chunks: chunk_tracker,
|
||||
chunks: paused.chunk_tracker,
|
||||
inflight_pieces: Default::default(),
|
||||
})),
|
||||
files,
|
||||
filenames,
|
||||
files: paused.files,
|
||||
filenames: paused.filenames,
|
||||
stats: AtomicStats {
|
||||
have_bytes: AtomicU64::new(have_bytes),
|
||||
..Default::default()
|
||||
|
|
@ -187,16 +183,42 @@ impl TorrentStateLive {
|
|||
needed_bytes,
|
||||
have_plus_needed_bytes: needed_bytes + have_bytes,
|
||||
lengths,
|
||||
options,
|
||||
|
||||
peer_semaphore: Semaphore::new(128),
|
||||
peer_queue_tx,
|
||||
finished_notify: Notify::new(),
|
||||
speed_estimator,
|
||||
});
|
||||
|
||||
for tracker in state.meta.trackers.iter() {
|
||||
spawn(
|
||||
trace_span!("tracker_monitor", url = tracker.to_string()),
|
||||
state.clone().task_single_tracker_monitor(tracker.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
spawn(span!(Level::ERROR, "speed_estimator_updater"), {
|
||||
let state = state.clone();
|
||||
async move {
|
||||
loop {
|
||||
let stats = state.stats_snapshot();
|
||||
let fetched = stats.fetched_bytes;
|
||||
let needed = state.initially_needed();
|
||||
// fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64.
|
||||
let remaining = needed
|
||||
.wrapping_sub(fetched)
|
||||
.min(needed - stats.downloaded_and_checked_bytes);
|
||||
state
|
||||
.speed_estimator
|
||||
.add_snapshot(fetched, remaining, Instant::now());
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
spawn(
|
||||
span!(Level::ERROR, "peer_adder"),
|
||||
state.clone().task_peer_adder(peer_queue_rx, spawner),
|
||||
state.clone().task_peer_adder(peer_queue_rx),
|
||||
);
|
||||
state
|
||||
}
|
||||
|
|
@ -205,11 +227,75 @@ impl TorrentStateLive {
|
|||
&self.speed_estimator
|
||||
}
|
||||
|
||||
async fn task_manage_peer(
|
||||
async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result<u64> {
|
||||
let response: reqwest::Response = reqwest::get(tracker_url).await?;
|
||||
if !response.status().is_success() {
|
||||
anyhow::bail!("tracker responded with {:?}", response.status());
|
||||
}
|
||||
let bytes = response.bytes().await?;
|
||||
if let Ok(error) = from_bytes::<TrackerError>(&bytes) {
|
||||
anyhow::bail!(
|
||||
"tracker returned failure. Failure reason: {}",
|
||||
error.failure_reason
|
||||
)
|
||||
};
|
||||
let response = from_bytes::<TrackerResponse>(&bytes)?;
|
||||
|
||||
for peer in response.peers.iter_sockaddrs() {
|
||||
self.add_peer_if_not_seen(peer);
|
||||
}
|
||||
Ok(response.interval)
|
||||
}
|
||||
|
||||
async fn task_single_tracker_monitor(
|
||||
self: Arc<Self>,
|
||||
addr: SocketAddr,
|
||||
spawner: BlockingSpawner,
|
||||
mut tracker_url: Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut event = Some(TrackerRequestEvent::Started);
|
||||
loop {
|
||||
let request = TrackerRequest {
|
||||
info_hash: self.info_hash(),
|
||||
peer_id: self.peer_id(),
|
||||
port: 6778,
|
||||
uploaded: self.get_uploaded_bytes(),
|
||||
downloaded: self.get_downloaded_bytes(),
|
||||
left: self.get_left_to_download_bytes(),
|
||||
compact: true,
|
||||
no_peer_id: false,
|
||||
event,
|
||||
ip: None,
|
||||
numwant: None,
|
||||
key: None,
|
||||
trackerid: None,
|
||||
};
|
||||
|
||||
let request_query = request.as_querystring();
|
||||
tracker_url.set_query(Some(&request_query));
|
||||
|
||||
match self.tracker_one_request(tracker_url.clone()).await {
|
||||
Ok(interval) => {
|
||||
event = None;
|
||||
let interval = self
|
||||
.meta
|
||||
.options
|
||||
.force_tracker_interval
|
||||
.unwrap_or_else(|| Duration::from_secs(interval));
|
||||
debug!(
|
||||
"sleeping for {:?} after calling tracker {}",
|
||||
interval,
|
||||
tracker_url.host().unwrap()
|
||||
);
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("error calling the tracker {}: {:#}", tracker_url, e);
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async fn task_manage_peer(self: Arc<Self>, addr: SocketAddr) -> anyhow::Result<()> {
|
||||
let state = self;
|
||||
let (rx, tx) = state.peers.mark_peer_connecting(addr)?;
|
||||
|
||||
|
|
@ -229,21 +315,20 @@ impl TorrentStateLive {
|
|||
requests_sem: Semaphore::new(0),
|
||||
state: state.clone(),
|
||||
tx,
|
||||
spawner,
|
||||
counters,
|
||||
};
|
||||
let options = PeerConnectionOptions {
|
||||
connect_timeout: state.options.peer_connect_timeout,
|
||||
read_write_timeout: state.options.peer_read_write_timeout,
|
||||
connect_timeout: state.meta.options.peer_connect_timeout,
|
||||
read_write_timeout: state.meta.options.peer_read_write_timeout,
|
||||
..Default::default()
|
||||
};
|
||||
let peer_connection = PeerConnection::new(
|
||||
addr,
|
||||
state.info_hash,
|
||||
state.peer_id,
|
||||
state.meta.info_hash,
|
||||
state.meta.peer_id,
|
||||
&handler,
|
||||
Some(options),
|
||||
spawner,
|
||||
state.meta.spawner,
|
||||
);
|
||||
let requester = handler.task_peer_chunk_requester(addr);
|
||||
|
||||
|
|
@ -274,7 +359,6 @@ impl TorrentStateLive {
|
|||
async fn task_peer_adder(
|
||||
self: Arc<Self>,
|
||||
mut peer_queue_rx: UnboundedReceiver<SocketAddr>,
|
||||
spawner: BlockingSpawner,
|
||||
) -> anyhow::Result<()> {
|
||||
let state = self;
|
||||
loop {
|
||||
|
|
@ -289,22 +373,22 @@ impl TorrentStateLive {
|
|||
permit.forget();
|
||||
spawn(
|
||||
span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()),
|
||||
state.clone().task_manage_peer(addr, spawner),
|
||||
state.clone().task_manage_peer(addr),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn info(&self) -> &TorrentMetaV1Info<ByteString> {
|
||||
&self.info
|
||||
&self.meta.info
|
||||
}
|
||||
pub fn info_hash(&self) -> Id20 {
|
||||
self.info_hash
|
||||
self.meta.info_hash
|
||||
}
|
||||
pub fn peer_id(&self) -> Id20 {
|
||||
self.peer_id
|
||||
self.meta.peer_id
|
||||
}
|
||||
pub(crate) fn file_ops(&self) -> FileOps<'_, Sha1> {
|
||||
FileOps::new(&self.info, &self.files, &self.lengths)
|
||||
FileOps::new(&self.meta.info, &self.files, &self.lengths)
|
||||
}
|
||||
pub fn initially_needed(&self) -> u64 {
|
||||
self.needed_bytes
|
||||
|
|
@ -429,7 +513,7 @@ impl TorrentStateLive {
|
|||
);
|
||||
}
|
||||
|
||||
pub(crate) fn add_peer_if_not_seen(self: &Arc<Self>, addr: SocketAddr) -> bool {
|
||||
pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> bool {
|
||||
match self.peers.add_if_not_seen(addr) {
|
||||
Some(handle) => handle,
|
||||
None => return false,
|
||||
|
|
@ -509,7 +593,6 @@ struct PeerHandler {
|
|||
requests_sem: Semaphore,
|
||||
|
||||
addr: SocketAddr,
|
||||
spawner: BlockingSpawner,
|
||||
|
||||
tx: PeerTx,
|
||||
}
|
||||
|
|
@ -1083,7 +1166,9 @@ impl PeerHandler {
|
|||
// By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would
|
||||
// have fallen off above in one of the defensive checks.
|
||||
|
||||
self.spawner
|
||||
self.state
|
||||
.meta
|
||||
.spawner
|
||||
.spawn_block_in_place(move || {
|
||||
let index = piece.index;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
pub mod utils;
|
||||
|
||||
pub mod initializing;
|
||||
pub mod live;
|
||||
pub mod paused;
|
||||
pub mod utils;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
|
|
@ -11,6 +12,7 @@ use std::{collections::HashSet, path::Path};
|
|||
use anyhow::Context;
|
||||
use buffers::ByteString;
|
||||
use librqbit_core::id20::Id20;
|
||||
use librqbit_core::peer_id::generate_peer_id;
|
||||
use librqbit_core::speed_estimator::SpeedEstimator;
|
||||
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
||||
pub use live::*;
|
||||
|
|
@ -21,32 +23,37 @@ use url::Url;
|
|||
|
||||
use crate::spawn_utils::{spawn, BlockingSpawner};
|
||||
|
||||
pub struct TorrentStateInitializing {}
|
||||
use initializing::TorrentStateInitializing;
|
||||
|
||||
use self::paused::TorrentStatePaused;
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub enum ManagedTorrentState {
|
||||
#[default]
|
||||
Created,
|
||||
|
||||
Initializing(Arc<TorrentStateInitializing>),
|
||||
|
||||
// TODO: only_files_tx
|
||||
// TODO: trackers_tx??
|
||||
Initializing(TorrentStateInitializing),
|
||||
Paused(TorrentStatePaused),
|
||||
Live(Arc<TorrentStateLive>),
|
||||
Error(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) struct ManagedTorrentLocked {
|
||||
pub only_files: Option<Vec<usize>>,
|
||||
pub state: ManagedTorrentState,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct ManagedTorrentOptions {
|
||||
pub force_tracker_interval: Option<Duration>,
|
||||
pub peer_connect_timeout: Option<Duration>,
|
||||
pub peer_read_write_timeout: Option<Duration>,
|
||||
pub overwrite: bool,
|
||||
}
|
||||
|
||||
pub struct ManagedTorrentInfo {
|
||||
pub info: TorrentMetaV1Info<ByteString>,
|
||||
pub info_hash: Id20,
|
||||
pub out_dir: PathBuf,
|
||||
pub spawner: BlockingSpawner,
|
||||
pub trackers: Vec<Url>,
|
||||
// pub options: Option<ManagedTorrentOptions>,
|
||||
pub peer_id: Id20,
|
||||
pub(crate) options: ManagedTorrentOptions,
|
||||
}
|
||||
|
||||
pub struct ManagedTorrent {
|
||||
|
|
@ -68,11 +75,12 @@ impl ManagedTorrent {
|
|||
}
|
||||
|
||||
pub fn only_files(&self) -> Option<Vec<usize>> {
|
||||
self.locked.write().only_files.clone()
|
||||
// self.locked.write().only_files.clone()
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn state(&self) -> ManagedTorrentState {
|
||||
self.locked.read().state.clone()
|
||||
pub fn with_state<R>(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R {
|
||||
f(&self.locked.read().state)
|
||||
}
|
||||
|
||||
pub fn live(&self) -> Option<Arc<TorrentStateLive>> {
|
||||
|
|
@ -169,19 +177,26 @@ impl ManagedTorrentBuilder {
|
|||
}
|
||||
|
||||
pub(crate) fn build(self) -> ManagedTorrentHandle {
|
||||
let info = Arc::new(ManagedTorrentInfo {
|
||||
info: self.info,
|
||||
info_hash: self.info_hash,
|
||||
out_dir: self.output_folder,
|
||||
trackers: self.trackers.into_iter().collect(),
|
||||
spawner: self.spawner.unwrap_or_default(),
|
||||
peer_id: self.peer_id.unwrap_or_else(generate_peer_id),
|
||||
options: ManagedTorrentOptions {
|
||||
force_tracker_interval: self.force_tracker_interval,
|
||||
peer_connect_timeout: self.peer_connect_timeout,
|
||||
peer_read_write_timeout: self.peer_read_write_timeout,
|
||||
overwrite: self.overwrite,
|
||||
},
|
||||
});
|
||||
let initializing = TorrentStateInitializing::new(info.clone(), self.only_files);
|
||||
Arc::new(ManagedTorrent {
|
||||
locked: RwLock::new(ManagedTorrentLocked {
|
||||
only_files: self.only_files,
|
||||
state: Default::default(),
|
||||
}),
|
||||
info: Arc::new(ManagedTorrentInfo {
|
||||
info: self.info,
|
||||
info_hash: self.info_hash,
|
||||
out_dir: self.output_folder,
|
||||
trackers: self.trackers.into_iter().collect(),
|
||||
spawner: self.spawner.unwrap_or_default(),
|
||||
// options: Some(self.options),
|
||||
state: ManagedTorrentState::Initializing(initializing),
|
||||
}),
|
||||
info,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
14
crates/librqbit/src/torrent_state/paused.rs
Normal file
14
crates/librqbit/src/torrent_state/paused.rs
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
use std::{fs::File, path::PathBuf, sync::Arc};
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::chunk_tracker::ChunkTracker;
|
||||
|
||||
use super::ManagedTorrentInfo;
|
||||
|
||||
pub struct TorrentStatePaused {
|
||||
pub(crate) info: Arc<ManagedTorrentInfo>,
|
||||
pub(crate) files: Vec<Arc<Mutex<File>>>,
|
||||
pub(crate) filenames: Vec<PathBuf>,
|
||||
pub(crate) chunk_tracker: ChunkTracker,
|
||||
}
|
||||
|
|
@ -239,39 +239,43 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
|
|||
loop {
|
||||
session.with_torrents(|torrents| {
|
||||
for (idx, torrent) in torrents.iter().enumerate() {
|
||||
match torrent.state() {
|
||||
ManagedTorrentState::Initializing(_) => {
|
||||
info!("[{}] initializing", idx);
|
||||
},
|
||||
ManagedTorrentState::Live(handle) => {
|
||||
let stats = handle.stats_snapshot();
|
||||
let speed = handle.speed_estimator();
|
||||
let total = stats.total_bytes;
|
||||
let progress = stats.total_bytes - stats.remaining_bytes;
|
||||
let downloaded_pct = if stats.remaining_bytes == 0 {
|
||||
100f64
|
||||
} else {
|
||||
(progress as f64 / total as f64) * 100f64
|
||||
};
|
||||
info!(
|
||||
"[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}",
|
||||
idx,
|
||||
downloaded_pct,
|
||||
SF::new(progress),
|
||||
speed.download_mbps(),
|
||||
SF::new(stats.fetched_bytes),
|
||||
SF::new(stats.remaining_bytes),
|
||||
SF::new(total),
|
||||
SF::new(stats.uploaded_bytes),
|
||||
stats.peer_stats.live,
|
||||
stats.peer_stats.connecting,
|
||||
stats.peer_stats.queued,
|
||||
stats.peer_stats.seen,
|
||||
stats.peer_stats.dead,
|
||||
);
|
||||
},
|
||||
ManagedTorrentState::Created => warn!("the torrent was just created, but not initializing"),
|
||||
}
|
||||
let live = torrent.with_state(|s| {
|
||||
match s {
|
||||
ManagedTorrentState::Initializing(_) => info!("[{}] initializing", idx),
|
||||
ManagedTorrentState::Live(h) => return Some(h.clone()),
|
||||
ManagedTorrentState::Error(_) | ManagedTorrentState::Paused(_) => {},
|
||||
};
|
||||
None
|
||||
});
|
||||
let handle = match live {
|
||||
Some(live) => live,
|
||||
None => continue
|
||||
};
|
||||
let stats = handle.stats_snapshot();
|
||||
let speed = handle.speed_estimator();
|
||||
let total = stats.total_bytes;
|
||||
let progress = stats.total_bytes - stats.remaining_bytes;
|
||||
let downloaded_pct = if stats.remaining_bytes == 0 {
|
||||
100f64
|
||||
} else {
|
||||
(progress as f64 / total as f64) * 100f64
|
||||
};
|
||||
info!(
|
||||
"[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}",
|
||||
idx,
|
||||
downloaded_pct,
|
||||
SF::new(progress),
|
||||
speed.download_mbps(),
|
||||
SF::new(stats.fetched_bytes),
|
||||
SF::new(stats.remaining_bytes),
|
||||
SF::new(total),
|
||||
SF::new(stats.uploaded_bytes),
|
||||
stats.peer_stats.live,
|
||||
stats.peer_stats.connecting,
|
||||
stats.peer_stats.queued,
|
||||
stats.peer_stats.seen,
|
||||
stats.peer_stats.dead,
|
||||
);
|
||||
}
|
||||
});
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue