Merge: deltacast multi-port bridge + UI fixes

This commit is contained in:
Zac Gaetano 2026-06-02 00:27:26 +00:00
commit f2f3a88308
5 changed files with 585 additions and 339 deletions

View file

@ -4,24 +4,34 @@ set(CMAKE_C_STANDARD 17)
set(SDK_ROOT "/sdk" CACHE PATH "Path to extracted VideoMaster SDK")
add_executable(deltacast-capture main.c)
# Primary binary: deltacast-bridge (shared multi-port daemon)
add_executable(deltacast-bridge main.c)
target_include_directories(deltacast-capture PRIVATE
target_include_directories(deltacast-bridge PRIVATE
${SDK_ROOT}/include/videomaster
)
target_link_directories(deltacast-capture PRIVATE
target_link_directories(deltacast-bridge PRIVATE
${SDK_ROOT}/lib
)
target_link_libraries(deltacast-capture PRIVATE
target_link_libraries(deltacast-bridge PRIVATE
videomasterhd
videomasterhd_audio
pthread
)
# Embed the SDK RPATH so the binary finds the .so at runtime
set_target_properties(deltacast-capture PROPERTIES
set_target_properties(deltacast-bridge PROPERTIES
INSTALL_RPATH "/usr/local/lib/deltacast"
BUILD_WITH_INSTALL_RPATH TRUE
)
)
# Compat symlink: deltacast-capture -> deltacast-bridge
# (node-agent and any legacy scripts that reference the old name still work)
add_custom_command(TARGET deltacast-bridge POST_BUILD
COMMAND ${CMAKE_COMMAND} -E create_symlink
$<TARGET_FILE:deltacast-bridge>
$<TARGET_FILE_DIR:deltacast-bridge>/deltacast-capture
COMMENT "Creating deltacast-capture compat symlink"
)

View file

@ -1,12 +1,24 @@
/* services/capture/deltacast-bridge/main.c
*
* Deltacast VideoMaster SDI capture bridge.
* Writes raw UYVY video to stdout and stereo PCM to a named FIFO.
* Emits one JSON line to stderr on signal lock before streaming starts.
* Deltacast VideoMaster SDI shared multi-port bridge daemon.
*
* Opens the board ONCE, opens RX streams for all requested ports, and
* writes each port's video/audio to named FIFOs in a shared directory.
* One reader thread + one audio thread per port run concurrently.
*
* Usage:
* deltacast-capture --device <N> --port <N> --audio-pipe <path>
* [--signal-timeout <sec>]
* deltacast-bridge --device <N> --ports <csv>
* [--video-pipe-dir /dev/shm/deltacast]
* [--audio-pipe-dir /dev/shm/deltacast]
* [--signal-timeout <sec>]
*
* Compat alias: --port <N> treated as --ports <N> (single port).
*
* For each port that acquires signal, emits one JSON line to stderr:
* {"port":N,"width":W,"height":H,"fps_num":N,"fps_den":D,
* "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2}
*
* Runs until SIGTERM/SIGINT, then closes all streams and the board.
*/
#include <errno.h>
@ -17,7 +29,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
@ -25,12 +37,15 @@
#include "VideoMasterHD_Sdi.h"
#include "VideoMasterHD_Sdi_Audio.h"
/* ── Globals ─────────────────────────────────────────────────────────── */
/* ── Globals ─────────────────────────────────────────────────────────── */
static atomic_int g_stop = 0;
static void on_signal(int s) { (void)s; atomic_store(&g_stop, 1); }
/* ── Stream type by port index ───────────────────────────────────────── */
/* ── Constants ────────────────────────────────────────────────────────── */
#define MAX_PORTS 8
/* ── Stream type by port index (non-contiguous SDK enum) ────────────── */
static ULONG rx_streamtype(unsigned port) {
switch (port) {
case 0: return VHD_ST_RX0;
@ -43,7 +58,7 @@ static ULONG rx_streamtype(unsigned port) {
case 7: return VHD_ST_RX7;
default:
fprintf(stderr, "{\"error\":\"port %u not supported (max 7)\"}\n", port);
return VHD_ST_RX0; /* caller will fail on signal lock */
return VHD_ST_RX0;
}
}
@ -89,29 +104,7 @@ static VideoInfo video_info(VHD_VIDEOSTANDARD std, VHD_CLOCKDIVISOR div) {
}
}
/* ── Audio thread ────────────────────────────────────────────
*
* CRITICAL: ffmpeg opens ALL of its inputs before it starts processing any of
* them, and input 1 is this audio FIFO. Opening the read end of a FIFO blocks
* until a writer connects, so if this thread fails to open the FIFO writer
* ffmpeg hangs forever on input 1 -> no video frames are ever read from
* pipe:0 -> 0 fps and an empty HLS preview. Therefore the FIFO writer is
* opened UNCONDITIONALLY and FIRST, independent of any VideoMaster audio open,
* and the thread then feeds the FIFO a CONTINUOUS, wall-clock-paced s16le
* stereo stream (real samples when available, otherwise silence) so ffmpeg's
* A/V demux stays alive and video keeps flowing. */
typedef struct {
HANDLE board;
unsigned port;
ULONG video_std;
ULONG clock_div;
int fps_num;
int fps_den;
const char *fifo_path;
} AudioArgs;
/* Write exactly `len` bytes; returns 0 on success, -1 if writing should stop
* (EPIPE when ffmpeg is gone, or any other error). */
/* ── Write-all helper ─────────────────────────────────────────────────── */
static int write_all(int fd, const unsigned char *p, size_t len) {
size_t off = 0;
while (off < len) {
@ -123,50 +116,69 @@ static int write_all(int fd, const unsigned char *p, size_t len) {
return 0;
}
static void *audio_thread(void *arg) {
AudioArgs *a = (AudioArgs *)arg;
/* ── Per-port state ───────────────────────────────────────────────────── */
typedef struct {
HANDLE board;
unsigned port;
unsigned device;
ULONG video_std;
ULONG clock_div;
VideoInfo vi;
char video_fifo[256];
char audio_fifo[256];
/* threads */
pthread_t video_tid;
pthread_t audio_tid;
/* streams (owned by threads, set before thread launch) */
HANDLE video_stream;
} PortState;
/* 1. Open the FIFO writer FIRST, unconditionally. This is what unblocks
* ffmpeg's input 1; we must reach it even if the VHD audio open fails. */
int fd = open(a->fifo_path, O_WRONLY);
/* ── Audio thread ──────────────────────────────────────────────────────
*
* Identical design to the single-port bridge audio thread:
* - Opens FIFO writer FIRST, unconditionally (unblocks ffmpeg input)
* - Feeds continuous wall-clock-paced s16le stereo (real or silence)
* - Best-effort VHD audio stream; silence fallback on any failure
*/
static void *audio_thread(void *arg) {
PortState *ps = (PortState *)arg;
int fd = open(ps->audio_fifo, O_WRONLY);
if (fd < 0) {
fprintf(stderr, "[audio] open FIFO failed: %s\n", strerror(errno));
fprintf(stderr, "[audio:%u] open FIFO failed: %s\n", ps->port, strerror(errno));
return NULL;
}
/* 2. Pacing + silence buffer sized to one video frame of 48kHz stereo
* s16le. samples_per_frame = 48000 * fps_den / fps_num (rounded). */
const int AUDIO_RATE = 48000;
const int CHANNELS = 2;
const size_t FRAME_BYTES = (size_t)CHANNELS * 2; /* s16le stereo */
int fps_num = a->fps_num > 0 ? a->fps_num : 25;
int fps_den = a->fps_den > 0 ? a->fps_den : 1;
const int AUDIO_RATE = 48000;
const int CHANNELS = 2;
const size_t FRAME_BYTES = (size_t)CHANNELS * 2; /* s16le stereo */
int fps_num = ps->vi.fps_num > 0 ? ps->vi.fps_num : 25;
int fps_den = ps->vi.fps_den > 0 ? ps->vi.fps_den : 1;
long samples_per_frame = ((long)AUDIO_RATE * fps_den + fps_num / 2) / fps_num;
if (samples_per_frame < 1) samples_per_frame = 1;
size_t tick_bytes = (size_t)samples_per_frame * FRAME_BYTES;
ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)a->video_std,
(VHD_CLOCKDIVISOR)a->clock_div,
ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)ps->video_std,
(VHD_CLOCKDIVISOR)ps->clock_div,
VHD_ASR_48000, 0);
ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO);
size_t vhd_buf_sz = ((size_t)max_samples + 64) * (block_size ? block_size : FRAME_BYTES);
size_t buf_sz = vhd_buf_sz > tick_bytes ? vhd_buf_sz : tick_bytes;
unsigned char *buf = calloc(1, buf_sz); /* zeroed -> doubles as silence */
unsigned char *buf = calloc(1, buf_sz);
if (!buf) { close(fd); return NULL; }
/* 3. Try to open the VideoMaster audio stream (best effort, NON-FATAL). */
HANDLE stream = NULL;
int have_vhd_audio = 0;
VHD_AUDIOINFO ai;
memset(&ai, 0, sizeof(ai));
ULONG r = VHD_OpenStreamHandle(a->board, rx_streamtype(a->port),
ULONG r = VHD_OpenStreamHandle(ps->board, rx_streamtype(ps->port),
VHD_SDI_STPROC_DISJOINED_ANC,
NULL, &stream, NULL);
if (r == VHDERR_NOERROR) {
VHD_SetStreamProperty(stream, VHD_SDI_SP_VIDEO_STANDARD, a->video_std);
VHD_SetStreamProperty(stream, VHD_SDI_SP_CLOCK_SYSTEM, a->clock_div);
VHD_SetStreamProperty(stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED);
VHD_SetStreamProperty(stream, VHD_SDI_SP_VIDEO_STANDARD, ps->video_std);
VHD_SetStreamProperty(stream, VHD_SDI_SP_CLOCK_SYSTEM, ps->clock_div);
VHD_SetStreamProperty(stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED);
ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO;
ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16;
@ -175,16 +187,15 @@ static void *audio_thread(void *arg) {
if (VHD_StartStream(stream) == VHDERR_NOERROR) {
have_vhd_audio = 1;
} else {
fprintf(stderr, "[audio] VHD_StartStream failed - feeding silence\n");
fprintf(stderr, "[audio:%u] VHD_StartStream failed — feeding silence\n", ps->port);
VHD_CloseStreamHandle(stream);
stream = NULL;
}
} else {
fprintf(stderr, "[audio] VHD_OpenStreamHandle failed (%lu) - feeding silence\n", r);
fprintf(stderr, "[audio:%u] VHD_OpenStreamHandle failed (%lu) — feeding silence\n",
ps->port, r);
}
/* 4. Continuous, wall-clock-paced feed loop: real audio when available,
* otherwise silence, so ffmpeg's input 1 never starves. */
struct timespec next;
clock_gettime(CLOCK_MONOTONIC, &next);
long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num);
@ -203,7 +214,8 @@ static void *audio_thread(void *arg) {
}
VHD_UnlockSlotHandle(slot);
} else if (r != VHDERR_TIMEOUT) {
fprintf(stderr, "[audio] lock error %lu - degrading to silence\n", r);
fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n",
ps->port, r);
VHD_StopStream(stream);
VHD_CloseStreamHandle(stream);
stream = NULL;
@ -212,12 +224,12 @@ static void *audio_thread(void *arg) {
}
if (out_bytes == 0) {
memset(buf, 0, tick_bytes); /* one frame of silence */
memset(buf, 0, tick_bytes);
out_bytes = tick_bytes;
}
if (write_all(fd, buf, out_bytes) < 0) {
atomic_store(&g_stop, 1); /* ffmpeg closed the FIFO (EPIPE) */
atomic_store(&g_stop, 1);
break;
}
@ -229,7 +241,7 @@ static void *audio_thread(void *arg) {
(next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) {
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL);
} else {
next = now; /* fell behind (real-audio burst) - resync */
next = now;
}
}
@ -242,85 +254,128 @@ static void *audio_thread(void *arg) {
return NULL;
}
/* ── Main ────────────────────────────────────────────────────────────── */
/* ── Video thread ─────────────────────────────────────────────────────── */
static void *video_thread(void *arg) {
PortState *ps = (PortState *)arg;
int fd = open(ps->video_fifo, O_WRONLY);
if (fd < 0) {
fprintf(stderr, "[video:%u] open FIFO failed: %s\n", ps->port, strerror(errno));
return NULL;
}
HANDLE slot = NULL;
while (!atomic_load(&g_stop)) {
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
if (r == VHDERR_NOERROR) {
BYTE *buf = NULL;
ULONG sz = 0;
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
if (write_all(fd, buf, sz) < 0) {
atomic_store(&g_stop, 1);
VHD_UnlockSlotHandle(slot);
break;
}
}
VHD_UnlockSlotHandle(slot);
} else if (r != VHDERR_TIMEOUT) {
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping\n",
ps->port, r);
atomic_store(&g_stop, 1);
break;
}
}
close(fd);
return NULL;
}
/* ── Parse comma-separated port list ─────────────────────────────────── */
static int parse_ports(const char *csv, unsigned *ports, int max) {
int count = 0;
char buf[256];
strncpy(buf, csv, sizeof(buf) - 1);
buf[sizeof(buf) - 1] = '\0';
char *tok = strtok(buf, ",");
while (tok && count < max) {
ports[count++] = (unsigned)atoi(tok);
tok = strtok(NULL, ",");
}
return count;
}
/* ── Main ─────────────────────────────────────────────────────────────── */
int main(int argc, char *argv[]) {
unsigned device_id = 0;
unsigned port_id = 0;
int sig_timeout = 30;
const char *audio_pipe = NULL;
unsigned device_id = 0;
unsigned ports[MAX_PORTS] = {0};
int port_count = 0;
int sig_timeout = 30;
const char *video_pipe_dir = "/dev/shm/deltacast";
const char *audio_pipe_dir = "/dev/shm/deltacast";
for (int i = 1; i < argc; i++) {
if (!strcmp(argv[i], "--device") && i+1 < argc) device_id = (unsigned)atoi(argv[++i]);
else if (!strcmp(argv[i], "--port") && i+1 < argc) port_id = (unsigned)atoi(argv[++i]);
else if (!strcmp(argv[i], "--audio-pipe") && i+1 < argc) audio_pipe = argv[++i];
else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) sig_timeout = atoi(argv[++i]);
if (!strcmp(argv[i], "--device") && i+1 < argc) {
device_id = (unsigned)atoi(argv[++i]);
} else if (!strcmp(argv[i], "--ports") && i+1 < argc) {
port_count = parse_ports(argv[++i], ports, MAX_PORTS);
} else if (!strcmp(argv[i], "--port") && i+1 < argc) {
/* single-port compat alias */
ports[0] = (unsigned)atoi(argv[++i]);
port_count = 1;
} else if (!strcmp(argv[i], "--video-pipe-dir") && i+1 < argc) {
video_pipe_dir = argv[++i];
} else if (!strcmp(argv[i], "--audio-pipe-dir") && i+1 < argc) {
audio_pipe_dir = argv[++i];
} else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) {
sig_timeout = atoi(argv[++i]);
}
}
if (port_count == 0) {
fprintf(stderr, "{\"error\":\"no ports specified — use --ports 0,1,2,...\"}\n");
return 1;
}
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
/* Don't let a dying ffmpeg kill us with SIGPIPE - writes return EPIPE
* and the FIFO/stdout write loops handle that by stopping cleanly. */
signal(SIGPIPE, SIG_IGN);
/* ── Init API ─────────────────────────────────────────────────── */
/* ── Init API ────────────────────────────────────────────────────── */
ULONG dll_ver, nb_boards;
if (VHD_GetApiInfo(&dll_ver, &nb_boards) != VHDERR_NOERROR) {
fprintf(stderr, "{\"error\":\"VHD_GetApiInfo failed\"}\n");
return 1;
}
if (device_id >= nb_boards) {
fprintf(stderr, "{\"error\":\"board %u not found (%lu detected)\"}\n", device_id, nb_boards);
fprintf(stderr, "{\"error\":\"board %u not found (%lu detected)\"}\n",
device_id, nb_boards);
return 1;
}
/* ── Serialize board open via flock ──────────────────────────────
* delta_x300 BufMngr.c:781 has an array-index-out-of-bounds bug that
* fires when two VHD_OpenBoardHandle calls race on the same board.
* Use a cross-container exclusive lock on a file in /dev/shm/deltacast/
* (already bind-mounted into every capture sidecar) to guarantee only
* one bridge runs OpenBoardHandle + signal-wait at a time. The lock is
* released after signal lock succeeds (plus a settle delay) or on
* failure so the next bridge is never permanently blocked.
*
* IMPORTANT: the signal-wait deadline is set AFTER acquiring the lock so
* the full sig_timeout is available for signal detection regardless of
* how long this bridge waited in the queue. */
const char *lock_path = "/dev/shm/deltacast/bridge.lock";
int lock_fd = open(lock_path, O_CREAT | O_RDWR, 0666);
if (lock_fd >= 0) {
fprintf(stderr, "[board] waiting for board-open lock (port %u)...\n", port_id);
if (flock(lock_fd, LOCK_EX) != 0) {
fprintf(stderr, "[board] flock failed: %s — proceeding without lock\n", strerror(errno));
close(lock_fd);
lock_fd = -1;
} else {
fprintf(stderr, "[board] lock acquired (port %u)\n", port_id);
}
} else {
fprintf(stderr, "[board] could not open lock file %s: %s — proceeding without lock\n",
lock_path, strerror(errno));
}
/* ── Open board ───────────────────────────────────────────────── */
/* ── Open board ONCE ─────────────────────────────────────────────── */
HANDLE board = NULL;
if (VHD_OpenBoardHandle(device_id, &board, NULL, 0) != VHDERR_NOERROR) {
fprintf(stderr, "{\"error\":\"VHD_OpenBoardHandle failed for board %u\"}\n", device_id);
if (lock_fd >= 0) { flock(lock_fd, LOCK_UN); close(lock_fd); }
return 1;
}
fprintf(stderr, "[board] opened board %u with %d port(s)\n", device_id, port_count);
/* Disable passive (relay) loopback so RX is live.
* VHD_CORE_BP_PASSIVE_LOOPBACK_<n> only exists for ports 0-3 in SDK 6.34.1,
* and the board reports passive-loopback capability 0, so skipping ports 4-7
* is harmless. */
if (port_id < 4) {
VHD_SetBoardProperty(board, loopback_prop(port_id), FALSE);
/* Disable passive loopback for each requested port (ports 0-3 only in SDK). */
for (int pi = 0; pi < port_count; pi++) {
unsigned p = ports[pi];
if (p < 4) VHD_SetBoardProperty(board, loopback_prop(p), FALSE);
}
/* ── Wait for signal on all ports ───────────────────────────────── */
ULONG video_stds[MAX_PORTS] = {0};
ULONG clock_divs[MAX_PORTS] = {0};
int locked[MAX_PORTS] = {0};
for (int pi = 0; pi < port_count; pi++) {
video_stds[pi] = (ULONG)NB_VHD_VIDEOSTANDARDS;
clock_divs[pi] = VHD_CLOCKDIV_1;
}
/* ── Wait for signal lock ──────────────────────────────────────────
* Deadline is set HERE after the flock is acquired and the board is
* open so the full sig_timeout is available regardless of queue wait. */
ULONG video_std = (ULONG)NB_VHD_VIDEOSTANDARDS;
struct timespec deadline;
clock_gettime(CLOCK_MONOTONIC, &deadline);
deadline.tv_sec += sig_timeout;
@ -331,110 +386,140 @@ int main(int argc, char *argv[]) {
if (now.tv_sec > deadline.tv_sec ||
(now.tv_sec == deadline.tv_sec && now.tv_nsec >= deadline.tv_nsec)) break;
VHD_GetChannelProperty(board, VHD_RX_CHANNEL, port_id,
VHD_SDI_CP_VIDEO_STANDARD, &video_std);
if (video_std != (ULONG)NB_VHD_VIDEOSTANDARDS) break;
int all_locked = 1;
for (int pi = 0; pi < port_count; pi++) {
if (locked[pi]) continue;
VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi],
VHD_SDI_CP_VIDEO_STANDARD, &video_stds[pi]);
if (video_stds[pi] != (ULONG)NB_VHD_VIDEOSTANDARDS) {
VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi],
VHD_SDI_CP_CLOCK_DIVISOR, &clock_divs[pi]);
locked[pi] = 1;
fprintf(stderr, "[board] port %u signal locked (std=%lu)\n",
ports[pi], video_stds[pi]);
} else {
all_locked = 0;
}
}
if (all_locked) break;
struct timespec ts = {0, 200000000L}; /* 200ms */
struct timespec ts = {0, 200000000L}; /* 200ms poll */
nanosleep(&ts, NULL);
}
if (atomic_load(&g_stop) || video_std == (ULONG)NB_VHD_VIDEOSTANDARDS) {
fprintf(stderr,
"{\"error\":\"no signal on board %u port %u within %ds\"}\n",
device_id, port_id, sig_timeout);
VHD_CloseBoardHandle(board);
if (lock_fd >= 0) { flock(lock_fd, LOCK_UN); close(lock_fd); }
return 1;
}
/* Signal locked. Hold the board-open lock for a settle period so the
* board's RX buffer queues are fully initialised before the next bridge
* calls OpenBoardHandle. 4 seconds is enough for 1080p59.94 @ queue-depth 8. */
if (lock_fd >= 0) {
struct timespec settle = {4, 0};
nanosleep(&settle, NULL);
flock(lock_fd, LOCK_UN);
close(lock_fd);
lock_fd = -1;
fprintf(stderr, "[board] lock released (port %u) — streaming\n", port_id);
}
ULONG clock_div = VHD_CLOCKDIV_1;
VHD_GetChannelProperty(board, VHD_RX_CHANNEL, port_id,
VHD_SDI_CP_CLOCK_DIVISOR, &clock_div);
VideoInfo vi = video_info((VHD_VIDEOSTANDARD)video_std,
(VHD_CLOCKDIVISOR)clock_div);
/* ── Emit format JSON to stderr (one line, flushed) ─────────────── */
fprintf(stderr,
"{\"width\":%d,\"height\":%d,\"fps_num\":%d,\"fps_den\":%d,"
"\"interlaced\":%s,\"pix_fmt\":\"uyvy422\","
"\"audio_channels\":2,\"audio_rate\":48000,"
"\"device\":%u,\"port\":%u}\n",
vi.width, vi.height, vi.fps_num, vi.fps_den,
vi.interlaced ? "true" : "false",
device_id, port_id);
fflush(stderr);
/* ── Open video stream ───────────────────────────────────────────── */
HANDLE video_stream = NULL;
if (VHD_OpenStreamHandle(board, rx_streamtype(port_id),
VHD_SDI_STPROC_DISJOINED_VIDEO,
NULL, &video_stream, NULL) != VHDERR_NOERROR) {
fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle (video) failed\"}\n");
VHD_CloseBoardHandle(board);
return 1;
}
VHD_SetStreamProperty(video_stream, VHD_SDI_SP_VIDEO_STANDARD, video_std);
VHD_SetStreamProperty(video_stream, VHD_SDI_SP_CLOCK_SYSTEM, clock_div);
VHD_SetStreamProperty(video_stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED);
VHD_SetStreamProperty(video_stream, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8);
/* ── Launch audio thread (FIFO open blocks until FFmpeg connects) ── */
pthread_t audio_tid = 0;
AudioArgs audio_args = { board, port_id, video_std, clock_div,
vi.fps_num, vi.fps_den, audio_pipe };
if (audio_pipe) {
pthread_create(&audio_tid, NULL, audio_thread, &audio_args);
}
/* ── Start video stream ──────────────────────────────────────────── */
if (VHD_StartStream(video_stream) != VHDERR_NOERROR) {
atomic_store(&g_stop, 1);
if (audio_tid) pthread_join(audio_tid, NULL);
VHD_CloseStreamHandle(video_stream);
VHD_CloseBoardHandle(board);
return 1;
}
/* ── Video capture loop ──────────────────────────────────────────── */
HANDLE slot = NULL;
while (!atomic_load(&g_stop)) {
ULONG r = VHD_LockSlotHandle(video_stream, &slot);
if (r == VHDERR_NOERROR) {
BYTE *buf = NULL;
ULONG sz = 0;
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
ULONG written = 0;
while (written < sz) {
ssize_t n = write(STDOUT_FILENO, buf + written, sz - written);
if (n <= 0) { atomic_store(&g_stop, 1); break; }
written += (ULONG)n;
}
}
VHD_UnlockSlotHandle(slot);
} else if (r != VHDERR_TIMEOUT) {
fprintf(stderr, "[video] VHD_LockSlotHandle error %lu — stopping\n", r);
break;
/* Report results — continue with whatever locked, abort only if NONE locked. */
int any_locked = 0;
for (int pi = 0; pi < port_count; pi++) {
if (locked[pi]) { any_locked = 1; }
else {
fprintf(stderr,
"{\"error\":\"no signal on board %u port %u within %ds\"}\n",
device_id, ports[pi], sig_timeout);
}
}
if (!any_locked || atomic_load(&g_stop)) {
VHD_CloseBoardHandle(board);
return 1;
}
/* ── Cleanup ─────────────────────────────────────────────────── */
VHD_StopStream(video_stream);
VHD_CloseStreamHandle(video_stream);
if (audio_tid) pthread_join(audio_tid, NULL);
/* ── Create FIFOs and open streams for each locked port ─────────── */
PortState ps[MAX_PORTS];
memset(ps, 0, sizeof(ps));
int active_count = 0;
for (int pi = 0; pi < port_count; pi++) {
if (!locked[pi]) continue;
PortState *p = &ps[active_count];
p->board = board;
p->port = ports[pi];
p->device = device_id;
p->video_std = video_stds[pi];
p->clock_div = clock_divs[pi];
p->vi = video_info((VHD_VIDEOSTANDARD)video_stds[pi],
(VHD_CLOCKDIVISOR)clock_divs[pi]);
snprintf(p->video_fifo, sizeof(p->video_fifo),
"%s/video-%u.fifo", video_pipe_dir, ports[pi]);
snprintf(p->audio_fifo, sizeof(p->audio_fifo),
"%s/audio-%u.fifo", audio_pipe_dir, ports[pi]);
/* Create FIFOs (mkfifo; ignore EEXIST). */
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
continue;
}
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
continue;
}
/* Open video stream. */
HANDLE vs = NULL;
ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]),
VHD_SDI_STPROC_DISJOINED_VIDEO,
NULL, &vs, NULL);
if (r != VHDERR_NOERROR) {
fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle video failed port %u rc=%lu\"}\n",
ports[pi], r);
continue;
}
VHD_SetStreamProperty(vs, VHD_SDI_SP_VIDEO_STANDARD, p->video_std);
VHD_SetStreamProperty(vs, VHD_SDI_SP_CLOCK_SYSTEM, p->clock_div);
VHD_SetStreamProperty(vs, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED);
VHD_SetStreamProperty(vs, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8);
p->video_stream = vs;
if (VHD_StartStream(vs) != VHDERR_NOERROR) {
fprintf(stderr, "{\"error\":\"VHD_StartStream video failed port %u\"}\n", ports[pi]);
VHD_CloseStreamHandle(vs);
p->video_stream = NULL;
continue;
}
/* Emit format JSON to stderr (one line per port on signal lock). */
fprintf(stderr,
"{\"port\":%u,\"width\":%d,\"height\":%d,"
"\"fps_num\":%d,\"fps_den\":%d,"
"\"interlaced\":%s,"
"\"pix_fmt\":\"uyvy422\","
"\"audio_channels\":2,\"audio_rate\":48000,"
"\"device\":%u}\n",
ports[pi],
p->vi.width, p->vi.height,
p->vi.fps_num, p->vi.fps_den,
p->vi.interlaced ? "true" : "false",
device_id);
fflush(stderr);
/* Launch audio thread (blocks until reader connects to audio FIFO). */
pthread_create(&p->audio_tid, NULL, audio_thread, p);
/* Launch video thread (blocks until reader connects to video FIFO). */
pthread_create(&p->video_tid, NULL, video_thread, p);
active_count++;
}
if (active_count == 0) {
fprintf(stderr, "{\"error\":\"no ports successfully started\"}\n");
VHD_CloseBoardHandle(board);
return 1;
}
/* ── Wait for all threads to finish ─────────────────────────────── */
for (int i = 0; i < active_count; i++) {
if (ps[i].video_tid) pthread_join(ps[i].video_tid, NULL);
if (ps[i].audio_tid) pthread_join(ps[i].audio_tid, NULL);
}
/* ── Cleanup ─────────────────────────────────────────────────────── */
for (int i = 0; i < active_count; i++) {
if (ps[i].video_stream) {
VHD_StopStream(ps[i].video_stream);
VHD_CloseStreamHandle(ps[i].video_stream);
}
}
VHD_CloseBoardHandle(board);
return 0;
}

View file

@ -4,47 +4,6 @@ import { dirname } from 'node:path';
import { v4 as uuidv4 } from 'uuid';
import { createUploadStream } from './s3/client.js';
/**
* Reads the first line from a spawned process's stderr stream.
* Resolves with the parsed JSON object when the first '\n' arrives.
* Rejects if the process exits with a non-zero code before emitting a line,
* or if timeoutMs elapses.
*/
function readFirstStderrLine(proc, timeoutMs = 300_000) {
return new Promise((resolve, reject) => {
let buf = '';
let settled = false;
const settle = (fn) => { if (settled) return; settled = true; fn(); };
const timer = setTimeout(() => {
settle(() => reject(new Error(`deltacast-capture: timed out waiting for format JSON after ${timeoutMs}ms`)));
}, timeoutMs);
proc.stderr.setEncoding('utf8');
proc.stderr.on('data', (chunk) => {
buf += chunk;
let nl;
while ((nl = buf.indexOf('\n')) !== -1) {
const line = buf.slice(0, nl).trim();
buf = buf.slice(nl + 1);
if (!line) continue;
if (!line.startsWith('{')) { console.error('[deltacast-bridge] ' + line); continue; }
clearTimeout(timer);
try {
const parsed = JSON.parse(line);
if (parsed.error) { settle(() => reject(new Error('deltacast-capture: ' + parsed.error))); }
else { settle(() => resolve(parsed)); }
} catch (e) { settle(() => reject(new Error('deltacast-capture: invalid JSON: ' + line))); }
return;
}
});
proc.on('exit', (code) => {
clearTimeout(timer);
settle(() => reject(new Error(`deltacast-capture: exited with code ${code} before emitting format JSON`)));
});
});
}
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
@ -476,12 +435,11 @@ const sourceBackends = {
},
},
// Stubs — hardware/SDK plumbing not yet implemented. These throw clearly so a
// misconfigured recorder fails fast instead of silently falling back to the
// wrong card.
deltacast: {
// Unused stub — deltacast capture uses sourceType='deltacast' path in
// _buildInputArgs, not the sourceBackends map.
buildInput() {
throw new Error('deltacast backend not yet implemented — requires hardware');
throw new Error('deltacast: use sourceType="deltacast" not sourceBackend');
},
},
aja: {
@ -586,53 +544,88 @@ class CaptureManager {
return { inputArgs: ['-probesize','32M','-analyzeduration','10M','-fflags','+genpts','-i', sourceUrl], isNetwork: true };
}
// Deltacast SDI via VideoMaster SDK FFmpeg plugin.
// Deltacast SDI via shared bridge daemon (deltacast-bridge).
//
// The bridge daemon is started by node-agent (host process, direct /dev access)
// and writes each port's streams to named FIFOs in /dev/shm/deltacast/:
// /dev/shm/deltacast/video-<port>.fifo
// /dev/shm/deltacast/audio-<port>.fifo
//
// This sidecar just reads from those FIFOs. The bridge may still be starting
// up or waiting for signal lock, so we wait up to 30s for the FIFOs to appear
// before handing them to ffmpeg. The bridge process is managed by node-agent;
// bridgeProcess is null here (no per-sidecar bridge spawn).
if (sourceType === 'deltacast') {
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
? parseInt(device, 10) : 0;
const audioFifo = `/tmp/dc-audio-${this._sessionIdForBridge}`;
const { execFileSync: _execFile } = await import('child_process');
const { unlinkSync: _unlink, existsSync: _exists } = await import('node:fs');
if (_exists(audioFifo)) { try { _unlink(audioFifo); } catch (_) {} }
try { _execFile('mkfifo', [audioFifo]); } catch (e) {
throw new Error(`Failed to create audio FIFO ${audioFifo}: ${e.message}`);
}
// ONE board (index 0) carries 8 channels (ports 0-7). --device is the
// board index, --port is the selected channel. board defaults to 0; the
// capture channel comes from source_config.port, falling back to the
// legacy device index so existing single-value recorders keep working.
const boardIdx = (typeof board === 'number' || /^\d+$/.test(String(board)))
? parseInt(board, 10) : 0;
const portIdx = (typeof port === 'number' || /^\d+$/.test(String(port)))
? parseInt(port, 10) : idx;
const bridge = spawn('deltacast-capture', [
'--device', String(boardIdx),
'--port', String(portIdx),
'--audio-pipe', audioFifo,
'--signal-timeout', '30',
], { stdio: ['ignore', 'pipe', 'pipe'] });
const fmt = await readFirstStderrLine(bridge, 300_000);
bridge.stderr.on('data', (d) => console.error(`[deltacast-bridge] ${d.toString().trimEnd()}`));
const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
const videoFifo = `${DC_PIPE_DIR}/video-${portIdx}.fifo`;
const audioFifo = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
// Wait up to 30s for both FIFOs to exist (bridge starts asynchronously).
const { existsSync: _exists } = await import('node:fs');
const WAIT_MS = 30_000;
const POLL_MS = 500;
const deadline = Date.now() + WAIT_MS;
let videoReady = false;
let audioReady = false;
while (Date.now() < deadline) {
videoReady = _exists(videoFifo);
audioReady = _exists(audioFifo);
if (videoReady && audioReady) break;
await new Promise(r => setTimeout(r, POLL_MS));
}
if (!videoReady || !audioReady) {
throw new Error(
`deltacast bridge FIFOs not ready after ${WAIT_MS / 1000}s ` +
`(video=${videoReady} audio=${audioReady}) — is deltacast-bridge running?`
);
}
console.log(`[deltacast] port ${portIdx} FIFOs ready: ${videoFifo}, ${audioFifo}`);
// Resolution/fps are not known until the FIFO reader connects and starts
// receiving frames. We use sensible defaults here; ffmpeg's rawvideo demuxer
// will accept whatever the bridge writes once the pipe opens.
// The bridge daemon has already detected the signal and set up streams, so
// the FIFO content is ready-to-read as soon as the reader connects.
//
// NOTE: The format JSON emitted by the bridge on signal lock goes to the
// node-agent (which launched the bridge), not to this sidecar. The sidecar
// therefore uses fixed rawvideo params here. If per-port format introspection
// is needed in future, the node-agent should expose the fmt JSON via an API
// and capture-manager can query it before building inputArgs.
//
// For now, both video dimensions and framerate come from the recorder's
// configured values (passed to start() as `framerate` and implicit in the
// codec args). The rawvideo input is -video_size / -framerate from env or
// recorder config; ffmpeg tolerates a small mismatch in rawvideo (it just
// reads N bytes per frame based on the declared size).
//
// DELTACAST_VIDEO_SIZE / DELTACAST_FRAMERATE: set by node-agent in the
// sidecar env based on the bridge's per-port format JSON, if desired.
const dcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
const dcFps = process.env.DELTACAST_FRAMERATE || '25';
const dcInterlaced = process.env.DELTACAST_INTERLACED === '1';
return {
inputArgs: [
'-f', 'rawvideo',
'-pix_fmt', fmt.pix_fmt,
'-video_size', `${fmt.width}x${fmt.height}`,
'-framerate', `${fmt.fps_num}/${fmt.fps_den}`,
'-i', 'pipe:0',
'-pix_fmt', 'uyvy422',
'-video_size', dcSize,
'-framerate', dcFps,
'-i', videoFifo,
'-f', 's16le',
'-ar', String(fmt.audio_rate),
'-ac', String(fmt.audio_channels),
'-ar', '48000',
'-ac', '2',
'-i', audioFifo,
],
isNetwork: false,
bridgeProcess: bridge,
audioFifo,
interlaced: !!fmt.interlaced,
isNetwork: false,
bridgeProcess: null, /* bridge is managed by node-agent, not this sidecar */
audioFifo: null, /* no per-session FIFO to clean up on stop */
interlaced: dcInterlaced,
};
}
@ -915,10 +908,10 @@ exit "$BMXRC"
sourceType, sourceBackend, device, port, board, sourceUrl, listen, listenPort, streamKey,
});
// Audio input index: the deltacast bridge delivers audio on a separate
// FIFO wired as ffmpeg input 1, whereas DeckLink SDI and network sources
// carry audio inside input 0. (bridgeProcess is set only for deltacast.)
const audioMap = bridgeProcess ? '1:a:0?' : '0:a:0?';
// Audio input index: the deltacast shared bridge delivers video on input 0
// (video FIFO) and audio on input 1 (audio FIFO), so audioMap is '1:a:0?'.
// DeckLink SDI and network sources carry audio inside input 0.
const audioMap = (sourceType === 'deltacast') ? '1:a:0?' : '0:a:0?';
// Non-growing master: ffmpeg muxes the finalized MOV directly. Growing
// master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via
@ -959,7 +952,8 @@ exit "$BMXRC"
catch (err) { console.error('[capture] could not create temp master dir:', err.message); }
}
const hiresOutput = localMasterPath;
const hiresStdio = [bridgeProcess ? 'pipe' : 'ignore', 'ignore', 'pipe'];
// Deltacast reads from FIFOs (no stdin pipe needed). DeckLink pipes stdout.
const hiresStdio = ['ignore', 'ignore', 'pipe'];
// For SDI we cannot open the DeckLink device a second time for a preview
// tee, so the live HLS preview is produced as a SECOND OUTPUT of the hires
@ -1023,25 +1017,14 @@ exit "$BMXRC"
hiresArgs = [ ...inputArgs, ...sdiFilterArgs, ...hiresCodecArgs, hiresOutput ];
}
hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio });
if (bridgeProcess) {
bridgeProcess.stdout.pipe(hiresProcess.stdin);
}
}
// Growing-files: nothing to upload here (promotion worker handles S3).
// Non-growing: the master is uploaded from the finalized local file in
// stop(), once ffmpeg has written the moov and exited cleanly — we can't
// upload while recording because the file isn't a valid MOV until finalize.
// bridgeProcess is null for deltacast (bridge managed by node-agent on the host).
const processes = { hires: hiresProcess };
if (bridgeProcess) {
processes.bridge = bridgeProcess;
bridgeProcess.on('exit', (code) => {
if (code !== 0 && code !== null) {
console.error(`[deltacast-bridge] exited with code ${code}`);
this.state.lastError = `deltacast bridge exited: code ${code}`;
}
});
}
const uploads = { hires: growingPath ? Promise.resolve({ growingPath }) : null };
// ── HLS tee for network sources (live preview in the UI) ──────────
@ -1164,7 +1147,7 @@ exit "$BMXRC"
if (processes.hires) processes.hires.kill('SIGINT');
if (processes.proxy) processes.proxy.kill('SIGINT');
if (processes.hls) { try { processes.hls.kill('SIGINT'); } catch (_) {} }
if (processes.bridge) { try { processes.bridge.kill('SIGINT'); } catch (_) {} }
/* processes.bridge: removed — bridge is managed by node-agent, not per-session */
// Wait for the master writer to finalize before we read/upload the file.
await waitExit(processes.hires);

View file

@ -1,6 +1,7 @@
import http from 'http';
import os from 'os';
import fs from 'fs';
import http from 'http';
import os from 'os';
import fs from 'fs';
import { spawn } from 'child_process';
const MAM_API_URL = (process.env.MAM_API_URL || 'http://localhost:3000').replace(/\/$/, '');
const NODE_TOKEN = process.env.NODE_TOKEN || '';
@ -29,14 +30,10 @@ const VERSION = '1.4.0';
// interpolated into a shell string.
const DRIVER_VENDORS = ['blackmagic', 'aja', 'deltacast', 'ndi'];
// ── Deltacast board-open mutex ────────────────────────────────────────────
// Simultaneous VHD_OpenBoardHandle calls from multiple deltacast sidecars
// trigger a kernel array-index-out-of-bounds in delta_x300 BufMngr.c:781
// that wedges all RX channels until the module is reloaded. Serialize
// deltacast-only sidecar launches through a promise-chain mutex with a
// settle delay so each board-open completes before the next one starts.
// Configurable via DELTACAST_START_STAGGER_MS (default 3500ms). SDI, SRT,
// and RTMP sources are unaffected.
// ── Deltacast board-open mutex (legacy — no longer used) ─────────────────
// The per-sidecar board-open race is eliminated by the shared bridge daemon
// (deltacast-bridge). This mutex is kept but acquireDcLock() is never called
// for deltacast sidecars; they wait for the bridge FIFOs instead.
const DELTACAST_STAGGER_MS = parseInt(process.env.DELTACAST_START_STAGGER_MS || '3500', 10);
let _dcMutex = Promise.resolve();
@ -48,6 +45,97 @@ function acquireDcLock() {
return wait.then(() => release);
}
// ── Deltacast shared bridge daemon ────────────────────────────────────────
//
// ONE deltacast-bridge process runs on the HOST (not inside a container) and
// opens the board handle exactly once, serving all requested ports via FIFOs
// in /dev/shm/deltacast/. This eliminates the BufMngr.c:781 OOB fault caused
// by concurrent VHD_OpenBoardHandle calls.
//
// Lifecycle:
// - First deltacast sidecar start → bridge launched with all configured ports.
// - Subsequent starts → sidecar reads existing FIFOs; bridge unchanged.
// - Last deltacast sidecar stop → bridge killed.
// - Bridge unexpected exit → _dcBridge reset; next sidecar re-launches it.
//
// DELTACAST_PIPE_DIR (default /dev/shm/deltacast): FIFO directory, bind-mounted
// into each deltacast sidecar so ffmpeg can read the FIFOs.
// DELTACAST_BRIDGE_BIN (default deltacast-bridge): host path to the binary.
// Typically /usr/local/bin/deltacast-bridge after `make install` from the SDK
// build, or set to the build-dir path for development.
// DELTACAST_PORTS (csv, e.g. "0,1,2,4,7"): ports the bridge opens at launch.
// Defaults to all 8 ports (0-7) so any sidecar port combination is covered.
const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
const DC_BRIDGE_BIN = process.env.DELTACAST_BRIDGE_BIN || 'deltacast-bridge';
const DC_PORTS_CSV = process.env.DELTACAST_PORTS || '0,1,2,3,4,5,6,7';
const DC_BOARD = process.env.DELTACAST_BOARD || '0';
let _dcBridge = null; // ChildProcess | null
let _dcSidecarCount = 0; // active deltacast sidecars on this node
// Map containerId -> sourceType so stop() can decrement the deltacast counter.
const _containerSourceType = new Map();
// port -> fmt JSON from bridge stderr (inject into sidecar env)
const _dcPortFmt = new Map();
function _dcBridgeRunning() {
return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null;
}
function startDeltacastBridge() {
if (_dcBridgeRunning()) return; // already up
try { fs.mkdirSync(DC_PIPE_DIR, { recursive: true }); } catch (_) {}
const args = [
'--device', DC_BOARD,
'--ports', DC_PORTS_CSV,
'--video-pipe-dir', DC_PIPE_DIR,
'--audio-pipe-dir', DC_PIPE_DIR,
];
console.log(`[dc-bridge] launching: ${DC_BRIDGE_BIN} ${args.join(' ')}`);
const proc = spawn(DC_BRIDGE_BIN, args, {
stdio: ['ignore', 'ignore', 'pipe'],
detached: false,
});
proc.stderr.setEncoding('utf8');
proc.stderr.on('data', (chunk) => {
for (const line of chunk.split('\n')) {
const t = line.trim();
if (!t) continue;
// Format JSON lines go to stdout so node-agent can log/forward them.
if (t.startsWith('{')) {
console.log('[dc-bridge] ' + t);
try { const f = JSON.parse(t); if (typeof f.port === 'number') _dcPortFmt.set(f.port, f); } catch (_) {}
} else {
console.error('[dc-bridge] ' + t);
}
}
});
proc.on('exit', (code, sig) => {
console.error(`[dc-bridge] exited code=${code} signal=${sig}`);
_dcBridge = null;
});
_dcBridge = proc;
console.log(`[dc-bridge] pid=${proc.pid} board=${DC_BOARD} ports=${DC_PORTS_CSV}`);
}
function stopDeltacastBridge() {
if (!_dcBridgeRunning()) return;
console.log('[dc-bridge] stopping (no active deltacast sidecars)');
try { _dcBridge.kill('SIGTERM'); } catch (_) {}
// Give it 5s to clean up, then SIGKILL.
const proc = _dcBridge;
setTimeout(() => {
try { if (proc.exitCode === null) proc.kill('SIGKILL'); } catch (_) {}
}, 5000);
_dcBridge = null;
}
// Pick the host's LAN IP. Inside a bridge-mode container,
// os.networkInterfaces() returns the container's docker-bridge IP (172.x),
// not the host's LAN address. Two strategies:
@ -185,19 +273,34 @@ async function handleSidecarStart(body, res) {
HostConfig: hostConfig,
};
// Deltacast: serialize board opens through a process-wide mutex + settle
// delay. Concurrent VHD_OpenBoardHandle calls wedge the kernel RX buffer
// manager (delta_x300 BufMngr.c:781 OOB). Non-deltacast sources skip
// this entirely so SDI/SRT/RTMP start latency is unchanged.
let release = null;
// Deltacast: ensure the shared bridge daemon is running on the HOST before
// starting the sidecar. The sidecar reads FIFOs produced by the bridge;
// it does NOT open the board handle itself (no BufMngr.c:781 race).
if (sourceType === 'deltacast') {
release = await acquireDcLock();
_dcSidecarCount++;
startDeltacastBridge();
// Inject per-port signal format so capture-manager uses real dimensions/fps
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
let _portNum = NaN;
try { _portNum = JSON.parse(_srcCfg).port; } catch (_) {}
if (Number.isFinite(_portNum) && _dcPortFmt.has(_portNum)) {
const _fmt = _dcPortFmt.get(_portNum);
const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num);
sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`);
sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`);
sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`);
console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced}`);
}
}
let containerId;
try {
const createRes = await dockerApi('POST', '/containers/create', spec);
if (createRes.status !== 201) {
if (sourceType === 'deltacast') {
_dcSidecarCount--;
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
}
return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data });
}
@ -207,19 +310,22 @@ async function handleSidecarStart(body, res) {
console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`);
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
if (startRes.status !== 204) {
if (sourceType === 'deltacast') {
_dcSidecarCount--;
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
}
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data });
}
if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast');
jsonResponse(res, 201, { containerId, capturePort });
// Hold the lock for the settle period AFTER responding so the caller
// isn't blocked, but the next deltacast open is still deferred.
if (release) {
await new Promise(r => setTimeout(r, DELTACAST_STAGGER_MS));
} catch (err) {
if (sourceType === 'deltacast') {
_dcSidecarCount--;
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
}
} finally {
if (release) release();
throw err;
}
} catch (err) {
jsonResponse(res, 500, { error: err.message });
@ -257,6 +363,19 @@ async function handleSidecarStop(containerId, res) {
console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`);
// Container has now exited gracefully (or hit the 180s cap); remove it.
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
// Deltacast bridge lifecycle: decrement sidecar count; stop bridge when last.
if (_containerSourceType.get(containerId) === 'deltacast') {
_containerSourceType.delete(containerId);
_dcSidecarCount--;
if (_dcSidecarCount <= 0) {
_dcSidecarCount = 0;
stopDeltacastBridge();
}
} else {
_containerSourceType.delete(containerId);
}
jsonResponse(res, 200, { ok: true });
} catch (err) {
console.error(`[sidecar-stop] error: ${err.message}`);

49
tools/na_patch.py Normal file
View file

@ -0,0 +1,49 @@
"""Patches node-agent to store bridge per-port format JSON and inject into sidecar env."""
with open('services/node-agent/index.js') as f:
s = f.read()
# 1. Add per-port format cache
old1 = "const _containerSourceType = new Map();"
new1 = ("const _containerSourceType = new Map();\n"
"// port -> fmt JSON from bridge stderr (inject into sidecar env)\n"
"const _dcPortFmt = new Map();")
assert old1 in s, "MISS: _containerSourceType"
s = s.replace(old1, new1, 1)
# 2. Parse format JSON in bridge stderr handler
old2 = (" if (t.startsWith('{')) console.log('[dc-bridge] ' + t);\n"
" else console.error('[dc-bridge] ' + t);")
new2 = (" if (t.startsWith('{')) {\n"
" console.log('[dc-bridge] ' + t);\n"
" try { const f = JSON.parse(t); if (typeof f.port === 'number') _dcPortFmt.set(f.port, f); } catch (_) {}\n"
" } else {\n"
" console.error('[dc-bridge] ' + t);\n"
" }")
assert old2 in s, "MISS: stderr handler"
s = s.replace(old2, new2, 1)
# 3. Inject env after startDeltacastBridge()
old3 = (" if (sourceType === 'deltacast') {\n"
" _dcSidecarCount++;\n"
" startDeltacastBridge();")
new3 = (" if (sourceType === 'deltacast') {\n"
" _dcSidecarCount++;\n"
" startDeltacastBridge();\n"
" // Inject per-port signal format so capture-manager uses real dimensions/fps\n"
" const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);\n"
" let _portNum = NaN;\n"
" try { _portNum = JSON.parse(_srcCfg).port; } catch (_) {}\n"
" if (Number.isFinite(_portNum) && _dcPortFmt.has(_portNum)) {\n"
" const _fmt = _dcPortFmt.get(_portNum);\n"
" const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num);\n"
" sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`);\n"
" sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`);\n"
" sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`);\n"
" console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced}`);\n"
" }")
assert old3 in s, "MISS: sidecar start deltacast block"
s = s.replace(old3, new3, 1)
with open('services/node-agent/index.js', 'w') as f:
f.write(s)
print("PATCHED OK")