/* services/capture/deltacast-bridge/main.c * * Deltacast VideoMaster SDI shared multi-port bridge daemon. * * Opens the board ONCE, opens RX streams for all requested ports, and * writes each port's video frames into a shared-memory framecache slot * (and audio to a named FIFO — audio-in-shm is a future roadmap item). * * Signal fan-out architecture: * Board → video_thread → fc_writer → /dev/shm/framecache/ * └→ N consumers (recording, proxy, * HLS preview) each read with * their own cursor — zero-copy, * no bandwidth splitting. * * Usage: * deltacast-bridge --device --ports * [--video-pipe-dir /dev/shm/deltacast] * [--audio-pipe-dir /dev/shm/deltacast] * [--fc-url http://framecache:7435] * [--signal-timeout ] * * Compat alias: --port treated as --ports (single port). * * For each port that acquires signal, emits one JSON line to stderr: * {"port":N,"width":W,"height":H,"fps_num":N,"fps_den":D, * "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2, * "slot_id":"deltacast--"} * * Compile with -DLEGACY_FIFO=1 to disable shm writes and fall back to * the original named-FIFO path (for nodes without framecache running). * * Runs until SIGTERM/SIGINT, then closes all streams and the board. */ #include #include #include #include #include #include #include #include #include #include #include #include "VideoMasterHD_Core.h" #include "VideoMasterHD_Sdi.h" #include "VideoMasterHD_Sdi_Audio.h" #ifndef LEGACY_FIFO # include "fc_writer.h" #endif #ifndef F_SETPIPE_SZ #define F_SETPIPE_SZ 1031 #endif /* Default framecache URL — overridden by FC_URL env var or --fc-url arg */ #define FC_URL_DEFAULT "http://localhost:7435" /* ── Constants ────────────────────────────────────────────────────────── */ #define MAX_PORTS 8 /* ── Globals ──────────────────────────────────────────────────────────── */ static atomic_int g_stop = 0; /* global shutdown (SIGTERM/SIGINT only) */ /* Fixed A/V alignment: ms of leading silence prepended to the audio stream when * a reader attaches. 
The video path (framecache ring -> fc_pipe -> ffmpeg input * queue) is buffered deeper than the direct audio FIFO, so without compensation * audio reaches the muxer AHEAD of its matching video frame. Prepending N ms of * silence delays audio by N ms to re-align. Set via --audio-delay-ms (default 0). * One value, all ports, deterministic — no per-session env plumbing. */ static int g_audio_delay_ms = 0; static void on_signal(int s) { (void)s; atomic_store(&g_stop, 1); } /* Per-port stop flag — set only when a fatal error occurs on that specific * port (e.g. video lock lost). Audio EPIPE is handled by reopening the FIFO * rather than stopping the port, so the thread survives ffmpeg restarts. */ static atomic_int g_port_stop[MAX_PORTS]; /* ── Stream type by port index (non-contiguous SDK enum) ────────────── */ static ULONG rx_streamtype(unsigned port) { switch (port) { case 0: return VHD_ST_RX0; case 1: return VHD_ST_RX1; case 2: return VHD_ST_RX2; case 3: return VHD_ST_RX3; case 4: return VHD_ST_RX4; case 5: return VHD_ST_RX5; case 6: return VHD_ST_RX6; case 7: return VHD_ST_RX7; default: fprintf(stderr, "{\"error\":\"port %u not supported (max 7)\"}\n", port); return VHD_ST_RX0; } } /* ── Loopback board property by port index ───────────────────────────── */ static ULONG loopback_prop(unsigned port) { switch (port) { case 0: return VHD_CORE_BP_PASSIVE_LOOPBACK_0; case 1: return VHD_CORE_BP_PASSIVE_LOOPBACK_1; case 2: return VHD_CORE_BP_PASSIVE_LOOPBACK_2; case 3: return VHD_CORE_BP_PASSIVE_LOOPBACK_3; default: return -1; /* ports 4-7 have no passive loopback property; call site guards p < 4 */ } } /* ── Video standard → width/height/fps/interlaced ───────────────────── */ typedef struct { int width, height, fps_num, fps_den; int interlaced; } VideoInfo; static VideoInfo video_info(VHD_VIDEOSTANDARD std, VHD_CLOCKDIVISOR div) { int ntsc = (div == VHD_CLOCKDIV_1001); switch (std) { case VHD_VIDEOSTD_S274M_1080p_25Hz: return (VideoInfo){1920,1080,25,1,0}; case 
VHD_VIDEOSTD_S274M_1080p_30Hz: return (VideoInfo){1920,1080,ntsc?30000:30,ntsc?1001:1,0}; case VHD_VIDEOSTD_S274M_1080p_24Hz: return (VideoInfo){1920,1080,ntsc?24000:24,ntsc?1001:1,0}; case VHD_VIDEOSTD_S274M_1080p_50Hz: return (VideoInfo){1920,1080,50,1,0}; case VHD_VIDEOSTD_S274M_1080p_60Hz: return (VideoInfo){1920,1080,ntsc?60000:60,ntsc?1001:1,0}; case VHD_VIDEOSTD_S274M_1080psf_24Hz: return (VideoInfo){1920,1080,ntsc?24000:24,ntsc?1001:1,0}; case VHD_VIDEOSTD_S274M_1080psf_25Hz: return (VideoInfo){1920,1080,25,1,0}; case VHD_VIDEOSTD_S274M_1080psf_30Hz: return (VideoInfo){1920,1080,ntsc?30000:30,ntsc?1001:1,0}; case VHD_VIDEOSTD_S274M_1080i_50Hz: return (VideoInfo){1920,1080,25,1,1}; case VHD_VIDEOSTD_S274M_1080i_60Hz: return (VideoInfo){1920,1080,ntsc?30000:30,ntsc?1001:1,1}; case VHD_VIDEOSTD_S296M_720p_50Hz: return (VideoInfo){1280,720,50,1,0}; case VHD_VIDEOSTD_S296M_720p_60Hz: return (VideoInfo){1280,720,ntsc?60000:60,ntsc?1001:1,0}; case VHD_VIDEOSTD_S296M_720p_25Hz: return (VideoInfo){1280,720,25,1,0}; case VHD_VIDEOSTD_S296M_720p_30Hz: return (VideoInfo){1280,720,ntsc?30000:30,ntsc?1001:1,0}; case VHD_VIDEOSTD_S296M_720p_24Hz: return (VideoInfo){1280,720,ntsc?24000:24,ntsc?1001:1,0}; case VHD_VIDEOSTD_3840x2160p_24Hz: return (VideoInfo){3840,2160,ntsc?24000:24,ntsc?1001:1,0}; case VHD_VIDEOSTD_3840x2160p_25Hz: return (VideoInfo){3840,2160,25,1,0}; case VHD_VIDEOSTD_3840x2160p_30Hz: return (VideoInfo){3840,2160,ntsc?30000:30,ntsc?1001:1,0}; case VHD_VIDEOSTD_3840x2160p_50Hz: return (VideoInfo){3840,2160,50,1,0}; case VHD_VIDEOSTD_3840x2160p_60Hz: return (VideoInfo){3840,2160,ntsc?60000:60,ntsc?1001:1,0}; case VHD_VIDEOSTD_S259M_NTSC_480: return (VideoInfo){720,480,ntsc?30000:30,ntsc?1001:1,1}; default: return (VideoInfo){1920,1080,25,1,0}; } } /* ── Write-all helper ─────────────────────────────────────────────────── */ /* Writes all bytes to fd. Uses non-blocking I/O so the bridge never stalls * waiting for a slow reader. 
Returns 0 on success, -1 on fatal error (EPIPE * = reader closed the FIFO). EAGAIN / EWOULDBLOCK on a full pipe is NOT fatal * — the caller (video_thread) will retry on the next slot lock. */ static int write_all(int fd, const unsigned char *p, size_t len) { /* Make the fd non-blocking for the duration of this write */ int flags = fcntl(fd, F_GETFL, 0); if (flags < 0) return -1; if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) return -1; size_t off = 0; while (off < len) { ssize_t n = write(fd, p + off, len - off); if (n > 0) { off += (size_t)n; continue; } if (n < 0 && errno == EINTR) continue; if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { /* Pipe full — brief yield then retry */ struct timespec ts = {0, 1000000L}; /* 1ms */ nanosleep(&ts, NULL); continue; } /* EPIPE or other fatal error — restore flags and return */ fcntl(fd, F_SETFL, flags); return -1; } /* Restore blocking mode */ fcntl(fd, F_SETFL, flags); return 0; } /* ── Embedded-audio PCM ring (SPSC) ─────────────────────────────────────── * JOINED architecture: the video_thread extracts the SDI-embedded audio of * each frame from the SAME slot it pulls video from, and pushes that PCM into * this lock-free single-producer/single-consumer ring. The audio_thread is the * single consumer: it drains the ring into the named audio FIFO (ffmpeg input * 1) and survives ffmpeg restarts (EPIPE → reopen) without touching the board. * * Why a ring instead of writing the FIFO directly from video_thread: * - open(audio_fifo, O_WRONLY) blocks until an ffmpeg reader attaches. If the * video_thread blocked on that, video capture would stall. The ring keeps * the board-paced frame loop (video + audio extract) free-running while the * FIFO lifecycle (blocking open, EPIPE reopen, silence fallback) lives in * audio_thread. * - Audio is still bound to its exact video frame because it is extracted on * the SAME slot in the SAME loop iteration → zero constant offset at root. 
* * 4 MB holds ~21 s of 48 kHz stereo s16le — far more than any FIFO hiccup. */ #define APCM_RING_BYTES (4u * 1024u * 1024u) typedef struct { unsigned char *buf; /* APCM_RING_BYTES, power-of-two-free */ _Atomic size_t w; /* producer write offset (monotonic) */ _Atomic size_t r; /* consumer read offset (monotonic) */ _Atomic int have_embedded; /* 1 once real embedded PCM seen */ } ApcmRing; /* Producer (video_thread): copy n bytes in; drop on overflow (never blocks). */ static void apcm_push(ApcmRing *ring, const unsigned char *src, size_t n) { if (!ring->buf || n == 0) return; size_t w = atomic_load_explicit(&ring->w, memory_order_relaxed); size_t r = atomic_load_explicit(&ring->r, memory_order_acquire); size_t used = w - r; if (used + n > APCM_RING_BYTES) { /* Overflow: reader stalled (no ffmpeg attached, or slow). Drop the * oldest by advancing nothing here and simply refusing the write — * keeping the most-recent contiguous audio aligned to live video. */ return; } for (size_t i = 0; i < n; i++) ring->buf[(w + i) % APCM_RING_BYTES] = src[i]; atomic_store_explicit(&ring->w, w + n, memory_order_release); } /* Consumer (audio_thread): pop up to max bytes; returns bytes copied. */ static size_t apcm_pop(ApcmRing *ring, unsigned char *dst, size_t max) { if (!ring->buf) return 0; size_t r = atomic_load_explicit(&ring->r, memory_order_relaxed); size_t w = atomic_load_explicit(&ring->w, memory_order_acquire); size_t avail = w - r; size_t n = avail < max ? avail : max; for (size_t i = 0; i < n; i++) dst[i] = ring->buf[(r + i) % APCM_RING_BYTES]; atomic_store_explicit(&ring->r, r + n, memory_order_release); return n; } /* Consumer: discard everything currently queued (flush stale backlog to the * live edge when a fresh reader attaches). 
*/ static void apcm_drain(ApcmRing *ring) { if (!ring->buf) return; size_t w = atomic_load_explicit(&ring->w, memory_order_acquire); atomic_store_explicit(&ring->r, w, memory_order_release); } /* ── Per-port state ───────────────────────────────────────────────────── */ typedef struct { HANDLE board; unsigned port; unsigned device; ULONG video_std; ULONG clock_div; VideoInfo vi; char video_fifo[256]; char audio_fifo[256]; char slot_id[128]; /* framecache slot id: "deltacast--" */ char fc_url[256]; /* framecache HTTP base URL */ /* threads */ pthread_t video_tid; pthread_t audio_tid; /* streams (owned by threads, set before thread launch) */ HANDLE video_stream; /* JOINED RX stream: carries video + embedded audio */ #ifndef LEGACY_FIFO fc_writer_t *fc_writer; /* shm ring buffer writer (NULL = use FIFO fallback) */ #endif /* JOINED embedded-audio plumbing (producer=video_thread, consumer=audio_thread) */ ApcmRing apcm; /* video_thread → audio_thread PCM hand-off */ } PortState; /* ── Audio thread (JOINED architecture: FIFO sink, no second VHD stream) ── * * In the JOINED re-architecture the board is opened with ONE RX stream per * port (VHD_SDI_STPROC_JOINED). The video_thread locks each slot and extracts * BOTH the video frame and that frame's SDI-EMBEDDED audio from the SAME slot, * pushing the de-interleaved s16le stereo PCM into ps->apcm. Because the audio * is the embedded audio of the exact frame, it is inherently sync'd with that * frame — zero constant offset at the root (no separate DISJOINED_ANC stream, * no independent buffer queue racing ahead of video). * * This thread NO LONGER opens a VHD stream. Its sole job is FIFO lifecycle: * - Open the named audio FIFO (blocks until ffmpeg input 1 attaches). * - On reader attach, flush the ring backlog to the LIVE edge. * - Drain ps->apcm → FIFO. 
When the ring is momentarily empty, emit
 *     wall-clock-paced silence so ffmpeg input 1 never starves (also the
 *     silence-fallback when the signal carries no embedded audio at all).
 *   - On EPIPE (ffmpeg reader died): close and REOPEN the FIFO so the thread
 *     survives an ffmpeg restart without bringing down other ports.
 *     EPIPE never sets g_stop — only SIGTERM/SIGINT does that.
 *
 * The legacy --audio-delay-ms knob is still honoured (prepended once on reader
 * attach) but should be UNNECESSARY now that audio rides with its frame; leave
 * it at the default 0.
 *
 * arg is the port's PortState. The return value is always NULL.
 *
 * NOTE(review): the stop flags are only polled between blocking calls; while
 * this thread is parked inside open(O_WRONLY) waiting for a reader it cannot
 * observe g_stop. How shutdown unblocks it is not visible here — confirm. */
static void *audio_thread(void *arg)
{
    PortState *ps = (PortState *)arg;
    const int AUDIO_RATE = 48000;
    const int CHANNELS = 2;
    const size_t FRAME_BYTES = (size_t)CHANNELS * 2; /* s16le stereo */
    /* Fall back to 25/1 if the video info carries a non-positive rate. */
    int fps_num = ps->vi.fps_num > 0 ? ps->vi.fps_num : 25;
    int fps_den = ps->vi.fps_den > 0 ? ps->vi.fps_den : 1;
    /* Audio samples per video frame, rounded to nearest. */
    long samples_per_frame = ((long)AUDIO_RATE * fps_den + fps_num / 2) / fps_num;
    if (samples_per_frame < 1) samples_per_frame = 1;
    size_t tick_bytes = (size_t)samples_per_frame * FRAME_BYTES;
    /* Duration of one video frame in nanoseconds. */
    long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num);

    /* Scratch buffer: large enough for a generous burst pulled from the ring
     * in one go (several frames of audio) plus the per-tick silence buffer. */
    size_t buf_sz = tick_bytes * 8;
    if (buf_sz < 65536) buf_sz = 65536;
    unsigned char *buf = calloc(1, buf_sz);
    if (!buf) return NULL;

    /* Outer loop: reopen the FIFO writer each time a reader connects.
     * This allows the bridge to survive ffmpeg session stop/restart on a port
     * without affecting any other port's threads. */
    while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
        int fd = open(ps->audio_fifo, O_WRONLY);
        if (fd < 0) {
            /* Open failed (rare — FIFO was deleted?). Brief pause then retry. */
            fprintf(stderr, "[audio:%u] open FIFO failed: %s\n", ps->port, strerror(errno));
            struct timespec ts = {0, 200000000L};
            nanosleep(&ts, NULL);
            continue;
        }
        /* Best effort: request a 1 MiB pipe buffer; the result is not checked. */
        fcntl(fd, F_SETPIPE_SZ, 1024 * 1024);

        /* ── Flush the embedded-audio ring backlog to the LIVE edge ─────────
         * While no reader is attached (recorder idle/standby), the open() above
         * blocks but the video_thread keeps free-running and pushing the
         * embedded audio of every live frame into ps->apcm. Without flushing,
         * the first thing a newly-attached reader (the record ffmpeg) receives
         * is that backlog — seconds of stale audio that plays as leading
         * mis-sync. Discard everything queued so we hand the reader the LIVE
         * edge, frame-aligned with the video fc_pipe is delivering right now. */
        apcm_drain(&ps->apcm);

        /* ── Fixed A/V alignment: prepend g_audio_delay_ms of leading silence ──
         * Retained for compatibility; with JOINED capture audio already rides
         * with its frame so this should stay 0. When set, the real PCM zeros at
         * 48 kHz consume exactly N ms of the audio timeline (ffmpeg derives
         * audio PTS from sample count) — a precise, drift-free shift. */
        if (g_audio_delay_ms > 0) {
            long delay_samples = (long)AUDIO_RATE * g_audio_delay_ms / 1000;
            size_t delay_bytes = (size_t)delay_samples * FRAME_BYTES;
            unsigned char sil[8192];
            memset(sil, 0, sizeof(sil));
            size_t remaining = delay_bytes;
            int delay_ok = 1;
            /* Emit the silence in chunks of at most sizeof(sil) bytes. */
            while (remaining > 0) {
                size_t chunk = remaining > sizeof(sil) ? sizeof(sil) : remaining;
                if (write_all(fd, sil, chunk) < 0) {
                    /* A failed write here is not handled specially: the inner
                     * loop below runs next and its own write failure triggers
                     * the reopen. */
                    delay_ok = 0;
                    break;
                }
                remaining -= chunk;
            }
            if (delay_ok)
                fprintf(stderr, "[audio:%u] prepended %d ms (%ld samples) of A/V-align silence\n", ps->port, g_audio_delay_ms, delay_samples);
        }

        /* Wall-clock baseline for the silence-fill cadence. */
        struct timespec next;
        clock_gettime(CLOCK_MONOTONIC, &next);

        /* Inner loop: drain the ring into the FIFO until the reader exits.
         *
         * Pacing model:
         *   - Whenever the ring has embedded PCM, write ALL of it immediately.
         *     The producer (video_thread) is paced by the board's JOINED slot
         *     cadence = the true SDI clock, so the volume of bytes the ring
         *     accumulates per unit time exactly tracks video. We never pad or
         *     resample it, so the audio timeline length matches video length
         *     (no progressive drift).
         *   - When the ring is empty for a whole frame interval (no embedded
         *     audio on the signal, or a brief gap), emit exactly one frame of
         *     silence, wall-clock paced, so ffmpeg input 1 never starves. */
        int wrote_real_since_log = 0; /* log the "streaming" line once per reader */
        while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
            size_t got = apcm_pop(&ps->apcm, buf, buf_sz);
            if (got > 0) {
                if (write_all(fd, buf, got) < 0) {
                    fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port);
                    break;
                }
                if (!wrote_real_since_log && atomic_load_explicit(&ps->apcm.have_embedded, memory_order_relaxed)) {
                    fprintf(stderr, "[audio:%u] streaming SDI-embedded audio (JOINED slot)\n", ps->port);
                    wrote_real_since_log = 1;
                }
                /* Re-baseline the silence clock so we don't burst silence right
                 * after a real chunk; the next empty interval starts from now. */
                clock_gettime(CLOCK_MONOTONIC, &next);
                /* Small yield to avoid a busy spin when the ring is being fed in
                 * sub-frame increments; the board cadence refills it promptly. */
                struct timespec ts = {0, frame_ns / 4 > 0 ? frame_ns / 4 : 250000L};
                nanosleep(&ts, NULL);
                continue;
            }

            /* Ring empty this interval → emit one frame of silence, paced. */
            memset(buf, 0, tick_bytes);
            if (write_all(fd, buf, tick_bytes) < 0) {
                fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port);
                break;
            }
            /* Advance the absolute deadline by one frame, normalising nsec. */
            next.tv_nsec += frame_ns;
            while (next.tv_nsec >= 1000000000L) {
                next.tv_nsec -= 1000000000L;
                next.tv_sec += 1;
            }
            struct timespec now;
            clock_gettime(CLOCK_MONOTONIC, &now);
            if (next.tv_sec > now.tv_sec || (next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec)) {
                /* Deadline still ahead: sleep until it. */
                clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, NULL);
            } else {
                /* Already late: restart the cadence from now instead of
                 * bursting silence to catch up. */
                next = now;
            }
        }
        close(fd);
    }
    free(buf);
    return NULL;
}

/* ── Embedded-audio extraction context (used inside the JOINED video loop) ─
 * Set up once per video_thread; reused for every slot. The VHD_AUDIOINFO is
 * configured for a single stereo pair (group 0) in s16le, exactly as the old
 * DISJOINED_ANC audio path was — the SDK lands packed L/R s16le PCM in
 * pAudioChannels[0].pData with the byte count in .DataSize. */
typedef struct {
    int enabled;            /* 0 = no scratch buffer (extraction disabled) */
    unsigned char *buf;     /* scratch PCM landing buffer */
    size_t buf_sz;          /* capacity of buf in bytes */
    size_t silence_bytes;   /* one frame of s16le stereo silence (fallback) */
    VHD_AUDIOINFO ai;
} AudioExtract;

/* Prepares *ax for per-slot audio extraction on port ps: allocates the scratch
 * buffer, computes the one-frame silence size and fills in the SDK audio-info
 * descriptor. On allocation failure ax->enabled stays 0 and ax->buf is NULL. */
static void audio_extract_init(AudioExtract *ax, PortState *ps)
{
    memset(ax, 0, sizeof(*ax));
    /* Worst-case samples per frame at this standard/clock, + headroom. */
    ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)ps->video_std, (VHD_CLOCKDIVISOR)ps->clock_div, VHD_ASR_48000, 0);
    ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO);
    size_t fb = (size_t)2 /*ch*/ * 2 /*s16*/;
    /* If the SDK reports no block size, assume 4 bytes per stereo s16 sample. */
    if (block_size == 0) block_size = (ULONG)fb;
    /* Exact per-frame capacity for ONE stereo pair (s16le, packed L/R), the
     * size the SDK fills in pAudioChannels[0].pData. + headroom. */
    ax->buf_sz = ((size_t)max_samples + 64) * (size_t)block_size;
    if (ax->buf_sz < 65536) ax->buf_sz = 65536;
    ax->buf = calloc(1, ax->buf_sz);
    if (!ax->buf) {
        ax->enabled = 0;
        return;
    }
    /* One video-frame worth of s16le stereo silence (samples/frame * 2ch * 2B),
     * used as the frame-coupled silence fallback when the signal carries no
     * embedded audio on a frame — keeps the audio timeline length == video. */
    {
        int fn = ps->vi.fps_num > 0 ? ps->vi.fps_num : 25;
        int fd = ps->vi.fps_den > 0 ? ps->vi.fps_den : 1; /* frame-rate denominator, not a file descriptor */
        long spf = ((long)48000 * fd + fn / 2) / fn;
        if (spf < 1) spf = 1;
        ax->silence_bytes = (size_t)spf * 2 /*ch*/ * 2 /*s16*/;
        if (ax->silence_bytes > ax->buf_sz) ax->silence_bytes = ax->buf_sz;
    }
    memset(&ax->ai, 0, sizeof(ax->ai));
    /* ── Silent-audio FIX ───────────────────────────────────────────────────
     * Configure ONLY pAudioChannels[0] of group 0 as ONE stereo pair, exactly
     * like Deltacast's own FFmpeg fork (libavdevice/videomaster_common.c,
     * init_audio_info): for a stereo pair the SDK packs interleaved L/R s16le
     * into the EVEN channel's pData; the ODD channel (index 1) must be left
     * ZEROED. The previous JOINED code ALSO set pAudioChannels[1].Mode/
     * BufferFormat = STEREO/AF_16, declaring a SECOND stereo pair the signal
     * does not carry. That mismatch made VHD_SlotExtractAudio return zero
     * samples → the -91 dB "silent audio" regression. Leaving channel[1] zero
     * and handing the SDK a correctly sized landing buffer makes it land real
     * PCM.
     *
     * DataSize must be (re)set to the buffer capacity before EACH extract call
     * (done in audio_extract_slot) because the SDK overwrites it with the
     * number of bytes actually written. */
    ax->ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO;
    ax->ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16;
    ax->ai.pAudioGroups[0].pAudioChannels[0].pData = ax->buf;
    /* pAudioChannels[1] intentionally left zeroed (memset above). */
    ax->enabled = 1;
}

/* Extract this slot's SDI-embedded audio into ax->buf and return the byte
 * count. Must be called while `slot` is locked (JOINED slot = same frame as the
 * video) so the returned PCM is exactly this video frame's embedded audio.
 *
 * Frame-coupled silence fallback: if the slot yields zero samples (no embedded
 * audio on the signal, or a transient extract miss), the SDK call fails, or the
 * reported size exceeds the buffer, we fill ax->buf with one frame-interval of
 * silence and return that, so EVERY ring entry carries one frame of audio and
 * the audio timeline length always equals the video timeline length
 * (drift-free). On real embedded PCM we flag have_embedded for logging.
 *
 * Returns the number of valid PCM bytes in ax->buf; 0 only when extraction is
 * disabled (ax->enabled == 0). */
static size_t audio_extract_slot(AudioExtract *ax, PortState *ps, HANDLE slot)
{
    if (!ax->enabled) return 0;
    /* In: capacity of the landing buffer. Out: bytes the SDK wrote. */
    ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)ax->buf_sz;
    if (VHD_SlotExtractAudio(slot, &ax->ai) == VHDERR_NOERROR) {
        ULONG sz = ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize;
        if (sz > 0 && (size_t)sz <= ax->buf_sz) {
            atomic_store_explicit(&ps->apcm.have_embedded, 1, memory_order_relaxed);
            return (size_t)sz;
        }
    }
    /* No embedded audio this frame → one frame-interval of silence. */
    if (ax->silence_bytes) memset(ax->buf, 0, ax->silence_bytes);
    return ax->silence_bytes;
}

/* Releases the scratch buffer and disables the context; safe to call on a
 * context whose allocation failed (buf == NULL). */
static void audio_extract_free(AudioExtract *ax)
{
    if (ax->buf) free(ax->buf);
    ax->buf = NULL;
    ax->enabled = 0;
}

/* ── Video thread ─────────────────────────────────────────────────────── */
static void *video_thread(void *arg)
{
    PortState *ps = (PortState *)arg;
    /* JOINED: set up embedded-audio extraction once; reused for every slot.
*/ AudioExtract ax; audio_extract_init(&ax, ps); if (ax.enabled) fprintf(stderr, "[video:%u] JOINED audio extraction armed (buf=%zu)\n", ps->port, ax.buf_sz); else fprintf(stderr, "[video:%u] WARN: audio extract buffer alloc failed — silence only\n", ps->port); #ifndef LEGACY_FIFO /* ── Framecache shm path (primary) ────────────────────────────────── * Write frames directly into the shared memory ring buffer. * Multiple consumers (growing recorder, proxy encoder, HLS preview) * each hold their own read cursor and read independently — no FIFO * splitting, no bandwidth halving. * * The fc_writer was opened by main() after signal lock. If it is * NULL the framecache service was unavailable and we fall through to * the legacy FIFO path automatically. */ if (ps->fc_writer) { uint64_t frame_seq = 0; while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { HANDLE slot = NULL; ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot); if (r == VHDERR_NOERROR) { /* ── JOINED frame-coupled capture ─────────────────────────── * Extract this frame's SDI-embedded audio from the SAME locked * slot as the video, then write BOTH into ONE framecache ring * entry via fc_writer_write_av. Because audio + video share one * ring slot advanced by one atomic cursor step, and are read * back together by one consumer (fc_pipe) iteration, the audio * can never drift from its video frame — there is no separate * audio transport/buffer. 
(audio_extract_slot returns one * frame of silence when the signal carries no embedded audio.)*/ size_t asz = audio_extract_slot(&ax, ps, slot); BYTE *buf = NULL; ULONG sz = 0; if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2; if (sz != expected) { fprintf(stderr, "[video:%u] WARN: sz=%lu != expected %lu — packing mismatch, skipping\n", ps->port, (unsigned long)sz, (unsigned long)expected); VHD_UnlockSlotHandle(slot); continue; } /* pts: frame index × frame duration in µs */ uint64_t pts_us = 0; if (ps->vi.fps_num > 0) { pts_us = frame_seq * 1000000ULL * (uint64_t)ps->vi.fps_den / (uint64_t)ps->vi.fps_num; } fc_writer_write_av(ps->fc_writer, buf, (uint32_t)sz, ax.enabled ? ax.buf : NULL, (uint32_t)asz, pts_us); frame_seq++; } VHD_UnlockSlotHandle(slot); } else if (r != VHDERR_TIMEOUT) { fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n", ps->port, (unsigned long)r); atomic_store(&g_port_stop[ps->port], 1); break; } } audio_extract_free(&ax); return NULL; } /* fc_writer == NULL → fall through to FIFO path */ fprintf(stderr, "[video:%u] fc_writer unavailable — falling back to FIFO\n", ps->port); #endif /* !LEGACY_FIFO */ /* ── Legacy FIFO path ──────────────────────────────────────────────── * Kept as compile-time fallback (-DLEGACY_FIFO=1) or when the * framecache service is not reachable at startup. * * Outer loop: reopen the FIFO writer each time a reader connects. * EPIPE means the ffmpeg sidecar for this port died (session * stop/restart), NOT a hardware fault. Reopen and block until the * next recorder start; other ports are unaffected. 
*/ while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { int fd = open(ps->video_fifo, O_WRONLY); if (fd < 0) { fprintf(stderr, "[video:%u] open FIFO failed: %s\n", ps->port, strerror(errno)); struct timespec ts = {0, 200000000L}; nanosleep(&ts, NULL); continue; } { int pipe_sz = 64 * 1024 * 1024; /* 64 MB — ~16 frames of 1080p UYVY */ if (fcntl(fd, F_SETPIPE_SZ, pipe_sz) < 0) { fprintf(stderr, "[video:%u] fcntl F_SETPIPE_SZ failed: %s\n", ps->port, strerror(errno)); } } HANDLE slot = NULL; int fatal = 0; while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot); if (r == VHDERR_NOERROR) { /* LEGACY_FIFO path: video → video FIFO, audio → apcm ring → * audio_thread → audio FIFO (the old two-transport scheme). * Extract this frame's embedded audio on the SAME slot and push * it to the ring for audio_thread. (Frame coupling is only * available on the framecache path above; legacy keeps the * separate-FIFO behavior for nodes without framecache.) 
*/ size_t asz = audio_extract_slot(&ax, ps, slot); if (asz > 0) apcm_push(&ps->apcm, ax.buf, asz); BYTE *buf = NULL; ULONG sz = 0; if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2; if (sz != expected) { fprintf(stderr, "[video:%u] WARN: slot sz=%lu != expected %lu (w=%d h=%d) -- packing mismatch; skipping frame\n", ps->port, (unsigned long)sz, (unsigned long)expected, ps->vi.width, ps->vi.height); VHD_UnlockSlotHandle(slot); continue; } if (write_all(fd, buf, sz) < 0) { fprintf(stderr, "[video:%u] EPIPE — waiting for next reader\n", ps->port); VHD_UnlockSlotHandle(slot); break; } } VHD_UnlockSlotHandle(slot); } else if (r != VHDERR_TIMEOUT) { fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n", ps->port, (unsigned long)r); atomic_store(&g_port_stop[ps->port], 1); fatal = 1; break; } } close(fd); if (fatal) break; } audio_extract_free(&ax); return NULL; } /* ── Parse comma-separated port list ─────────────────────────────────── */ static int parse_ports(const char *csv, unsigned *ports, int max) { int count = 0; char buf[256]; strncpy(buf, csv, sizeof(buf) - 1); buf[sizeof(buf) - 1] = '\0'; char *tok = strtok(buf, ","); while (tok && count < max) { ports[count++] = (unsigned)atoi(tok); tok = strtok(NULL, ","); } return count; } /* ── Main ─────────────────────────────────────────────────────────────── */ int main(int argc, char *argv[]) { unsigned device_id = 0; unsigned ports[MAX_PORTS] = {0}; int port_count = 0; int sig_timeout = 30; const char *video_pipe_dir = "/dev/shm/deltacast"; const char *audio_pipe_dir = "/dev/shm/deltacast"; /* Framecache URL: CLI arg > FC_URL env var > default */ const char *fc_url_env = getenv("FC_URL"); const char *fc_url = fc_url_env ? 
fc_url_env : FC_URL_DEFAULT; for (int i = 1; i < argc; i++) { if (!strcmp(argv[i], "--device") && i+1 < argc) { device_id = (unsigned)atoi(argv[++i]); } else if (!strcmp(argv[i], "--ports") && i+1 < argc) { port_count = parse_ports(argv[++i], ports, MAX_PORTS); } else if (!strcmp(argv[i], "--port") && i+1 < argc) { /* single-port compat alias */ ports[0] = (unsigned)atoi(argv[++i]); port_count = 1; } else if (!strcmp(argv[i], "--video-pipe-dir") && i+1 < argc) { video_pipe_dir = argv[++i]; } else if (!strcmp(argv[i], "--audio-pipe-dir") && i+1 < argc) { audio_pipe_dir = argv[++i]; } else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) { sig_timeout = atoi(argv[++i]); } else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) { fc_url = argv[++i]; } else if (!strcmp(argv[i], "--audio-delay-ms") && i+1 < argc) { g_audio_delay_ms = atoi(argv[++i]); if (g_audio_delay_ms < 0) g_audio_delay_ms = 0; if (g_audio_delay_ms > 1000) g_audio_delay_ms = 1000; } } /* Env override (FC_AUDIO_DELAY_MS) for ops who tune without editing the unit. */ { const char *ad = getenv("FC_AUDIO_DELAY_MS"); if (ad && *ad) { int v = atoi(ad); if (v >= 0 && v <= 1000) g_audio_delay_ms = v; } } if (port_count == 0) { fprintf(stderr, "{\"error\":\"no ports specified — use --ports 0,1,2,...\"}\n"); return 1; } signal(SIGINT, on_signal); signal(SIGTERM, on_signal); signal(SIGPIPE, SIG_IGN); /* ── Init API ────────────────────────────────────────────────────── */ ULONG dll_ver, nb_boards; if (VHD_GetApiInfo(&dll_ver, &nb_boards) != VHDERR_NOERROR) { fprintf(stderr, "{\"error\":\"VHD_GetApiInfo failed\"}\n"); return 1; } if (device_id >= nb_boards) { fprintf(stderr, "{\"error\":\"board %u not found (%lu detected)\"}\n", device_id, nb_boards); return 1; } /* ── Configure bi-directional channel mode before opening board ───── * * The DELTA-12G-e-h 8c is a bidirectional card. Unless we explicitly * call VHD_SetBiDirCfg(BrdId, VHD_BIDIR_80) the board may default to * a mixed RX/TX configuration (e.g. 
4RX/4TX), which causes random RX * stream opens to fail with VHDERR_RESOURCEUNAVAILABLE and produces the * "connecting…" hang operators see when starting certain recorders. * * Per SDK sample Tools.cpp SetNbChannels(): open a temporary handle, * check IS_BIDIR and channel counts, call VHD_SetBiDirCfg if needed, * then close. The subsequent real board open will see all 8 as RX. */ { HANDLE tmp = NULL; if (VHD_OpenBoardHandle(device_id, &tmp, NULL, 0) == VHDERR_NOERROR) { ULONG nb_rx = 0, nb_tx = 0, is_bidir = 0; VHD_GetBoardProperty(tmp, VHD_CORE_BP_NB_RXCHANNELS, &nb_rx); VHD_GetBoardProperty(tmp, VHD_CORE_BP_NB_TXCHANNELS, &nb_tx); VHD_GetBoardProperty(tmp, VHD_CORE_BP_IS_BIDIR, &is_bidir); VHD_CloseBoardHandle(tmp); if (is_bidir) { /* Set all channels to RX. For 8-channel bidir: VHD_BIDIR_80. * VHD_SetBiDirCfg takes the board INDEX, not a handle. */ ULONG cfg = (nb_rx + nb_tx == 8) ? VHD_BIDIR_80 : VHD_BIDIR_40; ULONG r = VHD_SetBiDirCfg(device_id, cfg); if (r == VHDERR_NOERROR) fprintf(stderr, "[board] SetBiDirCfg(%lu) OK — %lu+%lu ch bidir configured all-RX\n", cfg, nb_rx, nb_tx); else fprintf(stderr, "[board] SetBiDirCfg warn rc=%lu (non-fatal)\n", r); } } } /* ── Open board ONCE ─────────────────────────────────────────────── */ HANDLE board = NULL; if (VHD_OpenBoardHandle(device_id, &board, NULL, 0) != VHDERR_NOERROR) { fprintf(stderr, "{\"error\":\"VHD_OpenBoardHandle failed for board %u\"}\n", device_id); return 1; } fprintf(stderr, "[board] opened board %u with %d port(s)\n", device_id, port_count); fprintf(stderr, "[board] audio A/V-align delay = %d ms\n", g_audio_delay_ms); /* Per SDK samples: for 12G-ASI or 3G-ASI channel types the channel must be * explicitly switched to SDI mode. Without this, VHD_SDI_CP_VIDEO_STANDARD * polls return NB_VHD_VIDEOSTANDARDS (no signal) even when signal present. * Also disable passive loopback for ports 0-3 so RX doesn't loop to TX. 
*/ for (int pi = 0; pi < port_count; pi++) { unsigned p = ports[pi]; ULONG chn_type = 0; if (VHD_GetChannelProperty(board, VHD_RX_CHANNEL, p, VHD_CORE_CP_TYPE, &chn_type) == VHDERR_NOERROR) { if (chn_type == VHD_CHNTYPE_3GSDI_ASI || chn_type == VHD_CHNTYPE_12GSDI_ASI) VHD_SetChannelProperty(board, VHD_RX_CHANNEL, p, VHD_CORE_CP_MODE, VHD_CHANNEL_MODE_SDI); } if (p < 4) VHD_SetBoardProperty(board, loopback_prop(p), FALSE); } /* ── Wait for signal on all ports ───────────────────────────────── */ ULONG video_stds[MAX_PORTS] = {0}; ULONG clock_divs[MAX_PORTS] = {0}; int locked[MAX_PORTS] = {0}; for (int pi = 0; pi < port_count; pi++) { video_stds[pi] = (ULONG)NB_VHD_VIDEOSTANDARDS; clock_divs[pi] = VHD_CLOCKDIV_1; } struct timespec deadline; clock_gettime(CLOCK_MONOTONIC, &deadline); deadline.tv_sec += sig_timeout; while (!atomic_load(&g_stop)) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); if (now.tv_sec > deadline.tv_sec || (now.tv_sec == deadline.tv_sec && now.tv_nsec >= deadline.tv_nsec)) break; int all_locked = 1; for (int pi = 0; pi < port_count; pi++) { if (locked[pi]) continue; VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi], VHD_SDI_CP_VIDEO_STANDARD, &video_stds[pi]); if (video_stds[pi] != (ULONG)NB_VHD_VIDEOSTANDARDS) { VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi], VHD_SDI_CP_CLOCK_DIVISOR, &clock_divs[pi]); locked[pi] = 1; fprintf(stderr, "[board] port %u signal locked (std=%lu)\n", ports[pi], video_stds[pi]); } else { all_locked = 0; } } if (all_locked) break; struct timespec ts = {0, 200000000L}; /* 200ms poll */ nanosleep(&ts, NULL); } /* Report results — continue with whatever locked, abort only if NONE locked. 
*/ int any_locked = 0; for (int pi = 0; pi < port_count; pi++) { if (locked[pi]) { any_locked = 1; } else { fprintf(stderr, "{\"error\":\"no signal on board %u port %u within %ds\"}\n", device_id, ports[pi], sig_timeout); } } if (!any_locked || atomic_load(&g_stop)) { VHD_CloseBoardHandle(board); return 1; } /* ── Create FIFOs and open streams for each locked port ─────────── */ PortState ps[MAX_PORTS]; memset(ps, 0, sizeof(ps)); int active_count = 0; /* Initialise per-port stop flags. */ for (int pi = 0; pi < MAX_PORTS; pi++) atomic_store(&g_port_stop[pi], 0); for (int pi = 0; pi < port_count; pi++) { if (!locked[pi]) continue; PortState *p = &ps[active_count]; p->board = board; p->port = ports[pi]; p->device = device_id; p->video_std = video_stds[pi]; p->clock_div = clock_divs[pi]; p->vi = video_info((VHD_VIDEOSTANDARD)video_stds[pi], (VHD_CLOCKDIVISOR)clock_divs[pi]); snprintf(p->video_fifo, sizeof(p->video_fifo), "%s/video-%u.fifo", video_pipe_dir, ports[pi]); snprintf(p->audio_fifo, sizeof(p->audio_fifo), "%s/audio-%u.fifo", audio_pipe_dir, ports[pi]); snprintf(p->slot_id, sizeof(p->slot_id), "deltacast-%u-%u", device_id, ports[pi]); strncpy(p->fc_url, fc_url, sizeof(p->fc_url) - 1); #ifndef LEGACY_FIFO /* ── Primary: frame-coupled framecache path ───────────────────────── * Open the framecache slot for video + this frame's embedded audio * (both packed into one ring entry by fc_writer_write_av). In this path * the bridge does NOT create or write the audio FIFO — capture-manager * creates it and fc_pipe writes it, sourced from the SAME ring entry as * the video so audio is frame-locked. Fall back to the legacy two-FIFO * scheme only if framecache is unreachable. 
*/ p->fc_writer = fc_writer_open(p->fc_url, p->slot_id, (uint32_t)p->vi.width, (uint32_t)p->vi.height, (uint32_t)p->vi.fps_num, (uint32_t)p->vi.fps_den); if (!p->fc_writer) { fprintf(stderr, "[port:%u] framecache unavailable — falling back to legacy video+audio FIFOs\n", ports[pi]); if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); continue; } if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) { fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno)); continue; } } #else /* Legacy: always use video + audio FIFOs (two transports). */ if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); continue; } if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) { fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno)); continue; } #endif /* Open the RX stream in JOINED processing mode. * * JOINED (vs. the old DISJOINED_VIDEO + a separate DISJOINED_ANC audio * stream) means a single stream delivers slots that carry BOTH the * video frame AND its SDI-embedded ancillary audio. The video_thread * locks each slot once and pulls video (VHD_GetSlotBuffer) and that * frame's audio (VHD_SlotExtractAudio) from the SAME slot, so audio is * inherently frame-synchronised — eliminating the constant "audio ahead * of video" offset that two independently-buffered streams produced. * (Pattern per Deltacast's own FFmpeg fork: libavdevice/videomaster_common.c.) 
*/ HANDLE vs = NULL; ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]), VHD_SDI_STPROC_JOINED, NULL, &vs, NULL); if (r != VHDERR_NOERROR) { fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle JOINED failed port %u rc=%lu\"}\n", ports[pi], r); continue; } VHD_SetStreamProperty(vs, VHD_SDI_SP_VIDEO_STANDARD, p->video_std); VHD_SetStreamProperty(vs, VHD_SDI_SP_CLOCK_SYSTEM, p->clock_div); VHD_SetStreamProperty(vs, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED); VHD_SetStreamProperty(vs, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8); /* ── SDI interface propagation (required for embedded-audio extract) ── * Per Deltacast's FFmpeg fork (libavdevice/videomaster_common.c, * ff_videomaster_start_stream): a JOINED SDI stream must have * VHD_SDI_SP_INTERFACE set to the DETECTED cable interface before * StartStream, otherwise VHD_SlotExtractAudio on the resulting slots * returns zero samples. We read the channel's detected interface and * set it on the stream. (Prefer the channel-detected value; fall back * to the stream's current value if the channel property is unavailable * on this SDK build.) */ ULONG iface = 0; if (VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi], VHD_SDI_CP_INTERFACE, &iface) == VHDERR_NOERROR) { VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface); fprintf(stderr, "[board] port %u set SDI Interface (channel-detected) to %lu\n", ports[pi], iface); } else if (VHD_GetStreamProperty(vs, VHD_SDI_SP_INTERFACE, &iface) == VHDERR_NOERROR) { VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface); fprintf(stderr, "[board] port %u set SDI Interface (stream default) to %lu\n", ports[pi], iface); } /* Pin to tightly-packed 8-bit UYVY. Relying on SDK default caused * the board to deliver frames whose size != width*height*2, * producing rolled/sheared ("bouncing and bending") video. 
*/ VHD_SetStreamProperty(vs, VHD_CORE_SP_BUFFER_PACKING, VHD_BUFPACK_VIDEO_YUV422_8); p->video_stream = vs; if (VHD_StartStream(vs) != VHDERR_NOERROR) { fprintf(stderr, "{\"error\":\"VHD_StartStream video failed port %u\"}\n", ports[pi]); VHD_CloseStreamHandle(vs); p->video_stream = NULL; continue; } /* Emit format JSON to stderr (one line per port on signal lock). * Includes slot_id so node-agent / capture-manager can identify * the framecache slot for this port. */ fprintf(stderr, "{\"port\":%u,\"width\":%d,\"height\":%d," "\"fps_num\":%d,\"fps_den\":%d," "\"interlaced\":%s," "\"pix_fmt\":\"uyvy422\"," "\"audio_channels\":2,\"audio_rate\":48000," "\"device\":%u," "\"slot_id\":\"%s\"}\n", ports[pi], p->vi.width, p->vi.height, p->vi.fps_num, p->vi.fps_den, p->vi.interlaced ? "true" : "false", device_id, p->slot_id); fflush(stderr); /* ── Audio transport selection ────────────────────────────────────── * Frame-coupled path (fc_writer active): audio rides in the framecache * ring entry with its video frame; fc_pipe delivers it to the audio * FIFO frame-locked. NO apcm ring, NO audio_thread, NO bridge-owned * audio FIFO — there is no second transport to drift. * * Legacy path (fc_writer NULL, framecache unreachable, or -DLEGACY_FIFO): * keep the apcm ring + audio_thread that drains it to the separate audio * FIFO (the old two-transport scheme). */ int legacy_audio; #ifdef LEGACY_FIFO legacy_audio = 1; #else legacy_audio = (p->fc_writer == NULL); #endif if (legacy_audio) { p->apcm.buf = calloc(1, APCM_RING_BYTES); atomic_store(&p->apcm.w, 0); atomic_store(&p->apcm.r, 0); atomic_store(&p->apcm.have_embedded, 0); if (!p->apcm.buf) fprintf(stderr, "[port:%u] WARN: apcm ring alloc failed — audio will be silence\n", ports[pi]); /* Launch audio thread (FIFO sink: drains apcm ring → audio FIFO). 
*/ pthread_create(&p->audio_tid, NULL, audio_thread, p); } else { fprintf(stderr, "[port:%u] frame-coupled audio (framecache ring) — no separate audio FIFO/thread\n", ports[pi]); } /* Launch video thread. fc_writer path: video + this frame's embedded * audio → ONE framecache ring entry (fc_writer_write_av). Legacy path: * video → video FIFO, audio → apcm ring → audio_thread → audio FIFO. */ pthread_create(&p->video_tid, NULL, video_thread, p); active_count++; } if (active_count == 0) { fprintf(stderr, "{\"error\":\"no ports successfully started\"}\n"); VHD_CloseBoardHandle(board); return 1; } /* ── Wait for all threads to finish ─────────────────────────────── */ for (int i = 0; i < active_count; i++) { if (ps[i].video_tid) pthread_join(ps[i].video_tid, NULL); if (ps[i].audio_tid) pthread_join(ps[i].audio_tid, NULL); } /* ── Cleanup ─────────────────────────────────────────────────────── */ for (int i = 0; i < active_count; i++) { if (ps[i].video_stream) { VHD_StopStream(ps[i].video_stream); VHD_CloseStreamHandle(ps[i].video_stream); } #ifndef LEGACY_FIFO if (ps[i].fc_writer) { fc_writer_close(ps[i].fc_writer); ps[i].fc_writer = NULL; } #endif if (ps[i].apcm.buf) { free(ps[i].apcm.buf); ps[i].apcm.buf = NULL; } } VHD_CloseBoardHandle(board); return 0; }