dragonflight/services/framecache/client/fc_client.c
Zac Gaetano 97267aa857 fix(framecache): single-input streaming AVI muxer in fc_pipe (kills 2-pipe ffmpeg deadlock)
fc_pipe now muxes video + that frame's embedded audio into ONE streaming AVI
container on stdout, so ffmpeg reads a SINGLE input (-f avi -i pipe:0) instead
of a raw video pipe + a separate live audio FIFO. The two-live-pipe design
deadlocked ffmpeg (it stalled forever probing input 0); a single interleaved
stream removes that failure mode entirely.

fc_pipe.c:
  - New AVI mode (argv[3] == "--avi"/"avi"). Writes RIFF('AVI ') + LIST(hdrl)
    { avih + strl(vids: strh+BITMAPINFOHEADER UYVY 16bpp) + strl(auds:
    strh+WAVEFORMATEX PCM s16le 48k 2ch) } + LIST(movi) once, then per ring
    entry a '00dc' video chunk followed by a '01wb' audio chunk (LE sizes,
    even-pad). RIFF/movi sizes use the 0x7FFFFFFF streaming sentinel (pipe is
    unseekable); dwFlags has NO index bits. Frame-coupled by construction: both
    chunks come from the SAME ring entry in one read-loop iteration.
  - dwScale/dwRate = fps_den/fps_num (video) and nBlockAlign/nAvgBytesPerSec
    (audio). If a frame has audio_size 0, emits one frame-interval of silence
    (round(48000*fps_den/fps_num) samples) so the audio timeline tracks video
    and ffmpeg never starves on the audio demuxer.
  - Legacy raw video-only mode retained when no avi flag is given. The old
    split-stdout/audio-FIFO threaded path is removed (it was the deadlock).

fc_client.{h,c}:
  - Add fc_consumer_info() / fc_stream_info_t to expose the slot header's
    width/height/fps/audio params to fc_pipe for the AVI header.

capture-manager.js (_buildInputArgs deltacast/sdi framecache branch):
  - Spawn fc_pipe with "--avi" (no audio FIFO). Remove the mkfifo + audio-FIFO
    creation for this path.
  - inputArgs: ONE input  -thread_queue_size 512 -f avi [AUDIO_OFFSET_MS] -i pipe:0
    (was: -f rawvideo -i pipe:0  AND  -f s16le -ar 48000 -ac 2 -i <fifo>).
  - audioInputIndex 0, audioFifo null. Growing VC-3/HEVC builders already map
    [0:v] and audioMap 0:a:0?; with one AVI input that resolves to 0:v / 0:a.

Validated on zampp3 against the LIVE deltacast-0-0 slot: fc_pipe --avi | ffmpeg
-f avi -i pipe:0 -> dnxhd/pcm_s24le MXF gives 360 video / 360 audio packets in
6.006s (no stall at 2 frames). A synthetic 1 kHz sine slot through the same
path yields mean_volume -9 dB / max -6 dB, proving the muxer carries real audio
end-to-end (the live SDI input currently carries no embedded audio, so the
bridge's silence fallback reads -91 dB — upstream of the muxer).
2026-06-05 14:06:35 +00:00

247 lines
9.8 KiB
C

/**
* fc_client.c — Consumer-side framecache client implementation.
*/
#include "fc_client.h"
#include "../src/slot.h"
#include <errno.h>
#include <fcntl.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
/* Names of the per-slot IPC objects. The consumer only opens them (the
 * semaphore is opened without O_CREAT), so they must match what the writer
 * creates: the slot file is SHM_DIR "/" <slot_id> and its wakeup semaphore
 * is SEM_PREFIX <slot_id> SEM_SUFFIX, i.e. "/framecache-<slot_id>-write". */
#define SHM_DIR "/dev/shm/framecache"
#define SEM_PREFIX "/framecache-"
#define SEM_SUFFIX "-write"
/*
 * Per-consumer state for one attached framecache slot. The definition lives
 * in this file, so its layout is private to the client implementation.
 */
struct fc_consumer {
    /* --- read-only mapping of the slot file --- */
    int       shm_fd;        /* slot file descriptor, closed in fc_consumer_close */
    void     *base;          /* start of the PROT_READ mapping; header at offset 0 */
    size_t    shm_size;      /* mapping length, passed back to munmap */

    /* --- writer wakeup --- */
    sem_t    *sem;           /* writer's post-per-frame semaphore (wakeup hint only) */

    /* --- this consumer's ring position (never written to shm) --- */
    uint64_t  read_cursor;   /* next ring index to deliver */
    uint64_t  local_dropped; /* frames this consumer skipped after being lapped */

    /* --- private per-frame copy --- */
    uint8_t  *copy_buf;      /* layout: [video frame_size bytes][audio audio_max bytes] */
    uint32_t  frame_size;    /* video bytes per frame, cached from the header at open */
    uint32_t  audio_max;     /* audio region capacity, cached from the header at open */

    char      slot_id[FC_MAX_SLOT_ID]; /* NUL-terminated copy of the slot name */
};
/*
 * Current time in microseconds, taken from CLOCK_MONOTONIC.
 *
 * In this file it is used only to build the open-wait deadline in
 * fc_consumer_open. A monotonic clock keeps that deadline immune to
 * wall-clock steps (NTP/PTP corrections, manual date changes), which with
 * CLOCK_REALTIME could cut the wait short or stretch it indefinitely.
 * The value is NOT comparable with CLOCK_REALTIME-based times such as the
 * absolute timeout sem_timedwait() requires.
 */
static uint64_t now_us(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t)ts.tv_sec * 1000000ULL + (uint64_t)ts.tv_nsec / 1000ULL;
}
/* Outcome of inspecting a candidate slot file in fc_consumer_open. */
enum {
    FCC_PROBE_READY,     /* header valid and file large enough to map */
    FCC_PROBE_NOT_READY, /* short/unpublished header or short file: retry */
    FCC_PROBE_REJECT     /* permanent failure (version mismatch, fstat error) */
};

/* Read and validate the slot header from fd. On FCC_PROBE_READY, *hdr holds
 * the header and *total the mapping size implied by hdr->frame_size. */
static int fc_probe_slot(int fd, const char *slot_id, fc_header_t *hdr, size_t *total)
{
    /* A writer that has created the file but not yet written/published the
     * header looks like a short read or a bad magic: not ready yet. */
    if (pread(fd, hdr, sizeof *hdr, 0) != (ssize_t)sizeof *hdr || hdr->magic != FC_MAGIC)
        return FCC_PROBE_NOT_READY;

    /* Version gate: an old reader against a new writer (or vice-versa) computes
     * the wrong per-entry stride and would misparse. Fail safe. */
    if (hdr->version != FC_VERSION) {
        fprintf(stderr, "[fc_client] slot %s version %u != expected %u — refusing\n",
                slot_id, hdr->version, FC_VERSION);
        return FCC_PROBE_REJECT;
    }

    /* mmap() beyond EOF succeeds, but the first touch of those pages raises
     * SIGBUS — make sure the file really covers the layout the header implies. */
    *total = fc_slot_shm_size(hdr->frame_size);
    struct stat st;
    if (fstat(fd, &st) != 0)
        return FCC_PROBE_REJECT;
    if (st.st_size < 0 || (uint64_t)st.st_size < (uint64_t)*total)
        return FCC_PROBE_NOT_READY;
    return FCC_PROBE_READY;
}

/**
 * Attach to a framecache slot as a read-only consumer.
 *
 * Opens SHM_DIR/<slot_id>, validates its header (magic, version, file size),
 * maps the whole slot PROT_READ and opens the writer's wakeup semaphore
 * SEM_PREFIX<slot_id>SEM_SUFFIX. Reading starts at the writer's current
 * write cursor, so frames already in the ring are not replayed.
 *
 * @param slot_id  slot name: non-empty, no '/', short enough for the
 *                 128-byte path and semaphore-name buffers.
 * @param wait_ms  how long to keep retrying (every 100 ms) while the slot file
 *                 is missing or its header/size is not yet complete.
 * @return a consumer handle to release with fc_consumer_close(), or NULL on
 *         invalid slot_id, timeout, version mismatch or any setup failure.
 */
fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
{
    char shm_path[128], sem_name[128];
    fc_header_t hdr;
    size_t total = 0;
    int fd = -1;
    void *base = MAP_FAILED;
    sem_t *sem = SEM_FAILED;
    fc_consumer_t *c = NULL;
    int n;

    /* Reject ids that would escape SHM_DIR; POSIX leaves an interior '/' in a
     * semaphore name implementation-defined (glibc rejects it) anyway. */
    if (!slot_id || !*slot_id || strchr(slot_id, '/'))
        return NULL;
    /* A truncated name would silently address a different slot. */
    n = snprintf(shm_path, sizeof shm_path, "%s/%s", SHM_DIR, slot_id);
    if (n < 0 || (size_t)n >= sizeof shm_path)
        return NULL;
    n = snprintf(sem_name, sizeof sem_name, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX);
    if (n < 0 || (size_t)n >= sizeof sem_name)
        return NULL;

    /* Saturating arithmetic: a huge wait_ms must not wrap the deadline into
     * the past and make us give up on the first attempt. */
    uint64_t start = now_us();
    uint64_t wait_us = wait_ms > UINT64_MAX / 1000ULL ? UINT64_MAX : wait_ms * 1000ULL;
    uint64_t deadline = wait_us > UINT64_MAX - start ? UINT64_MAX : start + wait_us;

    for (;;) {
        fd = open(shm_path, O_RDONLY);
        if (fd >= 0) {
            int probe = fc_probe_slot(fd, slot_id, &hdr, &total);
            if (probe == FCC_PROBE_READY)
                break;
            close(fd);
            fd = -1;
            if (probe == FCC_PROBE_REJECT)
                return NULL;
        }
        if (now_us() >= deadline)
            return NULL;
        struct timespec ts = { .tv_nsec = 100000000 }; /* 100ms */
        nanosleep(&ts, NULL);
    }

    base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED)
        goto fail;
    sem = sem_open(sem_name, 0);
    if (sem == SEM_FAILED)
        goto fail;
    c = calloc(1, sizeof *c);
    if (!c)
        goto fail;
    /* Consumer-owned copy buffer — fc_consumer_read copies the frame here and
     * re-validates the cursor afterward, so a writer lapping a slow consumer
     * cannot corrupt the frame the caller is using. Sized for video + audio so
     * the frame-coupled audio is copied together with its video. */
    c->copy_buf = malloc((size_t)hdr.frame_size + hdr.audio_max_bytes);
    if (!c->copy_buf)
        goto fail;

    c->shm_fd = fd;
    c->base = base;
    c->shm_size = total;
    c->sem = sem;
    c->frame_size = hdr.frame_size;
    c->audio_max = hdr.audio_max_bytes;
    /* Start reading from the current write position so we don't replay old frames */
    c->read_cursor = atomic_load_explicit(
        &((fc_header_t *)base)->write_cursor, memory_order_acquire);
    c->local_dropped = 0;
    snprintf(c->slot_id, sizeof c->slot_id, "%s", slot_id);
    return c;

fail:
    free(c); /* copy_buf is never allocated on a path that reaches here */
    if (sem != SEM_FAILED)
        sem_close(sem);
    if (base != MAP_FAILED)
        munmap(base, total);
    if (fd >= 0)
        close(fd);
    return NULL;
}
/*
 * Move c->read_cursor forward when the writer is so far ahead that frames at
 * or behind the cursor may already be (or be being) overwritten.
 *
 * `keep` is how many of the most recently published frames are treated as
 * safe to read. If write_cur is more than `keep` frames ahead, the cursor
 * jumps to write_cur - keep and every frame jumped over is added to
 * c->local_dropped. Returns 1 if the cursor moved, 0 otherwise.
 *
 * NOTE: do NOT write hdr->dropped_frames here — the consumer maps the shm
 * PROT_READ, so a write would SIGSEGV. Per-consumer drops live in
 * c->local_dropped (see fc_consumer_dropped); the writer owns the shm counter.
 */
static int fc_consumer_catch_up(fc_consumer_t *c, uint64_t write_cur, uint64_t keep)
{
    /* Written as an addition (not write_cur - read_cursor) so a write_cur that
     * is behind read_cursor cannot underflow into a fake lap. */
    if (write_cur <= c->read_cursor + keep)
        return 0;
    uint64_t oldest = write_cur - keep;
    c->local_dropped += oldest - c->read_cursor;
    c->read_cursor = oldest;
    return 1;
}

/**
 * Deliver the next frame (video + that frame's embedded audio) via the
 * consumer's private copy buffer.
 *
 * The semaphore is used ONLY as an edge-wakeup hint, never as a frame
 * counter: cursor-diff (write_cursor - read_cursor) is the source of truth,
 * and the semaphore is drained to zero so its count never accumulates (an
 * unbounded count would make sem_timedwait never block and eventually
 * overflow).
 *
 * @param timeout_ms  total time to wait for a frame in this call; 0 polls.
 * @return FC_OK        frame delivered in *ref;
 *         FC_DROPPED   frame delivered, but earlier frames were skipped;
 *         FC_LAPPED    the copy may be torn — *ref is NOT filled, call again;
 *         FC_TIMEOUT   no frame within timeout_ms;
 *         FC_ERROR     bad arguments or semaphore failure.
 *
 * ref->data / ref->audio point into the consumer's buffer and stay valid
 * until the next fc_consumer_read() or fc_consumer_close().
 */
int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
{
    if (!c || !ref)
        return FC_ERROR;

    fc_header_t *hdr = (fc_header_t *)c->base;
    int dropped = 0; /* set when this call skipped one or more frames */

    /* How many newest published frames we treat as stable.
     * ASSUMPTION (writer side is not visible here): the writer fills ring
     * entry write_cursor % ring_depth and only then publishes
     * write_cursor + 1. Under that protocol the oldest published frame
     * (write_cursor - ring_depth) shares its entry with the frame currently
     * being written, so only the newest ring_depth - 1 frames are stable.
     * A depth-1 ring has no stable entry at all; keep the previous behaviour
     * there instead of never delivering a frame. */
    uint64_t depth = hdr->ring_depth;
    uint64_t keep = depth > 1 ? depth - 1 : depth;

    /* One absolute deadline for the whole call (sem_timedwait requires
     * CLOCK_REALTIME). Recomputing it per iteration would restart the full
     * timeout after every stale wakeup or EINTR. */
    struct timespec abs_ts;
    clock_gettime(CLOCK_REALTIME, &abs_ts);
    abs_ts.tv_sec += (time_t)(timeout_ms / 1000);
    abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L);
    if (abs_ts.tv_nsec >= 1000000000L) {
        abs_ts.tv_sec++;
        abs_ts.tv_nsec -= 1000000000L;
    }

    /* ── Wait for new data ─────────────────────────────────────────────── */
    for (;;) {
        /* Drain BEFORE sampling the cursor: a post that lands after the drain
         * stays in the semaphore, so the wait below cannot sleep through a
         * frame published after our cursor load (assumes the writer stores
         * write_cursor before posting). */
        while (sem_trywait(c->sem) == 0) { /* drain */ }

        uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor,
                                                  memory_order_acquire);
        if (fc_consumer_catch_up(c, write_cur, keep))
            dropped = 1;
        if (c->read_cursor < write_cur)
            break; /* a frame is available */

        if (sem_timedwait(c->sem, &abs_ts) == 0)
            continue; /* woken — re-evaluate cursor-diff */
        if (errno == EINTR)
            continue;
        if (errno != ETIMEDOUT)
            return FC_ERROR;
        /* Timed out: re-check the cursor once more before giving up — the
         * writer may have advanced without us seeing its post. */
        uint64_t wc2 = atomic_load_explicit(&hdr->write_cursor,
                                            memory_order_acquire);
        if (c->read_cursor < wc2)
            continue;
        return FC_TIMEOUT;
    }

    /* ── Copy the entry (video + this frame's audio) into the owned buffer ─
     * Both come from the SAME ring entry in the SAME call, so the audio handed
     * to the caller is exactly this video frame's embedded audio. copy_buf
     * layout mirrors the shm entry: [video frame_size][audio audio_max].
     * Stride and clamps use the sizes cached at open: copy_buf and the mapping
     * were sized from them, so trusting a live header value that changed
     * underneath us could overflow copy_buf or read past the mapping. */
    fc_frame_t *frame = fc_frame_at(c->base, c->frame_size, c->read_cursor);
    uint32_t fsz = frame->size;
    if (fsz > c->frame_size)
        fsz = c->frame_size;
    uint32_t asz = frame->audio_size;
    if (asz > c->audio_max)
        asz = c->audio_max;
    uint64_t pts = frame->pts_us;
    uint64_t wall = frame->wall_us;
    memcpy(c->copy_buf, frame->data, fsz);
    if (asz)
        memcpy(c->copy_buf + c->frame_size,
               fc_frame_audio(frame, c->frame_size), asz);

    /* ── Re-validate AFTER the copy (seqlock-style reader) ───────────────
     * The acquire fence orders the entry reads above before the cursor
     * re-load below. If the writer may have started reusing this entry, the
     * copy may be torn: discard it and report FC_LAPPED so the caller reads
     * again (the cursor has already been moved past the lost frames). */
    atomic_thread_fence(memory_order_acquire);
    uint64_t write_after = atomic_load_explicit(&hdr->write_cursor,
                                                memory_order_acquire);
    if (fc_consumer_catch_up(c, write_after, keep))
        return FC_LAPPED; /* copy torn — ref not valid, caller reads again */

    /* Copy is valid. */
    ref->data = c->copy_buf;
    ref->size = fsz;
    ref->audio = asz ? (c->copy_buf + c->frame_size) : NULL;
    ref->audio_size = asz;
    ref->pts_us = pts;
    ref->wall_us = wall;
    ref->seq = c->read_cursor;
    c->read_cursor++;
    return dropped ? FC_DROPPED : FC_OK;
}
/**
 * Detach from the slot and release everything fc_consumer_open acquired:
 * the private copy buffer, the semaphore handle, the shm mapping and the
 * slot file descriptor. Safe to call with NULL. Any fc_frame_ref_t obtained
 * from this consumer is invalid afterwards (it points into copy_buf).
 */
void fc_consumer_close(fc_consumer_t *c)
{
    if (!c)
        return;
    free(c->copy_buf); /* free(NULL) is a no-op; no guard needed */
    sem_close(c->sem);
    munmap(c->base, c->shm_size);
    close(c->shm_fd);
    free(c);
}
/**
 * Return the slot's current write cursor as published by the writer (the
 * same value fc_consumer_read compares read_cursor against).
 * Returns 0 for a NULL consumer, matching the NULL tolerance of
 * fc_consumer_close / fc_consumer_info.
 */
uint64_t fc_consumer_write_cursor(fc_consumer_t *c)
{
    if (!c)
        return 0;
    fc_header_t *hdr = (fc_header_t *)c->base;
    /* Acquire, consistent with every other write_cursor load in this file. */
    return atomic_load_explicit(&hdr->write_cursor, memory_order_acquire);
}
/**
 * Number of frames this consumer has skipped because the writer lapped it
 * (tracked locally; the shared header is mapped read-only).
 * Returns 0 for a NULL consumer, matching the NULL tolerance of
 * fc_consumer_close / fc_consumer_info.
 */
uint64_t fc_consumer_dropped(fc_consumer_t *c)
{
    if (!c)
        return 0;
    return c->local_dropped;
}
/**
 * Describe the stream carried by the attached slot: geometry, frame rate,
 * pixel format, video frame size and audio format, read from the slot header.
 *
 * A zero audio rate or channel count in the header is replaced by
 * FC_AUDIO_RATE / FC_AUDIO_CHANNELS; the sample width is always
 * FC_AUDIO_SAMPLE_BYTES.
 *
 * @return 0 on success, -1 if either argument is NULL.
 */
int fc_consumer_info(fc_consumer_t *c, fc_stream_info_t *info)
{
    if (c == NULL || info == NULL)
        return -1;

    fc_header_t *h = c->base;

    /* Video. */
    info->width        = h->width;
    info->height       = h->height;
    info->fps_num      = h->fps_num;
    info->fps_den      = h->fps_den;
    info->pixel_format = h->pixel_format;
    info->frame_size   = h->frame_size;

    /* Audio: start from the defaults, take the header value when it is set. */
    info->audio_rate = FC_AUDIO_RATE;
    if (h->audio_rate != 0)
        info->audio_rate = h->audio_rate;

    info->audio_channels = FC_AUDIO_CHANNELS;
    if (h->audio_channels != 0)
        info->audio_channels = h->audio_channels;

    info->audio_sample_bytes = FC_AUDIO_SAMPLE_BYTES; /* s16le */
    return 0;
}