From 2fa67bd58dac5d384867a7b9253c989b277e060c Mon Sep 17 00:00:00 2001 From: Jeremy Soller Date: Fri, 26 Jan 2024 14:43:16 -0700 Subject: [PATCH] Fix video/audio sync --- src/main.rs | 32 +++--- src/player.rs | 290 ++++++++++++++++++++++++++++++++++---------------- 2 files changed, 218 insertions(+), 104 deletions(-) diff --git a/src/main.rs b/src/main.rs index e7de48d..9a2a8a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use cosmic::{ }; use std::{ any::TypeId, - collections::{HashMap, VecDeque}, + collections::HashMap, env, path::PathBuf, process, @@ -31,7 +31,7 @@ mod key_bind; mod localize; -use player::{PlayerMessage, VideoFrame}; +use player::{PlayerMessage, VideoFrame, VideoQueue}; mod player; /// Runs application with these settings @@ -67,7 +67,7 @@ fn main() -> Result<(), Box> { } }; - let (player_tx, video_frames_lock) = player::run(path); + let (player_tx, video_queue_lock) = player::run(path); let mut settings = Settings::default(); settings = settings.theme(config.app_theme.theme()); @@ -85,7 +85,7 @@ fn main() -> Result<(), Box> { config_handler, config, player_tx, - video_frames_lock, + video_queue_lock, }; cosmic::app::run::(settings, flags)?; @@ -114,7 +114,7 @@ pub struct Flags { config_handler: Option, config: Config, player_tx: mpsc::Sender, - video_frames_lock: Arc>>, + video_queue_lock: Arc>, } /// Messages that are used specifically by our [`App`]. @@ -299,23 +299,25 @@ impl Application for App { let start = Instant::now(); let mut video_frame_opt: Option = None; - { - let mut video_frames = self.flags.video_frames_lock.lock().unwrap(); - while let Some(video_frame) = video_frames.pop_front() { - if video_frame.1.unwrap_or(frame_time) <= frame_time { + let delayed_time = { + let mut video_queue = self.flags.video_queue_lock.lock().unwrap(); + let delayed_time = frame_time - video_queue.delay; + while let Some(video_frame) = video_queue.data.pop_front() { + if video_frame.1.unwrap_or(delayed_time) <= delayed_time { if let Some(old_frame) = video_frame_opt { - //TODO: log this outside of locking video_frames_lock? + //TODO: log this outside of locking video_queue_lock? log::warn!("skipping video frame {:?}", old_frame.0.pts()); } // Frame is ready to be shown video_frame_opt = Some(video_frame); } else { // Put frame back and exit loop - video_frames.push_front(video_frame); + video_queue.data.push_front(video_frame); break; } } - } + delayed_time + }; match video_frame_opt { Some(video_frame) => { @@ -331,13 +333,13 @@ impl Application for App { ); if let Some(present_time) = present_time_opt { - if present_time > frame_time { - let ahead = present_time - frame_time; + if present_time > delayed_time { + let ahead = present_time - delayed_time; if ahead > Duration::from_millis(1) { log::debug!("video ahead {:?}", ahead); } } else { - let behind = frame_time - present_time; + let behind = delayed_time - present_time; if behind > Duration::from_millis(1) { log::debug!("video behind {:?}", behind); } diff --git a/src/player.rs b/src/player.rs index 11a055b..4add82a 100644 --- a/src/player.rs +++ b/src/player.rs @@ -18,6 +18,7 @@ use ffmpeg::{ Packet, }; use std::{ + cmp, collections::VecDeque, error::Error, path::{Path, PathBuf}, @@ -27,6 +28,36 @@ use std::{ time::{Duration, Instant}, }; +//TODO: calculate presentation time of end of queue +pub struct AudioQueue { + pub channels: usize, + pub rate: f64, + pub data: VecDeque, + // Delay for data to hit speakers, used to sync with video + pub delay: Duration, +} + +impl AudioQueue { + pub fn new(channels: cpal::ChannelCount, rate: cpal::SampleRate) -> Self { + Self { + channels: channels as usize, + rate: rate.0 as f64, + data: VecDeque::new(), + delay: Duration::default(), + } + } + + pub fn duration(&self) -> Duration { + self.duration_for_samples(self.data.len()) + } + + pub fn duration_for_samples(&self, samples: usize) -> Duration { + let frames = samples / self.channels; + let seconds = (frames as f64) / self.rate; + Duration::from_secs_f64(seconds) + } +} + #[derive(Clone, Debug)] pub enum PlayerMessage { SeekRelative(f64), @@ -48,9 +79,44 @@ impl AsRef<[u8]> for VideoFrame { } } -fn cpal( - audio_queue_lock: Arc>>, -) -> (cpal::SupportedStreamConfig, Box) { +pub struct VideoQueue { + pub data: VecDeque, + // Delay to add to each frame to sync with audio + pub delay: Duration, +} + +impl VideoQueue { + pub fn new() -> Self { + Self { + data: VecDeque::new(), + delay: Duration::default(), + } + } + + pub fn duration(&self) -> Duration { + //TODO: can accurate duration actually be calculated since one frame would count as zero? + let mut start_end_opt = None; + for frame in self.data.iter() { + if let Some(frame_time) = frame.1 { + start_end_opt = Some(match start_end_opt { + Some((start, end)) => (cmp::min(start, frame_time), cmp::max(end, frame_time)), + None => (frame_time, frame_time), + }); + } + } + if let Some((start, end)) = start_end_opt { + end.duration_since(start) + } else { + Duration::default() + } + } +} + +fn cpal() -> ( + cpal::SupportedStreamConfig, + Box, + Arc>, +) { let host = cpal::default_host(); let device = host .default_output_device() @@ -60,8 +126,13 @@ fn cpal( .expect("failed to get default audio output config"); println!("{:?}: {:?}", device.name(), config); + let audio_queue_lock = Arc::new(Mutex::new(AudioQueue::new( + config.channels(), + config.sample_rate(), + ))); let stream = { let config = config.clone(); + let audio_queue_lock = audio_queue_lock.clone(); match config.sample_format() { cpal::SampleFormat::I8 => cpal_stream::(device, config.into(), audio_queue_lock), cpal::SampleFormat::I16 => cpal_stream::(device, config.into(), audio_queue_lock), @@ -82,26 +153,30 @@ fn cpal( .unwrap() }; - (config, stream) + (config, stream, audio_queue_lock) } fn cpal_stream( device: cpal::Device, config: cpal::StreamConfig, - audio_queue_lock: Arc>>, + audio_queue_lock: Arc>, ) -> Result, Box> where T: SizedSample + FromSample, { let data_fn = { - let audio_queue_lock = audio_queue_lock.clone(); - move |data: &mut [T], _: &cpal::OutputCallbackInfo| { + move |samples: &mut [T], info: &cpal::OutputCallbackInfo| { + let timestamp = info.timestamp(); + let delay = timestamp.playback.duration_since(×tamp.callback); + let mut underrun = 0; { //TODO: buffer audio let mut audio_queue = audio_queue_lock.lock().unwrap(); - for sample in data { - let float = match audio_queue.pop_front() { + //TODO: also add samples time? + audio_queue.delay = delay.unwrap_or_default(); + for sample in samples { + let float = match audio_queue.data.pop_front() { Some(some) => some, None => { underrun += 1; @@ -110,7 +185,7 @@ where }; *sample = T::from_sample(float); } - } + }; if underrun > 0 { log::error!("audio underrun {}", underrun); } @@ -124,10 +199,9 @@ where fn ffmpeg_thread>( path: P, player_rx: mpsc::Receiver, - video_frames_lock: Arc>>, + video_queue_lock: Arc>, ) -> Result<(), Box> { - let audio_queue_lock = Arc::new(Mutex::new(VecDeque::new())); - let (audio_config, cpal_stream) = cpal(audio_queue_lock.clone()); + let (audio_config, cpal_stream, audio_queue_lock) = cpal(); let mut ictx = input(&path)?; @@ -166,85 +240,92 @@ fn ffmpeg_thread>( video_decoder_context.decoder().video()? }; - let video_format = video_decoder.format(); - let video_width = video_decoder.width(); - let video_height = video_decoder.height(); let (cpu_frame_tx, cpu_frame_rx) = mpsc::channel::<(Video, Option)>(); - thread::Builder::new() - .name("video_scale".to_string()) - .spawn(move || { - let mut video_scaler = scaling::context::Context::get( - video_format, - video_width, - video_height, - Pixel::RGBA, - video_width, - video_height, - scaling::Flags::FAST_BILINEAR, - ) - .unwrap(); - - loop { - let mut recv_opt: Option<(Video, Option)> = None; - while let Ok(recv) = cpu_frame_rx.try_recv() { - if let Some((old_frame, _)) = recv_opt { - //TODO: only skip if behind (frames come in weird timing from codecs) - log::warn!("skipping cpu video frame at {:?}", old_frame.pts()); - } - recv_opt = Some(recv); - } - let (cpu_frame, sync_time_opt) = match recv_opt { - Some(some) => some, - None => cpu_frame_rx.recv().unwrap(), - }; - let pts_opt = cpu_frame.pts(); - - // Start count after blocking recv - let start = Instant::now(); - - video_scaler.cached( - cpu_frame.format(), - cpu_frame.width(), - cpu_frame.height(), + { + let video_format = video_decoder.format(); + let video_width = video_decoder.width(); + let video_height = video_decoder.height(); + let video_queue_lock = video_queue_lock.clone(); + thread::Builder::new() + .name("video_scale".to_string()) + .spawn(move || { + let mut video_scaler = scaling::context::Context::get( + video_format, + video_width, + video_height, Pixel::RGBA, - cpu_frame.width(), - cpu_frame.height(), + video_width, + video_height, scaling::Flags::FAST_BILINEAR, - ); + ) + .unwrap(); - let mut scaled_frame = Video::empty(); - video_scaler.run(&cpu_frame, &mut scaled_frame).unwrap(); - scaled_frame.set_pts(pts_opt); + loop { + let mut recv_opt: Option<(Video, Option)> = None; + /*TODO: SKIP + while let Ok(recv) = cpu_frame_rx.try_recv() { + if let Some((old_frame, _)) = recv_opt { + //TODO: only skip if behind (frames come in weird timing from codecs) + log::warn!("skipping cpu video frame at {:?}", old_frame.pts()); + } + recv_opt = Some(recv); + } + */ + let (cpu_frame, sync_time_opt) = match recv_opt { + Some(some) => some, + None => cpu_frame_rx.recv().unwrap(), + }; + let pts_opt = cpu_frame.pts(); - let present_time_opt = if let Some(pts) = pts_opt { - let expected_float = pts as f64 * video_time_base; - let expected = Duration::from_secs_f64(expected_float); - if let Some(sync_time) = sync_time_opt { - Some(sync_time + expected) + // Start count after blocking recv + let start = Instant::now(); + + video_scaler.cached( + cpu_frame.format(), + cpu_frame.width(), + cpu_frame.height(), + Pixel::RGBA, + cpu_frame.width(), + cpu_frame.height(), + scaling::Flags::FAST_BILINEAR, + ); + + let mut scaled_frame = Video::empty(); + video_scaler.run(&cpu_frame, &mut scaled_frame).unwrap(); + scaled_frame.set_pts(pts_opt); + + let present_time_opt = if let Some(pts) = pts_opt { + let expected_float = pts as f64 * video_time_base; + let expected = Duration::from_secs_f64(expected_float); + if let Some(sync_time) = sync_time_opt { + Some(sync_time + expected) + } else { + None + } } else { None + }; + + let video_frame = VideoFrame(scaled_frame, present_time_opt); + { + let mut video_queue = video_queue_lock.lock().unwrap(); + video_queue.data.push_back(video_frame); } - } else { - None - }; - let video_frame = VideoFrame(scaled_frame, present_time_opt); - { - let mut video_frames = video_frames_lock.lock().unwrap(); - video_frames.push_back(video_frame); + let duration = start.elapsed(); + log::debug!("scaled video frame at {:?} in {:?}", pts_opt, duration,); } + })? + }; - let duration = start.elapsed(); - log::debug!("scaled video frame at {:?} in {:?}", pts_opt, duration,); - } - })?; - - let (gpu_frame_tx, gpu_frame_rx) = mpsc::channel::<(Video, Option)>(); + // Sync channel to prevent allocation issues and falling behind + let (gpu_frame_tx, gpu_frame_rx) = mpsc::sync_channel::<(Video, Option)>(2); thread::Builder::new() .name("video_map_gpu_cpu".to_string()) .spawn(move || { loop { let mut recv_opt: Option<(Video, Option)> = None; + /*TODO: SKIP while let Ok(recv) = gpu_frame_rx.try_recv() { if let Some((old_frame, _)) = recv_opt { //TODO: only skip if behind (frames come in weird timing from codecs) @@ -252,6 +333,7 @@ fn ffmpeg_thread>( } recv_opt = Some(recv); } + */ let (gpu_frame, sync_time_opt) = match recv_opt { Some(some) => some, None => gpu_frame_rx.recv().unwrap(), @@ -287,7 +369,8 @@ fn ffmpeg_thread>( } })?; - let (video_packet_tx, video_packet_rx) = mpsc::channel::<(Packet, Option)>(); + // Sync channel to prevent getting too far behind + let (video_packet_tx, video_packet_rx) = mpsc::sync_channel::<(Packet, Option)>(2); thread::Builder::new() .name("video_decode".to_string()) .spawn(move || { @@ -395,7 +478,7 @@ fn ffmpeg_thread>( }; { let mut audio_queue = audio_queue_lock.lock().unwrap(); - audio_queue.extend(plane); + audio_queue.data.extend(plane); } } } @@ -411,7 +494,6 @@ fn ffmpeg_thread>( if sleep > min_sleep { // We leave min_sleep of buffer room log::debug!("audio ahead {:?}", sleep); - thread::sleep(sleep - min_sleep); } } else { let skip = actual - expected; @@ -428,6 +510,9 @@ fn ffmpeg_thread>( Ok(()) }; + //TODO: dynamically choose this + let buffer_duration = Duration::from_millis(250); + // Start CPAL stream cpal_stream.play()?; @@ -454,6 +539,38 @@ fn ffmpeg_thread>( Err(_err) => {} } + let (audio_queue_duration, audio_queue_delay) = { + let audio_queue = audio_queue_lock.lock().unwrap(); + (audio_queue.duration(), audio_queue.delay) + }; + + let (video_queue_duration, video_queue_delay) = { + let mut video_queue = video_queue_lock.lock().unwrap(); + let video_queue_duration = video_queue.duration(); + if video_queue_duration < buffer_duration { + // If we do not have enough video queued, delay the video output + video_queue.delay = buffer_duration - video_queue_duration; + } else { + video_queue.delay = Duration::default(); + } + // Add audio queue delay to sync with audio + video_queue.delay += audio_queue_delay; + (video_queue_duration, video_queue.delay) + }; + + println!( + "VIDEO: {:?}, {:?} AUDIO: {:?}, {:?}", + video_queue_duration, video_queue_delay, audio_queue_duration, audio_queue_delay + ); + + let min_queue_duration = cmp::min(video_queue_duration, audio_queue_duration); + if min_queue_duration > buffer_duration { + // If we have enough queued, we can sleep + let sleep = min_queue_duration - buffer_duration; + println!("SLEEP {:?}", sleep); + thread::sleep(sleep); + } + while let Ok(message) = player_rx.try_recv() { match message { PlayerMessage::SeekRelative(seek_seconds) => { @@ -486,25 +603,20 @@ fn ffmpeg_thread>( Ok(()) } -pub fn run( - path: PathBuf, -) -> ( - mpsc::Sender, - Arc>>, -) { +pub fn run(path: PathBuf) -> (mpsc::Sender, Arc>) { ffmpeg::init().unwrap(); let (player_tx, player_rx) = mpsc::channel(); - let video_frames_lock = Arc::new(Mutex::new(VecDeque::new())); + let video_queue_lock = Arc::new(Mutex::new(VideoQueue::new())); { - let video_frames_lock = video_frames_lock.clone(); + let video_queue_lock = video_queue_lock.clone(); thread::Builder::new() .name("ffmpeg".to_string()) .spawn(move || { - ffmpeg_thread(path, player_rx, video_frames_lock).unwrap(); + ffmpeg_thread(path, player_rx, video_queue_lock).unwrap(); }) .unwrap(); } - (player_tx, video_frames_lock) + (player_tx, video_queue_lock) }