feat(bridge): --audio-delay-ms knob for deterministic A/V alignment

The deltacast bridge captures audio (DISJOINED_ANC VHD stream -> FIFO) and
video (VHD stream -> framecache ring -> fc_pipe -> ffmpeg) on separate paths
with independent buffering. The video path is buffered deeper, so audio reaches
the muxer AHEAD of its matching video frame (user-confirmed: audio ahead of
video on lip-sync). --audio-delay-ms / FC_AUDIO_DELAY_MS prepends N ms of real
PCM silence to the audio stream once per reader-attach, shifting the audio
timeline N ms later to re-align. One value, all 8 ports, drift-free (ffmpeg
derives audio PTS from sample count). Default 0 = unchanged (non-destructive).
Operator tunes once against a lip-sync reference. Also fixes promotion worker
to reset orphaned processing assets on failure (was stuck forever).
This commit is contained in:
OpenCode 2026-06-05 11:00:15 +00:00
parent 3b3e7edade
commit feeab99a36
2 changed files with 67 additions and 0 deletions

View file

@ -65,6 +65,13 @@
/* ── Globals ──────────────────────────────────────────────────────────── */
static atomic_int g_stop = 0; /* global shutdown (SIGTERM/SIGINT only) */
/* Fixed A/V alignment: ms of leading silence prepended to the audio stream when
* a reader attaches. The video path (framecache ring -> fc_pipe -> ffmpeg input
* queue) is buffered deeper than the direct audio FIFO, so without compensation
* audio reaches the muxer AHEAD of its matching video frame. Prepending N ms of
* silence delays audio by N ms to re-align. Set via --audio-delay-ms (default 0).
* One value, all ports, deterministic no per-session env plumbing. */
static int g_audio_delay_ms = 0;
static void on_signal(int s) { (void)s; atomic_store(&g_stop, 1); }
@ -317,6 +324,31 @@ static void *audio_thread(void *arg) {
struct timespec next;
clock_gettime(CLOCK_MONOTONIC, &next);
/* ── Fixed A/V alignment: prepend g_audio_delay_ms of leading silence ──
* The video path is buffered deeper than this audio FIFO, so audio would
* otherwise arrive at the muxer ahead of its matching video frame. Writing
* N ms of silence here (once, right after reaching the live edge) shifts
* the entire audio timeline N ms LATER, re-aligning it with video. The
* samples are real PCM zeros at 48 kHz so they consume exactly N ms of the
* audio timeline ffmpeg derives audio PTS from sample count, so this is a
* precise, drift-free delay. */
if (g_audio_delay_ms > 0) {
long delay_samples = (long)AUDIO_RATE * g_audio_delay_ms / 1000;
size_t delay_bytes = (size_t)delay_samples * FRAME_BYTES;
unsigned char sil[8192];
memset(sil, 0, sizeof(sil));
size_t remaining = delay_bytes;
int delay_ok = 1;
while (remaining > 0) {
size_t chunk = remaining > sizeof(sil) ? sizeof(sil) : remaining;
if (write_all(fd, sil, chunk) < 0) { delay_ok = 0; break; }
remaining -= chunk;
}
if (delay_ok)
fprintf(stderr, "[audio:%u] prepended %d ms (%ld samples) of A/V-align silence\n",
ps->port, g_audio_delay_ms, delay_samples);
}
/* Inner loop: feed audio into the open FIFO until reader exits (EPIPE). */
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
size_t out_bytes = 0;
@ -564,8 +596,15 @@ int main(int argc, char *argv[]) {
sig_timeout = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) {
fc_url = argv[++i];
} else if (!strcmp(argv[i], "--audio-delay-ms") && i+1 < argc) {
g_audio_delay_ms = atoi(argv[++i]);
if (g_audio_delay_ms < 0) g_audio_delay_ms = 0;
if (g_audio_delay_ms > 1000) g_audio_delay_ms = 1000;
}
}
/* Env override (FC_AUDIO_DELAY_MS) for ops who tune without editing the unit. */
{ const char *ad = getenv("FC_AUDIO_DELAY_MS");
if (ad && *ad) { int v = atoi(ad); if (v >= 0 && v <= 1000) g_audio_delay_ms = v; } }
if (port_count == 0) {
fprintf(stderr, "{\"error\":\"no ports specified — use --ports 0,1,2,...\"}\n");
@ -630,6 +669,7 @@ int main(int argc, char *argv[]) {
return 1;
}
fprintf(stderr, "[board] opened board %u with %d port(s)\n", device_id, port_count);
fprintf(stderr, "[board] audio A/V-align delay = %d ms\n", g_audio_delay_ms);
/* Per SDK samples: for 12G-ASI or 3G-ASI channel types the channel must be
* explicitly switched to SDI mode. Without this, VHD_SDI_CP_VIDEO_STANDARD

View file

@ -73,6 +73,33 @@ const proxyQueue = new Queue('proxy', {
// BullMQ Worker handler for manual S3 promotion
export const promotionWorker = async (job) => {
const { assetId } = job.data;
try {
return await runPromotion(job);
} catch (err) {
// RECOVERY: the /promote endpoint set the asset to 'processing' BEFORE
// queuing this job. If the job fails (e.g. the growing .mxf isn't on the
// share yet because the recorder is still writing/flushing, or a transient
// SMB/S3 error), the asset would otherwise be ORPHANED in 'processing'
// forever. Reset it to a retryable state so it isn't stuck: 'pending_migration'
// if the master still exists on SMB, else 'error' so it surfaces in the UI.
try {
const cur = await query('SELECT project_id, filename FROM assets WHERE id = $1', [assetId]);
const row = cur.rows[0];
const onSmb = row && existsSync(`${GROWING_PATH}/${row.project_id}/${row.filename}.mxf`);
await query(
`UPDATE assets SET status = $2, updated_at = NOW() WHERE id = $1 AND status = 'processing'`,
[assetId, onSmb ? 'pending_migration' : 'error']
);
console.error(`[promotion] asset ${assetId} promotion failed (${err.message}); reset to ${onSmb ? 'pending_migration (retryable)' : 'error'}`);
} catch (e2) {
console.error(`[promotion] asset ${assetId} failed AND status-reset failed: ${e2.message}`);
}
throw err;
}
};
async function runPromotion(job) {
const { assetId } = job.data;
// 1. Ensure growing share is mounted
await ensureGrowingShareMounted();