From 01211fef7a4cb4a8d9c49ddb8e3996e5aa3478a4 Mon Sep 17 00:00:00 2001 From: Wild Dragon Dev Date: Wed, 3 Jun 2026 16:25:34 +0000 Subject: [PATCH] fix(framecache): address critical bugs from code review C-Bug 1 (Torn read): fc_client.c zero-copy pointer replaced with consumer-owned copy buffer + post-copy cursor revalidation to prevent reading torn frames when the writer laps a slow consumer. New FC_LAPPED return code. C-Bug 3 (Semaphore busy-spin): fc_client.c drains the semaphore (sem_trywait) so the count never accumulates, relying entirely on write_cursor diff for availability. Prevents 100% CPU loops + EOVERFLOW. C-Bug 4 (GET /slots stack overflow): framecache.c uses heap allocation with explicit bounds checking for JSON serialization instead of a 64KB stack buffer. C-Bug 6 (DeckLink race): decklink-bridge uses pthread_mutex_t around fc_writer calls and reopen_slot to prevent UAF/double-free from concurrent SDK callbacks. C-Bug 2-net (Resolution resync): net_ingest explicitly scales to target W:H so ffmpeg always outputs exactly frame_size bytes, ignoring source resolution changes. C-Bug 8 (strdup leak): net_ingest uses static caller-owned buffers for ffmpeg args instead of strdup across listener reconnects. C-Bug 9 (PROT_READ segfault): removed atomic write to hdr->dropped_frames from the consumer read loop (which maps shm read-only). --- services/capture/decklink-bridge/main.cpp | 29 +++- services/capture/src/capture-manager.js | 69 ++++++--- services/framecache/client/fc_client.c | 140 +++++++++++++----- services/framecache/client/fc_client.h | 20 ++- services/framecache/client/fc_pipe.c | 13 +- services/framecache/client/fc_test_consumer.c | 1 + services/framecache/src/framecache.c | 30 +++- services/framecache/src/net_ingest.c | 48 ++++-- services/node-agent/index.js | 91 ++++++++---- 9 files changed, 319 insertions(+), 122 deletions(-) diff --git a/services/capture/decklink-bridge/main.cpp b/services/capture/decklink-bridge/main.cpp index 22c0646..cd0d094 100644 --- a/services/capture/decklink-bridge/main.cpp +++ b/services/capture/decklink-bridge/main.cpp @@ -108,7 +108,7 @@ struct DeviceState { int fps_num = 0; int fps_den = 1; bool interlaced = false; - bool signal_reported = false; + std::atomic signal_reported{false}; std::string slot_id; std::string fc_url; @@ -116,6 +116,14 @@ struct DeviceState { #ifndef LEGACY_FIFO fc_writer_t *fc_writer = nullptr; + /* Guards fc_writer + format fields (width/height/fps/signal_reported) + * against concurrent access from DeckLink SDK callback threads: + * VideoInputFormatChanged and VideoInputFrameArrived can fire on + * different threads without mutual exclusion, and reopen_slot() does + * close-then-open on fc_writer. Without this lock a frame callback could + * call fc_writer_write() on a freed writer (use-after-free), or two + * reopen_slot() calls could double-free. */ + pthread_mutex_t fc_lock = PTHREAD_MUTEX_INITIALIZER; #else int video_fifo_fd = -1; std::string video_fifo; @@ -244,19 +252,18 @@ public: if (g_stop.load()) return S_OK; if (!videoFrame) return S_OK; - /* Detect format on first frame if format-change hasn't fired */ - if (!m_ds->signal_reported) { + /* Detect format on first frame if format-change hasn't fired. + * Use atomic exchange so only ONE thread runs the first-frame init + * even if two frame callbacks race before signal_reported is set. */ + bool expected = false; + if (m_ds->signal_reported.compare_exchange_strong(expected, true)) { m_ds->width = (int)videoFrame->GetWidth(); m_ds->height = (int)videoFrame->GetHeight(); - BMDTimeValue fd; BMDTimeScale ts; - videoFrame->GetStreamTime(&fd, nullptr, 1000000); /* unused — just open slot */ - /* Use stored fps from format change, or detect from row bytes */ if (m_ds->fps_num == 0) { m_ds->fps_num = 30000; m_ds->fps_den = 1001; } this->reopen_slot(); - m_ds->signal_reported = true; } /* ── Write video frame ──────────────────────────────────────── */ @@ -279,10 +286,14 @@ public: } #ifndef LEGACY_FIFO + /* Lock so a concurrent VideoInputFormatChanged → reopen_slot() cannot + * free fc_writer between our null-check and the write (use-after-free). */ + pthread_mutex_lock(&m_ds->fc_lock); if (m_ds->fc_writer) { fc_writer_write(m_ds->fc_writer, static_cast(bytes), sz, pts_us); } + pthread_mutex_unlock(&m_ds->fc_lock); #else if (m_ds->video_fifo_fd >= 0) { if (write_all(m_ds->video_fifo_fd, @@ -337,6 +348,9 @@ private: void reopen_slot() { #ifndef LEGACY_FIFO + /* Serialize with frame writes and any concurrent reopen_slot() so we + * never double-free fc_writer or write to a half-closed one. */ + pthread_mutex_lock(&m_ds->fc_lock); if (m_ds->fc_writer) { fc_writer_close(m_ds->fc_writer); m_ds->fc_writer = nullptr; @@ -352,6 +366,7 @@ private: m_ds->device_idx); } } + pthread_mutex_unlock(&m_ds->fc_lock); #endif } }; diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index 6b59a84..b629b1c 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -539,6 +539,10 @@ class CaptureManager { const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], { stdio: ['ignore', 'pipe', 'pipe'], }); + // Pause stdout immediately so frames don't fill the OS pipe buffer (and + // block fc_pipe's write()) in the window between spawn here and the + // .pipe(ffmpeg.stdin) attach later in start(). .pipe() auto-resumes. + fcPipeProcess.stdout.pause(); fcPipeProcess.stderr.on('data', chunk => { process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`); }); @@ -559,6 +563,7 @@ class CaptureManager { bridgeProcess: fcPipeProcess, audioFifo: null, interlaced: false, + audioInputIndex: 0, /* network fc_pipe is video-only — no audio input */ _fcPipeProcess: fcPipeProcess, }; } @@ -661,6 +666,9 @@ class CaptureManager { const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], { stdio: ['ignore', 'pipe', 'pipe'], }); + // Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall — see + // the network path above for the full rationale). + fcPipeProcess.stdout.pause(); fcPipeProcess.stderr.on('data', chunk => { process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`); }); @@ -692,6 +700,7 @@ class CaptureManager { bridgeProcess: fcPipeProcess, /* capture-manager pipes this to ffmpeg stdin */ audioFifo: null, interlaced: fcInterlaced, + audioInputIndex: 1, /* audio FIFO is ffmpeg input 1 */ _fcPipeProcess: fcPipeProcess, /* stored for clean stop */ }; } @@ -753,6 +762,7 @@ class CaptureManager { bridgeProcess: null, audioFifo: null, interlaced: dcInterlaced, + audioInputIndex: 1, /* legacy deltacast: video FIFO=0, audio FIFO=1 */ }; } @@ -899,14 +909,22 @@ OUT=${sh(outPath)} mkfifo "$VF" "$AF" PATCHPID= cleanup() { rm -f "$VF" "$AF"; [ -n "$PATCHPID" ] && kill "$PATCHPID" 2>/dev/null; } -trap cleanup EXIT + trap cleanup EXIT # Prime both FIFOs read-write (non-blocking) to break the open-order deadlock. exec 7<>"$VF" 8<>"$AF" # raw2bmx: close priming FDs (no stray writer) before exec so it sees real EOF. -( exec 7>&- 8>&-; exec ${bmxLine} ) & +# CRITICAL: redirect raw2bmx stdin from /dev/null so it does NOT inherit the +# parent bash stdin. When the video source is fc_pipe (framecache), bash stdin +# carries the raw video stream destined for ffmpeg's pipe:0 — if raw2bmx also +# inherited fd 0 it would steal bytes from that stream, corrupting both the +# growing master and the ffmpeg input. +( exec 7>&- 8>&- 0&- 8>&-; exec ${ffLine} ) & +# ffmpeg: closes priming FDs and EXPLICITLY inherits bash stdin (fd 0) so that +# 'pipe:0' reads the fc_pipe video stream Node piped into this orchestrator's +# stdin. For non-fc_pipe sources (FIFO/device input) fd 0 is unused and this is +# harmless. +( exec 7>&- 8>&- 0<&0; exec ${ffLine} ) & FFPID=$! # Forward a clean stop to ffmpeg; raw2bmx then gets EOF and finalizes the footer. stop() { kill -INT "$FFPID" 2>/dev/null; } @@ -1036,21 +1054,18 @@ exit "$BMXRC" const startedAt = new Date().toISOString(); this._sessionIdForBridge = sessionId; - const { inputArgs, isNetwork, bridgeProcess = null, audioFifo = null, interlaced = false } = await this._buildInputArgs({ + const { inputArgs, isNetwork, bridgeProcess = null, audioFifo = null, interlaced = false, audioInputIndex = 0 } = await this._buildInputArgs({ sourceType, sourceBackend, device, port, board, sourceUrl, listen, listenPort, streamKey, }); - // Audio input index: - // - deltacast via framecache: video pipe:0, audio FIFO input 1 → '1:a:0?' - // - blackmagic via framecache: same → '1:a:0?' - // - network via framecache (fc_pipe): video-only pipe, no audio input → '0:a:0?' - // (net_ingest only writes video; audio from network stream not yet in shm) - // - DeckLink legacy / network legacy: audio in input 0 → '0:a:0?' - const _viaFcPipe = !!process.env.FC_SLOT_ID; - const _hasAudioFifo = _viaFcPipe && (sourceType === 'deltacast' || - sourceType === 'sdi' || - sourceType === 'blackmagic'); - const audioMap = _hasAudioFifo ? '1:a:0?' : '0:a:0?'; + // Audio input index is returned EXPLICITLY by _buildInputArgs (audioInputIndex) + // rather than guessed from sourceType/FC_SLOT_ID — that guess was wrong for + // the legacy deltacast FIFO path (which has audio at input 1 but no FC_SLOT_ID), + // silently dropping audio. Each return path now declares its own audio input: + // - deltacast/blackmagic via framecache: audio FIFO = input 1 + // - legacy deltacast FIFO: audio FIFO = input 1 + // - network (framecache or legacy) + DeckLink-backend SDI: audio in input 0 + const audioMap = `${audioInputIndex}:a:0?`; // Non-growing master: ffmpeg muxes the finalized MOV directly. Growing // master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via @@ -1150,22 +1165,34 @@ exit "$BMXRC" // ── Finalized (non-growing) master: ffmpeg muxes the MOV directly ── let hiresArgs; const isSdiLike = sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic'; - if (isSdiLike && this._assetIdForHls) { + // Network via framecache (fc_pipe) also produces its master + HLS as a + // single split ffmpeg, exactly like SDI — it reads pipe:0, not a URL. + const isNetFcPipe = !!process.env.FC_SLOT_ID && (sourceType === 'srt' || sourceType === 'rtmp'); + if ((isSdiLike || isNetFcPipe) && this._assetIdForHls) { const filterStr = isInterlacedSource ? '[0:v]yadif=mode=1:deint=1,split=2[vhi][vlo]' : '[0:v]split=2[vhi][vlo]'; + // Network fc_pipe is video-only (no audio input) — omit audio maps so + // ffmpeg doesn't fail trying to map a nonexistent audio stream. + const hasAudio = audioInputIndex >= 0 && !isNetFcPipe; + const masterAudioMap = hasAudio ? ['-map', audioMap] : []; + const masterAudioFilter = hasAudio + ? ['-af', 'aresample=async=1:min_hard_comp=0.100000:first_pts=0'] : []; + const hlsAudioMap = hasAudio ? ['-map', audioMap] : []; + const hlsAudioCodec = hasAudio + ? ['-c:a', 'aac', '-b:a', '128k', '-ar', '44100'] : []; hiresArgs = [ ...inputArgs, '-filter_complex', filterStr, // Output 0 — master (local temp, uploaded to S3 on stop) - '-map', '[vhi]', '-map', audioMap, - '-af', 'aresample=async=1:min_hard_comp=0.100000:first_pts=0', + '-map', '[vhi]', ...masterAudioMap, + ...masterAudioFilter, ...hiresCodecArgs, hiresOutput, // Output 1 — low-latency H.264 HLS preview for the UI monitor - '-map', '[vlo]', '-map', audioMap, + '-map', '[vlo]', ...hlsAudioMap, ...buildHlsVideoArgs(videoCodec, framerate), - '-c:a', 'aac', '-b:a', '128k', '-ar', '44100', + ...hlsAudioCodec, '-f', 'hls', '-hls_time', '2', '-hls_list_size', '15', '-hls_flags', 'delete_segments+append_list+omit_endlist', '-hls_segment_filename', sdiHlsDir + '/seg-%05d.ts', diff --git a/services/framecache/client/fc_client.c b/services/framecache/client/fc_client.c index 978e331..a206a20 100644 --- a/services/framecache/client/fc_client.c +++ b/services/framecache/client/fc_client.c @@ -26,6 +26,8 @@ struct fc_consumer { sem_t *sem; uint64_t read_cursor; /* consumer's own position in the ring */ uint64_t local_dropped; /* frames skipped by this consumer */ + uint8_t *copy_buf; /* consumer-owned frame copy buffer (frame_size bytes) */ + uint32_t frame_size; /* cached from header */ char slot_id[FC_MAX_SLOT_ID]; }; @@ -68,10 +70,19 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms) fc_consumer_t *c = calloc(1, sizeof *c); if (!c) { sem_close(sem); munmap(base, total); close(fd); return NULL; } - c->shm_fd = fd; - c->base = base; - c->shm_size = total; - c->sem = sem; + /* Consumer-owned copy buffer — fc_consumer_read copies the frame here and + * re-validates the cursor afterward, so a writer lapping a slow consumer + * cannot corrupt the frame the caller is using. */ + c->copy_buf = malloc(hdr.frame_size); + if (!c->copy_buf) { + free(c); sem_close(sem); munmap(base, total); close(fd); return NULL; + } + + c->shm_fd = fd; + c->base = base; + c->shm_size = total; + c->sem = sem; + c->frame_size = hdr.frame_size; /* Start reading from the current write position so we don't replay old frames */ c->read_cursor = atomic_load_explicit( &((fc_header_t *)base)->write_cursor, memory_order_acquire); @@ -83,46 +94,94 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms) int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms) { fc_header_t *hdr = (fc_header_t *)c->base; + int dropped = 0; /* set when this call skipped one or more frames */ - /* Wait for a new frame via semaphore */ - struct timespec abs_ts; - clock_gettime(CLOCK_REALTIME, &abs_ts); - abs_ts.tv_sec += (time_t)(timeout_ms / 1000); - abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L); - if (abs_ts.tv_nsec >= 1000000000L) { - abs_ts.tv_sec++; - abs_ts.tv_nsec -= 1000000000L; - } - - while (sem_timedwait(c->sem, &abs_ts) != 0) { - if (errno == ETIMEDOUT) return FC_TIMEOUT; - if (errno == EINTR) continue; - return FC_ERROR; - } - - uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor, - memory_order_acquire); - int dropped = 0; - - /* Check if consumer fell behind by more than ring_depth */ - if (write_cur > c->read_cursor + hdr->ring_depth) { - uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth; - c->read_cursor = write_cur - hdr->ring_depth; - c->local_dropped += skipped; - atomic_fetch_add(&hdr->dropped_frames, skipped); - dropped = 1; - } - - if (c->read_cursor >= write_cur) { - /* Semaphore posted but nothing new yet (spurious) */ - return FC_TIMEOUT; + /* ── Wait for new data ────────────────────────────────────────────── + * The semaphore is used ONLY as an edge-wakeup hint, never as a frame + * counter. The writer posts once per frame, but a consumer that skips + * frames (lap) or reads less often than the writer posts would otherwise + * leave the count climbing unbounded — causing sem_timedwait to never + * block (100% CPU busy-spin) and eventually EOVERFLOW. So: + * - cursor-diff (write_cursor - read_cursor) is the SOURCE OF TRUTH for + * whether a frame is available. + * - we drain the semaphore to zero (sem_trywait loop) so the count never + * accumulates. + * - if no frame is available we block on ONE sem_timedwait for wakeup. */ + for (;;) { + uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor, + memory_order_acquire); + + /* Lap detection: if the writer is more than ring_depth ahead, the + * oldest unread frames have been overwritten — skip to the oldest + * still-valid frame. */ + if (write_cur > c->read_cursor + hdr->ring_depth) { + uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth; + c->read_cursor = write_cur - hdr->ring_depth; + c->local_dropped += skipped; + /* NOTE: do NOT write hdr->dropped_frames here — the consumer maps + * the shm PROT_READ (read-only), so an atomic write would SIGSEGV. + * Per-consumer drops are tracked in c->local_dropped and exposed + * via fc_consumer_dropped(). The writer owns hdr->dropped_frames. */ + dropped = 1; + } + + if (c->read_cursor < write_cur) { + /* A frame is available — drain the semaphore so its count never + * accumulates, then read+copy below. */ + while (sem_trywait(c->sem) == 0) { /* drain */ } + break; + } + + /* No frame yet — drain stale posts, then block for a wakeup. */ + while (sem_trywait(c->sem) == 0) { /* drain */ } + + struct timespec abs_ts; + clock_gettime(CLOCK_REALTIME, &abs_ts); + abs_ts.tv_sec += (time_t)(timeout_ms / 1000); + abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L); + if (abs_ts.tv_nsec >= 1000000000L) { abs_ts.tv_sec++; abs_ts.tv_nsec -= 1000000000L; } + + int w = sem_timedwait(c->sem, &abs_ts); + if (w != 0) { + if (errno == ETIMEDOUT) { + /* Re-check the cursor once more before giving up — the writer + * may have advanced between our check and the wait. */ + uint64_t wc2 = atomic_load_explicit(&hdr->write_cursor, + memory_order_acquire); + if (c->read_cursor < wc2) continue; + return FC_TIMEOUT; + } + if (errno == EINTR) continue; + return FC_ERROR; + } + /* Woken — loop to re-evaluate cursor-diff. */ } + /* ── Copy the frame into the consumer-owned buffer ──────────────────── */ fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor); - ref->data = frame->data; - ref->size = frame->size; - ref->pts_us = frame->pts_us; - ref->wall_us = frame->wall_us; + uint32_t fsz = frame->size; + if (fsz > hdr->frame_size) fsz = hdr->frame_size; + uint64_t pts = frame->pts_us; + uint64_t wall = frame->wall_us; + memcpy(c->copy_buf, frame->data, fsz); + + /* ── Re-validate AFTER the copy ───────────────────────────────────── + * If the writer lapped us during the copy (overwrote this slot), the copy + * may be torn — discard it and signal DROPPED so the caller reads again. */ + uint64_t write_after = atomic_load_explicit(&hdr->write_cursor, + memory_order_acquire); + if (write_after > c->read_cursor + hdr->ring_depth) { + uint64_t skipped = write_after - c->read_cursor - hdr->ring_depth; + c->read_cursor = write_after - hdr->ring_depth; + c->local_dropped += skipped; + return FC_LAPPED; /* copy torn — ref not valid, caller reads again */ + } + + /* Copy is valid. */ + ref->data = c->copy_buf; + ref->size = fsz; + ref->pts_us = pts; + ref->wall_us = wall; ref->seq = c->read_cursor; c->read_cursor++; @@ -132,6 +191,7 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms) void fc_consumer_close(fc_consumer_t *c) { if (!c) return; + if (c->copy_buf) free(c->copy_buf); sem_close(c->sem); munmap(c->base, c->shm_size); close(c->shm_fd); diff --git a/services/framecache/client/fc_client.h b/services/framecache/client/fc_client.h index 442ce16..58a22f0 100644 --- a/services/framecache/client/fc_client.h +++ b/services/framecache/client/fc_client.h @@ -26,15 +26,27 @@ extern "C" { #endif /* Return codes */ -#define FC_OK 0 -#define FC_TIMEOUT 1 /* no new frame within timeout_ms */ -#define FC_DROPPED 2 /* consumer fell behind; cursor snapped to latest */ +#define FC_OK 0 /* valid frame returned in ref */ +#define FC_TIMEOUT 1 /* no new frame within timeout_ms — ref not populated */ +#define FC_DROPPED 2 /* valid frame returned in ref, BUT one or more older + * frames were skipped first (consumer fell behind). + * ref IS populated — caller should USE the frame. */ +#define FC_LAPPED 3 /* the copy was overwritten mid-read (writer lapped the + * consumer during memcpy). ref NOT populated — caller + * should call fc_consumer_read again. */ #define FC_ERROR -1 typedef struct fc_consumer fc_consumer_t; typedef struct { - const uint8_t *data; /* zero-copy pointer into shm ring — valid until next read */ + const uint8_t *data; /* pointer to a CONSUMER-OWNED copy of the frame — + * stable until the next fc_consumer_read() call. + * (Previously a zero-copy pointer into the shm ring, + * which the writer could overwrite mid-use when it + * lapped a slow consumer. We now copy into the + * consumer's own buffer and re-validate the cursor + * AFTER the copy, so a lapped frame is discarded + * rather than streamed corrupt.) */ uint32_t size; /* bytes */ uint64_t pts_us; /* presentation timestamp (microseconds) */ uint64_t wall_us; /* wall clock at write time (microseconds) */ diff --git a/services/framecache/client/fc_pipe.c b/services/framecache/client/fc_pipe.c index 0010a7c..d28dddf 100644 --- a/services/framecache/client/fc_pipe.c +++ b/services/framecache/client/fc_pipe.c @@ -92,13 +92,24 @@ int main(int argc, char *argv[]) { if (rc == FC_TIMEOUT) continue; if (rc == FC_ERROR) break; + if (rc == FC_LAPPED) { + /* Copy was torn (writer lapped us mid-read). No valid frame to + * write — log and read again. */ + total_dropped = fc_consumer_dropped(c); + fprintf(stderr, "[fc_pipe] WARNING: frame lapped mid-read — total dropped: %llu\n", + (unsigned long long)total_dropped); + continue; + } + if (rc == FC_DROPPED) { + /* Skipped one or more older frames, but THIS frame is valid — log + * and write it (do NOT continue). */ total_dropped = fc_consumer_dropped(c); fprintf(stderr, "[fc_pipe] WARNING: consumer fell behind — total dropped: %llu\n", (unsigned long long)total_dropped); } - /* Write frame data to stdout */ + /* Write frame data to stdout (ref.data is a stable consumer-owned copy) */ if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) { if (!g_stop) fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n"); diff --git a/services/framecache/client/fc_test_consumer.c b/services/framecache/client/fc_test_consumer.c index c809463..5f47fc3 100644 --- a/services/framecache/client/fc_test_consumer.c +++ b/services/framecache/client/fc_test_consumer.c @@ -43,6 +43,7 @@ int main(int argc, char **argv) int rc = fc_consumer_read(c, &ref, 2000); if (rc == FC_TIMEOUT) continue; if (rc == FC_ERROR) { fprintf(stderr, "read error\n"); break; } + if (rc == FC_LAPPED) { /* torn copy — no valid frame, read again */ continue; } if (rc == FC_DROPPED) { dropped = fc_consumer_dropped(c); fprintf(stderr, "[WARN] consumer fell behind — total dropped: %llu\n", diff --git a/services/framecache/src/framecache.c b/services/framecache/src/framecache.c index d1631ee..2aa517c 100644 --- a/services/framecache/src/framecache.c +++ b/services/framecache/src/framecache.c @@ -170,22 +170,38 @@ static enum MHD_Result handle_request( goto done; } - /* GET /slots */ + /* GET /slots + * Worst case: FC_MAX_SLOTS (256) × ~2KB/entry ≈ 512KB. A 64KB stack buffer + * would overflow at ~32 slots (and `pos` could pass `sizeof big`, making + * `sizeof big - pos` underflow to a huge size_t). Heap-allocate a buffer + * sized for the worst case and bound-check every append. */ if (strcmp(method, "GET") == 0 && strcmp(url, "/slots") == 0) { - char big[65536]; - int pos = 0; - pos += snprintf(big + pos, sizeof big - pos, "["); + size_t cap = (size_t)FC_MAX_SLOTS * 2100 + 64; /* worst case + brackets */ + char *big = malloc(cap); + if (!big) { + rc = respond(conn, MHD_HTTP_INTERNAL_SERVER_ERROR, + "{\"error\":\"out of memory\"}"); + goto done; + } + size_t pos = 0; + if (pos < cap) big[pos++] = '['; int first = 1; for (int i = 0; i < FC_MAX_SLOTS; i++) { if (!g_registry[i].active) continue; - char entry[2048]; + char entry[2100]; slot_to_json(g_registry[i].slot, entry, sizeof entry); + size_t elen = strlen(entry); + /* +2 for possible comma + closing bracket, +1 for NUL */ + if (pos + elen + 3 >= cap) break; /* never overflow */ if (!first) big[pos++] = ','; first = 0; - pos += snprintf(big + pos, sizeof big - pos, "%s", entry); + memcpy(big + pos, entry, elen); + pos += elen; } - snprintf(big + pos, sizeof big - pos, "]"); + if (pos + 2 < cap) big[pos++] = ']'; + big[pos] = '\0'; rc = respond(conn, MHD_HTTP_OK, big); + free(big); goto done; } diff --git a/services/framecache/src/net_ingest.c b/services/framecache/src/net_ingest.c index a5e6bbf..545e1fd 100644 --- a/services/framecache/src/net_ingest.c +++ b/services/framecache/src/net_ingest.c @@ -213,15 +213,26 @@ static void shm_writer_close(ShmWriter *sw) { if (sw->fd >= 0) { close(sw->fd); sw->fd = -1; } } -/* ── Build ffmpeg args for network decode → rawvideo stdout ────────── */ +/* ── Build ffmpeg args for network decode → rawvideo stdout ────────── + * All dynamic strings are written into CALLER-OWNED buffers (passed in) so + * there is no per-call strdup leak across listener reconnects. The video + * filter forces the EXACT target W:H (scale=W:H, not iw:ih) so a mid-stream + * source resolution change cannot desync the fixed-size frame reassembly — + * ffmpeg's scaler always emits width*height*2 bytes per frame. + * + * Caller must provide: + * url_buf — at least 320 bytes (built listener URL, or copied caller URL) + * vf_buf — at least 64 bytes (scale/format filter) + */ static int build_ffmpeg_args( char **argv, int max_args, const char *url, const char *source_type, int listen, int listen_port, const char *stream_key, - uint32_t w, uint32_t h) + uint32_t w, uint32_t h, + char *url_buf, size_t url_buf_len, + char *vf_buf, size_t vf_buf_len) { - char size_str[32]; - snprintf(size_str, sizeof size_str, "%ux%u", w, h); + (void)max_args; char port_str[16]; int i = 0; @@ -236,26 +247,29 @@ static int build_ffmpeg_args( if (!strcmp(source_type, "srt") && listen) { snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 9000); - char srt_url[256]; - snprintf(srt_url, sizeof srt_url, "srt://0.0.0.0:%s?mode=listener", port_str); - argv[i++] = "-i"; argv[i++] = strdup(srt_url); + snprintf(url_buf, url_buf_len, "srt://0.0.0.0:%s?mode=listener", port_str); + argv[i++] = "-i"; argv[i++] = url_buf; } else if (!strcmp(source_type, "rtmp") && listen) { snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 1935); - char rtmp_url[256]; - snprintf(rtmp_url, sizeof rtmp_url, "rtmp://0.0.0.0:%s/live/%s", + snprintf(url_buf, url_buf_len, "rtmp://0.0.0.0:%s/live/%s", port_str, stream_key ? stream_key : "stream"); argv[i++] = "-listen"; argv[i++] = "1"; - argv[i++] = "-i"; argv[i++] = strdup(rtmp_url); + argv[i++] = "-i"; argv[i++] = url_buf; } else { argv[i++] = "-i"; argv[i++] = (char *)url; } + /* Force EXACT output dimensions so every frame is exactly w*h*2 bytes, + * even if the source resolution changes mid-stream (SRT/RTMP reconnect to + * a different encoder). This is the resync guarantee for the fixed-size + * frame reassembly loop in main(). */ + snprintf(vf_buf, vf_buf_len, "scale=%u:%u,format=uyvy422", w, h); + /* Video output: raw UYVY422 to stdout */ argv[i++] = "-map"; argv[i++] = "0:v:0"; - argv[i++] = "-vf"; argv[i++] = "scale=iw:ih,format=uyvy422"; + argv[i++] = "-vf"; argv[i++] = vf_buf; argv[i++] = "-f"; argv[i++] = "rawvideo"; argv[i++] = "-pix_fmt"; argv[i++] = "uyvy422"; - argv[i++] = "-s"; argv[i++] = strdup(size_str); argv[i++] = "pipe:1"; argv[i] = NULL; @@ -328,12 +342,18 @@ int main(int argc, char *argv[]) { fps_den ? (double)fps_num / fps_den : 0.0, source_type, listen ? " (listener)" : ""); + /* Caller-owned arg buffers — reused each reconnect, no per-loop leak. */ + char ff_url_buf[320]; + char ff_vf_buf[64]; + /* ── Outer reconnect loop (listener mode stays alive between sessions) */ while (!g_stop) { - /* Build ffmpeg argv */ + /* Build ffmpeg argv (writes into ff_url_buf / ff_vf_buf, no strdup) */ char *ff_argv[64]; build_ffmpeg_args(ff_argv, 64, url, source_type, - listen, listen_port, stream_key, width, height); + listen, listen_port, stream_key, width, height, + ff_url_buf, sizeof ff_url_buf, + ff_vf_buf, sizeof ff_vf_buf); /* Spawn ffmpeg with stdout pipe */ int pfd[2]; diff --git a/services/node-agent/index.js b/services/node-agent/index.js index 4161f8c..67dc78a 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -130,7 +130,12 @@ function startNetIngest(containerId, { sourceType, sourceUrl, listen, listenPort proc.on('error', err => console.error(`[net-ingest:${slotId}] spawn error: ${err.message}`)); proc.on('exit', (c, s) => { console.log(`[net-ingest:${slotId}] exited code=${c} signal=${s}`); - _netIngestProcs.delete(containerId); + // The map key may have been remapped from the temp id to the real + // containerId after spawn. Delete by PROCESS IDENTITY, not the captured + // key, so the entry can't leak after an unexpected crash. + for (const [key, entry] of _netIngestProcs) { + if (entry.proc === proc) { _netIngestProcs.delete(key); break; } + } }); _netIngestProcs.set(containerId, { proc, slotId }); return slotId; @@ -498,29 +503,38 @@ async function handleSidecarStart(body, res) { if (sourceType === 'deltacast') { _dcSidecarCount++; startDeltacastBridge(); - // Inject per-port signal format so capture-manager uses real dimensions/fps const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); let _portNum = NaN; try { _portNum = JSON.parse(_srcCfg).port; } catch (_) {} - if (Number.isFinite(_portNum) && _dcPortFmt.has(_portNum)) { + if (!Number.isFinite(_portNum)) _portNum = 0; + + // FC_SLOT_ID is DETERMINISTIC — the deltacast-bridge builds it as + // "deltacast--" (both known here), so we construct it + // directly and DO NOT wait for the bridge's async format JSON. This is + // the fix for the cold-start race where _dcPortFmt was still empty on + // first recorder start, silently falling back to the legacy FIFO path. + const _slotId = `deltacast-${DC_BOARD}-${_portNum}`; + sidecarEnv.push(`FC_SLOT_ID=${_slotId}`); + + // Format (width/height/fps) is best-effort enrichment from the bridge's + // stderr JSON if it has already arrived; capture-manager has sane + // defaults and waits for the slot to appear regardless. + if (_dcPortFmt.has(_portNum)) { const _fmt = _dcPortFmt.get(_portNum); const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num); sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`); sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`); sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`); - // Pass slot_id so capture-manager knows which framecache slot to read - if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`); - console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced} slot=${_fmt.slot_id}`); + console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} slot=${_slotId}`); + } else { + console.log(`[dc-bridge] port ${_portNum} slot=${_slotId} (fmt not yet available — using defaults)`); } - // IPC host — sidecar must share /dev/shm with framecache container hostConfig.IpcMode = 'host'; } // DeckLink: ensure decklink-bridge is running on the HOST. - // Bridge writes to framecache; sidecar reads via fc_client. if (sourceType === 'sdi' || sourceType === 'blackmagic') { _dlSidecarCount++; - // Determine which device indices are active on this node const _bmdDevices = []; try { const _bmdDir = '/dev/blackmagic'; @@ -528,26 +542,57 @@ async function handleSidecarStart(body, res) { _bmdEntries.forEach((_, i) => _bmdDevices.push(i)); } catch (_) { _bmdDevices.push(0); } startDecklinkBridge(_bmdDevices); - // Inject fmt if available + const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); let _devIdx = NaN; try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {} if (!Number.isFinite(_devIdx)) _devIdx = parseInt((env.find(e => e.startsWith('BMD_DEVICE_INDEX=')) || '=0').split('=')[1]) || 0; - if (Number.isFinite(_devIdx) && _dlDevFmt.has(_devIdx)) { + + // FC_SLOT_ID is DETERMINISTIC — decklink-bridge builds it as + // "decklink--". Construct it directly (no wait on + // async fmt JSON). FC_NODE_ID matches what node-agent passes to the + // bridge via the NODE_ID env var. + const _slotId = `decklink-${FC_NODE_ID}-${_devIdx}`; + sidecarEnv.push(`FC_SLOT_ID=${_slotId}`); + + if (_dlDevFmt.has(_devIdx)) { const _fmt = _dlDevFmt.get(_devIdx); - if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`); + const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num); + sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`); + sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`); + sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`); + console.log(`[dl-bridge] device ${_devIdx} fmt: ${_fmt.width}x${_fmt.height} ${_fps} slot=${_slotId}`); + } else { + console.log(`[dl-bridge] device ${_devIdx} slot=${_slotId} (fmt not yet available — using defaults)`); } hostConfig.IpcMode = 'host'; } + // Single cleanup for ALL failure paths (create fail, start fail, throw): + // decrements the right bridge counter (stopping the bridge when it hits 0) + // AND stops any net_ingest started for this request. Previously only the + // deltacast counter was decremented — blackmagic count and net_ingest leaked + // on every failed start, eventually stranding the bridge / ingest forever. + const _cleanupOnFailure = () => { + if (sourceType === 'deltacast') { + _dcSidecarCount--; + if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } + } else if (sourceType === 'sdi' || sourceType === 'blackmagic') { + _dlSidecarCount--; + if (_dlSidecarCount <= 0) { _dlSidecarCount = 0; stopDecklinkBridge(); } + } else if (sourceType === 'srt' || sourceType === 'rtmp') { + // net_ingest may be keyed by the temp id (create not yet succeeded) or + // the real containerId (remapped). Stop whichever exists. + if (body._netIngestTempId) stopNetIngest(body._netIngestTempId); + if (containerId) stopNetIngest(containerId); + } + }; + let containerId; try { const createRes = await dockerApi('POST', '/containers/create', spec); if (createRes.status !== 201) { - if (sourceType === 'deltacast') { - _dcSidecarCount--; - if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } - } + _cleanupOnFailure(); return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data }); } @@ -557,11 +602,8 @@ async function handleSidecarStart(body, res) { console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`); const startRes = await dockerApi('POST', `/containers/${containerId}/start`); if (startRes.status !== 204) { - if (sourceType === 'deltacast') { - _dcSidecarCount--; - if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } - } await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); + _cleanupOnFailure(); return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data }); } @@ -578,14 +620,7 @@ async function handleSidecarStart(body, res) { } jsonResponse(res, 201, { containerId, capturePort }); } catch (err) { - if (sourceType === 'deltacast') { - _dcSidecarCount--; - if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); } - } - if (sourceType === 'sdi' || sourceType === 'blackmagic') { - _dlSidecarCount--; - if (_dlSidecarCount <= 0) { _dlSidecarCount = 0; stopDecklinkBridge(); } - } + _cleanupOnFailure(); throw err; } } catch (err) {