Create worker module to contain all the feature flag chaos

This commit is contained in:
Héctor Ramón Jiménez 2025-10-26 21:48:10 +01:00
parent b408961d77
commit 22488c537c
No known key found for this signature in database
GPG key ID: 7CC46565708259A7
3 changed files with 220 additions and 201 deletions

View file

@ -2,11 +2,11 @@ use crate::core::{self, Size};
use crate::graphics::Shell;
use crate::image::atlas::{self, Atlas};
#[cfg(feature = "image")]
use std::collections::HashMap;
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
use worker::Worker;
#[cfg(feature = "image")]
use std::sync::mpsc;
use std::collections::HashMap;
use std::sync::Arc;
@ -16,14 +16,8 @@ pub struct Cache {
raster: Raster,
#[cfg(feature = "svg")]
vector: crate::image::vector::Cache,
#[cfg(feature = "image")]
jobs: mpsc::SyncSender<Job>,
#[cfg(feature = "image")]
quit: mpsc::SyncSender<()>,
#[cfg(feature = "image")]
work: mpsc::Receiver<Work>,
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
worker_: Option<std::thread::JoinHandle<()>>,
worker: Worker,
}
impl Cache {
@ -35,33 +29,20 @@ impl Cache {
_shell: &Shell,
) -> Self {
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
let (worker, jobs, quit, work) =
let worker =
Worker::new(device, _queue, backend, layout.clone(), _shell);
#[cfg(all(feature = "image", target_arch = "wasm32"))]
let (jobs, work) = (mpsc::sync_channel(0).0, mpsc::sync_channel(0).1);
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
let handle = std::thread::spawn(move || worker.run());
Self {
atlas: Atlas::new(device, backend, layout),
#[cfg(feature = "image")]
raster: Raster {
cache: crate::image::raster::Cache::default(),
pending: HashMap::new(),
jobs: jobs.clone(),
},
#[cfg(feature = "svg")]
vector: crate::image::vector::Cache::default(),
#[cfg(feature = "image")]
jobs,
#[cfg(feature = "image")]
quit,
#[cfg(feature = "image")]
work,
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
worker_: Some(handle),
worker,
}
}
@ -100,7 +81,7 @@ impl Cache {
}
let _ = self.raster.pending.insert(handle.id(), vec![callback]);
let _ = self.raster.jobs.send(Job::Load(handle.clone()));
self.worker.load(handle);
}
#[cfg(feature = "image")]
@ -110,7 +91,7 @@ impl Cache {
if let Some(memory) = load_image(
&mut self.raster.cache,
&mut self.raster.pending,
&mut self.raster.jobs,
&self.worker,
handle,
None,
) {
@ -141,7 +122,7 @@ impl Cache {
let memory = load_image(
&mut self.raster.cache,
&mut self.raster.pending,
&mut self.raster.jobs,
&self.worker,
handle,
None,
)?;
@ -183,14 +164,8 @@ impl Cache {
}
if !self.raster.pending.contains_key(&handle.id()) {
let _ = self.jobs.send(Job::Upload {
handle: handle.clone(),
rgba: image.clone().into_raw(),
width: image.width(),
height: image.height(),
});
let _ = self.raster.pending.insert(handle.id(), Vec::new());
self.worker.upload(handle, image);
}
None
@ -225,7 +200,7 @@ impl Cache {
pub fn trim(&mut self) {
#[cfg(feature = "image")]
self.raster.cache.trim(&mut self.atlas, |bind_group| {
let _ = self.jobs.send(Job::Drop(bind_group));
self.worker.drop(bind_group);
});
#[cfg(feature = "svg")]
@ -236,9 +211,9 @@ impl Cache {
fn receive(&mut self) {
use crate::image::raster::Memory;
while let Ok(work) = self.work.try_recv() {
while let Ok(work) = self.worker.try_recv() {
match work {
Work::Upload {
worker::Work::Upload {
handle,
entry,
bind_group,
@ -270,7 +245,7 @@ impl Cache {
},
);
}
Work::Error { handle, error } => {
worker::Work::Error { handle, error } => {
self.raster.cache.insert(&handle, Memory::error(error));
}
}
@ -281,9 +256,7 @@ impl Cache {
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
impl Drop for Cache {
fn drop(&mut self) {
let _ = self.quit.try_send(());
let _ = self.jobs.send(Job::Quit);
let _ = self.worker_.take().unwrap().join();
self.worker.quit();
}
}
@ -291,7 +264,6 @@ impl Drop for Cache {
struct Raster {
cache: crate::image::raster::Cache,
pending: HashMap<core::image::Id, Vec<Callback>>,
jobs: mpsc::SyncSender<Job>,
}
#[cfg(feature = "image")]
@ -301,7 +273,7 @@ type Callback = Box<dyn FnOnce(core::image::Allocation) + Send>;
fn load_image<'a>(
cache: &'a mut crate::image::raster::Cache,
pending: &mut HashMap<core::image::Id, Vec<Callback>>,
jobs: &mut mpsc::SyncSender<Job>,
worker: &Worker,
handle: &core::image::Handle,
callback: Option<Callback>,
) -> Option<&'a mut crate::image::raster::Memory> {
@ -315,74 +287,45 @@ fn load_image<'a>(
// Load RGBA handles synchronously, since it's very cheap
cache.insert(handle, Memory::load(handle));
} else if !pending.contains_key(&handle.id()) {
let _ = jobs.send(Job::Load(handle.clone()));
let _ = pending.insert(handle.id(), Vec::from_iter(callback));
worker.load(handle);
}
}
cache.get_mut(handle)
}
#[cfg(feature = "image")]
#[derive(Debug)]
enum Job {
Load(core::image::Handle),
Upload {
handle: core::image::Handle,
rgba: core::image::Bytes,
width: u32,
height: u32,
},
Drop(Arc<wgpu::BindGroup>),
Quit,
}
#[cfg(feature = "image")]
enum Work {
Upload {
handle: core::image::Handle,
entry: atlas::Entry,
bind_group: Arc<wgpu::BindGroup>,
},
Error {
handle: core::image::Handle,
error: crate::graphics::image::image_rs::error::ImageError,
},
}
#[cfg(feature = "image")]
struct Worker {
device: wgpu::Device,
queue: wgpu::Queue,
backend: wgpu::Backend,
texture_layout: wgpu::BindGroupLayout,
shell: Shell,
belt: wgpu::util::StagingBelt,
jobs: mpsc::Receiver<Job>,
output: mpsc::SyncSender<Work>,
quit: mpsc::Receiver<()>,
}
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
impl Worker {
fn new(
device: &wgpu::Device,
queue: &wgpu::Queue,
backend: wgpu::Backend,
texture_layout: wgpu::BindGroupLayout,
shell: &Shell,
) -> (
Self,
mpsc::SyncSender<Job>,
mpsc::SyncSender<()>,
mpsc::Receiver<Work>,
) {
let (jobs_sender, jobs_receiver) = mpsc::sync_channel(1_000);
let (quit_sender, quit_receiver) = mpsc::sync_channel(1);
let (work_sender, work_receiver) = mpsc::sync_channel(1_000);
mod worker {
use crate::core::image;
use crate::graphics::Shell;
use crate::image::atlas::{self, Atlas};
use crate::image::raster;
(
Self {
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
pub struct Worker {
jobs: mpsc::SyncSender<Job>,
quit: mpsc::SyncSender<()>,
work: mpsc::Receiver<Work>,
handle: Option<std::thread::JoinHandle<()>>,
}
impl Worker {
pub fn new(
device: &wgpu::Device,
queue: &wgpu::Queue,
backend: wgpu::Backend,
texture_layout: wgpu::BindGroupLayout,
shell: &Shell,
) -> Self {
let (jobs_sender, jobs_receiver) = mpsc::sync_channel(1_000);
let (quit_sender, quit_receiver) = mpsc::sync_channel(1);
let (work_sender, work_receiver) = mpsc::sync_channel(1_000);
let instance = Instance {
device: device.clone(),
queue: queue.clone(),
backend,
@ -392,114 +335,190 @@ impl Worker {
jobs: jobs_receiver,
output: work_sender,
quit: quit_receiver,
},
jobs_sender,
quit_sender,
work_receiver,
)
}
fn run(mut self) {
loop {
if self.quit.try_recv().is_ok() {
return;
}
let Ok(job) = self.jobs.recv() else {
return;
};
match job {
Job::Load(handle) => {
match crate::graphics::image::load(&handle) {
Ok(image) => self.upload(
handle,
image.width(),
image.height(),
image.into_raw(),
Shell::invalidate_layout,
),
Err(error) => {
let _ =
self.output.send(Work::Error { handle, error });
}
}
}
Job::Upload {
handle,
rgba,
width,
height,
} => {
self.upload(
handle,
width,
height,
rgba,
Shell::request_redraw,
);
}
Job::Drop(bind_group) => {
drop(bind_group);
}
Job::Quit => return,
let handle = thread::spawn(move || instance.run());
Self {
jobs: jobs_sender,
quit: quit_sender,
work: work_receiver,
handle: Some(handle),
}
}
pub fn load(&self, handle: &image::Handle) {
let _ = self.jobs.send(Job::Load(handle.clone()));
}
pub fn upload(&self, handle: &image::Handle, image: raster::Image) {
let _ = self.jobs.send(Job::Upload {
handle: handle.clone(),
width: image.width(),
height: image.height(),
rgba: image.into_raw(),
});
}
pub fn drop(&self, bind_group: Arc<wgpu::BindGroup>) {
let _ = self.jobs.send(Job::Drop(bind_group));
}
pub fn try_recv(&self) -> Result<Work, mpsc::TryRecvError> {
self.work.try_recv()
}
pub fn quit(&mut self) {
let _ = self.quit.try_send(());
let _ = self.jobs.send(Job::Quit);
let _ = self.handle.take().map(thread::JoinHandle::join);
}
}
fn upload(
&mut self,
handle: core::image::Handle,
width: u32,
height: u32,
rgba: core::image::Bytes,
callback: fn(&Shell),
) {
let mut encoder = self.device.create_command_encoder(
&wgpu::CommandEncoderDescriptor {
label: Some("raster image upload"),
},
);
pub struct Instance {
device: wgpu::Device,
queue: wgpu::Queue,
backend: wgpu::Backend,
texture_layout: wgpu::BindGroupLayout,
shell: Shell,
belt: wgpu::util::StagingBelt,
jobs: mpsc::Receiver<Job>,
output: mpsc::SyncSender<Work>,
quit: mpsc::Receiver<()>,
}
let mut atlas = Atlas::with_size(
&self.device,
self.backend,
self.texture_layout.clone(),
width.max(height),
);
#[cfg(feature = "image")]
#[derive(Debug)]
enum Job {
Load(image::Handle),
Upload {
handle: image::Handle,
rgba: image::Bytes,
width: u32,
height: u32,
},
Drop(Arc<wgpu::BindGroup>),
Quit,
}
let Some(entry) = atlas.upload(
&self.device,
&mut encoder,
&mut self.belt,
width,
height,
&rgba,
) else {
return;
};
#[cfg(feature = "image")]
pub enum Work {
Upload {
handle: image::Handle,
entry: atlas::Entry,
bind_group: Arc<wgpu::BindGroup>,
},
Error {
handle: image::Handle,
error: crate::graphics::image::image_rs::error::ImageError,
},
}
let output = self.output.clone();
let shell = self.shell.clone();
#[cfg(all(feature = "image", not(target_arch = "wasm32")))]
impl Instance {
fn run(mut self) {
loop {
if self.quit.try_recv().is_ok() {
return;
}
self.belt.finish();
let submission = self.queue.submit([encoder.finish()]);
self.belt.recall();
let Ok(job) = self.jobs.recv() else {
return;
};
let bind_group = atlas.bind_group().clone();
match job {
Job::Load(handle) => {
match crate::graphics::image::load(&handle) {
Ok(image) => self.upload(
handle,
image.width(),
image.height(),
image.into_raw(),
Shell::invalidate_layout,
),
Err(error) => {
let _ = self
.output
.send(Work::Error { handle, error });
}
}
}
Job::Upload {
handle,
rgba,
width,
height,
} => {
self.upload(
handle,
width,
height,
rgba,
Shell::request_redraw,
);
}
Job::Drop(bind_group) => {
drop(bind_group);
}
Job::Quit => return,
}
}
}
self.queue.on_submitted_work_done(move || {
let _ = output.send(Work::Upload {
handle,
entry,
bind_group,
fn upload(
&mut self,
handle: image::Handle,
width: u32,
height: u32,
rgba: image::Bytes,
callback: fn(&Shell),
) {
let mut encoder = self.device.create_command_encoder(
&wgpu::CommandEncoderDescriptor {
label: Some("raster image upload"),
},
);
let mut atlas = Atlas::with_size(
&self.device,
self.backend,
self.texture_layout.clone(),
width.max(height),
);
let Some(entry) = atlas.upload(
&self.device,
&mut encoder,
&mut self.belt,
width,
height,
&rgba,
) else {
return;
};
let output = self.output.clone();
let shell = self.shell.clone();
self.belt.finish();
let submission = self.queue.submit([encoder.finish()]);
self.belt.recall();
let bind_group = atlas.bind_group().clone();
self.queue.on_submitted_work_done(move || {
let _ = output.send(Work::Upload {
handle,
entry,
bind_group,
});
callback(&shell);
});
callback(&shell);
});
let _ = self
.device
.poll(wgpu::PollType::WaitForSubmissionIndex(submission));
let _ = self
.device
.poll(wgpu::PollType::WaitForSubmissionIndex(submission));
}
}
}

View file

@ -7,7 +7,7 @@ use crate::image::atlas::{self, Atlas};
use rustc_hash::{FxHashMap, FxHashSet};
use std::sync::{Arc, Weak};
type Image = image_rs::ImageBuffer<image_rs::Rgba<u8>, image::Bytes>;
pub type Image = image_rs::ImageBuffer<image_rs::Rgba<u8>, image::Bytes>;
/// Entry in cache corresponding to an image handle
#[derive(Debug)]

View file

@ -82,9 +82,9 @@ fn fs_main(input: VertexOutput) -> @location(0) vec4<f32> {
2.0 * (fragment - position - scale / 2.0),
scale,
input.border_radius * 2.0,
);
) / 2.0;
let antialias: f32 = clamp(0.5 - d, 0.0, 1.0);
let antialias: f32 = clamp(1.0 - d, 0.0, 1.0);
return textureSample(u_texture, u_sampler, input.uv, i32(input.layer)) * vec4<f32>(1.0, 1.0, 1.0, antialias * input.opacity);
}