feat(framecache): phase 4 — capture-manager reads from framecache

- services/framecache/client/fc_pipe.c: new slot→stdout pipe adapter
  - Opens framecache slot as consumer (independent cursor per instance)
  - Streams raw UYVY422 frames to stdout continuously
  - SIGPIPE detection via write() return — exits cleanly on ffmpeg exit
  - SIGTERM/SIGINT clean stop from capture-manager
  - Periodic stats to stderr (every 300 frames)
  - Exit codes: 0=clean, 1=slot not found, 2=EPIPE

- services/framecache/CMakeLists.txt: add fc_pipe target + install
- services/framecache/Dockerfile: copy fc_pipe to runtime image

- services/capture/Dockerfile:
  - New fc-pipe-builder stage (builds fc_pipe from framecache sources)
  - Copies fc_pipe binary to /usr/local/bin/fc_pipe in runtime image

- services/capture/src/capture-manager.js:
  - _buildInputArgs: new framecache path for deltacast + sdi/blackmagic
    when FC_SLOT_ID env is set (injected by node-agent from bridge fmt JSON)
    - Spawns fc_pipe <slot_id> as child process
    - Uses pipe:0 as ffmpeg rawvideo input 0
    - Audio FIFO (unchanged) as ffmpeg input 1
    - Falls back to legacy FIFO path when FC_SLOT_ID unset
  - audioMap: covers blackmagic via framecache (input 1 for audio FIFO)
  - isInterlacedSource: covers blackmagic interlaced signals
  - hiresStdio: pipe stdin when bridgeProcess set (fc_pipe stdout→ffmpeg)
  - Non-growing spawn: pipes fc_pipe.stdout → ffmpeg.stdin
  - Growing orchestrator spawn: pipes fc_pipe.stdout → bash.stdin
  - sdiHlsDir: covers blackmagic source type
  - Session state stores _fcPipeProcess for clean stop
  - stop(): sends SIGTERM to fc_pipe after ffmpeg SIGINT
This commit is contained in:
Wild Dragon Dev 2026-06-03 15:32:40 +00:00
parent b2c63de2fa
commit b700902200
5 changed files with 313 additions and 65 deletions

View file

@ -15,6 +15,19 @@ RUN cmake -S /bridge -B /bridge/build \
-DSDK_ROOT=/sdk \
&& cmake --build /bridge/build -j$(nproc)
# ── Stage 1d: Build fc_pipe (framecache slot → stdout adapter) ──────────
# Spawned by capture-manager.js to pipe raw frames from a framecache slot
# into ffmpeg as a rawvideo pipe input. Statically linked against fc_client
# (no runtime dependency on the framecache container — just shm + semaphores).
FROM debian:bookworm AS fc-pipe-builder
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake libmicrohttpd-dev \
&& rm -rf /var/lib/apt/lists/*
COPY ../framecache /fc-src
RUN cmake -S /fc-src -B /fc-src/build \
-DCMAKE_BUILD_TYPE=Release \
&& cmake --build /fc-src/build --target fc_pipe -j$(nproc)
# ── Stage 1c: Build decklink-bridge binary ───────────────────────────────
FROM debian:bookworm AS decklink-bridge-builder
RUN apt-get update && apt-get install -y --no-install-recommends \
@ -172,6 +185,9 @@ COPY --from=bridge-builder /bridge/build/deltacast-capture /usr/local/bin/deltac
# DeckLink bridge binary (no SDK runtime .so — uses dlopen at runtime)
COPY --from=decklink-bridge-builder /decklink-bridge/build/decklink-bridge /usr/local/bin/decklink-bridge
# fc_pipe — framecache slot → stdout, spawned by capture-manager.js
COPY --from=fc-pipe-builder /fc-src/build/fc_pipe /usr/local/bin/fc_pipe
COPY --from=sdk-extractor /sdk/lib/libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/
COPY --from=sdk-extractor /sdk/lib/libvideomasterhd_audio.so.6.34.1 /usr/local/lib/deltacast/
RUN ln -sf libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/libvideomasterhd.so.6 \

View file

@ -547,17 +547,115 @@ class CaptureManager {
return { inputArgs: ['-probesize','32M','-analyzeduration','10M','-fflags','+genpts','-i', sourceUrl], isNetwork: true };
}
// Deltacast SDI via shared bridge daemon (deltacast-bridge).
// ── Framecache path (primary for deltacast + blackmagic) ────────────────
//
// The bridge daemon is started by node-agent (host process, direct /dev access)
// and writes each port's streams to named FIFOs in /dev/shm/deltacast/:
// /dev/shm/deltacast/video-<port>.fifo
// /dev/shm/deltacast/audio-<port>.fifo
// When FC_SLOT_ID is set in the sidecar env (injected by node-agent from
// the bridge's format JSON), we use the framecache shm ring buffer as the
// video source instead of named FIFOs.
//
// This sidecar just reads from those FIFOs. The bridge may still be starting
// up or waiting for signal lock, so we wait up to 30s for the FIFOs to appear
// before handing them to ffmpeg. The bridge process is managed by node-agent;
// bridgeProcess is null here (no per-sidecar bridge spawn).
// fc_pipe is a small C helper that opens the framecache slot as a consumer
// and writes raw UYVY422 frames to stdout. capture-manager spawns it and
// pipes its stdout to ffmpeg as a rawvideo input — same pattern as the
// existing FIFO path, but with zero-copy shm reads and independent per-
// consumer cursors. Multiple fc_pipe instances on the same slot each get
// their own cursor, enabling simultaneous growing + proxy + HLS from one
// SDI input without any frame splitting.
//
// Audio stays on the named FIFO path (same as before — audio fan-out via
// shm is a roadmap item).
//
// Falls back to the legacy FIFO path when FC_SLOT_ID is not set (e.g. on
// nodes running an older node-agent or without framecache deployed).
if ((sourceType === 'deltacast' || sourceType === 'sdi' || sourceType === 'blackmagic')
&& process.env.FC_SLOT_ID) {
const slotId = process.env.FC_SLOT_ID;
const fcPipeBin = process.env.FC_PIPE_BIN || 'fc_pipe';
const WAIT_MS = 30_000;
// Determine audio FIFO path based on source type
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
? parseInt(device, 10) : 0;
const portIdx = (sourceType === 'deltacast')
? ((typeof port === 'number' || /^\d+$/.test(String(port)))
? parseInt(port, 10) : idx)
: idx;
let audioFifoPath;
if (sourceType === 'deltacast') {
const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
audioFifoPath = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
} else {
const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
audioFifoPath = `${DL_AUDIO_DIR}/audio-${portIdx}.fifo`;
}
// Wait up to 30s for the audio FIFO to exist (bridge starts asynchronously)
const { existsSync: _exists } = await import('node:fs');
const deadline = Date.now() + WAIT_MS;
while (Date.now() < deadline) {
if (_exists(audioFifoPath)) break;
await new Promise(r => setTimeout(r, 500));
}
if (!_exists(audioFifoPath)) {
throw new Error(
`audio FIFO not ready after ${WAIT_MS / 1000}s: ${audioFifoPath} ` +
`— is the bridge running?`
);
}
// Video dimensions and fps come from env vars injected by node-agent
// (populated from the bridge's format JSON on signal lock).
const fcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
const fcFps = process.env.DELTACAST_FRAMERATE || '60000/1001';
const fcInterlaced = process.env.DELTACAST_INTERLACED === '1';
console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} audio=${audioFifoPath}`);
// Spawn fc_pipe: opens the framecache slot with its own read cursor and
// streams raw UYVY422 frames to stdout. ffmpeg reads from the pipe as
// rawvideo input 0; audio FIFO is input 1 (same as before).
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
stdio: ['ignore', 'pipe', 'pipe'],
});
fcPipeProcess.stderr.on('data', chunk => {
process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`);
});
fcPipeProcess.on('error', err => {
console.error(`[fc_pipe:${slotId}] spawn error: ${err.message}`);
});
return {
inputArgs: [
// fc_pipe stdout → ffmpeg rawvideo input 0 (video)
// -use_wallclock_as_timestamps aligns video+audio by arrival time,
// same as the legacy FIFO path.
'-use_wallclock_as_timestamps', '1',
'-thread_queue_size', '512',
'-f', 'rawvideo',
'-pix_fmt', 'uyvy422',
'-video_size', fcSize,
'-framerate', fcFps,
'-i', 'pipe:0',
// Audio FIFO → ffmpeg input 1 (unchanged from legacy path)
'-use_wallclock_as_timestamps', '1',
'-thread_queue_size', '512',
'-f', 's16le',
'-ar', '48000',
'-ac', '2',
'-i', audioFifoPath,
],
isNetwork: false,
bridgeProcess: fcPipeProcess, /* capture-manager pipes this to ffmpeg stdin */
audioFifo: null,
interlaced: fcInterlaced,
_fcPipeProcess: fcPipeProcess, /* stored for clean stop */
};
}
// ── Legacy FIFO path for deltacast ───────────────────────────────────────
// Used when FC_SLOT_ID is not set (framecache not deployed on this node,
// or older node-agent). Will be removed once framecache is everywhere.
if (sourceType === 'deltacast') {
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
? parseInt(device, 10) : 0;
@ -568,7 +666,6 @@ class CaptureManager {
const videoFifo = `${DC_PIPE_DIR}/video-${portIdx}.fifo`;
const audioFifo = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
// Wait up to 30s for both FIFOs to exist (bridge starts asynchronously).
const { existsSync: _exists } = await import('node:fs');
const WAIT_MS = 30_000;
const POLL_MS = 500;
@ -587,41 +684,14 @@ class CaptureManager {
`(video=${videoReady} audio=${audioReady}) — is deltacast-bridge running?`
);
}
console.log(`[deltacast] port ${portIdx} FIFOs ready: ${videoFifo}, ${audioFifo}`);
console.log(`[deltacast] port ${portIdx} FIFOs ready (legacy): ${videoFifo}, ${audioFifo}`);
// Resolution/fps are not known until the FIFO reader connects and starts
// receiving frames. We use sensible defaults here; ffmpeg's rawvideo demuxer
// will accept whatever the bridge writes once the pipe opens.
// The bridge daemon has already detected the signal and set up streams, so
// the FIFO content is ready-to-read as soon as the reader connects.
//
// NOTE: The format JSON emitted by the bridge on signal lock goes to the
// node-agent (which launched the bridge), not to this sidecar. The sidecar
// therefore uses fixed rawvideo params here. If per-port format introspection
// is needed in future, the node-agent should expose the fmt JSON via an API
// and capture-manager can query it before building inputArgs.
//
// For now, both video dimensions and framerate come from the recorder's
// configured values (passed to start() as `framerate` and implicit in the
// codec args). The rawvideo input is -video_size / -framerate from env or
// recorder config; ffmpeg tolerates a small mismatch in rawvideo (it just
// reads N bytes per frame based on the declared size).
//
// DELTACAST_VIDEO_SIZE / DELTACAST_FRAMERATE: set by node-agent in the
// sidecar env based on the bridge's per-port format JSON, if desired.
const dcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
const dcFps = process.env.DELTACAST_FRAMERATE || '60000/1001';
const dcInterlaced = process.env.DELTACAST_INTERLACED === '1';
return {
inputArgs: [
// Both raw FIFOs are timestampless. ffmpeg opens input 0 (video) and
// input 1 (audio) at slightly different moments, so PTS-zeroing each
// stream's first byte would bake in a fixed A/V offset. Stamping each
// input by wall-clock ARRIVAL time aligns them by real time regardless
// of FIFO open order — the robust fix for the A/V start offset.
// Large thread_queue_size avoids "thread message queue blocking" on
// the high-bitrate raw video FIFO.
'-use_wallclock_as_timestamps', '1',
'-thread_queue_size', '512',
'-f', 'rawvideo',
@ -637,8 +707,8 @@ class CaptureManager {
'-i', audioFifo,
],
isNetwork: false,
bridgeProcess: null, /* bridge is managed by node-agent, not this sidecar */
audioFifo: null, /* no per-session FIFO to clean up on stop */
bridgeProcess: null,
audioFifo: null,
interlaced: dcInterlaced,
};
}
@ -927,10 +997,14 @@ exit "$BMXRC"
sourceType, sourceBackend, device, port, board, sourceUrl, listen, listenPort, streamKey,
});
// Audio input index: the deltacast shared bridge delivers video on input 0
// (video FIFO) and audio on input 1 (audio FIFO), so audioMap is '1:a:0?'.
// DeckLink SDI and network sources carry audio inside input 0.
const audioMap = (sourceType === 'deltacast') ? '1:a:0?' : '0:a:0?';
// Audio input index:
// - deltacast + blackmagic via framecache (fc_pipe): video on input 0
// (pipe:0 from fc_pipe), audio FIFO on input 1 → audioMap = '1:a:0?'
// - DeckLink legacy (ffmpeg -f decklink): audio embedded in input 0
// - network sources: audio in input 0
const audioMap = (sourceType === 'deltacast' ||
((sourceType === 'sdi' || sourceType === 'blackmagic') && process.env.FC_SLOT_ID))
? '1:a:0?' : '0:a:0?';
// Non-growing master: ffmpeg muxes the finalized MOV directly. Growing
// master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via
@ -945,7 +1019,9 @@ exit "$BMXRC"
if (hiresCodecArgs) console.log('[capture] hires ffmpeg args:', hiresCodecArgs.join(' '));
const isInterlacedSource = sourceType === 'sdi' || (sourceType === 'deltacast' && interlaced);
const isInterlacedSource = sourceType === 'sdi'
|| (sourceType === 'deltacast' && interlaced)
|| ((sourceType === 'blackmagic') && interlaced);
const sdiFilterArgs = isInterlacedSource ? ['-vf', 'yadif=mode=1:deint=1'] : [];
// Master output destination (NON-growing path only).
@ -971,14 +1047,15 @@ exit "$BMXRC"
catch (err) { console.error('[capture] could not create temp master dir:', err.message); }
}
const hiresOutput = localMasterPath;
// Deltacast reads from FIFOs (no stdin pipe needed). DeckLink pipes stdout.
const hiresStdio = ['ignore', 'ignore', 'pipe'];
// When bridgeProcess is an fc_pipe process its stdout is piped to ffmpeg
// stdin (pipe:0 input). For all other sources stdin is ignored.
const hiresStdio = bridgeProcess ? ['pipe', 'ignore', 'pipe'] : ['ignore', 'ignore', 'pipe'];
// For SDI we cannot open the DeckLink device a second time for a preview
// tee, so the live HLS preview is produced as a SECOND OUTPUT of the hires
// ffmpeg: one decklink read -> yadif -> split -> [ProRes/S3] + [H.264/HLS].
// For SDI/framecache sources the live HLS preview is a SECOND OUTPUT of
// the hires ffmpeg (one read → split → [master] + [HLS preview]).
let sdiHlsDir = null;
if ((sourceType === 'sdi' || sourceType === 'deltacast') && this._assetIdForHls) {
if ((sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic')
&& this._assetIdForHls) {
const fsMod = await import('node:fs');
sdiHlsDir = '/live/' + this._assetIdForHls;
try { fsMod.mkdirSync(sdiHlsDir, { recursive: true }); } catch (_) {}
@ -1008,30 +1085,36 @@ exit "$BMXRC"
interlaced: isInterlacedSource,
});
console.log('[capture] growing master via raw2bmx; orchestrator script length=' + orchArgs[1].length);
hiresProcess = spawn('bash', orchArgs, { stdio: ['ignore', 'ignore', 'pipe'], detached: true });
hiresProcess = spawn('bash', orchArgs, {
stdio: bridgeProcess ? ['pipe', 'ignore', 'pipe'] : ['ignore', 'ignore', 'pipe'],
detached: true,
});
// When video comes from fc_pipe, pipe its stdout to the bash orchestrator
// stdin (which the orchestrator forwards to the ffmpeg rawvideo input).
if (bridgeProcess && bridgeProcess.stdout && hiresProcess.stdin) {
bridgeProcess.stdout.pipe(hiresProcess.stdin);
bridgeProcess.on('exit', () => {
try { if (hiresProcess.stdin) hiresProcess.stdin.end(); } catch (_) {}
});
}
} else {
// ── Finalized (non-growing) master: ffmpeg muxes the MOV directly ──
let hiresArgs;
if ((sourceType === 'sdi' || sourceType === 'deltacast') && this._assetIdForHls) {
const isSdiLike = sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic';
if (isSdiLike && this._assetIdForHls) {
const filterStr = isInterlacedSource
? '[0:v]yadif=mode=1:deint=1,split=2[vhi][vlo]'
: '[0:v]split=2[vhi][vlo]';
hiresArgs = [
...inputArgs,
'-filter_complex', filterStr,
// Output 0 — ProRes/MOV master (local temp, uploaded to S3 on stop)
// Output 0 — master (local temp, uploaded to S3 on stop)
'-map', '[vhi]', '-map', audioMap,
// Keep raw audio aligned to the video clock. The two raw FIFOs carry
// no timestamps; -af aresample=async lets ffmpeg stretch/squeeze audio
// to correct any tiny rate mismatch so A/V never drifts over a long
// take. Applies to this output's mapped audio stream.
'-af', 'aresample=async=1:min_hard_comp=0.100000:first_pts=0',
...hiresCodecArgs,
hiresOutput,
// Output 1 — low-latency H.264 HLS preview for the UI monitor.
// GPU-encoded (h264_nvenc) when the GPU is attached to this sidecar,
// otherwise libx264 (issue #164). GOP is pinned to one IDR per HLS
// segment so segments start on keyframes (avoids black/flashing).
// Output 1 — low-latency H.264 HLS preview for the UI monitor
'-map', '[vlo]', '-map', audioMap,
...buildHlsVideoArgs(videoCodec, framerate),
'-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
@ -1040,11 +1123,20 @@ exit "$BMXRC"
'-hls_segment_filename', sdiHlsDir + '/seg-%05d.ts',
sdiHlsDir + '/index.m3u8',
];
console.log('[HLS] SDI preview as 2nd output -> ' + sdiHlsDir);
console.log('[HLS] SDI/framecache preview as 2nd output -> ' + sdiHlsDir);
} else {
hiresArgs = [ ...inputArgs, ...sdiFilterArgs, ...hiresCodecArgs, hiresOutput ];
}
hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio });
// When video comes from fc_pipe, pipe its stdout to ffmpeg stdin.
// fc_pipe writes raw UYVY422 frames; ffmpeg reads them as rawvideo pipe:0.
if (bridgeProcess && bridgeProcess.stdout && hiresProcess.stdin) {
bridgeProcess.stdout.pipe(hiresProcess.stdin);
bridgeProcess.on('exit', () => {
try { if (hiresProcess.stdin) hiresProcess.stdin.end(); } catch (_) {}
});
}
}
// Growing-files: nothing to upload here (promotion worker handles S3).
@ -1130,6 +1222,7 @@ exit "$BMXRC"
audioFifo,
startedAt,
duration: 0,
_fcPipeProcess: bridgeProcess || null, /* fc_pipe process, if framecache path used */
uploads,
codecs: {
videoCodec, videoBitrate, framerate,
@ -1270,6 +1363,11 @@ exit "$BMXRC"
if (processes.hires) processes.hires.kill('SIGINT');
if (processes.proxy) processes.proxy.kill('SIGINT');
if (processes.hls) { try { processes.hls.kill('SIGINT'); } catch (_) {} }
// fc_pipe process (framecache consumer) — stop after ffmpeg so it sees EOF
// naturally via EPIPE when ffmpeg stdin closes. SIGTERM as belt-and-suspenders.
if (currentSession._fcPipeProcess) {
try { currentSession._fcPipeProcess.kill('SIGTERM'); } catch (_) {}
}
/* processes.bridge: removed — bridge is managed by node-agent, not per-session */
// Wait for the master writer to finalize before we read/upload the file.

View file

@ -25,6 +25,17 @@ add_library(fc_client STATIC
target_include_directories(fc_client PUBLIC src client)
target_link_libraries(fc_client rt pthread)
# ── fc_pipe — slot → stdout adapter (used by capture-manager.js) ─────
# Spawned by capture-manager as a child process; writes raw UYVY422
# frames from a framecache slot to stdout so ffmpeg reads them as
# rawvideo pipe input. Multiple fc_pipe instances on the same slot
# each get an independent cursor — zero-copy fan-out.
add_executable(fc_pipe
client/fc_pipe.c
)
target_link_libraries(fc_pipe fc_client)
target_include_directories(fc_pipe PRIVATE src client)
# ── test consumer (dev utility) ──────────────────────────────────────
if(BUILD_TESTS)
add_executable(fc_test_consumer
@ -34,6 +45,6 @@ if(BUILD_TESTS)
target_include_directories(fc_test_consumer PRIVATE src client)
endif()
install(TARGETS framecache DESTINATION bin)
install(TARGETS framecache fc_pipe DESTINATION bin)
install(FILES client/fc_client.h src/slot.h DESTINATION include/framecache)
install(TARGETS fc_client DESTINATION lib)

View file

@ -17,7 +17,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libmicrohttpd12 \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/framecache /usr/local/bin/framecache
COPY --from=builder /build/framecache /usr/local/bin/framecache
COPY --from=builder /build/fc_pipe /usr/local/bin/fc_pipe
COPY --from=builder /build/fc_test_consumer /usr/local/bin/fc_test_consumer 2>/dev/null || true
# /dev/shm/framecache is created at runtime (tmpfs)

View file

@ -0,0 +1,122 @@
/**
* fc_pipe.c Framecache slot stdout pipe adapter.
*
* Opens a framecache slot as a consumer and writes raw video frames to
* stdout in a continuous stream. capture-manager.js spawns this process
* and feeds its stdout to ffmpeg as a rawvideo pipe input identical to
* the way DeckLink bridges currently pipe raw frames.
*
* Each consumer instance has its own independent read cursor, so multiple
* fc_pipe processes reading from the same slot never interfere with each
* other. This is how growing + proxy + HLS all read the same SDI signal
* simultaneously.
*
* Usage:
* fc_pipe <slot_id> [wait_ms]
*
* Writes raw UYVY422 frame data to stdout. Terminates on:
* - SIGTERM / SIGINT (clean stop from capture-manager)
* - stdout EPIPE (ffmpeg exited)
* - Slot disappears (bridge stopped)
*
* Exit codes:
* 0 clean stop (SIGTERM)
* 1 slot not found within wait_ms
* 2 stdout write error (EPIPE)
*/
#include "../src/slot.h"
#include "fc_client.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <stdint.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <fcntl.h>
static volatile int g_stop = 0;
static void on_signal(int s) { (void)s; g_stop = 1; }
/* Write all bytes to fd. Returns 0 on success, -1 on EPIPE/error. */
static int write_all_fd(int fd, const void *buf, size_t len) {
const uint8_t *p = (const uint8_t *)buf;
size_t off = 0;
while (off < len) {
ssize_t n = write(fd, p + off, len - off);
if (n > 0) { off += (size_t)n; continue; }
if (n < 0 && errno == EINTR) continue;
return -1; /* EPIPE or other fatal error */
}
return 0;
}
int main(int argc, char *argv[]) {
if (argc < 2) {
fprintf(stderr, "Usage: %s <slot_id> [wait_ms]\n", argv[0]);
return 1;
}
const char *slot_id = argv[1];
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000;
signal(SIGTERM, on_signal);
signal(SIGINT, on_signal);
signal(SIGPIPE, SIG_IGN); /* detect EPIPE via write() return value */
/* Set stdout to binary mode — no newline translation */
fcntl(STDOUT_FILENO, F_SETFL,
fcntl(STDOUT_FILENO, F_GETFL, 0) & ~O_NONBLOCK);
fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums)\n",
slot_id, (unsigned long long)wait_ms);
fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms);
if (!c) {
fprintf(stderr, "[fc_pipe] slot '%s' not found within %llums\n",
slot_id, (unsigned long long)wait_ms);
return 1;
}
fprintf(stderr, "[fc_pipe] slot open, streaming to stdout\n");
uint64_t frames_out = 0;
uint64_t total_dropped = 0;
while (!g_stop) {
fc_frame_ref_t ref;
int rc = fc_consumer_read(c, &ref, 2000 /* 2s timeout */);
if (rc == FC_TIMEOUT) continue;
if (rc == FC_ERROR) break;
if (rc == FC_DROPPED) {
total_dropped = fc_consumer_dropped(c);
fprintf(stderr, "[fc_pipe] WARNING: consumer fell behind — total dropped: %llu\n",
(unsigned long long)total_dropped);
}
/* Write frame data to stdout */
if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) {
if (!g_stop)
fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n");
break;
}
frames_out++;
/* Periodic stats to stderr (every 300 frames ≈ 5s at 60fps) */
if (frames_out % 300 == 0) {
fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu\n",
(unsigned long long)frames_out,
(unsigned long long)total_dropped);
}
}
fc_consumer_close(c);
fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu\n",
(unsigned long long)frames_out,
(unsigned long long)total_dropped);
return 0;
}