fix: deltacast audio missing after ffmpeg restart (EPIPE cascade + stale FIFO guard)

Root cause A (main.c): audio_thread set the global g_stop flag on EPIPE
(ffmpeg reader died). This killed ALL port threads across the entire bridge
process. Bridge process then exited with all 8 ports gone.

Root cause B (node-agent/index.js): startDeltacastBridge() skipped the respawn
whenever FIFOs existed in /dev/shm/deltacast, even if the bridge process was
dead. The next ffmpeg then opened the audio FIFO read end and blocked forever
(no writer), so new recordings got no audio (or video).

Fix A (main.c):
- Add per-port atomic g_port_stop[MAX_PORTS] flags.
- Audio thread: on EPIPE, close the FIFO fd and loop back to reopen it.
  The VHD ANC stream stays open across reconnects. Other ports unaffected.
- Video thread: on EPIPE or stream error, set only g_port_stop[port], not
  the global g_stop. Other ports keep running.
- MAX_PORTS #define moved before globals so g_port_stop[MAX_PORTS] compiles.

Fix B (node-agent/index.js):
- Add _dcBridgeProcessAlive() — scans /proc/<pid>/cmdline for deltacast-bridge.
- startDeltacastBridge(): if FIFOs exist but no live bridge process is found,
  spawn a fresh bridge instead of silently returning. The spawn is still
  skipped when a live bridge is found, including one started externally
  (e.g. with sudo on the host before node-agent started).

Requires: bridge rebuild + restart on zampp3. No capture image rebuild needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude 2026-06-02 01:35:07 +00:00
parent 1c068b470e
commit c083d1006a
2 changed files with 122 additions and 60 deletions

View file

@ -37,13 +37,18 @@
#include "VideoMasterHD_Sdi.h"
#include "VideoMasterHD_Sdi_Audio.h"
/* ── Constants ────────────────────────────────────────────────────────── */
#define MAX_PORTS 8
/* ── Globals ──────────────────────────────────────────────────────────── */
static atomic_int g_stop = 0;
static atomic_int g_stop = 0; /* global shutdown (SIGTERM/SIGINT only) */
/* Signal callback: requests a global shutdown by storing 1 into g_stop.
 * The delivered signal number is deliberately unused.
 * NOTE(review): presumably registered for SIGTERM/SIGINT — confirm at the
 * registration site, which is outside this view. */
static void on_signal(int s)
{
    (void)s;
    atomic_store(&g_stop, 1);
}
/* ── Constants ────────────────────────────────────────────────────────── */
#define MAX_PORTS 8
/* Per-port stop flag — set only when a fatal error occurs on that specific
* port (e.g. video lock lost). Audio EPIPE is handled by reopening the FIFO
* rather than stopping the port, so the thread survives ffmpeg restarts. */
static atomic_int g_port_stop[MAX_PORTS];
/* ── Stream type by port index (non-contiguous SDK enum) ────────────── */
static ULONG rx_streamtype(unsigned port) {
@ -135,20 +140,16 @@ typedef struct {
/* ── Audio thread ──────────────────────────────────────────────────────
*
* Identical design to the single-port bridge audio thread:
* - Opens FIFO writer FIRST, unconditionally (unblocks ffmpeg input)
* - Feeds continuous wall-clock-paced s16le stereo (real or silence)
* - Best-effort VHD audio stream; silence fallback on any failure
* - Opens FIFO writer (blocks until a reader connects — correct behaviour).
* - Feeds continuous wall-clock-paced s16le stereo (real or silence).
* - Best-effort VHD audio stream; silence fallback on any failure.
* - On EPIPE (ffmpeg reader died): closes and REOPENS the FIFO so the
* thread survives an ffmpeg restart without bringing down other ports.
* EPIPE never sets g_stop — only SIGTERM/SIGINT does that.
*/
static void *audio_thread(void *arg) {
PortState *ps = (PortState *)arg;
int fd = open(ps->audio_fifo, O_WRONLY);
if (fd < 0) {
fprintf(stderr, "[audio:%u] open FIFO failed: %s\n", ps->port, strerror(errno));
return NULL;
}
const int AUDIO_RATE = 48000;
const int CHANNELS = 2;
const size_t FRAME_BYTES = (size_t)CHANNELS * 2; /* s16le stereo */
@ -165,8 +166,10 @@ static void *audio_thread(void *arg) {
size_t vhd_buf_sz = ((size_t)max_samples + 64) * (block_size ? block_size : FRAME_BYTES);
size_t buf_sz = vhd_buf_sz > tick_bytes ? vhd_buf_sz : tick_bytes;
unsigned char *buf = calloc(1, buf_sz);
if (!buf) { close(fd); return NULL; }
if (!buf) return NULL;
/* Open the VHD audio stream once for the lifetime of the bridge.
* The stream stays open across reader reconnects — no need to reopen it. */
HANDLE stream = NULL;
int have_vhd_audio = 0;
VHD_AUDIOINFO ai;
@ -196,56 +199,80 @@ static void *audio_thread(void *arg) {
ps->port, r);
}
struct timespec next;
clock_gettime(CLOCK_MONOTONIC, &next);
long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num);
HANDLE slot = NULL;
while (!atomic_load(&g_stop)) {
size_t out_bytes = 0;
/* Outer loop: reopen the FIFO writer each time a reader connects.
* This allows the bridge to survive ffmpeg session stop/restart on a port
* without affecting any other port's threads. */
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
if (have_vhd_audio) {
r = VHD_LockSlotHandle(stream, &slot);
if (r == VHDERR_NOERROR) {
ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)buf_sz;
if (VHD_SlotExtractAudio(slot, &ai) == VHDERR_NOERROR) {
ULONG sz = ai.pAudioGroups[0].pAudioChannels[0].DataSize;
if (sz > 0 && (size_t)sz <= buf_sz) out_bytes = (size_t)sz;
int fd = open(ps->audio_fifo, O_WRONLY);
if (fd < 0) {
/* Open failed (rare — FIFO was deleted?). Brief pause then retry. */
fprintf(stderr, "[audio:%u] open FIFO failed: %s\n", ps->port, strerror(errno));
struct timespec ts = {0, 200000000L};
nanosleep(&ts, NULL);
continue;
}
fprintf(stderr, "[audio:%u] FIFO writer connected\n", ps->port);
/* Reset wall-clock baseline after potentially blocking on open(). */
struct timespec next;
clock_gettime(CLOCK_MONOTONIC, &next);
/* Inner loop: feed audio into the open FIFO until reader exits (EPIPE). */
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
size_t out_bytes = 0;
if (have_vhd_audio) {
r = VHD_LockSlotHandle(stream, &slot);
if (r == VHDERR_NOERROR) {
ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)buf_sz;
if (VHD_SlotExtractAudio(slot, &ai) == VHDERR_NOERROR) {
ULONG sz = ai.pAudioGroups[0].pAudioChannels[0].DataSize;
if (sz > 0 && (size_t)sz <= buf_sz) out_bytes = (size_t)sz;
}
VHD_UnlockSlotHandle(slot);
} else if (r != VHDERR_TIMEOUT) {
fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n",
ps->port, r);
VHD_StopStream(stream);
VHD_CloseStreamHandle(stream);
stream = NULL;
have_vhd_audio = 0;
}
VHD_UnlockSlotHandle(slot);
} else if (r != VHDERR_TIMEOUT) {
fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n",
ps->port, r);
VHD_StopStream(stream);
VHD_CloseStreamHandle(stream);
stream = NULL;
have_vhd_audio = 0;
}
if (out_bytes == 0) {
memset(buf, 0, tick_bytes);
out_bytes = tick_bytes;
}
if (write_all(fd, buf, out_bytes) < 0) {
/* EPIPE: ffmpeg reader on this port died (session stop/restart).
* Close and break to the outer loop which will reopen and block
* until the next ffmpeg reader connects.
* Do NOT set g_stop — other ports must keep running. */
fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port);
break;
}
next.tv_nsec += frame_ns;
while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec += 1; }
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if (next.tv_sec > now.tv_sec ||
(next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) {
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL);
} else {
next = now;
}
}
if (out_bytes == 0) {
memset(buf, 0, tick_bytes);
out_bytes = tick_bytes;
}
if (write_all(fd, buf, out_bytes) < 0) {
atomic_store(&g_stop, 1);
break;
}
next.tv_nsec += frame_ns;
while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec += 1; }
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if (next.tv_sec > now.tv_sec ||
(next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) {
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL);
} else {
next = now;
}
close(fd);
}
close(fd);
if (stream) {
VHD_StopStream(stream);
VHD_CloseStreamHandle(stream);
@ -261,27 +288,31 @@ static void *video_thread(void *arg) {
int fd = open(ps->video_fifo, O_WRONLY);
if (fd < 0) {
fprintf(stderr, "[video:%u] open FIFO failed: %s\n", ps->port, strerror(errno));
atomic_store(&g_port_stop[ps->port], 1);
return NULL;
}
HANDLE slot = NULL;
while (!atomic_load(&g_stop)) {
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
if (r == VHDERR_NOERROR) {
BYTE *buf = NULL;
ULONG sz = 0;
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
if (write_all(fd, buf, sz) < 0) {
atomic_store(&g_stop, 1);
/* EPIPE on video: the capture sidecar for this port died.
* Stop only this port's threads — other ports unaffected. */
fprintf(stderr, "[video:%u] EPIPE — stopping port\n", ps->port);
atomic_store(&g_port_stop[ps->port], 1);
VHD_UnlockSlotHandle(slot);
break;
}
}
VHD_UnlockSlotHandle(slot);
} else if (r != VHDERR_TIMEOUT) {
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping\n",
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n",
ps->port, r);
atomic_store(&g_stop, 1);
atomic_store(&g_port_stop[ps->port], 1);
break;
}
}
@ -427,6 +458,9 @@ int main(int argc, char *argv[]) {
memset(ps, 0, sizeof(ps));
int active_count = 0;
/* Initialise per-port stop flags. */
for (int pi = 0; pi < MAX_PORTS; pi++) atomic_store(&g_port_stop[pi], 0);
for (int pi = 0; pi < port_count; pi++) {
if (!locked[pi]) continue;
PortState *p = &ps[active_count];

View file

@ -82,12 +82,40 @@ function _dcBridgeRunning() {
return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null;
}
// Report whether a deltacast-bridge process is currently alive on this host.
//
// Used by startDeltacastBridge() to detect a bridge started outside node-agent
// (e.g. manually with sudo, or from a prior node-agent process).
//
// Walks the numeric entries of /proc and inspects argv[0] of each process,
// i.e. the first NUL-delimited field of /proc/<pid>/cmdline. Only the
// executable name is matched, not the whole command line: a process that
// merely mentions "deltacast-bridge" in an argument (tail/grep/editor on a
// bridge file) is not a bridge, and counting it as one would skip the spawn
// and leave the FIFOs without a writer.
//
// Returns true if a matching process is found, false otherwise — including
// when /proc cannot be listed at all (e.g. on a non-Linux host).
function _dcBridgeProcessAlive() {
  let pids;
  try {
    pids = fs.readdirSync('/proc');
  } catch (_) {
    // No readable /proc: the best-effort answer is "no bridge found".
    return false;
  }
  for (const pid of pids) {
    if (!/^\d+$/.test(pid)) continue; // skip non-process entries (self, sys, ...)
    try {
      // cmdline is NUL-delimited; latin1 keeps arbitrary bytes intact.
      const cmdline = fs.readFileSync(`/proc/${pid}/cmdline`, 'latin1');
      const argv0 = cmdline.split('\0', 1)[0];
      // Strip any directory prefix so /usr/local/bin/deltacast-bridge matches.
      const exe = argv0.slice(argv0.lastIndexOf('/') + 1);
      if (exe.includes('deltacast-bridge')) return true;
    } catch (_) { /* process may have exited mid-scan */ }
  }
  return false;
}
function startDeltacastBridge() {
if (_dcBridgeRunning()) return; // already up
if (_dcBridgeRunning()) return; // already up (we spawned it)
try { fs.mkdirSync(DC_PIPE_DIR, { recursive: true }); } catch (_) {}
// FIFOs may exist from a previous run. Only skip the spawn if a
// deltacast-bridge process is actually alive on the host — stale FIFOs with
// no live writer cause ffmpeg to block on open() indefinitely (no audio/video).
const _v0 = DC_PIPE_DIR + '/video-0.fifo';
if (fs.existsSync(_v0)) { console.log('[dc-bridge] FIFOs exist, skipping spawn'); return; }
if (fs.existsSync(_v0)) {
if (_dcBridgeProcessAlive()) {
console.log('[dc-bridge] FIFOs exist and bridge process alive — skipping spawn');
return;
}
console.log('[dc-bridge] FIFOs exist but bridge is NOT running — spawning fresh bridge');
// Stale FIFOs are harmless: the bridge recreates them (mkfifo ignores EEXIST).
}
const args = [
'--device', DC_BOARD,