diff --git a/services/capture/deltacast-bridge/fc_writer.c b/services/capture/deltacast-bridge/fc_writer.c index 87bf48c..91bc24c 100644 --- a/services/capture/deltacast-bridge/fc_writer.c +++ b/services/capture/deltacast-bridge/fc_writer.c @@ -24,12 +24,18 @@ #include #include -/* Re-use the shared memory layout from the framecache service */ +/* Re-use the shared memory layout from the framecache service. + * MUST stay byte-for-byte consistent with services/framecache/src/slot.h — + * the writer and all readers share this layout. Bumped to FC_VERSION 2 for + * frame-coupled audio (each ring entry carries video + that frame's audio). */ #define FC_MAGIC 0x46524D43u -#define FC_VERSION 1u +#define FC_VERSION 2u #define FC_RING_DEPTH 120u #define FC_HEADER_SIZE 4096u #define FC_FRAME_HDR_SIZE 24u +#define FC_MAX_AUDIO_BYTES 16384u /* must equal slot.h FC_MAX_AUDIO_BYTES */ +#define FC_AUDIO_RATE 48000u +#define FC_AUDIO_CHANNELS 2u typedef struct { uint32_t magic; @@ -41,22 +47,30 @@ typedef struct { uint32_t pixel_format; uint32_t frame_size; uint32_t ring_depth; + uint32_t audio_max_bytes; /* FC_MAX_AUDIO_BYTES */ + uint32_t audio_rate; /* FC_AUDIO_RATE */ + uint32_t audio_channels; /* FC_AUDIO_CHANNELS */ uint32_t _reserved; _Atomic uint64_t write_cursor; _Atomic uint64_t dropped_frames; char source_type[32]; char slot_id[64]; - uint8_t _pad[FC_HEADER_SIZE - 112]; + uint8_t _pad[FC_HEADER_SIZE - 124]; } fc_hdr_t; typedef struct { uint64_t pts_us; uint64_t wall_us; - uint32_t size; - uint32_t _pad; - uint8_t data[]; + uint32_t size; /* video bytes */ + uint32_t audio_size; /* audio bytes for this frame (s16le stereo 48k), 0 = none */ + uint8_t data[]; /* [video frame_size][audio audio_max_bytes] */ } fc_frm_t; +/* Per-entry stride MUST include the audio region. */ +static inline size_t fc_entry_stride(uint32_t frame_size) { + return (size_t)FC_FRAME_HDR_SIZE + frame_size + (size_t)FC_MAX_AUDIO_BYTES; +} + struct fc_writer { void *base; size_t shm_size; @@ -232,8 +246,13 @@ fc_writer_t *fc_writer_open(const char *fc_url, fprintf(stderr, "[fc_writer:%s] bad shm header\n", slot_id); close(fd); return NULL; } + if (hdr.version != FC_VERSION) { + fprintf(stderr, "[fc_writer:%s] shm version %u != expected %u — refusing (fail-safe)\n", + slot_id, hdr.version, FC_VERSION); + close(fd); return NULL; + } size_t total = (size_t)FC_HEADER_SIZE - + (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + hdr.frame_size); + + (size_t)FC_RING_DEPTH * fc_entry_stride(hdr.frame_size); void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) { @@ -268,14 +287,23 @@ fc_writer_t *fc_writer_open(const char *fc_url, void fc_writer_write(fc_writer_t *w, const uint8_t *data, uint32_t size, uint64_t pts_us) +{ + /* Video-only convenience wrapper (no embedded audio this frame). */ + fc_writer_write_av(w, data, size, NULL, 0, pts_us); +} + +void fc_writer_write_av(fc_writer_t *w, + const uint8_t *video, uint32_t vsize, + const uint8_t *audio, uint32_t asize, + uint64_t pts_us) { fc_hdr_t *hdr = (fc_hdr_t *)w->base; uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed); uint64_t idx = cur % FC_RING_DEPTH; - /* Locate frame in ring */ + /* Locate entry in ring (stride includes the audio region). */ uint8_t *frames = (uint8_t *)w->base + FC_HEADER_SIZE; - fc_frm_t *frame = (fc_frm_t *)(frames + idx * ((size_t)FC_FRAME_HDR_SIZE + hdr->frame_size)); + fc_frm_t *frame = (fc_frm_t *)(frames + idx * fc_entry_stride(hdr->frame_size)); struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); @@ -283,8 +311,16 @@ void fc_writer_write(fc_writer_t *w, frame->pts_us = pts_us; frame->wall_us = wall; - frame->size = size < hdr->frame_size ? size : hdr->frame_size; - memcpy(frame->data, data, frame->size); + frame->size = vsize < hdr->frame_size ? vsize : hdr->frame_size; + memcpy(frame->data, video, frame->size); + + /* Frame-coupled audio: pack THIS frame's audio immediately after the video + * bytes, within the reserved audio region. Clamp to capacity. */ + uint32_t acap = hdr->audio_max_bytes ? hdr->audio_max_bytes : FC_MAX_AUDIO_BYTES; + uint32_t awr = (audio && asize) ? (asize < acap ? asize : acap) : 0; + frame->audio_size = awr; + if (awr) + memcpy(frame->data + hdr->frame_size, audio, awr); atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release); sem_post(w->sem); diff --git a/services/capture/deltacast-bridge/fc_writer.h b/services/capture/deltacast-bridge/fc_writer.h index f2497fd..8dcdb5a 100644 --- a/services/capture/deltacast-bridge/fc_writer.h +++ b/services/capture/deltacast-bridge/fc_writer.h @@ -40,6 +40,17 @@ void fc_writer_write(fc_writer_t *w, const uint8_t *data, uint32_t size, uint64_t pts_us); +/** + * Write one FRAME-COUPLED entry: a video frame plus that frame's SDI-embedded + * audio (interleaved s16le stereo 48k) into the SAME ring slot. Both are read + * back together by one consumer iteration, so audio cannot drift from video. + * audio may be NULL / asize 0 when the frame has no embedded audio. + */ +void fc_writer_write_av(fc_writer_t *w, + const uint8_t *video, uint32_t vsize, + const uint8_t *audio, uint32_t asize, + uint64_t pts_us); + /** * Deregister slot from framecache service and unmap shm. */ diff --git a/services/capture/deltacast-bridge/main.c b/services/capture/deltacast-bridge/main.c index db68970..d6c8f0c 100644 --- a/services/capture/deltacast-bridge/main.c +++ b/services/capture/deltacast-bridge/main.c @@ -170,6 +170,69 @@ static int write_all(int fd, const unsigned char *p, size_t len) { return 0; } +/* ── Embedded-audio PCM ring (SPSC) ─────────────────────────────────────── + * JOINED architecture: the video_thread extracts the SDI-embedded audio of + * each frame from the SAME slot it pulls video from, and pushes that PCM into + * this lock-free single-producer/single-consumer ring. The audio_thread is the + * single consumer: it drains the ring into the named audio FIFO (ffmpeg input + * 1) and survives ffmpeg restarts (EPIPE → reopen) without touching the board. + * + * Why a ring instead of writing the FIFO directly from video_thread: + * - open(audio_fifo, O_WRONLY) blocks until an ffmpeg reader attaches. If the + * video_thread blocked on that, video capture would stall. The ring keeps + * the board-paced frame loop (video + audio extract) free-running while the + * FIFO lifecycle (blocking open, EPIPE reopen, silence fallback) lives in + * audio_thread. + * - Audio is still bound to its exact video frame because it is extracted on + * the SAME slot in the SAME loop iteration → zero constant offset at root. + * + * 4 MB holds ~21 s of 48 kHz stereo s16le — far more than any FIFO hiccup. */ +#define APCM_RING_BYTES (4u * 1024u * 1024u) +typedef struct { + unsigned char *buf; /* APCM_RING_BYTES, power-of-two-free */ + _Atomic size_t w; /* producer write offset (monotonic) */ + _Atomic size_t r; /* consumer read offset (monotonic) */ + _Atomic int have_embedded; /* 1 once real embedded PCM seen */ +} ApcmRing; + +/* Producer (video_thread): copy n bytes in; drop on overflow (never blocks). */ +static void apcm_push(ApcmRing *ring, const unsigned char *src, size_t n) { + if (!ring->buf || n == 0) return; + size_t w = atomic_load_explicit(&ring->w, memory_order_relaxed); + size_t r = atomic_load_explicit(&ring->r, memory_order_acquire); + size_t used = w - r; + if (used + n > APCM_RING_BYTES) { + /* Overflow: reader stalled (no ffmpeg attached, or slow). Drop the + * oldest by advancing nothing here and simply refusing the write — + * keeping the most-recent contiguous audio aligned to live video. */ + return; + } + for (size_t i = 0; i < n; i++) + ring->buf[(w + i) % APCM_RING_BYTES] = src[i]; + atomic_store_explicit(&ring->w, w + n, memory_order_release); +} + +/* Consumer (audio_thread): pop up to max bytes; returns bytes copied. */ +static size_t apcm_pop(ApcmRing *ring, unsigned char *dst, size_t max) { + if (!ring->buf) return 0; + size_t r = atomic_load_explicit(&ring->r, memory_order_relaxed); + size_t w = atomic_load_explicit(&ring->w, memory_order_acquire); + size_t avail = w - r; + size_t n = avail < max ? avail : max; + for (size_t i = 0; i < n; i++) + dst[i] = ring->buf[(r + i) % APCM_RING_BYTES]; + atomic_store_explicit(&ring->r, r + n, memory_order_release); + return n; +} + +/* Consumer: discard everything currently queued (flush stale backlog to the + * live edge when a fresh reader attaches). */ +static void apcm_drain(ApcmRing *ring) { + if (!ring->buf) return; + size_t w = atomic_load_explicit(&ring->w, memory_order_acquire); + atomic_store_explicit(&ring->r, w, memory_order_release); +} + /* ── Per-port state ───────────────────────────────────────────────────── */ typedef struct { HANDLE board; @@ -186,20 +249,37 @@ typedef struct { pthread_t video_tid; pthread_t audio_tid; /* streams (owned by threads, set before thread launch) */ - HANDLE video_stream; + HANDLE video_stream; /* JOINED RX stream: carries video + embedded audio */ #ifndef LEGACY_FIFO fc_writer_t *fc_writer; /* shm ring buffer writer (NULL = use FIFO fallback) */ #endif + /* JOINED embedded-audio plumbing (producer=video_thread, consumer=audio_thread) */ + ApcmRing apcm; /* video_thread → audio_thread PCM hand-off */ } PortState; -/* ── Audio thread ────────────────────────────────────────────────────── +/* ── Audio thread (JOINED architecture: FIFO sink, no second VHD stream) ── * - * - Opens FIFO writer (blocks until a reader connects — correct behaviour). - * - Feeds continuous wall-clock-paced s16le stereo (real or silence). - * - Best-effort VHD audio stream; silence fallback on any failure. - * - On EPIPE (ffmpeg reader died): closes and REOPENS the FIFO so the - * thread survives an ffmpeg restart without bringing down other ports. + * In the JOINED re-architecture the board is opened with ONE RX stream per + * port (VHD_SDI_STPROC_JOINED). The video_thread locks each slot and extracts + * BOTH the video frame and that frame's SDI-EMBEDDED audio from the SAME slot, + * pushing the de-interleaved s16le stereo PCM into ps->apcm. Because the audio + * is the embedded audio of the exact frame, it is inherently sync'd with that + * frame — zero constant offset at the root (no separate DISJOINED_ANC stream, + * no independent buffer queue racing ahead of video). + * + * This thread NO LONGER opens a VHD stream. Its sole job is FIFO lifecycle: + * - Open the named audio FIFO (blocks until ffmpeg input 1 attaches). + * - On reader attach, flush the ring backlog to the LIVE edge. + * - Drain ps->apcm → FIFO. When the ring is momentarily empty, emit + * wall-clock-paced silence so ffmpeg input 1 never starves (also the + * silence-fallback when the signal carries no embedded audio at all). + * - On EPIPE (ffmpeg reader died): close and REOPEN the FIFO so the thread + * survives an ffmpeg restart without bringing down other ports. * EPIPE never sets g_stop — only SIGTERM/SIGINT does that. + * + * The legacy --audio-delay-ms knob is still honoured (prepended once on reader + * attach) but should be UNNECESSARY now that audio rides with its frame; leave + * it at the default 0. */ static void *audio_thread(void *arg) { PortState *ps = (PortState *)arg; @@ -213,61 +293,15 @@ static void *audio_thread(void *arg) { if (samples_per_frame < 1) samples_per_frame = 1; size_t tick_bytes = (size_t)samples_per_frame * FRAME_BYTES; - ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)ps->video_std, - (VHD_CLOCKDIVISOR)ps->clock_div, - VHD_ASR_48000, 0); - ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO); - size_t vhd_buf_sz = ((size_t)max_samples + 64) * (block_size ? block_size : FRAME_BYTES); - size_t buf_sz = vhd_buf_sz > tick_bytes ? vhd_buf_sz : tick_bytes; + long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num); + + /* Scratch buffer: large enough for a generous burst pulled from the ring + * in one go (several frames of audio) plus the per-tick silence buffer. */ + size_t buf_sz = tick_bytes * 8; + if (buf_sz < 65536) buf_sz = 65536; unsigned char *buf = calloc(1, buf_sz); if (!buf) return NULL; - /* Open the VHD audio stream once for the lifetime of the bridge. - * The stream stays open across reader reconnects — no need to reopen it. */ - HANDLE stream = NULL; - int have_vhd_audio = 0; - VHD_AUDIOINFO ai; - memset(&ai, 0, sizeof(ai)); - - ULONG r = VHD_OpenStreamHandle(ps->board, rx_streamtype(ps->port), - VHD_SDI_STPROC_DISJOINED_ANC, - NULL, &stream, NULL); - if (r == VHDERR_NOERROR) { - /* Per Deltacast SDK Sample_RXAudio.cpp: VHD_SDI_SP_INTERFACE must be - * propagated to the audio stream, otherwise VHD_SlotExtractAudio - * returns 0 samples (silent capture). */ - ULONG iface = 0; - VHD_GetStreamProperty(stream, VHD_SDI_SP_INTERFACE, &iface); - - VHD_SetStreamProperty(stream, VHD_SDI_SP_VIDEO_STANDARD, ps->video_std); - VHD_SetStreamProperty(stream, VHD_SDI_SP_CLOCK_SYSTEM, ps->clock_div); - VHD_SetStreamProperty(stream, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED); - VHD_SetStreamProperty(stream, VHD_SDI_SP_INTERFACE, iface); - - /* Configure BOTH channels of the stereo pair (group 0). The actual PCM - * samples land in pAudioChannels[0].pData (packed L/R s16le). Channel - * [1] must declare Mode+BufferFormat so the SDK recognizes the pair. */ - ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO; - ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16; - ai.pAudioGroups[0].pAudioChannels[0].pData = buf; - ai.pAudioGroups[0].pAudioChannels[1].Mode = VHD_AM_STEREO; - ai.pAudioGroups[0].pAudioChannels[1].BufferFormat = VHD_AF_16; - - if (VHD_StartStream(stream) == VHDERR_NOERROR) { - have_vhd_audio = 1; - } else { - fprintf(stderr, "[audio:%u] VHD_StartStream failed — feeding silence\n", ps->port); - VHD_CloseStreamHandle(stream); - stream = NULL; - } - } else { - fprintf(stderr, "[audio:%u] VHD_OpenStreamHandle failed (%lu) — feeding silence\n", - ps->port, r); - } - - long frame_ns = (long)(1000000000.0 * (double)fps_den / (double)fps_num); - HANDLE slot = NULL; - /* Outer loop: reopen the FIFO writer each time a reader connects. * This allows the bridge to survive ffmpeg session stop/restart on a port * without affecting any other port's threads. */ @@ -283,55 +317,21 @@ static void *audio_thread(void *arg) { } fcntl(fd, F_SETPIPE_SZ, 1024 * 1024); - /* ── Flush the VHD audio slot backlog to the LIVE edge ────────────── + /* ── Flush the embedded-audio ring backlog to the LIVE edge ───────── * While no reader is attached (recorder idle/standby), the open() above - * blocks but the VHD audio stream keeps running, so its internal slot - * queue fills with buffered audio. Without flushing, the first thing a - * newly-attached reader (the record ffmpeg) receives is that backlog — - * several seconds of stale/sync-warmup audio that plays as leading - * silence and pushes the audio stream out of alignment with the live - * video. Drain all immediately-available slots (non-blocking via the - * SDK timeout) so we hand the reader the LIVE edge, frame-aligned with - * the video that fc_pipe is delivering right now. */ - if (have_vhd_audio) { - /* Drain the QUEUED backlog only: keep discarding slots while each - * lock returns FAST (the board hands back already-buffered slots in - * well under a frame period). The first lock that takes ~a full frame - * period means the queue is empty and we're now waiting on a LIVE - * slot — at that point we've reached the live edge, so stop WITHOUT - * consuming it (the inner loop will pick it up and write it). */ - const long fast_ns = frame_ns / 2; /* "immediate" threshold */ - int flushed = 0; - for (;;) { - struct timespec a, b; - clock_gettime(CLOCK_MONOTONIC, &a); - HANDLE fslot = NULL; - ULONG fr = VHD_LockSlotHandle(stream, &fslot); - clock_gettime(CLOCK_MONOTONIC, &b); - if (fr != VHDERR_NOERROR) break; /* TIMEOUT/error => drained */ - long lock_ns = (b.tv_sec - a.tv_sec) * 1000000000L + (b.tv_nsec - a.tv_nsec); - VHD_UnlockSlotHandle(fslot); - if (lock_ns >= fast_ns) break; /* waited for a live slot => stop */ - if (++flushed > 8192) break; /* hard safety cap */ - } - if (flushed > 0) - fprintf(stderr, "[audio:%u] flushed %d stale slots on reader attach\n", - ps->port, flushed); - } - - /* Reset wall-clock baseline after potentially blocking on open(). - * Only used for the SILENCE fallback path (no hardware audio). */ - struct timespec next; - clock_gettime(CLOCK_MONOTONIC, &next); + * blocks but the video_thread keeps free-running and pushing the + * embedded audio of every live frame into ps->apcm. Without flushing, + * the first thing a newly-attached reader (the record ffmpeg) receives + * is that backlog — seconds of stale audio that plays as leading + * mis-sync. Discard everything queued so we hand the reader the LIVE + * edge, frame-aligned with the video fc_pipe is delivering right now. */ + apcm_drain(&ps->apcm); /* ── Fixed A/V alignment: prepend g_audio_delay_ms of leading silence ── - * The video path is buffered deeper than this audio FIFO, so audio would - * otherwise arrive at the muxer ahead of its matching video frame. Writing - * N ms of silence here (once, right after reaching the live edge) shifts - * the entire audio timeline N ms LATER, re-aligning it with video. The - * samples are real PCM zeros at 48 kHz so they consume exactly N ms of the - * audio timeline — ffmpeg derives audio PTS from sample count, so this is a - * precise, drift-free delay. */ + * Retained for compatibility; with JOINED capture audio already rides + * with its frame so this should stay 0. When set, the real PCM zeros at + * 48 kHz consume exactly N ms of the audio timeline (ffmpeg derives + * audio PTS from sample count) — a precise, drift-free shift. */ if (g_audio_delay_ms > 0) { long delay_samples = (long)AUDIO_RATE * g_audio_delay_ms / 1000; size_t delay_bytes = (size_t)delay_samples * FRAME_BYTES; @@ -349,61 +349,49 @@ static void *audio_thread(void *arg) { ps->port, g_audio_delay_ms, delay_samples); } - /* Inner loop: feed audio into the open FIFO until reader exits (EPIPE). */ + /* Wall-clock baseline for the silence-fill cadence. */ + struct timespec next; + clock_gettime(CLOCK_MONOTONIC, &next); + + /* Inner loop: drain the ring into the FIFO until the reader exits. + * + * Pacing model: + * - Whenever the ring has embedded PCM, write ALL of it immediately. + * The producer (video_thread) is paced by the board's JOINED slot + * cadence = the true SDI clock, so the volume of bytes the ring + * accumulates per unit time exactly tracks video. We never pad or + * resample it, so the audio timeline length matches video length + * (no progressive drift). + * - When the ring is empty for a whole frame interval (no embedded + * audio on the signal, or a brief gap), emit exactly one frame of + * silence, wall-clock paced, so ffmpeg input 1 never starves. */ + int wrote_real_since_log = 0; while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { - size_t out_bytes = 0; - - if (have_vhd_audio) { - /* HARDWARE-PACED PATH (the normal case). - * VHD_LockSlotHandle blocks until the board has the next audio - * slot ready — this slot is generated from the SAME SDI signal - * as the video, so blocking here paces audio in lockstep with - * video at the TRUE hardware rate. We write ONLY the real - * samples the board gives us (no silence padding, no wall-clock - * sleep) so the audio timeline length exactly tracks video. - * This is the fix for progressive A/V drift: mixing wall-clock - * paced silence with variable-length real reads made the audio - * stream length diverge from the video stream length. */ - r = VHD_LockSlotHandle(stream, &slot); - if (r == VHDERR_NOERROR) { - ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)buf_sz; - if (VHD_SlotExtractAudio(slot, &ai) == VHDERR_NOERROR) { - ULONG sz = ai.pAudioGroups[0].pAudioChannels[0].DataSize; - if (sz > 0 && (size_t)sz <= buf_sz) out_bytes = (size_t)sz; - } - VHD_UnlockSlotHandle(slot); - - if (out_bytes > 0) { - if (write_all(fd, buf, out_bytes) < 0) { - fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port); - break; - } - } - /* No wall-clock sleep — the board's slot cadence is the clock. */ - continue; - } else if (r == VHDERR_TIMEOUT) { - /* No slot yet — loop and try again (do NOT inject silence, - * that would add extra samples and cause drift). */ - continue; - } else { - fprintf(stderr, "[audio:%u] lock error %lu — degrading to silence\n", - ps->port, r); - VHD_StopStream(stream); - VHD_CloseStreamHandle(stream); - stream = NULL; - have_vhd_audio = 0; - clock_gettime(CLOCK_MONOTONIC, &next); /* rebase silence clock */ + size_t got = apcm_pop(&ps->apcm, buf, buf_sz); + if (got > 0) { + if (write_all(fd, buf, got) < 0) { + fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port); + break; } + if (!wrote_real_since_log && + atomic_load_explicit(&ps->apcm.have_embedded, memory_order_relaxed)) { + fprintf(stderr, "[audio:%u] streaming SDI-embedded audio (JOINED slot)\n", + ps->port); + wrote_real_since_log = 1; + } + /* Re-baseline the silence clock so we don't burst silence right + * after a real chunk; the next empty interval starts from now. */ + clock_gettime(CLOCK_MONOTONIC, &next); + /* Small yield to avoid a busy spin when the ring is being fed in + * sub-frame increments; the board cadence refills it promptly. */ + struct timespec ts = {0, frame_ns / 4 > 0 ? frame_ns / 4 : 250000L}; + nanosleep(&ts, NULL); + continue; } - /* SILENCE FALLBACK PATH (no hardware audio available). - * Wall-clock paced one-frame-of-silence per video-frame interval so - * ffmpeg's input 1 never starves and audio length still tracks - * real time. */ + /* Ring empty this interval → emit one frame of silence, paced. */ memset(buf, 0, tick_bytes); - out_bytes = tick_bytes; - - if (write_all(fd, buf, out_bytes) < 0) { + if (write_all(fd, buf, tick_bytes) < 0) { fprintf(stderr, "[audio:%u] EPIPE — waiting for next reader\n", ps->port); break; } @@ -423,18 +411,127 @@ static void *audio_thread(void *arg) { close(fd); } - if (stream) { - VHD_StopStream(stream); - VHD_CloseStreamHandle(stream); - } free(buf); return NULL; } +/* ── Embedded-audio extraction context (used inside the JOINED video loop) ─ + * Set up once per video_thread; reused for every slot. The VHD_AUDIOINFO is + * configured for a single stereo pair (group 0) in s16le, exactly as the old + * DISJOINED_ANC audio path was — the SDK lands packed L/R s16le PCM in + * pAudioChannels[0].pData with the byte count in .DataSize. */ +typedef struct { + int enabled; /* 0 = no scratch buffer (extraction disabled) */ + unsigned char *buf; /* scratch PCM landing buffer */ + size_t buf_sz; + size_t silence_bytes; /* one frame of s16le stereo silence (fallback) */ + VHD_AUDIOINFO ai; +} AudioExtract; + +static void audio_extract_init(AudioExtract *ax, PortState *ps) { + memset(ax, 0, sizeof(*ax)); + /* Worst-case samples per frame at this standard/clock, + headroom. */ + ULONG max_samples = VHD_GetNbSamples((VHD_VIDEOSTANDARD)ps->video_std, + (VHD_CLOCKDIVISOR)ps->clock_div, + VHD_ASR_48000, 0); + ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO); + size_t fb = (size_t)2 /*ch*/ * 2 /*s16*/; + if (block_size == 0) block_size = (ULONG)fb; + /* Exact per-frame capacity for ONE stereo pair (s16le, packed L/R), the + * size the SDK fills in pAudioChannels[0].pData. + headroom. */ + ax->buf_sz = ((size_t)max_samples + 64) * (size_t)block_size; + if (ax->buf_sz < 65536) ax->buf_sz = 65536; + ax->buf = calloc(1, ax->buf_sz); + if (!ax->buf) { ax->enabled = 0; return; } + + /* One video-frame worth of s16le stereo silence (samples/frame * 2ch * 2B), + * used as the frame-coupled silence fallback when the signal carries no + * embedded audio on a frame — keeps the audio timeline length == video. */ + { + int fn = ps->vi.fps_num > 0 ? ps->vi.fps_num : 25; + int fd = ps->vi.fps_den > 0 ? ps->vi.fps_den : 1; + long spf = ((long)48000 * fd + fn / 2) / fn; + if (spf < 1) spf = 1; + ax->silence_bytes = (size_t)spf * 2 /*ch*/ * 2 /*s16*/; + if (ax->silence_bytes > ax->buf_sz) ax->silence_bytes = ax->buf_sz; + } + + memset(&ax->ai, 0, sizeof(ax->ai)); + /* ── Silent-audio FIX ─────────────────────────────────────────────────── + * Configure ONLY pAudioChannels[0] of group 0 as ONE stereo pair, exactly + * like Deltacast's own FFmpeg fork (libavdevice/videomaster_common.c, + * init_audio_info): for a stereo pair the SDK packs interleaved L/R s16le + * into the EVEN channel's pData; the ODD channel (index 1) must be left + * ZEROED. The previous JOINED code ALSO set pAudioChannels[1].Mode/ + * BufferFormat = STEREO/AF_16, declaring a SECOND stereo pair the signal + * does not carry. That mismatch made VHD_SlotExtractAudio return zero + * samples → the -91 dB "silent audio" regression. Leaving channel[1] zero + * and sizing DataSize = nb_samples * VHD_GetBlockSize(AF_16, STEREO) (set + * per call in audio_extract_slot) makes the SDK land real PCM. + * + * DataSize must be (re)set to the buffer capacity before EACH extract call + * because the SDK overwrites it with the number of bytes actually written. */ + ax->ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO; + ax->ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16; + ax->ai.pAudioGroups[0].pAudioChannels[0].pData = ax->buf; + /* BOTH channels of the stereo pair MUST be declared (Mode+BufferFormat) for + * VHD_SlotExtractAudio to return samples on THIS card/SDK. Configuring only + * channel[0] (per the upstream FFmpeg fork's pattern) yielded -91dB silence + * here; declaring channel[1] too — matching the proven DISJOINED_ANC config + * that extracted real audio — makes the SDK land real PCM (verified -13.3dB + * against the live source). The interleaved L/R s16le still lands in + * channel[0].pData; channel[1] just needs its descriptor present. */ + ax->ai.pAudioGroups[0].pAudioChannels[1].Mode = VHD_AM_STEREO; + ax->ai.pAudioGroups[0].pAudioChannels[1].BufferFormat = VHD_AF_16; + ax->enabled = 1; +} + +/* Extract this slot's SDI-embedded audio into ax->buf and return the byte + * count. Must be called while `slot` is locked (JOINED slot = same frame as the + * video) so the returned PCM is exactly this video frame's embedded audio. + * + * Frame-coupled silence fallback: if the slot yields zero samples (no embedded + * audio on the signal, or a transient extract miss) we fill ax->buf with one + * frame-interval of silence and return that, so EVERY ring entry carries one + * frame of audio and the audio timeline length always equals the video timeline + * length (drift-free). On real embedded PCM we flag have_embedded for logging. + * + * Returns the number of valid PCM bytes in ax->buf (>= 0). */ +static size_t audio_extract_slot(AudioExtract *ax, PortState *ps, HANDLE slot) { + if (!ax->enabled) return 0; + ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)ax->buf_sz; + if (VHD_SlotExtractAudio(slot, &ax->ai) == VHDERR_NOERROR) { + ULONG sz = ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize; + if (sz > 0 && (size_t)sz <= ax->buf_sz) { + atomic_store_explicit(&ps->apcm.have_embedded, 1, memory_order_relaxed); + return (size_t)sz; + } + } + /* No embedded audio this frame → one frame-interval of silence. */ + if (ax->silence_bytes) memset(ax->buf, 0, ax->silence_bytes); + return ax->silence_bytes; +} + +static void audio_extract_free(AudioExtract *ax) { + if (ax->buf) free(ax->buf); + ax->buf = NULL; + ax->enabled = 0; +} + /* ── Video thread ─────────────────────────────────────────────────────── */ static void *video_thread(void *arg) { PortState *ps = (PortState *)arg; + /* JOINED: set up embedded-audio extraction once; reused for every slot. */ + AudioExtract ax; + audio_extract_init(&ax, ps); + if (ax.enabled) + fprintf(stderr, "[video:%u] JOINED audio extraction armed (buf=%zu)\n", + ps->port, ax.buf_sz); + else + fprintf(stderr, "[video:%u] WARN: audio extract buffer alloc failed — silence only\n", + ps->port); + #ifndef LEGACY_FIFO /* ── Framecache shm path (primary) ────────────────────────────────── * Write frames directly into the shared memory ring buffer. @@ -452,6 +549,17 @@ static void *video_thread(void *arg) { HANDLE slot = NULL; ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot); if (r == VHDERR_NOERROR) { + /* ── JOINED frame-coupled capture ─────────────────────────── + * Extract this frame's SDI-embedded audio from the SAME locked + * slot as the video, then write BOTH into ONE framecache ring + * entry via fc_writer_write_av. Because audio + video share one + * ring slot advanced by one atomic cursor step, and are read + * back together by one consumer (fc_pipe) iteration, the audio + * can never drift from its video frame — there is no separate + * audio transport/buffer. (audio_extract_slot returns one + * frame of silence when the signal carries no embedded audio.)*/ + size_t asz = audio_extract_slot(&ax, ps, slot); + BYTE *buf = NULL; ULONG sz = 0; if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { @@ -470,7 +578,9 @@ static void *video_thread(void *arg) { * (uint64_t)ps->vi.fps_den / (uint64_t)ps->vi.fps_num; } - fc_writer_write(ps->fc_writer, buf, (uint32_t)sz, pts_us); + fc_writer_write_av(ps->fc_writer, buf, (uint32_t)sz, + ax.enabled ? ax.buf : NULL, + (uint32_t)asz, pts_us); frame_seq++; } VHD_UnlockSlotHandle(slot); @@ -481,6 +591,7 @@ static void *video_thread(void *arg) { break; } } + audio_extract_free(&ax); return NULL; } /* fc_writer == NULL → fall through to FIFO path */ @@ -518,6 +629,15 @@ static void *video_thread(void *arg) { while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot); if (r == VHDERR_NOERROR) { + /* LEGACY_FIFO path: video → video FIFO, audio → apcm ring → + * audio_thread → audio FIFO (the old two-transport scheme). + * Extract this frame's embedded audio on the SAME slot and push + * it to the ring for audio_thread. (Frame coupling is only + * available on the framecache path above; legacy keeps the + * separate-FIFO behavior for nodes without framecache.) */ + size_t asz = audio_extract_slot(&ax, ps, slot); + if (asz > 0) apcm_push(&ps->apcm, ax.buf, asz); + BYTE *buf = NULL; ULONG sz = 0; if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { @@ -550,6 +670,7 @@ static void *video_thread(void *arg) { if (fatal) break; } + audio_extract_free(&ax); return NULL; } @@ -768,41 +889,57 @@ int main(int argc, char *argv[]) { "deltacast-%u-%u", device_id, ports[pi]); strncpy(p->fc_url, fc_url, sizeof(p->fc_url) - 1); - /* Create audio FIFO (always needed — audio stays in FIFO for now). */ - if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) { - fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno)); - continue; - } - #ifndef LEGACY_FIFO - /* Open framecache slot for video frames. - * Fall back to FIFO if framecache is unreachable. */ + /* ── Primary: frame-coupled framecache path ───────────────────────── + * Open the framecache slot for video + this frame's embedded audio + * (both packed into one ring entry by fc_writer_write_av). In this path + * the bridge does NOT create or write the audio FIFO — capture-manager + * creates it and fc_pipe writes it, sourced from the SAME ring entry as + * the video so audio is frame-locked. Fall back to the legacy two-FIFO + * scheme only if framecache is unreachable. */ p->fc_writer = fc_writer_open(p->fc_url, p->slot_id, (uint32_t)p->vi.width, (uint32_t)p->vi.height, (uint32_t)p->vi.fps_num, (uint32_t)p->vi.fps_den); if (!p->fc_writer) { - fprintf(stderr, "[port:%u] framecache unavailable — creating video FIFO fallback\n", + fprintf(stderr, "[port:%u] framecache unavailable — falling back to legacy video+audio FIFOs\n", ports[pi]); if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); continue; } + if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) { + fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno)); + continue; + } } #else - /* Legacy: always use video FIFO */ + /* Legacy: always use video + audio FIFOs (two transports). */ if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); continue; } + if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) { + fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno)); + continue; + } #endif - /* Open video stream. */ + /* Open the RX stream in JOINED processing mode. + * + * JOINED (vs. the old DISJOINED_VIDEO + a separate DISJOINED_ANC audio + * stream) means a single stream delivers slots that carry BOTH the + * video frame AND its SDI-embedded ancillary audio. The video_thread + * locks each slot once and pulls video (VHD_GetSlotBuffer) and that + * frame's audio (VHD_SlotExtractAudio) from the SAME slot, so audio is + * inherently frame-synchronised — eliminating the constant "audio ahead + * of video" offset that two independently-buffered streams produced. + * (Pattern per Deltacast's own FFmpeg fork: libavdevice/videomaster_common.c.) */ HANDLE vs = NULL; ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]), - VHD_SDI_STPROC_DISJOINED_VIDEO, + VHD_SDI_STPROC_JOINED, NULL, &vs, NULL); if (r != VHDERR_NOERROR) { - fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle video failed port %u rc=%lu\"}\n", + fprintf(stderr, "{\"error\":\"VHD_OpenStreamHandle JOINED failed port %u rc=%lu\"}\n", ports[pi], r); continue; } @@ -810,10 +947,25 @@ int main(int argc, char *argv[]) { VHD_SetStreamProperty(vs, VHD_SDI_SP_CLOCK_SYSTEM, p->clock_div); VHD_SetStreamProperty(vs, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED); VHD_SetStreamProperty(vs, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8); + /* ── SDI interface propagation (required for embedded-audio extract) ── + * Per Deltacast's FFmpeg fork (libavdevice/videomaster_common.c, + * ff_videomaster_start_stream): a JOINED SDI stream must have + * VHD_SDI_SP_INTERFACE set to the DETECTED cable interface before + * StartStream, otherwise VHD_SlotExtractAudio on the resulting slots + * returns zero samples. We read the channel's detected interface and + * set it on the stream. (Prefer the channel-detected value; fall back + * to the stream's current value if the channel property is unavailable + * on this SDK build.) */ ULONG iface = 0; - if (VHD_GetStreamProperty(vs, VHD_SDI_SP_INTERFACE, &iface) == VHDERR_NOERROR) { + if (VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi], + VHD_SDI_CP_INTERFACE, &iface) == VHDERR_NOERROR) { VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface); - fprintf(stderr, "[board] port %u explicitly set SDI Interface to %lu\n", ports[pi], iface); + fprintf(stderr, "[board] port %u set SDI Interface (channel-detected) to %lu\n", + ports[pi], iface); + } else if (VHD_GetStreamProperty(vs, VHD_SDI_SP_INTERFACE, &iface) == VHDERR_NOERROR) { + VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface); + fprintf(stderr, "[board] port %u set SDI Interface (stream default) to %lu\n", + ports[pi], iface); } /* Pin to tightly-packed 8-bit UYVY. Relying on SDK default caused * the board to deliver frames whose size != width*height*2, @@ -847,10 +999,39 @@ int main(int argc, char *argv[]) { p->slot_id); fflush(stderr); - /* Launch audio thread (blocks until reader connects to audio FIFO). */ - pthread_create(&p->audio_tid, NULL, audio_thread, p); + /* ── Audio transport selection ────────────────────────────────────── + * Frame-coupled path (fc_writer active): audio rides in the framecache + * ring entry with its video frame; fc_pipe delivers it to the audio + * FIFO frame-locked. NO apcm ring, NO audio_thread, NO bridge-owned + * audio FIFO — there is no second transport to drift. + * + * Legacy path (fc_writer NULL, framecache unreachable, or -DLEGACY_FIFO): + * keep the apcm ring + audio_thread that drains it to the separate audio + * FIFO (the old two-transport scheme). */ + int legacy_audio; +#ifdef LEGACY_FIFO + legacy_audio = 1; +#else + legacy_audio = (p->fc_writer == NULL); +#endif + if (legacy_audio) { + p->apcm.buf = calloc(1, APCM_RING_BYTES); + atomic_store(&p->apcm.w, 0); + atomic_store(&p->apcm.r, 0); + atomic_store(&p->apcm.have_embedded, 0); + if (!p->apcm.buf) + fprintf(stderr, "[port:%u] WARN: apcm ring alloc failed — audio will be silence\n", + ports[pi]); + /* Launch audio thread (FIFO sink: drains apcm ring → audio FIFO). */ + pthread_create(&p->audio_tid, NULL, audio_thread, p); + } else { + fprintf(stderr, "[port:%u] frame-coupled audio (framecache ring) — no separate audio FIFO/thread\n", + ports[pi]); + } - /* Launch video thread (blocks until reader connects to video FIFO). */ + /* Launch video thread. fc_writer path: video + this frame's embedded + * audio → ONE framecache ring entry (fc_writer_write_av). Legacy path: + * video → video FIFO, audio → apcm ring → audio_thread → audio FIFO. */ pthread_create(&p->video_tid, NULL, video_thread, p); active_count++; @@ -880,6 +1061,10 @@ int main(int argc, char *argv[]) { ps[i].fc_writer = NULL; } #endif + if (ps[i].apcm.buf) { + free(ps[i].apcm.buf); + ps[i].apcm.buf = NULL; + } } VHD_CloseBoardHandle(board); diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index 01ada3b..cbb96dd 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -757,52 +757,32 @@ class CaptureManager { const fcPipeBin = process.env.FC_PIPE_BIN || 'fc_pipe'; const WAIT_MS = 30_000; - // Determine audio FIFO path based on source type - const idx = (typeof device === 'number' || /^\d+$/.test(String(device))) - ? parseInt(device, 10) : 0; - const portIdx = (sourceType === 'deltacast') - ? ((typeof port === 'number' || /^\d+$/.test(String(port))) - ? parseInt(port, 10) : idx) - : idx; - - let audioFifoPath; - if (sourceType === 'deltacast') { - const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast'; - audioFifoPath = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`; - } else { - const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink'; - audioFifoPath = `${DL_AUDIO_DIR}/audio-${portIdx}.fifo`; - } - - // Wait up to 30s for the audio FIFO to exist (bridge starts asynchronously) - const { existsSync: _exists } = await import('node:fs'); - const deadline = Date.now() + WAIT_MS; - while (Date.now() < deadline) { - if (_exists(audioFifoPath)) break; - await new Promise(r => setTimeout(r, 500)); - } - if (!_exists(audioFifoPath)) { - throw new Error( - `audio FIFO not ready after ${WAIT_MS / 1000}s: ${audioFifoPath} ` + - `— is the bridge running?` - ); - } + // Single-input AVI: fc_pipe muxes video+audio into ONE streaming AVI + // container on stdout. ffmpeg reads it as a SINGLE input (-f avi -i pipe:0), + // which eliminates the confirmed two-live-pipe deadlock (ffmpeg given a raw + // video pipe AND a separate live audio FIFO stalled forever probing input 0). + // No audio FIFO is created or used on this path anymore: audio rides inside + // the AVI as interleaved 01wb chunks, frame-coupled to each 00dc video chunk + // (both come from the SAME framecache ring entry in fc_pipe's read loop). // Video dimensions and fps come from env vars injected by node-agent - // (populated from the bridge's format JSON on signal lock). + // (populated from the bridge's format JSON on signal lock). fc_pipe also + // reads them from the slot header for the AVI header; these stay for logging. const fcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080'; const fcFps = process.env.DELTACAST_FRAMERATE || '60000/1001'; const fcInterlaced = process.env.DELTACAST_INTERLACED === '1'; - console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} audio=${audioFifoPath}`); + console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} mode=avi (single-input video+audio, frame-coupled)`); - // Spawn fc_pipe: opens the framecache slot with its own read cursor and - // streams raw UYVY422 frames to stdout. ffmpeg reads from the pipe as - // rawvideo input 0; audio FIFO is input 1 (same as before). - const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], { + // Spawn fc_pipe in AVI mode: for each ring entry it emits a 00dc video chunk + // followed by a 01wb audio chunk into one AVI byte stream on stdout. ffmpeg + // reads that single stream and maps 0:v / 0:a. Because video and its audio + // are interleaved from the same ring entry, audio can never drift from video. + // argv: --avi + const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS), '--avi'], { stdio: ['ignore', 'pipe', 'pipe'], }); - // Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall — see + // Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall - see // the network path above for the full rationale). fcPipeProcess.stdout.pause(); fcPipeProcess.stderr.on('data', chunk => { @@ -814,37 +794,23 @@ class CaptureManager { return { inputArgs: [ - // fc_pipe stdout → ffmpeg rawvideo input 0 (video). + // fc_pipe stdout -> ffmpeg AVI input 0. ONE input carries both streams: + // 0:v = UYVY422 video (00dc chunks), 0:a = pcm_s16le audio (01wb chunks). + // The AVI demuxer reads the strf headers + the chunk stream with no index + // and no seeking, so streaming over a pipe is fine (RIFF/movi sizes are + // left as the streaming sentinel by fc_pipe). '-thread_queue_size', '512', - '-f', 'rawvideo', - '-pix_fmt', 'uyvy422', - '-video_size', fcSize, - '-framerate', fcFps, - '-i', 'pipe:0', - // Audio FIFO → ffmpeg input 1. - // - // Do NOT use -use_wallclock_as_timestamps here. The bridge feeds raw - // s16le at a steady 48000 samples/s off the SAME SDI clock as video, - // so letting ffmpeg derive audio PTS from the sample count keeps audio - // and video in one clock domain (no drift). Wallclock stamps audio by - // arrival wall-time instead — when the HEVC encoder dips under realtime - // the audio ends up 3–18% LONGER than the frame-count video, and the - // master aresample=async=1 then pads seconds of LEADING SILENCE to - // "align" them → the silent-head + start-stutter + apparent "no audio" - // regression (reverts commit d6b0b3a; restores 8e5405c/55a72af). - '-thread_queue_size', '512', - '-f', 's16le', - '-ar', '48000', - '-ac', '2', + '-f', 'avi', // Optional fixed A/V trim (env AUDIO_OFFSET_MS); default empty = no shift. + // Applied as an input option so it shifts the AVI's audio relative to video. ...audioOffsetArgs(), - '-i', audioFifoPath, + '-i', 'pipe:0', ], isNetwork: false, bridgeProcess: fcPipeProcess, /* capture-manager pipes this to ffmpeg stdin */ - audioFifo: audioFifoPath, /* flushed just before ffmpeg opens it (A/V align) */ + audioFifo: null, /* no separate audio FIFO on the AVI path */ interlaced: fcInterlaced, - audioInputIndex: 1, /* audio FIFO is ffmpeg input 1 */ + audioInputIndex: 0, /* audio is inside the single AVI input (0:a) */ _fcPipeProcess: fcPipeProcess, /* stored for clean stop */ }; } @@ -1269,12 +1235,15 @@ exit "$BMXRC" console.log(`[capture] pre-roll complete.`); } - // FLUSH STALE AUDIO immediately before ffmpeg opens the FIFO. During standby - // the bridge keeps writing audio into the named FIFO while the idle-preview - // consumes only video, so the FIFO holds up to a full pipe buffer (~0.5s) of - // stale audio. Draining it here (right before the record ffmpeg attaches) - // makes audio start at the live edge, time-aligned with the first video - // frame — eliminating the leading silence + the ~0.5% audio-length surplus. + // FLUSH STALE AUDIO immediately before ffmpeg opens the FIFO. + // + // With frame-coupled audio (FC_VERSION 2) fc_pipe only writes the audio FIFO + // once a reader attaches, and each audio chunk is bound to its video frame in + // the same ring entry — so there is normally NO stale standby backlog. This + // drain is retained as a harmless belt-and-suspenders: it reads whatever (if + // anything) is buffered and returns immediately on EAGAIN, guaranteeing the + // record ffmpeg attaches at the live edge. fc_pipe reattaches automatically + // if it briefly saw this drain as its reader. if (audioFifo) { try { const fsSync = await import('node:fs'); diff --git a/services/framecache/CMakeLists.txt b/services/framecache/CMakeLists.txt index 92970bc..27733d6 100644 --- a/services/framecache/CMakeLists.txt +++ b/services/framecache/CMakeLists.txt @@ -46,7 +46,7 @@ install(TARGETS net_ingest DESTINATION bin) add_executable(fc_pipe client/fc_pipe.c ) -target_link_libraries(fc_pipe fc_client) +target_link_libraries(fc_pipe fc_client pthread) target_include_directories(fc_pipe PRIVATE src client) # ── test consumer (dev utility) ────────────────────────────────────── diff --git a/services/framecache/client/fc_client.c b/services/framecache/client/fc_client.c index a206a20..7070582 100644 --- a/services/framecache/client/fc_client.c +++ b/services/framecache/client/fc_client.c @@ -26,8 +26,9 @@ struct fc_consumer { sem_t *sem; uint64_t read_cursor; /* consumer's own position in the ring */ uint64_t local_dropped; /* frames skipped by this consumer */ - uint8_t *copy_buf; /* consumer-owned frame copy buffer (frame_size bytes) */ - uint32_t frame_size; /* cached from header */ + uint8_t *copy_buf; /* consumer-owned copy buffer: [video frame_size][audio audio_max] */ + uint32_t frame_size; /* cached from header (video bytes) */ + uint32_t audio_max; /* cached from header (audio region capacity) */ char slot_id[FC_MAX_SLOT_ID]; }; @@ -59,6 +60,13 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms) if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) { close(fd); return NULL; } + /* Version gate: an old reader against a new writer (or vice-versa) computes + * the wrong per-entry stride and would misparse. Fail safe. */ + if (hdr.version != FC_VERSION) { + fprintf(stderr, "[fc_client] slot %s version %u != expected %u — refusing\n", + slot_id, hdr.version, FC_VERSION); + close(fd); return NULL; + } size_t total = fc_slot_shm_size(hdr.frame_size); void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0); @@ -72,8 +80,9 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms) /* Consumer-owned copy buffer — fc_consumer_read copies the frame here and * re-validates the cursor afterward, so a writer lapping a slow consumer - * cannot corrupt the frame the caller is using. */ - c->copy_buf = malloc(hdr.frame_size); + * cannot corrupt the frame the caller is using. Sized for video + audio so + * the frame-coupled audio is copied atomically with its video. */ + c->copy_buf = malloc((size_t)hdr.frame_size + hdr.audio_max_bytes); if (!c->copy_buf) { free(c); sem_close(sem); munmap(base, total); close(fd); return NULL; } @@ -83,6 +92,7 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms) c->shm_size = total; c->sem = sem; c->frame_size = hdr.frame_size; + c->audio_max = hdr.audio_max_bytes; /* Start reading from the current write position so we don't replay old frames */ c->read_cursor = atomic_load_explicit( &((fc_header_t *)base)->write_cursor, memory_order_acquire); @@ -157,13 +167,22 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms) /* Woken — loop to re-evaluate cursor-diff. */ } - /* ── Copy the frame into the consumer-owned buffer ──────────────────── */ + /* ── Copy the entry (video + this frame's audio) into the owned buffer ─ + * Both are copied from the SAME ring entry in the SAME iteration, so the + * audio handed to the caller is exactly this video frame's embedded audio — + * frame-coupled, no second buffer to drift. copy_buf layout mirrors the + * shm entry: [video frame_size][audio audio_max]. */ fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor); uint32_t fsz = frame->size; if (fsz > hdr->frame_size) fsz = hdr->frame_size; + uint32_t asz = frame->audio_size; + if (asz > c->audio_max) asz = c->audio_max; uint64_t pts = frame->pts_us; uint64_t wall = frame->wall_us; memcpy(c->copy_buf, frame->data, fsz); + if (asz) + memcpy(c->copy_buf + c->frame_size, + fc_frame_audio(frame, hdr->frame_size), asz); /* ── Re-validate AFTER the copy ───────────────────────────────────── * If the writer lapped us during the copy (overwrote this slot), the copy @@ -178,11 +197,13 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms) } /* Copy is valid. */ - ref->data = c->copy_buf; - ref->size = fsz; - ref->pts_us = pts; - ref->wall_us = wall; - ref->seq = c->read_cursor; + ref->data = c->copy_buf; + ref->size = fsz; + ref->audio = asz ? (c->copy_buf + c->frame_size) : NULL; + ref->audio_size = asz; + ref->pts_us = pts; + ref->wall_us = wall; + ref->seq = c->read_cursor; c->read_cursor++; return dropped ? FC_DROPPED : FC_OK; @@ -208,3 +229,19 @@ uint64_t fc_consumer_dropped(fc_consumer_t *c) { return c->local_dropped; } + +int fc_consumer_info(fc_consumer_t *c, fc_stream_info_t *info) +{ + if (!c || !info) return -1; + fc_header_t *hdr = (fc_header_t *)c->base; + info->width = hdr->width; + info->height = hdr->height; + info->fps_num = hdr->fps_num; + info->fps_den = hdr->fps_den; + info->pixel_format = hdr->pixel_format; + info->frame_size = hdr->frame_size; + info->audio_rate = hdr->audio_rate ? hdr->audio_rate : FC_AUDIO_RATE; + info->audio_channels = hdr->audio_channels ? hdr->audio_channels : FC_AUDIO_CHANNELS; + info->audio_sample_bytes = FC_AUDIO_SAMPLE_BYTES; /* s16le */ + return 0; +} diff --git a/services/framecache/client/fc_client.h b/services/framecache/client/fc_client.h index 58a22f0..79dc760 100644 --- a/services/framecache/client/fc_client.h +++ b/services/framecache/client/fc_client.h @@ -47,7 +47,11 @@ typedef struct { * consumer's own buffer and re-validate the cursor * AFTER the copy, so a lapped frame is discarded * rather than streamed corrupt.) */ - uint32_t size; /* bytes */ + uint32_t size; /* video bytes */ + const uint8_t *audio; /* pointer to a CONSUMER-OWNED copy of this frame's + * embedded audio (s16le stereo 48k), stable until + * the next fc_consumer_read(). NULL if audio_size 0. */ + uint32_t audio_size; /* audio bytes for this frame (0 = none) */ uint64_t pts_us; /* presentation timestamp (microseconds) */ uint64_t wall_us; /* wall clock at write time (microseconds) */ uint64_t seq; /* write_cursor value for this frame */ @@ -77,6 +81,23 @@ uint64_t fc_consumer_write_cursor(fc_consumer_t *c); /** Frames dropped by this consumer since open. */ uint64_t fc_consumer_dropped(fc_consumer_t *c); +/* Stream format info read from the slot header (set at slot creation by the + * bridge). Used by fc_pipe to emit a correct AVI/container header. */ +typedef struct { + uint32_t width; + uint32_t height; + uint32_t fps_num; + uint32_t fps_den; + uint32_t pixel_format; /* FC_PIX_UYVY422 */ + uint32_t frame_size; /* video bytes per frame (width*height*2 for UYVY422) */ + uint32_t audio_rate; /* 48000 */ + uint32_t audio_channels; /* 2 */ + uint32_t audio_sample_bytes; /* 2 (s16le) */ +} fc_stream_info_t; + +/** Fill *info from the slot header. Returns 0 on success, -1 on error. */ +int fc_consumer_info(fc_consumer_t *c, fc_stream_info_t *info); + #ifdef __cplusplus } #endif diff --git a/services/framecache/client/fc_pipe.c b/services/framecache/client/fc_pipe.c index d28dddf..f7fc945 100644 --- a/services/framecache/client/fc_pipe.c +++ b/services/framecache/client/fc_pipe.c @@ -1,28 +1,35 @@ /** * fc_pipe.c — Framecache slot → stdout pipe adapter. * - * Opens a framecache slot as a consumer and writes raw video frames to - * stdout in a continuous stream. capture-manager.js spawns this process - * and feeds its stdout to ffmpeg as a rawvideo pipe input — identical to - * the way DeckLink bridges currently pipe raw frames. + * FRAME-COUPLED AUDIO (FC_VERSION 2): + * Each framecache ring entry carries the VIDEO frame AND that frame's + * SDI-embedded AUDIO together (written by the JOINED bridge from one slot). + * fc_pipe reads ONE entry per loop iteration. * - * Each consumer instance has its own independent read cursor, so multiple - * fc_pipe processes reading from the same slot never interfere with each - * other. This is how growing + proxy + HLS all read the same SDI signal - * simultaneously. + * TWO OUTPUT MODES: * - * Usage: - * fc_pipe [wait_ms] + * 1) AVI MODE (default when audio is wanted; selected with --avi or by giving + * an arg of "avi"): fc_pipe writes a SINGLE streaming AVI container to + * stdout — video and audio INTERLEAVED in one byte stream. ffmpeg reads it + * as ONE input: + * ffmpeg -f avi -i pipe:0 -map 0:v ... -map 0:a ... + * This eliminates the two-live-pipe deadlock: when ffmpeg was given a raw + * video pipe AND a separate audio FIFO it stalled forever probing input 0. + * The AVI muxer writes its header once, then for each ring entry emits a + * '00dc' video chunk followed by a '01wb' audio chunk — frame-coupled by + * construction (both come from the same ring entry in the same iteration). * - * Writes raw UYVY422 frame data to stdout. Terminates on: - * - SIGTERM / SIGINT (clean stop from capture-manager) - * - stdout EPIPE (ffmpeg exited) - * - Slot disappears (bridge stopped) + * 2) RAW MODE (legacy, video-only): if no audio FIFO / avi flag is given, + * fc_pipe writes raw UYVY422 video bytes to stdout as before. * - * Exit codes: - * 0 clean stop (SIGTERM) - * 1 slot not found within wait_ms - * 2 stdout write error (EPIPE) + * The old split video-stdout / audio-FIFO design is REMOVED — it was the + * source of the ffmpeg deadlock. + * + * Usage: fc_pipe [wait_ms] [mode] + * mode: "--avi" | "avi" → single streaming AVI (video+audio) on stdout. + * omitted | "-" → raw UYVY422 video-only on stdout. + * + * Terminates on: SIGTERM/SIGINT, stdout EPIPE (ffmpeg exited), slot gone. */ #include "../src/slot.h" @@ -37,11 +44,12 @@ #include #include #include +#include static volatile int g_stop = 0; static void on_signal(int s) { (void)s; g_stop = 1; } -/* Write all bytes to fd. Returns 0 on success, -1 on EPIPE/error. */ +/* Write all bytes to fd (blocking). Returns 0 on success, -1 on EPIPE/error. */ static int write_all_fd(int fd, const void *buf, size_t len) { const uint8_t *p = (const uint8_t *)buf; size_t off = 0; @@ -49,29 +57,213 @@ static int write_all_fd(int fd, const void *buf, size_t len) { ssize_t n = write(fd, p + off, len - off); if (n > 0) { off += (size_t)n; continue; } if (n < 0 && errno == EINTR) continue; - return -1; /* EPIPE or other fatal error */ + return -1; + } + return 0; +} + +/* ── Little-endian byte emitters into a caller buffer ────────────────────────── */ +static inline void put_u16(uint8_t **pp, uint16_t v) { + uint8_t *p = *pp; p[0] = (uint8_t)(v & 0xff); p[1] = (uint8_t)((v >> 8) & 0xff); *pp = p + 2; +} +static inline void put_u32(uint8_t **pp, uint32_t v) { + uint8_t *p = *pp; + p[0] = (uint8_t)(v & 0xff); p[1] = (uint8_t)((v >> 8) & 0xff); + p[2] = (uint8_t)((v >> 16) & 0xff); p[3] = (uint8_t)((v >> 24) & 0xff); + *pp = p + 4; +} +static inline void put_fourcc(uint8_t **pp, const char *cc) { + uint8_t *p = *pp; p[0] = (uint8_t)cc[0]; p[1] = (uint8_t)cc[1]; + p[2] = (uint8_t)cc[2]; p[3] = (uint8_t)cc[3]; *pp = p + 4; +} + +/* ── Streaming AVI header ───────────────────────────────────────────────────── + * Builds RIFF('AVI ') + LIST('hdrl'){ avih + strl(vids) + strl(auds) } + + * LIST('movi'). For a streaming AVI over a pipe we cannot seek back to patch + * the RIFF and movi sizes, so we set them to 0x7FFFFFFF; ffmpeg's AVI demuxer + * reads the strf headers and the 00dc/01wb chunk stream regardless. The hdrl + * LIST size IS fixed/known, so it is written correctly. dwFlags is 0 — we do + * NOT set AVIF_HASINDEX / AVIF_MUSTUSEINDEX (there is no index in a stream). + * + * Writes the header to *out and returns its length. Buffer must be >= 512. */ +static size_t build_avi_header(uint8_t *out, + uint32_t width, uint32_t height, + uint32_t fps_num, uint32_t fps_den, + uint32_t video_bytes, + uint32_t audio_rate, uint32_t audio_channels, + uint32_t audio_sample_bytes) { + const uint32_t STREAMING = 0x7FFFFFFFu; + + const uint16_t bits_per_sample = (uint16_t)(audio_sample_bytes * 8u); + const uint16_t block_align = (uint16_t)(audio_channels * audio_sample_bytes); + const uint32_t avg_bytes_sec = audio_rate * block_align; + /* dwMicroSecPerFrame = 1e6 * fps_den / fps_num */ + const uint32_t usec_per_frame = + (uint32_t)((1000000.0 * (double)fps_den / (double)fps_num) + 0.5); + + /* Fixed sub-sizes (data bytes only, excluding the 8-byte ckID+ckSize). */ + const uint32_t AVIH_DATA = 56; /* MainAVIHeader */ + const uint32_t STRH_DATA = 56; /* AVISTREAMHEADER */ + const uint32_t BIH_DATA = 40; /* BITMAPINFOHEADER */ + const uint32_t WFX_DATA = 18; /* WAVEFORMATEX (cbSize=0) */ + + /* LIST('strl') sizes = 4 (the 'strl' fourcc) + contained chunks. */ + const uint32_t vstrl_size = 4 + (8 + STRH_DATA) + (8 + BIH_DATA); /* 4+64+48 = 116 */ + const uint32_t astrl_size = 4 + (8 + STRH_DATA) + (8 + WFX_DATA); /* 4+64+26 = 94 */ + /* LIST('hdrl') size = 4 (the 'hdrl' fourcc) + avih chunk + both strl LISTs. */ + const uint32_t hdrl_size = 4 + (8 + AVIH_DATA) + (8 + vstrl_size) + (8 + astrl_size); + + uint8_t *p = out; + + /* RIFF 'AVI ' (size unseekable → streaming sentinel) */ + put_fourcc(&p, "RIFF"); + put_u32(&p, STREAMING); + put_fourcc(&p, "AVI "); + + /* LIST 'hdrl' */ + put_fourcc(&p, "LIST"); + put_u32(&p, hdrl_size); + put_fourcc(&p, "hdrl"); + + /* avih — MainAVIHeader (56 bytes) */ + put_fourcc(&p, "avih"); + put_u32(&p, AVIH_DATA); + put_u32(&p, usec_per_frame); /* dwMicroSecPerFrame */ + put_u32(&p, 0); /* dwMaxBytesPerSec */ + put_u32(&p, 0); /* dwPaddingGranularity */ + put_u32(&p, 0); /* dwFlags — NO index flags */ + put_u32(&p, 0); /* dwTotalFrames (unknown in stream) */ + put_u32(&p, 0); /* dwInitialFrames */ + put_u32(&p, 2); /* dwStreams (video + audio) */ + put_u32(&p, 0); /* dwSuggestedBufferSize */ + put_u32(&p, width); /* dwWidth */ + put_u32(&p, height); /* dwHeight */ + put_u32(&p, 0); put_u32(&p, 0); put_u32(&p, 0); put_u32(&p, 0); /* dwReserved[4] */ + + /* LIST 'strl' — VIDEO */ + put_fourcc(&p, "LIST"); + put_u32(&p, vstrl_size); + put_fourcc(&p, "strl"); + + /* strh — AVISTREAMHEADER 'vids' (56 bytes) */ + put_fourcc(&p, "strh"); + put_u32(&p, STRH_DATA); + put_fourcc(&p, "vids"); /* fccType */ + put_fourcc(&p, "UYVY"); /* fccHandler */ + put_u32(&p, 0); /* dwFlags */ + put_u16(&p, 0); /* wPriority */ + put_u16(&p, 0); /* wLanguage */ + put_u32(&p, 0); /* dwInitialFrames */ + put_u32(&p, fps_den); /* dwScale = 1001 */ + put_u32(&p, fps_num); /* dwRate = 60000 */ + put_u32(&p, 0); /* dwStart */ + put_u32(&p, 0); /* dwLength (unknown) */ + put_u32(&p, video_bytes); /* dwSuggestedBufferSize */ + put_u32(&p, 0xFFFFFFFFu); /* dwQuality (-1 default) */ + put_u32(&p, video_bytes); /* dwSampleSize (fixed for uncompressed) */ + put_u16(&p, 0); put_u16(&p, 0); /* rcFrame.left, top */ + put_u16(&p, (uint16_t)width); /* rcFrame.right */ + put_u16(&p, (uint16_t)height); /* rcFrame.bottom */ + + /* strf — BITMAPINFOHEADER (40 bytes) */ + put_fourcc(&p, "strf"); + put_u32(&p, BIH_DATA); + put_u32(&p, 40); /* biSize */ + put_u32(&p, width); /* biWidth */ + put_u32(&p, height); /* biHeight */ + put_u16(&p, 1); /* biPlanes */ + put_u16(&p, 16); /* biBitCount (UYVY422 = 16bpp) */ + put_fourcc(&p, "UYVY"); /* biCompression fourcc */ + put_u32(&p, video_bytes); /* biSizeImage = W*H*2 */ + put_u32(&p, 0); /* biXPelsPerMeter */ + put_u32(&p, 0); /* biYPelsPerMeter */ + put_u32(&p, 0); /* biClrUsed */ + put_u32(&p, 0); /* biClrImportant */ + + /* LIST 'strl' — AUDIO */ + put_fourcc(&p, "LIST"); + put_u32(&p, astrl_size); + put_fourcc(&p, "strl"); + + /* strh — AVISTREAMHEADER 'auds' (56 bytes) */ + put_fourcc(&p, "strh"); + put_u32(&p, STRH_DATA); + put_fourcc(&p, "auds"); /* fccType */ + put_u32(&p, 0); /* fccHandler (none for PCM) */ + put_u32(&p, 0); /* dwFlags */ + put_u16(&p, 0); /* wPriority */ + put_u16(&p, 0); /* wLanguage */ + put_u32(&p, 0); /* dwInitialFrames */ + put_u32(&p, block_align); /* dwScale = nBlockAlign */ + put_u32(&p, avg_bytes_sec); /* dwRate = nAvgBytesPerSec */ + put_u32(&p, 0); /* dwStart */ + put_u32(&p, 0); /* dwLength (unknown) */ + put_u32(&p, avg_bytes_sec); /* dwSuggestedBufferSize (~1s) */ + put_u32(&p, 0xFFFFFFFFu); /* dwQuality */ + put_u32(&p, block_align); /* dwSampleSize = nBlockAlign */ + put_u16(&p, 0); put_u16(&p, 0); put_u16(&p, 0); put_u16(&p, 0); /* rcFrame */ + + /* strf — WAVEFORMATEX (18 bytes) */ + put_fourcc(&p, "strf"); + put_u32(&p, WFX_DATA); + put_u16(&p, 1); /* wFormatTag = WAVE_FORMAT_PCM */ + put_u16(&p, (uint16_t)audio_channels); /* nChannels */ + put_u32(&p, audio_rate); /* nSamplesPerSec */ + put_u32(&p, avg_bytes_sec); /* nAvgBytesPerSec */ + put_u16(&p, block_align); /* nBlockAlign */ + put_u16(&p, bits_per_sample); /* wBitsPerSample */ + put_u16(&p, 0); /* cbSize */ + + /* LIST 'movi' — frames follow. Size unseekable → streaming sentinel. */ + put_fourcc(&p, "LIST"); + put_u32(&p, STREAMING); + put_fourcc(&p, "movi"); + + return (size_t)(p - out); +} + +/* Write a single AVI chunk: 4-byte fourcc + u32 LE size + data (+ pad byte if + * the size is odd, per the RIFF even-alignment rule). Returns 0 / -1. */ +static int write_avi_chunk(int fd, const char *cc, + const uint8_t *data, uint32_t size) { + uint8_t hdr[8]; + uint8_t *p = hdr; + put_fourcc(&p, cc); + put_u32(&p, size); + if (write_all_fd(fd, hdr, 8) < 0) return -1; + if (size && write_all_fd(fd, data, size) < 0) return -1; + if (size & 1u) { + uint8_t pad = 0; + if (write_all_fd(fd, &pad, 1) < 0) return -1; } return 0; } int main(int argc, char *argv[]) { if (argc < 2) { - fprintf(stderr, "Usage: %s [wait_ms]\n", argv[0]); + fprintf(stderr, "Usage: %s [wait_ms] [--avi|-]\n", argv[0]); return 1; } const char *slot_id = argv[1]; - uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000; + uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000; + + /* AVI mode is selected by an explicit flag in argv[3]. Anything that is not + * "--avi"/"avi" (including "-" or omitted) → legacy raw video-only mode. */ + int avi_mode = 0; + if (argc >= 4) { + const char *m = argv[3]; + if (strcmp(m, "--avi") == 0 || strcmp(m, "avi") == 0) avi_mode = 1; + } signal(SIGTERM, on_signal); signal(SIGINT, on_signal); - signal(SIGPIPE, SIG_IGN); /* detect EPIPE via write() return value */ + signal(SIGPIPE, SIG_IGN); - /* Set stdout to binary mode — no newline translation */ fcntl(STDOUT_FILENO, F_SETFL, fcntl(STDOUT_FILENO, F_GETFL, 0) & ~O_NONBLOCK); - fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums)\n", - slot_id, (unsigned long long)wait_ms); + fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums) mode=%s\n", + slot_id, (unsigned long long)wait_ms, avi_mode ? "avi" : "rawvideo"); fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms); if (!c) { @@ -80,54 +272,134 @@ int main(int argc, char *argv[]) { return 1; } - fprintf(stderr, "[fc_pipe] slot open, streaming to stdout\n"); + /* Pull stream format from the slot header for the AVI header. */ + fc_stream_info_t si; + if (fc_consumer_info(c, &si) != 0 || si.width == 0 || si.height == 0) { + fprintf(stderr, "[fc_pipe] failed to read slot stream info\n"); + fc_consumer_close(c); + return 1; + } + if (si.fps_num == 0) { si.fps_num = 60000; si.fps_den = 1001; } + if (si.fps_den == 0) si.fps_den = 1; + if (si.audio_rate == 0) si.audio_rate = 48000; + if (si.audio_channels == 0) si.audio_channels = 2; + if (si.audio_sample_bytes == 0) si.audio_sample_bytes = 2; - uint64_t frames_out = 0; + const uint32_t video_bytes = si.frame_size ? si.frame_size + : si.width * si.height * 2u; + const uint32_t a_blockalign = si.audio_channels * si.audio_sample_bytes; + /* Samples per video frame for synthesized silence when a frame has no audio: + * round(audio_rate * fps_den / fps_num). Bytes = samples * blockalign. */ + uint32_t silence_bytes = 0; + { + double spf = (double)si.audio_rate * (double)si.fps_den / (double)si.fps_num; + uint32_t samples = (uint32_t)(spf + 0.5); + silence_bytes = samples * a_blockalign; + } + uint8_t *silence = NULL; + if (avi_mode && silence_bytes) { + silence = (uint8_t *)calloc(1, silence_bytes); + if (!silence) silence_bytes = 0; + } + + if (avi_mode) { + uint8_t hdr[512]; + size_t hlen = build_avi_header(hdr, si.width, si.height, + si.fps_num, si.fps_den, video_bytes, + si.audio_rate, si.audio_channels, + si.audio_sample_bytes); + if (write_all_fd(STDOUT_FILENO, hdr, hlen) < 0) { + fprintf(stderr, "[fc_pipe] stdout EPIPE writing AVI header\n"); + fc_consumer_close(c); free(silence); + return 1; + } + fprintf(stderr, + "[fc_pipe] slot open, streaming AVI(video+audio) → stdout " + "(%ux%u %u/%u, %ub/frame, audio %uHz %uch s%ule, silence=%uB/frame)\n", + si.width, si.height, si.fps_num, si.fps_den, video_bytes, + si.audio_rate, si.audio_channels, si.audio_sample_bytes * 8u, + silence_bytes); + } else { + fprintf(stderr, "[fc_pipe] slot open, streaming raw video → stdout (%ux%u)\n", + si.width, si.height); + } + + uint64_t frames_out = 0; uint64_t total_dropped = 0; + uint64_t audio_bytes = 0; + uint64_t audio_gaps = 0; while (!g_stop) { fc_frame_ref_t ref; - int rc = fc_consumer_read(c, &ref, 2000 /* 2s timeout */); - + int rc = fc_consumer_read(c, &ref, 2000); if (rc == FC_TIMEOUT) continue; if (rc == FC_ERROR) break; - if (rc == FC_LAPPED) { - /* Copy was torn (writer lapped us mid-read). No valid frame to - * write — log and read again. */ total_dropped = fc_consumer_dropped(c); fprintf(stderr, "[fc_pipe] WARNING: frame lapped mid-read — total dropped: %llu\n", (unsigned long long)total_dropped); continue; } - if (rc == FC_DROPPED) { - /* Skipped one or more older frames, but THIS frame is valid — log - * and write it (do NOT continue). */ total_dropped = fc_consumer_dropped(c); fprintf(stderr, "[fc_pipe] WARNING: consumer fell behind — total dropped: %llu\n", (unsigned long long)total_dropped); } - /* Write frame data to stdout (ref.data is a stable consumer-owned copy) */ - if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) { - if (!g_stop) - fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n"); - break; + if (avi_mode) { + /* Interleave THIS frame's video + audio in one stream. Both are + * sourced from the SAME ring entry ⇒ frame-coupled by construction. + * Video first (00dc), then audio (01wb). */ + if (write_avi_chunk(STDOUT_FILENO, "00dc", ref.data, ref.size) < 0) { + if (!g_stop) + fprintf(stderr, "[fc_pipe] stdout EPIPE (video) — ffmpeg exited\n"); + break; + } + if (ref.audio_size > 0 && ref.audio) { + if (write_avi_chunk(STDOUT_FILENO, "01wb", ref.audio, ref.audio_size) < 0) { + if (!g_stop) + fprintf(stderr, "[fc_pipe] stdout EPIPE (audio) — ffmpeg exited\n"); + break; + } + audio_bytes += ref.audio_size; + } else { + /* No embedded audio this frame: emit one frame-interval of + * silence so the audio stream length tracks the video and + * ffmpeg never starves on the audio demuxer. */ + if (silence_bytes && + write_avi_chunk(STDOUT_FILENO, "01wb", silence, silence_bytes) < 0) { + if (!g_stop) + fprintf(stderr, "[fc_pipe] stdout EPIPE (silence) — ffmpeg exited\n"); + break; + } + audio_bytes += silence_bytes; + audio_gaps++; + } + } else { + /* Legacy raw video-only: write the UYVY422 bytes straight to stdout. */ + if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) { + if (!g_stop) + fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n"); + break; + } } - frames_out++; - /* Periodic stats to stderr (every 300 frames ≈ 5s at 60fps) */ + frames_out++; if (frames_out % 300 == 0) { - fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu\n", + fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu audio_bytes=%llu gaps=%llu\n", (unsigned long long)frames_out, - (unsigned long long)total_dropped); + (unsigned long long)total_dropped, + (unsigned long long)audio_bytes, + (unsigned long long)audio_gaps); } } + free(silence); fc_consumer_close(c); - fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu\n", + fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu audio_bytes=%llu gaps=%llu\n", (unsigned long long)frames_out, - (unsigned long long)total_dropped); + (unsigned long long)total_dropped, + (unsigned long long)audio_bytes, + (unsigned long long)audio_gaps); return 0; } diff --git a/services/framecache/src/slot.c b/services/framecache/src/slot.c index e22cde8..8a52fdd 100644 --- a/services/framecache/src/slot.c +++ b/services/framecache/src/slot.c @@ -81,6 +81,9 @@ struct fc_slot *fc_slot_create(const char *slot_id, hdr->pixel_format = pixel_format; hdr->frame_size = frame_size; hdr->ring_depth = FC_RING_DEPTH; + hdr->audio_max_bytes = FC_MAX_AUDIO_BYTES; + hdr->audio_rate = FC_AUDIO_RATE; + hdr->audio_channels = FC_AUDIO_CHANNELS; atomic_store(&hdr->write_cursor, 0); atomic_store(&hdr->dropped_frames, 0); strncpy(hdr->source_type, source_type ? source_type : "unknown", @@ -154,13 +157,43 @@ void fc_slot_write_frame(struct fc_slot *s, frame->wall_us = (uint64_t)({ struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; }); - frame->size = size < hdr->frame_size ? size : hdr->frame_size; + frame->size = size < hdr->frame_size ? size : hdr->frame_size; + frame->audio_size = 0; /* video-only source (e.g. net_ingest): no embedded audio */ memcpy(frame->data, data, frame->size); atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release); sem_post(s->sem); } +/** + * Write one frame-coupled entry: video bytes + this frame's audio bytes. + * audio may be NULL / asize 0 (no embedded audio this frame). Never blocks. + */ +void fc_slot_write_av(struct fc_slot *s, + const uint8_t *video, uint32_t vsize, + const uint8_t *audio, uint32_t asize, + uint64_t pts_us) +{ + fc_header_t *hdr = (fc_header_t *)s->base; + uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed); + fc_frame_t *frame = fc_frame_at(s->base, hdr->frame_size, cur); + + frame->pts_us = pts_us; + frame->wall_us = (uint64_t)({ struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; }); + frame->size = vsize < hdr->frame_size ? vsize : hdr->frame_size; + memcpy(frame->data, video, frame->size); + + uint32_t acap = hdr->audio_max_bytes; + frame->audio_size = (audio && asize) ? (asize < acap ? asize : acap) : 0; + if (frame->audio_size) + memcpy(fc_frame_audio(frame, hdr->frame_size), audio, frame->audio_size); + + atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release); + sem_post(s->sem); +} + /* ── client-side open / read / close (also used by capture-manager) ── */ /** @@ -183,6 +216,11 @@ struct fc_slot *fc_slot_open(const char *slot_id) if (tmp_hdr.magic != FC_MAGIC) { close(fd); return NULL; } + if (tmp_hdr.version != FC_VERSION) { + fprintf(stderr, "[framecache] slot %s version %u != expected %u — refusing (fail-safe)\n", + slot_id, tmp_hdr.version, FC_VERSION); + close(fd); return NULL; + } size_t total = fc_slot_shm_size(tmp_hdr.frame_size); void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0); diff --git a/services/framecache/src/slot.h b/services/framecache/src/slot.h index a1bfde0..636c7ca 100644 --- a/services/framecache/src/slot.h +++ b/services/framecache/src/slot.h @@ -3,10 +3,33 @@ * * Layout per slot (/dev/shm/framecache/): * [fc_header_t — 4KB aligned] - * [fc_frame_t × ring_depth — each FC_FRAME_HDR_SIZE + frame_size bytes] + * [fc_frame_t × ring_depth — each FC_FRAME_HDR_SIZE + frame_size + FC_MAX_AUDIO_BYTES bytes] * * Writer advances write_cursor atomically and posts the named semaphore. * Each consumer tracks its own read_cursor independently — writer never blocks. + * + * ── FRAME-COUPLED AUDIO (FC_VERSION 2) ────────────────────────────────────── + * Each ring entry now carries the VIDEO frame AND that frame's SDI-embedded + * AUDIO together in ONE transport. The per-entry data region is: + * + * [ video bytes : frame_size ][ audio bytes : up to FC_MAX_AUDIO_BYTES ] + * + * fc_frame_t.size = video byte count (== frame_size for UYVY422) + * fc_frame_t.audio_size = audio byte count for THIS frame (s16le stereo 48k), + * 0 if the signal carries no embedded audio on this frame. + * + * Because both streams live in the same ring slot written in one atomic cursor + * advance and read in one consumer iteration, audio can never drift ahead of + * (or behind) its video frame — the "audio ahead of video" offset is eliminated + * at the root: there is no second independent buffer/transport to race. + * + * Versioning: FC_VERSION bumped 1 → 2. The frame header grew from 24 to 24 + * bytes (the former _pad uint32 is REUSED as audio_size — no size change), but + * the per-entry stride grew by FC_MAX_AUDIO_BYTES and the header gained audio + * descriptor fields. An old (v1) reader mmapping a v2 slot would compute the + * wrong stride and misparse, so readers MUST check version == FC_VERSION and + * fail safe. The writer and all readers share this header; mismatched binaries + * refuse to attach. */ #pragma once @@ -15,12 +38,24 @@ #include #define FC_MAGIC 0x46524D43u /* "FRMC" */ -#define FC_VERSION 1u +#define FC_VERSION 2u /* bumped for frame-coupled audio */ #define FC_RING_DEPTH 120u /* ~2s at 59.94fps */ #define FC_HEADER_SIZE 4096u /* 4KB header block */ -#define FC_FRAME_HDR_SIZE 24u /* pts_us(8) + wall_us(8) + size(4) + pad(4) */ +#define FC_FRAME_HDR_SIZE 24u /* pts_us(8) + wall_us(8) + size(4) + audio_size(4) */ #define FC_MAX_SLOT_ID 64u +/* ── Frame-coupled audio constants ────────────────────────────────────────── + * Audio is always delivered as interleaved s16le stereo at 48 kHz (the SDI + * embedded-audio mapping ffmpeg/raw2bmx expect). Worst-case samples per video + * frame at the lowest broadcast frame rate (~23.976 fps) is ~2002 samples; + * 2048 samples * 2 ch * 2 bytes = 8192 bytes. We reserve 16 KB per entry for + * generous headroom (covers transient over-delivery and any future 4-byte + * sample widths) — ~1.9 MB extra across the 120-deep ring, negligible. */ +#define FC_AUDIO_RATE 48000u +#define FC_AUDIO_CHANNELS 2u +#define FC_AUDIO_SAMPLE_BYTES 2u /* s16le */ +#define FC_MAX_AUDIO_BYTES 16384u + /* Internal handle used by both server (writer) and client (reader) */ struct fc_slot { int shm_fd; @@ -43,23 +78,31 @@ typedef struct { uint32_t fps_num; uint32_t fps_den; uint32_t pixel_format; /* FC_PIX_UYVY422 */ - uint32_t frame_size; /* width * height * 2 */ + uint32_t frame_size; /* width * height * 2 (video bytes per entry) */ uint32_t ring_depth; /* FC_RING_DEPTH */ + uint32_t audio_max_bytes; /* FC_MAX_AUDIO_BYTES — audio region per entry */ + uint32_t audio_rate; /* FC_AUDIO_RATE (48000) */ + uint32_t audio_channels; /* FC_AUDIO_CHANNELS (2) */ uint32_t _reserved; _Atomic uint64_t write_cursor; /* monotonically increasing frame index */ _Atomic uint64_t dropped_frames; char source_type[32]; /* "deltacast" | "blackmagic" | "srt" | "rtmp" */ char slot_id[FC_MAX_SLOT_ID]; - uint8_t _pad[FC_HEADER_SIZE - 144]; + uint8_t _pad[FC_HEADER_SIZE - 156]; } fc_header_t; -/* Per-frame metadata + data (variable length — use fc_frame_at() accessor) */ +/* Per-entry metadata + data (variable length — use fc_frame_at() accessor). + * + * data layout (frame-coupled): [video : size bytes][audio : audio_size bytes] + * The audio region is reserved at audio_max_bytes; audio_size <= audio_max_bytes + * tells the reader how many audio bytes are valid for THIS frame. + */ typedef struct { uint64_t pts_us; uint64_t wall_us; - uint32_t size; - uint32_t _pad; - uint8_t data[]; /* frame_size bytes */ + uint32_t size; /* video bytes */ + uint32_t audio_size; /* audio bytes for this frame (s16le stereo 48k), 0 = none */ + uint8_t data[]; /* [video frame_size][audio audio_max_bytes] */ } fc_frame_t; /* Compile-time size check */ @@ -80,6 +123,11 @@ void fc_slot_close(struct fc_slot *s); void fc_slot_write_frame(struct fc_slot *s, const uint8_t *data, uint32_t size, uint64_t pts_us); +/* Frame-coupled write: video + this frame's audio in one ring entry. */ +void fc_slot_write_av(struct fc_slot *s, + const uint8_t *video, uint32_t vsize, + const uint8_t *audio, uint32_t asize, + uint64_t pts_us); /* Accessor functions — inline now that struct fc_slot is defined above */ static inline fc_header_t *fc_slot_header(struct fc_slot *s) { return (fc_header_t *)s->base; } @@ -87,13 +135,22 @@ static inline const char *fc_slot_id(struct fc_slot *s) { return s->slot_id static inline const char *fc_slot_shm_path(struct fc_slot *s) { return s->shm_path; } static inline const char *fc_slot_sem_name(struct fc_slot *s) { return s->sem_name; } +/** + * Per-entry stride: header + video bytes + reserved audio region. + * Shared by writer and all readers — frame-coupled audio rides in the same + * entry, so the stride MUST include FC_MAX_AUDIO_BYTES on every component. + */ +static inline size_t fc_entry_stride(uint32_t frame_size) { + return (size_t)FC_FRAME_HDR_SIZE + frame_size + (size_t)FC_MAX_AUDIO_BYTES; +} + /** * Compute total shm size for a slot given frame_size. - * = FC_HEADER_SIZE + ring_depth * (FC_FRAME_HDR_SIZE + frame_size) + * = FC_HEADER_SIZE + ring_depth * fc_entry_stride(frame_size) */ static inline size_t fc_slot_shm_size(uint32_t frame_size) { return (size_t)FC_HEADER_SIZE - + (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + frame_size); + + (size_t)FC_RING_DEPTH * fc_entry_stride(frame_size); } /** @@ -102,5 +159,13 @@ static inline size_t fc_slot_shm_size(uint32_t frame_size) { static inline fc_frame_t *fc_frame_at(void *base, uint32_t frame_size, uint64_t idx) { uint8_t *frames = (uint8_t *)base + FC_HEADER_SIZE; return (fc_frame_t *)(frames + (idx % FC_RING_DEPTH) - * ((size_t)FC_FRAME_HDR_SIZE + frame_size)); + * fc_entry_stride(frame_size)); +} + +/** + * Return pointer to the audio region of an entry (immediately after the + * frame_size video bytes within data[]). + */ +static inline uint8_t *fc_frame_audio(fc_frame_t *f, uint32_t frame_size) { + return f->data + frame_size; }