From 99723da00fd0540f0d6f1a2f2d93932f62b1ce90 Mon Sep 17 00:00:00 2001 From: Wild Dragon Dev Date: Wed, 3 Jun 2026 15:37:17 +0000 Subject: [PATCH] =?UTF-8?q?feat(framecache):=20phase=205=20=E2=80=94=20net?= =?UTF-8?q?work=20ingest=20(RTMP/SRT)=20via=20framecache?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - services/framecache/src/net_ingest.c: new network ingest process - Spawns ffmpeg to decode SRT/RTMP → raw UYVY422 stdout - Reads decoded frames and writes into framecache slot via shm - Registers slot with framecache HTTP API on startup - Deregisters slot on clean exit (SIGTERM) - Reconnect loop for listener mode (stays alive between sessions) - --url, --slot-id, --fc-url, --width, --height, --fps-num/den, --source-type, --listen, --listen-port, --stream-key args - Emits format JSON to stderr on first frame - services/framecache/CMakeLists.txt: add net_ingest target - services/framecache/Dockerfile: copy net_ingest to runtime image - services/node-agent/index.js: - startNetIngest() / stopNetIngest(): lifecycle management per recorder - Spawns net_ingest before sidecar start for srt/rtmp sourceTypes - Injects FC_SLOT_ID=net- into sidecar env - Sets IpcMode=host for network sidecars using framecache - Maps temp id → real containerId after container create - stopNetIngest() called on sidecar stop - NET_INGEST_BIN env var (default: docker exec framecache net_ingest) - services/capture/src/capture-manager.js: - _buildInputArgs srt/rtmp: framecache path when FC_SLOT_ID set (spawns fc_pipe, uses pipe:0 rawvideo input — same as SDI path) - Falls back to direct URL when FC_SLOT_ID not set (legacy path) - audioMap: network via framecache uses '0:a:0?' (video-only fc_pipe, no audio FIFO — audio-in-shm is roadmap) - HLS tee: sdiHlsDir covers network-via-framecache; legacy tee gated on !FC_SLOT_ID to avoid duplicate HLS outputs - fc_pipe piped to ffmpeg stdin for network framecache path - docker-compose.worker.yml: FC_URL + NET_INGEST_BIN in node-agent env --- docker-compose.worker.yml | 6 + services/capture/src/capture-manager.js | 79 ++++- services/framecache/CMakeLists.txt | 13 + services/framecache/Dockerfile | 1 + services/framecache/src/net_ingest.c | 402 ++++++++++++++++++++++++ services/node-agent/index.js | 107 +++++++ 6 files changed, 593 insertions(+), 15 deletions(-) create mode 100644 services/framecache/src/net_ingest.c diff --git a/docker-compose.worker.yml b/docker-compose.worker.yml index 95e17bc..c21b1d8 100644 --- a/docker-compose.worker.yml +++ b/docker-compose.worker.yml @@ -60,6 +60,12 @@ services: BMD_MODEL: ${BMD_MODEL:-} BMD_DEVICE_PREFIX: ${BMD_DEVICE_PREFIX:-dv} LIVE_DIR: ${LIVE_DIR:-/mnt/NVME/MAM/wild-dragon-live} + # Framecache service URL (on the wild-dragon-worker network) + FC_URL: ${FC_URL:-http://framecache:7435} + # net_ingest binary — runs inside the framecache container via docker exec. + # node-agent has docker.sock so it can exec into the framecache container. + # Override with a host-installed path if preferred. + NET_INGEST_BIN: ${NET_INGEST_BIN:-docker exec framecache net_ingest} # REPO_DIR: host path to the checked-out repo. The agent passes this to the # one-shot driver-install container so install-driver.sh can read # sdk// and run deploy/install-driver.sh. Must match the host path diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index 5c9f3cb..6b59a84 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -521,6 +521,49 @@ class CaptureManager { * @private */ async _buildInputArgs({ sourceType, sourceBackend = 'blackmagic', device, port, board, sourceUrl, listen, listenPort, streamKey }) { + // ── Network sources via framecache (primary when FC_SLOT_ID is set) ────── + // node-agent starts net_ingest before the sidecar, which decodes the stream + // to raw UYVY422 and registers a framecache slot. We read from that slot via + // fc_pipe — same zero-copy path as SDI sources — enabling simultaneous + // growing + proxy + HLS from any network source. + if ((sourceType === 'srt' || sourceType === 'rtmp') && process.env.FC_SLOT_ID) { + const slotId = process.env.FC_SLOT_ID; + const fcPipeBin = process.env.FC_PIPE_BIN || 'fc_pipe'; + const WAIT_MS = 60_000; /* network sources may take longer to connect */ + + const fcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080'; + const fcFps = process.env.DELTACAST_FRAMERATE || '30000/1001'; + + console.log(`[framecache] net slot=${slotId} size=${fcSize} fps=${fcFps}`); + + const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], { + stdio: ['ignore', 'pipe', 'pipe'], + }); + fcPipeProcess.stderr.on('data', chunk => { + process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`); + }); + fcPipeProcess.on('error', err => + console.error(`[fc_pipe:${slotId}] spawn error: ${err.message}`)); + + return { + inputArgs: [ + '-use_wallclock_as_timestamps', '1', + '-thread_queue_size', '512', + '-f', 'rawvideo', + '-pix_fmt', 'uyvy422', + '-video_size', fcSize, + '-framerate', fcFps, + '-i', 'pipe:0', + ], + isNetwork: false, /* treat as raw source — no -map 0:v:0? needed */ + bridgeProcess: fcPipeProcess, + audioFifo: null, + interlaced: false, + _fcPipeProcess: fcPipeProcess, + }; + } + + // ── Legacy direct network paths (no framecache / net_ingest not running) ── if (sourceType === 'srt') { let url; if (listen) { @@ -998,13 +1041,16 @@ exit "$BMXRC" }); // Audio input index: - // - deltacast + blackmagic via framecache (fc_pipe): video on input 0 - // (pipe:0 from fc_pipe), audio FIFO on input 1 → audioMap = '1:a:0?' - // - DeckLink legacy (ffmpeg -f decklink): audio embedded in input 0 - // - network sources: audio in input 0 - const audioMap = (sourceType === 'deltacast' || - ((sourceType === 'sdi' || sourceType === 'blackmagic') && process.env.FC_SLOT_ID)) - ? '1:a:0?' : '0:a:0?'; + // - deltacast via framecache: video pipe:0, audio FIFO input 1 → '1:a:0?' + // - blackmagic via framecache: same → '1:a:0?' + // - network via framecache (fc_pipe): video-only pipe, no audio input → '0:a:0?' + // (net_ingest only writes video; audio from network stream not yet in shm) + // - DeckLink legacy / network legacy: audio in input 0 → '0:a:0?' + const _viaFcPipe = !!process.env.FC_SLOT_ID; + const _hasAudioFifo = _viaFcPipe && (sourceType === 'deltacast' || + sourceType === 'sdi' || + sourceType === 'blackmagic'); + const audioMap = _hasAudioFifo ? '1:a:0?' : '0:a:0?'; // Non-growing master: ffmpeg muxes the finalized MOV directly. Growing // master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via @@ -1051,10 +1097,12 @@ exit "$BMXRC" // stdin (pipe:0 input). For all other sources stdin is ignored. const hiresStdio = bridgeProcess ? ['pipe', 'ignore', 'pipe'] : ['ignore', 'ignore', 'pipe']; - // For SDI/framecache sources the live HLS preview is a SECOND OUTPUT of - // the hires ffmpeg (one read → split → [master] + [HLS preview]). + // For SDI/framecache sources (including network via framecache) the live + // HLS preview is a SECOND OUTPUT of the hires ffmpeg. + const _viaFcPipeHls = !!process.env.FC_SLOT_ID; let sdiHlsDir = null; - if ((sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic') + if ((sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic' + || (_viaFcPipeHls && (sourceType === 'srt' || sourceType === 'rtmp'))) && this._assetIdForHls) { const fsMod = await import('node:fs'); sdiHlsDir = '/live/' + this._assetIdForHls; @@ -1130,7 +1178,6 @@ exit "$BMXRC" hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio }); // When video comes from fc_pipe, pipe its stdout to ffmpeg stdin. - // fc_pipe writes raw UYVY422 frames; ffmpeg reads them as rawvideo pipe:0. if (bridgeProcess && bridgeProcess.stdout && hiresProcess.stdin) { bridgeProcess.stdout.pipe(hiresProcess.stdin); bridgeProcess.on('exit', () => { @@ -1147,10 +1194,13 @@ exit "$BMXRC" const processes = { hires: hiresProcess }; const uploads = { hires: growingPath ? Promise.resolve({ growingPath }) : null }; - // ── HLS tee for network sources (live preview in the UI) ────────── + // ── HLS tee for legacy network sources (live preview in the UI) ────────── + // When network sources come via framecache (FC_SLOT_ID set), HLS preview is + // handled as a 2nd ffmpeg output in the hires process above (sdiHlsDir path). + // This tee is only for the legacy direct-URL network path (no framecache). let hlsProcess = null; let hlsDir = null; - if (isNetwork && this._assetIdForHls) { + if (isNetwork && !process.env.FC_SLOT_ID && this._assetIdForHls) { try { const fs = await import('node:fs'); hlsDir = '/live/' + this._assetIdForHls; @@ -1158,7 +1208,6 @@ exit "$BMXRC" const hlsArgs = [ ...inputArgs, '-map', '0:v:0?', '-map', '0:a:0?', - // GPU-gated preview encode, same as the SDI 2nd-output path (#164). ...buildHlsVideoArgs(videoCodec, framerate), '-c:a', 'aac', '-b:a', '128k', '-ar', '44100', '-f', 'hls', '-hls_time', '2', '-hls_list_size', '15', @@ -1170,7 +1219,7 @@ exit "$BMXRC" hlsProcess.stderr.on('data', (d) => { console.error('[HLS] ' + d); }); hlsProcess.on('exit', (c) => console.log('[HLS] exited ' + c)); processes.hls = hlsProcess; - console.log('[HLS] tee started -> ' + hlsDir); + console.log('[HLS] legacy-net tee started -> ' + hlsDir); } catch (err) { console.error('[HLS] tee failed:', err.message); } diff --git a/services/framecache/CMakeLists.txt b/services/framecache/CMakeLists.txt index bd92609..92970bc 100644 --- a/services/framecache/CMakeLists.txt +++ b/services/framecache/CMakeLists.txt @@ -25,6 +25,19 @@ add_library(fc_client STATIC target_include_directories(fc_client PUBLIC src client) target_link_libraries(fc_client rt pthread) +# ── net_ingest — network source (RTMP/SRT) → framecache slot ───────── +# Spawned by node-agent when a network recorder starts. +# Decodes the network stream to raw UYVY422 via ffmpeg and writes frames +# into a framecache slot, giving capture-manager the same fc_pipe consumer +# interface as SDI sources. +add_executable(net_ingest + src/net_ingest.c + src/slot.c +) +target_include_directories(net_ingest PRIVATE src) +target_link_libraries(net_ingest rt pthread) +install(TARGETS net_ingest DESTINATION bin) + # ── fc_pipe — slot → stdout adapter (used by capture-manager.js) ───── # Spawned by capture-manager as a child process; writes raw UYVY422 # frames from a framecache slot to stdout so ffmpeg reads them as diff --git a/services/framecache/Dockerfile b/services/framecache/Dockerfile index c20b8ae..c523353 100644 --- a/services/framecache/Dockerfile +++ b/services/framecache/Dockerfile @@ -19,6 +19,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ COPY --from=builder /build/framecache /usr/local/bin/framecache COPY --from=builder /build/fc_pipe /usr/local/bin/fc_pipe +COPY --from=builder /build/net_ingest /usr/local/bin/net_ingest COPY --from=builder /build/fc_test_consumer /usr/local/bin/fc_test_consumer 2>/dev/null || true # /dev/shm/framecache is created at runtime (tmpfs) diff --git a/services/framecache/src/net_ingest.c b/services/framecache/src/net_ingest.c new file mode 100644 index 0000000..a5e6bbf --- /dev/null +++ b/services/framecache/src/net_ingest.c @@ -0,0 +1,402 @@ +/** + * net_ingest.c — Network source (RTMP/SRT) → framecache slot ingest. + * + * Spawns ffmpeg to decode a network stream to raw UYVY422 on stdout, then + * reads those frames and writes them into a framecache slot via the shm + * ring buffer. Registers the slot with the framecache HTTP API on startup + * and deregisters on clean exit. + * + * Usage: + * net_ingest --url + * --slot-id + * --fc-url http://framecache:7435 + * --width --height + * --fps-num --fps-den + * [--source-type srt|rtmp] + * [--listen] # SRT/RTMP listener mode + * [--listen-port ] # listener port (SRT default 9000, RTMP 1935) + * [--stream-key ] # RTMP stream key (default "stream") + * + * Emits one JSON line to stderr on first frame: + * {"slot_id":"","width":W,"height":H,"fps_num":N,"fps_den":D, + * "source_type":"srt","pix_fmt":"uyvy422"} + * + * Exits 0 on clean stop (SIGTERM), 1 on error. + * + * The framecache slot stays alive between ffmpeg reconnects (listener mode): + * net_ingest keeps the slot open and restarts ffmpeg on disconnect. + */ + +#include "slot.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Re-use fc_writer helpers inline (no external dep) */ +#define FC_URL_DEFAULT "http://localhost:7435" + +static volatile int g_stop = 0; +static void on_signal(int s) { (void)s; g_stop = 1; } + +/* ── Tiny HTTP POST/DELETE (same approach as fc_writer.c) ─────────── */ +static int http_req(const char *method, const char *host, int port, + const char *path, const char *body, + char *resp, size_t resp_len) +{ + struct sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons((uint16_t)port); + struct hostent *he = gethostbyname(host); + if (!he) return -1; + memcpy(&sa.sin_addr, he->h_addr_list[0], (size_t)he->h_length); + + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) return -1; + struct timeval tv = { .tv_sec = 5 }; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv); + if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) { close(fd); return -1; } + + char req[4096]; + int rlen; + if (body) + rlen = snprintf(req, sizeof req, + "%s %s HTTP/1.0\r\nHost: %s:%d\r\n" + "Content-Type: application/json\r\nContent-Length: %zu\r\n" + "Connection: close\r\n\r\n%s", + method, path, host, port, strlen(body), body); + else + rlen = snprintf(req, sizeof req, + "%s %s HTTP/1.0\r\nHost: %s:%d\r\nConnection: close\r\n\r\n", + method, path, host, port); + + send(fd, req, (size_t)rlen, 0); + + int status = -1; + size_t got = 0; + char buf[8192]; + ssize_t n; + while ((n = recv(fd, buf + got, sizeof buf - got - 1, 0)) > 0) got += (size_t)n; + buf[got] = '\0'; + sscanf(buf, "HTTP/%*s %d", &status); + if (resp && resp_len) { + const char *b = strstr(buf, "\r\n\r\n"); + if (b) { strncpy(resp, b + 4, resp_len - 1); resp[resp_len-1] = '\0'; } + } + close(fd); + return status; +} + +static void parse_url(const char *url, char *host, size_t hl, int *port) { + const char *p = url; + if (!strncmp(p, "http://", 7)) p += 7; + *port = 7435; + const char *colon = strchr(p, ':'); + if (colon) { + size_t n = (size_t)(colon - p) < hl ? (size_t)(colon - p) : hl - 1; + strncpy(host, p, n); host[n] = '\0'; + *port = atoi(colon + 1); + } else { strncpy(host, p, hl - 1); host[hl-1] = '\0'; } +} + +static int json_str(const char *j, const char *k, char *out, size_t len) { + char pat[128]; snprintf(pat, sizeof pat, "\"%s\":", k); + const char *p = strstr(j, pat); if (!p) return -1; + p += strlen(pat); while (*p == ' ') p++; + if (*p != '"') return -1; p++; + size_t i = 0; + while (*p && *p != '"' && i < len - 1) out[i++] = *p++; + out[i] = '\0'; return 0; +} + +/* ── Frame size helpers ────────────────────────────────────────────── */ +static inline size_t frame_bytes(uint32_t w, uint32_t h) { + return (size_t)w * h * 2; /* UYVY422 */ +} + +/* ── Register slot with framecache ────────────────────────────────── */ +static int register_slot(const char *fc_url, const char *slot_id, + uint32_t w, uint32_t h, + uint32_t fps_num, uint32_t fps_den, + const char *source_type, + char *shm_path, size_t sp_len, + char *sem_name, size_t sn_len) +{ + char host[128]; int port; + parse_url(fc_url, host, sizeof host, &port); + + char body[512]; + snprintf(body, sizeof body, + "{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u," + "\"fps_num\":%u,\"fps_den\":%u,\"source_type\":\"%s\"}", + slot_id, w, h, fps_num, fps_den, source_type); + + char resp[1024] = {0}; + int st = http_req("POST", host, port, "/slots", body, resp, sizeof resp); + if (st != 201) { + fprintf(stderr, "[net_ingest] POST /slots failed HTTP %d: %s\n", st, resp); + return -1; + } + json_str(resp, "shm_path", shm_path, sp_len); + json_str(resp, "sem_name", sem_name, sn_len); + return 0; +} + +static void deregister_slot(const char *fc_url, const char *slot_id) { + char host[128]; int port; + parse_url(fc_url, host, sizeof host, &port); + char path[192]; snprintf(path, sizeof path, "/slots/%s", slot_id); + http_req("DELETE", host, port, path, NULL, NULL, 0); +} + +/* ── Open shm + semaphore for writing ─────────────────────────────── */ +#include +#include + +typedef struct { + void *base; + size_t size; + int fd; + sem_t *sem; +} ShmWriter; + +static int shm_writer_open(const char *shm_path, const char *sem_name, + ShmWriter *sw) +{ + sw->fd = open(shm_path, O_RDWR); + if (sw->fd < 0) return -1; + fc_header_t hdr; + if (pread(sw->fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) { + close(sw->fd); return -1; + } + sw->size = fc_slot_shm_size(hdr.frame_size); + sw->base = mmap(NULL, sw->size, PROT_READ | PROT_WRITE, MAP_SHARED, sw->fd, 0); + if (sw->base == MAP_FAILED) { close(sw->fd); return -1; } + sw->sem = sem_open(sem_name, 0); + if (sw->sem == SEM_FAILED) { munmap(sw->base, sw->size); close(sw->fd); return -1; } + return 0; +} + +static void shm_write_frame(ShmWriter *sw, const uint8_t *data, + uint32_t size, uint64_t pts_us) +{ + fc_header_t *hdr = (fc_header_t *)sw->base; + uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed); + fc_frame_t *frame = fc_frame_at(sw->base, hdr->frame_size, cur); + struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); + frame->pts_us = pts_us; + frame->wall_us = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL; + frame->size = size < hdr->frame_size ? size : hdr->frame_size; + memcpy(frame->data, data, frame->size); + atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release); + sem_post(sw->sem); +} + +static void shm_writer_close(ShmWriter *sw) { + if (sw->sem) { sem_close(sw->sem); sw->sem = NULL; } + if (sw->base) { munmap(sw->base, sw->size); sw->base = NULL; } + if (sw->fd >= 0) { close(sw->fd); sw->fd = -1; } +} + +/* ── Build ffmpeg args for network decode → rawvideo stdout ────────── */ +static int build_ffmpeg_args( + char **argv, int max_args, + const char *url, const char *source_type, + int listen, int listen_port, const char *stream_key, + uint32_t w, uint32_t h) +{ + char size_str[32]; + snprintf(size_str, sizeof size_str, "%ux%u", w, h); + char port_str[16]; + + int i = 0; + argv[i++] = "ffmpeg"; + argv[i++] = "-hide_banner"; + argv[i++] = "-loglevel"; argv[i++] = "warning"; + + /* Input */ + argv[i++] = "-probesize"; argv[i++] = "32M"; + argv[i++] = "-analyzeduration"; argv[i++] = "10M"; + argv[i++] = "-fflags"; argv[i++] = "+genpts"; + + if (!strcmp(source_type, "srt") && listen) { + snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 9000); + char srt_url[256]; + snprintf(srt_url, sizeof srt_url, "srt://0.0.0.0:%s?mode=listener", port_str); + argv[i++] = "-i"; argv[i++] = strdup(srt_url); + } else if (!strcmp(source_type, "rtmp") && listen) { + snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 1935); + char rtmp_url[256]; + snprintf(rtmp_url, sizeof rtmp_url, "rtmp://0.0.0.0:%s/live/%s", + port_str, stream_key ? stream_key : "stream"); + argv[i++] = "-listen"; argv[i++] = "1"; + argv[i++] = "-i"; argv[i++] = strdup(rtmp_url); + } else { + argv[i++] = "-i"; argv[i++] = (char *)url; + } + + /* Video output: raw UYVY422 to stdout */ + argv[i++] = "-map"; argv[i++] = "0:v:0"; + argv[i++] = "-vf"; argv[i++] = "scale=iw:ih,format=uyvy422"; + argv[i++] = "-f"; argv[i++] = "rawvideo"; + argv[i++] = "-pix_fmt"; argv[i++] = "uyvy422"; + argv[i++] = "-s"; argv[i++] = strdup(size_str); + argv[i++] = "pipe:1"; + + argv[i] = NULL; + return i; +} + +/* ── Main ──────────────────────────────────────────────────────────── */ +int main(int argc, char *argv[]) { + const char *url = NULL; + const char *slot_id = NULL; + const char *fc_url = getenv("FC_URL") ? getenv("FC_URL") : FC_URL_DEFAULT; + const char *source_type = "srt"; + uint32_t width = 1920, height = 1080; + uint32_t fps_num = 30000, fps_den = 1001; + int listen = 0, listen_port = 0; + const char *stream_key = "stream"; + + for (int i = 1; i < argc; i++) { + if (!strcmp(argv[i], "--url") && i+1 < argc) url = argv[++i]; + else if (!strcmp(argv[i], "--slot-id") && i+1 < argc) slot_id = argv[++i]; + else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) fc_url = argv[++i]; + else if (!strcmp(argv[i], "--source-type") && i+1 < argc) source_type = argv[++i]; + else if (!strcmp(argv[i], "--width") && i+1 < argc) width = (uint32_t)atoi(argv[++i]); + else if (!strcmp(argv[i], "--height") && i+1 < argc) height = (uint32_t)atoi(argv[++i]); + else if (!strcmp(argv[i], "--fps-num") && i+1 < argc) fps_num = (uint32_t)atoi(argv[++i]); + else if (!strcmp(argv[i], "--fps-den") && i+1 < argc) fps_den = (uint32_t)atoi(argv[++i]); + else if (!strcmp(argv[i], "--listen")) listen = 1; + else if (!strcmp(argv[i], "--listen-port") && i+1 < argc) listen_port = atoi(argv[++i]); + else if (!strcmp(argv[i], "--stream-key") && i+1 < argc) stream_key = argv[++i]; + } + + if (!slot_id) { + fprintf(stderr, "[net_ingest] --slot-id required\n"); + return 1; + } + if (!url && !listen) { + fprintf(stderr, "[net_ingest] --url or --listen required\n"); + return 1; + } + + signal(SIGTERM, on_signal); + signal(SIGINT, on_signal); + signal(SIGPIPE, SIG_IGN); + signal(SIGCHLD, SIG_DFL); + + /* ── Register slot ──────────────────────────────────────────────── */ + char shm_path[128] = {0}, sem_name[128] = {0}; + if (register_slot(fc_url, slot_id, width, height, fps_num, fps_den, + source_type, shm_path, sizeof shm_path, + sem_name, sizeof sem_name) < 0) { + return 1; + } + + ShmWriter sw = { .fd = -1 }; + if (shm_writer_open(shm_path, sem_name, &sw) < 0) { + fprintf(stderr, "[net_ingest] failed to open shm %s\n", shm_path); + deregister_slot(fc_url, slot_id); + return 1; + } + + size_t fsz = frame_bytes(width, height); + uint8_t *frame_buf = malloc(fsz); + if (!frame_buf) { shm_writer_close(&sw); deregister_slot(fc_url, slot_id); return 1; } + + uint64_t frame_seq = 0; + int reported = 0; + + fprintf(stderr, "[net_ingest] slot=%s %ux%u %.2ffps source=%s%s\n", + slot_id, width, height, + fps_den ? (double)fps_num / fps_den : 0.0, + source_type, listen ? " (listener)" : ""); + + /* ── Outer reconnect loop (listener mode stays alive between sessions) */ + while (!g_stop) { + /* Build ffmpeg argv */ + char *ff_argv[64]; + build_ffmpeg_args(ff_argv, 64, url, source_type, + listen, listen_port, stream_key, width, height); + + /* Spawn ffmpeg with stdout pipe */ + int pfd[2]; + if (pipe(pfd) < 0) break; + + pid_t pid = fork(); + if (pid < 0) { close(pfd[0]); close(pfd[1]); break; } + + if (pid == 0) { + /* Child: redirect stdout to pipe write end */ + dup2(pfd[1], STDOUT_FILENO); + close(pfd[0]); close(pfd[1]); + execvp("ffmpeg", ff_argv); + _exit(127); + } + + /* Parent: read from pipe read end */ + close(pfd[1]); + int rfd = pfd[0]; + + size_t buf_off = 0; + while (!g_stop) { + ssize_t n = read(rfd, frame_buf + buf_off, fsz - buf_off); + if (n <= 0) break; /* ffmpeg exited or pipe closed */ + buf_off += (size_t)n; + if (buf_off < fsz) continue; /* incomplete frame — keep reading */ + + /* Full frame assembled */ + uint64_t pts_us = fps_num > 0 + ? frame_seq * 1000000ULL * fps_den / fps_num + : 0; + shm_write_frame(&sw, frame_buf, (uint32_t)fsz, pts_us); + frame_seq++; + buf_off = 0; + + if (!reported) { + fprintf(stderr, + "{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u," + "\"fps_num\":%u,\"fps_den\":%u," + "\"source_type\":\"%s\",\"pix_fmt\":\"uyvy422\"}\n", + slot_id, width, height, fps_num, fps_den, source_type); + fflush(stderr); + reported = 1; + } + } + + close(rfd); + /* Reap ffmpeg child */ + int wstatus; + kill(pid, SIGTERM); + waitpid(pid, &wstatus, 0); + + if (!listen || g_stop) break; + + /* Listener mode: wait 1s then reconnect */ + fprintf(stderr, "[net_ingest] listener: waiting for next connection\n"); + struct timespec ts = { .tv_sec = 1 }; + nanosleep(&ts, NULL); + } + + free(frame_buf); + shm_writer_close(&sw); + deregister_slot(fc_url, slot_id); + fprintf(stderr, "[net_ingest] done frames=%llu\n", (unsigned long long)frame_seq); + return 0; +} diff --git a/services/node-agent/index.js b/services/node-agent/index.js index dde165c..4161f8c 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -86,6 +86,64 @@ const _containerSourceType = new Map(); // port -> fmt JSON from bridge stderr (inject into sidecar env + slot_id) const _dcPortFmt = new Map(); +// ── Network ingest ──────────────────────────────────────────────────────── +// One net_ingest process per active network recorder (SRT/RTMP). +// Decodes the stream to raw UYVY422 and writes into a framecache slot so +// capture-manager can use fc_pipe — the same consumer path as SDI sources. +const NET_INGEST_BIN = process.env.NET_INGEST_BIN || 'net_ingest'; +// containerId → ChildProcess for cleanup on sidecar stop +const _netIngestProcs = new Map(); + +function startNetIngest(containerId, { sourceType, sourceUrl, listen, listenPort, streamKey, + width = 1920, height = 1080, + fpsNum = 30000, fpsDen = 1001 }) { + const slotId = `net-${containerId}`; + const args = [ + '--slot-id', slotId, + '--fc-url', FC_URL, + '--source-type', sourceType, + '--width', String(width), + '--height', String(height), + '--fps-num', String(fpsNum), + '--fps-den', String(fpsDen), + ]; + if (listen) { + args.push('--listen'); + if (listenPort) args.push('--listen-port', String(listenPort)); + if (streamKey) args.push('--stream-key', streamKey); + } else if (sourceUrl) { + args.push('--url', sourceUrl); + } + + console.log(`[net-ingest:${slotId}] launching: ${NET_INGEST_BIN} ${args.join(' ')}`); + const proc = spawn(NET_INGEST_BIN, args, { + stdio: ['ignore', 'ignore', 'pipe'], + env: { ...process.env, FC_URL }, + }); + proc.stderr.setEncoding('utf8'); + proc.stderr.on('data', chunk => { + for (const line of chunk.split('\n')) { + const t = line.trim(); + if (t) console.log(`[net-ingest:${slotId}] ${t}`); + } + }); + proc.on('error', err => console.error(`[net-ingest:${slotId}] spawn error: ${err.message}`)); + proc.on('exit', (c, s) => { + console.log(`[net-ingest:${slotId}] exited code=${c} signal=${s}`); + _netIngestProcs.delete(containerId); + }); + _netIngestProcs.set(containerId, { proc, slotId }); + return slotId; +} + +function stopNetIngest(containerId) { + const entry = _netIngestProcs.get(containerId); + if (!entry) return; + console.log(`[net-ingest:${entry.slotId}] stopping`); + try { entry.proc.kill('SIGTERM'); } catch (_) {} + _netIngestProcs.delete(containerId); +} + // ── DeckLink bridge ─────────────────────────────────────────────────────── // One decklink-bridge process per node, managing all DeckLink devices. // Mirrors the deltacast-bridge singleton pattern. @@ -396,6 +454,44 @@ async function handleSidecarStart(body, res) { // Always inject FC_URL so capture-manager can find the framecache service. sidecarEnv.push(`FC_URL=${FC_URL}`); + // Network sources (SRT/RTMP): launch net_ingest to decode stream into + // a framecache slot, then inject FC_SLOT_ID so capture-manager reads + // from the slot via fc_pipe (same path as SDI sources). + if (sourceType === 'srt' || sourceType === 'rtmp') { + const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); + let _netCfg = {}; + try { _netCfg = JSON.parse(_srcCfg); } catch (_) {} + const _listen = !!(body.listen || _netCfg.listen); + const _listenPort = body.listenPort || _netCfg.listenPort || 0; + const _streamKey = body.streamKey || _netCfg.streamKey || 'stream'; + const _srcUrl = body.sourceUrl || _netCfg.url || ''; + // Width/height/fps from recorder config if available; defaults used otherwise. + // net_ingest will auto-scale via ffmpeg -vf scale=iw:ih. + const _w = _netCfg.width || 1920; + const _h = _netCfg.height || 1080; + const _fpsNum = _netCfg.fps_num || 30000; + const _fpsDen = _netCfg.fps_den || 1001; + + // containerId not known yet — we start net_ingest just before container + // start and use a temporary slot ID based on a timestamp. + const _tempId = `${sourceType}-${Date.now()}`; + const _slotId = startNetIngest(_tempId, { + sourceType: sourceType, + sourceUrl: _srcUrl, + listen: _listen, + listenPort: _listenPort, + streamKey: _streamKey, + width: _w, + height: _h, + fpsNum: _fpsNum, + fpsDen: _fpsDen, + }); + sidecarEnv.push(`FC_SLOT_ID=${_slotId}`); + hostConfig.IpcMode = 'host'; + // Store temp id so we can remap to real containerId on create success + body._netIngestTempId = _tempId; + } + // Deltacast: ensure the shared bridge daemon is running on the HOST before // starting the sidecar. The bridge writes frames to the framecache shm ring; // the sidecar reads via the consumer library (fc_client). @@ -471,6 +567,15 @@ async function handleSidecarStart(body, res) { if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast'); if (sourceType === 'sdi' || sourceType === 'blackmagic') _containerSourceType.set(containerId, 'blackmagic'); + if (sourceType === 'srt' || sourceType === 'rtmp') { + _containerSourceType.set(containerId, sourceType); + // Remap net_ingest from temp id to real containerId + if (body._netIngestTempId && _netIngestProcs.has(body._netIngestTempId)) { + const entry = _netIngestProcs.get(body._netIngestTempId); + _netIngestProcs.delete(body._netIngestTempId); + _netIngestProcs.set(containerId, entry); + } + } jsonResponse(res, 201, { containerId, capturePort }); } catch (err) { if (sourceType === 'deltacast') { @@ -535,6 +640,8 @@ async function handleSidecarStop(containerId, res) { _dlSidecarCount = 0; stopDecklinkBridge(); } + } else if (_srcType === 'srt' || _srcType === 'rtmp') { + stopNetIngest(containerId); } } catch (err) { console.error(`[sidecar-stop] background cleanup failed for ${containerId}:`, err.message);