/** * decklink-bridge/main.cpp * * Blackmagic DeckLink SDI shared multi-device bridge daemon. * * Opens one or more DeckLink devices and for each device: * - Auto-detects the incoming signal format * - Registers a framecache slot via HTTP API * - Writes raw UYVY422 (bmdFormat8BitYUV) video frames into the shm ring * - Writes PCM s16le audio to a named FIFO (audio-in-shm is roadmap) * * Slot ID format: "decklink--" * node_id comes from NODE_ID env var (set by node-agent), falls back to hostname. * * Usage: * decklink-bridge --devices # device indices, e.g. "0,1" * decklink-bridge --device # single device compat alias * [--fc-url http://framecache:7435] * [--audio-pipe-dir /dev/shm/decklink] * [--signal-timeout ] * * For each device that acquires signal, emits one JSON line to stderr: * {"device":N,"width":W,"height":H,"fps_num":N,"fps_den":D, * "interlaced":false,"pix_fmt":"uyvy422", * "audio_channels":2,"audio_rate":48000, * "slot_id":"decklink--"} * * Compile with -DLEGACY_FIFO=1 to fall back to writing a raw video FIFO * instead of the framecache shm path. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "DeckLinkAPI.h" #include "DeckLinkAPIDispatch.cpp" #ifndef LEGACY_FIFO extern "C" { # include "fc_writer.h" } #endif #ifndef F_SETPIPE_SZ # define F_SETPIPE_SZ 1031 #endif #define FC_URL_DEFAULT "http://localhost:7435" #define AUDIO_PIPE_DIR "/dev/shm/decklink" #define MAX_DEVICES 8 /* ── Global shutdown flag ──────────────────────────────────────────── */ static std::atomic g_stop{0}; static void on_signal(int) { g_stop.store(1); } /* ── Helpers ───────────────────────────────────────────────────────── */ static uint64_t now_us() { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL; } static int write_all(int fd, const void *buf, size_t len) { const uint8_t *p = static_cast(buf); size_t off = 0; int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); while (off < len) { ssize_t n = write(fd, p + off, len - off); if (n > 0) { off += (size_t)n; continue; } if (n < 0 && errno == EINTR) continue; if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { struct timespec ts{0, 1000000L}; nanosleep(&ts, nullptr); continue; } fcntl(fd, F_SETFL, flags); return -1; } fcntl(fd, F_SETFL, flags); return 0; } /* ── Per-device state ──────────────────────────────────────────────── */ struct DeviceState { int device_idx = 0; IDeckLink *decklink = nullptr; IDeckLinkInput *input = nullptr; /* Signal properties (filled on first frame or format-change) */ int width = 0; int height = 0; int fps_num = 0; int fps_den = 1; bool interlaced = false; std::atomic signal_reported{false}; std::string slot_id; std::string fc_url; std::string audio_fifo; #ifndef LEGACY_FIFO fc_writer_t *fc_writer = nullptr; /* Guards fc_writer + format fields (width/height/fps/signal_reported) * against concurrent access from DeckLink SDK callback threads: * VideoInputFormatChanged and VideoInputFrameArrived can fire on * different threads without mutual exclusion, and reopen_slot() does * close-then-open on fc_writer. Without this lock a frame callback could * call fc_writer_write() on a freed writer (use-after-free), or two * reopen_slot() calls could double-free. */ pthread_mutex_t fc_lock = PTHREAD_MUTEX_INITIALIZER; #else int video_fifo_fd = -1; std::string video_fifo; #endif /* Audio FIFO fd — opened once, reopened on EPIPE */ int audio_fd = -1; pthread_t audio_tid{}; std::atomic audio_stop{0}; uint64_t frame_seq = 0; }; /* ── Audio thread ──────────────────────────────────────────────────── */ /* DeckLink audio arrives via VideoInputFrameArrived callback, not a * separate stream. We write it from the callback directly (see below). * This thread exists only to keep the FIFO open and provide silence * when no frames are arriving (e.g. signal lost). */ static void *audio_silence_thread(void *arg) { DeviceState *ds = static_cast(arg); const int RATE = 48000; const int CH = 2; const int FPS = ds->fps_num > 0 ? ds->fps_num : 30; const int FPS_DEN = ds->fps_den > 0 ? ds->fps_den : 1; long samples = ((long)RATE * FPS_DEN + FPS / 2) / FPS; size_t tick = (size_t)samples * (size_t)CH * 2; /* s16le */ std::vector silence(tick, 0); while (!g_stop.load() && !ds->audio_stop.load()) { int fd = open(ds->audio_fifo.c_str(), O_WRONLY); if (fd < 0) { struct timespec ts{0, 200000000L}; nanosleep(&ts, nullptr); continue; } fcntl(fd, F_SETPIPE_SZ, 1024 * 1024); ds->audio_fd = fd; long frame_ns = (long)(1000000000.0 * (double)FPS_DEN / (double)FPS); struct timespec next; clock_gettime(CLOCK_MONOTONIC, &next); while (!g_stop.load() && !ds->audio_stop.load()) { /* Only write silence if no real audio arrived recently. * Real audio is written by VideoInputFrameArrived directly. */ if (write_all(ds->audio_fd, silence.data(), tick) < 0) { fprintf(stderr, "[audio:%d] EPIPE — reopening\n", ds->device_idx); break; } next.tv_nsec += frame_ns; while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec++; } struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); if (next.tv_sec > now.tv_sec || (next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, nullptr); else next = now; } ds->audio_fd = -1; close(fd); } return nullptr; } /* ── IDeckLinkInputCallback implementation ─────────────────────────── */ class CaptureCallback : public IDeckLinkInputCallback { public: explicit CaptureCallback(DeviceState *ds) : m_ds(ds), m_refcount(1) {} /* IUnknown */ HRESULT QueryInterface(REFIID, void **) override { return E_NOINTERFACE; } ULONG AddRef() override { return ++m_refcount; } ULONG Release() override { ULONG r = --m_refcount; if (r == 0) delete this; return r; } /* IDeckLinkInputCallback */ HRESULT VideoInputFormatChanged( BMDVideoInputFormatChangedEvents events, IDeckLinkDisplayMode *newMode, BMDDetectedVideoInputFormatFlags detectedFlags) override { /* Re-enable input with new mode — required for auto-detect to work */ m_ds->input->PauseStreams(); BMDDisplayMode mode = newMode->GetDisplayMode(); /* Detect interlaced */ BMDFieldDominance fd = newMode->GetFieldDominance(); m_ds->interlaced = (fd == bmdUpperFieldFirst || fd == bmdLowerFieldFirst); /* Get width/height */ m_ds->width = (int)newMode->GetWidth(); m_ds->height = (int)newMode->GetHeight(); /* Get frame rate */ BMDTimeValue frameDuration; BMDTimeScale timeScale; newMode->GetFrameRate(&frameDuration, &timeScale); m_ds->fps_num = (int)timeScale; m_ds->fps_den = (int)frameDuration; m_ds->input->EnableVideoInput(mode, bmdFormat8BitYUV, bmdVideoInputEnableFormatDetection); m_ds->input->FlushStreams(); m_ds->input->StartStreams(); fprintf(stderr, "[decklink:%d] format changed: %dx%d %.4ffps %s\n", m_ds->device_idx, m_ds->width, m_ds->height, m_ds->fps_den ? (double)m_ds->fps_num / m_ds->fps_den : 0.0, m_ds->interlaced ? "interlaced" : "progressive"); /* Re-open framecache slot with new format */ this->reopen_slot(); return S_OK; } HRESULT VideoInputFrameArrived( IDeckLinkVideoInputFrame *videoFrame, IDeckLinkAudioInputPacket *audioPacket) override { if (g_stop.load()) return S_OK; if (!videoFrame) return S_OK; /* Detect format on first frame if format-change hasn't fired. * Use atomic exchange so only ONE thread runs the first-frame init * even if two frame callbacks race before signal_reported is set. */ bool exp = false; if (m_ds->signal_reported.compare_exchange_strong(exp, true)) { m_ds->width = (int)videoFrame->GetWidth(); m_ds->height = (int)videoFrame->GetHeight(); if (m_ds->fps_num == 0) { m_ds->fps_num = 30000; m_ds->fps_den = 1001; } this->reopen_slot(); } /* ── Write video frame ──────────────────────────────────────── */ void *bytes = nullptr; videoFrame->GetBytes(&bytes); uint32_t sz = (uint32_t)(videoFrame->GetRowBytes() * videoFrame->GetHeight()); uint32_t expected = (uint32_t)m_ds->width * (uint32_t)m_ds->height * 2; if (sz != expected) { fprintf(stderr, "[decklink:%d] WARN: frame sz=%u != expected %u — skipping\n", m_ds->device_idx, sz, expected); return S_OK; } uint64_t pts_us = 0; if (m_ds->fps_num > 0) { pts_us = m_ds->frame_seq * 1000000ULL * (uint64_t)m_ds->fps_den / (uint64_t)m_ds->fps_num; } #ifndef LEGACY_FIFO /* Lock so a concurrent VideoInputFormatChanged → reopen_slot() cannot * free fc_writer between our null-check and the write (use-after-free). */ pthread_mutex_lock(&m_ds->fc_lock); if (m_ds->fc_writer) { fc_writer_write(m_ds->fc_writer, static_cast(bytes), sz, pts_us); } pthread_mutex_unlock(&m_ds->fc_lock); #else if (m_ds->video_fifo_fd >= 0) { if (write_all(m_ds->video_fifo_fd, static_cast(bytes), sz) < 0) { fprintf(stderr, "[decklink:%d] video FIFO EPIPE\n", m_ds->device_idx); close(m_ds->video_fifo_fd); m_ds->video_fifo_fd = open(m_ds->video_fifo.c_str(), O_WRONLY | O_NONBLOCK); if (m_ds->video_fifo_fd >= 0) fcntl(m_ds->video_fifo_fd, F_SETPIPE_SZ, 64 * 1024 * 1024); } } #endif m_ds->frame_seq++; /* ── Write audio ────────────────────────────────────────────── */ if (audioPacket && m_ds->audio_fd >= 0) { void *abytes = nullptr; audioPacket->GetBytes(&abytes); uint32_t sample_count = (uint32_t)audioPacket->GetSampleFrameCount(); uint32_t audio_sz = sample_count * 2 /* ch */ * 2 /* s16le bytes */; if (abytes && audio_sz > 0) { /* Non-fatal if pipe is full — silence thread provides fallback */ write_all(m_ds->audio_fd, static_cast(abytes), audio_sz); } } /* Emit signal JSON once per device on first frame */ if (m_ds->frame_seq == 1) { fprintf(stderr, "{\"device\":%d,\"width\":%d,\"height\":%d," "\"fps_num\":%d,\"fps_den\":%d," "\"interlaced\":%s," "\"pix_fmt\":\"uyvy422\"," "\"audio_channels\":2,\"audio_rate\":48000," "\"slot_id\":\"%s\"}\n", m_ds->device_idx, m_ds->width, m_ds->height, m_ds->fps_num, m_ds->fps_den, m_ds->interlaced ? "true" : "false", m_ds->slot_id.c_str()); fflush(stderr); } return S_OK; } private: DeviceState *m_ds; std::atomic m_refcount; void reopen_slot() { #ifndef LEGACY_FIFO /* Serialize with frame writes and any concurrent reopen_slot() so we * never double-free fc_writer or write to a half-closed one. */ pthread_mutex_lock(&m_ds->fc_lock); if (m_ds->fc_writer) { fc_writer_close(m_ds->fc_writer); m_ds->fc_writer = nullptr; } if (m_ds->width > 0 && m_ds->height > 0 && m_ds->fps_num > 0) { m_ds->fc_writer = fc_writer_open( m_ds->fc_url.c_str(), m_ds->slot_id.c_str(), (uint32_t)m_ds->width, (uint32_t)m_ds->height, (uint32_t)m_ds->fps_num, (uint32_t)m_ds->fps_den); if (!m_ds->fc_writer) { fprintf(stderr, "[decklink:%d] framecache unavailable\n", m_ds->device_idx); } } pthread_mutex_unlock(&m_ds->fc_lock); #endif } }; /* ── Parse comma-separated device list ────────────────────────────── */ static std::vector parse_devices(const char *csv) { std::vector out; char buf[256]; strncpy(buf, csv, sizeof buf - 1); char *tok = strtok(buf, ","); while (tok) { out.push_back(atoi(tok)); tok = strtok(nullptr, ","); } return out; } /* ── Main ──────────────────────────────────────────────────────────── */ int main(int argc, char *argv[]) { std::vector device_indices; int sig_timeout = 30; const char *fc_url = getenv("FC_URL") ? getenv("FC_URL") : FC_URL_DEFAULT; const char *audio_dir = AUDIO_PIPE_DIR; const char *node_id = getenv("NODE_ID"); char hostname[256] = "local"; if (!node_id) { gethostname(hostname, sizeof hostname); node_id = hostname; } for (int i = 1; i < argc; i++) { if (!strcmp(argv[i], "--devices") && i+1 < argc) device_indices = parse_devices(argv[++i]); else if (!strcmp(argv[i], "--device") && i+1 < argc) device_indices.push_back(atoi(argv[++i])); else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) fc_url = argv[++i]; else if (!strcmp(argv[i], "--audio-pipe-dir") && i+1 < argc) audio_dir = argv[++i]; else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) sig_timeout = atoi(argv[++i]); } if (device_indices.empty()) { fprintf(stderr, "{\"error\":\"no devices specified — use --devices 0,1 or --device 0\"}\n"); return 1; } signal(SIGINT, on_signal); signal(SIGTERM, on_signal); signal(SIGPIPE, SIG_IGN); /* Ensure audio pipe dir exists */ mkdir(audio_dir, 0755); /* ── Enumerate DeckLink devices ─────────────────────────────────── */ IDeckLinkIterator *iterator = CreateDeckLinkIteratorInstance(); if (!iterator) { fprintf(stderr, "{\"error\":\"CreateDeckLinkIteratorInstance failed — DeckLink driver not loaded?\"}\n"); return 1; } std::vector all_devices; IDeckLink *dl = nullptr; while (iterator->Next(&dl) == S_OK) { all_devices.push_back(dl); } iterator->Release(); fprintf(stderr, "[decklink] %zu device(s) detected\n", all_devices.size()); /* ── Set up per-device state ─────────────────────────────────────── */ std::vector states(device_indices.size()); std::vector callbacks(device_indices.size(), nullptr); for (size_t i = 0; i < device_indices.size(); i++) { int idx = device_indices[i]; if (idx < 0 || (size_t)idx >= all_devices.size()) { fprintf(stderr, "{\"error\":\"device index %d out of range (%zu detected)\"}\n", idx, all_devices.size()); continue; } DeviceState &ds = states[i]; ds.device_idx = idx; ds.fc_url = fc_url; /* slot_id: "decklink--" */ char sid[128]; snprintf(sid, sizeof sid, "decklink-%s-%d", node_id, idx); ds.slot_id = sid; /* Audio FIFO path */ char apath[256]; snprintf(apath, sizeof apath, "%s/audio-%d.fifo", audio_dir, idx); ds.audio_fifo = apath; mkfifo(apath, 0666); /* ignore EEXIST */ #ifdef LEGACY_FIFO /* Video FIFO (legacy path only) */ char vpath[256]; snprintf(vpath, sizeof vpath, "%s/video-%d.fifo", audio_dir, idx); ds.video_fifo = vpath; mkfifo(vpath, 0666); int vfd = open(vpath, O_WRONLY | O_NONBLOCK); if (vfd >= 0) fcntl(vfd, F_SETPIPE_SZ, 64 * 1024 * 1024); ds.video_fifo_fd = vfd; #endif IDeckLink *decklink = all_devices[(size_t)idx]; ds.decklink = decklink; /* Get IDeckLinkInput */ IDeckLinkInput *input = nullptr; if (decklink->QueryInterface(IID_IDeckLinkInput, reinterpret_cast(&input)) != S_OK) { fprintf(stderr, "[decklink:%d] QueryInterface IDeckLinkInput failed\n", idx); continue; } ds.input = input; /* Install callback */ CaptureCallback *cb = new CaptureCallback(&ds); callbacks[i] = cb; input->SetCallback(cb); /* Enable video with format detection — actual mode set on first * VideoInputFormatChanged; use 1080i29.97 as a safe starting mode. */ HRESULT hr = input->EnableVideoInput( bmdModeHD1080i5994, bmdFormat8BitYUV, bmdVideoInputEnableFormatDetection); if (hr != S_OK) { fprintf(stderr, "[decklink:%d] EnableVideoInput failed (0x%08x)\n", idx, (unsigned)hr); continue; } /* Enable audio input — 48kHz stereo s16le */ input->EnableAudioInput(bmdAudioSampleRate48kHz, bmdAudioSampleType16bitInteger, 2); /* Start silence thread (keeps audio FIFO open) */ ds.fps_num = 30000; ds.fps_den = 1001; /* default until format detected */ pthread_create(&ds.audio_tid, nullptr, audio_silence_thread, &ds); /* Start capture */ if (input->StartStreams() != S_OK) { fprintf(stderr, "[decklink:%d] StartStreams failed\n", idx); continue; } fprintf(stderr, "[decklink:%d] capture started, waiting for signal...\n", idx); } /* ── Run until shutdown ─────────────────────────────────────────── */ while (!g_stop.load()) { struct timespec ts{0, 100000000L}; /* 100ms */ nanosleep(&ts, nullptr); } fprintf(stderr, "[decklink] shutdown signal received\n"); /* ── Cleanup ─────────────────────────────────────────────────────── */ for (size_t i = 0; i < states.size(); i++) { DeviceState &ds = states[i]; if (ds.input) { ds.input->StopStreams(); ds.input->DisableVideoInput(); ds.input->DisableAudioInput(); ds.input->SetCallback(nullptr); } ds.audio_stop.store(1); if (ds.audio_tid) pthread_join(ds.audio_tid, nullptr); #ifndef LEGACY_FIFO if (ds.fc_writer) { fc_writer_close(ds.fc_writer); ds.fc_writer = nullptr; } #else if (ds.video_fifo_fd >= 0) close(ds.video_fifo_fd); #endif if (ds.input) { ds.input->Release(); ds.input = nullptr; } if (callbacks[i]) { callbacks[i]->Release(); callbacks[i] = nullptr; } } for (auto *d : all_devices) d->Release(); return 0; }