feat(framecache): phase 3 — decklink-bridge writes to shm

- services/capture/decklink-bridge/main.cpp: new C++ DeckLink bridge
  - IDeckLinkInputCallback (VideoInputFrameArrived) writes UYVY422
    frames to framecache slot via fc_writer_write()
  - VideoInputFormatChanged reopens slot with new resolution/fps
  - bmdVideoInputEnableFormatDetection: auto-detects signal format
  - bmdFormat8BitYUV (UYVY422) — same pixel format as deltacast-bridge
  - Audio written from callback to named FIFO (same pattern as deltacast)
  - Silence thread keeps audio FIFO open between sessions
  - slot_id: decklink-<NODE_ID>-<device_idx>
  - Format JSON emitted on first frame (includes slot_id)
  - LEGACY_FIFO compile flag mirrors deltacast-bridge
  - --devices csv, --fc-url, --audio-pipe-dir, --signal-timeout args

- services/capture/decklink-bridge/CMakeLists.txt:
  - Reuses fc_writer.c from deltacast-bridge (shared writer module)
  - Links rt + dl + pthread; DeckLink SDK via dlopen at runtime
  - LEGACY_FIFO option

- services/capture/Dockerfile:
  - New decklink-bridge-builder stage (g++ + DeckLink SDK headers)
  - Copies decklink-bridge binary to /usr/local/bin/decklink-bridge

- services/node-agent/index.js:
  - FC_URL + FC_NODE_ID constants (from env vars, passed to all bridges)
  - startDecklinkBridge(deviceIndices) / stopDecklinkBridge() functions
    mirror deltacast bridge lifecycle management
  - deltacast startDeltacastBridge: adds --fc-url arg + NODE_ID env
  - sidecar start: injects FC_URL into all sidecar envs; sets IpcMode=host
    for deltacast + blackmagic sidecars; starts decklink-bridge for sdi/
    blackmagic source types; injects FC_SLOT_ID from fmt JSON
  - sidecar stop: stopDecklinkBridge() when last blackmagic sidecar stops
This commit is contained in:
Wild Dragon Dev 2026-06-03 15:25:25 +00:00
parent 0d479d043d
commit b2c63de2fa
4 changed files with 749 additions and 9 deletions

View file

@ -15,6 +15,24 @@ RUN cmake -S /bridge -B /bridge/build \
-DSDK_ROOT=/sdk \ -DSDK_ROOT=/sdk \
&& cmake --build /bridge/build -j$(nproc) && cmake --build /bridge/build -j$(nproc)
# ── Stage 1c: Build decklink-bridge binary ───────────────────────────────
FROM debian:bookworm AS decklink-bridge-builder
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake ca-certificates g++ \
&& rm -rf /var/lib/apt/lists/*
# DeckLink SDK headers (for IDeckLinkInput etc.)
COPY sdk/ /decklink-sdk/
# Shared fc_writer module from deltacast-bridge
COPY deltacast-bridge/fc_writer.h /fc_writer/fc_writer.h
COPY deltacast-bridge/fc_writer.c /fc_writer/fc_writer.c
# decklink-bridge source
COPY decklink-bridge/ /decklink-bridge/
RUN cmake -S /decklink-bridge -B /decklink-bridge/build \
-DCMAKE_BUILD_TYPE=Release \
-DDECKLINK_SDK_DIR=/decklink-sdk \
-DDELTACAST_BRIDGE_DIR=/fc_writer \
&& cmake --build /decklink-bridge/build -j$(nproc)
# ── Stage 2: Build FFmpeg with DeckLink + NVENC (HEVC/H264) support ───────── # ── Stage 2: Build FFmpeg with DeckLink + NVENC (HEVC/H264) support ─────────
# All-Intra HEVC NVENC is the master codec for growing-file ingest (see # All-Intra HEVC NVENC is the master codec for growing-file ingest (see
# docs/design/2026-05-29-all-intra-hevc-ingest.md). This stage gets the # docs/design/2026-05-29-all-intra-hevc-ingest.md). This stage gets the
@ -151,6 +169,9 @@ RUN raw2bmx -h >/dev/null 2>&1 && echo 'raw2bmx runtime OK'
# Deltacast bridge binary + SDK runtime libs # Deltacast bridge binary + SDK runtime libs
COPY --from=bridge-builder /bridge/build/deltacast-capture /usr/local/bin/deltacast-capture COPY --from=bridge-builder /bridge/build/deltacast-capture /usr/local/bin/deltacast-capture
# DeckLink bridge binary (no SDK runtime .so — uses dlopen at runtime)
COPY --from=decklink-bridge-builder /decklink-bridge/build/decklink-bridge /usr/local/bin/decklink-bridge
COPY --from=sdk-extractor /sdk/lib/libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/ COPY --from=sdk-extractor /sdk/lib/libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/
COPY --from=sdk-extractor /sdk/lib/libvideomasterhd_audio.so.6.34.1 /usr/local/lib/deltacast/ COPY --from=sdk-extractor /sdk/lib/libvideomasterhd_audio.so.6.34.1 /usr/local/lib/deltacast/
RUN ln -sf libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/libvideomasterhd.so.6 \ RUN ln -sf libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/libvideomasterhd.so.6 \

View file

@ -0,0 +1,51 @@
cmake_minimum_required(VERSION 3.16)
project(decklink-bridge CXX C)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_C_STANDARD 11)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -O2")
# Path to DeckLink SDK headers (services/capture/sdk/)
set(DECKLINK_SDK_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../sdk"
CACHE PATH "Path to Blackmagic DeckLink SDK headers")
# Path to Deltacast bridge (for fc_writer.h/c — shared writer module)
set(DELTACAST_BRIDGE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../deltacast-bridge"
CACHE PATH "Path to deltacast-bridge (contains fc_writer.h/c)")
# Legacy FIFO fallback option (mirrors deltacast-bridge option)
option(LEGACY_FIFO "Use named FIFOs instead of framecache shm" OFF)
# ── decklink-bridge executable ────────────────────────────────────────
add_executable(decklink-bridge
main.cpp
${DELTACAST_BRIDGE_DIR}/fc_writer.c # shared framecache writer
)
if(LEGACY_FIFO)
target_compile_definitions(decklink-bridge PRIVATE LEGACY_FIFO=1)
message(STATUS "decklink-bridge: LEGACY_FIFO mode enabled")
else()
message(STATUS "decklink-bridge: framecache shm mode enabled")
endif()
target_include_directories(decklink-bridge PRIVATE
${DECKLINK_SDK_DIR}
${DELTACAST_BRIDGE_DIR} # fc_writer.h
)
target_link_libraries(decklink-bridge PRIVATE
pthread
rt # shm_open, sem_open
dl # dlopen (used by DeckLinkAPIDispatch.cpp on Linux)
)
# DeckLink driver is linked at runtime via dlopen (no link-time .so needed).
# The SDK's DeckLinkAPIDispatch.cpp handles the dynamic loading.
set_target_properties(decklink-bridge PROPERTIES
INSTALL_RPATH "/usr/local/lib"
BUILD_WITH_INSTALL_RPATH TRUE
)
install(TARGETS decklink-bridge DESTINATION bin)

View file

@ -0,0 +1,542 @@
/**
* decklink-bridge/main.cpp
*
* Blackmagic DeckLink SDI shared multi-device bridge daemon.
*
* Opens one or more DeckLink devices and for each device:
* - Auto-detects the incoming signal format
* - Registers a framecache slot via HTTP API
* - Writes raw UYVY422 (bmdFormat8BitYUV) video frames into the shm ring
* - Writes PCM s16le audio to a named FIFO (audio-in-shm is roadmap)
*
* Slot ID format: "decklink-<node_id>-<device_index>"
* node_id comes from NODE_ID env var (set by node-agent), falls back to hostname.
*
* Usage:
* decklink-bridge --devices <csv> # device indices, e.g. "0,1"
* decklink-bridge --device <N> # single device compat alias
* [--fc-url http://framecache:7435]
* [--audio-pipe-dir /dev/shm/decklink]
* [--signal-timeout <sec>]
*
* For each device that acquires signal, emits one JSON line to stderr:
* {"device":N,"width":W,"height":H,"fps_num":N,"fps_den":D,
* "interlaced":false,"pix_fmt":"uyvy422",
* "audio_channels":2,"audio_rate":48000,
* "slot_id":"decklink-<node>-<N>"}
*
* Compile with -DLEGACY_FIFO=1 to fall back to writing a raw video FIFO
* instead of the framecache shm path.
*/
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#include "DeckLinkAPI.h"
#include "DeckLinkAPIDispatch.cpp"
#ifndef LEGACY_FIFO
extern "C" {
# include "../deltacast-bridge/fc_writer.h"
}
#endif
#ifndef F_SETPIPE_SZ
# define F_SETPIPE_SZ 1031
#endif
#define FC_URL_DEFAULT "http://localhost:7435"
#define AUDIO_PIPE_DIR "/dev/shm/decklink"
#define MAX_DEVICES 8
/* ── Global shutdown flag ──────────────────────────────────────────── */
static std::atomic<int> g_stop{0};
static void on_signal(int) { g_stop.store(1); }
/* ── Helpers ───────────────────────────────────────────────────────── */
static uint64_t now_us() {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
}
static int write_all(int fd, const void *buf, size_t len) {
const uint8_t *p = static_cast<const uint8_t *>(buf);
size_t off = 0;
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
while (off < len) {
ssize_t n = write(fd, p + off, len - off);
if (n > 0) { off += (size_t)n; continue; }
if (n < 0 && errno == EINTR) continue;
if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
struct timespec ts{0, 1000000L};
nanosleep(&ts, nullptr);
continue;
}
fcntl(fd, F_SETFL, flags);
return -1;
}
fcntl(fd, F_SETFL, flags);
return 0;
}
/* ── Per-device state ──────────────────────────────────────────────── */
struct DeviceState {
int device_idx = 0;
IDeckLink *decklink = nullptr;
IDeckLinkInput *input = nullptr;
/* Signal properties (filled on first frame or format-change) */
int width = 0;
int height = 0;
int fps_num = 0;
int fps_den = 1;
bool interlaced = false;
bool signal_reported = false;
std::string slot_id;
std::string fc_url;
std::string audio_fifo;
#ifndef LEGACY_FIFO
fc_writer_t *fc_writer = nullptr;
#else
int video_fifo_fd = -1;
std::string video_fifo;
#endif
/* Audio FIFO fd — opened once, reopened on EPIPE */
int audio_fd = -1;
pthread_t audio_tid{};
std::atomic<int> audio_stop{0};
uint64_t frame_seq = 0;
};
/* ── Audio thread ──────────────────────────────────────────────────── */
/* DeckLink audio arrives via VideoInputFrameArrived callback, not a
* separate stream. We write it from the callback directly (see below).
* This thread exists only to keep the FIFO open and provide silence
* when no frames are arriving (e.g. signal lost). */
static void *audio_silence_thread(void *arg) {
DeviceState *ds = static_cast<DeviceState *>(arg);
const int RATE = 48000;
const int CH = 2;
const int FPS = ds->fps_num > 0 ? ds->fps_num : 30;
const int FPS_DEN = ds->fps_den > 0 ? ds->fps_den : 1;
long samples = ((long)RATE * FPS_DEN + FPS / 2) / FPS;
size_t tick = (size_t)samples * (size_t)CH * 2; /* s16le */
std::vector<uint8_t> silence(tick, 0);
while (!g_stop.load() && !ds->audio_stop.load()) {
int fd = open(ds->audio_fifo.c_str(), O_WRONLY);
if (fd < 0) {
struct timespec ts{0, 200000000L};
nanosleep(&ts, nullptr);
continue;
}
fcntl(fd, F_SETPIPE_SZ, 1024 * 1024);
ds->audio_fd = fd;
long frame_ns = (long)(1000000000.0 * (double)FPS_DEN / (double)FPS);
struct timespec next;
clock_gettime(CLOCK_MONOTONIC, &next);
while (!g_stop.load() && !ds->audio_stop.load()) {
/* Only write silence if no real audio arrived recently.
* Real audio is written by VideoInputFrameArrived directly. */
if (write_all(ds->audio_fd, silence.data(), tick) < 0) {
fprintf(stderr, "[audio:%d] EPIPE — reopening\n", ds->device_idx);
break;
}
next.tv_nsec += frame_ns;
while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec++; }
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if (next.tv_sec > now.tv_sec ||
(next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec))
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, nullptr);
else
next = now;
}
ds->audio_fd = -1;
close(fd);
}
return nullptr;
}
/* ── IDeckLinkInputCallback implementation ─────────────────────────── */
class CaptureCallback : public IDeckLinkInputCallback {
public:
explicit CaptureCallback(DeviceState *ds) : m_ds(ds), m_refcount(1) {}
/* IUnknown */
HRESULT QueryInterface(REFIID, void **) override { return E_NOINTERFACE; }
ULONG AddRef() override { return ++m_refcount; }
ULONG Release() override {
ULONG r = --m_refcount;
if (r == 0) delete this;
return r;
}
/* IDeckLinkInputCallback */
HRESULT VideoInputFormatChanged(
BMDVideoInputFormatChangedEvents events,
IDeckLinkDisplayMode *newMode,
BMDDetectedVideoInputFormatFlags detectedFlags) override
{
/* Re-enable input with new mode — required for auto-detect to work */
m_ds->input->PauseStreams();
BMDDisplayMode mode = newMode->GetDisplayMode();
/* Detect interlaced */
BMDFieldDominance fd = newMode->GetFieldDominance();
m_ds->interlaced = (fd == bmdUpperFieldFirst || fd == bmdLowerFieldFirst);
/* Get width/height */
m_ds->width = (int)newMode->GetWidth();
m_ds->height = (int)newMode->GetHeight();
/* Get frame rate */
BMDTimeValue frameDuration; BMDTimeScale timeScale;
newMode->GetFrameRate(&frameDuration, &timeScale);
m_ds->fps_num = (int)timeScale;
m_ds->fps_den = (int)frameDuration;
m_ds->input->EnableVideoInput(mode, bmdFormat8BitYUV,
bmdVideoInputEnableFormatDetection);
m_ds->input->FlushStreams();
m_ds->input->StartStreams();
fprintf(stderr, "[decklink:%d] format changed: %dx%d %.4ffps %s\n",
m_ds->device_idx,
m_ds->width, m_ds->height,
m_ds->fps_den ? (double)m_ds->fps_num / m_ds->fps_den : 0.0,
m_ds->interlaced ? "interlaced" : "progressive");
/* Re-open framecache slot with new format */
this->reopen_slot();
return S_OK;
}
HRESULT VideoInputFrameArrived(
IDeckLinkVideoInputFrame *videoFrame,
IDeckLinkAudioInputPacket *audioPacket) override
{
if (g_stop.load()) return S_OK;
if (!videoFrame) return S_OK;
/* Detect format on first frame if format-change hasn't fired */
if (!m_ds->signal_reported) {
m_ds->width = (int)videoFrame->GetWidth();
m_ds->height = (int)videoFrame->GetHeight();
BMDTimeValue fd; BMDTimeScale ts;
videoFrame->GetStreamTime(&fd, nullptr, 1000000); /* unused — just open slot */
/* Use stored fps from format change, or detect from row bytes */
if (m_ds->fps_num == 0) {
m_ds->fps_num = 30000;
m_ds->fps_den = 1001;
}
this->reopen_slot();
m_ds->signal_reported = true;
}
/* ── Write video frame ──────────────────────────────────────── */
void *bytes = nullptr;
videoFrame->GetBytes(&bytes);
uint32_t sz = (uint32_t)(videoFrame->GetRowBytes() * videoFrame->GetHeight());
uint32_t expected = (uint32_t)m_ds->width * (uint32_t)m_ds->height * 2;
if (sz != expected) {
fprintf(stderr, "[decklink:%d] WARN: frame sz=%u != expected %u — skipping\n",
m_ds->device_idx, sz, expected);
return S_OK;
}
uint64_t pts_us = 0;
if (m_ds->fps_num > 0) {
pts_us = m_ds->frame_seq * 1000000ULL
* (uint64_t)m_ds->fps_den
/ (uint64_t)m_ds->fps_num;
}
#ifndef LEGACY_FIFO
if (m_ds->fc_writer) {
fc_writer_write(m_ds->fc_writer,
static_cast<const uint8_t *>(bytes), sz, pts_us);
}
#else
if (m_ds->video_fifo_fd >= 0) {
if (write_all(m_ds->video_fifo_fd,
static_cast<const uint8_t *>(bytes), sz) < 0) {
fprintf(stderr, "[decklink:%d] video FIFO EPIPE\n", m_ds->device_idx);
close(m_ds->video_fifo_fd);
m_ds->video_fifo_fd = open(m_ds->video_fifo.c_str(), O_WRONLY | O_NONBLOCK);
if (m_ds->video_fifo_fd >= 0)
fcntl(m_ds->video_fifo_fd, F_SETPIPE_SZ, 64 * 1024 * 1024);
}
}
#endif
m_ds->frame_seq++;
/* ── Write audio ────────────────────────────────────────────── */
if (audioPacket && m_ds->audio_fd >= 0) {
void *abytes = nullptr;
audioPacket->GetBytes(&abytes);
uint32_t sample_count = (uint32_t)audioPacket->GetSampleFrameCount();
uint32_t audio_sz = sample_count * 2 /* ch */ * 2 /* s16le bytes */;
if (abytes && audio_sz > 0) {
/* Non-fatal if pipe is full — silence thread provides fallback */
write_all(m_ds->audio_fd,
static_cast<const uint8_t *>(abytes), audio_sz);
}
}
/* Emit signal JSON once per device on first frame */
if (m_ds->frame_seq == 1) {
fprintf(stderr,
"{\"device\":%d,\"width\":%d,\"height\":%d,"
"\"fps_num\":%d,\"fps_den\":%d,"
"\"interlaced\":%s,"
"\"pix_fmt\":\"uyvy422\","
"\"audio_channels\":2,\"audio_rate\":48000,"
"\"slot_id\":\"%s\"}\n",
m_ds->device_idx,
m_ds->width, m_ds->height,
m_ds->fps_num, m_ds->fps_den,
m_ds->interlaced ? "true" : "false",
m_ds->slot_id.c_str());
fflush(stderr);
}
return S_OK;
}
private:
DeviceState *m_ds;
std::atomic<ULONG> m_refcount;
void reopen_slot() {
#ifndef LEGACY_FIFO
if (m_ds->fc_writer) {
fc_writer_close(m_ds->fc_writer);
m_ds->fc_writer = nullptr;
}
if (m_ds->width > 0 && m_ds->height > 0 && m_ds->fps_num > 0) {
m_ds->fc_writer = fc_writer_open(
m_ds->fc_url.c_str(),
m_ds->slot_id.c_str(),
(uint32_t)m_ds->width, (uint32_t)m_ds->height,
(uint32_t)m_ds->fps_num, (uint32_t)m_ds->fps_den);
if (!m_ds->fc_writer) {
fprintf(stderr, "[decklink:%d] framecache unavailable\n",
m_ds->device_idx);
}
}
#endif
}
};
/* ── Parse comma-separated device list ────────────────────────────── */
static std::vector<int> parse_devices(const char *csv) {
std::vector<int> out;
char buf[256];
strncpy(buf, csv, sizeof buf - 1);
char *tok = strtok(buf, ",");
while (tok) { out.push_back(atoi(tok)); tok = strtok(nullptr, ","); }
return out;
}
/* ── Main ──────────────────────────────────────────────────────────── */
int main(int argc, char *argv[]) {
std::vector<int> device_indices;
int sig_timeout = 30;
const char *fc_url = getenv("FC_URL") ? getenv("FC_URL") : FC_URL_DEFAULT;
const char *audio_dir = AUDIO_PIPE_DIR;
const char *node_id = getenv("NODE_ID");
char hostname[256] = "local";
if (!node_id) { gethostname(hostname, sizeof hostname); node_id = hostname; }
for (int i = 1; i < argc; i++) {
if (!strcmp(argv[i], "--devices") && i+1 < argc)
device_indices = parse_devices(argv[++i]);
else if (!strcmp(argv[i], "--device") && i+1 < argc)
device_indices.push_back(atoi(argv[++i]));
else if (!strcmp(argv[i], "--fc-url") && i+1 < argc)
fc_url = argv[++i];
else if (!strcmp(argv[i], "--audio-pipe-dir") && i+1 < argc)
audio_dir = argv[++i];
else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc)
sig_timeout = atoi(argv[++i]);
}
if (device_indices.empty()) {
fprintf(stderr, "{\"error\":\"no devices specified — use --devices 0,1 or --device 0\"}\n");
return 1;
}
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
signal(SIGPIPE, SIG_IGN);
/* Ensure audio pipe dir exists */
mkdir(audio_dir, 0755);
/* ── Enumerate DeckLink devices ─────────────────────────────────── */
IDeckLinkIterator *iterator = CreateDeckLinkIteratorInstance();
if (!iterator) {
fprintf(stderr, "{\"error\":\"CreateDeckLinkIteratorInstance failed — DeckLink driver not loaded?\"}\n");
return 1;
}
std::vector<IDeckLink *> all_devices;
IDeckLink *dl = nullptr;
while (iterator->Next(&dl) == S_OK) {
all_devices.push_back(dl);
}
iterator->Release();
fprintf(stderr, "[decklink] %zu device(s) detected\n", all_devices.size());
/* ── Set up per-device state ─────────────────────────────────────── */
std::vector<DeviceState> states(device_indices.size());
std::vector<CaptureCallback *> callbacks(device_indices.size(), nullptr);
for (size_t i = 0; i < device_indices.size(); i++) {
int idx = device_indices[i];
if (idx < 0 || (size_t)idx >= all_devices.size()) {
fprintf(stderr, "{\"error\":\"device index %d out of range (%zu detected)\"}\n",
idx, all_devices.size());
continue;
}
DeviceState &ds = states[i];
ds.device_idx = idx;
ds.fc_url = fc_url;
/* slot_id: "decklink-<node_id>-<device_idx>" */
char sid[128];
snprintf(sid, sizeof sid, "decklink-%s-%d", node_id, idx);
ds.slot_id = sid;
/* Audio FIFO path */
char apath[256];
snprintf(apath, sizeof apath, "%s/audio-%d.fifo", audio_dir, idx);
ds.audio_fifo = apath;
mkfifo(apath, 0666); /* ignore EEXIST */
#ifdef LEGACY_FIFO
/* Video FIFO (legacy path only) */
char vpath[256];
snprintf(vpath, sizeof vpath, "%s/video-%d.fifo", audio_dir, idx);
ds.video_fifo = vpath;
mkfifo(vpath, 0666);
int vfd = open(vpath, O_WRONLY | O_NONBLOCK);
if (vfd >= 0) fcntl(vfd, F_SETPIPE_SZ, 64 * 1024 * 1024);
ds.video_fifo_fd = vfd;
#endif
IDeckLink *decklink = all_devices[(size_t)idx];
ds.decklink = decklink;
/* Get IDeckLinkInput */
IDeckLinkInput *input = nullptr;
if (decklink->QueryInterface(IID_IDeckLinkInput,
reinterpret_cast<void **>(&input)) != S_OK) {
fprintf(stderr, "[decklink:%d] QueryInterface IDeckLinkInput failed\n", idx);
continue;
}
ds.input = input;
/* Install callback */
CaptureCallback *cb = new CaptureCallback(&ds);
callbacks[i] = cb;
input->SetCallback(cb);
/* Enable video with format detection — actual mode set on first
* VideoInputFormatChanged; use 1080i29.97 as a safe starting mode. */
HRESULT hr = input->EnableVideoInput(
bmdModeHD1080i5994,
bmdFormat8BitYUV,
bmdVideoInputEnableFormatDetection);
if (hr != S_OK) {
fprintf(stderr, "[decklink:%d] EnableVideoInput failed (0x%08x)\n", idx, (unsigned)hr);
continue;
}
/* Enable audio input — 48kHz stereo s16le */
input->EnableAudioInput(bmdAudioSampleRate48kHz,
bmdAudioSampleType16bitInteger, 2);
/* Start silence thread (keeps audio FIFO open) */
ds.fps_num = 30000; ds.fps_den = 1001; /* default until format detected */
pthread_create(&ds.audio_tid, nullptr, audio_silence_thread, &ds);
/* Start capture */
if (input->StartStreams() != S_OK) {
fprintf(stderr, "[decklink:%d] StartStreams failed\n", idx);
continue;
}
fprintf(stderr, "[decklink:%d] capture started, waiting for signal...\n", idx);
}
/* ── Run until shutdown ─────────────────────────────────────────── */
while (!g_stop.load()) {
struct timespec ts{0, 100000000L}; /* 100ms */
nanosleep(&ts, nullptr);
}
fprintf(stderr, "[decklink] shutdown signal received\n");
/* ── Cleanup ─────────────────────────────────────────────────────── */
for (size_t i = 0; i < states.size(); i++) {
DeviceState &ds = states[i];
if (ds.input) {
ds.input->StopStreams();
ds.input->DisableVideoInput();
ds.input->DisableAudioInput();
ds.input->SetCallback(nullptr);
}
ds.audio_stop.store(1);
if (ds.audio_tid) pthread_join(ds.audio_tid, nullptr);
#ifndef LEGACY_FIFO
if (ds.fc_writer) {
fc_writer_close(ds.fc_writer);
ds.fc_writer = nullptr;
}
#else
if (ds.video_fifo_fd >= 0) close(ds.video_fifo_fd);
#endif
if (ds.input) { ds.input->Release(); ds.input = nullptr; }
if (callbacks[i]) { callbacks[i]->Release(); callbacks[i] = nullptr; }
}
for (auto *d : all_devices) d->Release();
return 0;
}

View file

@ -71,13 +71,96 @@ const DC_BRIDGE_BIN = process.env.DELTACAST_BRIDGE_BIN || 'deltacast-bridge';
const DC_PORTS_CSV = process.env.DELTACAST_PORTS || '0,1,2,3,4,5,6,7'; const DC_PORTS_CSV = process.env.DELTACAST_PORTS || '0,1,2,3,4,5,6,7';
const DC_BOARD = process.env.DELTACAST_BOARD || '0'; const DC_BOARD = process.env.DELTACAST_BOARD || '0';
// Framecache URL — passed to all bridge processes so they can register slots.
// Set FC_URL in .env.worker (default: http://framecache:7435 within the
// wild-dragon-worker Docker network).
const FC_URL = process.env.FC_URL || 'http://framecache:7435';
// Node identity for framecache slot IDs (e.g. "decklink-zampp3-0").
// Set NODE_NAME in .env.worker so slot IDs are stable across restarts.
const FC_NODE_ID = process.env.NODE_NAME || process.env.HOSTNAME || 'local';
let _dcBridge = null; // ChildProcess | null let _dcBridge = null; // ChildProcess | null
let _dcSidecarCount = 0; // active deltacast sidecars on this node let _dcSidecarCount = 0; // active deltacast sidecars on this node
// Map containerId -> sourceType so stop() can decrement the deltacast counter. // Map containerId -> sourceType so stop() can decrement the deltacast counter.
const _containerSourceType = new Map(); const _containerSourceType = new Map();
// port -> fmt JSON from bridge stderr (inject into sidecar env) // port -> fmt JSON from bridge stderr (inject into sidecar env + slot_id)
const _dcPortFmt = new Map(); const _dcPortFmt = new Map();
// ── DeckLink bridge ───────────────────────────────────────────────────────
// One decklink-bridge process per node, managing all DeckLink devices.
// Mirrors the deltacast-bridge singleton pattern.
const DL_BRIDGE_BIN = process.env.DECKLINK_BRIDGE_BIN || 'decklink-bridge';
const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
let _dlBridge = null; // ChildProcess | null
let _dlSidecarCount = 0;
// device_idx -> fmt JSON from bridge stderr
const _dlDevFmt = new Map();
function _dlBridgeRunning() {
return _dlBridge !== null && _dlBridge.exitCode === null && _dlBridge.signalCode === null;
}
function startDecklinkBridge(deviceIndices) {
if (_dlBridgeRunning()) return;
try { require('fs').mkdirSync(DL_AUDIO_DIR, { recursive: true }); } catch (_) {}
const devCsv = Array.isArray(deviceIndices) ? deviceIndices.join(',') : String(deviceIndices || '0');
const args = [
'--devices', devCsv,
'--fc-url', FC_URL,
'--audio-pipe-dir', DL_AUDIO_DIR,
];
console.log(`[dl-bridge] launching: ${DL_BRIDGE_BIN} ${args.join(' ')}`);
const proc = spawn(DL_BRIDGE_BIN, args, {
stdio: ['ignore', 'ignore', 'pipe'],
detached: false,
env: { ...process.env, NODE_ID: FC_NODE_ID, FC_URL },
});
proc.stderr.setEncoding('utf8');
proc.stderr.on('data', (chunk) => {
for (const line of chunk.split('\n')) {
const t = line.trim();
if (!t) continue;
if (t.startsWith('{')) {
console.log('[dl-bridge] ' + t);
try {
const f = JSON.parse(t);
if (typeof f.device === 'number') _dlDevFmt.set(f.device, f);
} catch (_) {}
} else {
console.error('[dl-bridge] ' + t);
}
}
});
proc.on('error', (err) => {
console.error(`[dl-bridge] spawn error: ${err.message}`);
_dlBridge = null;
});
proc.on('exit', (code, sig) => {
console.error(`[dl-bridge] exited code=${code} signal=${sig}`);
_dlBridge = null;
});
_dlBridge = proc;
console.log(`[dl-bridge] pid=${proc.pid} devices=${devCsv}`);
}
function stopDecklinkBridge() {
if (!_dlBridgeRunning()) return;
console.log('[dl-bridge] stopping');
try { _dlBridge.kill('SIGTERM'); } catch (_) {}
const proc = _dlBridge;
setTimeout(() => {
try { if (proc.exitCode === null) proc.kill('SIGKILL'); } catch (_) {}
}, 5000);
_dlBridge = null;
}
function _dcBridgeRunning() { function _dcBridgeRunning() {
return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null; return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null;
} }
@ -122,12 +205,14 @@ function startDeltacastBridge() {
'--ports', DC_PORTS_CSV, '--ports', DC_PORTS_CSV,
'--video-pipe-dir', DC_PIPE_DIR, '--video-pipe-dir', DC_PIPE_DIR,
'--audio-pipe-dir', DC_PIPE_DIR, '--audio-pipe-dir', DC_PIPE_DIR,
'--fc-url', FC_URL,
]; ];
console.log(`[dc-bridge] launching: ${DC_BRIDGE_BIN} ${args.join(' ')}`); console.log(`[dc-bridge] launching: ${DC_BRIDGE_BIN} ${args.join(' ')}`);
const proc = spawn(DC_BRIDGE_BIN, args, { const proc = spawn(DC_BRIDGE_BIN, args, {
stdio: ['ignore', 'ignore', 'pipe'], stdio: ['ignore', 'ignore', 'pipe'],
detached: false, detached: false,
env: { ...process.env, FC_URL, NODE_ID: FC_NODE_ID },
}); });
proc.stderr.setEncoding('utf8'); proc.stderr.setEncoding('utf8');
@ -308,9 +393,12 @@ async function handleSidecarStart(body, res) {
HostConfig: hostConfig, HostConfig: hostConfig,
}; };
// Always inject FC_URL so capture-manager can find the framecache service.
sidecarEnv.push(`FC_URL=${FC_URL}`);
// Deltacast: ensure the shared bridge daemon is running on the HOST before // Deltacast: ensure the shared bridge daemon is running on the HOST before
// starting the sidecar. The sidecar reads FIFOs produced by the bridge; // starting the sidecar. The bridge writes frames to the framecache shm ring;
// it does NOT open the board handle itself (no BufMngr.c:781 race). // the sidecar reads via the consumer library (fc_client).
if (sourceType === 'deltacast') { if (sourceType === 'deltacast') {
_dcSidecarCount++; _dcSidecarCount++;
startDeltacastBridge(); startDeltacastBridge();
@ -324,8 +412,36 @@ async function handleSidecarStart(body, res) {
sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`); sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`);
sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`); sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`);
sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`); sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`);
console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced}`); // Pass slot_id so capture-manager knows which framecache slot to read
if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`);
console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced} slot=${_fmt.slot_id}`);
} }
// IPC host — sidecar must share /dev/shm with framecache container
hostConfig.IpcMode = 'host';
}
// DeckLink: ensure decklink-bridge is running on the HOST.
// Bridge writes to framecache; sidecar reads via fc_client.
if (sourceType === 'sdi' || sourceType === 'blackmagic') {
_dlSidecarCount++;
// Determine which device indices are active on this node
const _bmdDevices = [];
try {
const _bmdDir = '/dev/blackmagic';
const _bmdEntries = fs.readdirSync(_bmdDir).filter(n => /^(dv|io)\d+$/.test(n));
_bmdEntries.forEach((_, i) => _bmdDevices.push(i));
} catch (_) { _bmdDevices.push(0); }
startDecklinkBridge(_bmdDevices);
// Inject fmt if available
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
let _devIdx = NaN;
try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {}
if (!Number.isFinite(_devIdx)) _devIdx = parseInt((env.find(e => e.startsWith('BMD_DEVICE_INDEX=')) || '=0').split('=')[1]) || 0;
if (Number.isFinite(_devIdx) && _dlDevFmt.has(_devIdx)) {
const _fmt = _dlDevFmt.get(_devIdx);
if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`);
}
hostConfig.IpcMode = 'host';
} }
let containerId; let containerId;
@ -354,12 +470,17 @@ async function handleSidecarStart(body, res) {
} }
if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast'); if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast');
if (sourceType === 'sdi' || sourceType === 'blackmagic') _containerSourceType.set(containerId, 'blackmagic');
jsonResponse(res, 201, { containerId, capturePort }); jsonResponse(res, 201, { containerId, capturePort });
} catch (err) { } catch (err) {
if (sourceType === 'deltacast') { if (sourceType === 'deltacast') {
_dcSidecarCount--; _dcSidecarCount--;
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
} }
if (sourceType === 'sdi' || sourceType === 'blackmagic') {
_dlSidecarCount--;
if (_dlSidecarCount <= 0) { _dlSidecarCount = 0; stopDecklinkBridge(); }
}
throw err; throw err;
} }
} catch (err) { } catch (err) {
@ -399,16 +520,21 @@ async function handleSidecarStop(containerId, res) {
console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`); console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`);
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
// Deltacast bridge lifecycle: decrement sidecar count; stop bridge when last. // Bridge lifecycle: decrement sidecar count; stop bridge when last sidecar stops.
if (_containerSourceType.get(containerId) === 'deltacast') { const _srcType = _containerSourceType.get(containerId);
_containerSourceType.delete(containerId); _containerSourceType.delete(containerId);
if (_srcType === 'deltacast') {
_dcSidecarCount--; _dcSidecarCount--;
if (_dcSidecarCount <= 0) { if (_dcSidecarCount <= 0) {
_dcSidecarCount = 0; _dcSidecarCount = 0;
stopDeltacastBridge(); stopDeltacastBridge();
} }
} else { } else if (_srcType === 'blackmagic') {
_containerSourceType.delete(containerId); _dlSidecarCount--;
if (_dlSidecarCount <= 0) {
_dlSidecarCount = 0;
stopDecklinkBridge();
}
} }
} catch (err) { } catch (err) {
console.error(`[sidecar-stop] background cleanup failed for ${containerId}:`, err.message); console.error(`[sidecar-stop] background cleanup failed for ${containerId}:`, err.message);