diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 55977de..04a1be4 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -26,7 +26,7 @@ use crate::{read_buf::ReadBuf, spawn_utils::BlockingSpawner, stream_connect::Str pub trait PeerConnectionHandler { fn on_connected(&self, _connection_time: Duration) {} - fn get_have_bytes(&self) -> u64; + fn should_send_bitfield(&self) -> bool; fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result; fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>; fn on_extended_handshake( @@ -268,7 +268,7 @@ impl PeerConnection { .keep_alive_interval .unwrap_or_else(|| Duration::from_secs(120)); - if self.handler.get_have_bytes() > 0 { + if self.handler.should_send_bitfield() { let len = self .handler .serialize_bitfield_message_to_buf(&mut write_buf)?; diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index 6485cd0..a22ab43 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -148,8 +148,8 @@ struct Handler { } impl PeerConnectionHandler for Handler { - fn get_have_bytes(&self) -> u64 { - 0 + fn should_send_bitfield(&self) -> bool { + false } fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec) -> anyhow::Result { diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 9c956bb..d4f0a1a 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -124,6 +124,8 @@ pub struct Session { pub(crate) stats: SessionStats, + disable_upload: bool, + // This is stored for all tasks to stop when session is dropped. _cancellation_token_drop_guard: DropGuard, } @@ -410,6 +412,8 @@ pub struct SessionOptions { // the root span to use. If not set will be None. pub root_span: Option, + + pub disable_upload: bool, } async fn create_tcp_listener( @@ -485,6 +489,10 @@ impl Session { let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); let token = opts.cancellation_token.take().unwrap_or_default(); + if opts.disable_upload { + warn!("uploading disabled"); + } + let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range.clone() { let (l, p) = create_tcp_listener(port_range) @@ -618,6 +626,7 @@ impl Session { concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new( opts.concurrent_init_limit.unwrap_or(3), )), + disable_upload: opts.disable_upload, }); if let Some(mut disk_write_rx) = disk_write_rx { @@ -1141,6 +1150,7 @@ impl Session { allow_overwrite: opts.overwrite, output_folder, disk_write_queue: self.disk_write_tx.clone(), + disable_upload: self.disable_upload, }, connector: self.connector.clone(), session: Arc::downgrade(self), diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ec04115..d5a551a 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -173,14 +173,6 @@ impl TorrentStateLocked { } } -#[derive(Default)] -pub struct TorrentStateOptions { - #[allow(dead_code)] - pub peer_connect_timeout: Option, - #[allow(dead_code)] - pub peer_read_write_timeout: Option, -} - const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; pub struct TorrentStateLive { @@ -929,11 +921,18 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { Ok(()) } - fn get_have_bytes(&self) -> u64 { - self.state.get_approx_have_bytes() + fn should_send_bitfield(&self) -> bool { + if self.state.torrent().options.disable_upload { + return false; + } + + self.state.get_approx_have_bytes() > 0 } fn should_transmit_have(&self, id: ValidPieceIndex) -> bool { + if self.state.torrent.options.disable_upload { + return false; + } let have = self .state .peers @@ -1164,6 +1163,10 @@ impl PeerHandler { } fn on_download_request(&self, request: Request) -> anyhow::Result<()> { + if self.state.torrent().options.disable_upload { + anyhow::bail!("upload disabled, but peer requested a piece") + } + let piece_index = match self.state.lengths.validate_piece_index(request.index) { Some(p) => p, None => { diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 33a137c..d5a08e8 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -103,6 +103,7 @@ pub(crate) struct ManagedTorrentOptions { pub allow_overwrite: bool, pub output_folder: PathBuf, pub disk_write_queue: Option, + pub disable_upload: bool, } /// Common information about torrent shared among all possible states. diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index ea89913..c24754e 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -202,6 +202,9 @@ struct Opts { #[cfg(not(target_os = "windows"))] #[arg(long, env = "RQBIT_UMASK", value_parser=parse_umask)] umask: Option, + + #[arg(long, env = "RQBIT_DISABLE_UPLOAD")] + disable_upload: bool, } #[derive(Parser)] @@ -454,6 +457,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> root_span: None, fastresume: false, cancellation_token: Some(cancel.clone()), + disable_upload: opts.disable_upload, }; let stats_printer = |session: Arc| async move {