From c083d1006a39d4b2a3c0fdebcf6daf5224c0d70f Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 01:35:07 +0000 Subject: [PATCH] fix: deltacast audio missing after ffmpeg restart (EPIPE cascade + stale FIFO guard) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause A (main.c): audio_thread set the global g_stop flag on EPIPE (ffmpeg reader died). This killed ALL port threads across the entire bridge process. Bridge process then exited with all 8 ports gone. Root cause B (node-agent/index.js): startDeltacastBridge() skipped respawn when FIFOs existed in /dev/shm/deltacast, even if the bridge process was dead. Next ffmpeg opened the audio FIFO read-end and blocked forever (no writer) → no audio (or video) for any new recording. Fix A (main.c): - Add per-port atomic g_port_stop[MAX_PORTS] flags. - Audio thread: on EPIPE, close the FIFO fd and loop back to reopen it. The VHD ANC stream stays open across reconnects. Other ports unaffected. - Video thread: on EPIPE or stream error, set only g_port_stop[port], not the global g_stop. Other ports keep running. - MAX_PORTS #define moved before globals so g_port_stop[MAX_PORTS] compiles. Fix B (node-agent/index.js): - Add _dcBridgeProcessAlive() — scans /proc//cmdline for deltacast-bridge. - startDeltacastBridge(): if FIFOs exist but no live bridge process is found, spawn a fresh bridge instead of silently returning. Detects bridges started externally (e.g. sudo on the host before node-agent started). Requires: bridge rebuild + restart on zampp3. No capture image rebuild needed. Co-Authored-By: Claude Sonnet 4.6 --- services/capture/deltacast-bridge/main.c | 150 ++++++++++++++--------- services/node-agent/index.js | 32 ++++- 2 files changed, 122 insertions(+), 60 deletions(-) diff --git a/services/capture/deltacast-bridge/main.c b/services/capture/deltacast-bridge/main.c index 99f4cec..8a7a387 100644 --- a/services/capture/deltacast-bridge/main.c +++ b/services/capture/deltacast-bridge/main.c @@ -37,13 +37,18 @@ #include "VideoMasterHD_Sdi.h" #include "VideoMasterHD_Sdi_Audio.h" +/* ── Constants ────────────────────────────────────────────────────────── */ +#define MAX_PORTS 8 + /* ── Globals ──────────────────────────────────────────────────────────── */ -static atomic_int g_stop = 0; +static atomic_int g_stop = 0; /* global shutdown (SIGTERM/SIGINT only) */ static void on_signal(int s) { (void)s; atomic_store(&g_stop, 1); } -/* ── Constants ────────────────────────────────────────────────────────── */ -#define MAX_PORTS 8 +/* Per-port stop flag — set only when a fatal error occurs on that specific + * port (e.g. video lock lost). Audio EPIPE is handled by reopening the FIFO + * rather than stopping the port, so the thread survives ffmpeg restarts. */ +static atomic_int g_port_stop[MAX_PORTS]; /* ── Stream type by port index (non-contiguous SDK enum) ────────────── */ static ULONG rx_streamtype(unsigned port) { @@ -135,20 +140,16 @@ typedef struct { /* ── Audio thread ────────────────────────────────────────────────────── * - * Identical design to the single-port bridge audio thread: - * - Opens FIFO writer FIRST, unconditionally (unblocks ffmpeg input) - * - Feeds continuous wall-clock-paced s16le stereo (real or silence) - * - Best-effort VHD audio stream; silence fallback on any failure + * - Opens FIFO writer (blocks until a reader connects — correct behaviour). + * - Feeds continuous wall-clock-paced s16le stereo (real or silence). + * - Best-effort VHD audio stream; silence fallback on any failure. + * - On EPIPE (ffmpeg reader died): closes and REOPENS the FIFO so the + * thread survives an ffmpeg restart without bringing down other ports. + * EPIPE never sets g_stop — only SIGTERM/SIGINT does that. */ static void *audio_thread(void *arg) { PortState *ps = (PortState *)arg; - int fd = open(ps->audio_fifo, O_WRONLY); - if (fd < 0) { - fprintf(stderr, "[audio:%u] open FIFO failed: %s\n", ps->port, strerror(errno)); - return NULL; - } - const int AUDIO_RATE = 48000; const int CHANNELS = 2; const size_t FRAME_BYTES = (size_t)CHANNELS * 2; /* s16le stereo */ @@ -165,8 +166,10 @@ static void *audio_thread(void *arg) { size_t vhd_buf_sz = ((size_t)max_samples + 64) * (block_size ? block_size : FRAME_BYTES); size_t buf_sz = vhd_buf_sz > tick_bytes ? vhd_buf_sz : tick_bytes; unsigned char *buf = calloc(1, buf_sz); - if (!buf) { close(fd); return NULL; } + if (!buf) return NULL; + /* Open the VHD audio stream once for the lifetime of the bridge. + * The stream stays open across reader reconnects — no need to reopen it. */ HANDLE stream = NULL; int have_vhd_audio = 0; VHD_AUDIOINFO ai; @@ -196,56 +199,80 @@ static void *audio_thread(void *arg) { ps->port, r); } - struct timespec next; - clock_gettime(CLOCK_MONOTONIC, &next); long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num); HANDLE slot = NULL; - while (!atomic_load(&g_stop)) { - size_t out_bytes = 0; + /* Outer loop: reopen the FIFO writer each time a reader connects. + * This allows the bridge to survive ffmpeg session stop/restart on a port + * without affecting any other port's threads. */ + while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { - if (have_vhd_audio) { - r = VHD_LockSlotHandle(stream, &slot); - if (r == VHDERR_NOERROR) { - ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)buf_sz; - if (VHD_SlotExtractAudio(slot, &ai) == VHDERR_NOERROR) { - ULONG sz = ai.pAudioGroups[0].pAudioChannels[0].DataSize; - if (sz > 0 && (size_t)sz <= buf_sz) out_bytes = (size_t)sz; + int fd = open(ps->audio_fifo, O_WRONLY); + if (fd < 0) { + /* Open failed (rare — FIFO was deleted?). Brief pause then retry. */ + fprintf(stderr, "[audio:%u] open FIFO failed: %s\n", ps->port, strerror(errno)); + struct timespec ts = {0, 200000000L}; + nanosleep(&ts, NULL); + continue; + } + fprintf(stderr, "[audio:%u] FIFO writer connected\n", ps->port); + + /* Reset wall-clock baseline after potentially blocking on open(). */ + struct timespec next; + clock_gettime(CLOCK_MONOTONIC, &next); + + /* Inner loop: feed audio into the open FIFO until reader exits (EPIPE). */ + while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { + size_t out_bytes = 0; + + if (have_vhd_audio) { + r = VHD_LockSlotHandle(stream, &slot); + if (r == VHDERR_NOERROR) { + ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)buf_sz; + if (VHD_SlotExtractAudio(slot, &ai) == VHDERR_NOERROR) { + ULONG sz = ai.pAudioGroups[0].pAudioChannels[0].DataSize; + if (sz > 0 && (size_t)sz <= buf_sz) out_bytes = (size_t)sz; + } + VHD_UnlockSlotHandle(slot); + } else if (r != VHDERR_TIMEOUT) { + fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n", + ps->port, r); + VHD_StopStream(stream); + VHD_CloseStreamHandle(stream); + stream = NULL; + have_vhd_audio = 0; } - VHD_UnlockSlotHandle(slot); - } else if (r != VHDERR_TIMEOUT) { - fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n", - ps->port, r); - VHD_StopStream(stream); - VHD_CloseStreamHandle(stream); - stream = NULL; - have_vhd_audio = 0; + } + + if (out_bytes == 0) { + memset(buf, 0, tick_bytes); + out_bytes = tick_bytes; + } + + if (write_all(fd, buf, out_bytes) < 0) { + /* EPIPE: ffmpeg reader on this port died (session stop/restart). + * Close and break to the outer loop which will reopen and block + * until the next ffmpeg reader connects. + * Do NOT set g_stop — other ports must keep running. */ + fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port); + break; + } + + next.tv_nsec += frame_ns; + while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec += 1; } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + if (next.tv_sec > now.tv_sec || + (next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) { + clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL); + } else { + next = now; } } - if (out_bytes == 0) { - memset(buf, 0, tick_bytes); - out_bytes = tick_bytes; - } - - if (write_all(fd, buf, out_bytes) < 0) { - atomic_store(&g_stop, 1); - break; - } - - next.tv_nsec += frame_ns; - while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec += 1; } - struct timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - if (next.tv_sec > now.tv_sec || - (next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) { - clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL); - } else { - next = now; - } + close(fd); } - close(fd); if (stream) { VHD_StopStream(stream); VHD_CloseStreamHandle(stream); @@ -261,27 +288,31 @@ static void *video_thread(void *arg) { int fd = open(ps->video_fifo, O_WRONLY); if (fd < 0) { fprintf(stderr, "[video:%u] open FIFO failed: %s\n", ps->port, strerror(errno)); + atomic_store(&g_port_stop[ps->port], 1); return NULL; } HANDLE slot = NULL; - while (!atomic_load(&g_stop)) { + while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot); if (r == VHDERR_NOERROR) { BYTE *buf = NULL; ULONG sz = 0; if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { if (write_all(fd, buf, sz) < 0) { - atomic_store(&g_stop, 1); + /* EPIPE on video: the capture sidecar for this port died. + * Stop only this port's threads — other ports unaffected. */ + fprintf(stderr, "[video:%u] EPIPE — stopping port\n", ps->port); + atomic_store(&g_port_stop[ps->port], 1); VHD_UnlockSlotHandle(slot); break; } } VHD_UnlockSlotHandle(slot); } else if (r != VHDERR_TIMEOUT) { - fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping\n", + fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n", ps->port, r); - atomic_store(&g_stop, 1); + atomic_store(&g_port_stop[ps->port], 1); break; } } @@ -427,6 +458,9 @@ int main(int argc, char *argv[]) { memset(ps, 0, sizeof(ps)); int active_count = 0; + /* Initialise per-port stop flags. */ + for (int pi = 0; pi < MAX_PORTS; pi++) atomic_store(&g_port_stop[pi], 0); + for (int pi = 0; pi < port_count; pi++) { if (!locked[pi]) continue; PortState *p = &ps[active_count]; diff --git a/services/node-agent/index.js b/services/node-agent/index.js index b2e7243..6a8906d 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -82,12 +82,40 @@ function _dcBridgeRunning() { return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null; } +// Check /proc on Linux to see if a deltacast-bridge process is alive. +// Used by startDeltacastBridge() to detect a bridge started outside node-agent +// (e.g. manually with sudo, or from a prior node-agent process). +function _dcBridgeProcessAlive() { + try { + for (const pid of fs.readdirSync('/proc')) { + if (!/^\d+$/.test(pid)) continue; + try { + // cmdline is NUL-delimited; read as binary-friendly string. + const cmdline = fs.readFileSync(`/proc/${pid}/cmdline`, 'latin1'); + if (cmdline.includes('deltacast-bridge')) return true; + } catch (_) { /* process may have exited mid-scan */ } + } + } catch (_) {} + return false; +} + function startDeltacastBridge() { - if (_dcBridgeRunning()) return; // already up + if (_dcBridgeRunning()) return; // already up (we spawned it) try { fs.mkdirSync(DC_PIPE_DIR, { recursive: true }); } catch (_) {} + + // FIFOs may exist from a previous run. Only skip the spawn if a + // deltacast-bridge process is actually alive on the host — stale FIFOs with + // no live writer cause ffmpeg to block on open() indefinitely (no audio/video). const _v0 = DC_PIPE_DIR + '/video-0.fifo'; - if (fs.existsSync(_v0)) { console.log('[dc-bridge] FIFOs exist, skipping spawn'); return; } + if (fs.existsSync(_v0)) { + if (_dcBridgeProcessAlive()) { + console.log('[dc-bridge] FIFOs exist and bridge process alive — skipping spawn'); + return; + } + console.log('[dc-bridge] FIFOs exist but bridge is NOT running — spawning fresh bridge'); + // Stale FIFOs are harmless: the bridge recreates them (mkfifo ignores EEXIST). + } const args = [ '--device', DC_BOARD,