Fix video/audio sync

This commit is contained in:
Jeremy Soller 2024-01-26 14:43:16 -07:00
parent 04e1c60189
commit 2fa67bd58d
No known key found for this signature in database
GPG key ID: DCFCA852D3906975
2 changed files with 218 additions and 104 deletions

View file

@ -15,7 +15,7 @@ use cosmic::{
}; };
use std::{ use std::{
any::TypeId, any::TypeId,
collections::{HashMap, VecDeque}, collections::HashMap,
env, env,
path::PathBuf, path::PathBuf,
process, process,
@ -31,7 +31,7 @@ mod key_bind;
mod localize; mod localize;
use player::{PlayerMessage, VideoFrame}; use player::{PlayerMessage, VideoFrame, VideoQueue};
mod player; mod player;
/// Runs application with these settings /// Runs application with these settings
@ -67,7 +67,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
}; };
let (player_tx, video_frames_lock) = player::run(path); let (player_tx, video_queue_lock) = player::run(path);
let mut settings = Settings::default(); let mut settings = Settings::default();
settings = settings.theme(config.app_theme.theme()); settings = settings.theme(config.app_theme.theme());
@ -85,7 +85,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
config_handler, config_handler,
config, config,
player_tx, player_tx,
video_frames_lock, video_queue_lock,
}; };
cosmic::app::run::<App>(settings, flags)?; cosmic::app::run::<App>(settings, flags)?;
@ -114,7 +114,7 @@ pub struct Flags {
config_handler: Option<cosmic_config::Config>, config_handler: Option<cosmic_config::Config>,
config: Config, config: Config,
player_tx: mpsc::Sender<PlayerMessage>, player_tx: mpsc::Sender<PlayerMessage>,
video_frames_lock: Arc<Mutex<VecDeque<VideoFrame>>>, video_queue_lock: Arc<Mutex<VideoQueue>>,
} }
/// Messages that are used specifically by our [`App`]. /// Messages that are used specifically by our [`App`].
@ -299,23 +299,25 @@ impl Application for App {
let start = Instant::now(); let start = Instant::now();
let mut video_frame_opt: Option<VideoFrame> = None; let mut video_frame_opt: Option<VideoFrame> = None;
{ let delayed_time = {
let mut video_frames = self.flags.video_frames_lock.lock().unwrap(); let mut video_queue = self.flags.video_queue_lock.lock().unwrap();
while let Some(video_frame) = video_frames.pop_front() { let delayed_time = frame_time - video_queue.delay;
if video_frame.1.unwrap_or(frame_time) <= frame_time { while let Some(video_frame) = video_queue.data.pop_front() {
if video_frame.1.unwrap_or(delayed_time) <= delayed_time {
if let Some(old_frame) = video_frame_opt { if let Some(old_frame) = video_frame_opt {
//TODO: log this outside of locking video_frames_lock? //TODO: log this outside of locking video_queue_lock?
log::warn!("skipping video frame {:?}", old_frame.0.pts()); log::warn!("skipping video frame {:?}", old_frame.0.pts());
} }
// Frame is ready to be shown // Frame is ready to be shown
video_frame_opt = Some(video_frame); video_frame_opt = Some(video_frame);
} else { } else {
// Put frame back and exit loop // Put frame back and exit loop
video_frames.push_front(video_frame); video_queue.data.push_front(video_frame);
break; break;
} }
} }
} delayed_time
};
match video_frame_opt { match video_frame_opt {
Some(video_frame) => { Some(video_frame) => {
@ -331,13 +333,13 @@ impl Application for App {
); );
if let Some(present_time) = present_time_opt { if let Some(present_time) = present_time_opt {
if present_time > frame_time { if present_time > delayed_time {
let ahead = present_time - frame_time; let ahead = present_time - delayed_time;
if ahead > Duration::from_millis(1) { if ahead > Duration::from_millis(1) {
log::debug!("video ahead {:?}", ahead); log::debug!("video ahead {:?}", ahead);
} }
} else { } else {
let behind = frame_time - present_time; let behind = delayed_time - present_time;
if behind > Duration::from_millis(1) { if behind > Duration::from_millis(1) {
log::debug!("video behind {:?}", behind); log::debug!("video behind {:?}", behind);
} }

View file

@ -18,6 +18,7 @@ use ffmpeg::{
Packet, Packet,
}; };
use std::{ use std::{
cmp,
collections::VecDeque, collections::VecDeque,
error::Error, error::Error,
path::{Path, PathBuf}, path::{Path, PathBuf},
@ -27,6 +28,36 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
//TODO: calculate presentation time of end of queue
pub struct AudioQueue {
pub channels: usize,
pub rate: f64,
pub data: VecDeque<f32>,
// Delay for data to hit speakers, used to sync with video
pub delay: Duration,
}
impl AudioQueue {
pub fn new(channels: cpal::ChannelCount, rate: cpal::SampleRate) -> Self {
Self {
channels: channels as usize,
rate: rate.0 as f64,
data: VecDeque::new(),
delay: Duration::default(),
}
}
pub fn duration(&self) -> Duration {
self.duration_for_samples(self.data.len())
}
pub fn duration_for_samples(&self, samples: usize) -> Duration {
let frames = samples / self.channels;
let seconds = (frames as f64) / self.rate;
Duration::from_secs_f64(seconds)
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum PlayerMessage { pub enum PlayerMessage {
SeekRelative(f64), SeekRelative(f64),
@ -48,9 +79,44 @@ impl AsRef<[u8]> for VideoFrame {
} }
} }
fn cpal( pub struct VideoQueue {
audio_queue_lock: Arc<Mutex<VecDeque<f32>>>, pub data: VecDeque<VideoFrame>,
) -> (cpal::SupportedStreamConfig, Box<dyn StreamTrait>) { // Delay to add to each frame to sync with audio
pub delay: Duration,
}
impl VideoQueue {
pub fn new() -> Self {
Self {
data: VecDeque::new(),
delay: Duration::default(),
}
}
pub fn duration(&self) -> Duration {
//TODO: can accurate duration actually be calculated since one frame would count as zero?
let mut start_end_opt = None;
for frame in self.data.iter() {
if let Some(frame_time) = frame.1 {
start_end_opt = Some(match start_end_opt {
Some((start, end)) => (cmp::min(start, frame_time), cmp::max(end, frame_time)),
None => (frame_time, frame_time),
});
}
}
if let Some((start, end)) = start_end_opt {
end.duration_since(start)
} else {
Duration::default()
}
}
}
fn cpal() -> (
cpal::SupportedStreamConfig,
Box<dyn StreamTrait>,
Arc<Mutex<AudioQueue>>,
) {
let host = cpal::default_host(); let host = cpal::default_host();
let device = host let device = host
.default_output_device() .default_output_device()
@ -60,8 +126,13 @@ fn cpal(
.expect("failed to get default audio output config"); .expect("failed to get default audio output config");
println!("{:?}: {:?}", device.name(), config); println!("{:?}: {:?}", device.name(), config);
let audio_queue_lock = Arc::new(Mutex::new(AudioQueue::new(
config.channels(),
config.sample_rate(),
)));
let stream = { let stream = {
let config = config.clone(); let config = config.clone();
let audio_queue_lock = audio_queue_lock.clone();
match config.sample_format() { match config.sample_format() {
cpal::SampleFormat::I8 => cpal_stream::<i8>(device, config.into(), audio_queue_lock), cpal::SampleFormat::I8 => cpal_stream::<i8>(device, config.into(), audio_queue_lock),
cpal::SampleFormat::I16 => cpal_stream::<i16>(device, config.into(), audio_queue_lock), cpal::SampleFormat::I16 => cpal_stream::<i16>(device, config.into(), audio_queue_lock),
@ -82,26 +153,30 @@ fn cpal(
.unwrap() .unwrap()
}; };
(config, stream) (config, stream, audio_queue_lock)
} }
fn cpal_stream<T>( fn cpal_stream<T>(
device: cpal::Device, device: cpal::Device,
config: cpal::StreamConfig, config: cpal::StreamConfig,
audio_queue_lock: Arc<Mutex<VecDeque<f32>>>, audio_queue_lock: Arc<Mutex<AudioQueue>>,
) -> Result<Box<dyn StreamTrait>, Box<dyn Error>> ) -> Result<Box<dyn StreamTrait>, Box<dyn Error>>
where where
T: SizedSample + FromSample<f32>, T: SizedSample + FromSample<f32>,
{ {
let data_fn = { let data_fn = {
let audio_queue_lock = audio_queue_lock.clone(); move |samples: &mut [T], info: &cpal::OutputCallbackInfo| {
move |data: &mut [T], _: &cpal::OutputCallbackInfo| { let timestamp = info.timestamp();
let delay = timestamp.playback.duration_since(&timestamp.callback);
let mut underrun = 0; let mut underrun = 0;
{ {
//TODO: buffer audio //TODO: buffer audio
let mut audio_queue = audio_queue_lock.lock().unwrap(); let mut audio_queue = audio_queue_lock.lock().unwrap();
for sample in data { //TODO: also add samples time?
let float = match audio_queue.pop_front() { audio_queue.delay = delay.unwrap_or_default();
for sample in samples {
let float = match audio_queue.data.pop_front() {
Some(some) => some, Some(some) => some,
None => { None => {
underrun += 1; underrun += 1;
@ -110,7 +185,7 @@ where
}; };
*sample = T::from_sample(float); *sample = T::from_sample(float);
} }
} };
if underrun > 0 { if underrun > 0 {
log::error!("audio underrun {}", underrun); log::error!("audio underrun {}", underrun);
} }
@ -124,10 +199,9 @@ where
fn ffmpeg_thread<P: AsRef<Path>>( fn ffmpeg_thread<P: AsRef<Path>>(
path: P, path: P,
player_rx: mpsc::Receiver<PlayerMessage>, player_rx: mpsc::Receiver<PlayerMessage>,
video_frames_lock: Arc<Mutex<VecDeque<VideoFrame>>>, video_queue_lock: Arc<Mutex<VideoQueue>>,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
let audio_queue_lock = Arc::new(Mutex::new(VecDeque::new())); let (audio_config, cpal_stream, audio_queue_lock) = cpal();
let (audio_config, cpal_stream) = cpal(audio_queue_lock.clone());
let mut ictx = input(&path)?; let mut ictx = input(&path)?;
@ -166,85 +240,92 @@ fn ffmpeg_thread<P: AsRef<Path>>(
video_decoder_context.decoder().video()? video_decoder_context.decoder().video()?
}; };
let video_format = video_decoder.format();
let video_width = video_decoder.width();
let video_height = video_decoder.height();
let (cpu_frame_tx, cpu_frame_rx) = mpsc::channel::<(Video, Option<Instant>)>(); let (cpu_frame_tx, cpu_frame_rx) = mpsc::channel::<(Video, Option<Instant>)>();
thread::Builder::new() {
.name("video_scale".to_string()) let video_format = video_decoder.format();
.spawn(move || { let video_width = video_decoder.width();
let mut video_scaler = scaling::context::Context::get( let video_height = video_decoder.height();
video_format, let video_queue_lock = video_queue_lock.clone();
video_width, thread::Builder::new()
video_height, .name("video_scale".to_string())
Pixel::RGBA, .spawn(move || {
video_width, let mut video_scaler = scaling::context::Context::get(
video_height, video_format,
scaling::Flags::FAST_BILINEAR, video_width,
) video_height,
.unwrap();
loop {
let mut recv_opt: Option<(Video, Option<Instant>)> = None;
while let Ok(recv) = cpu_frame_rx.try_recv() {
if let Some((old_frame, _)) = recv_opt {
//TODO: only skip if behind (frames come in weird timing from codecs)
log::warn!("skipping cpu video frame at {:?}", old_frame.pts());
}
recv_opt = Some(recv);
}
let (cpu_frame, sync_time_opt) = match recv_opt {
Some(some) => some,
None => cpu_frame_rx.recv().unwrap(),
};
let pts_opt = cpu_frame.pts();
// Start count after blocking recv
let start = Instant::now();
video_scaler.cached(
cpu_frame.format(),
cpu_frame.width(),
cpu_frame.height(),
Pixel::RGBA, Pixel::RGBA,
cpu_frame.width(), video_width,
cpu_frame.height(), video_height,
scaling::Flags::FAST_BILINEAR, scaling::Flags::FAST_BILINEAR,
); )
.unwrap();
let mut scaled_frame = Video::empty(); loop {
video_scaler.run(&cpu_frame, &mut scaled_frame).unwrap(); let mut recv_opt: Option<(Video, Option<Instant>)> = None;
scaled_frame.set_pts(pts_opt); /*TODO: SKIP
while let Ok(recv) = cpu_frame_rx.try_recv() {
if let Some((old_frame, _)) = recv_opt {
//TODO: only skip if behind (frames come in weird timing from codecs)
log::warn!("skipping cpu video frame at {:?}", old_frame.pts());
}
recv_opt = Some(recv);
}
*/
let (cpu_frame, sync_time_opt) = match recv_opt {
Some(some) => some,
None => cpu_frame_rx.recv().unwrap(),
};
let pts_opt = cpu_frame.pts();
let present_time_opt = if let Some(pts) = pts_opt { // Start count after blocking recv
let expected_float = pts as f64 * video_time_base; let start = Instant::now();
let expected = Duration::from_secs_f64(expected_float);
if let Some(sync_time) = sync_time_opt { video_scaler.cached(
Some(sync_time + expected) cpu_frame.format(),
cpu_frame.width(),
cpu_frame.height(),
Pixel::RGBA,
cpu_frame.width(),
cpu_frame.height(),
scaling::Flags::FAST_BILINEAR,
);
let mut scaled_frame = Video::empty();
video_scaler.run(&cpu_frame, &mut scaled_frame).unwrap();
scaled_frame.set_pts(pts_opt);
let present_time_opt = if let Some(pts) = pts_opt {
let expected_float = pts as f64 * video_time_base;
let expected = Duration::from_secs_f64(expected_float);
if let Some(sync_time) = sync_time_opt {
Some(sync_time + expected)
} else {
None
}
} else { } else {
None None
};
let video_frame = VideoFrame(scaled_frame, present_time_opt);
{
let mut video_queue = video_queue_lock.lock().unwrap();
video_queue.data.push_back(video_frame);
} }
} else {
None
};
let video_frame = VideoFrame(scaled_frame, present_time_opt); let duration = start.elapsed();
{ log::debug!("scaled video frame at {:?} in {:?}", pts_opt, duration,);
let mut video_frames = video_frames_lock.lock().unwrap();
video_frames.push_back(video_frame);
} }
})?
};
let duration = start.elapsed(); // Sync channel to prevent allocation issues and falling behind
log::debug!("scaled video frame at {:?} in {:?}", pts_opt, duration,); let (gpu_frame_tx, gpu_frame_rx) = mpsc::sync_channel::<(Video, Option<Instant>)>(2);
}
})?;
let (gpu_frame_tx, gpu_frame_rx) = mpsc::channel::<(Video, Option<Instant>)>();
thread::Builder::new() thread::Builder::new()
.name("video_map_gpu_cpu".to_string()) .name("video_map_gpu_cpu".to_string())
.spawn(move || { .spawn(move || {
loop { loop {
let mut recv_opt: Option<(Video, Option<Instant>)> = None; let mut recv_opt: Option<(Video, Option<Instant>)> = None;
/*TODO: SKIP
while let Ok(recv) = gpu_frame_rx.try_recv() { while let Ok(recv) = gpu_frame_rx.try_recv() {
if let Some((old_frame, _)) = recv_opt { if let Some((old_frame, _)) = recv_opt {
//TODO: only skip if behind (frames come in weird timing from codecs) //TODO: only skip if behind (frames come in weird timing from codecs)
@ -252,6 +333,7 @@ fn ffmpeg_thread<P: AsRef<Path>>(
} }
recv_opt = Some(recv); recv_opt = Some(recv);
} }
*/
let (gpu_frame, sync_time_opt) = match recv_opt { let (gpu_frame, sync_time_opt) = match recv_opt {
Some(some) => some, Some(some) => some,
None => gpu_frame_rx.recv().unwrap(), None => gpu_frame_rx.recv().unwrap(),
@ -287,7 +369,8 @@ fn ffmpeg_thread<P: AsRef<Path>>(
} }
})?; })?;
let (video_packet_tx, video_packet_rx) = mpsc::channel::<(Packet, Option<Instant>)>(); // Sync channel to prevent getting too far behind
let (video_packet_tx, video_packet_rx) = mpsc::sync_channel::<(Packet, Option<Instant>)>(2);
thread::Builder::new() thread::Builder::new()
.name("video_decode".to_string()) .name("video_decode".to_string())
.spawn(move || { .spawn(move || {
@ -395,7 +478,7 @@ fn ffmpeg_thread<P: AsRef<Path>>(
}; };
{ {
let mut audio_queue = audio_queue_lock.lock().unwrap(); let mut audio_queue = audio_queue_lock.lock().unwrap();
audio_queue.extend(plane); audio_queue.data.extend(plane);
} }
} }
} }
@ -411,7 +494,6 @@ fn ffmpeg_thread<P: AsRef<Path>>(
if sleep > min_sleep { if sleep > min_sleep {
// We leave min_sleep of buffer room // We leave min_sleep of buffer room
log::debug!("audio ahead {:?}", sleep); log::debug!("audio ahead {:?}", sleep);
thread::sleep(sleep - min_sleep);
} }
} else { } else {
let skip = actual - expected; let skip = actual - expected;
@ -428,6 +510,9 @@ fn ffmpeg_thread<P: AsRef<Path>>(
Ok(()) Ok(())
}; };
//TODO: dynamically choose this
let buffer_duration = Duration::from_millis(250);
// Start CPAL stream // Start CPAL stream
cpal_stream.play()?; cpal_stream.play()?;
@ -454,6 +539,38 @@ fn ffmpeg_thread<P: AsRef<Path>>(
Err(_err) => {} Err(_err) => {}
} }
let (audio_queue_duration, audio_queue_delay) = {
let audio_queue = audio_queue_lock.lock().unwrap();
(audio_queue.duration(), audio_queue.delay)
};
let (video_queue_duration, video_queue_delay) = {
let mut video_queue = video_queue_lock.lock().unwrap();
let video_queue_duration = video_queue.duration();
if video_queue_duration < buffer_duration {
// If we do not have enough video queued, delay the video output
video_queue.delay = buffer_duration - video_queue_duration;
} else {
video_queue.delay = Duration::default();
}
// Add audio queue delay to sync with audio
video_queue.delay += audio_queue_delay;
(video_queue_duration, video_queue.delay)
};
println!(
"VIDEO: {:?}, {:?} AUDIO: {:?}, {:?}",
video_queue_duration, video_queue_delay, audio_queue_duration, audio_queue_delay
);
let min_queue_duration = cmp::min(video_queue_duration, audio_queue_duration);
if min_queue_duration > buffer_duration {
// If we have enough queued, we can sleep
let sleep = min_queue_duration - buffer_duration;
println!("SLEEP {:?}", sleep);
thread::sleep(sleep);
}
while let Ok(message) = player_rx.try_recv() { while let Ok(message) = player_rx.try_recv() {
match message { match message {
PlayerMessage::SeekRelative(seek_seconds) => { PlayerMessage::SeekRelative(seek_seconds) => {
@ -486,25 +603,20 @@ fn ffmpeg_thread<P: AsRef<Path>>(
Ok(()) Ok(())
} }
pub fn run( pub fn run(path: PathBuf) -> (mpsc::Sender<PlayerMessage>, Arc<Mutex<VideoQueue>>) {
path: PathBuf,
) -> (
mpsc::Sender<PlayerMessage>,
Arc<Mutex<VecDeque<VideoFrame>>>,
) {
ffmpeg::init().unwrap(); ffmpeg::init().unwrap();
let (player_tx, player_rx) = mpsc::channel(); let (player_tx, player_rx) = mpsc::channel();
let video_frames_lock = Arc::new(Mutex::new(VecDeque::new())); let video_queue_lock = Arc::new(Mutex::new(VideoQueue::new()));
{ {
let video_frames_lock = video_frames_lock.clone(); let video_queue_lock = video_queue_lock.clone();
thread::Builder::new() thread::Builder::new()
.name("ffmpeg".to_string()) .name("ffmpeg".to_string())
.spawn(move || { .spawn(move || {
ffmpeg_thread(path, player_rx, video_frames_lock).unwrap(); ffmpeg_thread(path, player_rx, video_queue_lock).unwrap();
}) })
.unwrap(); .unwrap();
} }
(player_tx, video_frames_lock) (player_tx, video_queue_lock)
} }