Do not use extra audio thread, do not include blocking recv in timers

This commit is contained in:
Jeremy Soller 2024-01-25 09:42:02 -07:00
parent 4650e1434d
commit 4e0969546a
No known key found for this signature in database
GPG key ID: DCFCA852D3906975

View file

@ -6,6 +6,7 @@ use cpal::{
FromSample, SizedSample, FromSample, SizedSample,
}; };
use ffmpeg::{ use ffmpeg::{
codec::discard,
format::{input, Pixel}, format::{input, Pixel},
media::Type, media::Type,
software::{resampling, scaling}, software::{resampling, scaling},
@ -47,7 +48,9 @@ impl AsRef<[u8]> for VideoFrame {
} }
} }
fn cpal(audio_queue_lock: Arc<Mutex<VecDeque<f32>>>) -> cpal::SupportedStreamConfig { fn cpal(
audio_queue_lock: Arc<Mutex<VecDeque<f32>>>,
) -> (cpal::SupportedStreamConfig, Box<dyn StreamTrait>) {
let host = cpal::default_host(); let host = cpal::default_host();
let device = host let device = host
.default_output_device() .default_output_device()
@ -57,58 +60,36 @@ fn cpal(audio_queue_lock: Arc<Mutex<VecDeque<f32>>>) -> cpal::SupportedStreamCon
.expect("failed to get default audio output config"); .expect("failed to get default audio output config");
println!("{:?}: {:?}", device.name(), config); println!("{:?}: {:?}", device.name(), config);
{ let stream = {
let config = config.clone(); let config = config.clone();
thread::spawn(move || { match config.sample_format() {
match config.sample_format() { cpal::SampleFormat::I8 => cpal_stream::<i8>(device, config.into(), audio_queue_lock),
cpal::SampleFormat::I8 => { cpal::SampleFormat::I16 => cpal_stream::<i16>(device, config.into(), audio_queue_lock),
cpal_thread::<i8>(device, config.into(), audio_queue_lock) // cpal::SampleFormat::I24 => cpal_stream::<I24>(device, config.into(), audio_queue_lock),
} cpal::SampleFormat::I32 => cpal_stream::<i32>(device, config.into(), audio_queue_lock),
cpal::SampleFormat::I16 => { // cpal::SampleFormat::I48 => cpal_stream::<I48>(device, config.into(), audio_queue_lock),
cpal_thread::<i16>(device, config.into(), audio_queue_lock) cpal::SampleFormat::I64 => cpal_stream::<i64>(device, config.into(), audio_queue_lock),
} cpal::SampleFormat::U8 => cpal_stream::<u8>(device, config.into(), audio_queue_lock),
// cpal::SampleFormat::I24 => cpal_thread::<I24>(device, config.into(), audio_queue_lock), cpal::SampleFormat::U16 => cpal_stream::<u16>(device, config.into(), audio_queue_lock),
cpal::SampleFormat::I32 => { // cpal::SampleFormat::U24 => cpal_stream::<U24>(device, config.into(), audio_queue_lock),
cpal_thread::<i32>(device, config.into(), audio_queue_lock) cpal::SampleFormat::U32 => cpal_stream::<u32>(device, config.into(), audio_queue_lock),
} // cpal::SampleFormat::U48 => cpal_stream::<U48>(device, config.into(), audio_queue_lock),
// cpal::SampleFormat::I48 => cpal_thread::<I48>(device, config.into(), audio_queue_lock), cpal::SampleFormat::U64 => cpal_stream::<u64>(device, config.into(), audio_queue_lock),
cpal::SampleFormat::I64 => { cpal::SampleFormat::F32 => cpal_stream::<f32>(device, config.into(), audio_queue_lock),
cpal_thread::<i64>(device, config.into(), audio_queue_lock) cpal::SampleFormat::F64 => cpal_stream::<f64>(device, config.into(), audio_queue_lock),
} sample_format => panic!("unsupported sample format '{sample_format}'"),
cpal::SampleFormat::U8 => { }
cpal_thread::<u8>(device, config.into(), audio_queue_lock) .unwrap()
} };
cpal::SampleFormat::U16 => {
cpal_thread::<u16>(device, config.into(), audio_queue_lock)
}
// cpal::SampleFormat::U24 => cpal_thread::<U24>(device, config.into(), audio_queue_lock),
cpal::SampleFormat::U32 => {
cpal_thread::<u32>(device, config.into(), audio_queue_lock)
}
// cpal::SampleFormat::U48 => cpal_thread::<U48>(device, config.into(), audio_queue_lock),
cpal::SampleFormat::U64 => {
cpal_thread::<u64>(device, config.into(), audio_queue_lock)
}
cpal::SampleFormat::F32 => {
cpal_thread::<f32>(device, config.into(), audio_queue_lock)
}
cpal::SampleFormat::F64 => {
cpal_thread::<f64>(device, config.into(), audio_queue_lock)
}
sample_format => panic!("unsupported sample format '{sample_format}'"),
}
.unwrap();
});
}
config (config, stream)
} }
fn cpal_thread<T>( fn cpal_stream<T>(
device: cpal::Device, device: cpal::Device,
config: cpal::StreamConfig, config: cpal::StreamConfig,
audio_queue_lock: Arc<Mutex<VecDeque<f32>>>, audio_queue_lock: Arc<Mutex<VecDeque<f32>>>,
) -> Result<(), Box<dyn Error>> ) -> Result<Box<dyn StreamTrait>, Box<dyn Error>>
where where
T: SizedSample + FromSample<f32>, T: SizedSample + FromSample<f32>,
{ {
@ -135,25 +116,19 @@ where
} }
} }
}; };
let err_fn = |err| eprintln!("an error occurred on stream: {}", err); let err_fn = |err| eprintln!("an error occurred on stream: {}", err);
let stream = device.build_output_stream(&config, data_fn, err_fn, None)?; let stream = device.build_output_stream(&config, data_fn, err_fn, None)?;
stream.play()?; Ok(Box::new(stream))
loop {
//TODO: move this code to ffmpeg_thread so we don't have to sleep here?
thread::sleep(Duration::from_millis(1000));
}
} }
fn ffmpeg_thread<P: AsRef<Path>>( fn ffmpeg_thread<P: AsRef<Path>>(
path: P, path: P,
player_rx: mpsc::Receiver<PlayerMessage>, player_rx: mpsc::Receiver<PlayerMessage>,
video_frame_lock: Arc<Mutex<Option<VideoFrame>>>, video_frame_lock: Arc<Mutex<Option<VideoFrame>>>,
audio_config: cpal::SupportedStreamConfig, ) -> Result<(), Box<dyn Error>> {
audio_queue_lock: Arc<Mutex<VecDeque<f32>>>, let audio_queue_lock = Arc::new(Mutex::new(VecDeque::new()));
) -> Result<(), ffmpeg::Error> { let (audio_config, cpal_stream) = cpal(audio_queue_lock.clone());
let mut ictx = input(&path)?; let mut ictx = input(&path)?;
let video_stream = ictx let video_stream = ictx
@ -169,90 +144,107 @@ fn ffmpeg_thread<P: AsRef<Path>>(
let video_format = video_decoder.format(); let video_format = video_decoder.format();
let video_width = video_decoder.width(); let video_width = video_decoder.width();
let video_height = video_decoder.height(); let video_height = video_decoder.height();
let (raw_frame_tx, raw_frame_rx) = mpsc::channel(); let (raw_frame_tx, raw_frame_rx) = mpsc::channel::<Video>();
thread::spawn(move || -> Result<(), ffmpeg::Error> { thread::Builder::new()
let mut video_scaler = scaling::context::Context::get( .name("video_scale".to_string())
video_format, .spawn(move || -> Result<(), ffmpeg::Error> {
video_width, let mut video_scaler = scaling::context::Context::get(
video_height, video_format,
Pixel::RGBA, video_width,
video_width, video_height,
video_height, Pixel::RGBA,
scaling::Flags::FAST_BILINEAR, video_width,
)?; video_height,
scaling::Flags::FAST_BILINEAR,
)?;
loop {
let start = Instant::now();
let mut raw_frame: Video = raw_frame_rx.recv().unwrap();
while let Ok(extra_frame) = raw_frame_rx.try_recv() {
log::warn!("missed raw video frame at {:?}", raw_frame.pts());
raw_frame = extra_frame;
}
let pts = raw_frame.pts();
let mut scaled_frame = Video::empty();
video_scaler.run(&raw_frame, &mut scaled_frame)?;
scaled_frame.set_pts(pts);
let missed_pts_opt = {
let mut video_frame_opt = video_frame_lock.lock().unwrap();
let missed_pts_opt = match &*video_frame_opt {
Some(old_frame) => Some(old_frame.0.pts()),
None => None,
};
*video_frame_opt = Some(VideoFrame(scaled_frame));
missed_pts_opt
};
if let Some(missed_pts) = missed_pts_opt {
log::warn!("missed scaled video frame at {:?}", missed_pts);
}
let duration = start.elapsed();
log::debug!("scaled video frame at {:?} in {:?}", pts, duration);
}
});
let (video_packet_tx, video_packet_rx) = mpsc::channel();
thread::spawn(move || -> Result<(), ffmpeg::Error> {
let mut eof = false;
while !eof {
let start = Instant::now();
match video_packet_rx.recv() {
Ok(packet) => {
video_decoder.send_packet(&packet)?;
}
Err(_err) => {
video_decoder.send_eof()?;
eof = true;
}
}
let mut pts = None;
let mut video_frames = 0;
loop { loop {
let mut raw_frame = Video::empty(); let mut raw_frame: Video = raw_frame_rx.recv().unwrap();
if video_decoder.receive_frame(&mut raw_frame).is_ok() { while let Ok(extra_frame) = raw_frame_rx.try_recv() {
pts = raw_frame.pts(); log::warn!("missed raw video frame at {:?}", raw_frame.pts());
raw_frame_tx.send(raw_frame).unwrap(); raw_frame = extra_frame;
video_frames += 1; }
} else { let pts = raw_frame.pts();
break;
// Start count after blocking recv
let start = Instant::now();
let mut scaled_frame = Video::empty();
video_scaler.run(&raw_frame, &mut scaled_frame)?;
scaled_frame.set_pts(pts);
let missed_pts_opt = {
let mut video_frame_opt = video_frame_lock.lock().unwrap();
let missed_pts_opt = match &*video_frame_opt {
Some(old_frame) => Some(old_frame.0.pts()),
None => None,
};
*video_frame_opt = Some(VideoFrame(scaled_frame));
missed_pts_opt
};
if let Some(missed_pts) = missed_pts_opt {
log::warn!("missed scaled video frame at {:?}", missed_pts);
}
let duration = start.elapsed();
log::debug!("scaled video frame at {:?} in {:?}", pts, duration);
}
});
let (video_packet_tx, video_packet_rx) = mpsc::channel::<Packet>();
thread::Builder::new()
.name("video_decode".to_string())
.spawn(move || -> Result<(), ffmpeg::Error> {
let mut eof = false;
while !eof {
{
let packet_res = video_packet_rx.recv();
// Start timer after blocking recv
let start = Instant::now();
let mut packet_pts = None;
match packet_res {
Ok(packet) => {
packet_pts = packet.pts();
video_decoder.send_packet(&packet)?;
}
Err(_err) => {
video_decoder.send_eof()?;
eof = true;
}
}
let duration = start.elapsed();
log::debug!("sent packet at {:?} in {:?}", packet_pts, duration);
}
let start = Instant::now();
let mut pts = None;
let mut video_frames = 0;
loop {
let mut raw_frame = Video::empty();
if video_decoder.receive_frame(&mut raw_frame).is_ok() {
pts = raw_frame.pts();
raw_frame_tx.send(raw_frame).unwrap();
video_frames += 1;
} else {
break;
}
}
if video_frames > 0 {
let duration = start.elapsed();
log::debug!(
"received {} video frames at {:?} in {:?}",
video_frames,
pts,
duration
);
} }
} }
Ok(())
if video_frames > 0 { });
let duration = start.elapsed();
log::debug!(
"received {} video frames at {:?} in {:?}",
video_frames,
pts,
duration
);
}
}
Ok(())
});
let audio_stream = ictx let audio_stream = ictx
.streams() .streams()
@ -282,8 +274,6 @@ fn ffmpeg_thread<P: AsRef<Path>>(
audio_config.sample_rate().0, audio_config.sample_rate().0,
)?; )?;
let mut sync_sample = 0;
let mut current_sample = 0;
let min_sleep = Duration::from_millis(1); let min_sleep = Duration::from_millis(1);
let min_skip = Duration::from_millis(1); let min_skip = Duration::from_millis(1);
let mut receive_and_process_decoded_audio_frames = |decoder: &mut ffmpeg::decoder::Audio, let mut receive_and_process_decoded_audio_frames = |decoder: &mut ffmpeg::decoder::Audio,
@ -291,7 +281,10 @@ fn ffmpeg_thread<P: AsRef<Path>>(
-> Result<(), ffmpeg::Error> { -> Result<(), ffmpeg::Error> {
let mut decoded = Audio::empty(); let mut decoded = Audio::empty();
let mut resampled = Audio::empty(); let mut resampled = Audio::empty();
let mut pts_opt = None;
while decoder.receive_frame(&mut decoded).is_ok() { while decoder.receive_frame(&mut decoded).is_ok() {
pts_opt = decoded.pts();
audio_resampler.run(&decoded, &mut resampled)?; audio_resampler.run(&decoded, &mut resampled)?;
{ {
// plane method doesn't work with packed samples, so do it manually // plane method doesn't work with packed samples, so do it manually
@ -306,36 +299,39 @@ fn ffmpeg_thread<P: AsRef<Path>>(
audio_queue.extend(plane); audio_queue.extend(plane);
} }
} }
current_sample += decoded.samples();
if sync_time_opt.is_none() {
*sync_time_opt = Some(Instant::now());
sync_sample = current_sample;
}
} }
// Sync with audio if let Some(pts) = pts_opt {
if let Some(sync_time) = &sync_time_opt { let expected_float = pts as f64 * audio_time_base;
let samples = current_sample - sync_sample;
let expected_float = (samples as f64) / f64::from(decoder.rate());
let expected = Duration::from_secs_f64(expected_float); let expected = Duration::from_secs_f64(expected_float);
let actual = sync_time.elapsed(); if let Some(sync_time) = &sync_time_opt {
if expected > actual { // Sync with audio
let sleep = expected - actual; let actual = sync_time.elapsed();
if sleep > min_sleep { if expected > actual {
// We leave min_sleep of buffer room let sleep = expected - actual;
thread::sleep(sleep - min_sleep); if sleep > min_sleep {
// We leave min_sleep of buffer room
log::debug!("ahead {:?}", sleep);
thread::sleep(sleep - min_sleep);
}
} else {
let skip = actual - expected;
if skip > min_skip {
//TODO: handle frame skipping
log::debug!("behind {:?}", skip);
}
} }
} else { } else {
let skip = actual - expected; // Set up sync
if skip > min_skip { *sync_time_opt = Some(Instant::now() - expected);
//TODO: handle frame skipping
}
} }
} }
Ok(()) Ok(())
}; };
// Start CPAL stream
cpal_stream.play()?;
let mut sync_time_opt = None; let mut sync_time_opt = None;
let mut seconds_opt = None; let mut seconds_opt = None;
loop { loop {
@ -394,23 +390,16 @@ fn ffmpeg_thread<P: AsRef<Path>>(
pub fn run(path: PathBuf) -> (mpsc::Sender<PlayerMessage>, Arc<Mutex<Option<VideoFrame>>>) { pub fn run(path: PathBuf) -> (mpsc::Sender<PlayerMessage>, Arc<Mutex<Option<VideoFrame>>>) {
ffmpeg::init().unwrap(); ffmpeg::init().unwrap();
let audio_queue_lock = Arc::new(Mutex::new(VecDeque::new()));
let audio_config = cpal(audio_queue_lock.clone());
let (player_tx, player_rx) = mpsc::channel(); let (player_tx, player_rx) = mpsc::channel();
let video_frame_lock = Arc::new(Mutex::new(None)); let video_frame_lock = Arc::new(Mutex::new(None));
{ {
let video_frame_lock = video_frame_lock.clone(); let video_frame_lock = video_frame_lock.clone();
thread::spawn(move || { thread::Builder::new()
ffmpeg_thread( .name("ffmpeg".to_string())
path, .spawn(move || {
player_rx, ffmpeg_thread(path, player_rx, video_frame_lock).unwrap();
video_frame_lock, });
audio_config,
audio_queue_lock,
)
.unwrap();
});
} }
(player_tx, video_frame_lock) (player_tx, video_frame_lock)
} }