diff --git a/services/capture/Dockerfile b/services/capture/Dockerfile index 1730c6c..8e7d2f9 100644 --- a/services/capture/Dockerfile +++ b/services/capture/Dockerfile @@ -15,6 +15,24 @@ RUN cmake -S /bridge -B /bridge/build \ -DSDK_ROOT=/sdk \ && cmake --build /bridge/build -j$(nproc) +# ── Stage 1c: Build decklink-bridge binary ─────────────────────────────── +FROM debian:bookworm AS decklink-bridge-builder +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential cmake ca-certificates g++ \ + && rm -rf /var/lib/apt/lists/* +# DeckLink SDK headers (for IDeckLinkInput etc.) +COPY sdk/ /decklink-sdk/ +# Shared fc_writer module from deltacast-bridge +COPY deltacast-bridge/fc_writer.h /fc_writer/fc_writer.h +COPY deltacast-bridge/fc_writer.c /fc_writer/fc_writer.c +# decklink-bridge source +COPY decklink-bridge/ /decklink-bridge/ +RUN cmake -S /decklink-bridge -B /decklink-bridge/build \ + -DCMAKE_BUILD_TYPE=Release \ + -DDECKLINK_SDK_DIR=/decklink-sdk \ + -DDELTACAST_BRIDGE_DIR=/fc_writer \ + && cmake --build /decklink-bridge/build -j$(nproc) + # ── Stage 2: Build FFmpeg with DeckLink + NVENC (HEVC/H264) support ───────── # All-Intra HEVC NVENC is the master codec for growing-file ingest (see # docs/design/2026-05-29-all-intra-hevc-ingest.md). This stage gets the @@ -151,6 +169,9 @@ RUN raw2bmx -h >/dev/null 2>&1 && echo 'raw2bmx runtime OK' # Deltacast bridge binary + SDK runtime libs COPY --from=bridge-builder /bridge/build/deltacast-capture /usr/local/bin/deltacast-capture + +# DeckLink bridge binary (no SDK runtime .so — uses dlopen at runtime) +COPY --from=decklink-bridge-builder /decklink-bridge/build/decklink-bridge /usr/local/bin/decklink-bridge COPY --from=sdk-extractor /sdk/lib/libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/ COPY --from=sdk-extractor /sdk/lib/libvideomasterhd_audio.so.6.34.1 /usr/local/lib/deltacast/ RUN ln -sf libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/libvideomasterhd.so.6 \ diff --git a/services/capture/decklink-bridge/CMakeLists.txt b/services/capture/decklink-bridge/CMakeLists.txt new file mode 100644 index 0000000..9f78c28 --- /dev/null +++ b/services/capture/decklink-bridge/CMakeLists.txt @@ -0,0 +1,51 @@ +cmake_minimum_required(VERSION 3.16) +project(decklink-bridge CXX C) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_C_STANDARD 11) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -O2") + +# Path to DeckLink SDK headers (services/capture/sdk/) +set(DECKLINK_SDK_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../sdk" + CACHE PATH "Path to Blackmagic DeckLink SDK headers") + +# Path to Deltacast bridge (for fc_writer.h/c — shared writer module) +set(DELTACAST_BRIDGE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../deltacast-bridge" + CACHE PATH "Path to deltacast-bridge (contains fc_writer.h/c)") + +# Legacy FIFO fallback option (mirrors deltacast-bridge option) +option(LEGACY_FIFO "Use named FIFOs instead of framecache shm" OFF) + +# ── decklink-bridge executable ──────────────────────────────────────── +add_executable(decklink-bridge + main.cpp + ${DELTACAST_BRIDGE_DIR}/fc_writer.c # shared framecache writer +) + +if(LEGACY_FIFO) + target_compile_definitions(decklink-bridge PRIVATE LEGACY_FIFO=1) + message(STATUS "decklink-bridge: LEGACY_FIFO mode enabled") +else() + message(STATUS "decklink-bridge: framecache shm mode enabled") +endif() + +target_include_directories(decklink-bridge PRIVATE + ${DECKLINK_SDK_DIR} + ${DELTACAST_BRIDGE_DIR} # fc_writer.h +) + +target_link_libraries(decklink-bridge PRIVATE + pthread + rt # shm_open, sem_open + dl # dlopen (used by DeckLinkAPIDispatch.cpp on Linux) +) + +# DeckLink driver is linked at runtime via dlopen (no link-time .so needed). +# The SDK's DeckLinkAPIDispatch.cpp handles the dynamic loading. + +set_target_properties(decklink-bridge PROPERTIES + INSTALL_RPATH "/usr/local/lib" + BUILD_WITH_INSTALL_RPATH TRUE +) + +install(TARGETS decklink-bridge DESTINATION bin) diff --git a/services/capture/decklink-bridge/main.cpp b/services/capture/decklink-bridge/main.cpp new file mode 100644 index 0000000..22c0646 --- /dev/null +++ b/services/capture/decklink-bridge/main.cpp @@ -0,0 +1,542 @@ +/** + * decklink-bridge/main.cpp + * + * Blackmagic DeckLink SDI shared multi-device bridge daemon. + * + * Opens one or more DeckLink devices and for each device: + * - Auto-detects the incoming signal format + * - Registers a framecache slot via HTTP API + * - Writes raw UYVY422 (bmdFormat8BitYUV) video frames into the shm ring + * - Writes PCM s16le audio to a named FIFO (audio-in-shm is roadmap) + * + * Slot ID format: "decklink--" + * node_id comes from NODE_ID env var (set by node-agent), falls back to hostname. + * + * Usage: + * decklink-bridge --devices # device indices, e.g. "0,1" + * decklink-bridge --device # single device compat alias + * [--fc-url http://framecache:7435] + * [--audio-pipe-dir /dev/shm/decklink] + * [--signal-timeout ] + * + * For each device that acquires signal, emits one JSON line to stderr: + * {"device":N,"width":W,"height":H,"fps_num":N,"fps_den":D, + * "interlaced":false,"pix_fmt":"uyvy422", + * "audio_channels":2,"audio_rate":48000, + * "slot_id":"decklink--"} + * + * Compile with -DLEGACY_FIFO=1 to fall back to writing a raw video FIFO + * instead of the framecache shm path. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "DeckLinkAPI.h" +#include "DeckLinkAPIDispatch.cpp" + +#ifndef LEGACY_FIFO +extern "C" { +# include "../deltacast-bridge/fc_writer.h" +} +#endif + +#ifndef F_SETPIPE_SZ +# define F_SETPIPE_SZ 1031 +#endif + +#define FC_URL_DEFAULT "http://localhost:7435" +#define AUDIO_PIPE_DIR "/dev/shm/decklink" +#define MAX_DEVICES 8 + +/* ── Global shutdown flag ──────────────────────────────────────────── */ +static std::atomic g_stop{0}; +static void on_signal(int) { g_stop.store(1); } + +/* ── Helpers ───────────────────────────────────────────────────────── */ +static uint64_t now_us() { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL; +} + +static int write_all(int fd, const void *buf, size_t len) { + const uint8_t *p = static_cast(buf); + size_t off = 0; + int flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + while (off < len) { + ssize_t n = write(fd, p + off, len - off); + if (n > 0) { off += (size_t)n; continue; } + if (n < 0 && errno == EINTR) continue; + if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + struct timespec ts{0, 1000000L}; + nanosleep(&ts, nullptr); + continue; + } + fcntl(fd, F_SETFL, flags); + return -1; + } + fcntl(fd, F_SETFL, flags); + return 0; +} + +/* ── Per-device state ──────────────────────────────────────────────── */ +struct DeviceState { + int device_idx = 0; + IDeckLink *decklink = nullptr; + IDeckLinkInput *input = nullptr; + + /* Signal properties (filled on first frame or format-change) */ + int width = 0; + int height = 0; + int fps_num = 0; + int fps_den = 1; + bool interlaced = false; + bool signal_reported = false; + + std::string slot_id; + std::string fc_url; + std::string audio_fifo; + +#ifndef LEGACY_FIFO + fc_writer_t *fc_writer = nullptr; +#else + int video_fifo_fd = -1; + std::string video_fifo; +#endif + + /* Audio FIFO fd — opened once, reopened on EPIPE */ + int audio_fd = -1; + pthread_t audio_tid{}; + std::atomic audio_stop{0}; + + uint64_t frame_seq = 0; +}; + +/* ── Audio thread ──────────────────────────────────────────────────── */ +/* DeckLink audio arrives via VideoInputFrameArrived callback, not a + * separate stream. We write it from the callback directly (see below). + * This thread exists only to keep the FIFO open and provide silence + * when no frames are arriving (e.g. signal lost). */ +static void *audio_silence_thread(void *arg) { + DeviceState *ds = static_cast(arg); + + const int RATE = 48000; + const int CH = 2; + const int FPS = ds->fps_num > 0 ? ds->fps_num : 30; + const int FPS_DEN = ds->fps_den > 0 ? ds->fps_den : 1; + long samples = ((long)RATE * FPS_DEN + FPS / 2) / FPS; + size_t tick = (size_t)samples * (size_t)CH * 2; /* s16le */ + std::vector silence(tick, 0); + + while (!g_stop.load() && !ds->audio_stop.load()) { + int fd = open(ds->audio_fifo.c_str(), O_WRONLY); + if (fd < 0) { + struct timespec ts{0, 200000000L}; + nanosleep(&ts, nullptr); + continue; + } + fcntl(fd, F_SETPIPE_SZ, 1024 * 1024); + ds->audio_fd = fd; + + long frame_ns = (long)(1000000000.0 * (double)FPS_DEN / (double)FPS); + struct timespec next; + clock_gettime(CLOCK_MONOTONIC, &next); + + while (!g_stop.load() && !ds->audio_stop.load()) { + /* Only write silence if no real audio arrived recently. + * Real audio is written by VideoInputFrameArrived directly. */ + if (write_all(ds->audio_fd, silence.data(), tick) < 0) { + fprintf(stderr, "[audio:%d] EPIPE — reopening\n", ds->device_idx); + break; + } + next.tv_nsec += frame_ns; + while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec++; } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + if (next.tv_sec > now.tv_sec || + (next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) + clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, nullptr); + else + next = now; + } + ds->audio_fd = -1; + close(fd); + } + return nullptr; +} + +/* ── IDeckLinkInputCallback implementation ─────────────────────────── */ +class CaptureCallback : public IDeckLinkInputCallback { +public: + explicit CaptureCallback(DeviceState *ds) : m_ds(ds), m_refcount(1) {} + + /* IUnknown */ + HRESULT QueryInterface(REFIID, void **) override { return E_NOINTERFACE; } + ULONG AddRef() override { return ++m_refcount; } + ULONG Release() override { + ULONG r = --m_refcount; + if (r == 0) delete this; + return r; + } + + /* IDeckLinkInputCallback */ + HRESULT VideoInputFormatChanged( + BMDVideoInputFormatChangedEvents events, + IDeckLinkDisplayMode *newMode, + BMDDetectedVideoInputFormatFlags detectedFlags) override + { + /* Re-enable input with new mode — required for auto-detect to work */ + m_ds->input->PauseStreams(); + + BMDDisplayMode mode = newMode->GetDisplayMode(); + + /* Detect interlaced */ + BMDFieldDominance fd = newMode->GetFieldDominance(); + m_ds->interlaced = (fd == bmdUpperFieldFirst || fd == bmdLowerFieldFirst); + + /* Get width/height */ + m_ds->width = (int)newMode->GetWidth(); + m_ds->height = (int)newMode->GetHeight(); + + /* Get frame rate */ + BMDTimeValue frameDuration; BMDTimeScale timeScale; + newMode->GetFrameRate(&frameDuration, &timeScale); + m_ds->fps_num = (int)timeScale; + m_ds->fps_den = (int)frameDuration; + + m_ds->input->EnableVideoInput(mode, bmdFormat8BitYUV, + bmdVideoInputEnableFormatDetection); + m_ds->input->FlushStreams(); + m_ds->input->StartStreams(); + + fprintf(stderr, "[decklink:%d] format changed: %dx%d %.4ffps %s\n", + m_ds->device_idx, + m_ds->width, m_ds->height, + m_ds->fps_den ? (double)m_ds->fps_num / m_ds->fps_den : 0.0, + m_ds->interlaced ? "interlaced" : "progressive"); + + /* Re-open framecache slot with new format */ + this->reopen_slot(); + return S_OK; + } + + HRESULT VideoInputFrameArrived( + IDeckLinkVideoInputFrame *videoFrame, + IDeckLinkAudioInputPacket *audioPacket) override + { + if (g_stop.load()) return S_OK; + if (!videoFrame) return S_OK; + + /* Detect format on first frame if format-change hasn't fired */ + if (!m_ds->signal_reported) { + m_ds->width = (int)videoFrame->GetWidth(); + m_ds->height = (int)videoFrame->GetHeight(); + BMDTimeValue fd; BMDTimeScale ts; + videoFrame->GetStreamTime(&fd, nullptr, 1000000); /* unused — just open slot */ + /* Use stored fps from format change, or detect from row bytes */ + if (m_ds->fps_num == 0) { + m_ds->fps_num = 30000; + m_ds->fps_den = 1001; + } + this->reopen_slot(); + m_ds->signal_reported = true; + } + + /* ── Write video frame ──────────────────────────────────────── */ + void *bytes = nullptr; + videoFrame->GetBytes(&bytes); + uint32_t sz = (uint32_t)(videoFrame->GetRowBytes() * videoFrame->GetHeight()); + + uint32_t expected = (uint32_t)m_ds->width * (uint32_t)m_ds->height * 2; + if (sz != expected) { + fprintf(stderr, "[decklink:%d] WARN: frame sz=%u != expected %u — skipping\n", + m_ds->device_idx, sz, expected); + return S_OK; + } + + uint64_t pts_us = 0; + if (m_ds->fps_num > 0) { + pts_us = m_ds->frame_seq * 1000000ULL + * (uint64_t)m_ds->fps_den + / (uint64_t)m_ds->fps_num; + } + +#ifndef LEGACY_FIFO + if (m_ds->fc_writer) { + fc_writer_write(m_ds->fc_writer, + static_cast(bytes), sz, pts_us); + } +#else + if (m_ds->video_fifo_fd >= 0) { + if (write_all(m_ds->video_fifo_fd, + static_cast(bytes), sz) < 0) { + fprintf(stderr, "[decklink:%d] video FIFO EPIPE\n", m_ds->device_idx); + close(m_ds->video_fifo_fd); + m_ds->video_fifo_fd = open(m_ds->video_fifo.c_str(), O_WRONLY | O_NONBLOCK); + if (m_ds->video_fifo_fd >= 0) + fcntl(m_ds->video_fifo_fd, F_SETPIPE_SZ, 64 * 1024 * 1024); + } + } +#endif + + m_ds->frame_seq++; + + /* ── Write audio ────────────────────────────────────────────── */ + if (audioPacket && m_ds->audio_fd >= 0) { + void *abytes = nullptr; + audioPacket->GetBytes(&abytes); + uint32_t sample_count = (uint32_t)audioPacket->GetSampleFrameCount(); + uint32_t audio_sz = sample_count * 2 /* ch */ * 2 /* s16le bytes */; + if (abytes && audio_sz > 0) { + /* Non-fatal if pipe is full — silence thread provides fallback */ + write_all(m_ds->audio_fd, + static_cast(abytes), audio_sz); + } + } + + /* Emit signal JSON once per device on first frame */ + if (m_ds->frame_seq == 1) { + fprintf(stderr, + "{\"device\":%d,\"width\":%d,\"height\":%d," + "\"fps_num\":%d,\"fps_den\":%d," + "\"interlaced\":%s," + "\"pix_fmt\":\"uyvy422\"," + "\"audio_channels\":2,\"audio_rate\":48000," + "\"slot_id\":\"%s\"}\n", + m_ds->device_idx, + m_ds->width, m_ds->height, + m_ds->fps_num, m_ds->fps_den, + m_ds->interlaced ? "true" : "false", + m_ds->slot_id.c_str()); + fflush(stderr); + } + + return S_OK; + } + +private: + DeviceState *m_ds; + std::atomic m_refcount; + + void reopen_slot() { +#ifndef LEGACY_FIFO + if (m_ds->fc_writer) { + fc_writer_close(m_ds->fc_writer); + m_ds->fc_writer = nullptr; + } + if (m_ds->width > 0 && m_ds->height > 0 && m_ds->fps_num > 0) { + m_ds->fc_writer = fc_writer_open( + m_ds->fc_url.c_str(), + m_ds->slot_id.c_str(), + (uint32_t)m_ds->width, (uint32_t)m_ds->height, + (uint32_t)m_ds->fps_num, (uint32_t)m_ds->fps_den); + if (!m_ds->fc_writer) { + fprintf(stderr, "[decklink:%d] framecache unavailable\n", + m_ds->device_idx); + } + } +#endif + } +}; + +/* ── Parse comma-separated device list ────────────────────────────── */ +static std::vector parse_devices(const char *csv) { + std::vector out; + char buf[256]; + strncpy(buf, csv, sizeof buf - 1); + char *tok = strtok(buf, ","); + while (tok) { out.push_back(atoi(tok)); tok = strtok(nullptr, ","); } + return out; +} + +/* ── Main ──────────────────────────────────────────────────────────── */ +int main(int argc, char *argv[]) { + std::vector device_indices; + int sig_timeout = 30; + const char *fc_url = getenv("FC_URL") ? getenv("FC_URL") : FC_URL_DEFAULT; + const char *audio_dir = AUDIO_PIPE_DIR; + + const char *node_id = getenv("NODE_ID"); + char hostname[256] = "local"; + if (!node_id) { gethostname(hostname, sizeof hostname); node_id = hostname; } + + for (int i = 1; i < argc; i++) { + if (!strcmp(argv[i], "--devices") && i+1 < argc) + device_indices = parse_devices(argv[++i]); + else if (!strcmp(argv[i], "--device") && i+1 < argc) + device_indices.push_back(atoi(argv[++i])); + else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) + fc_url = argv[++i]; + else if (!strcmp(argv[i], "--audio-pipe-dir") && i+1 < argc) + audio_dir = argv[++i]; + else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) + sig_timeout = atoi(argv[++i]); + } + + if (device_indices.empty()) { + fprintf(stderr, "{\"error\":\"no devices specified — use --devices 0,1 or --device 0\"}\n"); + return 1; + } + + signal(SIGINT, on_signal); + signal(SIGTERM, on_signal); + signal(SIGPIPE, SIG_IGN); + + /* Ensure audio pipe dir exists */ + mkdir(audio_dir, 0755); + + /* ── Enumerate DeckLink devices ─────────────────────────────────── */ + IDeckLinkIterator *iterator = CreateDeckLinkIteratorInstance(); + if (!iterator) { + fprintf(stderr, "{\"error\":\"CreateDeckLinkIteratorInstance failed — DeckLink driver not loaded?\"}\n"); + return 1; + } + + std::vector all_devices; + IDeckLink *dl = nullptr; + while (iterator->Next(&dl) == S_OK) { + all_devices.push_back(dl); + } + iterator->Release(); + + fprintf(stderr, "[decklink] %zu device(s) detected\n", all_devices.size()); + + /* ── Set up per-device state ─────────────────────────────────────── */ + std::vector states(device_indices.size()); + std::vector callbacks(device_indices.size(), nullptr); + + for (size_t i = 0; i < device_indices.size(); i++) { + int idx = device_indices[i]; + if (idx < 0 || (size_t)idx >= all_devices.size()) { + fprintf(stderr, "{\"error\":\"device index %d out of range (%zu detected)\"}\n", + idx, all_devices.size()); + continue; + } + + DeviceState &ds = states[i]; + ds.device_idx = idx; + ds.fc_url = fc_url; + + /* slot_id: "decklink--" */ + char sid[128]; + snprintf(sid, sizeof sid, "decklink-%s-%d", node_id, idx); + ds.slot_id = sid; + + /* Audio FIFO path */ + char apath[256]; + snprintf(apath, sizeof apath, "%s/audio-%d.fifo", audio_dir, idx); + ds.audio_fifo = apath; + mkfifo(apath, 0666); /* ignore EEXIST */ + +#ifdef LEGACY_FIFO + /* Video FIFO (legacy path only) */ + char vpath[256]; + snprintf(vpath, sizeof vpath, "%s/video-%d.fifo", audio_dir, idx); + ds.video_fifo = vpath; + mkfifo(vpath, 0666); + int vfd = open(vpath, O_WRONLY | O_NONBLOCK); + if (vfd >= 0) fcntl(vfd, F_SETPIPE_SZ, 64 * 1024 * 1024); + ds.video_fifo_fd = vfd; +#endif + + IDeckLink *decklink = all_devices[(size_t)idx]; + ds.decklink = decklink; + + /* Get IDeckLinkInput */ + IDeckLinkInput *input = nullptr; + if (decklink->QueryInterface(IID_IDeckLinkInput, + reinterpret_cast(&input)) != S_OK) { + fprintf(stderr, "[decklink:%d] QueryInterface IDeckLinkInput failed\n", idx); + continue; + } + ds.input = input; + + /* Install callback */ + CaptureCallback *cb = new CaptureCallback(&ds); + callbacks[i] = cb; + input->SetCallback(cb); + + /* Enable video with format detection — actual mode set on first + * VideoInputFormatChanged; use 1080i29.97 as a safe starting mode. */ + HRESULT hr = input->EnableVideoInput( + bmdModeHD1080i5994, + bmdFormat8BitYUV, + bmdVideoInputEnableFormatDetection); + if (hr != S_OK) { + fprintf(stderr, "[decklink:%d] EnableVideoInput failed (0x%08x)\n", idx, (unsigned)hr); + continue; + } + + /* Enable audio input — 48kHz stereo s16le */ + input->EnableAudioInput(bmdAudioSampleRate48kHz, + bmdAudioSampleType16bitInteger, 2); + + /* Start silence thread (keeps audio FIFO open) */ + ds.fps_num = 30000; ds.fps_den = 1001; /* default until format detected */ + pthread_create(&ds.audio_tid, nullptr, audio_silence_thread, &ds); + + /* Start capture */ + if (input->StartStreams() != S_OK) { + fprintf(stderr, "[decklink:%d] StartStreams failed\n", idx); + continue; + } + + fprintf(stderr, "[decklink:%d] capture started, waiting for signal...\n", idx); + } + + /* ── Run until shutdown ─────────────────────────────────────────── */ + while (!g_stop.load()) { + struct timespec ts{0, 100000000L}; /* 100ms */ + nanosleep(&ts, nullptr); + } + + fprintf(stderr, "[decklink] shutdown signal received\n"); + + /* ── Cleanup ─────────────────────────────────────────────────────── */ + for (size_t i = 0; i < states.size(); i++) { + DeviceState &ds = states[i]; + + if (ds.input) { + ds.input->StopStreams(); + ds.input->DisableVideoInput(); + ds.input->DisableAudioInput(); + ds.input->SetCallback(nullptr); + } + + ds.audio_stop.store(1); + if (ds.audio_tid) pthread_join(ds.audio_tid, nullptr); + +#ifndef LEGACY_FIFO + if (ds.fc_writer) { + fc_writer_close(ds.fc_writer); + ds.fc_writer = nullptr; + } +#else + if (ds.video_fifo_fd >= 0) close(ds.video_fifo_fd); +#endif + + if (ds.input) { ds.input->Release(); ds.input = nullptr; } + if (callbacks[i]) { callbacks[i]->Release(); callbacks[i] = nullptr; } + } + + for (auto *d : all_devices) d->Release(); + + return 0; +} diff --git a/services/node-agent/index.js b/services/node-agent/index.js index 371525e..dde165c 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -71,13 +71,96 @@ const DC_BRIDGE_BIN = process.env.DELTACAST_BRIDGE_BIN || 'deltacast-bridge'; const DC_PORTS_CSV = process.env.DELTACAST_PORTS || '0,1,2,3,4,5,6,7'; const DC_BOARD = process.env.DELTACAST_BOARD || '0'; +// Framecache URL — passed to all bridge processes so they can register slots. +// Set FC_URL in .env.worker (default: http://framecache:7435 within the +// wild-dragon-worker Docker network). +const FC_URL = process.env.FC_URL || 'http://framecache:7435'; +// Node identity for framecache slot IDs (e.g. "decklink-zampp3-0"). +// Set NODE_NAME in .env.worker so slot IDs are stable across restarts. +const FC_NODE_ID = process.env.NODE_NAME || process.env.HOSTNAME || 'local'; + let _dcBridge = null; // ChildProcess | null let _dcSidecarCount = 0; // active deltacast sidecars on this node // Map containerId -> sourceType so stop() can decrement the deltacast counter. const _containerSourceType = new Map(); -// port -> fmt JSON from bridge stderr (inject into sidecar env) +// port -> fmt JSON from bridge stderr (inject into sidecar env + slot_id) const _dcPortFmt = new Map(); +// ── DeckLink bridge ─────────────────────────────────────────────────────── +// One decklink-bridge process per node, managing all DeckLink devices. +// Mirrors the deltacast-bridge singleton pattern. +const DL_BRIDGE_BIN = process.env.DECKLINK_BRIDGE_BIN || 'decklink-bridge'; +const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink'; + +let _dlBridge = null; // ChildProcess | null +let _dlSidecarCount = 0; +// device_idx -> fmt JSON from bridge stderr +const _dlDevFmt = new Map(); + +function _dlBridgeRunning() { + return _dlBridge !== null && _dlBridge.exitCode === null && _dlBridge.signalCode === null; +} + +function startDecklinkBridge(deviceIndices) { + if (_dlBridgeRunning()) return; + + try { require('fs').mkdirSync(DL_AUDIO_DIR, { recursive: true }); } catch (_) {} + + const devCsv = Array.isArray(deviceIndices) ? deviceIndices.join(',') : String(deviceIndices || '0'); + const args = [ + '--devices', devCsv, + '--fc-url', FC_URL, + '--audio-pipe-dir', DL_AUDIO_DIR, + ]; + console.log(`[dl-bridge] launching: ${DL_BRIDGE_BIN} ${args.join(' ')}`); + + const proc = spawn(DL_BRIDGE_BIN, args, { + stdio: ['ignore', 'ignore', 'pipe'], + detached: false, + env: { ...process.env, NODE_ID: FC_NODE_ID, FC_URL }, + }); + + proc.stderr.setEncoding('utf8'); + proc.stderr.on('data', (chunk) => { + for (const line of chunk.split('\n')) { + const t = line.trim(); + if (!t) continue; + if (t.startsWith('{')) { + console.log('[dl-bridge] ' + t); + try { + const f = JSON.parse(t); + if (typeof f.device === 'number') _dlDevFmt.set(f.device, f); + } catch (_) {} + } else { + console.error('[dl-bridge] ' + t); + } + } + }); + + proc.on('error', (err) => { + console.error(`[dl-bridge] spawn error: ${err.message}`); + _dlBridge = null; + }); + proc.on('exit', (code, sig) => { + console.error(`[dl-bridge] exited code=${code} signal=${sig}`); + _dlBridge = null; + }); + + _dlBridge = proc; + console.log(`[dl-bridge] pid=${proc.pid} devices=${devCsv}`); +} + +function stopDecklinkBridge() { + if (!_dlBridgeRunning()) return; + console.log('[dl-bridge] stopping'); + try { _dlBridge.kill('SIGTERM'); } catch (_) {} + const proc = _dlBridge; + setTimeout(() => { + try { if (proc.exitCode === null) proc.kill('SIGKILL'); } catch (_) {} + }, 5000); + _dlBridge = null; +} + function _dcBridgeRunning() { return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null; } @@ -122,12 +205,14 @@ function startDeltacastBridge() { '--ports', DC_PORTS_CSV, '--video-pipe-dir', DC_PIPE_DIR, '--audio-pipe-dir', DC_PIPE_DIR, + '--fc-url', FC_URL, ]; console.log(`[dc-bridge] launching: ${DC_BRIDGE_BIN} ${args.join(' ')}`); const proc = spawn(DC_BRIDGE_BIN, args, { stdio: ['ignore', 'ignore', 'pipe'], detached: false, + env: { ...process.env, FC_URL, NODE_ID: FC_NODE_ID }, }); proc.stderr.setEncoding('utf8'); @@ -308,9 +393,12 @@ async function handleSidecarStart(body, res) { HostConfig: hostConfig, }; + // Always inject FC_URL so capture-manager can find the framecache service. + sidecarEnv.push(`FC_URL=${FC_URL}`); + // Deltacast: ensure the shared bridge daemon is running on the HOST before - // starting the sidecar. The sidecar reads FIFOs produced by the bridge; - // it does NOT open the board handle itself (no BufMngr.c:781 race). + // starting the sidecar. The bridge writes frames to the framecache shm ring; + // the sidecar reads via the consumer library (fc_client). if (sourceType === 'deltacast') { _dcSidecarCount++; startDeltacastBridge(); @@ -324,8 +412,36 @@ async function handleSidecarStart(body, res) { sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`); sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`); sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`); - console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced}`); + // Pass slot_id so capture-manager knows which framecache slot to read + if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`); + console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced} slot=${_fmt.slot_id}`); } + // IPC host — sidecar must share /dev/shm with framecache container + hostConfig.IpcMode = 'host'; + } + + // DeckLink: ensure decklink-bridge is running on the HOST. + // Bridge writes to framecache; sidecar reads via fc_client. + if (sourceType === 'sdi' || sourceType === 'blackmagic') { + _dlSidecarCount++; + // Determine which device indices are active on this node + const _bmdDevices = []; + try { + const _bmdDir = '/dev/blackmagic'; + const _bmdEntries = fs.readdirSync(_bmdDir).filter(n => /^(dv|io)\d+$/.test(n)); + _bmdEntries.forEach((_, i) => _bmdDevices.push(i)); + } catch (_) { _bmdDevices.push(0); } + startDecklinkBridge(_bmdDevices); + // Inject fmt if available + const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); + let _devIdx = NaN; + try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {} + if (!Number.isFinite(_devIdx)) _devIdx = parseInt((env.find(e => e.startsWith('BMD_DEVICE_INDEX=')) || '=0').split('=')[1]) || 0; + if (Number.isFinite(_devIdx) && _dlDevFmt.has(_devIdx)) { + const _fmt = _dlDevFmt.get(_devIdx); + if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`); + } + hostConfig.IpcMode = 'host'; } let containerId; @@ -354,12 +470,17 @@ async function handleSidecarStart(body, res) { } if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast'); + if (sourceType === 'sdi' || sourceType === 'blackmagic') _containerSourceType.set(containerId, 'blackmagic'); jsonResponse(res, 201, { containerId, capturePort }); } catch (err) { if (sourceType === 'deltacast') { _dcSidecarCount--; if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } } + if (sourceType === 'sdi' || sourceType === 'blackmagic') { + _dlSidecarCount--; + if (_dlSidecarCount <= 0) { _dlSidecarCount = 0; stopDecklinkBridge(); } + } throw err; } } catch (err) { @@ -399,16 +520,21 @@ async function handleSidecarStop(containerId, res) { console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`); await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); - // Deltacast bridge lifecycle: decrement sidecar count; stop bridge when last. - if (_containerSourceType.get(containerId) === 'deltacast') { - _containerSourceType.delete(containerId); + // Bridge lifecycle: decrement sidecar count; stop bridge when last sidecar stops. + const _srcType = _containerSourceType.get(containerId); + _containerSourceType.delete(containerId); + if (_srcType === 'deltacast') { _dcSidecarCount--; if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } - } else { - _containerSourceType.delete(containerId); + } else if (_srcType === 'blackmagic') { + _dlSidecarCount--; + if (_dlSidecarCount <= 0) { + _dlSidecarCount = 0; + stopDecklinkBridge(); + } } } catch (err) { console.error(`[sidecar-stop] background cleanup failed for ${containerId}:`, err.message);