HUGE REFACTOR to suppor multiple states. Incomplete, broken

This commit is contained in:
Igor Katson 2023-11-24 09:30:21 +00:00
parent cc1ef9d0e4
commit 739666ff88
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
7 changed files with 242 additions and 244 deletions

View file

@ -1,4 +1,4 @@
use anyhow::Context;
use anyhow::{bail, Context};
use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
@ -22,9 +22,9 @@ use crate::http_api_error::{ApiError, ApiErrorExt};
use crate::session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session,
};
use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot};
use crate::torrent_state::stats::snapshot::StatsSnapshot;
use crate::torrent_state::ManagedTorrentHandle;
// Public API
#[derive(Clone)]
@ -38,7 +38,7 @@ impl HttpApi {
inner: Arc::new(ApiInternal::new(session)),
}
}
pub fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize {
pub fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize {
self.inner.add_torrent_handle(handle)
}
@ -333,7 +333,7 @@ impl TorrentAddQueryParams {
struct ApiInternal {
dht: Option<Dht>,
startup_time: Instant,
torrent_managers: RwLock<Vec<TorrentManagerHandle>>,
torrent_managers: RwLock<Vec<ManagedTorrentHandle>>,
session: Arc<Session>,
}
@ -349,14 +349,14 @@ impl ApiInternal {
}
}
fn add_torrent_handle(&self, handle: TorrentManagerHandle) -> usize {
fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize {
let mut g = self.torrent_managers.write();
let idx = g.len();
g.push(handle);
idx
}
fn mgr_handle(&self, idx: usize) -> Result<TorrentManagerHandle> {
fn mgr_handle(&self, idx: usize) -> Result<ManagedTorrentHandle> {
self.torrent_managers
.read()
.get(idx)
@ -373,7 +373,7 @@ impl ApiInternal {
.enumerate()
.map(|(id, mgr)| TorrentListResponseItem {
id,
info_hash: mgr.torrent_state().info_hash().as_string(),
info_hash: mgr.info().info_hash.as_string(),
})
.collect(),
}
@ -381,14 +381,17 @@ impl ApiInternal {
fn api_torrent_details(&self, idx: usize) -> Result<TorrentDetailsResponse> {
let handle = self.mgr_handle(idx)?;
let info_hash = handle.torrent_state().info_hash();
let info_hash = handle.info().info_hash;
let only_files = handle.only_files();
make_torrent_details(&info_hash, handle.torrent_state().info(), only_files)
make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref())
}
fn api_peer_stats(&self, idx: usize, filter: PeerStatsFilter) -> Result<PeerStatsSnapshot> {
let handle = self.mgr_handle(idx)?;
Ok(handle.torrent_state().per_peer_stats_snapshot(filter))
Ok(handle
.live()
.context("not live")?
.per_peer_stats_snapshot(filter))
}
pub async fn api_add_torrent(
@ -406,8 +409,8 @@ impl ApiInternal {
AddTorrentResponse::AlreadyManaged(managed) => {
return Err(anyhow::anyhow!(
"{:?} is already managed, downloaded to {:?}",
managed.info_hash,
managed.output_folder
managed.info_hash(),
&managed.info().out_dir
))
.with_error_status_code(StatusCode::CONFLICT);
}
@ -422,9 +425,9 @@ impl ApiInternal {
},
AddTorrentResponse::Added(handle) => {
let details = make_torrent_details(
&handle.torrent_state().info_hash(),
handle.torrent_state().info(),
handle.only_files(),
&handle.info_hash(),
&handle.info().info,
handle.only_files().as_deref(),
)
.context("error making torrent details")?;
let id = self.add_torrent_handle(handle);
@ -451,8 +454,9 @@ impl ApiInternal {
fn api_stats(&self, idx: usize) -> Result<StatsResponse> {
let mgr = self.mgr_handle(idx)?;
let snapshot = mgr.torrent_state().stats_snapshot();
let estimator = mgr.speed_estimator();
let live = mgr.live().context("not live")?;
let snapshot = live.stats_snapshot();
let estimator = live.speed_estimator();
// Poor mans download speed computation
let elapsed = self.startup_time.elapsed();
@ -469,14 +473,15 @@ impl ApiInternal {
}
fn api_dump_haves(&self, idx: usize) -> Result<String> {
let mgr = self.mgr_handle(idx)?;
Ok(format!(
"{:?}",
mgr.torrent_state()
.lock_read("api_dump_haves")
.chunks
.get_have_pieces(),
))
Err(anyhow::anyhow!("not implemented").into())
// let mgr = self.mgr_handle(idx)?;
// Ok(format!(
// "{:?}",
// mgr.live().conetext()
// .lock_read("api_dump_haves")
// .chunks
// .get_have_pieces(),
// ))
}
}

View file

@ -8,7 +8,7 @@ pub mod peer_connection;
pub mod peer_info_reader;
pub mod session;
pub mod spawn_utils;
pub mod torrent_manager;
// pub mod torrent_manager;
pub mod torrent_state;
pub mod tracker_comms;
pub mod type_aliases;

View file

@ -17,43 +17,28 @@ use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::PeerConnectionOptions,
spawn_utils::{spawn, BlockingSpawner},
torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle},
torrent_state::{ManagedTorrent, ManagedTorrentBuilder, ManagedTorrentHandle},
};
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
#[derive(Clone)]
pub enum ManagedTorrentState {
Initializing,
Running(TorrentManagerHandle),
}
#[derive(Clone)]
pub struct ManagedTorrent {
pub info_hash: Id20,
pub output_folder: PathBuf,
pub state: ManagedTorrentState,
}
impl PartialEq for ManagedTorrent {
fn eq(&self, other: &Self) -> bool {
self.info_hash == other.info_hash && self.output_folder == other.output_folder
}
}
#[derive(Default)]
pub struct SessionLocked {
torrents: Vec<ManagedTorrent>,
torrents: Vec<ManagedTorrentHandle>,
}
enum SessionLockedAddTorrentResult {
AlreadyManaged(ManagedTorrent),
AlreadyManaged(ManagedTorrentHandle),
Added(usize),
}
impl SessionLocked {
fn add_torrent(&mut self, torrent: ManagedTorrent) -> SessionLockedAddTorrentResult {
if let Some(handle) = self.torrents.iter().find(|t| **t == torrent) {
fn add_torrent(&mut self, torrent: ManagedTorrentHandle) -> SessionLockedAddTorrentResult {
if let Some(handle) = self
.torrents
.iter()
.find(|t| t.info_hash() == torrent.info_hash())
{
return SessionLockedAddTorrentResult::AlreadyManaged(handle.clone());
}
let idx = self.torrents.len();
@ -124,9 +109,9 @@ pub struct ListOnlyResponse {
}
pub enum AddTorrentResponse {
AlreadyManaged(ManagedTorrent),
AlreadyManaged(ManagedTorrentHandle),
ListOnly(ListOnlyResponse),
Added(TorrentManagerHandle),
Added(ManagedTorrentHandle),
}
pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result<Vec<u8>> {
@ -227,7 +212,7 @@ impl Session {
}
pub fn with_torrents<F>(&self, callback: F)
where
F: Fn(&[ManagedTorrent]),
F: Fn(&[ManagedTorrentHandle]),
{
callback(&self.locked.read().torrents)
}
@ -404,24 +389,13 @@ impl Session {
.unwrap_or_else(|| self.output_folder.clone())
.join(sub_folder);
let managed_torrent = ManagedTorrent {
info_hash,
output_folder: output_folder.clone(),
state: ManagedTorrentState::Initializing,
};
match self.locked.write().add_torrent(managed_torrent) {
SessionLockedAddTorrentResult::AlreadyManaged(managed) => {
return Ok(AddTorrentResponse::AlreadyManaged(managed))
}
SessionLockedAddTorrentResult::Added(_) => {}
}
let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder.clone());
let mut builder = ManagedTorrentBuilder::new(info, info_hash, output_folder.clone());
builder
.overwrite(opts.overwrite)
.spawner(self.spawner)
.peer_id(self.peer_id);
.peer_id(self.peer_id)
.trackers(trackers);
if let Some(only_files) = only_files {
builder.only_files(only_files);
}
@ -437,51 +411,22 @@ impl Session {
builder.peer_read_write_timeout(t);
}
let handle = match builder
.start_manager()
.context("error starting torrent manager")
{
Ok(handle) => {
let mut g = self.locked.write();
let m = g
.torrents
.iter_mut()
.find(|t| t.info_hash == info_hash && t.output_folder == output_folder)
.unwrap();
m.state = ManagedTorrentState::Running(handle.clone());
handle
let managed_torrent = builder.build();
match self.locked.write().add_torrent(managed_torrent.clone()) {
SessionLockedAddTorrentResult::AlreadyManaged(managed) => {
return Ok(AddTorrentResponse::AlreadyManaged(managed))
}
Err(error) => {
let mut g = self.locked.write();
let idx = g
.torrents
.iter()
.position(|t| t.info_hash == info_hash && t.output_folder == output_folder)
.unwrap();
g.torrents.remove(idx);
return Err(error);
}
};
{
let mut g = self.locked.write();
let m = g
.torrents
.iter_mut()
.find(|t| t.info_hash == info_hash && t.output_folder == output_folder)
.unwrap();
m.state = ManagedTorrentState::Running(handle.clone());
SessionLockedAddTorrentResult::Added(_) => {}
}
for url in trackers {
handle.add_tracker(url);
}
for peer in initial_peers {
handle.add_peer(peer);
managed_torrent.add_peer(peer);
}
if let Some(mut dht_peer_rx) = dht_peer_rx {
spawn(span!(Level::INFO, "dht_peer_adder"), {
let handle = handle.clone();
let handle = managed_torrent.clone();
async move {
while let Some(peer) = dht_peer_rx.next().await {
handle.add_peer(peer);
@ -492,6 +437,6 @@ impl Session {
});
}
Ok(AddTorrentResponse::Added(handle))
Ok(AddTorrentResponse::Added(managed_torrent))
}
}

View file

@ -24,128 +24,10 @@ use crate::{
chunk_tracker::ChunkTracker,
file_ops::FileOps,
spawn_utils::{spawn, BlockingSpawner},
torrent_state::{TorrentStateLive, TorrentStateOptions},
torrent_state::{ManagedTorrent, ManagedTorrentHandle, TorrentStateLive, TorrentStateOptions},
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
};
#[derive(Default)]
struct TorrentManagerOptions {
force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>,
peer_read_write_timeout: Option<Duration>,
only_files: Option<Vec<usize>>,
peer_id: Option<Id20>,
overwrite: bool,
}
pub struct TorrentManagerBuilder {
info: TorrentMetaV1Info<ByteString>,
info_hash: Id20,
output_folder: PathBuf,
options: TorrentManagerOptions,
spawner: Option<BlockingSpawner>,
}
impl TorrentManagerBuilder {
pub fn new<P: AsRef<Path>>(
info: TorrentMetaV1Info<ByteString>,
info_hash: Id20,
output_folder: P,
) -> Self {
Self {
info,
info_hash,
output_folder: output_folder.as_ref().into(),
spawner: None,
options: TorrentManagerOptions::default(),
}
}
pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self {
self.options.only_files = Some(only_files);
self
}
pub fn overwrite(&mut self, overwrite: bool) -> &mut Self {
self.options.overwrite = overwrite;
self
}
pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self {
self.options.force_tracker_interval = Some(force_tracker_interval);
self
}
pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self {
self.spawner = Some(spawner);
self
}
pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self {
self.options.peer_id = Some(peer_id);
self
}
pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
self.options.peer_connect_timeout = Some(timeout);
self
}
pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self {
self.options.peer_read_write_timeout = Some(timeout);
self
}
pub fn start_manager(self) -> anyhow::Result<TorrentManagerHandle> {
TorrentManager::start(
self.info,
self.info_hash,
self.output_folder,
self.spawner.unwrap_or_else(|| BlockingSpawner::new(true)),
Some(self.options),
)
}
}
#[derive(Clone)]
pub struct TorrentManagerHandle {
manager: Arc<TorrentManager>,
}
impl TorrentManagerHandle {
pub fn add_tracker(&self, url: Url) -> bool {
let mgr = self.manager.clone();
if mgr.trackers.lock().insert(url.clone()) {
spawn(
span!(Level::ERROR, "tracker_monitor", url = url.to_string()),
async move { mgr.single_tracker_monitor(url).await },
);
true
} else {
false
}
}
pub fn only_files(&self) -> Option<&[usize]> {
self.manager.options.only_files.as_deref()
}
pub fn add_peer(&self, addr: SocketAddr) -> bool {
self.manager.state.add_peer_if_not_seen(addr)
}
pub fn torrent_state(&self) -> &TorrentStateLive {
&self.manager.state
}
pub fn speed_estimator(&self) -> &Arc<SpeedEstimator> {
&self.manager.speed_estimator
}
pub async fn cancel(&self) -> anyhow::Result<()> {
todo!()
}
pub async fn wait_until_completed(&self) -> anyhow::Result<()> {
self.manager.state.wait_until_completed().await;
Ok(())
}
}
struct TorrentManager {
state: Arc<TorrentStateLive>,
#[allow(dead_code)]
@ -171,8 +53,8 @@ impl TorrentManager {
info_hash: Id20,
out: P,
spawner: BlockingSpawner,
options: Option<TorrentManagerOptions>,
) -> anyhow::Result<TorrentManagerHandle> {
options: Option<ManagedTorrentOptions>,
) -> anyhow::Result<ManagedTorrentHandle> {
let options = options.unwrap_or_default();
let (files, filenames) = {
let mut files =

View file

@ -63,6 +63,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::{
id20::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
speed_estimator::{self, SpeedEstimator},
torrent_metainfo::TorrentMetaV1Info,
};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
@ -144,6 +145,8 @@ pub struct TorrentStateLive {
peer_queue_tx: UnboundedSender<SocketAddr>,
finished_notify: Notify,
speed_estimator: SpeedEstimator,
}
impl TorrentStateLive {
@ -163,6 +166,9 @@ impl TorrentStateLive {
) -> Arc<Self> {
let options = options.unwrap_or_default();
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let speed_estimator = SpeedEstimator::new(5);
let state = Arc::new(TorrentStateLive {
info_hash,
info,
@ -186,6 +192,7 @@ impl TorrentStateLive {
peer_semaphore: Semaphore::new(128),
peer_queue_tx,
finished_notify: Notify::new(),
speed_estimator,
});
spawn(
span!(Level::ERROR, "peer_adder"),
@ -194,6 +201,10 @@ impl TorrentStateLive {
state
}
pub fn speed_estimator(&self) -> &SpeedEstimator {
&self.speed_estimator
}
async fn task_manage_peer(
self: Arc<Self>,
addr: SocketAddr,

View file

@ -2,36 +2,188 @@ pub mod utils;
pub mod live;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashSet, path::Path};
use anyhow::Context;
use buffers::ByteString;
use librqbit_core::id20::Id20;
use librqbit_core::speed_estimator::SpeedEstimator;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
pub use live::*;
use parking_lot::RwLock;
use tokio::sync::mpsc::Sender;
use tracing::trace_span;
use url::Url;
pub(crate) enum ManagedTorrentState {
Live {
state: TorrentStateLive,
only_files_tx: Sender<Vec<usize>>,
trackers_tx: Sender<Url>,
},
use crate::spawn_utils::{spawn, BlockingSpawner};
pub struct TorrentStateInitializing {}
#[derive(Default, Clone)]
pub enum ManagedTorrentState {
#[default]
Created,
Initializing(Arc<TorrentStateInitializing>),
// TODO: only_files_tx
// TODO: trackers_tx??
Live(Arc<TorrentStateLive>),
}
pub(crate) struct ManagedTorrentLocked {
pub trackers: Vec<Url>,
pub only_files: Vec<usize>,
pub only_files: Option<Vec<usize>>,
pub state: ManagedTorrentState,
}
pub struct ManagedTorrentInfo {
pub info: TorrentMetaV1Info<ByteString>,
pub info_hash: Id20,
pub out_dir: PathBuf,
pub spawner: BlockingSpawner,
pub trackers: Vec<Url>,
// pub options: Option<ManagedTorrentOptions>,
}
pub(crate) struct ManagedTorrent {
pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>,
pub(crate) locked: RwLock<ManagedTorrentLocked>,
locked: RwLock<ManagedTorrentLocked>,
}
impl ManagedTorrent {
pub fn info(&self) -> &ManagedTorrentInfo {
&self.info
}
pub fn info_hash(&self) -> Id20 {
self.info.info_hash
}
pub(crate) fn add_peer(&self, peer: SocketAddr) -> bool {
todo!()
}
pub fn only_files(&self) -> Option<Vec<usize>> {
self.locked.write().only_files.clone()
}
pub fn state(&self) -> ManagedTorrentState {
self.locked.read().state.clone()
}
pub fn live(&self) -> Option<Arc<TorrentStateLive>> {
let g = self.locked.read();
match &g.state {
ManagedTorrentState::Live(live) => Some(live.clone()),
_ => None,
}
}
pub async fn wait_until_completed(&self) -> anyhow::Result<()> {
// TODO: rewrite
self.live()
.context("torrent isn't live")?
.wait_until_completed()
.await;
Ok(())
}
}
pub struct ManagedTorrentBuilder {
info: TorrentMetaV1Info<ByteString>,
info_hash: Id20,
output_folder: PathBuf,
force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>,
peer_read_write_timeout: Option<Duration>,
only_files: Option<Vec<usize>>,
trackers: Vec<Url>,
peer_id: Option<Id20>,
overwrite: bool,
spawner: Option<BlockingSpawner>,
}
impl ManagedTorrentBuilder {
pub fn new<P: AsRef<Path>>(
info: TorrentMetaV1Info<ByteString>,
info_hash: Id20,
output_folder: P,
) -> Self {
Self {
info,
info_hash,
output_folder: output_folder.as_ref().into(),
spawner: None,
force_tracker_interval: None,
peer_connect_timeout: None,
peer_read_write_timeout: None,
only_files: None,
trackers: Default::default(),
peer_id: None,
overwrite: false,
}
}
pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self {
self.only_files = Some(only_files);
self
}
pub fn trackers(&mut self, trackers: Vec<Url>) -> &mut Self {
self.trackers = trackers;
self
}
pub fn overwrite(&mut self, overwrite: bool) -> &mut Self {
self.overwrite = overwrite;
self
}
pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self {
self.force_tracker_interval = Some(force_tracker_interval);
self
}
pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self {
self.spawner = Some(spawner);
self
}
pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self {
self.peer_id = Some(peer_id);
self
}
pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
self.peer_connect_timeout = Some(timeout);
self
}
pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self {
self.peer_read_write_timeout = Some(timeout);
self
}
pub(crate) fn build(self) -> ManagedTorrentHandle {
Arc::new(ManagedTorrent {
locked: RwLock::new(ManagedTorrentLocked {
only_files: self.only_files,
state: Default::default(),
}),
info: Arc::new(ManagedTorrentInfo {
info: self.info,
info_hash: self.info_hash,
out_dir: self.output_folder,
trackers: self.trackers.into_iter().collect(),
spawner: self.spawner.unwrap_or_default(),
// options: Some(self.options),
}),
})
}
}
pub type ManagedTorrentHandle = Arc<ManagedTorrent>;

View file

@ -7,10 +7,11 @@ use librqbit::{
http_api_client,
peer_connection::PeerConnectionOptions,
session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, ManagedTorrentState,
Session, SessionOptions,
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session,
SessionOptions,
},
spawn_utils::{spawn, BlockingSpawner},
torrent_state::ManagedTorrentState,
};
use size_format::SizeFormatterBinary as SF;
use tracing::{error, info, span, warn, Level};
@ -238,12 +239,12 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
loop {
session.with_torrents(|torrents| {
for (idx, torrent) in torrents.iter().enumerate() {
match &torrent.state {
ManagedTorrentState::Initializing => {
match torrent.state() {
ManagedTorrentState::Initializing(_) => {
info!("[{}] initializing", idx);
},
ManagedTorrentState::Running(handle) => {
let stats = handle.torrent_state().stats_snapshot();
ManagedTorrentState::Live(handle) => {
let stats = handle.stats_snapshot();
let speed = handle.speed_estimator();
let total = stats.total_bytes;
let progress = stats.total_bytes - stats.remaining_bytes;
@ -269,6 +270,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
stats.peer_stats.dead,
);
},
ManagedTorrentState::Created => warn!("the torrent was just created, but not initializing"),
}
}
});
@ -394,7 +396,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
AddTorrentResponse::AlreadyManaged(handle) => {
info!(
"torrent {:?} is already managed, downloaded to {:?}",
handle.info_hash, handle.output_folder
handle.info_hash(),
handle.info().out_dir
);
continue;
}