Remove ManagedTorrentBuilder

This commit is contained in:
Igor Katson 2024-08-21 15:51:58 +01:00
parent 338b221e8d
commit 80f4d3b1b2
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 77 additions and 206 deletions

View file

@ -12,6 +12,7 @@ use crate::{
api::TorrentIdOrHash,
bitv_factory::{BitVFactory, NonPersistentBitVFactory},
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
file_info::FileInfo,
merge_streams::merge_streams,
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
@ -23,10 +24,11 @@ use crate::{
},
stream_connect::{SocksProxyConfig, StreamConnector},
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
initializing::TorrentStateInitializing, ManagedTorrentHandle, ManagedTorrentLocked,
ManagedTorrentOptions, ManagedTorrentState, TorrentStateLive,
},
type_aliases::{DiskWorkQueueSender, PeerStream},
ManagedTorrentInfo,
ManagedTorrent, ManagedTorrentInfo,
};
use anyhow::{bail, Context};
use bencode::bencode_serialize_to_writer;
@ -43,6 +45,7 @@ use itertools::Itertools;
use librqbit_core::{
constants::CHUNK_SIZE,
directories::get_configuration_directory,
lengths::Lengths,
magnet::Magnet,
peer_id::generate_peer_id,
spawn_utils::spawn_with_cancel,
@ -51,7 +54,10 @@ use librqbit_core::{
use parking_lot::RwLock;
use peer_binary_protocol::Handshake;
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::{
net::{TcpListener, TcpStream},
sync::Notify,
};
use tokio_stream::StreamExt;
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span};
@ -1023,7 +1029,7 @@ impl Session {
}
async fn main_torrent_info(
&self,
self: &Arc<Self>,
add_res: InternalAddResult,
mut opts: AddTorrentOptions,
) -> anyhow::Result<AddTorrentResponse> {
@ -1084,43 +1090,6 @@ impl Session {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
};
let mut builder = ManagedTorrentBuilder::new(
id,
info,
info_hash,
torrent_bytes,
info_bytes,
output_folder,
storage_factory,
);
builder
.allow_overwrite(opts.overwrite)
.spawner(self.spawner)
.trackers(trackers)
.connector(self.connector.clone())
.peer_id(self.peer_id);
if let Some(d) = self.disk_write_tx.clone() {
builder.disk_writer(d);
}
if let Some(only_files) = only_files {
builder.only_files(only_files);
}
if let Some(interval) = opts.force_tracker_interval {
builder.force_tracker_interval(interval);
}
let peer_opts = self.merge_peer_opts(opts.peer_opts);
if let Some(t) = peer_opts.connect_timeout {
builder.peer_connect_timeout(t);
}
if let Some(t) = peer_opts.read_write_timeout {
builder.peer_read_write_timeout(t);
}
let managed_torrent = {
let mut g = self.db.write();
if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| {
@ -1132,13 +1101,70 @@ impl Session {
}) {
return Ok(AddTorrentResponse::AlreadyManaged(id, handle));
}
let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?;
g.add_torrent(managed_torrent.clone(), id);
managed_torrent
let lengths = Lengths::from_torrent(&info)?;
let file_infos = info
.iter_file_details(&lengths)?
.map(|fd| {
Ok::<_, anyhow::Error>(FileInfo {
relative_filename: fd.filename.to_pathbuf()?,
offset_in_torrent: fd.offset,
piece_range: fd.pieces,
len: fd.len,
})
})
.collect::<anyhow::Result<Vec<FileInfo>>>()?;
let span = error_span!(parent: self.rs(), "torrent", id);
let peer_opts = self.merge_peer_opts(opts.peer_opts);
let minfo = Arc::new(ManagedTorrentInfo {
span,
file_infos,
info,
torrent_bytes,
info_bytes,
info_hash,
trackers: trackers.into_iter().collect(),
spawner: self.spawner,
peer_id: self.peer_id,
lengths,
options: ManagedTorrentOptions {
force_tracker_interval: opts.force_tracker_interval,
peer_connect_timeout: peer_opts.connect_timeout,
peer_read_write_timeout: peer_opts.read_write_timeout,
allow_overwrite: opts.overwrite,
output_folder,
disk_write_queue: self.disk_write_tx.clone(),
},
connector: self.connector.clone(),
});
let initializing = Arc::new(TorrentStateInitializing::new(
minfo.clone(),
only_files.clone(),
storage_factory.create_and_init(&minfo)?,
));
let handle = Arc::new(ManagedTorrent {
id,
locked: RwLock::new(ManagedTorrentLocked {
state: ManagedTorrentState::Initializing(initializing),
only_files,
}),
state_change_notify: Notify::new(),
storage_factory,
info: minfo,
session: Arc::downgrade(self),
});
g.add_torrent(handle.clone(), id);
handle
};
if let Some(p) = self.persistence.as_ref() {
p.store(id, &managed_torrent).await?;
if let Err(e) = p.store(id, &managed_torrent).await {
self.db.write().torrents.remove(&id);
return Err(e);
}
}
// Merge "initial_peers" and "peer_rx" into one stream.

View file

@ -9,6 +9,7 @@ use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use anyhow::bail;
@ -19,7 +20,6 @@ use futures::future::BoxFuture;
use futures::FutureExt;
use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id;
use librqbit_core::spawn_utils::spawn_with_cancel;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
@ -36,7 +36,6 @@ use tracing::warn;
use crate::bitv_factory::BitVFactory;
use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo;
use crate::session::TorrentId;
use crate::session_stats::atomic::AtomicSessionStats;
use crate::spawn_utils::BlockingSpawner;
@ -46,6 +45,7 @@ use crate::torrent_state::stats::LiveStats;
use crate::type_aliases::DiskWorkQueueSender;
use crate::type_aliases::FileInfos;
use crate::type_aliases::PeerStream;
use crate::Session;
use initializing::TorrentStateInitializing;
@ -87,7 +87,7 @@ impl ManagedTorrentState {
}
pub(crate) struct ManagedTorrentLocked {
pub state: ManagedTorrentState,
pub(crate) state: ManagedTorrentState,
pub(crate) only_files: Option<Vec<usize>>,
}
@ -122,8 +122,9 @@ pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>,
pub(crate) storage_factory: BoxStorageFactory,
state_change_notify: Notify,
locked: RwLock<ManagedTorrentLocked>,
pub(crate) session: Weak<Session>,
pub(crate) state_change_notify: Notify,
pub(crate) locked: RwLock<ManagedTorrentLocked>,
}
impl ManagedTorrent {
@ -550,160 +551,4 @@ impl ManagedTorrent {
}
}
pub(crate) struct ManagedTorrentBuilder {
id: TorrentId,
info: TorrentMetaV1Info<ByteBufOwned>,
output_folder: PathBuf,
info_hash: Id20,
torrent_bytes: Bytes,
info_bytes: Bytes,
force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>,
peer_read_write_timeout: Option<Duration>,
only_files: Option<Vec<usize>>,
trackers: Vec<String>,
peer_id: Option<Id20>,
spawner: Option<BlockingSpawner>,
allow_overwrite: bool,
storage_factory: BoxStorageFactory,
disk_writer: Option<DiskWorkQueueSender>,
connector: Arc<StreamConnector>,
}
impl ManagedTorrentBuilder {
pub fn new(
id: usize,
info: TorrentMetaV1Info<ByteBufOwned>,
info_hash: Id20,
torrent_bytes: Bytes,
info_bytes: Bytes,
output_folder: PathBuf,
storage_factory: BoxStorageFactory,
) -> Self {
Self {
id,
info,
info_hash,
torrent_bytes,
info_bytes,
spawner: None,
force_tracker_interval: None,
peer_connect_timeout: None,
peer_read_write_timeout: None,
only_files: None,
trackers: Default::default(),
peer_id: None,
allow_overwrite: false,
output_folder,
storage_factory,
disk_writer: None,
connector: Arc::new(Default::default()),
}
}
pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self {
self.only_files = Some(only_files);
self
}
pub fn trackers(&mut self, trackers: Vec<String>) -> &mut Self {
self.trackers = trackers;
self
}
pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self {
self.force_tracker_interval = Some(force_tracker_interval);
self
}
pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self {
self.spawner = Some(spawner);
self
}
pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self {
self.peer_id = Some(peer_id);
self
}
pub fn allow_overwrite(&mut self, value: bool) -> &mut Self {
self.allow_overwrite = value;
self
}
pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
self.peer_connect_timeout = Some(timeout);
self
}
pub fn peer_read_write_timeout(&mut self, timeout: Duration) -> &mut Self {
self.peer_read_write_timeout = Some(timeout);
self
}
pub fn disk_writer(&mut self, value: DiskWorkQueueSender) -> &mut Self {
self.disk_writer = Some(value);
self
}
pub fn connector(&mut self, value: Arc<StreamConnector>) -> &mut Self {
self.connector = value;
self
}
pub fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?;
let file_infos = self
.info
.iter_file_details(&lengths)?
.map(|fd| {
Ok::<_, anyhow::Error>(FileInfo {
relative_filename: fd.filename.to_pathbuf()?,
offset_in_torrent: fd.offset,
piece_range: fd.pieces,
len: fd.len,
})
})
.collect::<anyhow::Result<Vec<FileInfo>>>()?;
let info = Arc::new(ManagedTorrentInfo {
span,
file_infos,
info: self.info,
torrent_bytes: self.torrent_bytes,
info_bytes: self.info_bytes,
info_hash: self.info_hash,
trackers: self.trackers.into_iter().collect(),
spawner: self.spawner.unwrap_or_default(),
peer_id: self.peer_id.unwrap_or_else(generate_peer_id),
lengths,
options: ManagedTorrentOptions {
force_tracker_interval: self.force_tracker_interval,
peer_connect_timeout: self.peer_connect_timeout,
peer_read_write_timeout: self.peer_read_write_timeout,
allow_overwrite: self.allow_overwrite,
output_folder: self.output_folder,
disk_write_queue: self.disk_writer,
},
connector: self.connector,
});
let initializing = Arc::new(TorrentStateInitializing::new(
info.clone(),
self.only_files.clone(),
self.storage_factory.create_and_init(&info)?,
));
Ok(Arc::new(ManagedTorrent {
id: self.id,
locked: RwLock::new(ManagedTorrentLocked {
state: ManagedTorrentState::Initializing(initializing),
only_files: self.only_files,
}),
state_change_notify: Notify::new(),
storage_factory: self.storage_factory,
info,
}))
}
}
pub type ManagedTorrentHandle = Arc<ManagedTorrent>;