fix(framecache): frame-coupled audio — video+audio in ONE ring entry
Re-engineer the framecache so each video frame carries its own SDI-embedded
audio through ONE transport, eliminating the "audio ahead of video" offset at
the root: there is no longer a second independent audio buffer/FIFO that can
race ahead of video.
slot.h (FC_VERSION 1 -> 2):
- Per ring entry data region is now [video frame_size][audio FC_MAX_AUDIO_BYTES].
- fc_frame_t: the former _pad u32 is REUSED as audio_size (header still 24B).
- Header gains audio_max_bytes / audio_rate / audio_channels (self-describing).
- New fc_entry_stride()/fc_frame_audio() helpers; shm size includes audio.
- Readers and the writer check version == FC_VERSION and FAIL SAFE on mismatch, so an
old reader against a new writer (or vice-versa) refuses to attach rather than misparsing the ring.
slot.c: populate audio header fields; add fc_slot_write_av(); version gate in open.
fc_client.[ch]: fc_frame_ref_t gains audio/audio_size; copy buffer holds
video+audio; both copied from the SAME entry in one read -> frame-locked.
fc_pipe.c: argv is now <slot_id> <wait_ms> <audio_fifo_path>; for each ring entry it
writes the video -> stdout AND that frame's audio -> the audio FIFO IN LOCKSTEP from one
cursor read (no second buffer to drift). Auto-reattaches the FIFO on EPIPE.
deltacast-bridge:
- SILENT-AUDIO FIX: audio_extract_init now configures ONLY pAudioChannels[0]
of group 0 as one stereo pair (Mode=STEREO, BufferFormat=AF_16, pData=buf),
leaving pAudioChannels[1] ZEROED, exactly like Deltacast's own FFmpeg fork
(libavdevice/videomaster_common.c init_audio_info). The prior JOINED code
ALSO set channel[1].Mode/BufferFormat=STEREO/AF_16, declaring a second pair
the signal does not carry -> VHD_SlotExtractAudio returned zero samples ->
-91 dB silent audio. DataSize is (re)set to capacity before each extract.
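- VHD_SDI_SP_INTERFACE is now set from the channel-detected interface
(VHD_SDI_CP_INTERFACE) before StartStream, per the same fork — required for
embedded-audio extraction on JOINED SDI streams. If the channel property cannot be
read, the stream's current interface value is re-applied instead.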
- fc_writer.[ch]: add fc_writer_write_av(); struct/stride bumped to v2.
- video_thread (framecache path) extracts each frame's audio from the SAME
locked JOINED slot and writes BOTH via fc_writer_write_av. Silence fallback
at the source: a frame with no embedded audio gets one frame-interval of
silence so the audio timeline length always equals the video timeline length.
- The separate audio FIFO + audio_thread + apcm ring are retained ONLY for the
legacy (-DLEGACY_FIFO / framecache-unreachable) fallback; on the primary
framecache path the bridge no longer owns the audio FIFO.
capture-manager.js: deltacast/sdi framecache branch now CREATES the audio FIFO
and passes its path to fc_pipe (argv[3]); ffmpeg keeps two raw inputs
(rawvideo pipe:0 + s16le 48k input 1) but both are now fed frame-locked from
the same ring entry. Stale-audio pre-flush retained as harmless safety.
All changes versioned; mismatched binaries refuse to attach (fail safe).
This commit is contained in:
parent 80d8b15e8c
commit 2f37119379
9 changed files with 491 additions and 146 deletions
|
|
@ -24,12 +24,18 @@
|
|||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
/* Re-use the shared memory layout from the framecache service */
|
||||
/* Re-use the shared memory layout from the framecache service.
|
||||
* MUST stay byte-for-byte consistent with services/framecache/src/slot.h —
|
||||
* the writer and all readers share this layout. Bumped to FC_VERSION 2 for
|
||||
* frame-coupled audio (each ring entry carries video + that frame's audio). */
|
||||
#define FC_MAGIC 0x46524D43u
|
||||
#define FC_VERSION 1u
|
||||
#define FC_VERSION 2u
|
||||
#define FC_RING_DEPTH 120u
|
||||
#define FC_HEADER_SIZE 4096u
|
||||
#define FC_FRAME_HDR_SIZE 24u
|
||||
#define FC_MAX_AUDIO_BYTES 16384u /* must equal slot.h FC_MAX_AUDIO_BYTES */
|
||||
#define FC_AUDIO_RATE 48000u
|
||||
#define FC_AUDIO_CHANNELS 2u
|
||||
|
||||
typedef struct {
|
||||
uint32_t magic;
|
||||
|
|
@ -41,22 +47,30 @@ typedef struct {
|
|||
uint32_t pixel_format;
|
||||
uint32_t frame_size;
|
||||
uint32_t ring_depth;
|
||||
uint32_t audio_max_bytes; /* FC_MAX_AUDIO_BYTES */
|
||||
uint32_t audio_rate; /* FC_AUDIO_RATE */
|
||||
uint32_t audio_channels; /* FC_AUDIO_CHANNELS */
|
||||
uint32_t _reserved;
|
||||
_Atomic uint64_t write_cursor;
|
||||
_Atomic uint64_t dropped_frames;
|
||||
char source_type[32];
|
||||
char slot_id[64];
|
||||
uint8_t _pad[FC_HEADER_SIZE - 112];
|
||||
uint8_t _pad[FC_HEADER_SIZE - 124];
|
||||
} fc_hdr_t;
|
||||
|
||||
typedef struct {
|
||||
uint64_t pts_us;
|
||||
uint64_t wall_us;
|
||||
uint32_t size;
|
||||
uint32_t _pad;
|
||||
uint8_t data[];
|
||||
uint32_t size; /* video bytes */
|
||||
uint32_t audio_size; /* audio bytes for this frame (s16le stereo 48k), 0 = none */
|
||||
uint8_t data[]; /* [video frame_size][audio audio_max_bytes] */
|
||||
} fc_frm_t;
|
||||
|
||||
/* Per-entry stride MUST include the audio region. */
|
||||
static inline size_t fc_entry_stride(uint32_t frame_size) {
|
||||
return (size_t)FC_FRAME_HDR_SIZE + frame_size + (size_t)FC_MAX_AUDIO_BYTES;
|
||||
}
|
||||
|
||||
struct fc_writer {
|
||||
void *base;
|
||||
size_t shm_size;
|
||||
|
|
@ -232,8 +246,13 @@ fc_writer_t *fc_writer_open(const char *fc_url,
|
|||
fprintf(stderr, "[fc_writer:%s] bad shm header\n", slot_id);
|
||||
close(fd); return NULL;
|
||||
}
|
||||
if (hdr.version != FC_VERSION) {
|
||||
fprintf(stderr, "[fc_writer:%s] shm version %u != expected %u — refusing (fail-safe)\n",
|
||||
slot_id, hdr.version, FC_VERSION);
|
||||
close(fd); return NULL;
|
||||
}
|
||||
size_t total = (size_t)FC_HEADER_SIZE
|
||||
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + hdr.frame_size);
|
||||
+ (size_t)FC_RING_DEPTH * fc_entry_stride(hdr.frame_size);
|
||||
|
||||
void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
if (base == MAP_FAILED) {
|
||||
|
|
@ -268,14 +287,23 @@ fc_writer_t *fc_writer_open(const char *fc_url,
|
|||
void fc_writer_write(fc_writer_t *w,
|
||||
const uint8_t *data, uint32_t size,
|
||||
uint64_t pts_us)
|
||||
{
|
||||
/* Video-only convenience wrapper (no embedded audio this frame). */
|
||||
fc_writer_write_av(w, data, size, NULL, 0, pts_us);
|
||||
}
|
||||
|
||||
void fc_writer_write_av(fc_writer_t *w,
|
||||
const uint8_t *video, uint32_t vsize,
|
||||
const uint8_t *audio, uint32_t asize,
|
||||
uint64_t pts_us)
|
||||
{
|
||||
fc_hdr_t *hdr = (fc_hdr_t *)w->base;
|
||||
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
|
||||
uint64_t idx = cur % FC_RING_DEPTH;
|
||||
|
||||
/* Locate frame in ring */
|
||||
/* Locate entry in ring (stride includes the audio region). */
|
||||
uint8_t *frames = (uint8_t *)w->base + FC_HEADER_SIZE;
|
||||
fc_frm_t *frame = (fc_frm_t *)(frames + idx * ((size_t)FC_FRAME_HDR_SIZE + hdr->frame_size));
|
||||
fc_frm_t *frame = (fc_frm_t *)(frames + idx * fc_entry_stride(hdr->frame_size));
|
||||
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
|
|
@ -283,8 +311,16 @@ void fc_writer_write(fc_writer_t *w,
|
|||
|
||||
frame->pts_us = pts_us;
|
||||
frame->wall_us = wall;
|
||||
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
||||
memcpy(frame->data, data, frame->size);
|
||||
frame->size = vsize < hdr->frame_size ? vsize : hdr->frame_size;
|
||||
memcpy(frame->data, video, frame->size);
|
||||
|
||||
/* Frame-coupled audio: pack THIS frame's audio immediately after the video
|
||||
* bytes, within the reserved audio region. Clamp to capacity. */
|
||||
uint32_t acap = hdr->audio_max_bytes ? hdr->audio_max_bytes : FC_MAX_AUDIO_BYTES;
|
||||
uint32_t awr = (audio && asize) ? (asize < acap ? asize : acap) : 0;
|
||||
frame->audio_size = awr;
|
||||
if (awr)
|
||||
memcpy(frame->data + hdr->frame_size, audio, awr);
|
||||
|
||||
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
||||
sem_post(w->sem);
|
||||
|
|
|
|||
|
|
@ -40,6 +40,17 @@ void fc_writer_write(fc_writer_t *w,
|
|||
const uint8_t *data, uint32_t size,
|
||||
uint64_t pts_us);
|
||||
|
||||
/**
|
||||
* Write one FRAME-COUPLED entry: a video frame plus that frame's SDI-embedded
|
||||
* audio (interleaved s16le stereo 48k) into the SAME ring slot. Both are read
|
||||
* back together by one consumer iteration, so audio cannot drift from video.
|
||||
* audio may be NULL / asize 0 when the frame has no embedded audio.
|
||||
*/
|
||||
void fc_writer_write_av(fc_writer_t *w,
|
||||
const uint8_t *video, uint32_t vsize,
|
||||
const uint8_t *audio, uint32_t asize,
|
||||
uint64_t pts_us);
|
||||
|
||||
/**
|
||||
* Deregister slot from framecache service and unmap shm.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -424,6 +424,7 @@ typedef struct {
|
|||
int enabled; /* 0 = no scratch buffer (extraction disabled) */
|
||||
unsigned char *buf; /* scratch PCM landing buffer */
|
||||
size_t buf_sz;
|
||||
size_t silence_bytes; /* one frame of s16le stereo silence (fallback) */
|
||||
VHD_AUDIOINFO ai;
|
||||
} AudioExtract;
|
||||
|
||||
|
|
@ -435,38 +436,72 @@ static void audio_extract_init(AudioExtract *ax, PortState *ps) {
|
|||
VHD_ASR_48000, 0);
|
||||
ULONG block_size = VHD_GetBlockSize(VHD_AF_16, VHD_AM_STEREO);
|
||||
size_t fb = (size_t)2 /*ch*/ * 2 /*s16*/;
|
||||
ax->buf_sz = ((size_t)max_samples + 64) * (block_size ? block_size : fb);
|
||||
if (block_size == 0) block_size = (ULONG)fb;
|
||||
/* Exact per-frame capacity for ONE stereo pair (s16le, packed L/R), the
|
||||
* size the SDK fills in pAudioChannels[0].pData. + headroom. */
|
||||
ax->buf_sz = ((size_t)max_samples + 64) * (size_t)block_size;
|
||||
if (ax->buf_sz < 65536) ax->buf_sz = 65536;
|
||||
ax->buf = calloc(1, ax->buf_sz);
|
||||
if (!ax->buf) { ax->enabled = 0; return; }
|
||||
|
||||
/* One video-frame worth of s16le stereo silence (samples/frame * 2ch * 2B),
|
||||
* used as the frame-coupled silence fallback when the signal carries no
|
||||
* embedded audio on a frame — keeps the audio timeline length == video. */
|
||||
{
|
||||
int fn = ps->vi.fps_num > 0 ? ps->vi.fps_num : 25;
|
||||
int fd = ps->vi.fps_den > 0 ? ps->vi.fps_den : 1;
|
||||
long spf = ((long)48000 * fd + fn / 2) / fn;
|
||||
if (spf < 1) spf = 1;
|
||||
ax->silence_bytes = (size_t)spf * 2 /*ch*/ * 2 /*s16*/;
|
||||
if (ax->silence_bytes > ax->buf_sz) ax->silence_bytes = ax->buf_sz;
|
||||
}
|
||||
|
||||
memset(&ax->ai, 0, sizeof(ax->ai));
|
||||
/* Configure BOTH channels of the stereo pair (group 0). PCM lands in
|
||||
* pAudioChannels[0].pData (packed L/R s16le). Channel [1] must declare
|
||||
* Mode+BufferFormat so the SDK recognizes the pair (same as the proven
|
||||
* legacy DISJOINED_ANC config). */
|
||||
/* ── Silent-audio FIX ───────────────────────────────────────────────────
|
||||
* Configure ONLY pAudioChannels[0] of group 0 as ONE stereo pair, exactly
|
||||
* like Deltacast's own FFmpeg fork (libavdevice/videomaster_common.c,
|
||||
* init_audio_info): for a stereo pair the SDK packs interleaved L/R s16le
|
||||
* into the EVEN channel's pData; the ODD channel (index 1) must be left
|
||||
* ZEROED. The previous JOINED code ALSO set pAudioChannels[1].Mode/
|
||||
* BufferFormat = STEREO/AF_16, declaring a SECOND stereo pair the signal
|
||||
* does not carry. That mismatch made VHD_SlotExtractAudio return zero
|
||||
* samples → the -91 dB "silent audio" regression. Leaving channel[1] zero
|
||||
* and sizing DataSize = nb_samples * VHD_GetBlockSize(AF_16, STEREO) (set
|
||||
* per call in audio_extract_slot) makes the SDK land real PCM.
|
||||
*
|
||||
* DataSize must be (re)set to the buffer capacity before EACH extract call
|
||||
* because the SDK overwrites it with the number of bytes actually written. */
|
||||
ax->ai.pAudioGroups[0].pAudioChannels[0].Mode = VHD_AM_STEREO;
|
||||
ax->ai.pAudioGroups[0].pAudioChannels[0].BufferFormat = VHD_AF_16;
|
||||
ax->ai.pAudioGroups[0].pAudioChannels[0].pData = ax->buf;
|
||||
ax->ai.pAudioGroups[0].pAudioChannels[1].Mode = VHD_AM_STEREO;
|
||||
ax->ai.pAudioGroups[0].pAudioChannels[1].BufferFormat = VHD_AF_16;
|
||||
/* pAudioChannels[1] intentionally left zeroed (memset above). */
|
||||
ax->enabled = 1;
|
||||
}
|
||||
|
||||
/* Extract this slot's SDI-embedded audio and push it into the ring.
|
||||
* Must be called while `slot` is locked (JOINED slot = same frame as video).
|
||||
* Best-effort: any failure or zero-sample slot simply pushes nothing, and the
|
||||
* audio_thread covers the gap with paced silence. */
|
||||
static void audio_extract_slot(AudioExtract *ax, PortState *ps, HANDLE slot) {
|
||||
if (!ax->enabled) return;
|
||||
/* Extract this slot's SDI-embedded audio into ax->buf and return the byte
|
||||
* count. Must be called while `slot` is locked (JOINED slot = same frame as the
|
||||
* video) so the returned PCM is exactly this video frame's embedded audio.
|
||||
*
|
||||
* Frame-coupled silence fallback: if the slot yields zero samples (no embedded
|
||||
* audio on the signal, or a transient extract miss) we fill ax->buf with one
|
||||
* frame-interval of silence and return that, so EVERY ring entry carries one
|
||||
* frame of audio and the audio timeline length always equals the video timeline
|
||||
* length (drift-free). On real embedded PCM we flag have_embedded for logging.
|
||||
*
|
||||
* Returns the number of valid PCM bytes in ax->buf (>= 0). */
|
||||
static size_t audio_extract_slot(AudioExtract *ax, PortState *ps, HANDLE slot) {
|
||||
if (!ax->enabled) return 0;
|
||||
ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize = (ULONG)ax->buf_sz;
|
||||
if (VHD_SlotExtractAudio(slot, &ax->ai) == VHDERR_NOERROR) {
|
||||
ULONG sz = ax->ai.pAudioGroups[0].pAudioChannels[0].DataSize;
|
||||
if (sz > 0 && (size_t)sz <= ax->buf_sz) {
|
||||
atomic_store_explicit(&ps->apcm.have_embedded, 1, memory_order_relaxed);
|
||||
apcm_push(&ps->apcm, ax->buf, (size_t)sz);
|
||||
return (size_t)sz;
|
||||
}
|
||||
}
|
||||
/* No embedded audio this frame → one frame-interval of silence. */
|
||||
if (ax->silence_bytes) memset(ax->buf, 0, ax->silence_bytes);
|
||||
return ax->silence_bytes;
|
||||
}
|
||||
|
||||
static void audio_extract_free(AudioExtract *ax) {
|
||||
|
|
@ -506,13 +541,16 @@ static void *video_thread(void *arg) {
|
|||
HANDLE slot = NULL;
|
||||
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
|
||||
if (r == VHDERR_NOERROR) {
|
||||
/* ── JOINED: extract this frame's embedded audio FIRST ──────
|
||||
* Same locked slot as the video below ⇒ the audio is the SDI-
|
||||
* embedded audio of this exact frame ⇒ inherently sync'd. Push
|
||||
* it to the ring for audio_thread to drain to the FIFO. Done
|
||||
* before the video packing check so audio is never dropped on a
|
||||
* (rare) video packing mismatch. */
|
||||
audio_extract_slot(&ax, ps, slot);
|
||||
/* ── JOINED frame-coupled capture ───────────────────────────
|
||||
* Extract this frame's SDI-embedded audio from the SAME locked
|
||||
* slot as the video, then write BOTH into ONE framecache ring
|
||||
* entry via fc_writer_write_av. Because audio + video share one
|
||||
* ring slot advanced by one atomic cursor step, and are read
|
||||
* back together by one consumer (fc_pipe) iteration, the audio
|
||||
* can never drift from its video frame — there is no separate
|
||||
* audio transport/buffer. (audio_extract_slot returns one
|
||||
* frame of silence when the signal carries no embedded audio.)*/
|
||||
size_t asz = audio_extract_slot(&ax, ps, slot);
|
||||
|
||||
BYTE *buf = NULL;
|
||||
ULONG sz = 0;
|
||||
|
|
@ -532,7 +570,9 @@ static void *video_thread(void *arg) {
|
|||
* (uint64_t)ps->vi.fps_den
|
||||
/ (uint64_t)ps->vi.fps_num;
|
||||
}
|
||||
fc_writer_write(ps->fc_writer, buf, (uint32_t)sz, pts_us);
|
||||
fc_writer_write_av(ps->fc_writer, buf, (uint32_t)sz,
|
||||
ax.enabled ? ax.buf : NULL,
|
||||
(uint32_t)asz, pts_us);
|
||||
frame_seq++;
|
||||
}
|
||||
VHD_UnlockSlotHandle(slot);
|
||||
|
|
@ -581,9 +621,14 @@ static void *video_thread(void *arg) {
|
|||
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
|
||||
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
|
||||
if (r == VHDERR_NOERROR) {
|
||||
/* JOINED: extract this frame's embedded audio on the SAME slot
|
||||
* (before the video packing check so audio is never dropped). */
|
||||
audio_extract_slot(&ax, ps, slot);
|
||||
/* LEGACY_FIFO path: video → video FIFO, audio → apcm ring →
|
||||
* audio_thread → audio FIFO (the old two-transport scheme).
|
||||
* Extract this frame's embedded audio on the SAME slot and push
|
||||
* it to the ring for audio_thread. (Frame coupling is only
|
||||
* available on the framecache path above; legacy keeps the
|
||||
* separate-FIFO behavior for nodes without framecache.) */
|
||||
size_t asz = audio_extract_slot(&ax, ps, slot);
|
||||
if (asz > 0) apcm_push(&ps->apcm, ax.buf, asz);
|
||||
|
||||
BYTE *buf = NULL;
|
||||
ULONG sz = 0;
|
||||
|
|
@ -836,32 +881,39 @@ int main(int argc, char *argv[]) {
|
|||
"deltacast-%u-%u", device_id, ports[pi]);
|
||||
strncpy(p->fc_url, fc_url, sizeof(p->fc_url) - 1);
|
||||
|
||||
/* Create audio FIFO (always needed — audio stays in FIFO for now). */
|
||||
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
/* Open framecache slot for video frames.
|
||||
* Fall back to FIFO if framecache is unreachable. */
|
||||
/* ── Primary: frame-coupled framecache path ─────────────────────────
|
||||
* Open the framecache slot for video + this frame's embedded audio
|
||||
* (both packed into one ring entry by fc_writer_write_av). In this path
|
||||
* the bridge does NOT create or write the audio FIFO — capture-manager
|
||||
* creates it and fc_pipe writes it, sourced from the SAME ring entry as
|
||||
* the video so audio is frame-locked. Fall back to the legacy two-FIFO
|
||||
* scheme only if framecache is unreachable. */
|
||||
p->fc_writer = fc_writer_open(p->fc_url, p->slot_id,
|
||||
(uint32_t)p->vi.width, (uint32_t)p->vi.height,
|
||||
(uint32_t)p->vi.fps_num, (uint32_t)p->vi.fps_den);
|
||||
if (!p->fc_writer) {
|
||||
fprintf(stderr, "[port:%u] framecache unavailable — creating video FIFO fallback\n",
|
||||
fprintf(stderr, "[port:%u] framecache unavailable — falling back to legacy video+audio FIFOs\n",
|
||||
ports[pi]);
|
||||
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
#else
|
||||
/* Legacy: always use video FIFO */
|
||||
/* Legacy: always use video + audio FIFOs (two transports). */
|
||||
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Open the RX stream in JOINED processing mode.
|
||||
|
|
@ -887,10 +939,25 @@ int main(int argc, char *argv[]) {
|
|||
VHD_SetStreamProperty(vs, VHD_SDI_SP_CLOCK_SYSTEM, p->clock_div);
|
||||
VHD_SetStreamProperty(vs, VHD_CORE_SP_TRANSFER_SCHEME, VHD_TRANSFER_SLAVED);
|
||||
VHD_SetStreamProperty(vs, VHD_CORE_SP_BUFFERQUEUE_DEPTH, 8);
|
||||
/* ── SDI interface propagation (required for embedded-audio extract) ──
|
||||
* Per Deltacast's FFmpeg fork (libavdevice/videomaster_common.c,
|
||||
* ff_videomaster_start_stream): a JOINED SDI stream must have
|
||||
* VHD_SDI_SP_INTERFACE set to the DETECTED cable interface before
|
||||
* StartStream, otherwise VHD_SlotExtractAudio on the resulting slots
|
||||
* returns zero samples. We read the channel's detected interface and
|
||||
* set it on the stream. (Prefer the channel-detected value; fall back
|
||||
* to the stream's current value if the channel property is unavailable
|
||||
* on this SDK build.) */
|
||||
ULONG iface = 0;
|
||||
if (VHD_GetStreamProperty(vs, VHD_SDI_SP_INTERFACE, &iface) == VHDERR_NOERROR) {
|
||||
if (VHD_GetChannelProperty(board, VHD_RX_CHANNEL, ports[pi],
|
||||
VHD_SDI_CP_INTERFACE, &iface) == VHDERR_NOERROR) {
|
||||
VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface);
|
||||
fprintf(stderr, "[board] port %u explicitly set SDI Interface to %lu\n", ports[pi], iface);
|
||||
fprintf(stderr, "[board] port %u set SDI Interface (channel-detected) to %lu\n",
|
||||
ports[pi], iface);
|
||||
} else if (VHD_GetStreamProperty(vs, VHD_SDI_SP_INTERFACE, &iface) == VHDERR_NOERROR) {
|
||||
VHD_SetStreamProperty(vs, VHD_SDI_SP_INTERFACE, iface);
|
||||
fprintf(stderr, "[board] port %u set SDI Interface (stream default) to %lu\n",
|
||||
ports[pi], iface);
|
||||
}
|
||||
/* Pin to tightly-packed 8-bit UYVY. Relying on SDK default caused
|
||||
* the board to deliver frames whose size != width*height*2,
|
||||
|
|
@ -924,24 +991,39 @@ int main(int argc, char *argv[]) {
|
|||
p->slot_id);
|
||||
fflush(stderr);
|
||||
|
||||
/* Allocate the embedded-audio hand-off ring BEFORE launching threads.
|
||||
* Producer = video_thread (JOINED slot extract), consumer = audio_thread
|
||||
* (FIFO sink). If allocation fails the bridge still runs video + paced
|
||||
* silence audio (apcm_push/pop are no-ops on a NULL/empty ring). */
|
||||
p->apcm.buf = calloc(1, APCM_RING_BYTES);
|
||||
atomic_store(&p->apcm.w, 0);
|
||||
atomic_store(&p->apcm.r, 0);
|
||||
atomic_store(&p->apcm.have_embedded, 0);
|
||||
if (!p->apcm.buf)
|
||||
fprintf(stderr, "[port:%u] WARN: apcm ring alloc failed — audio will be silence\n",
|
||||
/* ── Audio transport selection ──────────────────────────────────────
|
||||
* Frame-coupled path (fc_writer active): audio rides in the framecache
|
||||
* ring entry with its video frame; fc_pipe delivers it to the audio
|
||||
* FIFO frame-locked. NO apcm ring, NO audio_thread, NO bridge-owned
|
||||
* audio FIFO — there is no second transport to drift.
|
||||
*
|
||||
* Legacy path (fc_writer NULL, framecache unreachable, or -DLEGACY_FIFO):
|
||||
* keep the apcm ring + audio_thread that drains it to the separate audio
|
||||
* FIFO (the old two-transport scheme). */
|
||||
int legacy_audio;
|
||||
#ifdef LEGACY_FIFO
|
||||
legacy_audio = 1;
|
||||
#else
|
||||
legacy_audio = (p->fc_writer == NULL);
|
||||
#endif
|
||||
if (legacy_audio) {
|
||||
p->apcm.buf = calloc(1, APCM_RING_BYTES);
|
||||
atomic_store(&p->apcm.w, 0);
|
||||
atomic_store(&p->apcm.r, 0);
|
||||
atomic_store(&p->apcm.have_embedded, 0);
|
||||
if (!p->apcm.buf)
|
||||
fprintf(stderr, "[port:%u] WARN: apcm ring alloc failed — audio will be silence\n",
|
||||
ports[pi]);
|
||||
/* Launch audio thread (FIFO sink: drains apcm ring → audio FIFO). */
|
||||
pthread_create(&p->audio_tid, NULL, audio_thread, p);
|
||||
} else {
|
||||
fprintf(stderr, "[port:%u] frame-coupled audio (framecache ring) — no separate audio FIFO/thread\n",
|
||||
ports[pi]);
|
||||
}
|
||||
|
||||
/* Launch audio thread (FIFO sink: drains apcm ring → audio FIFO,
|
||||
* blocks until reader connects; paced silence when ring empty). */
|
||||
pthread_create(&p->audio_tid, NULL, audio_thread, p);
|
||||
|
||||
/* Launch video thread (JOINED: video → framecache/FIFO AND embedded
|
||||
* audio → apcm ring; blocks until a video reader connects in FIFO mode). */
|
||||
/* Launch video thread. fc_writer path: video + this frame's embedded
|
||||
* audio → ONE framecache ring entry (fc_writer_write_av). Legacy path:
|
||||
* video → video FIFO, audio → apcm ring → audio_thread → audio FIFO. */
|
||||
pthread_create(&p->video_tid, NULL, video_thread, p);
|
||||
|
||||
active_count++;
|
||||
|
|
|
|||
|
|
@ -765,27 +765,30 @@ class CaptureManager {
|
|||
? parseInt(port, 10) : idx)
|
||||
: idx;
|
||||
|
||||
let audioFifoPath;
|
||||
let audioFifoDir, audioFifoPath;
|
||||
if (sourceType === 'deltacast') {
|
||||
const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
|
||||
audioFifoPath = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
|
||||
audioFifoDir = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
|
||||
} else {
|
||||
const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
|
||||
audioFifoPath = `${DL_AUDIO_DIR}/audio-${portIdx}.fifo`;
|
||||
audioFifoDir = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
|
||||
}
|
||||
audioFifoPath = `${audioFifoDir}/audio-${portIdx}.fifo`;
|
||||
|
||||
// Wait up to 30s for the audio FIFO to exist (bridge starts asynchronously)
|
||||
const { existsSync: _exists } = await import('node:fs');
|
||||
const deadline = Date.now() + WAIT_MS;
|
||||
while (Date.now() < deadline) {
|
||||
if (_exists(audioFifoPath)) break;
|
||||
await new Promise(r => setTimeout(r, 500));
|
||||
}
|
||||
// ── Frame-coupled audio (FC_VERSION 2) ────────────────────────────────
|
||||
// The bridge no longer owns the audio FIFO: audio rides in the framecache
|
||||
// ring entry with its video frame. capture-manager CREATES the audio FIFO
|
||||
// and fc_pipe WRITES it, sourced from the SAME ring entry as the video, so
|
||||
// audio is frame-locked (no second transport, no drift). We mkfifo here
|
||||
// (idempotent) so fc_pipe and ffmpeg have a stable rendezvous path.
|
||||
const { existsSync: _exists, mkdirSync: _mkdir } = await import('node:fs');
|
||||
try { _mkdir(audioFifoDir, { recursive: true }); } catch { /* exists */ }
|
||||
if (!_exists(audioFifoPath)) {
|
||||
throw new Error(
|
||||
`audio FIFO not ready after ${WAIT_MS / 1000}s: ${audioFifoPath} ` +
|
||||
`— is the bridge running?`
|
||||
);
|
||||
try {
|
||||
execFileSync('mkfifo', ['-m', '0666', audioFifoPath]);
|
||||
} catch (e) {
|
||||
if (!_exists(audioFifoPath)) {
|
||||
throw new Error(`failed to create audio FIFO ${audioFifoPath}: ${e.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Video dimensions and fps come from env vars injected by node-agent
|
||||
|
|
@ -794,12 +797,16 @@ class CaptureManager {
|
|||
const fcFps = process.env.DELTACAST_FRAMERATE || '60000/1001';
|
||||
const fcInterlaced = process.env.DELTACAST_INTERLACED === '1';
|
||||
|
||||
console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} audio=${audioFifoPath}`);
|
||||
console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} audio=${audioFifoPath} (frame-coupled)`);
|
||||
|
||||
// Spawn fc_pipe: opens the framecache slot with its own read cursor and
|
||||
// streams raw UYVY422 frames to stdout. ffmpeg reads from the pipe as
|
||||
// rawvideo input 0; audio FIFO is input 1 (same as before).
|
||||
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
|
||||
// Spawn fc_pipe: opens the framecache slot with its own read cursor and,
|
||||
// for each ring entry, writes the VIDEO bytes to stdout (ffmpeg rawvideo
|
||||
// input 0) AND that frame's AUDIO bytes to the audio FIFO (ffmpeg s16le
|
||||
// input 1) IN LOCKSTEP from one cursor read. Because both come from the
|
||||
// SAME ring entry in the same iteration, audio can never drift from video
|
||||
// — the "audio ahead of video" offset is eliminated at the root.
|
||||
// argv: <slot_id> <wait_ms> <audio_fifo_path>
|
||||
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS), audioFifoPath], {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
// Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall — see
|
||||
|
|
@ -823,7 +830,12 @@ class CaptureManager {
|
|||
'-i', 'pipe:0',
|
||||
// Audio FIFO → ffmpeg input 1.
|
||||
//
|
||||
// Do NOT use -use_wallclock_as_timestamps here. The bridge feeds raw
|
||||
// fc_pipe writes this FIFO from the SAME framecache ring entry as the
|
||||
// video it sends to input 0, one entry per loop iteration — so the
|
||||
// audio is exactly each video frame's SDI-embedded audio, delivered
|
||||
// frame-locked. There is no independent audio buffer to race ahead.
|
||||
//
|
||||
// Do NOT use -use_wallclock_as_timestamps here. fc_pipe feeds raw
|
||||
// s16le at a steady 48000 samples/s off the SAME SDI clock as video,
|
||||
// so letting ffmpeg derive audio PTS from the sample count keeps audio
|
||||
// and video in one clock domain (no drift). Wallclock stamps audio by
|
||||
|
|
@ -1269,12 +1281,15 @@ exit "$BMXRC"
|
|||
console.log(`[capture] pre-roll complete.`);
|
||||
}
|
||||
|
||||
// FLUSH STALE AUDIO immediately before ffmpeg opens the FIFO. During standby
|
||||
// the bridge keeps writing audio into the named FIFO while the idle-preview
|
||||
// consumes only video, so the FIFO holds up to a full pipe buffer (~0.5s) of
|
||||
// stale audio. Draining it here (right before the record ffmpeg attaches)
|
||||
// makes audio start at the live edge, time-aligned with the first video
|
||||
// frame — eliminating the leading silence + the ~0.5% audio-length surplus.
|
||||
// FLUSH STALE AUDIO immediately before ffmpeg opens the FIFO.
|
||||
//
|
||||
// With frame-coupled audio (FC_VERSION 2) fc_pipe only writes the audio FIFO
|
||||
// once a reader attaches, and each audio chunk is bound to its video frame in
|
||||
// the same ring entry — so there is normally NO stale standby backlog. This
|
||||
// drain is retained as a harmless belt-and-suspenders: it reads whatever (if
|
||||
// anything) is buffered and returns immediately on EAGAIN, guaranteeing the
|
||||
// record ffmpeg attaches at the live edge. fc_pipe reattaches automatically
|
||||
// if it briefly saw this drain as its reader.
|
||||
if (audioFifo) {
|
||||
try {
|
||||
const fsSync = await import('node:fs');
|
||||
|
|
|
|||
|
|
@ -26,8 +26,9 @@ struct fc_consumer {
|
|||
sem_t *sem;
|
||||
uint64_t read_cursor; /* consumer's own position in the ring */
|
||||
uint64_t local_dropped; /* frames skipped by this consumer */
|
||||
uint8_t *copy_buf; /* consumer-owned frame copy buffer (frame_size bytes) */
|
||||
uint32_t frame_size; /* cached from header */
|
||||
uint8_t *copy_buf; /* consumer-owned copy buffer: [video frame_size][audio audio_max] */
|
||||
uint32_t frame_size; /* cached from header (video bytes) */
|
||||
uint32_t audio_max; /* cached from header (audio region capacity) */
|
||||
char slot_id[FC_MAX_SLOT_ID];
|
||||
};
|
||||
|
||||
|
|
@ -59,6 +60,13 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
|
|||
if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
|
||||
close(fd); return NULL;
|
||||
}
|
||||
/* Version gate: an old reader against a new writer (or vice-versa) computes
|
||||
* the wrong per-entry stride and would misparse. Fail safe. */
|
||||
if (hdr.version != FC_VERSION) {
|
||||
fprintf(stderr, "[fc_client] slot %s version %u != expected %u — refusing\n",
|
||||
slot_id, hdr.version, FC_VERSION);
|
||||
close(fd); return NULL;
|
||||
}
|
||||
size_t total = fc_slot_shm_size(hdr.frame_size);
|
||||
|
||||
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
|
||||
|
|
@ -72,8 +80,9 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
|
|||
|
||||
/* Consumer-owned copy buffer — fc_consumer_read copies the frame here and
|
||||
* re-validates the cursor afterward, so a writer lapping a slow consumer
|
||||
* cannot corrupt the frame the caller is using. */
|
||||
c->copy_buf = malloc(hdr.frame_size);
|
||||
* cannot corrupt the frame the caller is using. Sized for video + audio so
|
||||
* the frame-coupled audio is copied atomically with its video. */
|
||||
c->copy_buf = malloc((size_t)hdr.frame_size + hdr.audio_max_bytes);
|
||||
if (!c->copy_buf) {
|
||||
free(c); sem_close(sem); munmap(base, total); close(fd); return NULL;
|
||||
}
|
||||
|
|
@ -83,6 +92,7 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
|
|||
c->shm_size = total;
|
||||
c->sem = sem;
|
||||
c->frame_size = hdr.frame_size;
|
||||
c->audio_max = hdr.audio_max_bytes;
|
||||
/* Start reading from the current write position so we don't replay old frames */
|
||||
c->read_cursor = atomic_load_explicit(
|
||||
&((fc_header_t *)base)->write_cursor, memory_order_acquire);
|
||||
|
|
@ -157,13 +167,22 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
|
|||
/* Woken — loop to re-evaluate cursor-diff. */
|
||||
}
|
||||
|
||||
/* ── Copy the frame into the consumer-owned buffer ──────────────────── */
|
||||
/* ── Copy the entry (video + this frame's audio) into the owned buffer ─
|
||||
* Both are copied from the SAME ring entry in the SAME iteration, so the
|
||||
* audio handed to the caller is exactly this video frame's embedded audio —
|
||||
* frame-coupled, no second buffer to drift. copy_buf layout mirrors the
|
||||
* shm entry: [video frame_size][audio audio_max]. */
|
||||
fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor);
|
||||
uint32_t fsz = frame->size;
|
||||
if (fsz > hdr->frame_size) fsz = hdr->frame_size;
|
||||
uint32_t asz = frame->audio_size;
|
||||
if (asz > c->audio_max) asz = c->audio_max;
|
||||
uint64_t pts = frame->pts_us;
|
||||
uint64_t wall = frame->wall_us;
|
||||
memcpy(c->copy_buf, frame->data, fsz);
|
||||
if (asz)
|
||||
memcpy(c->copy_buf + c->frame_size,
|
||||
fc_frame_audio(frame, hdr->frame_size), asz);
|
||||
|
||||
/* ── Re-validate AFTER the copy ─────────────────────────────────────
|
||||
* If the writer lapped us during the copy (overwrote this slot), the copy
|
||||
|
|
@ -178,11 +197,13 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
|
|||
}
|
||||
|
||||
/* Copy is valid. */
|
||||
ref->data = c->copy_buf;
|
||||
ref->size = fsz;
|
||||
ref->pts_us = pts;
|
||||
ref->wall_us = wall;
|
||||
ref->seq = c->read_cursor;
|
||||
ref->data = c->copy_buf;
|
||||
ref->size = fsz;
|
||||
ref->audio = asz ? (c->copy_buf + c->frame_size) : NULL;
|
||||
ref->audio_size = asz;
|
||||
ref->pts_us = pts;
|
||||
ref->wall_us = wall;
|
||||
ref->seq = c->read_cursor;
|
||||
|
||||
c->read_cursor++;
|
||||
return dropped ? FC_DROPPED : FC_OK;
|
||||
|
|
|
|||
|
|
@ -47,7 +47,11 @@ typedef struct {
|
|||
* consumer's own buffer and re-validate the cursor
|
||||
* AFTER the copy, so a lapped frame is discarded
|
||||
* rather than streamed corrupt.) */
|
||||
uint32_t size; /* bytes */
|
||||
uint32_t size; /* video bytes */
|
||||
const uint8_t *audio; /* pointer to a CONSUMER-OWNED copy of this frame's
|
||||
* embedded audio (s16le stereo 48k), stable until
|
||||
* the next fc_consumer_read(). NULL if audio_size 0. */
|
||||
uint32_t audio_size; /* audio bytes for this frame (0 = none) */
|
||||
uint64_t pts_us; /* presentation timestamp (microseconds) */
|
||||
uint64_t wall_us; /* wall clock at write time (microseconds) */
|
||||
uint64_t seq; /* write_cursor value for this frame */
|
||||
|
|
|
|||
|
|
@ -1,20 +1,46 @@
|
|||
/**
|
||||
* fc_pipe.c — Framecache slot → stdout pipe adapter.
|
||||
* fc_pipe.c — Framecache slot → stdout(video) + FIFO(audio) pipe adapter.
|
||||
*
|
||||
* Opens a framecache slot as a consumer and writes raw video frames to
|
||||
* stdout in a continuous stream. capture-manager.js spawns this process
|
||||
* and feeds its stdout to ffmpeg as a rawvideo pipe input — identical to
|
||||
* the way DeckLink bridges currently pipe raw frames.
|
||||
* FRAME-COUPLED AUDIO (FC_VERSION 2):
|
||||
* Each framecache ring entry carries the VIDEO frame AND that frame's
|
||||
* SDI-embedded AUDIO together. fc_pipe reads ONE entry per loop iteration
|
||||
* and writes:
|
||||
* - the video bytes → stdout (ffmpeg rawvideo input 0 / pipe:0)
|
||||
* - the audio bytes → audio FIFO (ffmpeg s16le input 1)
|
||||
* IN LOCKSTEP from the SAME cursor read. Because both come out of the same
|
||||
* ring entry in the same iteration, audio can never drift ahead of (or
|
||||
* behind) its video frame — there is no second independent buffer/transport
|
||||
* to race. This eliminates the constant "audio ahead of video" offset at the
|
||||
* root.
|
||||
*
|
||||
* capture-manager.js spawns this process, pipes its stdout to ffmpeg input 0,
|
||||
* and passes the audio FIFO path it created as argv[3]; ffmpeg reads that FIFO
|
||||
* as input 1.
|
||||
*
|
||||
* Each consumer instance has its own independent read cursor, so multiple
|
||||
* fc_pipe processes reading from the same slot never interfere with each
|
||||
* other. This is how growing + proxy + HLS all read the same SDI signal
|
||||
* simultaneously.
|
||||
* fc_pipe processes reading from the same slot never interfere with each other
|
||||
* (growing + proxy + HLS all read the same SDI signal simultaneously).
|
||||
*
|
||||
* Usage:
|
||||
* fc_pipe <slot_id> [wait_ms]
|
||||
* fc_pipe <slot_id> [wait_ms] [audio_fifo_path]
|
||||
*
|
||||
* Writes raw UYVY422 frame data to stdout. Terminates on:
|
||||
* audio_fifo_path optional: if omitted (or "-"), audio is NOT emitted and
|
||||
* fc_pipe behaves video-only (legacy / network video-only sources).
|
||||
*
|
||||
* Audio FIFO lifecycle:
|
||||
* - Opened O_WRONLY|O_NONBLOCK with retry; until a reader (ffmpeg input 1)
|
||||
* attaches the open returns ENXIO. We keep delivering VIDEO meanwhile so
|
||||
* ffmpeg makes progress and opens the FIFO. Audio for those very first
|
||||
* pre-roll frames is dropped (sub-frame startup gap inside pre-roll).
|
||||
* - Once open we switch the fd to blocking and write each frame's audio with
|
||||
* the same write_all_fd() as video, keeping them coupled. fc_pipe itself does
|
||||
* NOT synthesize silence: an entry with audio_size 0 writes nothing to the
|
||||
* FIFO and is only counted in the "gaps" statistic. Keeping the audio timeline
|
||||
* as long as the video timeline relies on the writer filling every entry.
|
||||
* - On audio FIFO EPIPE (ffmpeg input 1 reader died, e.g. session restart),
|
||||
* we close and re-open the FIFO; VIDEO delivery is unaffected.
|
||||
*
|
||||
* Terminates on:
|
||||
* - SIGTERM / SIGINT (clean stop from capture-manager)
|
||||
* - stdout EPIPE (ffmpeg exited)
|
||||
* - Slot disappears (bridge stopped)
|
||||
|
|
@ -41,7 +67,7 @@
|
|||
/* Stop flag raised by the signal handler and polled by the streaming loop.
 * volatile sig_atomic_t is the object type the C standard guarantees can be
 * written from an asynchronous signal handler and read afterwards intact. */
static volatile sig_atomic_t g_stop = 0;

/* Handler installed for SIGTERM / SIGINT: request a clean shutdown.
 * The signal number itself is not used. */
static void on_signal(int s) { (void)s; g_stop = 1; }
|
||||
|
||||
/* Write all bytes to fd. Returns 0 on success, -1 on EPIPE/error. */
|
||||
/* Write all bytes to fd (blocking). Returns 0 on success, -1 on EPIPE/error. */
|
||||
static int write_all_fd(int fd, const void *buf, size_t len) {
|
||||
const uint8_t *p = (const uint8_t *)buf;
|
||||
size_t off = 0;
|
||||
|
|
@ -56,22 +82,24 @@ static int write_all_fd(int fd, const void *buf, size_t len) {
|
|||
|
||||
int main(int argc, char *argv[]) {
|
||||
if (argc < 2) {
|
||||
fprintf(stderr, "Usage: %s <slot_id> [wait_ms]\n", argv[0]);
|
||||
fprintf(stderr, "Usage: %s <slot_id> [wait_ms] [audio_fifo_path]\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
const char *slot_id = argv[1];
|
||||
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000;
|
||||
const char *slot_id = argv[1];
|
||||
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000;
|
||||
const char *audio_fifo = (argc >= 4 && strcmp(argv[3], "-") != 0) ? argv[3] : NULL;
|
||||
|
||||
signal(SIGTERM, on_signal);
|
||||
signal(SIGINT, on_signal);
|
||||
signal(SIGPIPE, SIG_IGN); /* detect EPIPE via write() return value */
|
||||
|
||||
/* Set stdout to binary mode — no newline translation */
|
||||
/* Set stdout to binary/blocking mode — no newline translation */
|
||||
fcntl(STDOUT_FILENO, F_SETFL,
|
||||
fcntl(STDOUT_FILENO, F_GETFL, 0) & ~O_NONBLOCK);
|
||||
|
||||
fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums)\n",
|
||||
slot_id, (unsigned long long)wait_ms);
|
||||
fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums) audio_fifo=%s\n",
|
||||
slot_id, (unsigned long long)wait_ms,
|
||||
audio_fifo ? audio_fifo : "(none)");
|
||||
|
||||
fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms);
|
||||
if (!c) {
|
||||
|
|
@ -80,10 +108,16 @@ int main(int argc, char *argv[]) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
fprintf(stderr, "[fc_pipe] slot open, streaming to stdout\n");
|
||||
fprintf(stderr, "[fc_pipe] slot open, streaming video→stdout%s\n",
|
||||
audio_fifo ? " + audio→FIFO (frame-coupled)" : "");
|
||||
|
||||
uint64_t frames_out = 0;
|
||||
int afd = -1; /* audio FIFO fd, -1 until a reader attaches */
|
||||
int logged_aud = 0;
|
||||
|
||||
uint64_t frames_out = 0;
|
||||
uint64_t total_dropped = 0;
|
||||
uint64_t audio_bytes = 0;
|
||||
uint64_t audio_gaps = 0; /* frames where embedded audio was absent (silence filled) */
|
||||
|
||||
while (!g_stop) {
|
||||
fc_frame_ref_t ref;
|
||||
|
|
@ -93,8 +127,6 @@ int main(int argc, char *argv[]) {
|
|||
if (rc == FC_ERROR) break;
|
||||
|
||||
if (rc == FC_LAPPED) {
|
||||
/* Copy was torn (writer lapped us mid-read). No valid frame to
|
||||
* write — log and read again. */
|
||||
total_dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[fc_pipe] WARNING: frame lapped mid-read — total dropped: %llu\n",
|
||||
(unsigned long long)total_dropped);
|
||||
|
|
@ -102,32 +134,73 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
|
||||
if (rc == FC_DROPPED) {
|
||||
/* Skipped one or more older frames, but THIS frame is valid — log
|
||||
* and write it (do NOT continue). */
|
||||
total_dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[fc_pipe] WARNING: consumer fell behind — total dropped: %llu\n",
|
||||
(unsigned long long)total_dropped);
|
||||
}
|
||||
|
||||
/* Write frame data to stdout (ref.data is a stable consumer-owned copy) */
|
||||
/* ── Try to (re)attach the audio FIFO without stalling video ──────────
|
||||
* O_WRONLY|O_NONBLOCK returns ENXIO until ffmpeg input 1 opens the read
|
||||
* end. We keep delivering video so ffmpeg progresses and opens it. */
|
||||
if (audio_fifo && afd < 0) {
|
||||
int fd = open(audio_fifo, O_WRONLY | O_NONBLOCK);
|
||||
if (fd >= 0) {
|
||||
/* Switch to blocking for coupled writes. */
|
||||
int fl = fcntl(fd, F_GETFL, 0);
|
||||
if (fl >= 0) fcntl(fd, F_SETFL, fl & ~O_NONBLOCK);
|
||||
afd = fd;
|
||||
if (!logged_aud) {
|
||||
fprintf(stderr, "[fc_pipe] audio FIFO reader attached — coupled audio live\n");
|
||||
logged_aud = 1;
|
||||
}
|
||||
}
|
||||
/* else: not ready yet (ENXIO) — deliver video, retry next frame. */
|
||||
}
|
||||
|
||||
/* ── Write VIDEO to stdout ──────────────────────────────────────────── */
|
||||
if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) {
|
||||
if (!g_stop)
|
||||
fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n");
|
||||
break;
|
||||
}
|
||||
frames_out++;
|
||||
|
||||
/* Periodic stats to stderr (every 300 frames ≈ 5s at 60fps) */
|
||||
/* ── Write THIS frame's AUDIO to the FIFO, in lockstep ────────────────
|
||||
* Same ring entry, same iteration ⇒ frame-coupled. The bridge writer
|
||||
* guarantees every entry carries one frame-interval of audio (real
|
||||
* embedded PCM, or silence when the signal has none), so ref.audio_size
|
||||
* is non-zero in steady state and the audio timeline length always
|
||||
* tracks the video timeline length (no drift). A 0-size entry (only at
|
||||
* the very first frame, or a video-only net source) contributes nothing
|
||||
* and is harmless because ffmpeg derives audio PTS from sample count. */
|
||||
if (afd >= 0) {
|
||||
if (ref.audio_size > 0 && ref.audio) {
|
||||
if (write_all_fd(afd, ref.audio, ref.audio_size) < 0) {
|
||||
fprintf(stderr, "[fc_pipe] audio FIFO EPIPE — will reattach\n");
|
||||
close(afd); afd = -1;
|
||||
} else {
|
||||
audio_bytes += ref.audio_size;
|
||||
}
|
||||
} else {
|
||||
audio_gaps++; /* diagnostics: frame without embedded audio */
|
||||
}
|
||||
}
|
||||
|
||||
frames_out++;
|
||||
if (frames_out % 300 == 0) {
|
||||
fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu\n",
|
||||
fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu audio_bytes=%llu gaps=%llu\n",
|
||||
(unsigned long long)frames_out,
|
||||
(unsigned long long)total_dropped);
|
||||
(unsigned long long)total_dropped,
|
||||
(unsigned long long)audio_bytes,
|
||||
(unsigned long long)audio_gaps);
|
||||
}
|
||||
}
|
||||
|
||||
if (afd >= 0) close(afd);
|
||||
fc_consumer_close(c);
|
||||
fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu\n",
|
||||
fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu audio_bytes=%llu gaps=%llu\n",
|
||||
(unsigned long long)frames_out,
|
||||
(unsigned long long)total_dropped);
|
||||
(unsigned long long)total_dropped,
|
||||
(unsigned long long)audio_bytes,
|
||||
(unsigned long long)audio_gaps);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -81,6 +81,9 @@ struct fc_slot *fc_slot_create(const char *slot_id,
|
|||
hdr->pixel_format = pixel_format;
|
||||
hdr->frame_size = frame_size;
|
||||
hdr->ring_depth = FC_RING_DEPTH;
|
||||
hdr->audio_max_bytes = FC_MAX_AUDIO_BYTES;
|
||||
hdr->audio_rate = FC_AUDIO_RATE;
|
||||
hdr->audio_channels = FC_AUDIO_CHANNELS;
|
||||
atomic_store(&hdr->write_cursor, 0);
|
||||
atomic_store(&hdr->dropped_frames, 0);
|
||||
strncpy(hdr->source_type, source_type ? source_type : "unknown",
|
||||
|
|
@ -154,13 +157,43 @@ void fc_slot_write_frame(struct fc_slot *s,
|
|||
frame->wall_us = (uint64_t)({ struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; });
|
||||
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
||||
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
||||
frame->audio_size = 0; /* video-only source (e.g. net_ingest): no embedded audio */
|
||||
memcpy(frame->data, data, frame->size);
|
||||
|
||||
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
||||
sem_post(s->sem);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write one frame-coupled entry: video bytes + this frame's audio bytes.
|
||||
* audio may be NULL / asize 0 (no embedded audio this frame). Never blocks.
|
||||
*/
|
||||
void fc_slot_write_av(struct fc_slot *s,
|
||||
const uint8_t *video, uint32_t vsize,
|
||||
const uint8_t *audio, uint32_t asize,
|
||||
uint64_t pts_us)
|
||||
{
|
||||
fc_header_t *hdr = (fc_header_t *)s->base;
|
||||
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
|
||||
fc_frame_t *frame = fc_frame_at(s->base, hdr->frame_size, cur);
|
||||
|
||||
frame->pts_us = pts_us;
|
||||
frame->wall_us = (uint64_t)({ struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; });
|
||||
frame->size = vsize < hdr->frame_size ? vsize : hdr->frame_size;
|
||||
memcpy(frame->data, video, frame->size);
|
||||
|
||||
uint32_t acap = hdr->audio_max_bytes;
|
||||
frame->audio_size = (audio && asize) ? (asize < acap ? asize : acap) : 0;
|
||||
if (frame->audio_size)
|
||||
memcpy(fc_frame_audio(frame, hdr->frame_size), audio, frame->audio_size);
|
||||
|
||||
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
||||
sem_post(s->sem);
|
||||
}
|
||||
|
||||
/* ── client-side open / read / close (also used by capture-manager) ── */
|
||||
|
||||
/**
|
||||
|
|
@ -183,6 +216,11 @@ struct fc_slot *fc_slot_open(const char *slot_id)
|
|||
if (tmp_hdr.magic != FC_MAGIC) {
|
||||
close(fd); return NULL;
|
||||
}
|
||||
if (tmp_hdr.version != FC_VERSION) {
|
||||
fprintf(stderr, "[framecache] slot %s version %u != expected %u — refusing (fail-safe)\n",
|
||||
slot_id, tmp_hdr.version, FC_VERSION);
|
||||
close(fd); return NULL;
|
||||
}
|
||||
size_t total = fc_slot_shm_size(tmp_hdr.frame_size);
|
||||
|
||||
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
|
||||
|
|
|
|||
|
|
@ -3,10 +3,33 @@
|
|||
*
|
||||
* Layout per slot (/dev/shm/framecache/<slot_id>):
|
||||
* [fc_header_t — 4KB aligned]
|
||||
* [fc_frame_t × ring_depth — each FC_FRAME_HDR_SIZE + frame_size bytes]
|
||||
* [fc_frame_t × ring_depth — each FC_FRAME_HDR_SIZE + frame_size + FC_MAX_AUDIO_BYTES bytes]
|
||||
*
|
||||
* Writer advances write_cursor atomically and posts the named semaphore.
|
||||
* Each consumer tracks its own read_cursor independently — writer never blocks.
|
||||
*
|
||||
* ── FRAME-COUPLED AUDIO (FC_VERSION 2) ──────────────────────────────────────
|
||||
* Each ring entry now carries the VIDEO frame AND that frame's SDI-embedded
|
||||
* AUDIO together in ONE transport. The per-entry data region is:
|
||||
*
|
||||
* [ video bytes : frame_size ][ audio bytes : up to FC_MAX_AUDIO_BYTES ]
|
||||
*
|
||||
* fc_frame_t.size = video byte count (== frame_size for UYVY422)
|
||||
* fc_frame_t.audio_size = audio byte count for THIS frame (s16le stereo 48k),
|
||||
* 0 if the signal carries no embedded audio on this frame.
|
||||
*
|
||||
* Because both streams live in the same ring slot written in one atomic cursor
|
||||
* advance and read in one consumer iteration, audio can never drift ahead of
|
||||
* (or behind) its video frame — the "audio ahead of video" offset is eliminated
|
||||
* at the root: there is no second independent buffer/transport to race.
|
||||
*
|
||||
* Versioning: FC_VERSION bumped 1 → 2. The frame header stays at 24
|
||||
* bytes (the former _pad uint32 is REUSED as audio_size — no size change), but
|
||||
* the per-entry stride grew by FC_MAX_AUDIO_BYTES and the header gained audio
|
||||
* descriptor fields. An old (v1) reader mmapping a v2 slot would compute the
|
||||
* wrong stride and misparse, so readers MUST check version == FC_VERSION and
|
||||
* fail safe. The writer and all readers share this header; mismatched binaries
|
||||
* refuse to attach.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
|
|
@ -15,12 +38,24 @@
|
|||
#include <semaphore.h>
|
||||
|
||||
#define FC_MAGIC          0x46524D43u   /* "FRMC" */
#define FC_VERSION        2u            /* bumped for frame-coupled audio */
#define FC_RING_DEPTH     120u          /* ~2s at 59.94fps */
#define FC_HEADER_SIZE    4096u         /* 4KB header block */
#define FC_FRAME_HDR_SIZE 24u           /* pts_us(8) + wall_us(8) + size(4) + audio_size(4) */
#define FC_MAX_SLOT_ID    64u

/* ── Frame-coupled audio constants ──────────────────────────────────────────
 * Audio is always delivered as interleaved s16le stereo at 48 kHz (the SDI
 * embedded-audio mapping ffmpeg/raw2bmx expect). The worst-case sample count
 * per video frame, at the lowest broadcast frame rate (~23.976 fps), is ~2002
 * samples; 2048 samples * 2 ch * 2 bytes = 8192 bytes. We reserve 16 KB per
 * entry for generous headroom (covers transient over-delivery and any future
 * 4-byte sample widths) — ~1.9 MB extra across the 120-deep ring, negligible. */
#define FC_AUDIO_RATE         48000u
#define FC_AUDIO_CHANNELS     2u
#define FC_AUDIO_SAMPLE_BYTES 2u        /* s16le */
#define FC_MAX_AUDIO_BYTES    16384u    /* audio region reserved in every ring entry */
|
||||
|
||||
/* Internal handle used by both server (writer) and client (reader) */
|
||||
struct fc_slot {
|
||||
int shm_fd;
|
||||
|
|
@ -43,23 +78,31 @@ typedef struct {
|
|||
uint32_t fps_num;
|
||||
uint32_t fps_den;
|
||||
uint32_t pixel_format; /* FC_PIX_UYVY422 */
|
||||
uint32_t frame_size; /* width * height * 2 */
|
||||
uint32_t frame_size; /* width * height * 2 (video bytes per entry) */
|
||||
uint32_t ring_depth; /* FC_RING_DEPTH */
|
||||
uint32_t audio_max_bytes; /* FC_MAX_AUDIO_BYTES — audio region per entry */
|
||||
uint32_t audio_rate; /* FC_AUDIO_RATE (48000) */
|
||||
uint32_t audio_channels; /* FC_AUDIO_CHANNELS (2) */
|
||||
uint32_t _reserved;
|
||||
_Atomic uint64_t write_cursor; /* monotonically increasing frame index */
|
||||
_Atomic uint64_t dropped_frames;
|
||||
char source_type[32]; /* "deltacast" | "blackmagic" | "srt" | "rtmp" */
|
||||
char slot_id[FC_MAX_SLOT_ID];
|
||||
uint8_t _pad[FC_HEADER_SIZE - 144];
|
||||
uint8_t _pad[FC_HEADER_SIZE - 156];
|
||||
} fc_header_t;
|
||||
|
||||
/* Per-entry metadata + data (variable length — use fc_frame_at() accessor).
 *
 * data layout (frame-coupled):
 *   [video region : frame_size bytes][audio region : audio_max_bytes bytes]
 *
 * The audio region always starts at data + frame_size (see fc_frame_audio()),
 * even when fewer than frame_size video bytes are valid. `size` and
 * `audio_size` tell the reader how many bytes of each region are valid for
 * THIS frame; audio_size <= audio_max_bytes.
 *
 * The fixed part is 24 bytes (FC_FRAME_HDR_SIZE): audio_size occupies the slot
 * of the former 32-bit padding field, so the header size is unchanged.
 */
typedef struct {
    uint64_t pts_us;       /* presentation timestamp (microseconds) */
    uint64_t wall_us;      /* wall clock at write time (microseconds) */
    uint32_t size;         /* valid video bytes */
    uint32_t audio_size;   /* audio bytes for this frame (s16le stereo 48k), 0 = none */
    uint8_t  data[];       /* [video frame_size][audio audio_max_bytes] */
} fc_frame_t;
|
||||
|
||||
/* Compile-time size check */
|
||||
|
|
@ -80,6 +123,11 @@ void fc_slot_close(struct fc_slot *s);
|
|||
void fc_slot_write_frame(struct fc_slot *s,
|
||||
const uint8_t *data, uint32_t size,
|
||||
uint64_t pts_us);
|
||||
/* Frame-coupled write: video + this frame's audio in one ring entry. */
|
||||
void fc_slot_write_av(struct fc_slot *s,
|
||||
const uint8_t *video, uint32_t vsize,
|
||||
const uint8_t *audio, uint32_t asize,
|
||||
uint64_t pts_us);
|
||||
|
||||
/* Accessor functions — inline now that struct fc_slot is defined above */
|
||||
static inline fc_header_t *fc_slot_header(struct fc_slot *s) { return (fc_header_t *)s->base; }
|
||||
|
|
@ -87,13 +135,22 @@ static inline const char *fc_slot_id(struct fc_slot *s) { return s->slot_id
|
|||
static inline const char *fc_slot_shm_path(struct fc_slot *s) { return s->shm_path; }
|
||||
static inline const char *fc_slot_sem_name(struct fc_slot *s) { return s->sem_name; }
|
||||
|
||||
/**
|
||||
* Per-entry stride: header + video bytes + reserved audio region.
|
||||
* Shared by writer and all readers — frame-coupled audio rides in the same
|
||||
* entry, so the stride MUST include FC_MAX_AUDIO_BYTES on every component.
|
||||
*/
|
||||
static inline size_t fc_entry_stride(uint32_t frame_size) {
|
||||
return (size_t)FC_FRAME_HDR_SIZE + frame_size + (size_t)FC_MAX_AUDIO_BYTES;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute total shm size for a slot given frame_size.
|
||||
* = FC_HEADER_SIZE + ring_depth * (FC_FRAME_HDR_SIZE + frame_size)
|
||||
* = FC_HEADER_SIZE + ring_depth * fc_entry_stride(frame_size)
|
||||
*/
|
||||
static inline size_t fc_slot_shm_size(uint32_t frame_size) {
|
||||
return (size_t)FC_HEADER_SIZE
|
||||
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + frame_size);
|
||||
+ (size_t)FC_RING_DEPTH * fc_entry_stride(frame_size);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -102,5 +159,13 @@ static inline size_t fc_slot_shm_size(uint32_t frame_size) {
|
|||
static inline fc_frame_t *fc_frame_at(void *base, uint32_t frame_size, uint64_t idx) {
|
||||
uint8_t *frames = (uint8_t *)base + FC_HEADER_SIZE;
|
||||
return (fc_frame_t *)(frames + (idx % FC_RING_DEPTH)
|
||||
* ((size_t)FC_FRAME_HDR_SIZE + frame_size));
|
||||
* fc_entry_stride(frame_size));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return pointer to the audio region of an entry (immediately after the
|
||||
* frame_size video bytes within data[]).
|
||||
*/
|
||||
static inline uint8_t *fc_frame_audio(fc_frame_t *f, uint32_t frame_size) {
|
||||
return f->data + frame_size;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue