From feeab99a36d7190038dff27e8acccc40a1d3c3d7 Mon Sep 17 00:00:00 2001 From: OpenCode Date: Fri, 5 Jun 2026 11:00:15 +0000 Subject: [PATCH] feat(bridge): --audio-delay-ms knob for deterministic A/V alignment The deltacast bridge captures audio (DISJOINED_ANC VHD stream -> FIFO) and video (VHD stream -> framecache ring -> fc_pipe -> ffmpeg) on separate paths with independent buffering. The video path is buffered deeper, so audio reaches the muxer AHEAD of its matching video frame (user-confirmed: audio ahead of video on lip-sync). --audio-delay-ms / FC_AUDIO_DELAY_MS prepends N ms of real PCM silence to the audio stream once per reader-attach, shifting the audio timeline N ms later to re-align. One value, all 8 ports, drift-free (ffmpeg derives audio PTS from sample count). Default 0 = unchanged (non-destructive). Operator tunes once against a lip-sync reference. Also fixes promotion worker to reset orphaned processing assets on failure (was stuck forever). --- services/capture/deltacast-bridge/main.c | 40 ++++++++++++++++++++++++ services/worker/src/workers/promotion.js | 27 ++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/services/capture/deltacast-bridge/main.c b/services/capture/deltacast-bridge/main.c index c184c50..db68970 100644 --- a/services/capture/deltacast-bridge/main.c +++ b/services/capture/deltacast-bridge/main.c @@ -65,6 +65,13 @@ /* ── Globals ──────────────────────────────────────────────────────────── */ static atomic_int g_stop = 0; /* global shutdown (SIGTERM/SIGINT only) */ +/* Fixed A/V alignment: ms of leading silence prepended to the audio stream when + * a reader attaches. The video path (framecache ring -> fc_pipe -> ffmpeg input + * queue) is buffered deeper than the direct audio FIFO, so without compensation + * audio reaches the muxer AHEAD of its matching video frame. Prepending N ms of + * silence delays audio by N ms to re-align. Set via --audio-delay-ms (default 0). + * One value, all ports, deterministic — no per-session env plumbing. */ +static int g_audio_delay_ms = 0; static void on_signal(int s) { (void)s; atomic_store(&g_stop, 1); } @@ -317,6 +324,31 @@ static void *audio_thread(void *arg) { struct timespec next; clock_gettime(CLOCK_MONOTONIC, &next); + /* ── Fixed A/V alignment: prepend g_audio_delay_ms of leading silence ── + * The video path is buffered deeper than this audio FIFO, so audio would + * otherwise arrive at the muxer ahead of its matching video frame. Writing + * N ms of silence here (once, right after reaching the live edge) shifts + * the entire audio timeline N ms LATER, re-aligning it with video. The + * samples are real PCM zeros at 48 kHz so they consume exactly N ms of the + * audio timeline — ffmpeg derives audio PTS from sample count, so this is a + * precise, drift-free delay. */ + if (g_audio_delay_ms > 0) { + long delay_samples = (long)AUDIO_RATE * g_audio_delay_ms / 1000; + size_t delay_bytes = (size_t)delay_samples * FRAME_BYTES; + unsigned char sil[8192]; + memset(sil, 0, sizeof(sil)); + size_t remaining = delay_bytes; + int delay_ok = 1; + while (remaining > 0) { + size_t chunk = remaining > sizeof(sil) ? sizeof(sil) : remaining; + if (write_all(fd, sil, chunk) < 0) { delay_ok = 0; break; } + remaining -= chunk; + } + if (delay_ok) + fprintf(stderr, "[audio:%u] prepended %d ms (%ld samples) of A/V-align silence\n", + ps->port, g_audio_delay_ms, delay_samples); + } + /* Inner loop: feed audio into the open FIFO until reader exits (EPIPE). */ while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { size_t out_bytes = 0; @@ -564,8 +596,15 @@ int main(int argc, char *argv[]) { sig_timeout = atoi(argv[++i]); } else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) { fc_url = argv[++i]; + } else if (!strcmp(argv[i], "--audio-delay-ms") && i+1 < argc) { + g_audio_delay_ms = atoi(argv[++i]); + if (g_audio_delay_ms < 0) g_audio_delay_ms = 0; + if (g_audio_delay_ms > 1000) g_audio_delay_ms = 1000; } } + /* Env override (FC_AUDIO_DELAY_MS) for ops who tune without editing the unit. */ + { const char *ad = getenv("FC_AUDIO_DELAY_MS"); + if (ad && *ad) { int v = atoi(ad); if (v >= 0 && v <= 1000) g_audio_delay_ms = v; } } if (port_count == 0) { fprintf(stderr, "{\"error\":\"no ports specified — use --ports 0,1,2,...\"}\n"); @@ -630,6 +669,7 @@ int main(int argc, char *argv[]) { return 1; } fprintf(stderr, "[board] opened board %u with %d port(s)\n", device_id, port_count); + fprintf(stderr, "[board] audio A/V-align delay = %d ms\n", g_audio_delay_ms); /* Per SDK samples: for 12G-ASI or 3G-ASI channel types the channel must be * explicitly switched to SDI mode. Without this, VHD_SDI_CP_VIDEO_STANDARD diff --git a/services/worker/src/workers/promotion.js b/services/worker/src/workers/promotion.js index 8dd0874..e9e2e86 100644 --- a/services/worker/src/workers/promotion.js +++ b/services/worker/src/workers/promotion.js @@ -73,6 +73,33 @@ const proxyQueue = new Queue('proxy', { // BullMQ Worker handler for manual S3 promotion export const promotionWorker = async (job) => { const { assetId } = job.data; + try { + return await runPromotion(job); + } catch (err) { + // RECOVERY: the /promote endpoint set the asset to 'processing' BEFORE + // queuing this job. If the job fails (e.g. the growing .mxf isn't on the + // share yet because the recorder is still writing/flushing, or a transient + // SMB/S3 error), the asset would otherwise be ORPHANED in 'processing' + // forever. Reset it to a retryable state so it isn't stuck: 'pending_migration' + // if the master still exists on SMB, else 'error' so it surfaces in the UI. + try { + const cur = await query('SELECT project_id, filename FROM assets WHERE id = $1', [assetId]); + const row = cur.rows[0]; + const onSmb = row && existsSync(`${GROWING_PATH}/${row.project_id}/${row.filename}.mxf`); + await query( + `UPDATE assets SET status = $2, updated_at = NOW() WHERE id = $1 AND status = 'processing'`, + [assetId, onSmb ? 'pending_migration' : 'error'] + ); + console.error(`[promotion] asset ${assetId} promotion failed (${err.message}); reset to ${onSmb ? 'pending_migration (retryable)' : 'error'}`); + } catch (e2) { + console.error(`[promotion] asset ${assetId} failed AND status-reset failed: ${e2.message}`); + } + throw err; + } +}; + +async function runPromotion(job) { + const { assetId } = job.data; // 1. Ensure growing share is mounted await ensureGrowingShareMounted();