[Feature] option to disable upload
This commit is contained in:
parent
9f798696ff
commit
fc7ae3bbe5
6 changed files with 32 additions and 14 deletions
|
|
@ -26,7 +26,7 @@ use crate::{read_buf::ReadBuf, spawn_utils::BlockingSpawner, stream_connect::Str
|
||||||
|
|
||||||
pub trait PeerConnectionHandler {
|
pub trait PeerConnectionHandler {
|
||||||
fn on_connected(&self, _connection_time: Duration) {}
|
fn on_connected(&self, _connection_time: Duration) {}
|
||||||
fn get_have_bytes(&self) -> u64;
|
fn should_send_bitfield(&self) -> bool;
|
||||||
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize>;
|
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize>;
|
||||||
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()>;
|
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()>;
|
||||||
fn on_extended_handshake(
|
fn on_extended_handshake(
|
||||||
|
|
@ -268,7 +268,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
.keep_alive_interval
|
.keep_alive_interval
|
||||||
.unwrap_or_else(|| Duration::from_secs(120));
|
.unwrap_or_else(|| Duration::from_secs(120));
|
||||||
|
|
||||||
if self.handler.get_have_bytes() > 0 {
|
if self.handler.should_send_bitfield() {
|
||||||
let len = self
|
let len = self
|
||||||
.handler
|
.handler
|
||||||
.serialize_bitfield_message_to_buf(&mut write_buf)?;
|
.serialize_bitfield_message_to_buf(&mut write_buf)?;
|
||||||
|
|
|
||||||
|
|
@ -148,8 +148,8 @@ struct Handler {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerConnectionHandler for Handler {
|
impl PeerConnectionHandler for Handler {
|
||||||
fn get_have_bytes(&self) -> u64 {
|
fn should_send_bitfield(&self) -> bool {
|
||||||
0
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec<u8>) -> anyhow::Result<usize> {
|
fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec<u8>) -> anyhow::Result<usize> {
|
||||||
|
|
|
||||||
|
|
@ -124,6 +124,8 @@ pub struct Session {
|
||||||
|
|
||||||
pub(crate) stats: SessionStats,
|
pub(crate) stats: SessionStats,
|
||||||
|
|
||||||
|
disable_upload: bool,
|
||||||
|
|
||||||
// This is stored for all tasks to stop when session is dropped.
|
// This is stored for all tasks to stop when session is dropped.
|
||||||
_cancellation_token_drop_guard: DropGuard,
|
_cancellation_token_drop_guard: DropGuard,
|
||||||
}
|
}
|
||||||
|
|
@ -410,6 +412,8 @@ pub struct SessionOptions {
|
||||||
|
|
||||||
// the root span to use. If not set will be None.
|
// the root span to use. If not set will be None.
|
||||||
pub root_span: Option<Span>,
|
pub root_span: Option<Span>,
|
||||||
|
|
||||||
|
pub disable_upload: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_tcp_listener(
|
async fn create_tcp_listener(
|
||||||
|
|
@ -485,6 +489,10 @@ impl Session {
|
||||||
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
|
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
|
||||||
let token = opts.cancellation_token.take().unwrap_or_default();
|
let token = opts.cancellation_token.take().unwrap_or_default();
|
||||||
|
|
||||||
|
if opts.disable_upload {
|
||||||
|
warn!("uploading disabled");
|
||||||
|
}
|
||||||
|
|
||||||
let (tcp_listener, tcp_listen_port) =
|
let (tcp_listener, tcp_listen_port) =
|
||||||
if let Some(port_range) = opts.listen_port_range.clone() {
|
if let Some(port_range) = opts.listen_port_range.clone() {
|
||||||
let (l, p) = create_tcp_listener(port_range)
|
let (l, p) = create_tcp_listener(port_range)
|
||||||
|
|
@ -618,6 +626,7 @@ impl Session {
|
||||||
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(
|
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(
|
||||||
opts.concurrent_init_limit.unwrap_or(3),
|
opts.concurrent_init_limit.unwrap_or(3),
|
||||||
)),
|
)),
|
||||||
|
disable_upload: opts.disable_upload,
|
||||||
});
|
});
|
||||||
|
|
||||||
if let Some(mut disk_write_rx) = disk_write_rx {
|
if let Some(mut disk_write_rx) = disk_write_rx {
|
||||||
|
|
@ -1141,6 +1150,7 @@ impl Session {
|
||||||
allow_overwrite: opts.overwrite,
|
allow_overwrite: opts.overwrite,
|
||||||
output_folder,
|
output_folder,
|
||||||
disk_write_queue: self.disk_write_tx.clone(),
|
disk_write_queue: self.disk_write_tx.clone(),
|
||||||
|
disable_upload: self.disable_upload,
|
||||||
},
|
},
|
||||||
connector: self.connector.clone(),
|
connector: self.connector.clone(),
|
||||||
session: Arc::downgrade(self),
|
session: Arc::downgrade(self),
|
||||||
|
|
|
||||||
|
|
@ -173,14 +173,6 @@ impl TorrentStateLocked {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct TorrentStateOptions {
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub peer_connect_timeout: Option<Duration>,
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub peer_read_write_timeout: Option<Duration>,
|
|
||||||
}
|
|
||||||
|
|
||||||
const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024;
|
const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024;
|
||||||
|
|
||||||
pub struct TorrentStateLive {
|
pub struct TorrentStateLive {
|
||||||
|
|
@ -929,11 +921,18 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_have_bytes(&self) -> u64 {
|
fn should_send_bitfield(&self) -> bool {
|
||||||
self.state.get_approx_have_bytes()
|
if self.state.torrent().options.disable_upload {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.state.get_approx_have_bytes() > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool {
|
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool {
|
||||||
|
if self.state.torrent.options.disable_upload {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
let have = self
|
let have = self
|
||||||
.state
|
.state
|
||||||
.peers
|
.peers
|
||||||
|
|
@ -1164,6 +1163,10 @@ impl PeerHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_download_request(&self, request: Request) -> anyhow::Result<()> {
|
fn on_download_request(&self, request: Request) -> anyhow::Result<()> {
|
||||||
|
if self.state.torrent().options.disable_upload {
|
||||||
|
anyhow::bail!("upload disabled, but peer requested a piece")
|
||||||
|
}
|
||||||
|
|
||||||
let piece_index = match self.state.lengths.validate_piece_index(request.index) {
|
let piece_index = match self.state.lengths.validate_piece_index(request.index) {
|
||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => {
|
None => {
|
||||||
|
|
|
||||||
|
|
@ -103,6 +103,7 @@ pub(crate) struct ManagedTorrentOptions {
|
||||||
pub allow_overwrite: bool,
|
pub allow_overwrite: bool,
|
||||||
pub output_folder: PathBuf,
|
pub output_folder: PathBuf,
|
||||||
pub disk_write_queue: Option<DiskWorkQueueSender>,
|
pub disk_write_queue: Option<DiskWorkQueueSender>,
|
||||||
|
pub disable_upload: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Common information about torrent shared among all possible states.
|
/// Common information about torrent shared among all possible states.
|
||||||
|
|
|
||||||
|
|
@ -202,6 +202,9 @@ struct Opts {
|
||||||
#[cfg(not(target_os = "windows"))]
|
#[cfg(not(target_os = "windows"))]
|
||||||
#[arg(long, env = "RQBIT_UMASK", value_parser=parse_umask)]
|
#[arg(long, env = "RQBIT_UMASK", value_parser=parse_umask)]
|
||||||
umask: Option<libc::mode_t>,
|
umask: Option<libc::mode_t>,
|
||||||
|
|
||||||
|
#[arg(long, env = "RQBIT_DISABLE_UPLOAD")]
|
||||||
|
disable_upload: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
|
|
@ -454,6 +457,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
|
||||||
root_span: None,
|
root_span: None,
|
||||||
fastresume: false,
|
fastresume: false,
|
||||||
cancellation_token: Some(cancel.clone()),
|
cancellation_token: Some(cancel.clone()),
|
||||||
|
disable_upload: opts.disable_upload,
|
||||||
};
|
};
|
||||||
|
|
||||||
let stats_printer = |session: Arc<Session>| async move {
|
let stats_printer = |session: Arc<Session>| async move {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue