This one makes it better for sure

This commit is contained in:
Igor Katson 2024-02-26 22:59:54 +00:00
parent 15d17355b5
commit f42007f436
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 204 additions and 194 deletions

View file

@ -4,6 +4,7 @@ use std::{
io::{BufReader, BufWriter, Read}, io::{BufReader, BufWriter, Read},
net::SocketAddr, net::SocketAddr,
path::PathBuf, path::PathBuf,
pin::Pin,
str::FromStr, str::FromStr,
sync::Arc, sync::Arc,
time::Duration, time::Duration,
@ -24,7 +25,7 @@ use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBuf, ByteBufT, ByteString}; use buffers::{ByteBuf, ByteBufT, ByteString};
use clone_to_owned::CloneToOwned; use clone_to_owned::CloneToOwned;
use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig};
use futures::{stream::FuturesUnordered, TryFutureExt}; use futures::{stream::FuturesUnordered, Future, FutureExt, TryFutureExt};
use librqbit_core::{ use librqbit_core::{
directories::get_configuration_directory, directories::get_configuration_directory,
magnet::Magnet, magnet::Magnet,
@ -377,8 +378,9 @@ pub(crate) struct CheckedIncomingConnection {
impl Session { impl Session {
/// Create a new session. The passed in folder will be used as a default unless overriden per torrent. /// Create a new session. The passed in folder will be used as a default unless overriden per torrent.
pub async fn new(output_folder: PathBuf) -> anyhow::Result<Arc<Self>> { #[inline(never)]
Self::new_with_opts(output_folder, SessionOptions::default()).await pub fn new(output_folder: PathBuf) -> Pin<Box<dyn Future<Output = anyhow::Result<Arc<Self>>>>> {
Self::new_with_opts(output_folder, SessionOptions::default())
} }
pub fn default_persistence_filename() -> anyhow::Result<PathBuf> { pub fn default_persistence_filename() -> anyhow::Result<PathBuf> {
@ -391,93 +393,97 @@ impl Session {
} }
/// Create a new session with options. /// Create a new session with options.
pub async fn new_with_opts( #[inline(never)]
pub fn new_with_opts(
output_folder: PathBuf, output_folder: PathBuf,
mut opts: SessionOptions, mut opts: SessionOptions,
) -> anyhow::Result<Arc<Self>> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<Arc<Self>>>>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); async move {
let token = CancellationToken::new(); let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let token = CancellationToken::new();
let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range) let (l, p) = create_tcp_listener(port_range)
.await
.context("error listening on TCP")?;
info!("Listening on 0.0.0.0:{p} for incoming peer connections");
(Some(l), Some(p))
} else {
(None, None)
};
let dht = if opts.disable_dht {
None
} else {
let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig {
cancellation_token: Some(token.child_token()),
..Default::default()
})
.await
.context("error initializing DHT")?
} else {
let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config), Some(token.clone()))
.await .await
.context("error initializing persistent DHT")? .context("error listening on TCP")?;
info!("Listening on 0.0.0.0:{p} for incoming peer connections");
(Some(l), Some(p))
} else {
(None, None)
}; };
Some(dht) let dht = if opts.disable_dht {
}; None
let peer_opts = opts.peer_opts.unwrap_or_default(); } else {
let persistence_filename = match opts.persistence_filename { let dht = if opts.disable_dht_persistence {
Some(filename) => filename, DhtBuilder::with_config(DhtConfig {
None => Self::default_persistence_filename()?, cancellation_token: Some(token.child_token()),
}; ..Default::default()
let spawner = BlockingSpawner::default(); })
.await
.context("error initializing DHT")?
} else {
let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config), Some(token.clone()))
.await
.context("error initializing persistent DHT")?
};
let session = Arc::new(Self { Some(dht)
persistence_filename, };
peer_id, let peer_opts = opts.peer_opts.unwrap_or_default();
dht, let persistence_filename = match opts.persistence_filename {
peer_opts, Some(filename) => filename,
spawner, None => Self::default_persistence_filename()?,
output_folder, };
db: RwLock::new(Default::default()), let spawner = BlockingSpawner::default();
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
tcp_listen_port,
});
if let Some(tcp_listener) = tcp_listener { let session = Arc::new(Self {
session.spawn( persistence_filename,
error_span!("tcp_listen", port = tcp_listen_port), peer_id,
session.clone().task_tcp_listener(tcp_listener), dht,
); peer_opts,
} spawner,
output_folder,
db: RwLock::new(Default::default()),
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
tcp_listen_port,
});
if let Some(listen_port) = tcp_listen_port { if let Some(tcp_listener) = tcp_listener {
if opts.enable_upnp_port_forwarding {
session.spawn( session.spawn(
error_span!("upnp_forward", port = listen_port), error_span!("tcp_listen", port = tcp_listen_port),
session.clone().task_upnp_port_forwarder(listen_port), session.clone().task_tcp_listener(tcp_listener),
); );
} }
}
if opts.persistence { if let Some(listen_port) = tcp_listen_port {
info!( if opts.enable_upnp_port_forwarding {
"will use {:?} for session persistence", session.spawn(
session.persistence_filename error_span!("upnp_forward", port = listen_port),
); session.clone().task_upnp_port_forwarder(listen_port),
if let Some(parent) = session.persistence_filename.parent() { );
std::fs::create_dir_all(parent).with_context(|| { }
format!("couldn't create directory {:?} for session storage", parent)
})?;
} }
let persistence_task = session.clone().task_persistence();
session.spawn(error_span!("session_persistence"), persistence_task);
}
Ok(session) if opts.persistence {
info!(
"will use {:?} for session persistence",
session.persistence_filename
);
if let Some(parent) = session.persistence_filename.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("couldn't create directory {:?} for session storage", parent)
})?;
}
let persistence_task = session.clone().task_persistence();
session.spawn(error_span!("session_persistence"), persistence_task);
}
Ok(session)
}
.boxed()
} }
async fn task_persistence(self: Arc<Self>) -> anyhow::Result<()> { async fn task_persistence(self: Arc<Self>) -> anyhow::Result<()> {
@ -738,136 +744,139 @@ impl Session {
} }
/// Add a torrent to the session. /// Add a torrent to the session.
pub async fn add_torrent( pub fn add_torrent<'a>(
&self, &'a self,
add: AddTorrent<'_>, add: AddTorrent<'a>,
opts: Option<AddTorrentOptions>, opts: Option<AddTorrentOptions>,
) -> anyhow::Result<AddTorrentResponse> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<AddTorrentResponse>> + Send + 'a>> {
// Magnet links are different in that we first need to discover the metadata. async move {
let span = error_span!("add_torrent"); // Magnet links are different in that we first need to discover the metadata.
let _ = span.enter(); let span = error_span!("add_torrent");
let _ = span.enter();
let opts = opts.unwrap_or_default(); let opts = opts.unwrap_or_default();
let announce_port = if opts.list_only { let announce_port = if opts.list_only {
None None
} else { } else {
self.tcp_listen_port self.tcp_listen_port
}; };
let paused = opts.list_only || opts.paused; let paused = opts.list_only || opts.paused;
// The main difference between magnet link and torrent file, is that we need to resolve the magnet link // The main difference between magnet link and torrent file, is that we need to resolve the magnet link
// into a torrent file by connecting to peers that support extended handshakes. // into a torrent file by connecting to peers that support extended handshakes.
// So we must discover at least one peer and connect to it to be able to proceed further. // So we must discover at least one peer and connect to it to be able to proceed further.
let (info_hash, info, trackers, peer_rx, initial_peers) = match add { let (info_hash, info, trackers, peer_rx, initial_peers) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let magnet = let magnet = Magnet::parse(&magnet)
Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; .context("provided path is not a valid magnet URL")?;
let info_hash = magnet let info_hash = magnet
.as_id20() .as_id20()
.context("magnet link didn't contain a BTv1 infohash")?; .context("magnet link didn't contain a BTv1 infohash")?;
let peer_rx = self.make_peer_rx( let peer_rx = self.make_peer_rx(
info_hash, info_hash,
magnet.trackers.clone(), magnet.trackers.clone(),
announce_port,
opts.force_tracker_interval,
)?;
let peer_rx = match peer_rx {
Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"),
};
debug!(?info_hash, "querying DHT");
let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
opts.initial_peers.clone().unwrap_or_default(),
peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)),
)
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::ChannelClosed { .. } => {
anyhow::bail!("DHT died, no way to discover torrent metainfo")
}
};
debug!(?info, "received result from DHT");
(
info_hash,
info,
magnet.trackers,
Some(peer_rx),
initial_peers,
)
}
other => {
let torrent = match other {
AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") =>
{
torrent_from_url(&url).await?
}
AddTorrent::Url(url) => {
bail!(
"unsupported URL {:?}. Supporting magnet:, http:, and https",
url
)
}
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
}
AddTorrent::TorrentInfo(t) => *t,
};
let trackers = torrent
.iter_announce()
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => Some(url.to_owned()),
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
None
}
})
.collect::<Vec<_>>();
let peer_rx = if paused {
None
} else {
self.make_peer_rx(
torrent.info_hash,
trackers.clone(),
announce_port, announce_port,
opts.force_tracker_interval, opts.force_tracker_interval,
)? )?;
}; let peer_rx = match peer_rx {
Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"),
};
( debug!(?info_hash, "querying DHT");
torrent.info_hash, let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
torrent.info, self.peer_id,
trackers, info_hash,
peer_rx, opts.initial_peers.clone().unwrap_or_default(),
opts.initial_peers peer_rx,
.clone() Some(self.merge_peer_opts(opts.peer_opts)),
.unwrap_or_default() )
.into_iter() .await
.collect(), {
) ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
} ReadMetainfoResult::ChannelClosed { .. } => {
}; anyhow::bail!("DHT died, no way to discover torrent metainfo")
}
};
debug!(?info, "received result from DHT");
(
info_hash,
info,
magnet.trackers,
Some(peer_rx),
initial_peers,
)
}
other => {
let torrent = match other {
AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") =>
{
torrent_from_url(&url).await?
}
AddTorrent::Url(url) => {
bail!(
"unsupported URL {:?}. Supporting magnet:, http:, and https",
url
)
}
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
}
AddTorrent::TorrentInfo(t) => *t,
};
self.main_torrent_info( let trackers = torrent
info_hash, .iter_announce()
info, .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
trackers, Ok(url) => Some(url.to_owned()),
peer_rx, Err(_) => {
initial_peers.into_iter().collect(), warn!("cannot parse tracker url as utf-8, ignoring");
opts, None
) }
.await })
.collect::<Vec<_>>();
let peer_rx = if paused {
None
} else {
self.make_peer_rx(
torrent.info_hash,
trackers.clone(),
announce_port,
opts.force_tracker_interval,
)?
};
(
torrent.info_hash,
torrent.info,
trackers,
peer_rx,
opts.initial_peers
.clone()
.unwrap_or_default()
.into_iter()
.collect(),
)
}
};
self.main_torrent_info(
info_hash,
info,
trackers,
peer_rx,
initial_peers.into_iter().collect(),
opts,
)
.await
}
.boxed()
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]

View file

@ -60,6 +60,7 @@ pub struct InitLoggingResult {
pub line_broadcast: LineBroadcast, pub line_broadcast: LineBroadcast,
} }
#[inline(never)]
pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result<InitLoggingResult> { pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result<InitLoggingResult> {
let stderr_filter = EnvFilter::builder() let stderr_filter = EnvFilter::builder()
.with_default_directive( .with_default_directive(