Limit concurrency of torrent initialization
This commit is contained in:
parent
37ee8b70ba
commit
726a5e14f9
3 changed files with 36 additions and 3 deletions
|
|
@ -112,6 +112,8 @@ pub struct Session {
|
||||||
reqwest_client: reqwest::Client,
|
reqwest_client: reqwest::Client,
|
||||||
connector: Arc<StreamConnector>,
|
connector: Arc<StreamConnector>,
|
||||||
|
|
||||||
|
concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
|
|
||||||
// This is stored for all tasks to stop when session is dropped.
|
// This is stored for all tasks to stop when session is dropped.
|
||||||
_cancellation_token_drop_guard: DropGuard,
|
_cancellation_token_drop_guard: DropGuard,
|
||||||
}
|
}
|
||||||
|
|
@ -375,6 +377,9 @@ pub struct SessionOptions {
|
||||||
|
|
||||||
// socks5://[username:password@]host:port
|
// socks5://[username:password@]host:port
|
||||||
pub socks_proxy_url: Option<String>,
|
pub socks_proxy_url: Option<String>,
|
||||||
|
|
||||||
|
// how many concurrent torrent initializations can happen
|
||||||
|
pub concurrent_init_limit: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_tcp_listener(
|
async fn create_tcp_listener(
|
||||||
|
|
@ -562,6 +567,7 @@ impl Session {
|
||||||
default_storage_factory: opts.default_storage_factory,
|
default_storage_factory: opts.default_storage_factory,
|
||||||
reqwest_client,
|
reqwest_client,
|
||||||
connector: stream_connector,
|
connector: stream_connector,
|
||||||
|
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
|
||||||
});
|
});
|
||||||
|
|
||||||
if let Some(mut disk_write_rx) = disk_write_rx {
|
if let Some(mut disk_write_rx) = disk_write_rx {
|
||||||
|
|
@ -1085,7 +1091,12 @@ impl Session {
|
||||||
let _ = span.enter();
|
let _ = span.enter();
|
||||||
|
|
||||||
managed_torrent
|
managed_torrent
|
||||||
.start(peer_rx, opts.paused, self.cancellation_token.child_token())
|
.start(
|
||||||
|
peer_rx,
|
||||||
|
opts.paused,
|
||||||
|
self.cancellation_token.child_token(),
|
||||||
|
self.concurrent_initialize_semaphore.clone(),
|
||||||
|
)
|
||||||
.context("error starting torrent")?;
|
.context("error starting torrent")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1233,7 +1244,12 @@ impl Session {
|
||||||
self.tcp_listen_port,
|
self.tcp_listen_port,
|
||||||
handle.info().options.force_tracker_interval,
|
handle.info().options.force_tracker_interval,
|
||||||
)?;
|
)?;
|
||||||
handle.start(peer_rx, false, self.cancellation_token.child_token())?;
|
handle.start(
|
||||||
|
peer_rx,
|
||||||
|
false,
|
||||||
|
self.cancellation_token.child_token(),
|
||||||
|
self.concurrent_initialize_semaphore.clone(),
|
||||||
|
)?;
|
||||||
self.try_update_persistence_metadata(handle).await;
|
self.try_update_persistence_metadata(handle).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -208,6 +208,7 @@ impl ManagedTorrent {
|
||||||
peer_rx: Option<PeerStream>,
|
peer_rx: Option<PeerStream>,
|
||||||
start_paused: bool,
|
start_paused: bool,
|
||||||
live_cancellation_token: CancellationToken,
|
live_cancellation_token: CancellationToken,
|
||||||
|
init_semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut g = self.locked.write();
|
let mut g = self.locked.write();
|
||||||
|
|
||||||
|
|
@ -283,10 +284,16 @@ impl ManagedTorrent {
|
||||||
let t = self.clone();
|
let t = self.clone();
|
||||||
let span = self.info().span.clone();
|
let span = self.info().span.clone();
|
||||||
let token = live_cancellation_token.clone();
|
let token = live_cancellation_token.clone();
|
||||||
|
|
||||||
spawn_with_cancel(
|
spawn_with_cancel(
|
||||||
error_span!(parent: span.clone(), "initialize_and_start"),
|
error_span!(parent: span.clone(), "initialize_and_start"),
|
||||||
token.clone(),
|
token.clone(),
|
||||||
async move {
|
async move {
|
||||||
|
let _permit = init_semaphore
|
||||||
|
.acquire()
|
||||||
|
.await
|
||||||
|
.context("bug: concurrent init semaphore was closed")?;
|
||||||
|
|
||||||
match init.check().await {
|
match init.check().await {
|
||||||
Ok(paused) => {
|
Ok(paused) => {
|
||||||
let mut g = t.locked.write();
|
let mut g = t.locked.write();
|
||||||
|
|
@ -344,7 +351,12 @@ impl ManagedTorrent {
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
||||||
// Recurse.
|
// Recurse.
|
||||||
self.start(peer_rx, start_paused, live_cancellation_token)
|
self.start(
|
||||||
|
peer_rx,
|
||||||
|
start_paused,
|
||||||
|
live_cancellation_token,
|
||||||
|
init_semaphore,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
|
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -122,6 +122,10 @@ struct Opts {
|
||||||
/// Alternatively, set this as an environment variable RQBIT_SOCKS_PROXY_URL
|
/// Alternatively, set this as an environment variable RQBIT_SOCKS_PROXY_URL
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
socks_url: Option<String>,
|
socks_url: Option<String>,
|
||||||
|
|
||||||
|
/// How many torrents can be initializing (rehashing) at the same time
|
||||||
|
#[arg(long, default_value = "5")]
|
||||||
|
concurrent_init_limit: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
|
|
@ -335,6 +339,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
socks_proxy_url: socks_url,
|
socks_proxy_url: socks_url,
|
||||||
|
concurrent_init_limit: Some(opts.concurrent_init_limit),
|
||||||
};
|
};
|
||||||
|
|
||||||
let stats_printer = |session: Arc<Session>| async move {
|
let stats_printer = |session: Arc<Session>| async move {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue