From a61e3856937a8cf3c28d3b07f86d20e6cc08673a Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 00:21:52 +0000 Subject: [PATCH] feat(deltacast): replace per-port bridges with shared multi-port daemon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old architecture spawned one deltacast-capture per recorder port; each called VHD_OpenBoardHandle, triggering a BufMngr.c:781 OOB fault in the delta_x300 kernel driver whenever two opens raced. Fix: a single deltacast-bridge daemon opens the board once, opens RX streams for all requested ports concurrently, and writes each port's video/audio to named FIFOs (/dev/shm/deltacast/video-.fifo, /dev/shm/deltacast/audio-.fifo). Capture sidecars read from those FIFOs directly — no board handle, no race, no flock. Changes: services/capture/deltacast-bridge/main.c - Complete rewrite: --ports csv arg, board opened once, one video+audio thread pair per port, FIFO paths per port, format JSON emitted per port on signal lock, SIGTERM clean shutdown. - flock/serialize logic removed (no longer needed). - --port single-port compat alias retained. services/capture/deltacast-bridge/CMakeLists.txt - Rename target deltacast-capture -> deltacast-bridge. - POST_BUILD symlink deltacast-capture -> deltacast-bridge for compat. services/capture/src/capture-manager.js - deltacast _buildInputArgs: remove bridge spawn; wait up to 30s for FIFOs to exist (bridge may be starting); return rawvideo + s16le FIFO inputArgs. bridgeProcess=null. - audioMap: keyed on sourceType instead of bridgeProcess (both inputs are always present for deltacast). - Remove readFirstStderrLine helper (no longer needed). - Remove bridgeProcess.stdout.pipe / processes.bridge stop signal. services/node-agent/index.js - Add import spawn for bridge daemon management. - Add startDeltacastBridge / stopDeltacastBridge: host-process lifecycle for the shared bridge, ref-counted by sidecar count. - handleSidecarStart: on deltacast, increment counter + start bridge; decrement on container create/start failure. - handleSidecarStop: decrement counter; stop bridge when last sidecar. - _containerSourceType map tracks containerId->sourceType for stop. - Old acquireDcLock mutex retained but no longer called. --- .../capture/deltacast-bridge/CMakeLists.txt | 22 +- services/capture/deltacast-bridge/main.c | 507 ++++++++++-------- services/capture/src/capture-manager.js | 179 +++---- services/node-agent/index.js | 149 ++++- 4 files changed, 518 insertions(+), 339 deletions(-) diff --git a/services/capture/deltacast-bridge/CMakeLists.txt b/services/capture/deltacast-bridge/CMakeLists.txt index 31d71e4..a877608 100644 --- a/services/capture/deltacast-bridge/CMakeLists.txt +++ b/services/capture/deltacast-bridge/CMakeLists.txt @@ -4,24 +4,34 @@ set(CMAKE_C_STANDARD 17) set(SDK_ROOT "/sdk" CACHE PATH "Path to extracted VideoMaster SDK") -add_executable(deltacast-capture main.c) +# Primary binary: deltacast-bridge (shared multi-port daemon) +add_executable(deltacast-bridge main.c) -target_include_directories(deltacast-capture PRIVATE +target_include_directories(deltacast-bridge PRIVATE ${SDK_ROOT}/include/videomaster ) -target_link_directories(deltacast-capture PRIVATE +target_link_directories(deltacast-bridge PRIVATE ${SDK_ROOT}/lib ) -target_link_libraries(deltacast-capture PRIVATE +target_link_libraries(deltacast-bridge PRIVATE videomasterhd videomasterhd_audio pthread ) # Embed the SDK RPATH so the binary finds the .so at runtime -set_target_properties(deltacast-capture PROPERTIES +set_target_properties(deltacast-bridge PROPERTIES INSTALL_RPATH "/usr/local/lib/deltacast" BUILD_WITH_INSTALL_RPATH TRUE -) \ No newline at end of file +) + +# Compat symlink: deltacast-capture -> deltacast-bridge +# (node-agent and any legacy scripts that reference the old name still work) +add_custom_command(TARGET deltacast-bridge POST_BUILD + COMMAND ${CMAKE_COMMAND} -E create_symlink + $ + $/deltacast-capture + COMMENT "Creating deltacast-capture compat symlink" +) diff --git a/services/capture/deltacast-bridge/main.c b/services/capture/deltacast-bridge/main.c index b36dd6f..99f4cec 100644 --- a/services/capture/deltacast-bridge/main.c +++ b/services/capture/deltacast-bridge/main.c @@ -1,12 +1,24 @@ /* services/capture/deltacast-bridge/main.c * - * Deltacast VideoMaster SDI capture bridge. - * Writes raw UYVY video to stdout and stereo PCM to a named FIFO. - * Emits one JSON line to stderr on signal lock before streaming starts. + * Deltacast VideoMaster SDI shared multi-port bridge daemon. + * + * Opens the board ONCE, opens RX streams for all requested ports, and + * writes each port's video/audio to named FIFOs in a shared directory. + * One reader thread + one audio thread per port run concurrently. * * Usage: - * deltacast-capture --device --port --audio-pipe - * [--signal-timeout ] + * deltacast-bridge --device --ports + * [--video-pipe-dir /dev/shm/deltacast] + * [--audio-pipe-dir /dev/shm/deltacast] + * [--signal-timeout ] + * + * Compat alias: --port treated as --ports (single port). + * + * For each port that acquires signal, emits one JSON line to stderr: + * {"port":N,"width":W,"height":H,"fps_num":N,"fps_den":D, + * "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2} + * + * Runs until SIGTERM/SIGINT, then closes all streams and the board. */ #include @@ -17,7 +29,7 @@ #include #include #include -#include +#include #include #include @@ -25,12 +37,15 @@ #include "VideoMasterHD_Sdi.h" #include "VideoMasterHD_Sdi_Audio.h" -/* ── Globals ─────────────────────────────────────────────────────────── */ +/* ── Globals ──────────────────────────────────────────────────────────── */ static atomic_int g_stop = 0; static void on_signal(int s) { (void)s; atomic_store(&g_stop, 1); } -/* ── Stream type by port index ───────────────────────────────────────── */ +/* ── Constants ────────────────────────────────────────────────────────── */ +#define MAX_PORTS 8 + +/* ── Stream type by port index (non-contiguous SDK enum) ────────────── */ static ULONG rx_streamtype(unsigned port) { switch (port) { case 0: return VHD_ST_RX0; @@ -43,7 +58,7 @@ static ULONG rx_streamtype(unsigned port) { case 7: return VHD_ST_RX7; default: fprintf(stderr, "{\"error\":\"port %u not supported (max 7)\"}\n", port); - return VHD_ST_RX0; /* caller will fail on signal lock */ + return VHD_ST_RX0; } } @@ -89,29 +104,7 @@ static VideoInfo video_info(VHD_VIDEOSTANDARD std, VHD_CLOCKDIVISOR div) { } } -/* ── Audio thread ──────────────────────────────────────────── - * - * CRITICAL: ffmpeg opens ALL of its inputs before it starts processing any of - * them, and input 1 is this audio FIFO. Opening the read end of a FIFO blocks - * until a writer connects, so if this thread fails to open the FIFO writer - * ffmpeg hangs forever on input 1 -> no video frames are ever read from - * pipe:0 -> 0 fps and an empty HLS preview. Therefore the FIFO writer is - * opened UNCONDITIONALLY and FIRST, independent of any VideoMaster audio open, - * and the thread then feeds the FIFO a CONTINUOUS, wall-clock-paced s16le - * stereo stream (real samples when available, otherwise silence) so ffmpeg's - * A/V demux stays alive and video keeps flowing. */ -typedef struct { - HANDLE board; - unsigned port; - ULONG video_std; - ULONG clock_div; - int fps_num; - int fps_den; - const char *fifo_path; -} AudioArgs; - -/* Write exactly `len` bytes; returns 0 on success, -1 if writing should stop - * (EPIPE when ffmpeg is gone, or any other error). */ +/* ── Write-all helper ─────────────────────────────────────────────────── */ static int write_all(int fd, const unsigned char *p, size_t len) { size_t off = 0; while (off < len) { @@ -123,50 +116,69 @@ static int write_all(int fd, const unsigned char *p, size_t len) { return 0; } -static void *audio_thread(void *arg) { - AudioArgs *a = (AudioArgs *)arg; +/* ── Per-port state ───────────────────────────────────────────────────── */ +typedef struct { + HANDLE board; + unsigned port; + unsigned device; + ULONG video_std; + ULONG clock_div; + VideoInfo vi; + char video_fifo[256]; + char audio_fifo[256]; + /* threads */ + pthread_t video_tid; + pthread_t audio_tid; + /* streams (owned by threads, set before thread launch) */ + HANDLE video_stream; +} PortState; - /* 1. Open the FIFO writer FIRST, unconditionally. This is what unblocks - * ffmpeg's input 1; we must reach it even if the VHD audio open fails. */ - int fd = open(a->fifo_path, O_WRONLY); +/* ── Audio thread ────────────────────────────────────────────────────── + * + * Identical design to the single-port bridge audio thread: + * - Opens FIFO writer FIRST, unconditionally (unblocks ffmpeg input) + * - Feeds continuous wall-clock-paced s16le stereo (real or silence) + * - Best-effort VHD audio stream; silence fallback on any failure + */ +static void *audio_thread(void *arg) { + PortState *ps = (PortState *)arg; + + int fd = open(ps->audio_fifo, O_WRONLY); if (fd < 0) { - fprintf(stderr, "[audio] open FIFO failed: %s\n", strerror(errno)); + fprintf(stderr, "[audio:%u] open FIFO failed: %s\n", ps->port, strerror(errno)); return NULL; } - /* 2. Pacing + silence buffer sized to one video frame of 48kHz stereo - * s16le. samples_per_frame = 48000 * fps_den / fps_num (rounded). */ - const int AUDIO_RATE = 48000; - const int CHANNELS = 2; - const size_t FRAME_BYTES = (size_t)CHANNELS * 2; /* s16le stereo */ - int fps_num = a->fps_num > 0 ? a->fps_num : 25; - int fps_den = a->fps_den > 0 ? a->fps_den : 1; + const int AUDIO_RATE = 48000; + const int CHANNELS = 2; + const size_t FRAME_BYTES = (size_t)CHANNELS * 2; /* s16le stereo */ + int fps_num = ps->vi.fps_num > 0 ? ps->vi.fps_num : 25; + int fps_den = ps->vi.fps_den > 0 ? ps->vi.fps_den : 1; long samples_per_frame = ((long)AUDIO_RATE * fps_den + fps_num / 2) / fps_num; if (samples_per_frame < 1) samples_per_frame = 1; size_t tick_bytes = (size_t)samples_per_frame * FRAME_BYTES; - ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)a->video_std, - (VHD_CLOCKDIVISOR)a->clock_div, + ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)ps->video_std, + (VHD_CLOCKDIVISOR)ps->clock_div, VHD_ASR_48000, 0); ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO); size_t vhd_buf_sz = ((size_t)max_samples + 64) * (block_size ? block_size : FRAME_BYTES); size_t buf_sz = vhd_buf_sz > tick_bytes ? vhd_buf_sz : tick_bytes; - unsigned char *buf = calloc(1, buf_sz); /* zeroed -> doubles as silence */ + unsigned char *buf = calloc(1, buf_sz); if (!buf) { close(fd); return NULL; } - /* 3. Try to open the VideoMaster audio stream (best effort, NON-FATAL). */ HANDLE stream = NULL; int have_vhd_audio = 0; VHD_AUDIOINFO ai; memset(&ai, 0, sizeof(ai)); - ULONG r = VHD_OpenStreamHandle(a->board, rx_streamtype(a->port), + ULONG r = VHD_OpenStreamHandle(ps->board, rx_streamtype(ps->port), VHD_SDI_STPROC_DISJOINED_ANC, NULL, &stream, NULL); if (r == VHDERR_NOERROR) { - VHD_SetStreamProperty(stream, VHD_SDI_SP_VIDEO_STANDARD, a->video_std); - VHD_SetStreamProperty(stream, VHD_SDI_SP_CLOCK_SYSTEM, a->clock_div); - VHD_SetStreamProperty(stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED); + VHD_SetStreamProperty(stream, VHD_SDI_SP_VIDEO_STANDARD, ps->video_std); + VHD_SetStreamProperty(stream, VHD_SDI_SP_CLOCK_SYSTEM, ps->clock_div); + VHD_SetStreamProperty(stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED); ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO; ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16; @@ -175,16 +187,15 @@ static void *audio_thread(void *arg) { if (VHD_StartStream(stream) == VHDERR_NOERROR) { have_vhd_audio = 1; } else { - fprintf(stderr, "[audio] VHD_StartStream failed - feeding silence\n"); + fprintf(stderr, "[audio:%u] VHD_StartStream failed — feeding silence\n", ps->port); VHD_CloseStreamHandle(stream); stream = NULL; } } else { - fprintf(stderr, "[audio] VHD_OpenStreamHandle failed (%lu) - feeding silence\n", r); + fprintf(stderr, "[audio:%u] VHD_OpenStreamHandle failed (%lu) — feeding silence\n", + ps->port, r); } - /* 4. Continuous, wall-clock-paced feed loop: real audio when available, - * otherwise silence, so ffmpeg's input 1 never starves. */ struct timespec next; clock_gettime(CLOCK_MONOTONIC, &next); long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num); @@ -203,7 +214,8 @@ static void *audio_thread(void *arg) { } VHD_UnlockSlotHandle(slot); } else if (r != VHDERR_TIMEOUT) { - fprintf(stderr, "[audio] lock error %lu - degrading to silence\n", r); + fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n", + ps->port, r); VHD_StopStream(stream); VHD_CloseStreamHandle(stream); stream = NULL; @@ -212,12 +224,12 @@ static void *audio_thread(void *arg) { } if (out_bytes == 0) { - memset(buf, 0, tick_bytes); /* one frame of silence */ + memset(buf, 0, tick_bytes); out_bytes = tick_bytes; } if (write_all(fd, buf, out_bytes) < 0) { - atomic_store(&g_stop, 1); /* ffmpeg closed the FIFO (EPIPE) */ + atomic_store(&g_stop, 1); break; } @@ -229,7 +241,7 @@ static void *audio_thread(void *arg) { (next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) { clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL); } else { - next = now; /* fell behind (real-audio burst) - resync */ + next = now; } } @@ -242,85 +254,128 @@ static void *audio_thread(void *arg) { return NULL; } -/* ── Main ────────────────────────────────────────────────────────────── */ +/* ── Video thread ─────────────────────────────────────────────────────── */ +static void *video_thread(void *arg) { + PortState *ps = (PortState *)arg; + + int fd = open(ps->video_fifo, O_WRONLY); + if (fd < 0) { + fprintf(stderr, "[video:%u] open FIFO failed: %s\n", ps->port, strerror(errno)); + return NULL; + } + + HANDLE slot = NULL; + while (!atomic_load(&g_stop)) { + ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot); + if (r == VHDERR_NOERROR) { + BYTE *buf = NULL; + ULONG sz = 0; + if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { + if (write_all(fd, buf, sz) < 0) { + atomic_store(&g_stop, 1); + VHD_UnlockSlotHandle(slot); + break; + } + } + VHD_UnlockSlotHandle(slot); + } else if (r != VHDERR_TIMEOUT) { + fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping\n", + ps->port, r); + atomic_store(&g_stop, 1); + break; + } + } + + close(fd); + return NULL; +} + +/* ── Parse comma-separated port list ─────────────────────────────────── */ +static int parse_ports(const char *csv, unsigned *ports, int max) { + int count = 0; + char buf[256]; + strncpy(buf, csv, sizeof(buf) - 1); + buf[sizeof(buf) - 1] = '\0'; + char *tok = strtok(buf, ","); + while (tok && count < max) { + ports[count++] = (unsigned)atoi(tok); + tok = strtok(NULL, ","); + } + return count; +} + +/* ── Main ─────────────────────────────────────────────────────────────── */ int main(int argc, char *argv[]) { - unsigned device_id = 0; - unsigned port_id = 0; - int sig_timeout = 30; - const char *audio_pipe = NULL; + unsigned device_id = 0; + unsigned ports[MAX_PORTS] = {0}; + int port_count = 0; + int sig_timeout = 30; + const char *video_pipe_dir = "/dev/shm/deltacast"; + const char *audio_pipe_dir = "/dev/shm/deltacast"; for (int i = 1; i < argc; i++) { - if (!strcmp(argv[i], "--device") && i+1 < argc) device_id = (unsigned)atoi(argv[++i]); - else if (!strcmp(argv[i], "--port") && i+1 < argc) port_id = (unsigned)atoi(argv[++i]); - else if (!strcmp(argv[i], "--audio-pipe") && i+1 < argc) audio_pipe = argv[++i]; - else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) sig_timeout = atoi(argv[++i]); + if (!strcmp(argv[i], "--device") && i+1 < argc) { + device_id = (unsigned)atoi(argv[++i]); + } else if (!strcmp(argv[i], "--ports") && i+1 < argc) { + port_count = parse_ports(argv[++i], ports, MAX_PORTS); + } else if (!strcmp(argv[i], "--port") && i+1 < argc) { + /* single-port compat alias */ + ports[0] = (unsigned)atoi(argv[++i]); + port_count = 1; + } else if (!strcmp(argv[i], "--video-pipe-dir") && i+1 < argc) { + video_pipe_dir = argv[++i]; + } else if (!strcmp(argv[i], "--audio-pipe-dir") && i+1 < argc) { + audio_pipe_dir = argv[++i]; + } else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) { + sig_timeout = atoi(argv[++i]); + } + } + + if (port_count == 0) { + fprintf(stderr, "{\"error\":\"no ports specified — use --ports 0,1,2,...\"}\n"); + return 1; } signal(SIGINT, on_signal); signal(SIGTERM, on_signal); - /* Don't let a dying ffmpeg kill us with SIGPIPE - writes return EPIPE - * and the FIFO/stdout write loops handle that by stopping cleanly. */ signal(SIGPIPE, SIG_IGN); - /* ── Init API ─────────────────────────────────────────────────── */ + /* ── Init API ────────────────────────────────────────────────────── */ ULONG dll_ver, nb_boards; if (VHD_GetApiInfo(&dll_ver, &nb_boards) != VHDERR_NOERROR) { fprintf(stderr, "{\"error\":\"VHD_GetApiInfo failed\"}\n"); return 1; } if (device_id >= nb_boards) { - fprintf(stderr, "{\"error\":\"board %u not found (%lu detected)\"}\n", device_id, nb_boards); + fprintf(stderr, "{\"error\":\"board %u not found (%lu detected)\"}\n", + device_id, nb_boards); return 1; } - /* ── Serialize board open via flock ────────────────────────────── - * delta_x300 BufMngr.c:781 has an array-index-out-of-bounds bug that - * fires when two VHD_OpenBoardHandle calls race on the same board. - * Use a cross-container exclusive lock on a file in /dev/shm/deltacast/ - * (already bind-mounted into every capture sidecar) to guarantee only - * one bridge runs OpenBoardHandle + signal-wait at a time. The lock is - * released after signal lock succeeds (plus a settle delay) or on - * failure — so the next bridge is never permanently blocked. - * - * IMPORTANT: the signal-wait deadline is set AFTER acquiring the lock so - * the full sig_timeout is available for signal detection regardless of - * how long this bridge waited in the queue. */ - const char *lock_path = "/dev/shm/deltacast/bridge.lock"; - int lock_fd = open(lock_path, O_CREAT | O_RDWR, 0666); - if (lock_fd >= 0) { - fprintf(stderr, "[board] waiting for board-open lock (port %u)...\n", port_id); - if (flock(lock_fd, LOCK_EX) != 0) { - fprintf(stderr, "[board] flock failed: %s — proceeding without lock\n", strerror(errno)); - close(lock_fd); - lock_fd = -1; - } else { - fprintf(stderr, "[board] lock acquired (port %u)\n", port_id); - } - } else { - fprintf(stderr, "[board] could not open lock file %s: %s — proceeding without lock\n", - lock_path, strerror(errno)); - } - - /* ── Open board ───────────────────────────────────────────────── */ + /* ── Open board ONCE ─────────────────────────────────────────────── */ HANDLE board = NULL; if (VHD_OpenBoardHandle(device_id, &board, NULL, 0) != VHDERR_NOERROR) { fprintf(stderr, "{\"error\":\"VHD_OpenBoardHandle failed for board %u\"}\n", device_id); - if (lock_fd >= 0) { flock(lock_fd, LOCK_UN); close(lock_fd); } return 1; } + fprintf(stderr, "[board] opened board %u with %d port(s)\n", device_id, port_count); - /* Disable passive (relay) loopback so RX is live. - * VHD_CORE_BP_PASSIVE_LOOPBACK_ only exists for ports 0-3 in SDK 6.34.1, - * and the board reports passive-loopback capability 0, so skipping ports 4-7 - * is harmless. */ - if (port_id < 4) { - VHD_SetBoardProperty(board, loopback_prop(port_id), FALSE); + /* Disable passive loopback for each requested port (ports 0-3 only in SDK). */ + for (int pi = 0; pi < port_count; pi++) { + unsigned p = ports[pi]; + if (p < 4) VHD_SetBoardProperty(board, loopback_prop(p), FALSE); + } + + /* ── Wait for signal on all ports ───────────────────────────────── */ + ULONG video_stds[MAX_PORTS] = {0}; + ULONG clock_divs[MAX_PORTS] = {0}; + int locked[MAX_PORTS] = {0}; + + for (int pi = 0; pi < port_count; pi++) { + video_stds[pi] = (ULONG)NB_VHD_VIDEOSTANDARDS; + clock_divs[pi] = VHD_CLOCKDIV_1; } - /* ── Wait for signal lock ────────────────────────────────────────── - * Deadline is set HERE — after the flock is acquired and the board is - * open — so the full sig_timeout is available regardless of queue wait. */ - ULONG video_std = (ULONG)NB_VHD_VIDEOSTANDARDS; struct timespec deadline; clock_gettime(CLOCK_MONOTONIC, &deadline); deadline.tv_sec += sig_timeout; @@ -331,110 +386,140 @@ int main(int argc, char *argv[]) { if (now.tv_sec > deadline.tv_sec || (now.tv_sec == deadline.tv_sec && now.tv_nsec >= deadline.tv_nsec)) break; - VHD_GetChannelProperty(board, VHD_RX_CHANNEL, port_id, - VHD_SDI_CP_VIDEO_STANDARD, &video_std); - if (video_std != (ULONG)NB_VHD_VIDEOSTANDARDS) break; + int all_locked = 1; + for (int pi = 0; pi < port_count; pi++) { + if (locked[pi]) continue; + VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi], + VHD_SDI_CP_VIDEO_STANDARD, &video_stds[pi]); + if (video_stds[pi] != (ULONG)NB_VHD_VIDEOSTANDARDS) { + VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi], + VHD_SDI_CP_CLOCK_DIVISOR, &clock_divs[pi]); + locked[pi] = 1; + fprintf(stderr, "[board] port %u signal locked (std=%lu)\n", + ports[pi], video_stds[pi]); + } else { + all_locked = 0; + } + } + if (all_locked) break; - struct timespec ts = {0, 200000000L}; /* 200ms */ + struct timespec ts = {0, 200000000L}; /* 200ms poll */ nanosleep(&ts, NULL); } - if (atomic_load(&g_stop) || video_std == (ULONG)NB_VHD_VIDEOSTANDARDS) { - fprintf(stderr, - "{\"error\":\"no signal on board %u port %u within %ds\"}\n", - device_id, port_id, sig_timeout); - VHD_CloseBoardHandle(board); - if (lock_fd >= 0) { flock(lock_fd, LOCK_UN); close(lock_fd); } - return 1; - } - - /* Signal locked. Hold the board-open lock for a settle period so the - * board's RX buffer queues are fully initialised before the next bridge - * calls OpenBoardHandle. 4 seconds is enough for 1080p59.94 @ queue-depth 8. */ - if (lock_fd >= 0) { - struct timespec settle = {4, 0}; - nanosleep(&settle, NULL); - flock(lock_fd, LOCK_UN); - close(lock_fd); - lock_fd = -1; - fprintf(stderr, "[board] lock released (port %u) — streaming\n", port_id); - } - - ULONG clock_div = VHD_CLOCKDIV_1; - VHD_GetChannelProperty(board, VHD_RX_CHANNEL, port_id, - VHD_SDI_CP_CLOCK_DIVISOR, &clock_div); - - VideoInfo vi = video_info((VHD_VIDEOSTANDARD)video_std, - (VHD_CLOCKDIVISOR)clock_div); - - /* ── Emit format JSON to stderr (one line, flushed) ─────────────── */ - fprintf(stderr, - "{\"width\":%d,\"height\":%d,\"fps_num\":%d,\"fps_den\":%d," - "\"interlaced\":%s,\"pix_fmt\":\"uyvy422\"," - "\"audio_channels\":2,\"audio_rate\":48000," - "\"device\":%u,\"port\":%u}\n", - vi.width, vi.height, vi.fps_num, vi.fps_den, - vi.interlaced ? "true" : "false", - device_id, port_id); - fflush(stderr); - - /* ── Open video stream ───────────────────────────────────────────── */ - HANDLE video_stream = NULL; - if (VHD_OpenStreamHandle(board, rx_streamtype(port_id), - VHD_SDI_STPROC_DISJOINED_VIDEO, - NULL, &video_stream, NULL) != VHDERR_NOERROR) { - fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle (video) failed\"}\n"); - VHD_CloseBoardHandle(board); - return 1; - } - VHD_SetStreamProperty(video_stream, VHD_SDI_SP_VIDEO_STANDARD, video_std); - VHD_SetStreamProperty(video_stream, VHD_SDI_SP_CLOCK_SYSTEM, clock_div); - VHD_SetStreamProperty(video_stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED); - VHD_SetStreamProperty(video_stream, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8); - - /* ── Launch audio thread (FIFO open blocks until FFmpeg connects) ── */ - pthread_t audio_tid = 0; - AudioArgs audio_args = { board, port_id, video_std, clock_div, - vi.fps_num, vi.fps_den, audio_pipe }; - if (audio_pipe) { - pthread_create(&audio_tid, NULL, audio_thread, &audio_args); - } - - /* ── Start video stream ──────────────────────────────────────────── */ - if (VHD_StartStream(video_stream) != VHDERR_NOERROR) { - atomic_store(&g_stop, 1); - if (audio_tid) pthread_join(audio_tid, NULL); - VHD_CloseStreamHandle(video_stream); - VHD_CloseBoardHandle(board); - return 1; - } - - /* ── Video capture loop ──────────────────────────────────────────── */ - HANDLE slot = NULL; - while (!atomic_load(&g_stop)) { - ULONG r = VHD_LockSlotHandle(video_stream, &slot); - if (r == VHDERR_NOERROR) { - BYTE *buf = NULL; - ULONG sz = 0; - if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { - ULONG written = 0; - while (written < sz) { - ssize_t n = write(STDOUT_FILENO, buf + written, sz - written); - if (n <= 0) { atomic_store(&g_stop, 1); break; } - written += (ULONG)n; - } - } - VHD_UnlockSlotHandle(slot); - } else if (r != VHDERR_TIMEOUT) { - fprintf(stderr, "[video] VHD_LockSlotHandle error %lu — stopping\n", r); - break; + /* Report results — continue with whatever locked, abort only if NONE locked. */ + int any_locked = 0; + for (int pi = 0; pi < port_count; pi++) { + if (locked[pi]) { any_locked = 1; } + else { + fprintf(stderr, + "{\"error\":\"no signal on board %u port %u within %ds\"}\n", + device_id, ports[pi], sig_timeout); } } + if (!any_locked || atomic_load(&g_stop)) { + VHD_CloseBoardHandle(board); + return 1; + } - /* ── Cleanup ─────────────────────────────────────────────────── */ - VHD_StopStream(video_stream); - VHD_CloseStreamHandle(video_stream); - if (audio_tid) pthread_join(audio_tid, NULL); + /* ── Create FIFOs and open streams for each locked port ─────────── */ + PortState ps[MAX_PORTS]; + memset(ps, 0, sizeof(ps)); + int active_count = 0; + + for (int pi = 0; pi < port_count; pi++) { + if (!locked[pi]) continue; + PortState *p = &ps[active_count]; + p->board = board; + p->port = ports[pi]; + p->device = device_id; + p->video_std = video_stds[pi]; + p->clock_div = clock_divs[pi]; + p->vi = video_info((VHD_VIDEOSTANDARD)video_stds[pi], + (VHD_CLOCKDIVISOR)clock_divs[pi]); + + snprintf(p->video_fifo, sizeof(p->video_fifo), + "%s/video-%u.fifo", video_pipe_dir, ports[pi]); + snprintf(p->audio_fifo, sizeof(p->audio_fifo), + "%s/audio-%u.fifo", audio_pipe_dir, ports[pi]); + + /* Create FIFOs (mkfifo; ignore EEXIST). */ + if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { + fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); + continue; + } + if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) { + fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno)); + continue; + } + + /* Open video stream. */ + HANDLE vs = NULL; + ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]), + VHD_SDI_STPROC_DISJOINED_VIDEO, + NULL, &vs, NULL); + if (r != VHDERR_NOERROR) { + fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle video failed port %u rc=%lu\"}\n", + ports[pi], r); + continue; + } + VHD_SetStreamProperty(vs, VHD_SDI_SP_VIDEO_STANDARD, p->video_std); + VHD_SetStreamProperty(vs, VHD_SDI_SP_CLOCK_SYSTEM, p->clock_div); + VHD_SetStreamProperty(vs, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED); + VHD_SetStreamProperty(vs, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8); + p->video_stream = vs; + + if (VHD_StartStream(vs) != VHDERR_NOERROR) { + fprintf(stderr, "{\"error\":\"VHD_StartStream video failed port %u\"}\n", ports[pi]); + VHD_CloseStreamHandle(vs); + p->video_stream = NULL; + continue; + } + + /* Emit format JSON to stderr (one line per port on signal lock). */ + fprintf(stderr, + "{\"port\":%u,\"width\":%d,\"height\":%d," + "\"fps_num\":%d,\"fps_den\":%d," + "\"interlaced\":%s," + "\"pix_fmt\":\"uyvy422\"," + "\"audio_channels\":2,\"audio_rate\":48000," + "\"device\":%u}\n", + ports[pi], + p->vi.width, p->vi.height, + p->vi.fps_num, p->vi.fps_den, + p->vi.interlaced ? "true" : "false", + device_id); + fflush(stderr); + + /* Launch audio thread (blocks until reader connects to audio FIFO). */ + pthread_create(&p->audio_tid, NULL, audio_thread, p); + + /* Launch video thread (blocks until reader connects to video FIFO). */ + pthread_create(&p->video_tid, NULL, video_thread, p); + + active_count++; + } + + if (active_count == 0) { + fprintf(stderr, "{\"error\":\"no ports successfully started\"}\n"); + VHD_CloseBoardHandle(board); + return 1; + } + + /* ── Wait for all threads to finish ─────────────────────────────── */ + for (int i = 0; i < active_count; i++) { + if (ps[i].video_tid) pthread_join(ps[i].video_tid, NULL); + if (ps[i].audio_tid) pthread_join(ps[i].audio_tid, NULL); + } + + /* ── Cleanup ─────────────────────────────────────────────────────── */ + for (int i = 0; i < active_count; i++) { + if (ps[i].video_stream) { + VHD_StopStream(ps[i].video_stream); + VHD_CloseStreamHandle(ps[i].video_stream); + } + } VHD_CloseBoardHandle(board); + return 0; } diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index 20e3c97..924507c 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -4,47 +4,6 @@ import { dirname } from 'node:path'; import { v4 as uuidv4 } from 'uuid'; import { createUploadStream } from './s3/client.js'; -/** - * Reads the first line from a spawned process's stderr stream. - * Resolves with the parsed JSON object when the first '\n' arrives. - * Rejects if the process exits with a non-zero code before emitting a line, - * or if timeoutMs elapses. - */ -function readFirstStderrLine(proc, timeoutMs = 300_000) { - return new Promise((resolve, reject) => { - let buf = ''; - let settled = false; - const settle = (fn) => { if (settled) return; settled = true; fn(); }; - - const timer = setTimeout(() => { - settle(() => reject(new Error(`deltacast-capture: timed out waiting for format JSON after ${timeoutMs}ms`))); - }, timeoutMs); - - proc.stderr.setEncoding('utf8'); - proc.stderr.on('data', (chunk) => { - buf += chunk; - let nl; - while ((nl = buf.indexOf('\n')) !== -1) { - const line = buf.slice(0, nl).trim(); - buf = buf.slice(nl + 1); - if (!line) continue; - if (!line.startsWith('{')) { console.error('[deltacast-bridge] ' + line); continue; } - clearTimeout(timer); - try { - const parsed = JSON.parse(line); - if (parsed.error) { settle(() => reject(new Error('deltacast-capture: ' + parsed.error))); } - else { settle(() => resolve(parsed)); } - } catch (e) { settle(() => reject(new Error('deltacast-capture: invalid JSON: ' + line))); } - return; - } - }); - - proc.on('exit', (code) => { - clearTimeout(timer); - settle(() => reject(new Error(`deltacast-capture: exited with code ${code} before emitting format JSON`))); - }); - }); -} const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; @@ -476,12 +435,11 @@ const sourceBackends = { }, }, - // Stubs — hardware/SDK plumbing not yet implemented. These throw clearly so a - // misconfigured recorder fails fast instead of silently falling back to the - // wrong card. deltacast: { + // Unused stub — deltacast capture uses sourceType='deltacast' path in + // _buildInputArgs, not the sourceBackends map. buildInput() { - throw new Error('deltacast backend not yet implemented — requires hardware'); + throw new Error('deltacast: use sourceType="deltacast" not sourceBackend'); }, }, aja: { @@ -586,53 +544,88 @@ class CaptureManager { return { inputArgs: ['-probesize','32M','-analyzeduration','10M','-fflags','+genpts','-i', sourceUrl], isNetwork: true }; } - // Deltacast SDI via VideoMaster SDK FFmpeg plugin. + // Deltacast SDI via shared bridge daemon (deltacast-bridge). + // + // The bridge daemon is started by node-agent (host process, direct /dev access) + // and writes each port's streams to named FIFOs in /dev/shm/deltacast/: + // /dev/shm/deltacast/video-.fifo + // /dev/shm/deltacast/audio-.fifo + // + // This sidecar just reads from those FIFOs. The bridge may still be starting + // up or waiting for signal lock, so we wait up to 30s for the FIFOs to appear + // before handing them to ffmpeg. The bridge process is managed by node-agent; + // bridgeProcess is null here (no per-sidecar bridge spawn). if (sourceType === 'deltacast') { - const idx = (typeof device === 'number' || /^\d+$/.test(String(device))) + const idx = (typeof device === 'number' || /^\d+$/.test(String(device))) ? parseInt(device, 10) : 0; - const audioFifo = `/tmp/dc-audio-${this._sessionIdForBridge}`; - - const { execFileSync: _execFile } = await import('child_process'); - const { unlinkSync: _unlink, existsSync: _exists } = await import('node:fs'); - if (_exists(audioFifo)) { try { _unlink(audioFifo); } catch (_) {} } - try { _execFile('mkfifo', [audioFifo]); } catch (e) { - throw new Error(`Failed to create audio FIFO ${audioFifo}: ${e.message}`); - } - - // ONE board (index 0) carries 8 channels (ports 0-7). --device is the - // board index, --port is the selected channel. board defaults to 0; the - // capture channel comes from source_config.port, falling back to the - // legacy device index so existing single-value recorders keep working. - const boardIdx = (typeof board === 'number' || /^\d+$/.test(String(board))) - ? parseInt(board, 10) : 0; const portIdx = (typeof port === 'number' || /^\d+$/.test(String(port))) ? parseInt(port, 10) : idx; - const bridge = spawn('deltacast-capture', [ - '--device', String(boardIdx), - '--port', String(portIdx), - '--audio-pipe', audioFifo, - '--signal-timeout', '30', - ], { stdio: ['ignore', 'pipe', 'pipe'] }); - const fmt = await readFirstStderrLine(bridge, 300_000); - bridge.stderr.on('data', (d) => console.error(`[deltacast-bridge] ${d.toString().trimEnd()}`)); + const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast'; + const videoFifo = `${DC_PIPE_DIR}/video-${portIdx}.fifo`; + const audioFifo = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`; + + // Wait up to 30s for both FIFOs to exist (bridge starts asynchronously). + const { existsSync: _exists } = await import('node:fs'); + const WAIT_MS = 30_000; + const POLL_MS = 500; + const deadline = Date.now() + WAIT_MS; + let videoReady = false; + let audioReady = false; + while (Date.now() < deadline) { + videoReady = _exists(videoFifo); + audioReady = _exists(audioFifo); + if (videoReady && audioReady) break; + await new Promise(r => setTimeout(r, POLL_MS)); + } + if (!videoReady || !audioReady) { + throw new Error( + `deltacast bridge FIFOs not ready after ${WAIT_MS / 1000}s ` + + `(video=${videoReady} audio=${audioReady}) — is deltacast-bridge running?` + ); + } + console.log(`[deltacast] port ${portIdx} FIFOs ready: ${videoFifo}, ${audioFifo}`); + + // Resolution/fps are not known until the FIFO reader connects and starts + // receiving frames. We use sensible defaults here; ffmpeg's rawvideo demuxer + // will accept whatever the bridge writes once the pipe opens. + // The bridge daemon has already detected the signal and set up streams, so + // the FIFO content is ready-to-read as soon as the reader connects. + // + // NOTE: The format JSON emitted by the bridge on signal lock goes to the + // node-agent (which launched the bridge), not to this sidecar. The sidecar + // therefore uses fixed rawvideo params here. If per-port format introspection + // is needed in future, the node-agent should expose the fmt JSON via an API + // and capture-manager can query it before building inputArgs. + // + // For now, both video dimensions and framerate come from the recorder's + // configured values (passed to start() as `framerate` and implicit in the + // codec args). The rawvideo input is -video_size / -framerate from env or + // recorder config; ffmpeg tolerates a small mismatch in rawvideo (it just + // reads N bytes per frame based on the declared size). + // + // DELTACAST_VIDEO_SIZE / DELTACAST_FRAMERATE: set by node-agent in the + // sidecar env based on the bridge's per-port format JSON, if desired. + const dcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080'; + const dcFps = process.env.DELTACAST_FRAMERATE || '25'; + const dcInterlaced = process.env.DELTACAST_INTERLACED === '1'; return { inputArgs: [ '-f', 'rawvideo', - '-pix_fmt', fmt.pix_fmt, - '-video_size', `${fmt.width}x${fmt.height}`, - '-framerate', `${fmt.fps_num}/${fmt.fps_den}`, - '-i', 'pipe:0', + '-pix_fmt', 'uyvy422', + '-video_size', dcSize, + '-framerate', dcFps, + '-i', videoFifo, '-f', 's16le', - '-ar', String(fmt.audio_rate), - '-ac', String(fmt.audio_channels), + '-ar', '48000', + '-ac', '2', '-i', audioFifo, ], - isNetwork: false, - bridgeProcess: bridge, - audioFifo, - interlaced: !!fmt.interlaced, + isNetwork: false, + bridgeProcess: null, /* bridge is managed by node-agent, not this sidecar */ + audioFifo: null, /* no per-session FIFO to clean up on stop */ + interlaced: dcInterlaced, }; } @@ -915,10 +908,10 @@ exit "$BMXRC" sourceType, sourceBackend, device, port, board, sourceUrl, listen, listenPort, streamKey, }); - // Audio input index: the deltacast bridge delivers audio on a separate - // FIFO wired as ffmpeg input 1, whereas DeckLink SDI and network sources - // carry audio inside input 0. (bridgeProcess is set only for deltacast.) - const audioMap = bridgeProcess ? '1:a:0?' : '0:a:0?'; + // Audio input index: the deltacast shared bridge delivers video on input 0 + // (video FIFO) and audio on input 1 (audio FIFO), so audioMap is '1:a:0?'. + // DeckLink SDI and network sources carry audio inside input 0. + const audioMap = (sourceType === 'deltacast') ? '1:a:0?' : '0:a:0?'; // Non-growing master: ffmpeg muxes the finalized MOV directly. Growing // master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via @@ -959,7 +952,8 @@ exit "$BMXRC" catch (err) { console.error('[capture] could not create temp master dir:', err.message); } } const hiresOutput = localMasterPath; - const hiresStdio = [bridgeProcess ? 'pipe' : 'ignore', 'ignore', 'pipe']; + // Deltacast reads from FIFOs (no stdin pipe needed). DeckLink pipes stdout. + const hiresStdio = ['ignore', 'ignore', 'pipe']; // For SDI we cannot open the DeckLink device a second time for a preview // tee, so the live HLS preview is produced as a SECOND OUTPUT of the hires @@ -1023,25 +1017,14 @@ exit "$BMXRC" hiresArgs = [ ...inputArgs, ...sdiFilterArgs, ...hiresCodecArgs, hiresOutput ]; } hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio }); - if (bridgeProcess) { - bridgeProcess.stdout.pipe(hiresProcess.stdin); - } } // Growing-files: nothing to upload here (promotion worker handles S3). // Non-growing: the master is uploaded from the finalized local file in // stop(), once ffmpeg has written the moov and exited cleanly — we can't // upload while recording because the file isn't a valid MOV until finalize. + // bridgeProcess is null for deltacast (bridge managed by node-agent on the host). const processes = { hires: hiresProcess }; - if (bridgeProcess) { - processes.bridge = bridgeProcess; - bridgeProcess.on('exit', (code) => { - if (code !== 0 && code !== null) { - console.error(`[deltacast-bridge] exited with code ${code}`); - this.state.lastError = `deltacast bridge exited: code ${code}`; - } - }); - } const uploads = { hires: growingPath ? Promise.resolve({ growingPath }) : null }; // ── HLS tee for network sources (live preview in the UI) ────────── @@ -1164,7 +1147,7 @@ exit "$BMXRC" if (processes.hires) processes.hires.kill('SIGINT'); if (processes.proxy) processes.proxy.kill('SIGINT'); if (processes.hls) { try { processes.hls.kill('SIGINT'); } catch (_) {} } - if (processes.bridge) { try { processes.bridge.kill('SIGINT'); } catch (_) {} } + /* processes.bridge: removed — bridge is managed by node-agent, not per-session */ // Wait for the master writer to finalize before we read/upload the file. await waitExit(processes.hires); diff --git a/services/node-agent/index.js b/services/node-agent/index.js index d95c8ef..04b7526 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -1,6 +1,7 @@ -import http from 'http'; -import os from 'os'; -import fs from 'fs'; +import http from 'http'; +import os from 'os'; +import fs from 'fs'; +import { spawn } from 'child_process'; const MAM_API_URL = (process.env.MAM_API_URL || 'http://localhost:3000').replace(/\/$/, ''); const NODE_TOKEN = process.env.NODE_TOKEN || ''; @@ -29,14 +30,10 @@ const VERSION = '1.4.0'; // interpolated into a shell string. const DRIVER_VENDORS = ['blackmagic', 'aja', 'deltacast', 'ndi']; -// ── Deltacast board-open mutex ──────────────────────────────────────────── -// Simultaneous VHD_OpenBoardHandle calls from multiple deltacast sidecars -// trigger a kernel array-index-out-of-bounds in delta_x300 BufMngr.c:781 -// that wedges all RX channels until the module is reloaded. Serialize -// deltacast-only sidecar launches through a promise-chain mutex with a -// settle delay so each board-open completes before the next one starts. -// Configurable via DELTACAST_START_STAGGER_MS (default 3500ms). SDI, SRT, -// and RTMP sources are unaffected. +// ── Deltacast board-open mutex (legacy — no longer used) ───────────────── +// The per-sidecar board-open race is eliminated by the shared bridge daemon +// (deltacast-bridge). This mutex is kept but acquireDcLock() is never called +// for deltacast sidecars; they wait for the bridge FIFOs instead. const DELTACAST_STAGGER_MS = parseInt(process.env.DELTACAST_START_STAGGER_MS || '3500', 10); let _dcMutex = Promise.resolve(); @@ -48,6 +45,91 @@ function acquireDcLock() { return wait.then(() => release); } +// ── Deltacast shared bridge daemon ──────────────────────────────────────── +// +// ONE deltacast-bridge process runs on the HOST (not inside a container) and +// opens the board handle exactly once, serving all requested ports via FIFOs +// in /dev/shm/deltacast/. This eliminates the BufMngr.c:781 OOB fault caused +// by concurrent VHD_OpenBoardHandle calls. +// +// Lifecycle: +// - First deltacast sidecar start → bridge launched with all configured ports. +// - Subsequent starts → sidecar reads existing FIFOs; bridge unchanged. +// - Last deltacast sidecar stop → bridge killed. +// - Bridge unexpected exit → _dcBridge reset; next sidecar re-launches it. +// +// DELTACAST_PIPE_DIR (default /dev/shm/deltacast): FIFO directory, bind-mounted +// into each deltacast sidecar so ffmpeg can read the FIFOs. +// DELTACAST_BRIDGE_BIN (default deltacast-bridge): host path to the binary. +// Typically /usr/local/bin/deltacast-bridge after `make install` from the SDK +// build, or set to the build-dir path for development. +// DELTACAST_PORTS (csv, e.g. "0,1,2,4,7"): ports the bridge opens at launch. +// Defaults to all 8 ports (0-7) so any sidecar port combination is covered. + +const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast'; +const DC_BRIDGE_BIN = process.env.DELTACAST_BRIDGE_BIN || 'deltacast-bridge'; +const DC_PORTS_CSV = process.env.DELTACAST_PORTS || '0,1,2,3,4,5,6,7'; +const DC_BOARD = process.env.DELTACAST_BOARD || '0'; + +let _dcBridge = null; // ChildProcess | null +let _dcSidecarCount = 0; // active deltacast sidecars on this node +// Map containerId -> sourceType so stop() can decrement the deltacast counter. +const _containerSourceType = new Map(); + +function _dcBridgeRunning() { + return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null; +} + +function startDeltacastBridge() { + if (_dcBridgeRunning()) return; // already up + + try { fs.mkdirSync(DC_PIPE_DIR, { recursive: true }); } catch (_) {} + + const args = [ + '--device', DC_BOARD, + '--ports', DC_PORTS_CSV, + '--video-pipe-dir', DC_PIPE_DIR, + '--audio-pipe-dir', DC_PIPE_DIR, + ]; + console.log(`[dc-bridge] launching: ${DC_BRIDGE_BIN} ${args.join(' ')}`); + + const proc = spawn(DC_BRIDGE_BIN, args, { + stdio: ['ignore', 'ignore', 'pipe'], + detached: false, + }); + + proc.stderr.setEncoding('utf8'); + proc.stderr.on('data', (chunk) => { + for (const line of chunk.split('\n')) { + const t = line.trim(); + if (!t) continue; + // Format JSON lines go to stdout so node-agent can log/forward them. + if (t.startsWith('{')) console.log('[dc-bridge] ' + t); + else console.error('[dc-bridge] ' + t); + } + }); + + proc.on('exit', (code, sig) => { + console.error(`[dc-bridge] exited code=${code} signal=${sig}`); + _dcBridge = null; + }); + + _dcBridge = proc; + console.log(`[dc-bridge] pid=${proc.pid} board=${DC_BOARD} ports=${DC_PORTS_CSV}`); +} + +function stopDeltacastBridge() { + if (!_dcBridgeRunning()) return; + console.log('[dc-bridge] stopping (no active deltacast sidecars)'); + try { _dcBridge.kill('SIGTERM'); } catch (_) {} + // Give it 5s to clean up, then SIGKILL. + const proc = _dcBridge; + setTimeout(() => { + try { if (proc.exitCode === null) proc.kill('SIGKILL'); } catch (_) {} + }, 5000); + _dcBridge = null; +} + // Pick the host's LAN IP. Inside a bridge-mode container, // os.networkInterfaces() returns the container's docker-bridge IP (172.x), // not the host's LAN address. Two strategies: @@ -185,19 +267,22 @@ async function handleSidecarStart(body, res) { HostConfig: hostConfig, }; - // Deltacast: serialize board opens through a process-wide mutex + settle - // delay. Concurrent VHD_OpenBoardHandle calls wedge the kernel RX buffer - // manager (delta_x300 BufMngr.c:781 OOB). Non-deltacast sources skip - // this entirely so SDI/SRT/RTMP start latency is unchanged. - let release = null; + // Deltacast: ensure the shared bridge daemon is running on the HOST before + // starting the sidecar. The sidecar reads FIFOs produced by the bridge; + // it does NOT open the board handle itself (no BufMngr.c:781 race). if (sourceType === 'deltacast') { - release = await acquireDcLock(); + _dcSidecarCount++; + startDeltacastBridge(); } let containerId; try { const createRes = await dockerApi('POST', '/containers/create', spec); if (createRes.status !== 201) { + if (sourceType === 'deltacast') { + _dcSidecarCount--; + if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } + } return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data }); } @@ -207,19 +292,22 @@ async function handleSidecarStart(body, res) { console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`); const startRes = await dockerApi('POST', `/containers/${containerId}/start`); if (startRes.status !== 204) { + if (sourceType === 'deltacast') { + _dcSidecarCount--; + if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } + } await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data }); } + if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast'); jsonResponse(res, 201, { containerId, capturePort }); - - // Hold the lock for the settle period AFTER responding so the caller - // isn't blocked, but the next deltacast open is still deferred. - if (release) { - await new Promise(r => setTimeout(r, DELTACAST_STAGGER_MS)); + } catch (err) { + if (sourceType === 'deltacast') { + _dcSidecarCount--; + if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } } - } finally { - if (release) release(); + throw err; } } catch (err) { jsonResponse(res, 500, { error: err.message }); @@ -257,6 +345,19 @@ async function handleSidecarStop(containerId, res) { console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`); // Container has now exited gracefully (or hit the 180s cap); remove it. await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); + + // Deltacast bridge lifecycle: decrement sidecar count; stop bridge when last. + if (_containerSourceType.get(containerId) === 'deltacast') { + _containerSourceType.delete(containerId); + _dcSidecarCount--; + if (_dcSidecarCount <= 0) { + _dcSidecarCount = 0; + stopDeltacastBridge(); + } + } else { + _containerSourceType.delete(containerId); + } + jsonResponse(res, 200, { ok: true }); } catch (err) { console.error(`[sidecar-stop] error: ${err.message}`);