commit
b7ed850918
23 changed files with 363 additions and 394 deletions
|
|
@ -20,7 +20,7 @@ struct CustomStorage {
|
||||||
impl StorageFactory for CustomStorageFactory {
|
impl StorageFactory for CustomStorageFactory {
|
||||||
type Storage = CustomStorage;
|
type Storage = CustomStorage;
|
||||||
|
|
||||||
fn create(&self, _info: &librqbit::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
fn create(&self, _info: &librqbit::ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
|
||||||
Ok(CustomStorage::default())
|
Ok(CustomStorage::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,7 +54,7 @@ impl TorrentStorage for CustomStorage {
|
||||||
anyhow::bail!("not implemented")
|
anyhow::bail!("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init(&mut self, _meta: &librqbit::ManagedTorrentInfo) -> anyhow::Result<()> {
|
fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> {
|
||||||
anyhow::bail!("not implemented")
|
anyhow::bail!("not implemented")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Details: {:?}", &handle.info().info);
|
info!("Details: {:?}", &handle.shared().info);
|
||||||
|
|
||||||
// Print stats periodically.
|
// Print stats periodically.
|
||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
|
|
|
||||||
|
|
@ -197,7 +197,7 @@ impl Api {
|
||||||
torrents
|
torrents
|
||||||
.map(|(id, mgr)| TorrentListResponseItem {
|
.map(|(id, mgr)| TorrentListResponseItem {
|
||||||
id,
|
id,
|
||||||
info_hash: mgr.info().info_hash.as_string(),
|
info_hash: mgr.shared().info_hash.as_string(),
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
});
|
});
|
||||||
|
|
@ -206,9 +206,9 @@ impl Api {
|
||||||
|
|
||||||
pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result<TorrentDetailsResponse> {
|
pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result<TorrentDetailsResponse> {
|
||||||
let handle = self.mgr_handle(idx)?;
|
let handle = self.mgr_handle(idx)?;
|
||||||
let info_hash = handle.info().info_hash;
|
let info_hash = handle.shared().info_hash;
|
||||||
let only_files = handle.only_files();
|
let only_files = handle.only_files();
|
||||||
make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref())
|
make_torrent_details(&info_hash, &handle.shared().info, only_files.as_deref())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn api_session_stats(&self) -> SessionStatsSnapshot {
|
pub fn api_session_stats(&self) -> SessionStatsSnapshot {
|
||||||
|
|
@ -221,7 +221,7 @@ impl Api {
|
||||||
file_idx: usize,
|
file_idx: usize,
|
||||||
) -> Result<&'static str> {
|
) -> Result<&'static str> {
|
||||||
let handle = self.mgr_handle(idx)?;
|
let handle = self.mgr_handle(idx)?;
|
||||||
let info = &handle.info().info;
|
let info = &handle.shared().info;
|
||||||
torrent_file_mime_type(info, file_idx)
|
torrent_file_mime_type(info, file_idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -361,7 +361,7 @@ impl Api {
|
||||||
AddTorrentResponse::Added(id, handle) => {
|
AddTorrentResponse::Added(id, handle) => {
|
||||||
let details = make_torrent_details(
|
let details = make_torrent_details(
|
||||||
&handle.info_hash(),
|
&handle.info_hash(),
|
||||||
&handle.info().info,
|
&handle.shared().info,
|
||||||
handle.only_files().as_deref(),
|
handle.only_files().as_deref(),
|
||||||
)
|
)
|
||||||
.context("error making torrent details")?;
|
.context("error making torrent details")?;
|
||||||
|
|
@ -370,7 +370,7 @@ impl Api {
|
||||||
details,
|
details,
|
||||||
seen_peers: None,
|
seen_peers: None,
|
||||||
output_folder: handle
|
output_folder: handle
|
||||||
.info()
|
.shared()
|
||||||
.options
|
.options
|
||||||
.output_folder
|
.output_folder
|
||||||
.to_string_lossy()
|
.to_string_lossy()
|
||||||
|
|
|
||||||
|
|
@ -142,7 +142,7 @@ impl HttpApi {
|
||||||
|
|
||||||
fn torrent_playlist_items(handle: &ManagedTorrent) -> Result<Vec<(usize, String)>> {
|
fn torrent_playlist_items(handle: &ManagedTorrent) -> Result<Vec<(usize, String)>> {
|
||||||
let mut playlist_items = handle
|
let mut playlist_items = handle
|
||||||
.info()
|
.shared()
|
||||||
.info
|
.info
|
||||||
.iter_filenames_and_lengths()?
|
.iter_filenames_and_lengths()?
|
||||||
.enumerate()
|
.enumerate()
|
||||||
|
|
@ -216,8 +216,8 @@ impl HttpApi {
|
||||||
.await?;
|
.await?;
|
||||||
let (info, content) = match added {
|
let (info, content) = match added {
|
||||||
crate::AddTorrentResponse::AlreadyManaged(_, handle) => (
|
crate::AddTorrentResponse::AlreadyManaged(_, handle) => (
|
||||||
handle.info().info.clone(),
|
handle.shared().info.clone(),
|
||||||
handle.info().torrent_bytes.clone(),
|
handle.shared().torrent_bytes.clone(),
|
||||||
),
|
),
|
||||||
crate::AddTorrentResponse::ListOnly(ListOnlyResponse {
|
crate::AddTorrentResponse::ListOnly(ListOnlyResponse {
|
||||||
info,
|
info,
|
||||||
|
|
|
||||||
|
|
@ -77,7 +77,7 @@ pub use session::{
|
||||||
};
|
};
|
||||||
pub use spawn_utils::spawn as librqbit_spawn;
|
pub use spawn_utils::spawn as librqbit_spawn;
|
||||||
pub use torrent_state::{
|
pub use torrent_state::{
|
||||||
ManagedTorrent, ManagedTorrentInfo, ManagedTorrentState, TorrentStats, TorrentStatsState,
|
ManagedTorrent, ManagedTorrentShared, ManagedTorrentState, TorrentStats, TorrentStatsState,
|
||||||
};
|
};
|
||||||
pub use type_aliases::FileInfos;
|
pub use type_aliases::FileInfos;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ use crate::{
|
||||||
api::TorrentIdOrHash,
|
api::TorrentIdOrHash,
|
||||||
bitv_factory::{BitVFactory, NonPersistentBitVFactory},
|
bitv_factory::{BitVFactory, NonPersistentBitVFactory},
|
||||||
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
|
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
|
||||||
|
file_info::FileInfo,
|
||||||
merge_streams::merge_streams,
|
merge_streams::merge_streams,
|
||||||
peer_connection::PeerConnectionOptions,
|
peer_connection::PeerConnectionOptions,
|
||||||
read_buf::ReadBuf,
|
read_buf::ReadBuf,
|
||||||
|
|
@ -23,10 +24,11 @@ use crate::{
|
||||||
},
|
},
|
||||||
stream_connect::{SocksProxyConfig, StreamConnector},
|
stream_connect::{SocksProxyConfig, StreamConnector},
|
||||||
torrent_state::{
|
torrent_state::{
|
||||||
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
|
initializing::TorrentStateInitializing, ManagedTorrentHandle, ManagedTorrentLocked,
|
||||||
|
ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive,
|
||||||
},
|
},
|
||||||
type_aliases::{DiskWorkQueueSender, PeerStream},
|
type_aliases::{DiskWorkQueueSender, PeerStream},
|
||||||
ManagedTorrentInfo,
|
ManagedTorrent, ManagedTorrentShared,
|
||||||
};
|
};
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use bencode::bencode_serialize_to_writer;
|
use bencode::bencode_serialize_to_writer;
|
||||||
|
|
@ -43,6 +45,7 @@ use itertools::Itertools;
|
||||||
use librqbit_core::{
|
use librqbit_core::{
|
||||||
constants::CHUNK_SIZE,
|
constants::CHUNK_SIZE,
|
||||||
directories::get_configuration_directory,
|
directories::get_configuration_directory,
|
||||||
|
lengths::Lengths,
|
||||||
magnet::Magnet,
|
magnet::Magnet,
|
||||||
peer_id::generate_peer_id,
|
peer_id::generate_peer_id,
|
||||||
spawn_utils::spawn_with_cancel,
|
spawn_utils::spawn_with_cancel,
|
||||||
|
|
@ -51,7 +54,10 @@ use librqbit_core::{
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use peer_binary_protocol::Handshake;
|
use peer_binary_protocol::Handshake;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::{
|
||||||
|
net::{TcpListener, TcpStream},
|
||||||
|
sync::Notify,
|
||||||
|
};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||||
use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span};
|
use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span};
|
||||||
|
|
@ -95,7 +101,7 @@ pub struct Session {
|
||||||
peer_id: Id20,
|
peer_id: Id20,
|
||||||
dht: Option<Dht>,
|
dht: Option<Dht>,
|
||||||
persistence: Option<Arc<dyn SessionPersistenceStore>>,
|
persistence: Option<Arc<dyn SessionPersistenceStore>>,
|
||||||
bitv_factory: Arc<dyn BitVFactory>,
|
pub(crate) bitv_factory: Arc<dyn BitVFactory>,
|
||||||
peer_opts: PeerConnectionOptions,
|
peer_opts: PeerConnectionOptions,
|
||||||
spawner: BlockingSpawner,
|
spawner: BlockingSpawner,
|
||||||
next_id: AtomicUsize,
|
next_id: AtomicUsize,
|
||||||
|
|
@ -111,9 +117,8 @@ pub struct Session {
|
||||||
default_storage_factory: Option<BoxStorageFactory>,
|
default_storage_factory: Option<BoxStorageFactory>,
|
||||||
|
|
||||||
reqwest_client: reqwest::Client,
|
reqwest_client: reqwest::Client,
|
||||||
connector: Arc<StreamConnector>,
|
pub(crate) connector: Arc<StreamConnector>,
|
||||||
|
pub(crate) concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
|
|
||||||
|
|
||||||
root_span: Option<Span>,
|
root_span: Option<Span>,
|
||||||
|
|
||||||
|
|
@ -637,7 +642,7 @@ impl Session {
|
||||||
if opts.enable_upnp_port_forwarding {
|
if opts.enable_upnp_port_forwarding {
|
||||||
session.spawn(
|
session.spawn(
|
||||||
error_span!(parent: session.rs(), "upnp_forward", port = listen_port),
|
error_span!(parent: session.rs(), "upnp_forward", port = listen_port),
|
||||||
session.clone().task_upnp_port_forwarder(listen_port),
|
Self::task_upnp_port_forwarder(listen_port),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -686,7 +691,7 @@ impl Session {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_incoming_connection(
|
async fn check_incoming_connection(
|
||||||
&self,
|
self: Arc<Self>,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
) -> anyhow::Result<(Arc<TorrentStateLive>, CheckedIncomingConnection)> {
|
) -> anyhow::Result<(Arc<TorrentStateLive>, CheckedIncomingConnection)> {
|
||||||
|
|
@ -739,6 +744,8 @@ impl Session {
|
||||||
|
|
||||||
async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
|
async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
|
||||||
let mut futs = FuturesUnordered::new();
|
let mut futs = FuturesUnordered::new();
|
||||||
|
let session = Arc::downgrade(&self);
|
||||||
|
drop(self);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
|
@ -746,13 +753,15 @@ impl Session {
|
||||||
match r {
|
match r {
|
||||||
Ok((stream, addr)) => {
|
Ok((stream, addr)) => {
|
||||||
trace!("accepted connection from {addr}");
|
trace!("accepted connection from {addr}");
|
||||||
|
let session = session.upgrade().context("session is dead")?;
|
||||||
|
let span = error_span!(parent: session.rs(), "incoming", addr=%addr);
|
||||||
futs.push(
|
futs.push(
|
||||||
self.check_incoming_connection(addr, stream)
|
session.check_incoming_connection(addr, stream)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
debug!("error checking incoming connection: {e:#}");
|
debug!("error checking incoming connection: {e:#}");
|
||||||
e
|
e
|
||||||
})
|
})
|
||||||
.instrument(error_span!(parent: self.rs(), "incoming", addr=%addr))
|
.instrument(span)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -770,7 +779,7 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn task_upnp_port_forwarder(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
|
async fn task_upnp_port_forwarder(port: u16) -> anyhow::Result<()> {
|
||||||
let pf = librqbit_upnp::UpnpPortForwarder::new(vec![port], None)?;
|
let pf = librqbit_upnp::UpnpPortForwarder::new(vec![port], None)?;
|
||||||
pf.run_forever().await
|
pf.run_forever().await
|
||||||
}
|
}
|
||||||
|
|
@ -914,7 +923,6 @@ impl Session {
|
||||||
peer_rx: Some(rx),
|
peer_rx: Some(rx),
|
||||||
initial_peers: {
|
initial_peers: {
|
||||||
let seen = seen.into_iter().collect_vec();
|
let seen = seen.into_iter().collect_vec();
|
||||||
info!(count=seen.len(), "seen");
|
|
||||||
for peer in &seen {
|
for peer in &seen {
|
||||||
debug!(?peer, "seen")
|
debug!(?peer, "seen")
|
||||||
}
|
}
|
||||||
|
|
@ -1023,7 +1031,7 @@ impl Session {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn main_torrent_info(
|
async fn main_torrent_info(
|
||||||
&self,
|
self: &Arc<Self>,
|
||||||
add_res: InternalAddResult,
|
add_res: InternalAddResult,
|
||||||
mut opts: AddTorrentOptions,
|
mut opts: AddTorrentOptions,
|
||||||
) -> anyhow::Result<AddTorrentResponse> {
|
) -> anyhow::Result<AddTorrentResponse> {
|
||||||
|
|
@ -1084,43 +1092,6 @@ impl Session {
|
||||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut builder = ManagedTorrentBuilder::new(
|
|
||||||
id,
|
|
||||||
info,
|
|
||||||
info_hash,
|
|
||||||
torrent_bytes,
|
|
||||||
info_bytes,
|
|
||||||
output_folder,
|
|
||||||
storage_factory,
|
|
||||||
);
|
|
||||||
builder
|
|
||||||
.allow_overwrite(opts.overwrite)
|
|
||||||
.spawner(self.spawner)
|
|
||||||
.trackers(trackers)
|
|
||||||
.connector(self.connector.clone())
|
|
||||||
.peer_id(self.peer_id);
|
|
||||||
|
|
||||||
if let Some(d) = self.disk_write_tx.clone() {
|
|
||||||
builder.disk_writer(d);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(only_files) = only_files {
|
|
||||||
builder.only_files(only_files);
|
|
||||||
}
|
|
||||||
if let Some(interval) = opts.force_tracker_interval {
|
|
||||||
builder.force_tracker_interval(interval);
|
|
||||||
}
|
|
||||||
|
|
||||||
let peer_opts = self.merge_peer_opts(opts.peer_opts);
|
|
||||||
|
|
||||||
if let Some(t) = peer_opts.connect_timeout {
|
|
||||||
builder.peer_connect_timeout(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(t) = peer_opts.read_write_timeout {
|
|
||||||
builder.peer_read_write_timeout(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
let managed_torrent = {
|
let managed_torrent = {
|
||||||
let mut g = self.db.write();
|
let mut g = self.db.write();
|
||||||
if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| {
|
if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| {
|
||||||
|
|
@ -1132,13 +1103,70 @@ impl Session {
|
||||||
}) {
|
}) {
|
||||||
return Ok(AddTorrentResponse::AlreadyManaged(id, handle));
|
return Ok(AddTorrentResponse::AlreadyManaged(id, handle));
|
||||||
}
|
}
|
||||||
let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?;
|
|
||||||
g.add_torrent(managed_torrent.clone(), id);
|
let lengths = Lengths::from_torrent(&info)?;
|
||||||
managed_torrent
|
let file_infos = info
|
||||||
|
.iter_file_details(&lengths)?
|
||||||
|
.map(|fd| {
|
||||||
|
Ok::<_, anyhow::Error>(FileInfo {
|
||||||
|
relative_filename: fd.filename.to_pathbuf()?,
|
||||||
|
offset_in_torrent: fd.offset,
|
||||||
|
piece_range: fd.pieces,
|
||||||
|
len: fd.len,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<anyhow::Result<Vec<FileInfo>>>()?;
|
||||||
|
|
||||||
|
let span = error_span!(parent: self.rs(), "torrent", id);
|
||||||
|
let peer_opts = self.merge_peer_opts(opts.peer_opts);
|
||||||
|
let minfo = Arc::new(ManagedTorrentShared {
|
||||||
|
id,
|
||||||
|
span,
|
||||||
|
file_infos,
|
||||||
|
info,
|
||||||
|
torrent_bytes,
|
||||||
|
info_bytes,
|
||||||
|
info_hash,
|
||||||
|
trackers: trackers.into_iter().collect(),
|
||||||
|
spawner: self.spawner,
|
||||||
|
peer_id: self.peer_id,
|
||||||
|
lengths,
|
||||||
|
storage_factory,
|
||||||
|
options: ManagedTorrentOptions {
|
||||||
|
force_tracker_interval: opts.force_tracker_interval,
|
||||||
|
peer_connect_timeout: peer_opts.connect_timeout,
|
||||||
|
peer_read_write_timeout: peer_opts.read_write_timeout,
|
||||||
|
allow_overwrite: opts.overwrite,
|
||||||
|
output_folder,
|
||||||
|
disk_write_queue: self.disk_write_tx.clone(),
|
||||||
|
},
|
||||||
|
connector: self.connector.clone(),
|
||||||
|
session: Arc::downgrade(self),
|
||||||
|
});
|
||||||
|
|
||||||
|
let initializing = Arc::new(TorrentStateInitializing::new(
|
||||||
|
minfo.clone(),
|
||||||
|
only_files.clone(),
|
||||||
|
minfo.storage_factory.create_and_init(&minfo)?,
|
||||||
|
));
|
||||||
|
let handle = Arc::new(ManagedTorrent {
|
||||||
|
locked: RwLock::new(ManagedTorrentLocked {
|
||||||
|
state: ManagedTorrentState::Initializing(initializing),
|
||||||
|
only_files,
|
||||||
|
}),
|
||||||
|
state_change_notify: Notify::new(),
|
||||||
|
shared: minfo,
|
||||||
|
});
|
||||||
|
|
||||||
|
g.add_torrent(handle.clone(), id);
|
||||||
|
handle
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(p) = self.persistence.as_ref() {
|
if let Some(p) = self.persistence.as_ref() {
|
||||||
p.store(id, &managed_torrent).await?;
|
if let Err(e) = p.store(id, &managed_torrent).await {
|
||||||
|
self.db.write().torrents.remove(&id);
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge "initial_peers" and "peer_rx" into one stream.
|
// Merge "initial_peers" and "peer_rx" into one stream.
|
||||||
|
|
@ -1156,18 +1184,11 @@ impl Session {
|
||||||
);
|
);
|
||||||
|
|
||||||
{
|
{
|
||||||
let span = managed_torrent.info.span.clone();
|
let span = managed_torrent.shared.span.clone();
|
||||||
let _ = span.enter();
|
let _ = span.enter();
|
||||||
|
|
||||||
managed_torrent
|
managed_torrent
|
||||||
.start(
|
.start(peer_rx, opts.paused)
|
||||||
peer_rx,
|
|
||||||
opts.paused,
|
|
||||||
self.cancellation_token.child_token(),
|
|
||||||
self.concurrent_initialize_semaphore.clone(),
|
|
||||||
self.bitv_factory.clone(),
|
|
||||||
self.stats.atomic.clone(),
|
|
||||||
)
|
|
||||||
.context("error starting torrent")?;
|
.context("error starting torrent")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1239,18 +1260,18 @@ impl Session {
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
.unwrap_or_else(|| removed.storage_factory.create(removed.info()));
|
.unwrap_or_else(|| removed.shared.storage_factory.create(removed.shared()));
|
||||||
|
|
||||||
match (storage, delete_files) {
|
match (storage, delete_files) {
|
||||||
(Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"),
|
(Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"),
|
||||||
(Ok(storage), true) => {
|
(Ok(storage), true) => {
|
||||||
debug!("will delete files");
|
debug!("will delete files");
|
||||||
remove_files_and_dirs(removed.info(), &storage);
|
remove_files_and_dirs(removed.shared(), &storage);
|
||||||
if removed.info().options.output_folder != self.output_folder {
|
if removed.shared().options.output_folder != self.output_folder {
|
||||||
if let Err(e) = storage.remove_directory_if_empty(Path::new("")) {
|
if let Err(e) = storage.remove_directory_if_empty(Path::new("")) {
|
||||||
warn!(
|
warn!(
|
||||||
"error removing {:?}: {e:?}",
|
"error removing {:?}: {e:?}",
|
||||||
removed.info().options.output_folder
|
removed.shared().options.output_folder
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1313,18 +1334,11 @@ impl Session {
|
||||||
pub async fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
|
pub async fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
|
||||||
let peer_rx = self.make_peer_rx(
|
let peer_rx = self.make_peer_rx(
|
||||||
handle.info_hash(),
|
handle.info_hash(),
|
||||||
handle.info().trackers.clone().into_iter().collect(),
|
handle.shared().trackers.clone().into_iter().collect(),
|
||||||
self.tcp_listen_port,
|
self.tcp_listen_port,
|
||||||
handle.info().options.force_tracker_interval,
|
handle.shared().options.force_tracker_interval,
|
||||||
)?;
|
|
||||||
handle.start(
|
|
||||||
peer_rx,
|
|
||||||
false,
|
|
||||||
self.cancellation_token.child_token(),
|
|
||||||
self.concurrent_initialize_semaphore.clone(),
|
|
||||||
self.bitv_factory.clone(),
|
|
||||||
self.stats.atomic.clone(),
|
|
||||||
)?;
|
)?;
|
||||||
|
handle.start(peer_rx, false)?;
|
||||||
self.try_update_persistence_metadata(handle).await;
|
self.try_update_persistence_metadata(handle).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -1344,7 +1358,7 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_files_and_dirs(info: &ManagedTorrentInfo, files: &dyn TorrentStorage) {
|
fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage) {
|
||||||
let mut all_dirs = HashSet::new();
|
let mut all_dirs = HashSet::new();
|
||||||
for (id, fi) in info.file_infos.iter().enumerate() {
|
for (id, fi) in info.file_infos.iter().enumerate() {
|
||||||
let mut fname = &*fi.relative_filename;
|
let mut fname = &*fi.relative_filename;
|
||||||
|
|
|
||||||
|
|
@ -128,6 +128,7 @@ impl JsonSessionPersistenceStore {
|
||||||
write_torrent_file: bool,
|
write_torrent_file: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
if !torrent
|
if !torrent
|
||||||
|
.shared
|
||||||
.storage_factory
|
.storage_factory
|
||||||
.is_type_id(TypeId::of::<FilesystemStorageFactory>())
|
.is_type_id(TypeId::of::<FilesystemStorageFactory>())
|
||||||
{
|
{
|
||||||
|
|
@ -136,7 +137,7 @@ impl JsonSessionPersistenceStore {
|
||||||
|
|
||||||
let st = SerializedTorrent {
|
let st = SerializedTorrent {
|
||||||
trackers: torrent
|
trackers: torrent
|
||||||
.info()
|
.shared()
|
||||||
.trackers
|
.trackers
|
||||||
.iter()
|
.iter()
|
||||||
.map(|u| u.to_string())
|
.map(|u| u.to_string())
|
||||||
|
|
@ -146,10 +147,10 @@ impl JsonSessionPersistenceStore {
|
||||||
torrent_bytes: Default::default(),
|
torrent_bytes: Default::default(),
|
||||||
only_files: torrent.only_files().clone(),
|
only_files: torrent.only_files().clone(),
|
||||||
is_paused: torrent.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))),
|
is_paused: torrent.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))),
|
||||||
output_folder: torrent.info().options.output_folder.clone(),
|
output_folder: torrent.shared().options.output_folder.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
if write_torrent_file && !torrent.info().torrent_bytes.is_empty() {
|
if write_torrent_file && !torrent.shared().torrent_bytes.is_empty() {
|
||||||
let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash());
|
let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash());
|
||||||
match tokio::fs::OpenOptions::new()
|
match tokio::fs::OpenOptions::new()
|
||||||
.create(true)
|
.create(true)
|
||||||
|
|
@ -159,7 +160,7 @@ impl JsonSessionPersistenceStore {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(mut f) => {
|
Ok(mut f) => {
|
||||||
if let Err(e) = f.write_all(&torrent.info().torrent_bytes).await {
|
if let Err(e) = f.write_all(&torrent.shared().torrent_bytes).await {
|
||||||
warn!(error=?e, file=?torrent_bytes_file, "error writing torrent bytes")
|
warn!(error=?e, file=?torrent_bytes_file, "error writing torrent bytes")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ impl SessionPersistenceStore for PostgresSessionStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> {
|
async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> {
|
||||||
let torrent_bytes: &[u8] = &torrent.info().torrent_bytes;
|
let torrent_bytes: &[u8] = &torrent.shared().torrent_bytes;
|
||||||
let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused)
|
let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused)
|
||||||
VALUES($1, $2, $3, $4, $5, $6, $7)
|
VALUES($1, $2, $3, $4, $5, $6, $7)
|
||||||
ON CONFLICT(id) DO NOTHING";
|
ON CONFLICT(id) DO NOTHING";
|
||||||
|
|
@ -104,10 +104,17 @@ impl SessionPersistenceStore for PostgresSessionStorage {
|
||||||
.bind::<i32>(id.try_into()?)
|
.bind::<i32>(id.try_into()?)
|
||||||
.bind(&torrent.info_hash().0[..])
|
.bind(&torrent.info_hash().0[..])
|
||||||
.bind(torrent_bytes)
|
.bind(torrent_bytes)
|
||||||
.bind(torrent.info().trackers.iter().cloned().collect::<Vec<_>>())
|
|
||||||
.bind(
|
.bind(
|
||||||
torrent
|
torrent
|
||||||
.info()
|
.shared()
|
||||||
|
.trackers
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
)
|
||||||
|
.bind(
|
||||||
|
torrent
|
||||||
|
.shared()
|
||||||
.options
|
.options
|
||||||
.output_folder
|
.output_folder
|
||||||
.to_str()
|
.to_str()
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ use std::{
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
use atomic::AtomicSessionStats;
|
use atomic::AtomicSessionStats;
|
||||||
use librqbit_core::speed_estimator::SpeedEstimator;
|
use librqbit_core::speed_estimator::SpeedEstimator;
|
||||||
use snapshot::SessionStatsSnapshot;
|
use snapshot::SessionStatsSnapshot;
|
||||||
|
|
@ -40,12 +41,13 @@ impl Default for SessionStats {
|
||||||
impl Session {
|
impl Session {
|
||||||
pub(crate) fn start_speed_estimator_updater(self: &Arc<Self>) {
|
pub(crate) fn start_speed_estimator_updater(self: &Arc<Self>) {
|
||||||
self.spawn(error_span!(parent: self.rs(), "speed_estimator"), {
|
self.spawn(error_span!(parent: self.rs(), "speed_estimator"), {
|
||||||
let s = self.clone();
|
let s = Arc::downgrade(self);
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let mut i = tokio::time::interval(Duration::from_secs(1));
|
let mut i = tokio::time::interval(Duration::from_secs(1));
|
||||||
loop {
|
loop {
|
||||||
i.tick().await;
|
i.tick().await;
|
||||||
|
let s = s.upgrade().context("session is dead")?;
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed);
|
let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed);
|
||||||
let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed);
|
let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed);
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ use std::{
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
|
||||||
use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentInfo};
|
use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentShared};
|
||||||
|
|
||||||
use crate::storage::{StorageFactory, TorrentStorage};
|
use crate::storage::{StorageFactory, TorrentStorage};
|
||||||
|
|
||||||
|
|
@ -18,7 +18,7 @@ pub struct FilesystemStorageFactory {}
|
||||||
impl StorageFactory for FilesystemStorageFactory {
|
impl StorageFactory for FilesystemStorageFactory {
|
||||||
type Storage = FilesystemStorage;
|
type Storage = FilesystemStorage;
|
||||||
|
|
||||||
fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result<FilesystemStorage> {
|
fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result<FilesystemStorage> {
|
||||||
Ok(FilesystemStorage {
|
Ok(FilesystemStorage {
|
||||||
output_folder: meta.options.output_folder.clone(),
|
output_folder: meta.options.output_folder.clone(),
|
||||||
opened_files: Default::default(),
|
opened_files: Default::default(),
|
||||||
|
|
@ -149,7 +149,7 @@ impl TorrentStorage for FilesystemStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> {
|
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> {
|
||||||
let mut files = Vec::<OpenedFile>::new();
|
let mut files = Vec::<OpenedFile>::new();
|
||||||
for file_details in meta.info.iter_file_details(&meta.lengths)? {
|
for file_details in meta.info.iter_file_details(&meta.lengths)? {
|
||||||
let mut full_path = self.output_folder.clone();
|
let mut full_path = self.output_folder.clone();
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ use anyhow::Context;
|
||||||
use memmap2::{MmapMut, MmapOptions};
|
use memmap2::{MmapMut, MmapOptions};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
|
|
||||||
use crate::torrent_state::ManagedTorrentInfo;
|
use crate::torrent_state::ManagedTorrentShared;
|
||||||
|
|
||||||
use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage};
|
use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage};
|
||||||
|
|
||||||
|
|
@ -22,7 +22,7 @@ fn dummy_mmap() -> anyhow::Result<MmapMut> {
|
||||||
impl StorageFactory for MmapFilesystemStorageFactory {
|
impl StorageFactory for MmapFilesystemStorageFactory {
|
||||||
type Storage = MmapFilesystemStorage;
|
type Storage = MmapFilesystemStorage;
|
||||||
|
|
||||||
fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
fn create(&self, meta: &ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
|
||||||
let fs_storage = FilesystemStorageFactory::default().create(meta)?;
|
let fs_storage = FilesystemStorageFactory::default().create(meta)?;
|
||||||
|
|
||||||
Ok(MmapFilesystemStorage {
|
Ok(MmapFilesystemStorage {
|
||||||
|
|
@ -97,7 +97,7 @@ impl TorrentStorage for MmapFilesystemStorage {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> {
|
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> {
|
||||||
self.fs.init(meta)?;
|
self.fs.init(meta)?;
|
||||||
let mut mmaps = Vec::new();
|
let mut mmaps = Vec::new();
|
||||||
for (idx, file) in self.fs.opened_files.iter().enumerate() {
|
for (idx, file) in self.fs.opened_files.iter().enumerate() {
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ use parking_lot::Mutex;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
|
storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
|
||||||
ManagedTorrentInfo,
|
ManagedTorrentShared,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
@ -35,7 +35,7 @@ impl<U: StorageFactory> SlowStorageFactory<U> {
|
||||||
impl<U: StorageFactory + Clone> StorageFactory for SlowStorageFactory<U> {
|
impl<U: StorageFactory + Clone> StorageFactory for SlowStorageFactory<U> {
|
||||||
type Storage = SlowStorage<U::Storage>;
|
type Storage = SlowStorage<U::Storage>;
|
||||||
|
|
||||||
fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
|
||||||
Ok(SlowStorage {
|
Ok(SlowStorage {
|
||||||
underlying: self.underlying_factory.create(info)?,
|
underlying: self.underlying_factory.create(info)?,
|
||||||
pwrite_all_bufread: Mutex::new(Box::new(
|
pwrite_all_bufread: Mutex::new(Box::new(
|
||||||
|
|
@ -116,7 +116,7 @@ impl<U: TorrentStorage> TorrentStorage for SlowStorage<U> {
|
||||||
self.underlying.remove_directory_if_empty(path)
|
self.underlying.remove_directory_if_empty(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> {
|
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> {
|
||||||
self.underlying.init(meta)
|
self.underlying.init(meta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ A storage middleware that logs the time underlying storage operations took.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
|
storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
|
||||||
ManagedTorrentInfo,
|
ManagedTorrentShared,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
@ -25,7 +25,7 @@ impl<U> TimingStorageFactory<U> {
|
||||||
impl<U: StorageFactory + Clone> StorageFactory for TimingStorageFactory<U> {
|
impl<U: StorageFactory + Clone> StorageFactory for TimingStorageFactory<U> {
|
||||||
type Storage = TimingStorage<U::Storage>;
|
type Storage = TimingStorage<U::Storage>;
|
||||||
|
|
||||||
fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
|
||||||
Ok(TimingStorage {
|
Ok(TimingStorage {
|
||||||
name: self.name.clone(),
|
name: self.name.clone(),
|
||||||
underlying: self.underlying_factory.create(info)?,
|
underlying: self.underlying_factory.create(info)?,
|
||||||
|
|
@ -104,7 +104,7 @@ impl<U: TorrentStorage> TorrentStorage for TimingStorage<U> {
|
||||||
self.underlying.remove_directory_if_empty(path)
|
self.underlying.remove_directory_if_empty(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> {
|
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> {
|
||||||
self.underlying.init(meta)
|
self.underlying.init(meta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ use parking_lot::RwLock;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
|
storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
|
||||||
FileInfos, ManagedTorrentInfo,
|
FileInfos, ManagedTorrentShared,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
|
|
@ -35,7 +35,7 @@ impl<U> WriteThroughCacheStorageFactory<U> {
|
||||||
impl<U: StorageFactory + Clone> StorageFactory for WriteThroughCacheStorageFactory<U> {
|
impl<U: StorageFactory + Clone> StorageFactory for WriteThroughCacheStorageFactory<U> {
|
||||||
type Storage = WriteThroughCacheStorage<U::Storage>;
|
type Storage = WriteThroughCacheStorage<U::Storage>;
|
||||||
|
|
||||||
fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
fn create(&self, info: &crate::ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
|
||||||
let pieces = self
|
let pieces = self
|
||||||
.max_cache_bytes
|
.max_cache_bytes
|
||||||
.div_ceil(info.lengths.default_piece_length() as u64)
|
.div_ceil(info.lengths.default_piece_length() as u64)
|
||||||
|
|
@ -121,7 +121,7 @@ impl<U: TorrentStorage> TorrentStorage for WriteThroughCacheStorage<U> {
|
||||||
self.underlying.remove_directory_if_empty(path)
|
self.underlying.remove_directory_if_empty(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> {
|
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> {
|
||||||
self.underlying.init(meta)
|
self.underlying.init(meta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,13 +11,13 @@ use std::{
|
||||||
path::Path,
|
path::Path,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::torrent_state::ManagedTorrentInfo;
|
use crate::torrent_state::ManagedTorrentShared;
|
||||||
|
|
||||||
pub trait StorageFactory: Send + Sync + Any {
|
pub trait StorageFactory: Send + Sync + Any {
|
||||||
type Storage: TorrentStorage;
|
type Storage: TorrentStorage;
|
||||||
|
|
||||||
fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage>;
|
fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result<Self::Storage>;
|
||||||
fn create_and_init(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
fn create_and_init(&self, info: &ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
|
||||||
let mut storage = self.create(info)?;
|
let mut storage = self.create(info)?;
|
||||||
storage.init(info)?;
|
storage.init(info)?;
|
||||||
Ok(storage)
|
Ok(storage)
|
||||||
|
|
@ -44,7 +44,7 @@ impl<SF: StorageFactory> StorageFactoryExt for SF {
|
||||||
impl<SF: StorageFactory> StorageFactory for Wrapper<SF> {
|
impl<SF: StorageFactory> StorageFactory for Wrapper<SF> {
|
||||||
type Storage = Box<dyn TorrentStorage>;
|
type Storage = Box<dyn TorrentStorage>;
|
||||||
|
|
||||||
fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
|
fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
|
||||||
let s = self.sf.create(info)?;
|
let s = self.sf.create(info)?;
|
||||||
Ok(Box::new(s))
|
Ok(Box::new(s))
|
||||||
}
|
}
|
||||||
|
|
@ -65,7 +65,7 @@ impl<SF: StorageFactory> StorageFactoryExt for SF {
|
||||||
impl<U: StorageFactory + ?Sized> StorageFactory for Box<U> {
|
impl<U: StorageFactory + ?Sized> StorageFactory for Box<U> {
|
||||||
type Storage = U::Storage;
|
type Storage = U::Storage;
|
||||||
|
|
||||||
fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result<U::Storage> {
|
fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result<U::Storage> {
|
||||||
(**self).create(info)
|
(**self).create(info)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,7 +76,7 @@ impl<U: StorageFactory + ?Sized> StorageFactory for Box<U> {
|
||||||
|
|
||||||
pub trait TorrentStorage: Send + Sync {
|
pub trait TorrentStorage: Send + Sync {
|
||||||
// Create/open files etc.
|
// Create/open files etc.
|
||||||
fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()>;
|
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()>;
|
||||||
|
|
||||||
/// Given a file_id (which you can get more info from in init_storage() through torrent info)
|
/// Given a file_id (which you can get more info from in init_storage() through torrent info)
|
||||||
/// read buf.len() bytes into buf at offset.
|
/// read buf.len() bytes into buf at offset.
|
||||||
|
|
@ -124,7 +124,7 @@ impl<U: TorrentStorage + ?Sized> TorrentStorage for Box<U> {
|
||||||
(**self).remove_directory_if_empty(path)
|
(**self).remove_directory_if_empty(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> {
|
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> {
|
||||||
(**self).init(meta)
|
(**self).init(meta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ use crate::{
|
||||||
create_torrent,
|
create_torrent,
|
||||||
tests::test_util::{
|
tests::test_util::{
|
||||||
create_default_random_dir_with_torrents, setup_test_logging, spawn_debug_server,
|
create_default_random_dir_with_torrents, setup_test_logging, spawn_debug_server,
|
||||||
TestPeerMetadata,
|
wait_until_i_am_the_last_task, DropChecks, TestPeerMetadata,
|
||||||
},
|
},
|
||||||
AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, SessionPersistenceConfig,
|
AddTorrentOptions, AddTorrentResponse, Session, SessionOptions, SessionPersistenceConfig,
|
||||||
};
|
};
|
||||||
|
|
@ -28,13 +28,22 @@ async fn test_e2e_download() {
|
||||||
.and_then(|v| v.parse().ok())
|
.and_then(|v| v.parse().ok())
|
||||||
.unwrap_or(180);
|
.unwrap_or(180);
|
||||||
|
|
||||||
tokio::time::timeout(Duration::from_secs(timeout), _test_e2e_download())
|
let drop_checks = DropChecks::default();
|
||||||
.await
|
tokio::time::timeout(
|
||||||
.context("test_e2e_download timed out")
|
Duration::from_secs(timeout),
|
||||||
.unwrap()
|
_test_e2e_download(&drop_checks),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.context("test_e2e_download timed out")
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Wait to ensure everything is dropped.
|
||||||
|
wait_until_i_am_the_last_task().await;
|
||||||
|
|
||||||
|
drop_checks.check().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn _test_e2e_download() {
|
async fn _test_e2e_download(drop_checks: &DropChecks) {
|
||||||
setup_test_logging();
|
setup_test_logging();
|
||||||
match crate::try_increase_nofile_limit() {
|
match crate::try_increase_nofile_limit() {
|
||||||
Ok(limit) => info!(limit, "increased ulimit"),
|
Ok(limit) => info!(limit, "increased ulimit"),
|
||||||
|
|
@ -75,6 +84,7 @@ async fn _test_e2e_download() {
|
||||||
for i in 0..num_servers {
|
for i in 0..num_servers {
|
||||||
let torrent_file_bytes = torrent_file_bytes.clone();
|
let torrent_file_bytes = torrent_file_bytes.clone();
|
||||||
let tempdir = tempdir.path().to_owned();
|
let tempdir = tempdir.path().to_owned();
|
||||||
|
let drop_checks = drop_checks.clone();
|
||||||
let fut = spawn(
|
let fut = spawn(
|
||||||
async move {
|
async move {
|
||||||
let peer_id = TestPeerMetadata {
|
let peer_id = TestPeerMetadata {
|
||||||
|
|
@ -104,6 +114,8 @@ async fn _test_e2e_download() {
|
||||||
.await
|
.await
|
||||||
.context("error starting session")?;
|
.context("error starting session")?;
|
||||||
|
|
||||||
|
drop_checks.add(&session, format!("server session {i}"));
|
||||||
|
|
||||||
info!("started session");
|
info!("started session");
|
||||||
|
|
||||||
let handle = session
|
let handle = session
|
||||||
|
|
@ -118,6 +130,9 @@ async fn _test_e2e_download() {
|
||||||
.await
|
.await
|
||||||
.context("error adding torrent")?;
|
.context("error adding torrent")?;
|
||||||
let h = handle.into_handle().context("into_handle()")?;
|
let h = handle.into_handle().context("into_handle()")?;
|
||||||
|
|
||||||
|
drop_checks.add(&h.shared, format!("server {i} torrent shared handle"));
|
||||||
|
|
||||||
let mut interval = interval(Duration::from_millis(100));
|
let mut interval = interval(Duration::from_millis(100));
|
||||||
|
|
||||||
info!("added torrent");
|
info!("added torrent");
|
||||||
|
|
@ -141,12 +156,13 @@ async fn _test_e2e_download() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("torrent is live");
|
info!("torrent is live");
|
||||||
Ok::<_, anyhow::Error>(SocketAddr::new(
|
let addr = SocketAddr::new(
|
||||||
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||||
session
|
session
|
||||||
.tcp_listen_port()
|
.tcp_listen_port()
|
||||||
.context("expected session.tcp_listen_port() to be set")?,
|
.context("expected session.tcp_listen_port() to be set")?,
|
||||||
))
|
);
|
||||||
|
Ok::<_, anyhow::Error>((session, addr))
|
||||||
}
|
}
|
||||||
.instrument(error_span!("server", id = i)),
|
.instrument(error_span!("server", id = i)),
|
||||||
);
|
);
|
||||||
|
|
@ -154,12 +170,15 @@ async fn _test_e2e_download() {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut peers = Vec::new();
|
let mut peers = Vec::new();
|
||||||
|
|
||||||
|
// This is around just not to drop.
|
||||||
|
let mut _servers = Vec::new();
|
||||||
for (id, peer) in futures::future::join_all(futs)
|
for (id, peer) in futures::future::join_all(futs)
|
||||||
.await
|
.await
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
{
|
{
|
||||||
let peer = peer
|
let (server, peer) = peer
|
||||||
.with_context(|| format!("join error, server={id}"))
|
.with_context(|| format!("join error, server={id}"))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.with_context(|| format!("timeout, server={id}"))
|
.with_context(|| format!("timeout, server={id}"))
|
||||||
|
|
@ -167,6 +186,7 @@ async fn _test_e2e_download() {
|
||||||
.with_context(|| format!("server couldn't start, server={id}"))
|
.with_context(|| format!("server couldn't start, server={id}"))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
peers.push(peer);
|
peers.push(peer);
|
||||||
|
_servers.push(server);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("started all servers, starting client");
|
info!("started all servers, starting client");
|
||||||
|
|
@ -201,6 +221,7 @@ async fn _test_e2e_download() {
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
drop_checks.add(&session, "client session");
|
||||||
|
|
||||||
info!("started client session");
|
info!("started client session");
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,20 @@
|
||||||
use std::{io::Write, path::Path};
|
use std::{
|
||||||
|
io::Write,
|
||||||
|
path::Path,
|
||||||
|
sync::{Arc, Weak},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::{bail, Context};
|
||||||
use axum::{response::IntoResponse, routing::get, Router};
|
use axum::{response::IntoResponse, routing::get, Router};
|
||||||
use librqbit_core::Id20;
|
use librqbit_core::Id20;
|
||||||
|
use parking_lot::RwLock;
|
||||||
use rand::{thread_rng, Rng, RngCore, SeedableRng};
|
use rand::{thread_rng, Rng, RngCore, SeedableRng};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
pub fn setup_test_logging() {
|
pub fn setup_test_logging() {
|
||||||
if let Err(_) = std::env::var("RUST_LOG") {
|
if std::env::var("RUST_LOG").is_err() {
|
||||||
std::env::set_var("RUST_LOG", "debug");
|
std::env::set_var("RUST_LOG", "debug");
|
||||||
}
|
}
|
||||||
let _ = tracing_subscriber::fmt::try_init();
|
let _ = tracing_subscriber::fmt::try_init();
|
||||||
|
|
@ -124,3 +130,77 @@ async fn debug_server() -> anyhow::Result<()> {
|
||||||
pub fn spawn_debug_server() {
|
pub fn spawn_debug_server() {
|
||||||
tokio::spawn(debug_server());
|
tokio::spawn(debug_server());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait DropPlaceholder: Send + Sync {}
|
||||||
|
impl<T: Send + Sync> DropPlaceholder for T {}
|
||||||
|
|
||||||
|
struct DropCheck {
|
||||||
|
obj: Weak<dyn DropPlaceholder>,
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub struct DropChecks(Arc<RwLock<Vec<DropCheck>>>);
|
||||||
|
|
||||||
|
impl DropChecks {
|
||||||
|
pub fn add<T: DropPlaceholder + 'static, S: Into<String>>(&self, obj: &Arc<T>, name: S) {
|
||||||
|
let weak = Arc::downgrade(obj);
|
||||||
|
self.0.write().push(DropCheck {
|
||||||
|
obj: weak as Weak<dyn DropPlaceholder>,
|
||||||
|
name: name.into(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn check(&self) -> anyhow::Result<()> {
|
||||||
|
let mut still_running = Vec::new();
|
||||||
|
for dc in self.0.read().iter() {
|
||||||
|
if dc.obj.upgrade().is_some() {
|
||||||
|
still_running.push(dc.name.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !still_running.is_empty() {
|
||||||
|
anyhow::bail!(
|
||||||
|
"still existing objects that were supposed to be dropped: {still_running:#?}"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn wait_until(
|
||||||
|
mut cond: impl FnMut() -> anyhow::Result<()>,
|
||||||
|
timeout: Duration,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_millis(10));
|
||||||
|
let mut last_err: Option<anyhow::Error> = None;
|
||||||
|
let res = tokio::time::timeout(timeout, async {
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
match cond() {
|
||||||
|
Ok(()) => return Ok::<_, anyhow::Error>(()),
|
||||||
|
Err(e) => last_err = Some(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
if res.is_err() {
|
||||||
|
bail!("wait_until timeout: last result = {last_err:?}")
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn wait_until_i_am_the_last_task() {
|
||||||
|
let metrics = tokio::runtime::Handle::current().metrics();
|
||||||
|
wait_until(
|
||||||
|
|| {
|
||||||
|
let num_alive = metrics.num_alive_tasks();
|
||||||
|
if num_alive != 1 {
|
||||||
|
bail!("metrics.num_alive_tasks() = {num_alive}, expected 1")
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
Duration::from_secs(5),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,11 +19,11 @@ use crate::{
|
||||||
FileInfos,
|
FileInfos,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{paused::TorrentStatePaused, ManagedTorrentInfo};
|
use super::{paused::TorrentStatePaused, ManagedTorrentShared};
|
||||||
|
|
||||||
pub struct TorrentStateInitializing {
|
pub struct TorrentStateInitializing {
|
||||||
pub(crate) files: FileStorage,
|
pub(crate) files: FileStorage,
|
||||||
pub(crate) meta: Arc<ManagedTorrentInfo>,
|
pub(crate) meta: Arc<ManagedTorrentShared>,
|
||||||
pub(crate) only_files: Option<Vec<usize>>,
|
pub(crate) only_files: Option<Vec<usize>>,
|
||||||
pub(crate) checked_bytes: AtomicU64,
|
pub(crate) checked_bytes: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
@ -37,7 +37,7 @@ fn compute_selected_pieces(
|
||||||
for (_, fi) in file_infos
|
for (_, fi) in file_infos
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter(|(id, _)| only_files.map(|of| of.contains(id)).unwrap_or(false))
|
.filter(|(id, _)| only_files.map(|of| of.contains(id)).unwrap_or(true))
|
||||||
{
|
{
|
||||||
if let Some(r) = bf.get_mut(fi.piece_range_usize()) {
|
if let Some(r) = bf.get_mut(fi.piece_range_usize()) {
|
||||||
r.fill(true);
|
r.fill(true);
|
||||||
|
|
@ -48,7 +48,7 @@ fn compute_selected_pieces(
|
||||||
|
|
||||||
impl TorrentStateInitializing {
|
impl TorrentStateInitializing {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
meta: Arc<ManagedTorrentInfo>,
|
meta: Arc<ManagedTorrentShared>,
|
||||||
only_files: Option<Vec<usize>>,
|
only_files: Option<Vec<usize>>,
|
||||||
files: FileStorage,
|
files: FileStorage,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ use super::{
|
||||||
paused::TorrentStatePaused,
|
paused::TorrentStatePaused,
|
||||||
streaming::TorrentStreams,
|
streaming::TorrentStreams,
|
||||||
utils::{timeit, TimedExistence},
|
utils::{timeit, TimedExistence},
|
||||||
ManagedTorrentInfo,
|
ManagedTorrentShared,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -180,7 +180,7 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024;
|
||||||
|
|
||||||
pub struct TorrentStateLive {
|
pub struct TorrentStateLive {
|
||||||
peers: PeerStates,
|
peers: PeerStates,
|
||||||
meta: Arc<ManagedTorrentInfo>,
|
torrent: Arc<ManagedTorrentShared>,
|
||||||
locked: RwLock<TorrentStateLocked>,
|
locked: RwLock<TorrentStateLocked>,
|
||||||
|
|
||||||
pub(crate) files: FileStorage,
|
pub(crate) files: FileStorage,
|
||||||
|
|
@ -214,12 +214,16 @@ impl TorrentStateLive {
|
||||||
paused: TorrentStatePaused,
|
paused: TorrentStatePaused,
|
||||||
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
|
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
|
||||||
cancellation_token: CancellationToken,
|
cancellation_token: CancellationToken,
|
||||||
session_stats: Arc<AtomicSessionStats>,
|
|
||||||
) -> anyhow::Result<Arc<Self>> {
|
) -> anyhow::Result<Arc<Self>> {
|
||||||
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
|
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
|
||||||
|
let session = paused
|
||||||
let down_speed_estimator = SpeedEstimator::new(5);
|
.info
|
||||||
let up_speed_estimator = SpeedEstimator::new(5);
|
.session
|
||||||
|
.upgrade()
|
||||||
|
.context("session is dead, cannot start torrent")?;
|
||||||
|
let session_stats = session.stats.atomic.clone();
|
||||||
|
let down_speed_estimator = SpeedEstimator::default();
|
||||||
|
let up_speed_estimator = SpeedEstimator::default();
|
||||||
|
|
||||||
let have_bytes = paused.chunk_tracker.get_hns().have_bytes;
|
let have_bytes = paused.chunk_tracker.get_hns().have_bytes;
|
||||||
let lengths = *paused.chunk_tracker.get_lengths();
|
let lengths = *paused.chunk_tracker.get_lengths();
|
||||||
|
|
@ -241,7 +245,7 @@ impl TorrentStateLive {
|
||||||
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
|
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
|
||||||
|
|
||||||
let state = Arc::new(TorrentStateLive {
|
let state = Arc::new(TorrentStateLive {
|
||||||
meta: paused.info.clone(),
|
torrent: paused.info.clone(),
|
||||||
peers: PeerStates {
|
peers: PeerStates {
|
||||||
session_stats: session_stats.clone(),
|
session_stats: session_stats.clone(),
|
||||||
stats: Default::default(),
|
stats: Default::default(),
|
||||||
|
|
@ -277,7 +281,7 @@ impl TorrentStateLive {
|
||||||
});
|
});
|
||||||
|
|
||||||
state.spawn(
|
state.spawn(
|
||||||
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
|
error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"),
|
||||||
{
|
{
|
||||||
let state = Arc::downgrade(&state);
|
let state = Arc::downgrade(&state);
|
||||||
async move {
|
async move {
|
||||||
|
|
@ -303,7 +307,7 @@ impl TorrentStateLive {
|
||||||
);
|
);
|
||||||
|
|
||||||
state.spawn(
|
state.spawn(
|
||||||
error_span!(parent: state.meta.span.clone(), "peer_adder"),
|
error_span!(parent: state.torrent.span.clone(), "peer_adder"),
|
||||||
state.clone().task_peer_adder(peer_queue_rx),
|
state.clone().task_peer_adder(peer_queue_rx),
|
||||||
);
|
);
|
||||||
Ok(state)
|
Ok(state)
|
||||||
|
|
@ -330,7 +334,7 @@ impl TorrentStateLive {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> {
|
fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> {
|
||||||
self.meta.options.disk_write_queue.as_ref()
|
self.torrent.options.disk_write_queue.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn add_incoming_peer(
|
pub(crate) fn add_incoming_peer(
|
||||||
|
|
@ -378,7 +382,7 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
self.spawn(
|
self.spawn(
|
||||||
error_span!(
|
error_span!(
|
||||||
parent: self.meta.span.clone(),
|
parent: self.torrent.span.clone(),
|
||||||
"manage_incoming_peer",
|
"manage_incoming_peer",
|
||||||
addr = %checked_peer.addr
|
addr = %checked_peer.addr
|
||||||
),
|
),
|
||||||
|
|
@ -410,18 +414,18 @@ impl TorrentStateLive {
|
||||||
first_message_received: AtomicBool::new(false),
|
first_message_received: AtomicBool::new(false),
|
||||||
};
|
};
|
||||||
let options = PeerConnectionOptions {
|
let options = PeerConnectionOptions {
|
||||||
connect_timeout: self.meta.options.peer_connect_timeout,
|
connect_timeout: self.torrent.options.peer_connect_timeout,
|
||||||
read_write_timeout: self.meta.options.peer_read_write_timeout,
|
read_write_timeout: self.torrent.options.peer_read_write_timeout,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let peer_connection = PeerConnection::new(
|
let peer_connection = PeerConnection::new(
|
||||||
checked_peer.addr,
|
checked_peer.addr,
|
||||||
self.meta.info_hash,
|
self.torrent.info_hash,
|
||||||
self.meta.peer_id,
|
self.torrent.peer_id,
|
||||||
&handler,
|
&handler,
|
||||||
Some(options),
|
Some(options),
|
||||||
self.meta.spawner,
|
self.torrent.spawner,
|
||||||
self.meta.connector.clone(),
|
self.torrent.connector.clone(),
|
||||||
);
|
);
|
||||||
let requester = handler.task_peer_chunk_requester();
|
let requester = handler.task_peer_chunk_requester();
|
||||||
|
|
||||||
|
|
@ -474,18 +478,18 @@ impl TorrentStateLive {
|
||||||
first_message_received: AtomicBool::new(false),
|
first_message_received: AtomicBool::new(false),
|
||||||
};
|
};
|
||||||
let options = PeerConnectionOptions {
|
let options = PeerConnectionOptions {
|
||||||
connect_timeout: state.meta.options.peer_connect_timeout,
|
connect_timeout: state.torrent.options.peer_connect_timeout,
|
||||||
read_write_timeout: state.meta.options.peer_read_write_timeout,
|
read_write_timeout: state.torrent.options.peer_read_write_timeout,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let peer_connection = PeerConnection::new(
|
let peer_connection = PeerConnection::new(
|
||||||
addr,
|
addr,
|
||||||
state.meta.info_hash,
|
state.torrent.info_hash,
|
||||||
state.meta.peer_id,
|
state.torrent.peer_id,
|
||||||
&handler,
|
&handler,
|
||||||
Some(options),
|
Some(options),
|
||||||
state.meta.spawner,
|
state.torrent.spawner,
|
||||||
state.meta.connector.clone(),
|
state.torrent.connector.clone(),
|
||||||
);
|
);
|
||||||
let requester = aframe!(handler
|
let requester = aframe!(handler
|
||||||
.task_peer_chunk_requester()
|
.task_peer_chunk_requester()
|
||||||
|
|
@ -532,30 +536,30 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
let permit = state.peer_semaphore.clone().acquire_owned().await?;
|
let permit = state.peer_semaphore.clone().acquire_owned().await?;
|
||||||
state.spawn(
|
state.spawn(
|
||||||
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
|
error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()),
|
||||||
aframe!(state.clone().task_manage_outgoing_peer(addr, permit)),
|
aframe!(state.clone().task_manage_outgoing_peer(addr, permit)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn meta(&self) -> &ManagedTorrentInfo {
|
pub fn torrent(&self) -> &ManagedTorrentShared {
|
||||||
&self.meta
|
&self.torrent
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn info(&self) -> &TorrentMetaV1Info<ByteBufOwned> {
|
pub fn info(&self) -> &TorrentMetaV1Info<ByteBufOwned> {
|
||||||
&self.meta.info
|
&self.torrent.info
|
||||||
}
|
}
|
||||||
pub fn info_hash(&self) -> Id20 {
|
pub fn info_hash(&self) -> Id20 {
|
||||||
self.meta.info_hash
|
self.torrent.info_hash
|
||||||
}
|
}
|
||||||
pub fn peer_id(&self) -> Id20 {
|
pub fn peer_id(&self) -> Id20 {
|
||||||
self.meta.peer_id
|
self.torrent.peer_id
|
||||||
}
|
}
|
||||||
pub(crate) fn file_ops(&self) -> FileOps<'_> {
|
pub(crate) fn file_ops(&self) -> FileOps<'_> {
|
||||||
FileOps::new(
|
FileOps::new(
|
||||||
&self.meta.info,
|
&self.torrent.info,
|
||||||
&*self.files,
|
&*self.files,
|
||||||
&self.meta().file_infos,
|
&self.torrent().file_infos,
|
||||||
&self.lengths,
|
&self.lengths,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
@ -664,7 +668,7 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
// g.chunks;
|
// g.chunks;
|
||||||
Ok(TorrentStatePaused {
|
Ok(TorrentStatePaused {
|
||||||
info: self.meta.clone(),
|
info: self.torrent.clone(),
|
||||||
files: self.files.take()?,
|
files: self.files.take()?,
|
||||||
chunk_tracker,
|
chunk_tracker,
|
||||||
streams: self.streams.clone(),
|
streams: self.streams.clone(),
|
||||||
|
|
@ -687,7 +691,8 @@ impl TorrentStateLive {
|
||||||
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
|
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
|
||||||
let mut g = self.lock_write("update_only_files");
|
let mut g = self.lock_write("update_only_files");
|
||||||
let ct = g.get_chunks_mut()?;
|
let ct = g.get_chunks_mut()?;
|
||||||
let hns = ct.update_only_files(self.meta().file_infos.iter().map(|f| f.len), only_files)?;
|
let hns =
|
||||||
|
ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?;
|
||||||
if !hns.finished() {
|
if !hns.finished() {
|
||||||
self.reconnect_all_not_needed_peers();
|
self.reconnect_all_not_needed_peers();
|
||||||
}
|
}
|
||||||
|
|
@ -706,7 +711,7 @@ impl TorrentStateLive {
|
||||||
};
|
};
|
||||||
self.streams
|
self.streams
|
||||||
.streamed_file_ids()
|
.streamed_file_ids()
|
||||||
.any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id]))
|
.any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id]))
|
||||||
}
|
}
|
||||||
|
|
||||||
// We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite
|
// We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite
|
||||||
|
|
@ -725,7 +730,7 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
// if we have all the pieces of the file, reopen it read only
|
// if we have all the pieces of the file, reopen it read only
|
||||||
for (idx, file_info) in self
|
for (idx, file_info) in self
|
||||||
.meta()
|
.torrent()
|
||||||
.file_infos
|
.file_infos
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
|
|
@ -736,9 +741,9 @@ impl TorrentStateLive {
|
||||||
}
|
}
|
||||||
|
|
||||||
self.streams
|
self.streams
|
||||||
.wake_streams_on_piece_completed(id, &self.meta.lengths);
|
.wake_streams_on_piece_completed(id, &self.torrent.lengths);
|
||||||
|
|
||||||
g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64;
|
g.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64;
|
||||||
if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES {
|
if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES {
|
||||||
g.try_flush_bitv()
|
g.try_flush_bitv()
|
||||||
}
|
}
|
||||||
|
|
@ -930,7 +935,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
&self,
|
&self,
|
||||||
handshake: &mut ExtendedHandshake<ByteBuf>,
|
handshake: &mut ExtendedHandshake<ByteBuf>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let info_bytes = &self.state.meta().info_bytes;
|
let info_bytes = &self.state.torrent().info_bytes;
|
||||||
if !info_bytes.is_empty() {
|
if !info_bytes.is_empty() {
|
||||||
if let Ok(len) = info_bytes.len().try_into() {
|
if let Ok(len) = info_bytes.len().try_into() {
|
||||||
handshake.metadata_size = Some(len);
|
handshake.metadata_size = Some(len);
|
||||||
|
|
@ -1010,7 +1015,7 @@ impl PeerHandler {
|
||||||
if let Some(dur) = backoff {
|
if let Some(dur) = backoff {
|
||||||
self.state.clone().spawn(
|
self.state.clone().spawn(
|
||||||
error_span!(
|
error_span!(
|
||||||
parent: self.state.meta.span.clone(),
|
parent: self.state.torrent.span.clone(),
|
||||||
"wait_for_peer",
|
"wait_for_peer",
|
||||||
peer = handle.to_string(),
|
peer = handle.to_string(),
|
||||||
duration = format!("{dur:?}")
|
duration = format!("{dur:?}")
|
||||||
|
|
@ -1069,7 +1074,7 @@ impl PeerHandler {
|
||||||
&& !g.inflight_pieces.contains_key(pid)
|
&& !g.inflight_pieces.contains_key(pid)
|
||||||
});
|
});
|
||||||
let natural_order_pieces = chunk_tracker
|
let natural_order_pieces = chunk_tracker
|
||||||
.iter_queued_pieces(&g.file_priorities, &self.state.meta().file_infos);
|
.iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos);
|
||||||
for n in priority_streamed_pieces.chain(natural_order_pieces) {
|
for n in priority_streamed_pieces.chain(natural_order_pieces) {
|
||||||
if bf.get(n.get() as usize).map(|v| *v) == Some(true) {
|
if bf.get(n.get() as usize).map(|v| *v) == Some(true) {
|
||||||
n_opt = Some(n);
|
n_opt = Some(n);
|
||||||
|
|
@ -1612,7 +1617,7 @@ impl PeerHandler {
|
||||||
dtx.send(Box::new(work)).await?;
|
dtx.send(Box::new(work)).await?;
|
||||||
} else {
|
} else {
|
||||||
self.state
|
self.state
|
||||||
.meta
|
.torrent
|
||||||
.spawner
|
.spawner
|
||||||
.spawn_block_in_place(|| {
|
.spawn_block_in_place(|| {
|
||||||
write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info)
|
write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info)
|
||||||
|
|
@ -1624,7 +1629,7 @@ impl PeerHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> {
|
fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> {
|
||||||
let data = &self.state.meta().info_bytes;
|
let data = &self.state.torrent().info_bytes;
|
||||||
let metadata_size = data.len();
|
let metadata_size = data.len();
|
||||||
if metadata_size == 0 {
|
if metadata_size == 0 {
|
||||||
anyhow::bail!("peer requested for info metadata but we don't have it")
|
anyhow::bail!("peer requested for info metadata but we don't have it")
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ use std::collections::HashSet;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::Weak;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
|
|
@ -19,7 +20,6 @@ use futures::future::BoxFuture;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use librqbit_core::hash_id::Id20;
|
use librqbit_core::hash_id::Id20;
|
||||||
use librqbit_core::lengths::Lengths;
|
use librqbit_core::lengths::Lengths;
|
||||||
use librqbit_core::peer_id::generate_peer_id;
|
|
||||||
|
|
||||||
use librqbit_core::spawn_utils::spawn_with_cancel;
|
use librqbit_core::spawn_utils::spawn_with_cancel;
|
||||||
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
||||||
|
|
@ -34,11 +34,8 @@ use tracing::debug;
|
||||||
use tracing::error_span;
|
use tracing::error_span;
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
|
||||||
use crate::bitv_factory::BitVFactory;
|
|
||||||
use crate::chunk_tracker::ChunkTracker;
|
use crate::chunk_tracker::ChunkTracker;
|
||||||
use crate::file_info::FileInfo;
|
|
||||||
use crate::session::TorrentId;
|
use crate::session::TorrentId;
|
||||||
use crate::session_stats::atomic::AtomicSessionStats;
|
|
||||||
use crate::spawn_utils::BlockingSpawner;
|
use crate::spawn_utils::BlockingSpawner;
|
||||||
use crate::storage::BoxStorageFactory;
|
use crate::storage::BoxStorageFactory;
|
||||||
use crate::stream_connect::StreamConnector;
|
use crate::stream_connect::StreamConnector;
|
||||||
|
|
@ -46,6 +43,7 @@ use crate::torrent_state::stats::LiveStats;
|
||||||
use crate::type_aliases::DiskWorkQueueSender;
|
use crate::type_aliases::DiskWorkQueueSender;
|
||||||
use crate::type_aliases::FileInfos;
|
use crate::type_aliases::FileInfos;
|
||||||
use crate::type_aliases::PeerStream;
|
use crate::type_aliases::PeerStream;
|
||||||
|
use crate::Session;
|
||||||
|
|
||||||
use initializing::TorrentStateInitializing;
|
use initializing::TorrentStateInitializing;
|
||||||
|
|
||||||
|
|
@ -87,7 +85,7 @@ impl ManagedTorrentState {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct ManagedTorrentLocked {
|
pub(crate) struct ManagedTorrentLocked {
|
||||||
pub state: ManagedTorrentState,
|
pub(crate) state: ManagedTorrentState,
|
||||||
pub(crate) only_files: Option<Vec<usize>>,
|
pub(crate) only_files: Option<Vec<usize>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -101,7 +99,13 @@ pub(crate) struct ManagedTorrentOptions {
|
||||||
pub disk_write_queue: Option<DiskWorkQueueSender>,
|
pub disk_write_queue: Option<DiskWorkQueueSender>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ManagedTorrentInfo {
|
/// Common information about torrent shared among all possible states.
|
||||||
|
///
|
||||||
|
// The reason it's not inlined into ManagedTorrent is to break the Arc cycle:
|
||||||
|
// ManagedTorrent contains the current torrent state, which in turn needs access to a bunch
|
||||||
|
// of stuff, but it shouldn't access the state.
|
||||||
|
pub struct ManagedTorrentShared {
|
||||||
|
pub id: TorrentId,
|
||||||
pub info: TorrentMetaV1Info<ByteBufOwned>,
|
pub info: TorrentMetaV1Info<ByteBufOwned>,
|
||||||
pub torrent_bytes: Bytes,
|
pub torrent_bytes: Bytes,
|
||||||
pub info_bytes: Bytes,
|
pub info_bytes: Bytes,
|
||||||
|
|
@ -114,33 +118,31 @@ pub struct ManagedTorrentInfo {
|
||||||
pub span: tracing::Span,
|
pub span: tracing::Span,
|
||||||
pub(crate) options: ManagedTorrentOptions,
|
pub(crate) options: ManagedTorrentOptions,
|
||||||
pub(crate) connector: Arc<StreamConnector>,
|
pub(crate) connector: Arc<StreamConnector>,
|
||||||
|
pub(crate) storage_factory: BoxStorageFactory,
|
||||||
|
pub(crate) session: Weak<Session>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ManagedTorrent {
|
pub struct ManagedTorrent {
|
||||||
pub id: TorrentId,
|
pub shared: Arc<ManagedTorrentShared>,
|
||||||
// TODO: merge ManagedTorrent and ManagedTorrentInfo
|
pub(crate) state_change_notify: Notify,
|
||||||
pub info: Arc<ManagedTorrentInfo>,
|
pub(crate) locked: RwLock<ManagedTorrentLocked>,
|
||||||
pub(crate) storage_factory: BoxStorageFactory,
|
|
||||||
|
|
||||||
state_change_notify: Notify,
|
|
||||||
locked: RwLock<ManagedTorrentLocked>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ManagedTorrent {
|
impl ManagedTorrent {
|
||||||
pub fn id(&self) -> TorrentId {
|
pub fn id(&self) -> TorrentId {
|
||||||
self.id
|
self.shared.id
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn info(&self) -> &ManagedTorrentInfo {
|
pub fn shared(&self) -> &ManagedTorrentShared {
|
||||||
&self.info
|
&self.shared
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_total_bytes(&self) -> u64 {
|
pub fn get_total_bytes(&self) -> u64 {
|
||||||
self.info.lengths.total_length()
|
self.shared.lengths.total_length()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn info_hash(&self) -> Id20 {
|
pub fn info_hash(&self) -> Id20 {
|
||||||
self.info.info_hash
|
self.shared.info_hash
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn only_files(&self) -> Option<Vec<usize>> {
|
pub fn only_files(&self) -> Option<Vec<usize>> {
|
||||||
|
|
@ -209,18 +211,20 @@ impl ManagedTorrent {
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
peer_rx: Option<PeerStream>,
|
peer_rx: Option<PeerStream>,
|
||||||
start_paused: bool,
|
start_paused: bool,
|
||||||
live_cancellation_token: CancellationToken,
|
|
||||||
init_semaphore: Arc<tokio::sync::Semaphore>,
|
|
||||||
bitv_factory: Arc<dyn BitVFactory>,
|
|
||||||
session_stats: Arc<AtomicSessionStats>,
|
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
let session = self
|
||||||
|
.shared
|
||||||
|
.session
|
||||||
|
.upgrade()
|
||||||
|
.context("session is dead, cannot start torrent")?;
|
||||||
let mut g = self.locked.write();
|
let mut g = self.locked.write();
|
||||||
|
let cancellation_token = session.cancellation_token().child_token();
|
||||||
|
|
||||||
let spawn_fatal_errors_receiver =
|
let spawn_fatal_errors_receiver =
|
||||||
|state: &Arc<Self>,
|
|state: &Arc<Self>,
|
||||||
rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
|
rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
|
||||||
token: CancellationToken| {
|
token: CancellationToken| {
|
||||||
let span = state.info.span.clone();
|
let span = state.shared.span.clone();
|
||||||
let state = Arc::downgrade(state);
|
let state = Arc::downgrade(state);
|
||||||
spawn_with_cancel(
|
spawn_with_cancel(
|
||||||
error_span!(parent: span, "fatal_errors_receiver"),
|
error_span!(parent: span, "fatal_errors_receiver"),
|
||||||
|
|
@ -242,7 +246,7 @@ impl ManagedTorrent {
|
||||||
|
|
||||||
fn spawn_peer_adder(live: &Arc<TorrentStateLive>, peer_rx: Option<PeerStream>) {
|
fn spawn_peer_adder(live: &Arc<TorrentStateLive>, peer_rx: Option<PeerStream>) {
|
||||||
live.spawn(
|
live.spawn(
|
||||||
error_span!(parent: live.meta().span.clone(), "external_peer_adder"),
|
error_span!(parent: live.torrent().span.clone(), "external_peer_adder"),
|
||||||
{
|
{
|
||||||
let live = live.clone();
|
let live = live.clone();
|
||||||
async move {
|
async move {
|
||||||
|
|
@ -293,19 +297,21 @@ impl ManagedTorrent {
|
||||||
let init = init.clone();
|
let init = init.clone();
|
||||||
drop(g);
|
drop(g);
|
||||||
let t = self.clone();
|
let t = self.clone();
|
||||||
let span = self.info().span.clone();
|
let span = self.shared().span.clone();
|
||||||
let token = live_cancellation_token.clone();
|
let token = cancellation_token.clone();
|
||||||
|
|
||||||
spawn_with_cancel(
|
spawn_with_cancel(
|
||||||
error_span!(parent: span.clone(), "initialize_and_start"),
|
error_span!(parent: span.clone(), "initialize_and_start"),
|
||||||
token.clone(),
|
token.clone(),
|
||||||
async move {
|
async move {
|
||||||
let _permit = init_semaphore
|
let concurrent_init_semaphore =
|
||||||
|
session.concurrent_initialize_semaphore.clone();
|
||||||
|
let _permit = concurrent_init_semaphore
|
||||||
.acquire()
|
.acquire()
|
||||||
.await
|
.await
|
||||||
.context("bug: concurrent init semaphore was closed")?;
|
.context("bug: concurrent init semaphore was closed")?;
|
||||||
|
|
||||||
match init.check(bitv_factory).await {
|
match init.check(session.bitv_factory.clone()).await {
|
||||||
Ok(paused) => {
|
Ok(paused) => {
|
||||||
let mut g = t.locked.write();
|
let mut g = t.locked.write();
|
||||||
if let ManagedTorrentState::Initializing(_) = &g.state {
|
if let ManagedTorrentState::Initializing(_) = &g.state {
|
||||||
|
|
@ -321,12 +327,7 @@ impl ManagedTorrent {
|
||||||
}
|
}
|
||||||
|
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
let live = TorrentStateLive::new(
|
let live = TorrentStateLive::new(paused, tx, cancellation_token)?;
|
||||||
paused,
|
|
||||||
tx,
|
|
||||||
live_cancellation_token,
|
|
||||||
session_stats,
|
|
||||||
)?;
|
|
||||||
g.state = ManagedTorrentState::Live(live.clone());
|
g.state = ManagedTorrentState::Live(live.clone());
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
||||||
|
|
@ -351,24 +352,19 @@ impl ManagedTorrent {
|
||||||
ManagedTorrentState::Paused(_) => {
|
ManagedTorrentState::Paused(_) => {
|
||||||
let paused = g.state.take().assert_paused();
|
let paused = g.state.take().assert_paused();
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
let live = TorrentStateLive::new(
|
let live = TorrentStateLive::new(paused, tx, cancellation_token.clone())?;
|
||||||
paused,
|
|
||||||
tx,
|
|
||||||
live_cancellation_token.clone(),
|
|
||||||
session_stats,
|
|
||||||
)?;
|
|
||||||
g.state = ManagedTorrentState::Live(live.clone());
|
g.state = ManagedTorrentState::Live(live.clone());
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
||||||
spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
|
spawn_fatal_errors_receiver(self, rx, cancellation_token);
|
||||||
spawn_peer_adder(&live, peer_rx);
|
spawn_peer_adder(&live, peer_rx);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
ManagedTorrentState::Error(_) => {
|
ManagedTorrentState::Error(_) => {
|
||||||
let initializing = Arc::new(TorrentStateInitializing::new(
|
let initializing = Arc::new(TorrentStateInitializing::new(
|
||||||
self.info.clone(),
|
self.shared.clone(),
|
||||||
g.only_files.clone(),
|
g.only_files.clone(),
|
||||||
self.storage_factory.create_and_init(self.info())?,
|
self.shared.storage_factory.create_and_init(self.shared())?,
|
||||||
));
|
));
|
||||||
g.state = ManagedTorrentState::Initializing(initializing.clone());
|
g.state = ManagedTorrentState::Initializing(initializing.clone());
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
@ -376,14 +372,7 @@ impl ManagedTorrent {
|
||||||
self.state_change_notify.notify_waiters();
|
self.state_change_notify.notify_waiters();
|
||||||
|
|
||||||
// Recurse.
|
// Recurse.
|
||||||
self.start(
|
self.start(peer_rx, start_paused)
|
||||||
peer_rx,
|
|
||||||
start_paused,
|
|
||||||
live_cancellation_token,
|
|
||||||
init_semaphore,
|
|
||||||
bitv_factory,
|
|
||||||
session_stats,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
|
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
|
||||||
}
|
}
|
||||||
|
|
@ -420,7 +409,7 @@ impl ManagedTorrent {
|
||||||
pub fn stats(&self) -> TorrentStats {
|
pub fn stats(&self) -> TorrentStats {
|
||||||
use stats::TorrentStatsState as S;
|
use stats::TorrentStatsState as S;
|
||||||
let mut resp = TorrentStats {
|
let mut resp = TorrentStats {
|
||||||
total_bytes: self.info().lengths.total_length(),
|
total_bytes: self.shared().lengths.total_length(),
|
||||||
file_progress: Vec::new(),
|
file_progress: Vec::new(),
|
||||||
state: S::Error,
|
state: S::Error,
|
||||||
error: None,
|
error: None,
|
||||||
|
|
@ -521,7 +510,7 @@ impl ManagedTorrent {
|
||||||
// Returns true if needed to unpause torrent.
|
// Returns true if needed to unpause torrent.
|
||||||
// This is just implementation detail - it's easier to pause/unpause than to tinker with internals.
|
// This is just implementation detail - it's easier to pause/unpause than to tinker with internals.
|
||||||
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
|
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
|
||||||
let file_count = self.info().info.iter_file_lengths()?.count();
|
let file_count = self.shared().info.iter_file_lengths()?.count();
|
||||||
for f in only_files.iter().copied() {
|
for f in only_files.iter().copied() {
|
||||||
if f >= file_count {
|
if f >= file_count {
|
||||||
anyhow::bail!("only_files contains invalid value {f}")
|
anyhow::bail!("only_files contains invalid value {f}")
|
||||||
|
|
@ -550,160 +539,4 @@ impl ManagedTorrent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct ManagedTorrentBuilder {
|
|
||||||
id: TorrentId,
|
|
||||||
info: TorrentMetaV1Info<ByteBufOwned>,
|
|
||||||
output_folder: PathBuf,
|
|
||||||
info_hash: Id20,
|
|
||||||
torrent_bytes: Bytes,
|
|
||||||
info_bytes: Bytes,
|
|
||||||
force_tracker_interval: Option<Duration>,
|
|
||||||
peer_connect_timeout: Option<Duration>,
|
|
||||||
peer_read_write_timeout: Option<Duration>,
|
|
||||||
only_files: Option<Vec<usize>>,
|
|
||||||
trackers: Vec<String>,
|
|
||||||
peer_id: Option<Id20>,
|
|
||||||
spawner: Option<BlockingSpawner>,
|
|
||||||
allow_overwrite: bool,
|
|
||||||
storage_factory: BoxStorageFactory,
|
|
||||||
disk_writer: Option<DiskWorkQueueSender>,
|
|
||||||
connector: Arc<StreamConnector>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ManagedTorrentBuilder {
|
|
||||||
pub fn new(
|
|
||||||
id: usize,
|
|
||||||
info: TorrentMetaV1Info<ByteBufOwned>,
|
|
||||||
info_hash: Id20,
|
|
||||||
torrent_bytes: Bytes,
|
|
||||||
info_bytes: Bytes,
|
|
||||||
output_folder: PathBuf,
|
|
||||||
storage_factory: BoxStorageFactory,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
id,
|
|
||||||
info,
|
|
||||||
info_hash,
|
|
||||||
torrent_bytes,
|
|
||||||
info_bytes,
|
|
||||||
spawner: None,
|
|
||||||
force_tracker_interval: None,
|
|
||||||
peer_connect_timeout: None,
|
|
||||||
peer_read_write_timeout: None,
|
|
||||||
only_files: None,
|
|
||||||
trackers: Default::default(),
|
|
||||||
peer_id: None,
|
|
||||||
allow_overwrite: false,
|
|
||||||
output_folder,
|
|
||||||
storage_factory,
|
|
||||||
disk_writer: None,
|
|
||||||
connector: Arc::new(Default::default()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self {
|
|
||||||
self.only_files = Some(only_files);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn trackers(&mut self, trackers: Vec<String>) -> &mut Self {
|
|
||||||
self.trackers = trackers;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self {
|
|
||||||
self.force_tracker_interval = Some(force_tracker_interval);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self {
|
|
||||||
self.spawner = Some(spawner);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self {
|
|
||||||
self.peer_id = Some(peer_id);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn allow_overwrite(&mut self, value: bool) -> &mut Self {
|
|
||||||
self.allow_overwrite = value;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
|
|
||||||
self.peer_connect_timeout = Some(timeout);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self {
|
|
||||||
self.peer_read_write_timeout = Some(timeout);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn disk_writer(&mut self, value: DiskWorkQueueSender) -> &mut Self {
|
|
||||||
self.disk_writer = Some(value);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn connector(&mut self, value: Arc<StreamConnector>) -> &mut Self {
|
|
||||||
self.connector = value;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
|
|
||||||
let lengths = Lengths::from_torrent(&self.info)?;
|
|
||||||
let file_infos = self
|
|
||||||
.info
|
|
||||||
.iter_file_details(&lengths)?
|
|
||||||
.map(|fd| {
|
|
||||||
Ok::<_, anyhow::Error>(FileInfo {
|
|
||||||
relative_filename: fd.filename.to_pathbuf()?,
|
|
||||||
offset_in_torrent: fd.offset,
|
|
||||||
piece_range: fd.pieces,
|
|
||||||
len: fd.len,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.collect::<anyhow::Result<Vec<FileInfo>>>()?;
|
|
||||||
|
|
||||||
let info = Arc::new(ManagedTorrentInfo {
|
|
||||||
span,
|
|
||||||
file_infos,
|
|
||||||
info: self.info,
|
|
||||||
torrent_bytes: self.torrent_bytes,
|
|
||||||
info_bytes: self.info_bytes,
|
|
||||||
info_hash: self.info_hash,
|
|
||||||
trackers: self.trackers.into_iter().collect(),
|
|
||||||
spawner: self.spawner.unwrap_or_default(),
|
|
||||||
peer_id: self.peer_id.unwrap_or_else(generate_peer_id),
|
|
||||||
lengths,
|
|
||||||
options: ManagedTorrentOptions {
|
|
||||||
force_tracker_interval: self.force_tracker_interval,
|
|
||||||
peer_connect_timeout: self.peer_connect_timeout,
|
|
||||||
peer_read_write_timeout: self.peer_read_write_timeout,
|
|
||||||
allow_overwrite: self.allow_overwrite,
|
|
||||||
output_folder: self.output_folder,
|
|
||||||
disk_write_queue: self.disk_writer,
|
|
||||||
},
|
|
||||||
connector: self.connector,
|
|
||||||
});
|
|
||||||
|
|
||||||
let initializing = Arc::new(TorrentStateInitializing::new(
|
|
||||||
info.clone(),
|
|
||||||
self.only_files.clone(),
|
|
||||||
self.storage_factory.create_and_init(&info)?,
|
|
||||||
));
|
|
||||||
Ok(Arc::new(ManagedTorrent {
|
|
||||||
id: self.id,
|
|
||||||
locked: RwLock::new(ManagedTorrentLocked {
|
|
||||||
state: ManagedTorrentState::Initializing(initializing),
|
|
||||||
only_files: self.only_files,
|
|
||||||
}),
|
|
||||||
state_change_notify: Notify::new(),
|
|
||||||
storage_factory: self.storage_factory,
|
|
||||||
info,
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type ManagedTorrentHandle = Arc<ManagedTorrent>;
|
pub type ManagedTorrentHandle = Arc<ManagedTorrent>;
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,10 @@ use crate::{
|
||||||
type_aliases::FileStorage,
|
type_aliases::FileStorage,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{streaming::TorrentStreams, ManagedTorrentInfo};
|
use super::{streaming::TorrentStreams, ManagedTorrentShared};
|
||||||
|
|
||||||
pub struct TorrentStatePaused {
|
pub struct TorrentStatePaused {
|
||||||
pub(crate) info: Arc<ManagedTorrentInfo>,
|
pub(crate) info: Arc<ManagedTorrentShared>,
|
||||||
pub(crate) files: FileStorage,
|
pub(crate) files: FileStorage,
|
||||||
pub(crate) chunk_tracker: ChunkTracker,
|
pub(crate) chunk_tracker: ChunkTracker,
|
||||||
pub(crate) streams: Arc<TorrentStreams>,
|
pub(crate) streams: Arc<TorrentStreams>,
|
||||||
|
|
|
||||||
|
|
@ -179,7 +179,7 @@ impl AsyncRead for FileStream {
|
||||||
|
|
||||||
let current = poll_try_io!(self
|
let current = poll_try_io!(self
|
||||||
.torrent
|
.torrent
|
||||||
.info()
|
.shared()
|
||||||
.lengths
|
.lengths
|
||||||
.compute_current_piece(self.position, self.file_torrent_abs_offset)
|
.compute_current_piece(self.position, self.file_torrent_abs_offset)
|
||||||
.context("invalid position"));
|
.context("invalid position"));
|
||||||
|
|
@ -280,7 +280,7 @@ impl ManagedTorrent {
|
||||||
s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()),
|
s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()),
|
||||||
};
|
};
|
||||||
let fi = self
|
let fi = self
|
||||||
.info()
|
.shared()
|
||||||
.file_infos
|
.file_infos
|
||||||
.get(file_id)
|
.get(file_id)
|
||||||
.context("invalid file")?;
|
.context("invalid file")?;
|
||||||
|
|
@ -311,7 +311,7 @@ impl ManagedTorrent {
|
||||||
|
|
||||||
fn is_file_finished(&self, file_id: usize) -> bool {
|
fn is_file_finished(&self, file_id: usize) -> bool {
|
||||||
// TODO: would be nice to remove locking
|
// TODO: would be nice to remove locking
|
||||||
self.with_chunk_tracker(|ct| ct.is_file_finished(&self.info.file_infos[file_id]))
|
self.with_chunk_tracker(|ct| ct.is_file_finished(&self.shared.file_infos[file_id]))
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,12 @@ pub struct SpeedEstimator {
|
||||||
time_remaining_millis: AtomicU64,
|
time_remaining_millis: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for SpeedEstimator {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new(5)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl SpeedEstimator {
|
impl SpeedEstimator {
|
||||||
pub fn new(window_seconds: usize) -> Self {
|
pub fn new(window_seconds: usize) -> Self {
|
||||||
assert!(window_seconds > 1);
|
assert!(window_seconds > 1);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue