feat(framecache): phase 5 — network ingest (RTMP/SRT) via framecache

- services/framecache/src/net_ingest.c: new network ingest process
  - Spawns ffmpeg to decode SRT/RTMP → raw UYVY422 stdout
  - Reads decoded frames and writes into framecache slot via shm
  - Registers slot with framecache HTTP API on startup
  - Deregisters slot on clean exit (SIGTERM)
  - Reconnect loop for listener mode (stays alive between sessions)
  - --url, --slot-id, --fc-url, --width, --height, --fps-num/den,
    --source-type, --listen, --listen-port, --stream-key args
  - Emits format JSON to stderr on first frame

- services/framecache/CMakeLists.txt: add net_ingest target
- services/framecache/Dockerfile: copy net_ingest to runtime image

- services/node-agent/index.js:
  - startNetIngest() / stopNetIngest(): lifecycle management per recorder
  - Spawns net_ingest before sidecar start for srt/rtmp sourceTypes
  - Injects FC_SLOT_ID=net-<containerId> into sidecar env
  - Sets IpcMode=host for network sidecars using framecache
  - Maps temp id → real containerId after container create
  - stopNetIngest() called on sidecar stop
  - NET_INGEST_BIN env var (default: docker exec framecache net_ingest)

- services/capture/src/capture-manager.js:
  - _buildInputArgs srt/rtmp: framecache path when FC_SLOT_ID set
    (spawns fc_pipe, uses pipe:0 rawvideo input — same as SDI path)
  - Falls back to direct URL when FC_SLOT_ID not set (legacy path)
  - audioMap: network via framecache uses '0🅰️0?' (video-only fc_pipe,
    no audio FIFO — audio-in-shm is roadmap)
  - HLS tee: sdiHlsDir covers network-via-framecache; legacy tee gated
    on !FC_SLOT_ID to avoid duplicate HLS outputs
  - fc_pipe piped to ffmpeg stdin for network framecache path

- docker-compose.worker.yml: FC_URL + NET_INGEST_BIN in node-agent env
This commit is contained in:
Wild Dragon Dev 2026-06-03 15:37:17 +00:00
parent b700902200
commit 99723da00f
6 changed files with 593 additions and 15 deletions

View file

@ -60,6 +60,12 @@ services:
BMD_MODEL: ${BMD_MODEL:-}
BMD_DEVICE_PREFIX: ${BMD_DEVICE_PREFIX:-dv}
LIVE_DIR: ${LIVE_DIR:-/mnt/NVME/MAM/wild-dragon-live}
# Framecache service URL (on the wild-dragon-worker network)
FC_URL: ${FC_URL:-http://framecache:7435}
# net_ingest binary — runs inside the framecache container via docker exec.
# node-agent has docker.sock so it can exec into the framecache container.
# Override with a host-installed path if preferred.
NET_INGEST_BIN: ${NET_INGEST_BIN:-docker exec framecache net_ingest}
# REPO_DIR: host path to the checked-out repo. The agent passes this to the
# one-shot driver-install container so install-driver.sh can read
# sdk/<vendor>/ and run deploy/install-driver.sh. Must match the host path

View file

@ -521,6 +521,49 @@ class CaptureManager {
* @private
*/
async _buildInputArgs({ sourceType, sourceBackend = 'blackmagic', device, port, board, sourceUrl, listen, listenPort, streamKey }) {
// ── Network sources via framecache (primary when FC_SLOT_ID is set) ──────
// node-agent starts net_ingest before the sidecar, which decodes the stream
// to raw UYVY422 and registers a framecache slot. We read from that slot via
// fc_pipe — same zero-copy path as SDI sources — enabling simultaneous
// growing + proxy + HLS from any network source.
if ((sourceType === 'srt' || sourceType === 'rtmp') && process.env.FC_SLOT_ID) {
const slotId = process.env.FC_SLOT_ID;
const fcPipeBin = process.env.FC_PIPE_BIN || 'fc_pipe';
const WAIT_MS = 60_000; /* network sources may take longer to connect */
const fcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
const fcFps = process.env.DELTACAST_FRAMERATE || '30000/1001';
console.log(`[framecache] net slot=${slotId} size=${fcSize} fps=${fcFps}`);
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
stdio: ['ignore', 'pipe', 'pipe'],
});
fcPipeProcess.stderr.on('data', chunk => {
process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`);
});
fcPipeProcess.on('error', err =>
console.error(`[fc_pipe:${slotId}] spawn error: ${err.message}`));
return {
inputArgs: [
'-use_wallclock_as_timestamps', '1',
'-thread_queue_size', '512',
'-f', 'rawvideo',
'-pix_fmt', 'uyvy422',
'-video_size', fcSize,
'-framerate', fcFps,
'-i', 'pipe:0',
],
isNetwork: false, /* treat as raw source — no -map 0:v:0? needed */
bridgeProcess: fcPipeProcess,
audioFifo: null,
interlaced: false,
_fcPipeProcess: fcPipeProcess,
};
}
// ── Legacy direct network paths (no framecache / net_ingest not running) ──
if (sourceType === 'srt') {
let url;
if (listen) {
@ -998,13 +1041,16 @@ exit "$BMXRC"
});
// Audio input index:
// - deltacast + blackmagic via framecache (fc_pipe): video on input 0
// (pipe:0 from fc_pipe), audio FIFO on input 1 → audioMap = '1:a:0?'
// - DeckLink legacy (ffmpeg -f decklink): audio embedded in input 0
// - network sources: audio in input 0
const audioMap = (sourceType === 'deltacast' ||
((sourceType === 'sdi' || sourceType === 'blackmagic') && process.env.FC_SLOT_ID))
? '1:a:0?' : '0:a:0?';
// - deltacast via framecache: video pipe:0, audio FIFO input 1 → '1:a:0?'
// - blackmagic via framecache: same → '1:a:0?'
// - network via framecache (fc_pipe): video-only pipe, no audio input → '0:a:0?'
// (net_ingest only writes video; audio from network stream not yet in shm)
// - DeckLink legacy / network legacy: audio in input 0 → '0:a:0?'
const _viaFcPipe = !!process.env.FC_SLOT_ID;
const _hasAudioFifo = _viaFcPipe && (sourceType === 'deltacast' ||
sourceType === 'sdi' ||
sourceType === 'blackmagic');
const audioMap = _hasAudioFifo ? '1:a:0?' : '0:a:0?';
// Non-growing master: ffmpeg muxes the finalized MOV directly. Growing
// master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via
@ -1051,10 +1097,12 @@ exit "$BMXRC"
// stdin (pipe:0 input). For all other sources stdin is ignored.
const hiresStdio = bridgeProcess ? ['pipe', 'ignore', 'pipe'] : ['ignore', 'ignore', 'pipe'];
// For SDI/framecache sources the live HLS preview is a SECOND OUTPUT of
// the hires ffmpeg (one read → split → [master] + [HLS preview]).
// For SDI/framecache sources (including network via framecache) the live
// HLS preview is a SECOND OUTPUT of the hires ffmpeg.
const _viaFcPipeHls = !!process.env.FC_SLOT_ID;
let sdiHlsDir = null;
if ((sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic')
if ((sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic'
|| (_viaFcPipeHls && (sourceType === 'srt' || sourceType === 'rtmp')))
&& this._assetIdForHls) {
const fsMod = await import('node:fs');
sdiHlsDir = '/live/' + this._assetIdForHls;
@ -1130,7 +1178,6 @@ exit "$BMXRC"
hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio });
// When video comes from fc_pipe, pipe its stdout to ffmpeg stdin.
// fc_pipe writes raw UYVY422 frames; ffmpeg reads them as rawvideo pipe:0.
if (bridgeProcess && bridgeProcess.stdout && hiresProcess.stdin) {
bridgeProcess.stdout.pipe(hiresProcess.stdin);
bridgeProcess.on('exit', () => {
@ -1147,10 +1194,13 @@ exit "$BMXRC"
const processes = { hires: hiresProcess };
const uploads = { hires: growingPath ? Promise.resolve({ growingPath }) : null };
// ── HLS tee for network sources (live preview in the UI) ──────────
// ── HLS tee for legacy network sources (live preview in the UI) ──────────
// When network sources come via framecache (FC_SLOT_ID set), HLS preview is
// handled as a 2nd ffmpeg output in the hires process above (sdiHlsDir path).
// This tee is only for the legacy direct-URL network path (no framecache).
let hlsProcess = null;
let hlsDir = null;
if (isNetwork && this._assetIdForHls) {
if (isNetwork && !process.env.FC_SLOT_ID && this._assetIdForHls) {
try {
const fs = await import('node:fs');
hlsDir = '/live/' + this._assetIdForHls;
@ -1158,7 +1208,6 @@ exit "$BMXRC"
const hlsArgs = [
...inputArgs,
'-map', '0:v:0?', '-map', '0:a:0?',
// GPU-gated preview encode, same as the SDI 2nd-output path (#164).
...buildHlsVideoArgs(videoCodec, framerate),
'-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
'-f', 'hls', '-hls_time', '2', '-hls_list_size', '15',
@ -1170,7 +1219,7 @@ exit "$BMXRC"
hlsProcess.stderr.on('data', (d) => { console.error('[HLS] ' + d); });
hlsProcess.on('exit', (c) => console.log('[HLS] exited ' + c));
processes.hls = hlsProcess;
console.log('[HLS] tee started -> ' + hlsDir);
console.log('[HLS] legacy-net tee started -> ' + hlsDir);
} catch (err) {
console.error('[HLS] tee failed:', err.message);
}

View file

@ -25,6 +25,19 @@ add_library(fc_client STATIC
target_include_directories(fc_client PUBLIC src client)
target_link_libraries(fc_client rt pthread)
# ── net_ingest — network source (RTMP/SRT) → framecache slot ─────────
# Spawned by node-agent when a network recorder starts.
# Decodes the network stream to raw UYVY422 via ffmpeg and writes frames
# into a framecache slot, giving capture-manager the same fc_pipe consumer
# interface as SDI sources.
add_executable(net_ingest
src/net_ingest.c
src/slot.c
)
target_include_directories(net_ingest PRIVATE src)
target_link_libraries(net_ingest rt pthread)
install(TARGETS net_ingest DESTINATION bin)
# ── fc_pipe — slot → stdout adapter (used by capture-manager.js) ─────
# Spawned by capture-manager as a child process; writes raw UYVY422
# frames from a framecache slot to stdout so ffmpeg reads them as

View file

@ -19,6 +19,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
COPY --from=builder /build/framecache /usr/local/bin/framecache
COPY --from=builder /build/fc_pipe /usr/local/bin/fc_pipe
COPY --from=builder /build/net_ingest /usr/local/bin/net_ingest
COPY --from=builder /build/fc_test_consumer /usr/local/bin/fc_test_consumer 2>/dev/null || true
# /dev/shm/framecache is created at runtime (tmpfs)

View file

@ -0,0 +1,402 @@
/**
* net_ingest.c Network source (RTMP/SRT) framecache slot ingest.
*
* Spawns ffmpeg to decode a network stream to raw UYVY422 on stdout, then
* reads those frames and writes them into a framecache slot via the shm
* ring buffer. Registers the slot with the framecache HTTP API on startup
* and deregisters on clean exit.
*
* Usage:
* net_ingest --url <srt://...|rtmp://...>
* --slot-id <recorder-uuid>
* --fc-url http://framecache:7435
* --width <W> --height <H>
* --fps-num <N> --fps-den <D>
* [--source-type srt|rtmp]
* [--listen] # SRT/RTMP listener mode
* [--listen-port <N>] # listener port (SRT default 9000, RTMP 1935)
* [--stream-key <k>] # RTMP stream key (default "stream")
*
* Emits one JSON line to stderr on first frame:
* {"slot_id":"<id>","width":W,"height":H,"fps_num":N,"fps_den":D,
* "source_type":"srt","pix_fmt":"uyvy422"}
*
* Exits 0 on clean stop (SIGTERM), 1 on error.
*
* The framecache slot stays alive between ffmpeg reconnects (listener mode):
* net_ingest keeps the slot open and restarts ffmpeg on disconnect.
*/
#include "slot.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdatomic.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
/* Re-use fc_writer helpers inline (no external dep) */
#define FC_URL_DEFAULT "http://localhost:7435"
static volatile int g_stop = 0;
static void on_signal(int s) { (void)s; g_stop = 1; }
/* ── Tiny HTTP POST/DELETE (same approach as fc_writer.c) ─────────── */
static int http_req(const char *method, const char *host, int port,
const char *path, const char *body,
char *resp, size_t resp_len)
{
struct sockaddr_in sa;
memset(&sa, 0, sizeof sa);
sa.sin_family = AF_INET;
sa.sin_port = htons((uint16_t)port);
struct hostent *he = gethostbyname(host);
if (!he) return -1;
memcpy(&sa.sin_addr, he->h_addr_list[0], (size_t)he->h_length);
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) return -1;
struct timeval tv = { .tv_sec = 5 };
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv);
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv);
if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) { close(fd); return -1; }
char req[4096];
int rlen;
if (body)
rlen = snprintf(req, sizeof req,
"%s %s HTTP/1.0\r\nHost: %s:%d\r\n"
"Content-Type: application/json\r\nContent-Length: %zu\r\n"
"Connection: close\r\n\r\n%s",
method, path, host, port, strlen(body), body);
else
rlen = snprintf(req, sizeof req,
"%s %s HTTP/1.0\r\nHost: %s:%d\r\nConnection: close\r\n\r\n",
method, path, host, port);
send(fd, req, (size_t)rlen, 0);
int status = -1;
size_t got = 0;
char buf[8192];
ssize_t n;
while ((n = recv(fd, buf + got, sizeof buf - got - 1, 0)) > 0) got += (size_t)n;
buf[got] = '\0';
sscanf(buf, "HTTP/%*s %d", &status);
if (resp && resp_len) {
const char *b = strstr(buf, "\r\n\r\n");
if (b) { strncpy(resp, b + 4, resp_len - 1); resp[resp_len-1] = '\0'; }
}
close(fd);
return status;
}
static void parse_url(const char *url, char *host, size_t hl, int *port) {
const char *p = url;
if (!strncmp(p, "http://", 7)) p += 7;
*port = 7435;
const char *colon = strchr(p, ':');
if (colon) {
size_t n = (size_t)(colon - p) < hl ? (size_t)(colon - p) : hl - 1;
strncpy(host, p, n); host[n] = '\0';
*port = atoi(colon + 1);
} else { strncpy(host, p, hl - 1); host[hl-1] = '\0'; }
}
static int json_str(const char *j, const char *k, char *out, size_t len) {
char pat[128]; snprintf(pat, sizeof pat, "\"%s\":", k);
const char *p = strstr(j, pat); if (!p) return -1;
p += strlen(pat); while (*p == ' ') p++;
if (*p != '"') return -1; p++;
size_t i = 0;
while (*p && *p != '"' && i < len - 1) out[i++] = *p++;
out[i] = '\0'; return 0;
}
/* ── Frame size helpers ────────────────────────────────────────────── */
static inline size_t frame_bytes(uint32_t w, uint32_t h) {
return (size_t)w * h * 2; /* UYVY422 */
}
/* ── Register slot with framecache ────────────────────────────────── */
static int register_slot(const char *fc_url, const char *slot_id,
uint32_t w, uint32_t h,
uint32_t fps_num, uint32_t fps_den,
const char *source_type,
char *shm_path, size_t sp_len,
char *sem_name, size_t sn_len)
{
char host[128]; int port;
parse_url(fc_url, host, sizeof host, &port);
char body[512];
snprintf(body, sizeof body,
"{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u,"
"\"fps_num\":%u,\"fps_den\":%u,\"source_type\":\"%s\"}",
slot_id, w, h, fps_num, fps_den, source_type);
char resp[1024] = {0};
int st = http_req("POST", host, port, "/slots", body, resp, sizeof resp);
if (st != 201) {
fprintf(stderr, "[net_ingest] POST /slots failed HTTP %d: %s\n", st, resp);
return -1;
}
json_str(resp, "shm_path", shm_path, sp_len);
json_str(resp, "sem_name", sem_name, sn_len);
return 0;
}
static void deregister_slot(const char *fc_url, const char *slot_id) {
char host[128]; int port;
parse_url(fc_url, host, sizeof host, &port);
char path[192]; snprintf(path, sizeof path, "/slots/%s", slot_id);
http_req("DELETE", host, port, path, NULL, NULL, 0);
}
/* ── Open shm + semaphore for writing ─────────────────────────────── */
#include <sys/mman.h>
#include <semaphore.h>
typedef struct {
void *base;
size_t size;
int fd;
sem_t *sem;
} ShmWriter;
static int shm_writer_open(const char *shm_path, const char *sem_name,
ShmWriter *sw)
{
sw->fd = open(shm_path, O_RDWR);
if (sw->fd < 0) return -1;
fc_header_t hdr;
if (pread(sw->fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
close(sw->fd); return -1;
}
sw->size = fc_slot_shm_size(hdr.frame_size);
sw->base = mmap(NULL, sw->size, PROT_READ | PROT_WRITE, MAP_SHARED, sw->fd, 0);
if (sw->base == MAP_FAILED) { close(sw->fd); return -1; }
sw->sem = sem_open(sem_name, 0);
if (sw->sem == SEM_FAILED) { munmap(sw->base, sw->size); close(sw->fd); return -1; }
return 0;
}
static void shm_write_frame(ShmWriter *sw, const uint8_t *data,
uint32_t size, uint64_t pts_us)
{
fc_header_t *hdr = (fc_header_t *)sw->base;
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
fc_frame_t *frame = fc_frame_at(sw->base, hdr->frame_size, cur);
struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts);
frame->pts_us = pts_us;
frame->wall_us = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
memcpy(frame->data, data, frame->size);
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
sem_post(sw->sem);
}
static void shm_writer_close(ShmWriter *sw) {
if (sw->sem) { sem_close(sw->sem); sw->sem = NULL; }
if (sw->base) { munmap(sw->base, sw->size); sw->base = NULL; }
if (sw->fd >= 0) { close(sw->fd); sw->fd = -1; }
}
/* ── Build ffmpeg args for network decode → rawvideo stdout ────────── */
static int build_ffmpeg_args(
char **argv, int max_args,
const char *url, const char *source_type,
int listen, int listen_port, const char *stream_key,
uint32_t w, uint32_t h)
{
char size_str[32];
snprintf(size_str, sizeof size_str, "%ux%u", w, h);
char port_str[16];
int i = 0;
argv[i++] = "ffmpeg";
argv[i++] = "-hide_banner";
argv[i++] = "-loglevel"; argv[i++] = "warning";
/* Input */
argv[i++] = "-probesize"; argv[i++] = "32M";
argv[i++] = "-analyzeduration"; argv[i++] = "10M";
argv[i++] = "-fflags"; argv[i++] = "+genpts";
if (!strcmp(source_type, "srt") && listen) {
snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 9000);
char srt_url[256];
snprintf(srt_url, sizeof srt_url, "srt://0.0.0.0:%s?mode=listener", port_str);
argv[i++] = "-i"; argv[i++] = strdup(srt_url);
} else if (!strcmp(source_type, "rtmp") && listen) {
snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 1935);
char rtmp_url[256];
snprintf(rtmp_url, sizeof rtmp_url, "rtmp://0.0.0.0:%s/live/%s",
port_str, stream_key ? stream_key : "stream");
argv[i++] = "-listen"; argv[i++] = "1";
argv[i++] = "-i"; argv[i++] = strdup(rtmp_url);
} else {
argv[i++] = "-i"; argv[i++] = (char *)url;
}
/* Video output: raw UYVY422 to stdout */
argv[i++] = "-map"; argv[i++] = "0:v:0";
argv[i++] = "-vf"; argv[i++] = "scale=iw:ih,format=uyvy422";
argv[i++] = "-f"; argv[i++] = "rawvideo";
argv[i++] = "-pix_fmt"; argv[i++] = "uyvy422";
argv[i++] = "-s"; argv[i++] = strdup(size_str);
argv[i++] = "pipe:1";
argv[i] = NULL;
return i;
}
/* ── Main ──────────────────────────────────────────────────────────── */
int main(int argc, char *argv[]) {
const char *url = NULL;
const char *slot_id = NULL;
const char *fc_url = getenv("FC_URL") ? getenv("FC_URL") : FC_URL_DEFAULT;
const char *source_type = "srt";
uint32_t width = 1920, height = 1080;
uint32_t fps_num = 30000, fps_den = 1001;
int listen = 0, listen_port = 0;
const char *stream_key = "stream";
for (int i = 1; i < argc; i++) {
if (!strcmp(argv[i], "--url") && i+1 < argc) url = argv[++i];
else if (!strcmp(argv[i], "--slot-id") && i+1 < argc) slot_id = argv[++i];
else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) fc_url = argv[++i];
else if (!strcmp(argv[i], "--source-type") && i+1 < argc) source_type = argv[++i];
else if (!strcmp(argv[i], "--width") && i+1 < argc) width = (uint32_t)atoi(argv[++i]);
else if (!strcmp(argv[i], "--height") && i+1 < argc) height = (uint32_t)atoi(argv[++i]);
else if (!strcmp(argv[i], "--fps-num") && i+1 < argc) fps_num = (uint32_t)atoi(argv[++i]);
else if (!strcmp(argv[i], "--fps-den") && i+1 < argc) fps_den = (uint32_t)atoi(argv[++i]);
else if (!strcmp(argv[i], "--listen")) listen = 1;
else if (!strcmp(argv[i], "--listen-port") && i+1 < argc) listen_port = atoi(argv[++i]);
else if (!strcmp(argv[i], "--stream-key") && i+1 < argc) stream_key = argv[++i];
}
if (!slot_id) {
fprintf(stderr, "[net_ingest] --slot-id required\n");
return 1;
}
if (!url && !listen) {
fprintf(stderr, "[net_ingest] --url or --listen required\n");
return 1;
}
signal(SIGTERM, on_signal);
signal(SIGINT, on_signal);
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, SIG_DFL);
/* ── Register slot ──────────────────────────────────────────────── */
char shm_path[128] = {0}, sem_name[128] = {0};
if (register_slot(fc_url, slot_id, width, height, fps_num, fps_den,
source_type, shm_path, sizeof shm_path,
sem_name, sizeof sem_name) < 0) {
return 1;
}
ShmWriter sw = { .fd = -1 };
if (shm_writer_open(shm_path, sem_name, &sw) < 0) {
fprintf(stderr, "[net_ingest] failed to open shm %s\n", shm_path);
deregister_slot(fc_url, slot_id);
return 1;
}
size_t fsz = frame_bytes(width, height);
uint8_t *frame_buf = malloc(fsz);
if (!frame_buf) { shm_writer_close(&sw); deregister_slot(fc_url, slot_id); return 1; }
uint64_t frame_seq = 0;
int reported = 0;
fprintf(stderr, "[net_ingest] slot=%s %ux%u %.2ffps source=%s%s\n",
slot_id, width, height,
fps_den ? (double)fps_num / fps_den : 0.0,
source_type, listen ? " (listener)" : "");
/* ── Outer reconnect loop (listener mode stays alive between sessions) */
while (!g_stop) {
/* Build ffmpeg argv */
char *ff_argv[64];
build_ffmpeg_args(ff_argv, 64, url, source_type,
listen, listen_port, stream_key, width, height);
/* Spawn ffmpeg with stdout pipe */
int pfd[2];
if (pipe(pfd) < 0) break;
pid_t pid = fork();
if (pid < 0) { close(pfd[0]); close(pfd[1]); break; }
if (pid == 0) {
/* Child: redirect stdout to pipe write end */
dup2(pfd[1], STDOUT_FILENO);
close(pfd[0]); close(pfd[1]);
execvp("ffmpeg", ff_argv);
_exit(127);
}
/* Parent: read from pipe read end */
close(pfd[1]);
int rfd = pfd[0];
size_t buf_off = 0;
while (!g_stop) {
ssize_t n = read(rfd, frame_buf + buf_off, fsz - buf_off);
if (n <= 0) break; /* ffmpeg exited or pipe closed */
buf_off += (size_t)n;
if (buf_off < fsz) continue; /* incomplete frame — keep reading */
/* Full frame assembled */
uint64_t pts_us = fps_num > 0
? frame_seq * 1000000ULL * fps_den / fps_num
: 0;
shm_write_frame(&sw, frame_buf, (uint32_t)fsz, pts_us);
frame_seq++;
buf_off = 0;
if (!reported) {
fprintf(stderr,
"{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u,"
"\"fps_num\":%u,\"fps_den\":%u,"
"\"source_type\":\"%s\",\"pix_fmt\":\"uyvy422\"}\n",
slot_id, width, height, fps_num, fps_den, source_type);
fflush(stderr);
reported = 1;
}
}
close(rfd);
/* Reap ffmpeg child */
int wstatus;
kill(pid, SIGTERM);
waitpid(pid, &wstatus, 0);
if (!listen || g_stop) break;
/* Listener mode: wait 1s then reconnect */
fprintf(stderr, "[net_ingest] listener: waiting for next connection\n");
struct timespec ts = { .tv_sec = 1 };
nanosleep(&ts, NULL);
}
free(frame_buf);
shm_writer_close(&sw);
deregister_slot(fc_url, slot_id);
fprintf(stderr, "[net_ingest] done frames=%llu\n", (unsigned long long)frame_seq);
return 0;
}

View file

@ -86,6 +86,64 @@ const _containerSourceType = new Map();
// port -> fmt JSON from bridge stderr (inject into sidecar env + slot_id)
const _dcPortFmt = new Map();
// ── Network ingest ────────────────────────────────────────────────────────
// One net_ingest process per active network recorder (SRT/RTMP).
// Decodes the stream to raw UYVY422 and writes into a framecache slot so
// capture-manager can use fc_pipe — the same consumer path as SDI sources.
const NET_INGEST_BIN = process.env.NET_INGEST_BIN || 'net_ingest';
// containerId → ChildProcess for cleanup on sidecar stop
const _netIngestProcs = new Map();
function startNetIngest(containerId, { sourceType, sourceUrl, listen, listenPort, streamKey,
width = 1920, height = 1080,
fpsNum = 30000, fpsDen = 1001 }) {
const slotId = `net-${containerId}`;
const args = [
'--slot-id', slotId,
'--fc-url', FC_URL,
'--source-type', sourceType,
'--width', String(width),
'--height', String(height),
'--fps-num', String(fpsNum),
'--fps-den', String(fpsDen),
];
if (listen) {
args.push('--listen');
if (listenPort) args.push('--listen-port', String(listenPort));
if (streamKey) args.push('--stream-key', streamKey);
} else if (sourceUrl) {
args.push('--url', sourceUrl);
}
console.log(`[net-ingest:${slotId}] launching: ${NET_INGEST_BIN} ${args.join(' ')}`);
const proc = spawn(NET_INGEST_BIN, args, {
stdio: ['ignore', 'ignore', 'pipe'],
env: { ...process.env, FC_URL },
});
proc.stderr.setEncoding('utf8');
proc.stderr.on('data', chunk => {
for (const line of chunk.split('\n')) {
const t = line.trim();
if (t) console.log(`[net-ingest:${slotId}] ${t}`);
}
});
proc.on('error', err => console.error(`[net-ingest:${slotId}] spawn error: ${err.message}`));
proc.on('exit', (c, s) => {
console.log(`[net-ingest:${slotId}] exited code=${c} signal=${s}`);
_netIngestProcs.delete(containerId);
});
_netIngestProcs.set(containerId, { proc, slotId });
return slotId;
}
function stopNetIngest(containerId) {
const entry = _netIngestProcs.get(containerId);
if (!entry) return;
console.log(`[net-ingest:${entry.slotId}] stopping`);
try { entry.proc.kill('SIGTERM'); } catch (_) {}
_netIngestProcs.delete(containerId);
}
// ── DeckLink bridge ───────────────────────────────────────────────────────
// One decklink-bridge process per node, managing all DeckLink devices.
// Mirrors the deltacast-bridge singleton pattern.
@ -396,6 +454,44 @@ async function handleSidecarStart(body, res) {
// Always inject FC_URL so capture-manager can find the framecache service.
sidecarEnv.push(`FC_URL=${FC_URL}`);
// Network sources (SRT/RTMP): launch net_ingest to decode stream into
// a framecache slot, then inject FC_SLOT_ID so capture-manager reads
// from the slot via fc_pipe (same path as SDI sources).
if (sourceType === 'srt' || sourceType === 'rtmp') {
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
let _netCfg = {};
try { _netCfg = JSON.parse(_srcCfg); } catch (_) {}
const _listen = !!(body.listen || _netCfg.listen);
const _listenPort = body.listenPort || _netCfg.listenPort || 0;
const _streamKey = body.streamKey || _netCfg.streamKey || 'stream';
const _srcUrl = body.sourceUrl || _netCfg.url || '';
// Width/height/fps from recorder config if available; defaults used otherwise.
// net_ingest will auto-scale via ffmpeg -vf scale=iw:ih.
const _w = _netCfg.width || 1920;
const _h = _netCfg.height || 1080;
const _fpsNum = _netCfg.fps_num || 30000;
const _fpsDen = _netCfg.fps_den || 1001;
// containerId not known yet — we start net_ingest just before container
// start and use a temporary slot ID based on a timestamp.
const _tempId = `${sourceType}-${Date.now()}`;
const _slotId = startNetIngest(_tempId, {
sourceType: sourceType,
sourceUrl: _srcUrl,
listen: _listen,
listenPort: _listenPort,
streamKey: _streamKey,
width: _w,
height: _h,
fpsNum: _fpsNum,
fpsDen: _fpsDen,
});
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
hostConfig.IpcMode = 'host';
// Store temp id so we can remap to real containerId on create success
body._netIngestTempId = _tempId;
}
// Deltacast: ensure the shared bridge daemon is running on the HOST before
// starting the sidecar. The bridge writes frames to the framecache shm ring;
// the sidecar reads via the consumer library (fc_client).
@ -471,6 +567,15 @@ async function handleSidecarStart(body, res) {
if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast');
if (sourceType === 'sdi' || sourceType === 'blackmagic') _containerSourceType.set(containerId, 'blackmagic');
if (sourceType === 'srt' || sourceType === 'rtmp') {
_containerSourceType.set(containerId, sourceType);
// Remap net_ingest from temp id to real containerId
if (body._netIngestTempId && _netIngestProcs.has(body._netIngestTempId)) {
const entry = _netIngestProcs.get(body._netIngestTempId);
_netIngestProcs.delete(body._netIngestTempId);
_netIngestProcs.set(containerId, entry);
}
}
jsonResponse(res, 201, { containerId, capturePort });
} catch (err) {
if (sourceType === 'deltacast') {
@ -535,6 +640,8 @@ async function handleSidecarStop(containerId, res) {
_dlSidecarCount = 0;
stopDecklinkBridge();
}
} else if (_srcType === 'srt' || _srcType === 'rtmp') {
stopNetIngest(containerId);
}
} catch (err) {
console.error(`[sidecar-stop] background cleanup failed for ${containerId}:`, err.message);