Should be working ok now

This commit is contained in:
Igor Katson 2023-11-24 14:08:02 +00:00
parent 5e728fc67b
commit afbf2a76b9
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
8 changed files with 197 additions and 105 deletions

View file

@ -18,4 +18,10 @@
- [ ] it's sending many requests now way too fast, locks up Mac OS UI annoyingly - [ ] it's sending many requests now way too fast, locks up Mac OS UI annoyingly
someday: someday:
- [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager) - [ ] cancellation from the client-side for the lib (i.e. stop the torrent manager)
refactor:
- [ ] where are peers stored
- [ ] http api pause/unpause etc
- [ ] when a live torrent fails writing to disk, it should transition to error state

View file

@ -76,14 +76,6 @@ impl ChunkTracker {
} }
} }
pub fn get_have_bytes(&self) -> u64 {
todo!()
}
pub fn get_needed_bytes(&self) -> u64 {
todo!()
}
pub fn get_lengths(&self) -> &Lengths { pub fn get_lengths(&self) -> &Lengths {
&self.lengths &self.lengths
} }

View file

@ -1,4 +1,4 @@
use anyhow::{Context}; use anyhow::Context;
use axum::body::Bytes; use axum::body::Bytes;
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use axum::response::IntoResponse; use axum::response::IntoResponse;
@ -20,7 +20,7 @@ use axum::Router;
use crate::http_api_error::{ApiError, ApiErrorExt}; use crate::http_api_error::{ApiError, ApiErrorExt};
use crate::session::{ use crate::session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId,
}; };
use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}; use crate::torrent_state::peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot};
use crate::torrent_state::stats::snapshot::StatsSnapshot; use crate::torrent_state::stats::snapshot::StatsSnapshot;
@ -38,9 +38,6 @@ impl HttpApi {
inner: Arc::new(ApiInternal::new(session)), inner: Arc::new(ApiInternal::new(session)),
} }
} }
pub fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize {
self.inner.add_torrent_handle(handle)
}
pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> {
let state = self.inner; let state = self.inner;
@ -331,9 +328,7 @@ impl TorrentAddQueryParams {
// Private HTTP API internals. Agnostic of web framework. // Private HTTP API internals. Agnostic of web framework.
struct ApiInternal { struct ApiInternal {
dht: Option<Dht>,
startup_time: Instant, startup_time: Instant,
torrent_managers: RwLock<Vec<ManagedTorrentHandle>>,
session: Arc<Session>, session: Arc<Session>,
} }
@ -342,41 +337,29 @@ type ApiState = Arc<ApiInternal>;
impl ApiInternal { impl ApiInternal {
pub fn new(session: Arc<Session>) -> Self { pub fn new(session: Arc<Session>) -> Self {
Self { Self {
dht: session.get_dht(),
startup_time: Instant::now(), startup_time: Instant::now(),
torrent_managers: RwLock::new(Vec::new()),
session, session,
} }
} }
fn add_torrent_handle(&self, handle: ManagedTorrentHandle) -> usize { fn mgr_handle(&self, idx: TorrentId) -> Result<ManagedTorrentHandle> {
let mut g = self.torrent_managers.write(); self.session
let idx = g.len();
g.push(handle);
idx
}
fn mgr_handle(&self, idx: usize) -> Result<ManagedTorrentHandle> {
self.torrent_managers
.read()
.get(idx) .get(idx)
.cloned()
.ok_or(ApiError::torrent_not_found(idx)) .ok_or(ApiError::torrent_not_found(idx))
} }
fn api_torrent_list(&self) -> TorrentListResponse { fn api_torrent_list(&self) -> TorrentListResponse {
TorrentListResponse { let items = self.session.with_torrents(|torrents| {
torrents: self torrents
.torrent_managers
.read()
.iter() .iter()
.enumerate() .enumerate()
.map(|(id, mgr)| TorrentListResponseItem { .map(|(id, mgr)| TorrentListResponseItem {
id, id,
info_hash: mgr.info().info_hash.as_string(), info_hash: mgr.info().info_hash.as_string(),
}) })
.collect(), .collect()
} });
TorrentListResponse { torrents: items }
} }
fn api_torrent_details(&self, idx: usize) -> Result<TorrentDetailsResponse> { fn api_torrent_details(&self, idx: usize) -> Result<TorrentDetailsResponse> {
@ -406,10 +389,11 @@ impl ApiInternal {
.context("error adding torrent") .context("error adding torrent")
.with_error_status_code(StatusCode::BAD_REQUEST)? .with_error_status_code(StatusCode::BAD_REQUEST)?
{ {
AddTorrentResponse::AlreadyManaged(managed) => { AddTorrentResponse::AlreadyManaged(id, managed) => {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
"{:?} is already managed, downloaded to {:?}", "{:?} is already managed, id={}, downloaded to {:?}",
managed.info_hash(), managed.info_hash(),
id,
&managed.info().out_dir &managed.info().out_dir
)) ))
.with_error_status_code(StatusCode::CONFLICT); .with_error_status_code(StatusCode::CONFLICT);
@ -423,14 +407,13 @@ impl ApiInternal {
details: make_torrent_details(&info_hash, &info, only_files.as_deref()) details: make_torrent_details(&info_hash, &info, only_files.as_deref())
.context("error making torrent details")?, .context("error making torrent details")?,
}, },
AddTorrentResponse::Added(handle) => { AddTorrentResponse::Added(id, handle) => {
let details = make_torrent_details( let details = make_torrent_details(
&handle.info_hash(), &handle.info_hash(),
&handle.info().info, &handle.info().info,
handle.only_files().as_deref(), handle.only_files().as_deref(),
) )
.context("error making torrent details")?; .context("error making torrent details")?;
let id = self.add_torrent_handle(handle);
ApiAddTorrentResponse { ApiAddTorrentResponse {
id: Some(id), id: Some(id),
details, details,
@ -441,14 +424,15 @@ impl ApiInternal {
} }
fn api_dht_stats(&self) -> Result<DhtStats> { fn api_dht_stats(&self) -> Result<DhtStats> {
self.dht self.session
.get_dht()
.as_ref() .as_ref()
.map(|d| d.stats()) .map(|d| d.stats())
.ok_or(ApiError::dht_disabled()) .ok_or(ApiError::dht_disabled())
} }
fn api_dht_table(&self) -> Result<impl Serialize> { fn api_dht_table(&self) -> Result<impl Serialize> {
let dht = self.dht.as_ref().ok_or(ApiError::dht_disabled())?; let dht = self.session.get_dht().ok_or(ApiError::dht_disabled())?;
Ok(dht.with_routing_table(|r| r.clone())) Ok(dht.with_routing_table(|r| r.clone()))
} }

View file

@ -11,39 +11,29 @@ use librqbit_core::{
use parking_lot::RwLock; use parking_lot::RwLock;
use reqwest::Url; use reqwest::Url;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tracing::{debug, info, span, warn, Level}; use tracing::{debug, info, trace_span, warn};
use crate::{ use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::PeerConnectionOptions, peer_connection::PeerConnectionOptions,
spawn_utils::{spawn, BlockingSpawner}, spawn_utils::BlockingSpawner,
torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle}, torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle},
}; };
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
pub type TorrentId = usize;
#[derive(Default)] #[derive(Default)]
pub struct SessionLocked { pub struct SessionLocked {
torrents: Vec<ManagedTorrentHandle>, torrents: Vec<ManagedTorrentHandle>,
} }
enum SessionLockedAddTorrentResult {
AlreadyManaged(ManagedTorrentHandle),
Added(usize),
}
impl SessionLocked { impl SessionLocked {
fn add_torrent(&mut self, torrent: ManagedTorrentHandle) -> SessionLockedAddTorrentResult { fn add_torrent(&mut self, torrent: ManagedTorrentHandle) -> TorrentId {
if let Some(handle) = self
.torrents
.iter()
.find(|t| t.info_hash() == torrent.info_hash())
{
return SessionLockedAddTorrentResult::AlreadyManaged(handle.clone());
}
let idx = self.torrents.len(); let idx = self.torrents.len();
self.torrents.push(torrent); self.torrents.push(torrent);
SessionLockedAddTorrentResult::Added(idx) idx
} }
} }
@ -109,9 +99,9 @@ pub struct ListOnlyResponse {
} }
pub enum AddTorrentResponse { pub enum AddTorrentResponse {
AlreadyManaged(ManagedTorrentHandle), AlreadyManaged(TorrentId, ManagedTorrentHandle),
ListOnly(ListOnlyResponse), ListOnly(ListOnlyResponse),
Added(ManagedTorrentHandle), Added(TorrentId, ManagedTorrentHandle),
} }
pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result<Vec<u8>> { pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result<Vec<u8>> {
@ -207,15 +197,14 @@ impl Session {
locked: RwLock::new(SessionLocked::default()), locked: RwLock::new(SessionLocked::default()),
}) })
} }
pub fn get_dht(&self) -> Option<Dht> { pub fn get_dht(&self) -> Option<&Dht> {
self.dht.clone() self.dht.as_ref()
} }
pub fn with_torrents<F>(&self, callback: F)
where pub fn with_torrents<R>(&self, callback: impl Fn(&[ManagedTorrentHandle]) -> R) -> R {
F: Fn(&[ManagedTorrentHandle]),
{
callback(&self.locked.read().torrents) callback(&self.locked.read().torrents)
} }
pub async fn add_torrent( pub async fn add_torrent(
&self, &self,
add: impl Into<AddTorrent<'_>>, add: impl Into<AddTorrent<'_>>,
@ -411,32 +400,37 @@ impl Session {
builder.peer_read_write_timeout(t); builder.peer_read_write_timeout(t);
} }
let managed_torrent = builder.build(); let (managed_torrent, id) = {
let mut g = self.locked.write();
match self.locked.write().add_torrent(managed_torrent.clone()) { if let Some((id, handle)) = g
SessionLockedAddTorrentResult::AlreadyManaged(managed) => { .torrents
return Ok(AddTorrentResponse::AlreadyManaged(managed)) .iter()
.enumerate()
.find(|(_, t)| t.info_hash() == info_hash)
{
return Ok(AddTorrentResponse::AlreadyManaged(id, handle.clone()));
} }
SessionLockedAddTorrentResult::Added(_) => {} let managed_torrent = builder.build();
let id = g.add_torrent(managed_torrent.clone());
(managed_torrent, id)
};
{
let span = trace_span!("torrent", id = id);
let _ = span.enter();
managed_torrent
.start(initial_peers, dht_peer_rx)
.context("error starting torrent")?;
} }
for peer in initial_peers { Ok(AddTorrentResponse::Added(id, managed_torrent))
managed_torrent.add_peer(peer); }
}
if let Some(mut dht_peer_rx) = dht_peer_rx { pub fn get(&self, id: TorrentId) -> Option<ManagedTorrentHandle> {
spawn(span!(Level::INFO, "dht_peer_adder"), { self.locked.read().torrents.get(id).cloned()
let handle = managed_torrent.clone(); }
async move {
while let Some(peer) = dht_peer_rx.next().await {
handle.add_peer(peer);
}
warn!("dht was closed");
Ok(())
}
});
}
Ok(AddTorrentResponse::Added(managed_torrent)) pub fn restart(&self, id: usize) -> anyhow::Result<()> {
todo!()
} }
} }

View file

@ -29,8 +29,8 @@ fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> {
} }
pub struct TorrentStateInitializing { pub struct TorrentStateInitializing {
meta: Arc<ManagedTorrentInfo>, pub(crate) meta: Arc<ManagedTorrentInfo>,
only_files: Option<Vec<usize>>, pub(crate) only_files: Option<Vec<usize>>,
} }
impl TorrentStateInitializing { impl TorrentStateInitializing {

View file

@ -64,7 +64,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::{ use librqbit_core::{
id20::Id20, id20::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex}, lengths::{ChunkInfo, Lengths, ValidPieceIndex},
speed_estimator::{SpeedEstimator}, speed_estimator::SpeedEstimator,
torrent_metainfo::TorrentMetaV1Info, torrent_metainfo::TorrentMetaV1Info,
}; };
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
@ -88,7 +88,7 @@ use crate::{
peer_connection::{ peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
}, },
spawn_utils::{spawn}, spawn_utils::spawn,
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
type_aliases::{PeerHandle, BF}, type_aliases::{PeerHandle, BF},
}; };
@ -157,14 +157,13 @@ pub struct TorrentStateLive {
} }
impl TorrentStateLive { impl TorrentStateLive {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(paused: TorrentStatePaused) -> Arc<Self> { pub(crate) fn new(paused: TorrentStatePaused) -> Arc<Self> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let speed_estimator = SpeedEstimator::new(5); let speed_estimator = SpeedEstimator::new(5);
let have_bytes = paused.chunk_tracker.get_have_bytes(); let have_bytes = paused.have_bytes;
let needed_bytes = paused.chunk_tracker.get_needed_bytes(); let needed_bytes = paused.needed_bytes;
let lengths = *paused.chunk_tracker.get_lengths(); let lengths = *paused.chunk_tracker.get_lengths();
let state = Arc::new(TorrentStateLive { let state = Arc::new(TorrentStateLive {
@ -560,6 +559,10 @@ impl TorrentStateLive {
} }
self.finished_notify.notified().await; self.finished_notify.notified().await;
} }
pub fn pause(&self) -> anyhow::Result<TorrentStatePaused> {
bail!("pause not implemented yet")
}
} }
struct PeerHandlerLocked { struct PeerHandlerLocked {

View file

@ -4,11 +4,12 @@ pub mod paused;
pub mod utils; pub mod utils;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{path::Path};
use anyhow::bail;
use anyhow::Context; use anyhow::Context;
use buffers::ByteString; use buffers::ByteString;
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
@ -18,20 +19,38 @@ use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
pub use live::*; pub use live::*;
use parking_lot::RwLock; use parking_lot::RwLock;
use tokio_stream::StreamExt;
use tracing::trace_span;
use url::Url; use url::Url;
use crate::spawn_utils::{BlockingSpawner}; use crate::spawn_utils::spawn;
use crate::spawn_utils::BlockingSpawner;
use initializing::TorrentStateInitializing; use initializing::TorrentStateInitializing;
use self::paused::TorrentStatePaused; use self::paused::TorrentStatePaused;
pub enum ManagedTorrentState { pub enum ManagedTorrentState {
Initializing(TorrentStateInitializing), Initializing(Arc<TorrentStateInitializing>),
Paused(TorrentStatePaused), Paused(TorrentStatePaused),
Live(Arc<TorrentStateLive>), Live(Arc<TorrentStateLive>),
Error(anyhow::Error), Error(anyhow::Error),
// This is used when swapping between states, outside world should never see it.
None,
}
impl ManagedTorrentState {
fn assert_paused(self) -> TorrentStatePaused {
match self {
Self::Paused(paused) => paused,
_ => panic!("Expected paused state"),
}
}
fn take(&mut self) -> Self {
std::mem::replace(self, Self::None)
}
} }
pub(crate) struct ManagedTorrentLocked { pub(crate) struct ManagedTorrentLocked {
@ -58,6 +77,7 @@ pub struct ManagedTorrentInfo {
pub struct ManagedTorrent { pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>, pub info: Arc<ManagedTorrentInfo>,
only_files: Option<Vec<usize>>,
locked: RwLock<ManagedTorrentLocked>, locked: RwLock<ManagedTorrentLocked>,
} }
@ -70,13 +90,8 @@ impl ManagedTorrent {
self.info.info_hash self.info.info_hash
} }
pub(crate) fn add_peer(&self, _peer: SocketAddr) -> bool {
todo!()
}
pub fn only_files(&self) -> Option<Vec<usize>> { pub fn only_files(&self) -> Option<Vec<usize>> {
// self.locked.write().only_files.clone() self.only_files.clone()
todo!()
} }
pub fn with_state<R>(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R { pub fn with_state<R>(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R {
@ -91,6 +106,91 @@ impl ManagedTorrent {
} }
} }
pub fn start(
self: &Arc<Self>,
initial_peers: Vec<SocketAddr>,
peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
) -> anyhow::Result<()> {
let mut g = self.locked.write();
match &g.state {
ManagedTorrentState::Live(_) => {
bail!("torrent is already live");
}
ManagedTorrentState::Initializing(init) => {
let init = init.clone();
let t = self.clone();
spawn(trace_span!("initialize_and_start"), async move {
match init.check().await {
Ok(paused) => {
let live = TorrentStateLive::new(paused);
t.locked.write().state = ManagedTorrentState::Live(live.clone());
let live = Arc::downgrade(&live);
spawn(trace_span!("peer_adder"), async move {
{
let live: Arc<TorrentStateLive> =
live.upgrade().context("no longer live")?;
for peer in initial_peers {
live.add_peer_if_not_seen(peer);
}
}
if let Some(mut peer_rx) = peer_rx {
while let Some(peer) = peer_rx.next().await {
live.upgrade()
.context("no longer live")?
.add_peer_if_not_seen(peer);
}
}
Ok(())
});
Ok(())
}
Err(err) => {
let result = anyhow::anyhow!("{:?}", err);
t.locked.write().state = ManagedTorrentState::Error(err);
Err(result)
}
}
});
Ok(())
}
ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused();
let live = TorrentStateLive::new(paused);
g.state = ManagedTorrentState::Live(live);
Ok(())
}
ManagedTorrentState::Error(_) => {
bail!("starting torrents from error state not implemented")
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
}
}
pub fn pause(&self) -> anyhow::Result<()> {
let mut g = self.locked.write();
match &g.state {
ManagedTorrentState::Live(live) => {
let paused = live.pause()?;
g.state = ManagedTorrentState::Paused(paused);
Ok(())
}
ManagedTorrentState::Initializing(_) => {
bail!("torrent is initializing, can't pause");
}
ManagedTorrentState::Paused(_) => {
bail!("torrent is already paused");
}
ManagedTorrentState::Error(_) => {
bail!("can't pause torrent in error state")
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
}
}
pub async fn wait_until_completed(&self) -> anyhow::Result<()> { pub async fn wait_until_completed(&self) -> anyhow::Result<()> {
// TODO: rewrite // TODO: rewrite
self.live() self.live()
@ -191,8 +291,12 @@ impl ManagedTorrentBuilder {
overwrite: self.overwrite, overwrite: self.overwrite,
}, },
}); });
let initializing = TorrentStateInitializing::new(info.clone(), self.only_files); let initializing = Arc::new(TorrentStateInitializing::new(
info.clone(),
self.only_files.clone(),
));
Arc::new(ManagedTorrent { Arc::new(ManagedTorrent {
only_files: self.only_files,
locked: RwLock::new(ManagedTorrentLocked { locked: RwLock::new(ManagedTorrentLocked {
state: ManagedTorrentState::Initializing(initializing), state: ManagedTorrentState::Initializing(initializing),
}), }),

View file

@ -14,3 +14,12 @@ pub struct TorrentStatePaused {
pub(crate) have_bytes: u64, pub(crate) have_bytes: u64,
pub(crate) needed_bytes: u64, pub(crate) needed_bytes: u64,
} }
// impl TorrentStatePaused {
// pub fn get_have_bytes(&self) -> u64 {
// self.have_bytes
// }
// pub fn get_needed_bytes(&self) -> u64 {
// self.needed_bytes
// }
// }