merge: frame-coupled embedded A/V capture (JOINED + AVI single-input)

Brings the working frame-coupled Deltacast capture to main:
- JOINED single-slot capture: video + SDI-embedded audio from the same VHD
  slot per frame (deltacast-bridge).
- Audio extraction fix: declare BOTH stereo channels for VHD_SlotExtractAudio
  (single-channel gave -91dB silence on this card/SDK).
- Single streaming AVI muxer in fc_pipe: one -f avi -i pipe:0 to ffmpeg,
  eliminating the two-raw-pipe ffmpeg startup deadlock and the separate audio
  FIFO transport entirely.

Verified end-to-end on live source: audio -15.5dB audible, video/audio packet
parity 1805/1805, identical durations (drift-free), 0 decode errors. No raw2bmx,
no separate audio FIFO. A/V sync confirmed by operator.
This commit is contained in:
OpenCode 2026-06-05 14:20:58 +00:00
commit b287ad08ef
10 changed files with 962 additions and 328 deletions

View file

@ -24,12 +24,18 @@
#include <netinet/in.h>
#include <arpa/inet.h>
/* Re-use the shared memory layout from the framecache service */
/* Re-use the shared memory layout from the framecache service.
* MUST stay byte-for-byte consistent with services/framecache/src/slot.h
* the writer and all readers share this layout. Bumped to FC_VERSION 2 for
* frame-coupled audio (each ring entry carries video + that frame's audio). */
#define FC_MAGIC 0x46524D43u
#define FC_VERSION 1u
#define FC_VERSION 2u
#define FC_RING_DEPTH 120u
#define FC_HEADER_SIZE 4096u
#define FC_FRAME_HDR_SIZE 24u
#define FC_MAX_AUDIO_BYTES 16384u /* must equal slot.h FC_MAX_AUDIO_BYTES */
#define FC_AUDIO_RATE 48000u
#define FC_AUDIO_CHANNELS 2u
typedef struct {
uint32_t magic;
@ -41,22 +47,30 @@ typedef struct {
uint32_t pixel_format;
uint32_t frame_size;
uint32_t ring_depth;
uint32_t audio_max_bytes; /* FC_MAX_AUDIO_BYTES */
uint32_t audio_rate; /* FC_AUDIO_RATE */
uint32_t audio_channels; /* FC_AUDIO_CHANNELS */
uint32_t _reserved;
_Atomic uint64_t write_cursor;
_Atomic uint64_t dropped_frames;
char source_type[32];
char slot_id[64];
uint8_t _pad[FC_HEADER_SIZE - 112];
uint8_t _pad[FC_HEADER_SIZE - 124];
} fc_hdr_t;
typedef struct {
uint64_t pts_us;
uint64_t wall_us;
uint32_t size;
uint32_t _pad;
uint8_t data[];
uint32_t size; /* video bytes */
uint32_t audio_size; /* audio bytes for this frame (s16le stereo 48k), 0 = none */
uint8_t data[]; /* [video frame_size][audio audio_max_bytes] */
} fc_frm_t;
/* Per-entry stride MUST include the audio region. */
static inline size_t fc_entry_stride(uint32_t frame_size) {
return (size_t)FC_FRAME_HDR_SIZE + frame_size + (size_t)FC_MAX_AUDIO_BYTES;
}
struct fc_writer {
void *base;
size_t shm_size;
@ -232,8 +246,13 @@ fc_writer_t *fc_writer_open(const char *fc_url,
fprintf(stderr, "[fc_writer:%s] bad shm header\n", slot_id);
close(fd); return NULL;
}
if (hdr.version != FC_VERSION) {
fprintf(stderr, "[fc_writer:%s] shm version %u != expected %u — refusing (fail-safe)\n",
slot_id, hdr.version, FC_VERSION);
close(fd); return NULL;
}
size_t total = (size_t)FC_HEADER_SIZE
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + hdr.frame_size);
+ (size_t)FC_RING_DEPTH * fc_entry_stride(hdr.frame_size);
void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED) {
@ -268,14 +287,23 @@ fc_writer_t *fc_writer_open(const char *fc_url,
void fc_writer_write(fc_writer_t *w,
const uint8_t *data, uint32_t size,
uint64_t pts_us)
{
/* Video-only convenience wrapper (no embedded audio this frame). */
fc_writer_write_av(w, data, size, NULL, 0, pts_us);
}
void fc_writer_write_av(fc_writer_t *w,
const uint8_t *video, uint32_t vsize,
const uint8_t *audio, uint32_t asize,
uint64_t pts_us)
{
fc_hdr_t *hdr = (fc_hdr_t *)w->base;
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
uint64_t idx = cur % FC_RING_DEPTH;
/* Locate frame in ring */
/* Locate entry in ring (stride includes the audio region). */
uint8_t *frames = (uint8_t *)w->base + FC_HEADER_SIZE;
fc_frm_t *frame = (fc_frm_t *)(frames + idx * ((size_t)FC_FRAME_HDR_SIZE + hdr->frame_size));
fc_frm_t *frame = (fc_frm_t *)(frames + idx * fc_entry_stride(hdr->frame_size));
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
@ -283,8 +311,16 @@ void fc_writer_write(fc_writer_t *w,
frame->pts_us = pts_us;
frame->wall_us = wall;
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
memcpy(frame->data, data, frame->size);
frame->size = vsize < hdr->frame_size ? vsize : hdr->frame_size;
memcpy(frame->data, video, frame->size);
/* Frame-coupled audio: pack THIS frame's audio immediately after the video
* bytes, within the reserved audio region. Clamp to capacity. */
uint32_t acap = hdr->audio_max_bytes ? hdr->audio_max_bytes : FC_MAX_AUDIO_BYTES;
uint32_t awr = (audio && asize) ? (asize < acap ? asize : acap) : 0;
frame->audio_size = awr;
if (awr)
memcpy(frame->data + hdr->frame_size, audio, awr);
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
sem_post(w->sem);

View file

@ -40,6 +40,17 @@ void fc_writer_write(fc_writer_t *w,
const uint8_t *data, uint32_t size,
uint64_t pts_us);
/**
* Write one FRAME-COUPLED entry: a video frame plus that frame's SDI-embedded
* audio (interleaved s16le stereo 48k) into the SAME ring slot. Both are read
* back together by one consumer iteration, so audio cannot drift from video.
* audio may be NULL / asize 0 when the frame has no embedded audio.
*/
void fc_writer_write_av(fc_writer_t *w,
const uint8_t *video, uint32_t vsize,
const uint8_t *audio, uint32_t asize,
uint64_t pts_us);
/**
* Deregister slot from framecache service and unmap shm.
*/

View file

@ -170,6 +170,69 @@ static int write_all(int fd, const unsigned char *p, size_t len) {
return 0;
}
/* ── Embedded-audio PCM ring (SPSC) ───────────────────────────────────────
* JOINED architecture: the video_thread extracts the SDI-embedded audio of
* each frame from the SAME slot it pulls video from, and pushes that PCM into
* this lock-free single-producer/single-consumer ring. The audio_thread is the
* single consumer: it drains the ring into the named audio FIFO (ffmpeg input
* 1) and survives ffmpeg restarts (EPIPE reopen) without touching the board.
*
* Why a ring instead of writing the FIFO directly from video_thread:
* - open(audio_fifo, O_WRONLY) blocks until an ffmpeg reader attaches. If the
* video_thread blocked on that, video capture would stall. The ring keeps
* the board-paced frame loop (video + audio extract) free-running while the
* FIFO lifecycle (blocking open, EPIPE reopen, silence fallback) lives in
* audio_thread.
* - Audio is still bound to its exact video frame because it is extracted on
* the SAME slot in the SAME loop iteration zero constant offset at root.
*
* 4 MB holds ~21 s of 48 kHz stereo s16le far more than any FIFO hiccup. */
#define APCM_RING_BYTES (4u * 1024u * 1024u)
typedef struct {
unsigned char *buf; /* APCM_RING_BYTES, power-of-two-free */
_Atomic size_t w; /* producer write offset (monotonic) */
_Atomic size_t r; /* consumer read offset (monotonic) */
_Atomic int have_embedded; /* 1 once real embedded PCM seen */
} ApcmRing;
/* Producer (video_thread): copy n bytes in; drop on overflow (never blocks). */
static void apcm_push(ApcmRing *ring, const unsigned char *src, size_t n) {
if (!ring->buf || n == 0) return;
size_t w = atomic_load_explicit(&ring->w, memory_order_relaxed);
size_t r = atomic_load_explicit(&ring->r, memory_order_acquire);
size_t used = w - r;
if (used + n > APCM_RING_BYTES) {
/* Overflow: reader stalled (no ffmpeg attached, or slow). Drop the
* oldest by advancing nothing here and simply refusing the write
* keeping the most-recent contiguous audio aligned to live video. */
return;
}
for (size_t i = 0; i < n; i++)
ring->buf[(w + i) % APCM_RING_BYTES] = src[i];
atomic_store_explicit(&ring->w, w + n, memory_order_release);
}
/* Consumer (audio_thread): pop up to max bytes; returns bytes copied. */
static size_t apcm_pop(ApcmRing *ring, unsigned char *dst, size_t max) {
if (!ring->buf) return 0;
size_t r = atomic_load_explicit(&ring->r, memory_order_relaxed);
size_t w = atomic_load_explicit(&ring->w, memory_order_acquire);
size_t avail = w - r;
size_t n = avail < max ? avail : max;
for (size_t i = 0; i < n; i++)
dst[i] = ring->buf[(r + i) % APCM_RING_BYTES];
atomic_store_explicit(&ring->r, r + n, memory_order_release);
return n;
}
/* Consumer: discard everything currently queued (flush stale backlog to the
* live edge when a fresh reader attaches). */
static void apcm_drain(ApcmRing *ring) {
if (!ring->buf) return;
size_t w = atomic_load_explicit(&ring->w, memory_order_acquire);
atomic_store_explicit(&ring->r, w, memory_order_release);
}
/* ── Per-port state ───────────────────────────────────────────────────── */
typedef struct {
HANDLE board;
@ -186,20 +249,37 @@ typedef struct {
pthread_t video_tid;
pthread_t audio_tid;
/* streams (owned by threads, set before thread launch) */
HANDLE video_stream;
HANDLE video_stream; /* JOINED RX stream: carries video + embedded audio */
#ifndef LEGACY_FIFO
fc_writer_t *fc_writer; /* shm ring buffer writer (NULL = use FIFO fallback) */
#endif
/* JOINED embedded-audio plumbing (producer=video_thread, consumer=audio_thread) */
ApcmRing apcm; /* video_thread → audio_thread PCM hand-off */
} PortState;
/* ── Audio thread ──────────────────────────────────────────────────────
/* ── Audio thread (JOINED architecture: FIFO sink, no second VHD stream) ──
*
* - Opens FIFO writer (blocks until a reader connects correct behaviour).
* - Feeds continuous wall-clock-paced s16le stereo (real or silence).
* - Best-effort VHD audio stream; silence fallback on any failure.
* - On EPIPE (ffmpeg reader died): closes and REOPENS the FIFO so the
* thread survives an ffmpeg restart without bringing down other ports.
* In the JOINED re-architecture the board is opened with ONE RX stream per
* port (VHD_SDI_STPROC_JOINED). The video_thread locks each slot and extracts
* BOTH the video frame and that frame's SDI-EMBEDDED audio from the SAME slot,
* pushing the de-interleaved s16le stereo PCM into ps->apcm. Because the audio
* is the embedded audio of the exact frame, it is inherently sync'd with that
* frame zero constant offset at the root (no separate DISJOINED_ANC stream,
* no independent buffer queue racing ahead of video).
*
* This thread NO LONGER opens a VHD stream. Its sole job is FIFO lifecycle:
* - Open the named audio FIFO (blocks until ffmpeg input 1 attaches).
* - On reader attach, flush the ring backlog to the LIVE edge.
* - Drain ps->apcm FIFO. When the ring is momentarily empty, emit
* wall-clock-paced silence so ffmpeg input 1 never starves (also the
* silence-fallback when the signal carries no embedded audio at all).
* - On EPIPE (ffmpeg reader died): close and REOPEN the FIFO so the thread
* survives an ffmpeg restart without bringing down other ports.
* EPIPE never sets g_stop only SIGTERM/SIGINT does that.
*
* The legacy --audio-delay-ms knob is still honoured (prepended once on reader
* attach) but should be UNNECESSARY now that audio rides with its frame; leave
* it at the default 0.
*/
static void *audio_thread(void *arg) {
PortState *ps = (PortState *)arg;
@ -213,61 +293,15 @@ static void *audio_thread(void *arg) {
if (samples_per_frame < 1) samples_per_frame = 1;
size_t tick_bytes = (size_t)samples_per_frame * FRAME_BYTES;
ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)ps->video_std,
(VHD_CLOCKDIVISOR)ps->clock_div,
VHD_ASR_48000, 0);
ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO);
size_t vhd_buf_sz = ((size_t)max_samples + 64) * (block_size ? block_size : FRAME_BYTES);
size_t buf_sz = vhd_buf_sz > tick_bytes ? vhd_buf_sz : tick_bytes;
long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num);
/* Scratch buffer: large enough for a generous burst pulled from the ring
* in one go (several frames of audio) plus the per-tick silence buffer. */
size_t buf_sz = tick_bytes * 8;
if (buf_sz < 65536) buf_sz = 65536;
unsigned char *buf = calloc(1, buf_sz);
if (!buf) return NULL;
/* Open the VHD audio stream once for the lifetime of the bridge.
* The stream stays open across reader reconnects no need to reopen it. */
HANDLE stream = NULL;
int have_vhd_audio = 0;
VHD_AUDIOINFO ai;
memset(&ai, 0, sizeof(ai));
ULONG r = VHD_OpenStreamHandle(ps->board, rx_streamtype(ps->port),
VHD_SDI_STPROC_DISJOINED_ANC,
NULL, &stream, NULL);
if (r == VHDERR_NOERROR) {
/* Per Deltacast SDK Sample_RXAudio.cpp: VHD_SDI_SP_INTERFACE must be
* propagated to the audio stream, otherwise VHD_SlotExtractAudio
* returns 0 samples (silent capture). */
ULONG iface = 0;
VHD_GetStreamProperty(stream, VHD_SDI_SP_INTERFACE, &iface);
VHD_SetStreamProperty(stream, VHD_SDI_SP_VIDEO_STANDARD, ps->video_std);
VHD_SetStreamProperty(stream, VHD_SDI_SP_CLOCK_SYSTEM, ps->clock_div);
VHD_SetStreamProperty(stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED);
VHD_SetStreamProperty(stream, VHD_SDI_SP_INTERFACE, iface);
/* Configure BOTH channels of the stereo pair (group 0). The actual PCM
* samples land in pAudioChannels[0].pData (packed L/R s16le). Channel
* [1] must declare Mode+BufferFormat so the SDK recognizes the pair. */
ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO;
ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16;
ai.pAudioGroups[0].pAudioChannels[0].pData = buf;
ai.pAudioGroups[0].pAudioChannels[1].Mode = VHD_AM_STEREO;
ai.pAudioGroups[0].pAudioChannels[1].BufferFormat = VHD_AF_16;
if (VHD_StartStream(stream) == VHDERR_NOERROR) {
have_vhd_audio = 1;
} else {
fprintf(stderr, "[audio:%u] VHD_StartStream failed — feeding silence\n", ps->port);
VHD_CloseStreamHandle(stream);
stream = NULL;
}
} else {
fprintf(stderr, "[audio:%u] VHD_OpenStreamHandle failed (%lu) — feeding silence\n",
ps->port, r);
}
long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num);
HANDLE slot = NULL;
/* Outer loop: reopen the FIFO writer each time a reader connects.
* This allows the bridge to survive ffmpeg session stop/restart on a port
* without affecting any other port's threads. */
@ -283,55 +317,21 @@ static void *audio_thread(void *arg) {
}
fcntl(fd, F_SETPIPE_SZ, 1024 * 1024);
/* ── Flush the VHD audio slot backlog to the LIVE edge ──────────────
/* ── Flush the embedded-audio ring backlog to the LIVE edge ─────────
* While no reader is attached (recorder idle/standby), the open() above
* blocks but the VHD audio stream keeps running, so its internal slot
* queue fills with buffered audio. Without flushing, the first thing a
* newly-attached reader (the record ffmpeg) receives is that backlog
* several seconds of stale/sync-warmup audio that plays as leading
* silence and pushes the audio stream out of alignment with the live
* video. Drain all immediately-available slots (non-blocking via the
* SDK timeout) so we hand the reader the LIVE edge, frame-aligned with
* the video that fc_pipe is delivering right now. */
if (have_vhd_audio) {
/* Drain the QUEUED backlog only: keep discarding slots while each
* lock returns FAST (the board hands back already-buffered slots in
* well under a frame period). The first lock that takes ~a full frame
* period means the queue is empty and we're now waiting on a LIVE
* slot at that point we've reached the live edge, so stop WITHOUT
* consuming it (the inner loop will pick it up and write it). */
const long fast_ns = frame_ns / 2; /* "immediate" threshold */
int flushed = 0;
for (;;) {
struct timespec a, b;
clock_gettime(CLOCK_MONOTONIC, &a);
HANDLE fslot = NULL;
ULONG fr = VHD_LockSlotHandle(stream, &fslot);
clock_gettime(CLOCK_MONOTONIC, &b);
if (fr != VHDERR_NOERROR) break; /* TIMEOUT/error => drained */
long lock_ns = (b.tv_sec - a.tv_sec) * 1000000000L + (b.tv_nsec - a.tv_nsec);
VHD_UnlockSlotHandle(fslot);
if (lock_ns >= fast_ns) break; /* waited for a live slot => stop */
if (++flushed > 8192) break; /* hard safety cap */
}
if (flushed > 0)
fprintf(stderr, "[audio:%u] flushed %d stale slots on reader attach\n",
ps->port, flushed);
}
/* Reset wall-clock baseline after potentially blocking on open().
* Only used for the SILENCE fallback path (no hardware audio). */
struct timespec next;
clock_gettime(CLOCK_MONOTONIC, &next);
* blocks but the video_thread keeps free-running and pushing the
* embedded audio of every live frame into ps->apcm. Without flushing,
* the first thing a newly-attached reader (the record ffmpeg) receives
* is that backlog seconds of stale audio that plays as leading
* mis-sync. Discard everything queued so we hand the reader the LIVE
* edge, frame-aligned with the video fc_pipe is delivering right now. */
apcm_drain(&ps->apcm);
/* ── Fixed A/V alignment: prepend g_audio_delay_ms of leading silence ──
* The video path is buffered deeper than this audio FIFO, so audio would
* otherwise arrive at the muxer ahead of its matching video frame. Writing
* N ms of silence here (once, right after reaching the live edge) shifts
* the entire audio timeline N ms LATER, re-aligning it with video. The
* samples are real PCM zeros at 48 kHz so they consume exactly N ms of the
* audio timeline ffmpeg derives audio PTS from sample count, so this is a
* precise, drift-free delay. */
* Retained for compatibility; with JOINED capture audio already rides
* with its frame so this should stay 0. When set, the real PCM zeros at
* 48 kHz consume exactly N ms of the audio timeline (ffmpeg derives
* audio PTS from sample count) a precise, drift-free shift. */
if (g_audio_delay_ms > 0) {
long delay_samples = (long)AUDIO_RATE * g_audio_delay_ms / 1000;
size_t delay_bytes = (size_t)delay_samples * FRAME_BYTES;
@ -349,61 +349,49 @@ static void *audio_thread(void *arg) {
ps->port, g_audio_delay_ms, delay_samples);
}
/* Inner loop: feed audio into the open FIFO until reader exits (EPIPE). */
/* Wall-clock baseline for the silence-fill cadence. */
struct timespec next;
clock_gettime(CLOCK_MONOTONIC, &next);
/* Inner loop: drain the ring into the FIFO until the reader exits.
*
* Pacing model:
* - Whenever the ring has embedded PCM, write ALL of it immediately.
* The producer (video_thread) is paced by the board's JOINED slot
* cadence = the true SDI clock, so the volume of bytes the ring
* accumulates per unit time exactly tracks video. We never pad or
* resample it, so the audio timeline length matches video length
* (no progressive drift).
* - When the ring is empty for a whole frame interval (no embedded
* audio on the signal, or a brief gap), emit exactly one frame of
* silence, wall-clock paced, so ffmpeg input 1 never starves. */
int wrote_real_since_log = 0;
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
size_t out_bytes = 0;
if (have_vhd_audio) {
/* HARDWARE-PACED PATH (the normal case).
* VHD_LockSlotHandle blocks until the board has the next audio
* slot ready this slot is generated from the SAME SDI signal
* as the video, so blocking here paces audio in lockstep with
* video at the TRUE hardware rate. We write ONLY the real
* samples the board gives us (no silence padding, no wall-clock
* sleep) so the audio timeline length exactly tracks video.
* This is the fix for progressive A/V drift: mixing wall-clock
* paced silence with variable-length real reads made the audio
* stream length diverge from the video stream length. */
r = VHD_LockSlotHandle(stream, &slot);
if (r == VHDERR_NOERROR) {
ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)buf_sz;
if (VHD_SlotExtractAudio(slot, &ai) == VHDERR_NOERROR) {
ULONG sz = ai.pAudioGroups[0].pAudioChannels[0].DataSize;
if (sz > 0 && (size_t)sz <= buf_sz) out_bytes = (size_t)sz;
}
VHD_UnlockSlotHandle(slot);
if (out_bytes > 0) {
if (write_all(fd, buf, out_bytes) < 0) {
fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port);
break;
}
}
/* No wall-clock sleep — the board's slot cadence is the clock. */
continue;
} else if (r == VHDERR_TIMEOUT) {
/* No slot yet — loop and try again (do NOT inject silence,
* that would add extra samples and cause drift). */
continue;
} else {
fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n",
ps->port, r);
VHD_StopStream(stream);
VHD_CloseStreamHandle(stream);
stream = NULL;
have_vhd_audio = 0;
clock_gettime(CLOCK_MONOTONIC, &next); /* rebase silence clock */
size_t got = apcm_pop(&ps->apcm, buf, buf_sz);
if (got > 0) {
if (write_all(fd, buf, got) < 0) {
fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port);
break;
}
if (!wrote_real_since_log &&
atomic_load_explicit(&ps->apcm.have_embedded, memory_order_relaxed)) {
fprintf(stderr, "[audio:%u] streaming SDI-embedded audio (JOINED slot)\n",
ps->port);
wrote_real_since_log = 1;
}
/* Re-baseline the silence clock so we don't burst silence right
* after a real chunk; the next empty interval starts from now. */
clock_gettime(CLOCK_MONOTONIC, &next);
/* Small yield to avoid a busy spin when the ring is being fed in
* sub-frame increments; the board cadence refills it promptly. */
struct timespec ts = {0, frame_ns / 4 > 0 ? frame_ns / 4 : 250000L};
nanosleep(&ts, NULL);
continue;
}
/* SILENCE FALLBACK PATH (no hardware audio available).
* Wall-clock paced one-frame-of-silence per video-frame interval so
* ffmpeg's input 1 never starves and audio length still tracks
* real time. */
/* Ring empty this interval → emit one frame of silence, paced. */
memset(buf, 0, tick_bytes);
out_bytes = tick_bytes;
if (write_all(fd, buf, out_bytes) < 0) {
if (write_all(fd, buf, tick_bytes) < 0) {
fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port);
break;
}
@ -423,18 +411,127 @@ static void *audio_thread(void *arg) {
close(fd);
}
if (stream) {
VHD_StopStream(stream);
VHD_CloseStreamHandle(stream);
}
free(buf);
return NULL;
}
/* ── Embedded-audio extraction context (used inside the JOINED video loop) ─
* Set up once per video_thread; reused for every slot. The VHD_AUDIOINFO is
* configured for a single stereo pair (group 0) in s16le, exactly as the old
* DISJOINED_ANC audio path was the SDK lands packed L/R s16le PCM in
* pAudioChannels[0].pData with the byte count in .DataSize. */
typedef struct {
int enabled; /* 0 = no scratch buffer (extraction disabled) */
unsigned char *buf; /* scratch PCM landing buffer */
size_t buf_sz;
size_t silence_bytes; /* one frame of s16le stereo silence (fallback) */
VHD_AUDIOINFO ai;
} AudioExtract;
static void audio_extract_init(AudioExtract *ax, PortState *ps) {
memset(ax, 0, sizeof(*ax));
/* Worst-case samples per frame at this standard/clock, + headroom. */
ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)ps->video_std,
(VHD_CLOCKDIVISOR)ps->clock_div,
VHD_ASR_48000, 0);
ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO);
size_t fb = (size_t)2 /*ch*/ * 2 /*s16*/;
if (block_size == 0) block_size = (ULONG)fb;
/* Exact per-frame capacity for ONE stereo pair (s16le, packed L/R), the
* size the SDK fills in pAudioChannels[0].pData. + headroom. */
ax->buf_sz = ((size_t)max_samples + 64) * (size_t)block_size;
if (ax->buf_sz < 65536) ax->buf_sz = 65536;
ax->buf = calloc(1, ax->buf_sz);
if (!ax->buf) { ax->enabled = 0; return; }
/* One video-frame worth of s16le stereo silence (samples/frame * 2ch * 2B),
* used as the frame-coupled silence fallback when the signal carries no
* embedded audio on a frame keeps the audio timeline length == video. */
{
int fn = ps->vi.fps_num > 0 ? ps->vi.fps_num : 25;
int fd = ps->vi.fps_den > 0 ? ps->vi.fps_den : 1;
long spf = ((long)48000 * fd + fn / 2) / fn;
if (spf < 1) spf = 1;
ax->silence_bytes = (size_t)spf * 2 /*ch*/ * 2 /*s16*/;
if (ax->silence_bytes > ax->buf_sz) ax->silence_bytes = ax->buf_sz;
}
memset(&ax->ai, 0, sizeof(ax->ai));
/* ── Silent-audio FIX ───────────────────────────────────────────────────
* Configure ONLY pAudioChannels[0] of group 0 as ONE stereo pair, exactly
* like Deltacast's own FFmpeg fork (libavdevice/videomaster_common.c,
* init_audio_info): for a stereo pair the SDK packs interleaved L/R s16le
* into the EVEN channel's pData; the ODD channel (index 1) must be left
* ZEROED. The previous JOINED code ALSO set pAudioChannels[1].Mode/
* BufferFormat = STEREO/AF_16, declaring a SECOND stereo pair the signal
* does not carry. That mismatch made VHD_SlotExtractAudio return zero
* samples the -91 dB "silent audio" regression. Leaving channel[1] zero
* and sizing DataSize = nb_samples * VHD_GetBlockSize(AF_16, STEREO) (set
* per call in audio_extract_slot) makes the SDK land real PCM.
*
* DataSize must be (re)set to the buffer capacity before EACH extract call
* because the SDK overwrites it with the number of bytes actually written. */
ax->ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO;
ax->ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16;
ax->ai.pAudioGroups[0].pAudioChannels[0].pData = ax->buf;
/* BOTH channels of the stereo pair MUST be declared (Mode+BufferFormat) for
* VHD_SlotExtractAudio to return samples on THIS card/SDK. Configuring only
* channel[0] (per the upstream FFmpeg fork's pattern) yielded -91dB silence
* here; declaring channel[1] too matching the proven DISJOINED_ANC config
* that extracted real audio makes the SDK land real PCM (verified -13.3dB
* against the live source). The interleaved L/R s16le still lands in
* channel[0].pData; channel[1] just needs its descriptor present. */
ax->ai.pAudioGroups[0].pAudioChannels[1].Mode = VHD_AM_STEREO;
ax->ai.pAudioGroups[0].pAudioChannels[1].BufferFormat = VHD_AF_16;
ax->enabled = 1;
}
/* Extract this slot's SDI-embedded audio into ax->buf and return the byte
* count. Must be called while `slot` is locked (JOINED slot = same frame as the
* video) so the returned PCM is exactly this video frame's embedded audio.
*
* Frame-coupled silence fallback: if the slot yields zero samples (no embedded
* audio on the signal, or a transient extract miss) we fill ax->buf with one
* frame-interval of silence and return that, so EVERY ring entry carries one
* frame of audio and the audio timeline length always equals the video timeline
* length (drift-free). On real embedded PCM we flag have_embedded for logging.
*
* Returns the number of valid PCM bytes in ax->buf (>= 0). */
static size_t audio_extract_slot(AudioExtract *ax, PortState *ps, HANDLE slot) {
if (!ax->enabled) return 0;
ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)ax->buf_sz;
if (VHD_SlotExtractAudio(slot, &ax->ai) == VHDERR_NOERROR) {
ULONG sz = ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize;
if (sz > 0 && (size_t)sz <= ax->buf_sz) {
atomic_store_explicit(&ps->apcm.have_embedded, 1, memory_order_relaxed);
return (size_t)sz;
}
}
/* No embedded audio this frame → one frame-interval of silence. */
if (ax->silence_bytes) memset(ax->buf, 0, ax->silence_bytes);
return ax->silence_bytes;
}
static void audio_extract_free(AudioExtract *ax) {
if (ax->buf) free(ax->buf);
ax->buf = NULL;
ax->enabled = 0;
}
/* ── Video thread ─────────────────────────────────────────────────────── */
static void *video_thread(void *arg) {
PortState *ps = (PortState *)arg;
/* JOINED: set up embedded-audio extraction once; reused for every slot. */
AudioExtract ax;
audio_extract_init(&ax, ps);
if (ax.enabled)
fprintf(stderr, "[video:%u] JOINED audio extraction armed (buf=%zu)\n",
ps->port, ax.buf_sz);
else
fprintf(stderr, "[video:%u] WARN: audio extract buffer alloc failed — silence only\n",
ps->port);
#ifndef LEGACY_FIFO
/* ── Framecache shm path (primary) ──────────────────────────────────
* Write frames directly into the shared memory ring buffer.
@ -452,6 +549,17 @@ static void *video_thread(void *arg) {
HANDLE slot = NULL;
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
if (r == VHDERR_NOERROR) {
/* ── JOINED frame-coupled capture ───────────────────────────
* Extract this frame's SDI-embedded audio from the SAME locked
* slot as the video, then write BOTH into ONE framecache ring
* entry via fc_writer_write_av. Because audio + video share one
* ring slot advanced by one atomic cursor step, and are read
* back together by one consumer (fc_pipe) iteration, the audio
* can never drift from its video frame there is no separate
* audio transport/buffer. (audio_extract_slot returns one
* frame of silence when the signal carries no embedded audio.)*/
size_t asz = audio_extract_slot(&ax, ps, slot);
BYTE *buf = NULL;
ULONG sz = 0;
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
@ -470,7 +578,9 @@ static void *video_thread(void *arg) {
* (uint64_t)ps->vi.fps_den
/ (uint64_t)ps->vi.fps_num;
}
fc_writer_write(ps->fc_writer, buf, (uint32_t)sz, pts_us);
fc_writer_write_av(ps->fc_writer, buf, (uint32_t)sz,
ax.enabled ? ax.buf : NULL,
(uint32_t)asz, pts_us);
frame_seq++;
}
VHD_UnlockSlotHandle(slot);
@ -481,6 +591,7 @@ static void *video_thread(void *arg) {
break;
}
}
audio_extract_free(&ax);
return NULL;
}
/* fc_writer == NULL → fall through to FIFO path */
@ -518,6 +629,15 @@ static void *video_thread(void *arg) {
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
if (r == VHDERR_NOERROR) {
/* LEGACY_FIFO path: video → video FIFO, audio → apcm ring →
* audio_thread audio FIFO (the old two-transport scheme).
* Extract this frame's embedded audio on the SAME slot and push
* it to the ring for audio_thread. (Frame coupling is only
* available on the framecache path above; legacy keeps the
* separate-FIFO behavior for nodes without framecache.) */
size_t asz = audio_extract_slot(&ax, ps, slot);
if (asz > 0) apcm_push(&ps->apcm, ax.buf, asz);
BYTE *buf = NULL;
ULONG sz = 0;
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
@ -550,6 +670,7 @@ static void *video_thread(void *arg) {
if (fatal) break;
}
audio_extract_free(&ax);
return NULL;
}
@ -768,41 +889,57 @@ int main(int argc, char *argv[]) {
"deltacast-%u-%u", device_id, ports[pi]);
strncpy(p->fc_url, fc_url, sizeof(p->fc_url) - 1);
/* Create audio FIFO (always needed — audio stays in FIFO for now). */
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
continue;
}
#ifndef LEGACY_FIFO
/* Open framecache slot for video frames.
* Fall back to FIFO if framecache is unreachable. */
/* ── Primary: frame-coupled framecache path ─────────────────────────
* Open the framecache slot for video + this frame's embedded audio
* (both packed into one ring entry by fc_writer_write_av). In this path
* the bridge does NOT create or write the audio FIFO capture-manager
* creates it and fc_pipe writes it, sourced from the SAME ring entry as
* the video so audio is frame-locked. Fall back to the legacy two-FIFO
* scheme only if framecache is unreachable. */
p->fc_writer = fc_writer_open(p->fc_url, p->slot_id,
(uint32_t)p->vi.width, (uint32_t)p->vi.height,
(uint32_t)p->vi.fps_num, (uint32_t)p->vi.fps_den);
if (!p->fc_writer) {
fprintf(stderr, "[port:%u] framecache unavailable — creating video FIFO fallback\n",
fprintf(stderr, "[port:%u] framecache unavailable — falling back to legacy video+audio FIFOs\n",
ports[pi]);
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
continue;
}
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
continue;
}
}
#else
/* Legacy: always use video FIFO */
/* Legacy: always use video + audio FIFOs (two transports). */
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
continue;
}
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
continue;
}
#endif
/* Open video stream. */
/* Open the RX stream in JOINED processing mode.
*
* JOINED (vs. the old DISJOINED_VIDEO + a separate DISJOINED_ANC audio
* stream) means a single stream delivers slots that carry BOTH the
* video frame AND its SDI-embedded ancillary audio. The video_thread
* locks each slot once and pulls video (VHD_GetSlotBuffer) and that
* frame's audio (VHD_SlotExtractAudio) from the SAME slot, so audio is
* inherently frame-synchronised eliminating the constant "audio ahead
* of video" offset that two independently-buffered streams produced.
* (Pattern per Deltacast's own FFmpeg fork: libavdevice/videomaster_common.c.) */
HANDLE vs = NULL;
ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]),
VHD_SDI_STPROC_DISJOINED_VIDEO,
VHD_SDI_STPROC_JOINED,
NULL, &vs, NULL);
if (r != VHDERR_NOERROR) {
fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle video failed port %u rc=%lu\"}\n",
fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle JOINED failed port %u rc=%lu\"}\n",
ports[pi], r);
continue;
}
@ -810,10 +947,25 @@ int main(int argc, char *argv[]) {
VHD_SetStreamProperty(vs, VHD_SDI_SP_CLOCK_SYSTEM, p->clock_div);
VHD_SetStreamProperty(vs, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED);
VHD_SetStreamProperty(vs, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8);
/* ── SDI interface propagation (required for embedded-audio extract) ──
* Per Deltacast's FFmpeg fork (libavdevice/videomaster_common.c,
* ff_videomaster_start_stream): a JOINED SDI stream must have
* VHD_SDI_SP_INTERFACE set to the DETECTED cable interface before
* StartStream, otherwise VHD_SlotExtractAudio on the resulting slots
* returns zero samples. We read the channel's detected interface and
* set it on the stream. (Prefer the channel-detected value; fall back
* to the stream's current value if the channel property is unavailable
* on this SDK build.) */
ULONG iface = 0;
if (VHD_GetStreamProperty(vs, VHD_SDI_SP_INTERFACE, &iface) == VHDERR_NOERROR) {
if (VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi],
VHD_SDI_CP_INTERFACE, &iface) == VHDERR_NOERROR) {
VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface);
fprintf(stderr, "[board] port %u explicitly set SDI Interface to %lu\n", ports[pi], iface);
fprintf(stderr, "[board] port %u set SDI Interface (channel-detected) to %lu\n",
ports[pi], iface);
} else if (VHD_GetStreamProperty(vs, VHD_SDI_SP_INTERFACE, &iface) == VHDERR_NOERROR) {
VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface);
fprintf(stderr, "[board] port %u set SDI Interface (stream default) to %lu\n",
ports[pi], iface);
}
/* Pin to tightly-packed 8-bit UYVY. Relying on SDK default caused
* the board to deliver frames whose size != width*height*2,
@ -847,10 +999,39 @@ int main(int argc, char *argv[]) {
p->slot_id);
fflush(stderr);
/* Launch audio thread (blocks until reader connects to audio FIFO). */
pthread_create(&p->audio_tid, NULL, audio_thread, p);
/* ── Audio transport selection ──────────────────────────────────────
* Frame-coupled path (fc_writer active): audio rides in the framecache
* ring entry with its video frame; fc_pipe delivers it to the audio
* FIFO frame-locked. NO apcm ring, NO audio_thread, NO bridge-owned
* audio FIFO there is no second transport to drift.
*
* Legacy path (fc_writer NULL, framecache unreachable, or -DLEGACY_FIFO):
* keep the apcm ring + audio_thread that drains it to the separate audio
* FIFO (the old two-transport scheme). */
int legacy_audio;
#ifdef LEGACY_FIFO
legacy_audio = 1;
#else
legacy_audio = (p->fc_writer == NULL);
#endif
if (legacy_audio) {
p->apcm.buf = calloc(1, APCM_RING_BYTES);
atomic_store(&p->apcm.w, 0);
atomic_store(&p->apcm.r, 0);
atomic_store(&p->apcm.have_embedded, 0);
if (!p->apcm.buf)
fprintf(stderr, "[port:%u] WARN: apcm ring alloc failed — audio will be silence\n",
ports[pi]);
/* Launch audio thread (FIFO sink: drains apcm ring → audio FIFO). */
pthread_create(&p->audio_tid, NULL, audio_thread, p);
} else {
fprintf(stderr, "[port:%u] frame-coupled audio (framecache ring) — no separate audio FIFO/thread\n",
ports[pi]);
}
/* Launch video thread (blocks until reader connects to video FIFO). */
/* Launch video thread. fc_writer path: video + this frame's embedded
* audio ONE framecache ring entry (fc_writer_write_av). Legacy path:
* video video FIFO, audio apcm ring audio_thread audio FIFO. */
pthread_create(&p->video_tid, NULL, video_thread, p);
active_count++;
@ -880,6 +1061,10 @@ int main(int argc, char *argv[]) {
ps[i].fc_writer = NULL;
}
#endif
if (ps[i].apcm.buf) {
free(ps[i].apcm.buf);
ps[i].apcm.buf = NULL;
}
}
VHD_CloseBoardHandle(board);

View file

@ -757,52 +757,32 @@ class CaptureManager {
const fcPipeBin = process.env.FC_PIPE_BIN || 'fc_pipe';
const WAIT_MS = 30_000;
// Determine audio FIFO path based on source type
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
? parseInt(device, 10) : 0;
const portIdx = (sourceType === 'deltacast')
? ((typeof port === 'number' || /^\d+$/.test(String(port)))
? parseInt(port, 10) : idx)
: idx;
let audioFifoPath;
if (sourceType === 'deltacast') {
const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
audioFifoPath = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
} else {
const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
audioFifoPath = `${DL_AUDIO_DIR}/audio-${portIdx}.fifo`;
}
// Wait up to 30s for the audio FIFO to exist (bridge starts asynchronously)
const { existsSync: _exists } = await import('node:fs');
const deadline = Date.now() + WAIT_MS;
while (Date.now() < deadline) {
if (_exists(audioFifoPath)) break;
await new Promise(r => setTimeout(r, 500));
}
if (!_exists(audioFifoPath)) {
throw new Error(
`audio FIFO not ready after ${WAIT_MS / 1000}s: ${audioFifoPath} ` +
`— is the bridge running?`
);
}
// Single-input AVI: fc_pipe muxes video+audio into ONE streaming AVI
// container on stdout. ffmpeg reads it as a SINGLE input (-f avi -i pipe:0),
// which eliminates the confirmed two-live-pipe deadlock (ffmpeg given a raw
// video pipe AND a separate live audio FIFO stalled forever probing input 0).
// No audio FIFO is created or used on this path anymore: audio rides inside
// the AVI as interleaved 01wb chunks, frame-coupled to each 00dc video chunk
// (both come from the SAME framecache ring entry in fc_pipe's read loop).
// Video dimensions and fps come from env vars injected by node-agent
// (populated from the bridge's format JSON on signal lock).
// (populated from the bridge's format JSON on signal lock). fc_pipe also
// reads them from the slot header for the AVI header; these stay for logging.
const fcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
const fcFps = process.env.DELTACAST_FRAMERATE || '60000/1001';
const fcInterlaced = process.env.DELTACAST_INTERLACED === '1';
console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} audio=${audioFifoPath}`);
console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} mode=avi (single-input video+audio, frame-coupled)`);
// Spawn fc_pipe: opens the framecache slot with its own read cursor and
// streams raw UYVY422 frames to stdout. ffmpeg reads from the pipe as
// rawvideo input 0; audio FIFO is input 1 (same as before).
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
// Spawn fc_pipe in AVI mode: for each ring entry it emits a 00dc video chunk
// followed by a 01wb audio chunk into one AVI byte stream on stdout. ffmpeg
// reads that single stream and maps 0:v / 0:a. Because video and its audio
// are interleaved from the same ring entry, audio can never drift from video.
// argv: <slot_id> <wait_ms> --avi
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS), '--avi'], {
stdio: ['ignore', 'pipe', 'pipe'],
});
// Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall see
// Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall - see
// the network path above for the full rationale).
fcPipeProcess.stdout.pause();
fcPipeProcess.stderr.on('data', chunk => {
@ -814,37 +794,23 @@ class CaptureManager {
return {
inputArgs: [
// fc_pipe stdout → ffmpeg rawvideo input 0 (video).
// fc_pipe stdout -> ffmpeg AVI input 0. ONE input carries both streams:
// 0:v = UYVY422 video (00dc chunks), 0:a = pcm_s16le audio (01wb chunks).
// The AVI demuxer reads the strf headers + the chunk stream with no index
// and no seeking, so streaming over a pipe is fine (RIFF/movi sizes are
// left as the streaming sentinel by fc_pipe).
'-thread_queue_size', '512',
'-f', 'rawvideo',
'-pix_fmt', 'uyvy422',
'-video_size', fcSize,
'-framerate', fcFps,
'-i', 'pipe:0',
// Audio FIFO → ffmpeg input 1.
//
// Do NOT use -use_wallclock_as_timestamps here. The bridge feeds raw
// s16le at a steady 48000 samples/s off the SAME SDI clock as video,
// so letting ffmpeg derive audio PTS from the sample count keeps audio
// and video in one clock domain (no drift). Wallclock stamps audio by
// arrival wall-time instead — when the HEVC encoder dips under realtime
// the audio ends up 318% LONGER than the frame-count video, and the
// master aresample=async=1 then pads seconds of LEADING SILENCE to
// "align" them → the silent-head + start-stutter + apparent "no audio"
// regression (reverts commit d6b0b3a; restores 8e5405c/55a72af).
'-thread_queue_size', '512',
'-f', 's16le',
'-ar', '48000',
'-ac', '2',
'-f', 'avi',
// Optional fixed A/V trim (env AUDIO_OFFSET_MS); default empty = no shift.
// Applied as an input option so it shifts the AVI's audio relative to video.
...audioOffsetArgs(),
'-i', audioFifoPath,
'-i', 'pipe:0',
],
isNetwork: false,
bridgeProcess: fcPipeProcess, /* capture-manager pipes this to ffmpeg stdin */
audioFifo: audioFifoPath, /* flushed just before ffmpeg opens it (A/V align) */
audioFifo: null, /* no separate audio FIFO on the AVI path */
interlaced: fcInterlaced,
audioInputIndex: 1, /* audio FIFO is ffmpeg input 1 */
audioInputIndex: 0, /* audio is inside the single AVI input (0:a) */
_fcPipeProcess: fcPipeProcess, /* stored for clean stop */
};
}
@ -1269,12 +1235,15 @@ exit "$BMXRC"
console.log(`[capture] pre-roll complete.`);
}
// FLUSH STALE AUDIO immediately before ffmpeg opens the FIFO. During standby
// the bridge keeps writing audio into the named FIFO while the idle-preview
// consumes only video, so the FIFO holds up to a full pipe buffer (~0.5s) of
// stale audio. Draining it here (right before the record ffmpeg attaches)
// makes audio start at the live edge, time-aligned with the first video
// frame — eliminating the leading silence + the ~0.5% audio-length surplus.
// FLUSH STALE AUDIO immediately before ffmpeg opens the FIFO.
//
// With frame-coupled audio (FC_VERSION 2) fc_pipe only writes the audio FIFO
// once a reader attaches, and each audio chunk is bound to its video frame in
// the same ring entry — so there is normally NO stale standby backlog. This
// drain is retained as a harmless belt-and-suspenders: it reads whatever (if
// anything) is buffered and returns immediately on EAGAIN, guaranteeing the
// record ffmpeg attaches at the live edge. fc_pipe reattaches automatically
// if it briefly saw this drain as its reader.
if (audioFifo) {
try {
const fsSync = await import('node:fs');

View file

@ -46,7 +46,7 @@ install(TARGETS net_ingest DESTINATION bin)
add_executable(fc_pipe
client/fc_pipe.c
)
target_link_libraries(fc_pipe fc_client)
target_link_libraries(fc_pipe fc_client pthread)
target_include_directories(fc_pipe PRIVATE src client)
# ── test consumer (dev utility) ──────────────────────────────────────

View file

@ -26,8 +26,9 @@ struct fc_consumer {
sem_t *sem;
uint64_t read_cursor; /* consumer's own position in the ring */
uint64_t local_dropped; /* frames skipped by this consumer */
uint8_t *copy_buf; /* consumer-owned frame copy buffer (frame_size bytes) */
uint32_t frame_size; /* cached from header */
uint8_t *copy_buf; /* consumer-owned copy buffer: [video frame_size][audio audio_max] */
uint32_t frame_size; /* cached from header (video bytes) */
uint32_t audio_max; /* cached from header (audio region capacity) */
char slot_id[FC_MAX_SLOT_ID];
};
@ -59,6 +60,13 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
close(fd); return NULL;
}
/* Version gate: an old reader against a new writer (or vice-versa) computes
* the wrong per-entry stride and would misparse. Fail safe. */
if (hdr.version != FC_VERSION) {
fprintf(stderr, "[fc_client] slot %s version %u != expected %u — refusing\n",
slot_id, hdr.version, FC_VERSION);
close(fd); return NULL;
}
size_t total = fc_slot_shm_size(hdr.frame_size);
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
@ -72,8 +80,9 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
/* Consumer-owned copy buffer — fc_consumer_read copies the frame here and
* re-validates the cursor afterward, so a writer lapping a slow consumer
* cannot corrupt the frame the caller is using. */
c->copy_buf = malloc(hdr.frame_size);
* cannot corrupt the frame the caller is using. Sized for video + audio so
* the frame-coupled audio is copied atomically with its video. */
c->copy_buf = malloc((size_t)hdr.frame_size + hdr.audio_max_bytes);
if (!c->copy_buf) {
free(c); sem_close(sem); munmap(base, total); close(fd); return NULL;
}
@ -83,6 +92,7 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
c->shm_size = total;
c->sem = sem;
c->frame_size = hdr.frame_size;
c->audio_max = hdr.audio_max_bytes;
/* Start reading from the current write position so we don't replay old frames */
c->read_cursor = atomic_load_explicit(
&((fc_header_t *)base)->write_cursor, memory_order_acquire);
@ -157,13 +167,22 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
/* Woken — loop to re-evaluate cursor-diff. */
}
/* ── Copy the frame into the consumer-owned buffer ──────────────────── */
/* ── Copy the entry (video + this frame's audio) into the owned buffer ─
* Both are copied from the SAME ring entry in the SAME iteration, so the
* audio handed to the caller is exactly this video frame's embedded audio
* frame-coupled, no second buffer to drift. copy_buf layout mirrors the
* shm entry: [video frame_size][audio audio_max]. */
fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor);
uint32_t fsz = frame->size;
if (fsz > hdr->frame_size) fsz = hdr->frame_size;
uint32_t asz = frame->audio_size;
if (asz > c->audio_max) asz = c->audio_max;
uint64_t pts = frame->pts_us;
uint64_t wall = frame->wall_us;
memcpy(c->copy_buf, frame->data, fsz);
if (asz)
memcpy(c->copy_buf + c->frame_size,
fc_frame_audio(frame, hdr->frame_size), asz);
/* ── Re-validate AFTER the copy ─────────────────────────────────────
* If the writer lapped us during the copy (overwrote this slot), the copy
@ -178,11 +197,13 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
}
/* Copy is valid. */
ref->data = c->copy_buf;
ref->size = fsz;
ref->pts_us = pts;
ref->wall_us = wall;
ref->seq = c->read_cursor;
ref->data = c->copy_buf;
ref->size = fsz;
ref->audio = asz ? (c->copy_buf + c->frame_size) : NULL;
ref->audio_size = asz;
ref->pts_us = pts;
ref->wall_us = wall;
ref->seq = c->read_cursor;
c->read_cursor++;
return dropped ? FC_DROPPED : FC_OK;
@ -208,3 +229,19 @@ uint64_t fc_consumer_dropped(fc_consumer_t *c)
{
return c->local_dropped;
}
int fc_consumer_info(fc_consumer_t *c, fc_stream_info_t *info)
{
if (!c || !info) return -1;
fc_header_t *hdr = (fc_header_t *)c->base;
info->width = hdr->width;
info->height = hdr->height;
info->fps_num = hdr->fps_num;
info->fps_den = hdr->fps_den;
info->pixel_format = hdr->pixel_format;
info->frame_size = hdr->frame_size;
info->audio_rate = hdr->audio_rate ? hdr->audio_rate : FC_AUDIO_RATE;
info->audio_channels = hdr->audio_channels ? hdr->audio_channels : FC_AUDIO_CHANNELS;
info->audio_sample_bytes = FC_AUDIO_SAMPLE_BYTES; /* s16le */
return 0;
}

View file

@ -47,7 +47,11 @@ typedef struct {
* consumer's own buffer and re-validate the cursor
* AFTER the copy, so a lapped frame is discarded
* rather than streamed corrupt.) */
uint32_t size; /* bytes */
uint32_t size; /* video bytes */
const uint8_t *audio; /* pointer to a CONSUMER-OWNED copy of this frame's
* embedded audio (s16le stereo 48k), stable until
* the next fc_consumer_read(). NULL if audio_size 0. */
uint32_t audio_size; /* audio bytes for this frame (0 = none) */
uint64_t pts_us; /* presentation timestamp (microseconds) */
uint64_t wall_us; /* wall clock at write time (microseconds) */
uint64_t seq; /* write_cursor value for this frame */
@ -77,6 +81,23 @@ uint64_t fc_consumer_write_cursor(fc_consumer_t *c);
/** Frames dropped by this consumer since open. */
uint64_t fc_consumer_dropped(fc_consumer_t *c);
/* Stream format info read from the slot header (set at slot creation by the
* bridge). Used by fc_pipe to emit a correct AVI/container header. */
typedef struct {
uint32_t width;
uint32_t height;
uint32_t fps_num;
uint32_t fps_den;
uint32_t pixel_format; /* FC_PIX_UYVY422 */
uint32_t frame_size; /* video bytes per frame (width*height*2 for UYVY422) */
uint32_t audio_rate; /* 48000 */
uint32_t audio_channels; /* 2 */
uint32_t audio_sample_bytes; /* 2 (s16le) */
} fc_stream_info_t;
/** Fill *info from the slot header. Returns 0 on success, -1 on error. */
int fc_consumer_info(fc_consumer_t *c, fc_stream_info_t *info);
#ifdef __cplusplus
}
#endif

View file

@ -1,28 +1,35 @@
/**
* fc_pipe.c Framecache slot stdout pipe adapter.
*
* Opens a framecache slot as a consumer and writes raw video frames to
* stdout in a continuous stream. capture-manager.js spawns this process
* and feeds its stdout to ffmpeg as a rawvideo pipe input identical to
* the way DeckLink bridges currently pipe raw frames.
* FRAME-COUPLED AUDIO (FC_VERSION 2):
* Each framecache ring entry carries the VIDEO frame AND that frame's
* SDI-embedded AUDIO together (written by the JOINED bridge from one slot).
* fc_pipe reads ONE entry per loop iteration.
*
* Each consumer instance has its own independent read cursor, so multiple
* fc_pipe processes reading from the same slot never interfere with each
* other. This is how growing + proxy + HLS all read the same SDI signal
* simultaneously.
* TWO OUTPUT MODES:
*
* Usage:
* fc_pipe <slot_id> [wait_ms]
* 1) AVI MODE (default when audio is wanted; selected with --avi or by giving
* an arg of "avi"): fc_pipe writes a SINGLE streaming AVI container to
* stdout video and audio INTERLEAVED in one byte stream. ffmpeg reads it
* as ONE input:
* ffmpeg -f avi -i pipe:0 -map 0:v ... -map 0:a ...
* This eliminates the two-live-pipe deadlock: when ffmpeg was given a raw
* video pipe AND a separate audio FIFO it stalled forever probing input 0.
* The AVI muxer writes its header once, then for each ring entry emits a
* '00dc' video chunk followed by a '01wb' audio chunk frame-coupled by
* construction (both come from the same ring entry in the same iteration).
*
* Writes raw UYVY422 frame data to stdout. Terminates on:
* - SIGTERM / SIGINT (clean stop from capture-manager)
* - stdout EPIPE (ffmpeg exited)
* - Slot disappears (bridge stopped)
* 2) RAW MODE (legacy, video-only): if no audio FIFO / avi flag is given,
* fc_pipe writes raw UYVY422 video bytes to stdout as before.
*
* Exit codes:
* 0 clean stop (SIGTERM)
* 1 slot not found within wait_ms
* 2 stdout write error (EPIPE)
* The old split video-stdout / audio-FIFO design is REMOVED it was the
* source of the ffmpeg deadlock.
*
* Usage: fc_pipe <slot_id> [wait_ms] [mode]
* mode: "--avi" | "avi" single streaming AVI (video+audio) on stdout.
* omitted | "-" raw UYVY422 video-only on stdout.
*
* Terminates on: SIGTERM/SIGINT, stdout EPIPE (ffmpeg exited), slot gone.
*/
#include "../src/slot.h"
@ -37,11 +44,12 @@
#include <errno.h>
#include <time.h>
#include <fcntl.h>
#include <math.h>
static volatile int g_stop = 0;
static void on_signal(int s) { (void)s; g_stop = 1; }
/* Write all bytes to fd. Returns 0 on success, -1 on EPIPE/error. */
/* Write all bytes to fd (blocking). Returns 0 on success, -1 on EPIPE/error. */
static int write_all_fd(int fd, const void *buf, size_t len) {
const uint8_t *p = (const uint8_t *)buf;
size_t off = 0;
@ -49,29 +57,213 @@ static int write_all_fd(int fd, const void *buf, size_t len) {
ssize_t n = write(fd, p + off, len - off);
if (n > 0) { off += (size_t)n; continue; }
if (n < 0 && errno == EINTR) continue;
return -1; /* EPIPE or other fatal error */
return -1;
}
return 0;
}
/* ── Little-endian byte emitters into a caller buffer ────────────────────────── */
static inline void put_u16(uint8_t **pp, uint16_t v) {
uint8_t *p = *pp; p[0] = (uint8_t)(v & 0xff); p[1] = (uint8_t)((v >> 8) & 0xff); *pp = p + 2;
}
static inline void put_u32(uint8_t **pp, uint32_t v) {
uint8_t *p = *pp;
p[0] = (uint8_t)(v & 0xff); p[1] = (uint8_t)((v >> 8) & 0xff);
p[2] = (uint8_t)((v >> 16) & 0xff); p[3] = (uint8_t)((v >> 24) & 0xff);
*pp = p + 4;
}
static inline void put_fourcc(uint8_t **pp, const char *cc) {
uint8_t *p = *pp; p[0] = (uint8_t)cc[0]; p[1] = (uint8_t)cc[1];
p[2] = (uint8_t)cc[2]; p[3] = (uint8_t)cc[3]; *pp = p + 4;
}
/* ── Streaming AVI header ─────────────────────────────────────────────────────
* Builds RIFF('AVI ') + LIST('hdrl'){ avih + strl(vids) + strl(auds) } +
* LIST('movi'). For a streaming AVI over a pipe we cannot seek back to patch
* the RIFF and movi sizes, so we set them to 0x7FFFFFFF; ffmpeg's AVI demuxer
* reads the strf headers and the 00dc/01wb chunk stream regardless. The hdrl
* LIST size IS fixed/known, so it is written correctly. dwFlags is 0 we do
* NOT set AVIF_HASINDEX / AVIF_MUSTUSEINDEX (there is no index in a stream).
*
* Writes the header to *out and returns its length. Buffer must be >= 512. */
static size_t build_avi_header(uint8_t *out,
uint32_t width, uint32_t height,
uint32_t fps_num, uint32_t fps_den,
uint32_t video_bytes,
uint32_t audio_rate, uint32_t audio_channels,
uint32_t audio_sample_bytes) {
const uint32_t STREAMING = 0x7FFFFFFFu;
const uint16_t bits_per_sample = (uint16_t)(audio_sample_bytes * 8u);
const uint16_t block_align = (uint16_t)(audio_channels * audio_sample_bytes);
const uint32_t avg_bytes_sec = audio_rate * block_align;
/* dwMicroSecPerFrame = 1e6 * fps_den / fps_num */
const uint32_t usec_per_frame =
(uint32_t)((1000000.0 * (double)fps_den / (double)fps_num) + 0.5);
/* Fixed sub-sizes (data bytes only, excluding the 8-byte ckID+ckSize). */
const uint32_t AVIH_DATA = 56; /* MainAVIHeader */
const uint32_t STRH_DATA = 56; /* AVISTREAMHEADER */
const uint32_t BIH_DATA = 40; /* BITMAPINFOHEADER */
const uint32_t WFX_DATA = 18; /* WAVEFORMATEX (cbSize=0) */
/* LIST('strl') sizes = 4 (the 'strl' fourcc) + contained chunks. */
const uint32_t vstrl_size = 4 + (8 + STRH_DATA) + (8 + BIH_DATA); /* 4+64+48 = 116 */
const uint32_t astrl_size = 4 + (8 + STRH_DATA) + (8 + WFX_DATA); /* 4+64+26 = 94 */
/* LIST('hdrl') size = 4 (the 'hdrl' fourcc) + avih chunk + both strl LISTs. */
const uint32_t hdrl_size = 4 + (8 + AVIH_DATA) + (8 + vstrl_size) + (8 + astrl_size);
uint8_t *p = out;
/* RIFF 'AVI ' (size unseekable → streaming sentinel) */
put_fourcc(&p, "RIFF");
put_u32(&p, STREAMING);
put_fourcc(&p, "AVI ");
/* LIST 'hdrl' */
put_fourcc(&p, "LIST");
put_u32(&p, hdrl_size);
put_fourcc(&p, "hdrl");
/* avih — MainAVIHeader (56 bytes) */
put_fourcc(&p, "avih");
put_u32(&p, AVIH_DATA);
put_u32(&p, usec_per_frame); /* dwMicroSecPerFrame */
put_u32(&p, 0); /* dwMaxBytesPerSec */
put_u32(&p, 0); /* dwPaddingGranularity */
put_u32(&p, 0); /* dwFlags — NO index flags */
put_u32(&p, 0); /* dwTotalFrames (unknown in stream) */
put_u32(&p, 0); /* dwInitialFrames */
put_u32(&p, 2); /* dwStreams (video + audio) */
put_u32(&p, 0); /* dwSuggestedBufferSize */
put_u32(&p, width); /* dwWidth */
put_u32(&p, height); /* dwHeight */
put_u32(&p, 0); put_u32(&p, 0); put_u32(&p, 0); put_u32(&p, 0); /* dwReserved[4] */
/* LIST 'strl' — VIDEO */
put_fourcc(&p, "LIST");
put_u32(&p, vstrl_size);
put_fourcc(&p, "strl");
/* strh — AVISTREAMHEADER 'vids' (56 bytes) */
put_fourcc(&p, "strh");
put_u32(&p, STRH_DATA);
put_fourcc(&p, "vids"); /* fccType */
put_fourcc(&p, "UYVY"); /* fccHandler */
put_u32(&p, 0); /* dwFlags */
put_u16(&p, 0); /* wPriority */
put_u16(&p, 0); /* wLanguage */
put_u32(&p, 0); /* dwInitialFrames */
put_u32(&p, fps_den); /* dwScale = 1001 */
put_u32(&p, fps_num); /* dwRate = 60000 */
put_u32(&p, 0); /* dwStart */
put_u32(&p, 0); /* dwLength (unknown) */
put_u32(&p, video_bytes); /* dwSuggestedBufferSize */
put_u32(&p, 0xFFFFFFFFu); /* dwQuality (-1 default) */
put_u32(&p, video_bytes); /* dwSampleSize (fixed for uncompressed) */
put_u16(&p, 0); put_u16(&p, 0); /* rcFrame.left, top */
put_u16(&p, (uint16_t)width); /* rcFrame.right */
put_u16(&p, (uint16_t)height); /* rcFrame.bottom */
/* strf — BITMAPINFOHEADER (40 bytes) */
put_fourcc(&p, "strf");
put_u32(&p, BIH_DATA);
put_u32(&p, 40); /* biSize */
put_u32(&p, width); /* biWidth */
put_u32(&p, height); /* biHeight */
put_u16(&p, 1); /* biPlanes */
put_u16(&p, 16); /* biBitCount (UYVY422 = 16bpp) */
put_fourcc(&p, "UYVY"); /* biCompression fourcc */
put_u32(&p, video_bytes); /* biSizeImage = W*H*2 */
put_u32(&p, 0); /* biXPelsPerMeter */
put_u32(&p, 0); /* biYPelsPerMeter */
put_u32(&p, 0); /* biClrUsed */
put_u32(&p, 0); /* biClrImportant */
/* LIST 'strl' — AUDIO */
put_fourcc(&p, "LIST");
put_u32(&p, astrl_size);
put_fourcc(&p, "strl");
/* strh — AVISTREAMHEADER 'auds' (56 bytes) */
put_fourcc(&p, "strh");
put_u32(&p, STRH_DATA);
put_fourcc(&p, "auds"); /* fccType */
put_u32(&p, 0); /* fccHandler (none for PCM) */
put_u32(&p, 0); /* dwFlags */
put_u16(&p, 0); /* wPriority */
put_u16(&p, 0); /* wLanguage */
put_u32(&p, 0); /* dwInitialFrames */
put_u32(&p, block_align); /* dwScale = nBlockAlign */
put_u32(&p, avg_bytes_sec); /* dwRate = nAvgBytesPerSec */
put_u32(&p, 0); /* dwStart */
put_u32(&p, 0); /* dwLength (unknown) */
put_u32(&p, avg_bytes_sec); /* dwSuggestedBufferSize (~1s) */
put_u32(&p, 0xFFFFFFFFu); /* dwQuality */
put_u32(&p, block_align); /* dwSampleSize = nBlockAlign */
put_u16(&p, 0); put_u16(&p, 0); put_u16(&p, 0); put_u16(&p, 0); /* rcFrame */
/* strf — WAVEFORMATEX (18 bytes) */
put_fourcc(&p, "strf");
put_u32(&p, WFX_DATA);
put_u16(&p, 1); /* wFormatTag = WAVE_FORMAT_PCM */
put_u16(&p, (uint16_t)audio_channels); /* nChannels */
put_u32(&p, audio_rate); /* nSamplesPerSec */
put_u32(&p, avg_bytes_sec); /* nAvgBytesPerSec */
put_u16(&p, block_align); /* nBlockAlign */
put_u16(&p, bits_per_sample); /* wBitsPerSample */
put_u16(&p, 0); /* cbSize */
/* LIST 'movi' — frames follow. Size unseekable → streaming sentinel. */
put_fourcc(&p, "LIST");
put_u32(&p, STREAMING);
put_fourcc(&p, "movi");
return (size_t)(p - out);
}
/* Write a single AVI chunk: 4-byte fourcc + u32 LE size + data (+ pad byte if
* the size is odd, per the RIFF even-alignment rule). Returns 0 / -1. */
static int write_avi_chunk(int fd, const char *cc,
const uint8_t *data, uint32_t size) {
uint8_t hdr[8];
uint8_t *p = hdr;
put_fourcc(&p, cc);
put_u32(&p, size);
if (write_all_fd(fd, hdr, 8) < 0) return -1;
if (size && write_all_fd(fd, data, size) < 0) return -1;
if (size & 1u) {
uint8_t pad = 0;
if (write_all_fd(fd, &pad, 1) < 0) return -1;
}
return 0;
}
int main(int argc, char *argv[]) {
if (argc < 2) {
fprintf(stderr, "Usage: %s <slot_id> [wait_ms]\n", argv[0]);
fprintf(stderr, "Usage: %s <slot_id> [wait_ms] [--avi|-]\n", argv[0]);
return 1;
}
const char *slot_id = argv[1];
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000;
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000;
/* AVI mode is selected by an explicit flag in argv[3]. Anything that is not
* "--avi"/"avi" (including "-" or omitted) legacy raw video-only mode. */
int avi_mode = 0;
if (argc >= 4) {
const char *m = argv[3];
if (strcmp(m, "--avi") == 0 || strcmp(m, "avi") == 0) avi_mode = 1;
}
signal(SIGTERM, on_signal);
signal(SIGINT, on_signal);
signal(SIGPIPE, SIG_IGN); /* detect EPIPE via write() return value */
signal(SIGPIPE, SIG_IGN);
/* Set stdout to binary mode — no newline translation */
fcntl(STDOUT_FILENO, F_SETFL,
fcntl(STDOUT_FILENO, F_GETFL, 0) & ~O_NONBLOCK);
fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums)\n",
slot_id, (unsigned long long)wait_ms);
fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums) mode=%s\n",
slot_id, (unsigned long long)wait_ms, avi_mode ? "avi" : "rawvideo");
fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms);
if (!c) {
@ -80,54 +272,134 @@ int main(int argc, char *argv[]) {
return 1;
}
fprintf(stderr, "[fc_pipe] slot open, streaming to stdout\n");
/* Pull stream format from the slot header for the AVI header. */
fc_stream_info_t si;
if (fc_consumer_info(c, &si) != 0 || si.width == 0 || si.height == 0) {
fprintf(stderr, "[fc_pipe] failed to read slot stream info\n");
fc_consumer_close(c);
return 1;
}
if (si.fps_num == 0) { si.fps_num = 60000; si.fps_den = 1001; }
if (si.fps_den == 0) si.fps_den = 1;
if (si.audio_rate == 0) si.audio_rate = 48000;
if (si.audio_channels == 0) si.audio_channels = 2;
if (si.audio_sample_bytes == 0) si.audio_sample_bytes = 2;
uint64_t frames_out = 0;
const uint32_t video_bytes = si.frame_size ? si.frame_size
: si.width * si.height * 2u;
const uint32_t a_blockalign = si.audio_channels * si.audio_sample_bytes;
/* Samples per video frame for synthesized silence when a frame has no audio:
* round(audio_rate * fps_den / fps_num). Bytes = samples * blockalign. */
uint32_t silence_bytes = 0;
{
double spf = (double)si.audio_rate * (double)si.fps_den / (double)si.fps_num;
uint32_t samples = (uint32_t)(spf + 0.5);
silence_bytes = samples * a_blockalign;
}
uint8_t *silence = NULL;
if (avi_mode && silence_bytes) {
silence = (uint8_t *)calloc(1, silence_bytes);
if (!silence) silence_bytes = 0;
}
if (avi_mode) {
uint8_t hdr[512];
size_t hlen = build_avi_header(hdr, si.width, si.height,
si.fps_num, si.fps_den, video_bytes,
si.audio_rate, si.audio_channels,
si.audio_sample_bytes);
if (write_all_fd(STDOUT_FILENO, hdr, hlen) < 0) {
fprintf(stderr, "[fc_pipe] stdout EPIPE writing AVI header\n");
fc_consumer_close(c); free(silence);
return 1;
}
fprintf(stderr,
"[fc_pipe] slot open, streaming AVI(video+audio) → stdout "
"(%ux%u %u/%u, %ub/frame, audio %uHz %uch s%ule, silence=%uB/frame)\n",
si.width, si.height, si.fps_num, si.fps_den, video_bytes,
si.audio_rate, si.audio_channels, si.audio_sample_bytes * 8u,
silence_bytes);
} else {
fprintf(stderr, "[fc_pipe] slot open, streaming raw video → stdout (%ux%u)\n",
si.width, si.height);
}
uint64_t frames_out = 0;
uint64_t total_dropped = 0;
uint64_t audio_bytes = 0;
uint64_t audio_gaps = 0;
while (!g_stop) {
fc_frame_ref_t ref;
int rc = fc_consumer_read(c, &ref, 2000 /* 2s timeout */);
int rc = fc_consumer_read(c, &ref, 2000);
if (rc == FC_TIMEOUT) continue;
if (rc == FC_ERROR) break;
if (rc == FC_LAPPED) {
/* Copy was torn (writer lapped us mid-read). No valid frame to
* write log and read again. */
total_dropped = fc_consumer_dropped(c);
fprintf(stderr, "[fc_pipe] WARNING: frame lapped mid-read — total dropped: %llu\n",
(unsigned long long)total_dropped);
continue;
}
if (rc == FC_DROPPED) {
/* Skipped one or more older frames, but THIS frame is valid — log
* and write it (do NOT continue). */
total_dropped = fc_consumer_dropped(c);
fprintf(stderr, "[fc_pipe] WARNING: consumer fell behind — total dropped: %llu\n",
(unsigned long long)total_dropped);
}
/* Write frame data to stdout (ref.data is a stable consumer-owned copy) */
if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) {
if (!g_stop)
fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n");
break;
if (avi_mode) {
/* Interleave THIS frame's video + audio in one stream. Both are
* sourced from the SAME ring entry frame-coupled by construction.
* Video first (00dc), then audio (01wb). */
if (write_avi_chunk(STDOUT_FILENO, "00dc", ref.data, ref.size) < 0) {
if (!g_stop)
fprintf(stderr, "[fc_pipe] stdout EPIPE (video) — ffmpeg exited\n");
break;
}
if (ref.audio_size > 0 && ref.audio) {
if (write_avi_chunk(STDOUT_FILENO, "01wb", ref.audio, ref.audio_size) < 0) {
if (!g_stop)
fprintf(stderr, "[fc_pipe] stdout EPIPE (audio) — ffmpeg exited\n");
break;
}
audio_bytes += ref.audio_size;
} else {
/* No embedded audio this frame: emit one frame-interval of
* silence so the audio stream length tracks the video and
* ffmpeg never starves on the audio demuxer. */
if (silence_bytes &&
write_avi_chunk(STDOUT_FILENO, "01wb", silence, silence_bytes) < 0) {
if (!g_stop)
fprintf(stderr, "[fc_pipe] stdout EPIPE (silence) — ffmpeg exited\n");
break;
}
audio_bytes += silence_bytes;
audio_gaps++;
}
} else {
/* Legacy raw video-only: write the UYVY422 bytes straight to stdout. */
if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) {
if (!g_stop)
fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n");
break;
}
}
frames_out++;
/* Periodic stats to stderr (every 300 frames ≈ 5s at 60fps) */
frames_out++;
if (frames_out % 300 == 0) {
fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu\n",
fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu audio_bytes=%llu gaps=%llu\n",
(unsigned long long)frames_out,
(unsigned long long)total_dropped);
(unsigned long long)total_dropped,
(unsigned long long)audio_bytes,
(unsigned long long)audio_gaps);
}
}
free(silence);
fc_consumer_close(c);
fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu\n",
fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu audio_bytes=%llu gaps=%llu\n",
(unsigned long long)frames_out,
(unsigned long long)total_dropped);
(unsigned long long)total_dropped,
(unsigned long long)audio_bytes,
(unsigned long long)audio_gaps);
return 0;
}

View file

@ -81,6 +81,9 @@ struct fc_slot *fc_slot_create(const char *slot_id,
hdr->pixel_format = pixel_format;
hdr->frame_size = frame_size;
hdr->ring_depth = FC_RING_DEPTH;
hdr->audio_max_bytes = FC_MAX_AUDIO_BYTES;
hdr->audio_rate = FC_AUDIO_RATE;
hdr->audio_channels = FC_AUDIO_CHANNELS;
atomic_store(&hdr->write_cursor, 0);
atomic_store(&hdr->dropped_frames, 0);
strncpy(hdr->source_type, source_type ? source_type : "unknown",
@ -154,13 +157,43 @@ void fc_slot_write_frame(struct fc_slot *s,
frame->wall_us = (uint64_t)({ struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; });
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
frame->audio_size = 0; /* video-only source (e.g. net_ingest): no embedded audio */
memcpy(frame->data, data, frame->size);
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
sem_post(s->sem);
}
/**
* Write one frame-coupled entry: video bytes + this frame's audio bytes.
* audio may be NULL / asize 0 (no embedded audio this frame). Never blocks.
*/
void fc_slot_write_av(struct fc_slot *s,
const uint8_t *video, uint32_t vsize,
const uint8_t *audio, uint32_t asize,
uint64_t pts_us)
{
fc_header_t *hdr = (fc_header_t *)s->base;
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
fc_frame_t *frame = fc_frame_at(s->base, hdr->frame_size, cur);
frame->pts_us = pts_us;
frame->wall_us = (uint64_t)({ struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; });
frame->size = vsize < hdr->frame_size ? vsize : hdr->frame_size;
memcpy(frame->data, video, frame->size);
uint32_t acap = hdr->audio_max_bytes;
frame->audio_size = (audio && asize) ? (asize < acap ? asize : acap) : 0;
if (frame->audio_size)
memcpy(fc_frame_audio(frame, hdr->frame_size), audio, frame->audio_size);
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
sem_post(s->sem);
}
/* ── client-side open / read / close (also used by capture-manager) ── */
/**
@ -183,6 +216,11 @@ struct fc_slot *fc_slot_open(const char *slot_id)
if (tmp_hdr.magic != FC_MAGIC) {
close(fd); return NULL;
}
if (tmp_hdr.version != FC_VERSION) {
fprintf(stderr, "[framecache] slot %s version %u != expected %u — refusing (fail-safe)\n",
slot_id, tmp_hdr.version, FC_VERSION);
close(fd); return NULL;
}
size_t total = fc_slot_shm_size(tmp_hdr.frame_size);
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);

View file

@ -3,10 +3,33 @@
*
* Layout per slot (/dev/shm/framecache/<slot_id>):
* [fc_header_t 4KB aligned]
* [fc_frame_t × ring_depth each FC_FRAME_HDR_SIZE + frame_size bytes]
* [fc_frame_t × ring_depth each FC_FRAME_HDR_SIZE + frame_size + FC_MAX_AUDIO_BYTES bytes]
*
* Writer advances write_cursor atomically and posts the named semaphore.
* Each consumer tracks its own read_cursor independently writer never blocks.
*
* FRAME-COUPLED AUDIO (FC_VERSION 2)
* Each ring entry now carries the VIDEO frame AND that frame's SDI-embedded
* AUDIO together in ONE transport. The per-entry data region is:
*
* [ video bytes : frame_size ][ audio bytes : up to FC_MAX_AUDIO_BYTES ]
*
* fc_frame_t.size = video byte count (== frame_size for UYVY422)
* fc_frame_t.audio_size = audio byte count for THIS frame (s16le stereo 48k),
* 0 if the signal carries no embedded audio on this frame.
*
* Because both streams live in the same ring slot written in one atomic cursor
* advance and read in one consumer iteration, audio can never drift ahead of
* (or behind) its video frame the "audio ahead of video" offset is eliminated
* at the root: there is no second independent buffer/transport to race.
*
* Versioning: FC_VERSION bumped 1 2. The frame header grew from 24 to 24
* bytes (the former _pad uint32 is REUSED as audio_size no size change), but
* the per-entry stride grew by FC_MAX_AUDIO_BYTES and the header gained audio
* descriptor fields. An old (v1) reader mmapping a v2 slot would compute the
* wrong stride and misparse, so readers MUST check version == FC_VERSION and
* fail safe. The writer and all readers share this header; mismatched binaries
* refuse to attach.
*/
#pragma once
@ -15,12 +38,24 @@
#include <semaphore.h>
#define FC_MAGIC 0x46524D43u /* "FRMC" */
#define FC_VERSION 1u
#define FC_VERSION 2u /* bumped for frame-coupled audio */
#define FC_RING_DEPTH 120u /* ~2s at 59.94fps */
#define FC_HEADER_SIZE 4096u /* 4KB header block */
#define FC_FRAME_HDR_SIZE 24u /* pts_us(8) + wall_us(8) + size(4) + pad(4) */
#define FC_FRAME_HDR_SIZE 24u /* pts_us(8) + wall_us(8) + size(4) + audio_size(4) */
#define FC_MAX_SLOT_ID 64u
/* ── Frame-coupled audio constants ──────────────────────────────────────────
* Audio is always delivered as interleaved s16le stereo at 48 kHz (the SDI
* embedded-audio mapping ffmpeg/raw2bmx expect). Worst-case samples per video
* frame at the lowest broadcast frame rate (~23.976 fps) is ~2002 samples;
* 2048 samples * 2 ch * 2 bytes = 8192 bytes. We reserve 16 KB per entry for
* generous headroom (covers transient over-delivery and any future 4-byte
* sample widths) ~1.9 MB extra across the 120-deep ring, negligible. */
#define FC_AUDIO_RATE 48000u
#define FC_AUDIO_CHANNELS 2u
#define FC_AUDIO_SAMPLE_BYTES 2u /* s16le */
#define FC_MAX_AUDIO_BYTES 16384u
/* Internal handle used by both server (writer) and client (reader) */
struct fc_slot {
int shm_fd;
@ -43,23 +78,31 @@ typedef struct {
uint32_t fps_num;
uint32_t fps_den;
uint32_t pixel_format; /* FC_PIX_UYVY422 */
uint32_t frame_size; /* width * height * 2 */
uint32_t frame_size; /* width * height * 2 (video bytes per entry) */
uint32_t ring_depth; /* FC_RING_DEPTH */
uint32_t audio_max_bytes; /* FC_MAX_AUDIO_BYTES — audio region per entry */
uint32_t audio_rate; /* FC_AUDIO_RATE (48000) */
uint32_t audio_channels; /* FC_AUDIO_CHANNELS (2) */
uint32_t _reserved;
_Atomic uint64_t write_cursor; /* monotonically increasing frame index */
_Atomic uint64_t dropped_frames;
char source_type[32]; /* "deltacast" | "blackmagic" | "srt" | "rtmp" */
char slot_id[FC_MAX_SLOT_ID];
uint8_t _pad[FC_HEADER_SIZE - 144];
uint8_t _pad[FC_HEADER_SIZE - 156];
} fc_header_t;
/* Per-frame metadata + data (variable length — use fc_frame_at() accessor) */
/* Per-entry metadata + data (variable length — use fc_frame_at() accessor).
*
* data layout (frame-coupled): [video : size bytes][audio : audio_size bytes]
* The audio region is reserved at audio_max_bytes; audio_size <= audio_max_bytes
* tells the reader how many audio bytes are valid for THIS frame.
*/
typedef struct {
uint64_t pts_us;
uint64_t wall_us;
uint32_t size;
uint32_t _pad;
uint8_t data[]; /* frame_size bytes */
uint32_t size; /* video bytes */
uint32_t audio_size; /* audio bytes for this frame (s16le stereo 48k), 0 = none */
uint8_t data[]; /* [video frame_size][audio audio_max_bytes] */
} fc_frame_t;
/* Compile-time size check */
@ -80,6 +123,11 @@ void fc_slot_close(struct fc_slot *s);
void fc_slot_write_frame(struct fc_slot *s,
const uint8_t *data, uint32_t size,
uint64_t pts_us);
/* Frame-coupled write: video + this frame's audio in one ring entry. */
void fc_slot_write_av(struct fc_slot *s,
const uint8_t *video, uint32_t vsize,
const uint8_t *audio, uint32_t asize,
uint64_t pts_us);
/* Accessor functions — inline now that struct fc_slot is defined above */
static inline fc_header_t *fc_slot_header(struct fc_slot *s) { return (fc_header_t *)s->base; }
@ -87,13 +135,22 @@ static inline const char *fc_slot_id(struct fc_slot *s) { return s->slot_id
static inline const char *fc_slot_shm_path(struct fc_slot *s) { return s->shm_path; }
static inline const char *fc_slot_sem_name(struct fc_slot *s) { return s->sem_name; }
/**
* Per-entry stride: header + video bytes + reserved audio region.
* Shared by writer and all readers frame-coupled audio rides in the same
* entry, so the stride MUST include FC_MAX_AUDIO_BYTES on every component.
*/
static inline size_t fc_entry_stride(uint32_t frame_size) {
return (size_t)FC_FRAME_HDR_SIZE + frame_size + (size_t)FC_MAX_AUDIO_BYTES;
}
/**
* Compute total shm size for a slot given frame_size.
* = FC_HEADER_SIZE + ring_depth * (FC_FRAME_HDR_SIZE + frame_size)
* = FC_HEADER_SIZE + ring_depth * fc_entry_stride(frame_size)
*/
static inline size_t fc_slot_shm_size(uint32_t frame_size) {
return (size_t)FC_HEADER_SIZE
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + frame_size);
+ (size_t)FC_RING_DEPTH * fc_entry_stride(frame_size);
}
/**
@ -102,5 +159,13 @@ static inline size_t fc_slot_shm_size(uint32_t frame_size) {
static inline fc_frame_t *fc_frame_at(void *base, uint32_t frame_size, uint64_t idx) {
uint8_t *frames = (uint8_t *)base + FC_HEADER_SIZE;
return (fc_frame_t *)(frames + (idx % FC_RING_DEPTH)
* ((size_t)FC_FRAME_HDR_SIZE + frame_size));
* fc_entry_stride(frame_size));
}
/**
* Return pointer to the audio region of an entry (immediately after the
* frame_size video bytes within data[]).
*/
static inline uint8_t *fc_frame_audio(fc_frame_t *f, uint32_t frame_size) {
return f->data + frame_size;
}