fix(framecache): address critical bugs from code review
C-Bug 1 (Torn read): fc_client.c zero-copy pointer replaced with consumer-owned copy buffer + post-copy cursor revalidation to prevent reading torn frames when the writer laps a slow consumer. New FC_LAPPED return code. C-Bug 3 (Semaphore busy-spin): fc_client.c drains the semaphore (sem_trywait) so the count never accumulates, relying entirely on write_cursor diff for availability. Prevents 100% CPU loops + EOVERFLOW. C-Bug 4 (GET /slots stack overflow): framecache.c uses heap allocation with explicit bounds checking for JSON serialization instead of a 64KB stack buffer. C-Bug 6 (DeckLink race): decklink-bridge uses pthread_mutex_t around fc_writer calls and reopen_slot to prevent UAF/double-free from concurrent SDK callbacks. C-Bug 2-net (Resolution resync): net_ingest explicitly scales to target W:H so ffmpeg always outputs exactly frame_size bytes, ignoring source resolution changes. C-Bug 8 (strdup leak): net_ingest uses static caller-owned buffers for ffmpeg args instead of strdup across listener reconnects. C-Bug 9 (PROT_READ segfault): removed atomic write to hdr->dropped_frames from the consumer read loop (which maps shm read-only).
This commit is contained in:
parent
99723da00f
commit
01211fef7a
9 changed files with 319 additions and 122 deletions
|
|
@ -108,7 +108,7 @@ struct DeviceState {
|
|||
int fps_num = 0;
|
||||
int fps_den = 1;
|
||||
bool interlaced = false;
|
||||
bool signal_reported = false;
|
||||
std::atomic<bool> signal_reported{false};
|
||||
|
||||
std::string slot_id;
|
||||
std::string fc_url;
|
||||
|
|
@ -116,6 +116,14 @@ struct DeviceState {
|
|||
|
||||
#ifndef LEGACY_FIFO
|
||||
fc_writer_t *fc_writer = nullptr;
|
||||
/* Guards fc_writer + format fields (width/height/fps/signal_reported)
|
||||
* against concurrent access from DeckLink SDK callback threads:
|
||||
* VideoInputFormatChanged and VideoInputFrameArrived can fire on
|
||||
* different threads without mutual exclusion, and reopen_slot() does
|
||||
* close-then-open on fc_writer. Without this lock a frame callback could
|
||||
* call fc_writer_write() on a freed writer (use-after-free), or two
|
||||
* reopen_slot() calls could double-free. */
|
||||
pthread_mutex_t fc_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
#else
|
||||
int video_fifo_fd = -1;
|
||||
std::string video_fifo;
|
||||
|
|
@ -244,19 +252,18 @@ public:
|
|||
if (g_stop.load()) return S_OK;
|
||||
if (!videoFrame) return S_OK;
|
||||
|
||||
/* Detect format on first frame if format-change hasn't fired */
|
||||
if (!m_ds->signal_reported) {
|
||||
/* Detect format on first frame if format-change hasn't fired.
|
||||
* Use atomic exchange so only ONE thread runs the first-frame init
|
||||
* even if two frame callbacks race before signal_reported is set. */
|
||||
bool expected = false;
|
||||
if (m_ds->signal_reported.compare_exchange_strong(expected, true)) {
|
||||
m_ds->width = (int)videoFrame->GetWidth();
|
||||
m_ds->height = (int)videoFrame->GetHeight();
|
||||
BMDTimeValue fd; BMDTimeScale ts;
|
||||
videoFrame->GetStreamTime(&fd, nullptr, 1000000); /* unused — just open slot */
|
||||
/* Use stored fps from format change, or detect from row bytes */
|
||||
if (m_ds->fps_num == 0) {
|
||||
m_ds->fps_num = 30000;
|
||||
m_ds->fps_den = 1001;
|
||||
}
|
||||
this->reopen_slot();
|
||||
m_ds->signal_reported = true;
|
||||
}
|
||||
|
||||
/* ── Write video frame ──────────────────────────────────────── */
|
||||
|
|
@ -279,10 +286,14 @@ public:
|
|||
}
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
/* Lock so a concurrent VideoInputFormatChanged → reopen_slot() cannot
|
||||
* free fc_writer between our null-check and the write (use-after-free). */
|
||||
pthread_mutex_lock(&m_ds->fc_lock);
|
||||
if (m_ds->fc_writer) {
|
||||
fc_writer_write(m_ds->fc_writer,
|
||||
static_cast<const uint8_t *>(bytes), sz, pts_us);
|
||||
}
|
||||
pthread_mutex_unlock(&m_ds->fc_lock);
|
||||
#else
|
||||
if (m_ds->video_fifo_fd >= 0) {
|
||||
if (write_all(m_ds->video_fifo_fd,
|
||||
|
|
@ -337,6 +348,9 @@ private:
|
|||
|
||||
void reopen_slot() {
|
||||
#ifndef LEGACY_FIFO
|
||||
/* Serialize with frame writes and any concurrent reopen_slot() so we
|
||||
* never double-free fc_writer or write to a half-closed one. */
|
||||
pthread_mutex_lock(&m_ds->fc_lock);
|
||||
if (m_ds->fc_writer) {
|
||||
fc_writer_close(m_ds->fc_writer);
|
||||
m_ds->fc_writer = nullptr;
|
||||
|
|
@ -352,6 +366,7 @@ private:
|
|||
m_ds->device_idx);
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&m_ds->fc_lock);
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -539,6 +539,10 @@ class CaptureManager {
|
|||
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
// Pause stdout immediately so frames don't fill the OS pipe buffer (and
|
||||
// block fc_pipe's write()) in the window between spawn here and the
|
||||
// .pipe(ffmpeg.stdin) attach later in start(). .pipe() auto-resumes.
|
||||
fcPipeProcess.stdout.pause();
|
||||
fcPipeProcess.stderr.on('data', chunk => {
|
||||
process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`);
|
||||
});
|
||||
|
|
@ -559,6 +563,7 @@ class CaptureManager {
|
|||
bridgeProcess: fcPipeProcess,
|
||||
audioFifo: null,
|
||||
interlaced: false,
|
||||
audioInputIndex: 0, /* network fc_pipe is video-only — no audio input */
|
||||
_fcPipeProcess: fcPipeProcess,
|
||||
};
|
||||
}
|
||||
|
|
@ -661,6 +666,9 @@ class CaptureManager {
|
|||
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
// Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall — see
|
||||
// the network path above for the full rationale).
|
||||
fcPipeProcess.stdout.pause();
|
||||
fcPipeProcess.stderr.on('data', chunk => {
|
||||
process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`);
|
||||
});
|
||||
|
|
@ -692,6 +700,7 @@ class CaptureManager {
|
|||
bridgeProcess: fcPipeProcess, /* capture-manager pipes this to ffmpeg stdin */
|
||||
audioFifo: null,
|
||||
interlaced: fcInterlaced,
|
||||
audioInputIndex: 1, /* audio FIFO is ffmpeg input 1 */
|
||||
_fcPipeProcess: fcPipeProcess, /* stored for clean stop */
|
||||
};
|
||||
}
|
||||
|
|
@ -753,6 +762,7 @@ class CaptureManager {
|
|||
bridgeProcess: null,
|
||||
audioFifo: null,
|
||||
interlaced: dcInterlaced,
|
||||
audioInputIndex: 1, /* legacy deltacast: video FIFO=0, audio FIFO=1 */
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -899,14 +909,22 @@ OUT=${sh(outPath)}
|
|||
mkfifo "$VF" "$AF"
|
||||
PATCHPID=
|
||||
cleanup() { rm -f "$VF" "$AF"; [ -n "$PATCHPID" ] && kill "$PATCHPID" 2>/dev/null; }
|
||||
trap cleanup EXIT
|
||||
trap cleanup EXIT
|
||||
# Prime both FIFOs read-write (non-blocking) to break the open-order deadlock.
|
||||
exec 7<>"$VF" 8<>"$AF"
|
||||
# raw2bmx: close priming FDs (no stray writer) before exec so it sees real EOF.
|
||||
( exec 7>&- 8>&-; exec ${bmxLine} ) &
|
||||
# CRITICAL: redirect raw2bmx stdin from /dev/null so it does NOT inherit the
|
||||
# parent bash stdin. When the video source is fc_pipe (framecache), bash stdin
|
||||
# carries the raw video stream destined for ffmpeg's pipe:0 — if raw2bmx also
|
||||
# inherited fd 0 it would steal bytes from that stream, corrupting both the
|
||||
# growing master and the ffmpeg input.
|
||||
( exec 7>&- 8>&- 0</dev/null; exec ${bmxLine} ) &
|
||||
BMXPID=$!
|
||||
# ffmpeg: also closes priming FDs; it opens its own write ends.
|
||||
( exec 7>&- 8>&-; exec ${ffLine} ) &
|
||||
# ffmpeg: closes priming FDs and EXPLICITLY inherits bash stdin (fd 0) so that
|
||||
# 'pipe:0' reads the fc_pipe video stream Node piped into this orchestrator's
|
||||
# stdin. For non-fc_pipe sources (FIFO/device input) fd 0 is unused and this is
|
||||
# harmless.
|
||||
( exec 7>&- 8>&- 0<&0; exec ${ffLine} ) &
|
||||
FFPID=$!
|
||||
# Forward a clean stop to ffmpeg; raw2bmx then gets EOF and finalizes the footer.
|
||||
stop() { kill -INT "$FFPID" 2>/dev/null; }
|
||||
|
|
@ -1036,21 +1054,18 @@ exit "$BMXRC"
|
|||
const startedAt = new Date().toISOString();
|
||||
|
||||
this._sessionIdForBridge = sessionId;
|
||||
const { inputArgs, isNetwork, bridgeProcess = null, audioFifo = null, interlaced = false } = await this._buildInputArgs({
|
||||
const { inputArgs, isNetwork, bridgeProcess = null, audioFifo = null, interlaced = false, audioInputIndex = 0 } = await this._buildInputArgs({
|
||||
sourceType, sourceBackend, device, port, board, sourceUrl, listen, listenPort, streamKey,
|
||||
});
|
||||
|
||||
// Audio input index:
|
||||
// - deltacast via framecache: video pipe:0, audio FIFO input 1 → '1:a:0?'
|
||||
// - blackmagic via framecache: same → '1:a:0?'
|
||||
// - network via framecache (fc_pipe): video-only pipe, no audio input → '0:a:0?'
|
||||
// (net_ingest only writes video; audio from network stream not yet in shm)
|
||||
// - DeckLink legacy / network legacy: audio in input 0 → '0:a:0?'
|
||||
const _viaFcPipe = !!process.env.FC_SLOT_ID;
|
||||
const _hasAudioFifo = _viaFcPipe && (sourceType === 'deltacast' ||
|
||||
sourceType === 'sdi' ||
|
||||
sourceType === 'blackmagic');
|
||||
const audioMap = _hasAudioFifo ? '1:a:0?' : '0:a:0?';
|
||||
// Audio input index is returned EXPLICITLY by _buildInputArgs (audioInputIndex)
|
||||
// rather than guessed from sourceType/FC_SLOT_ID — that guess was wrong for
|
||||
// the legacy deltacast FIFO path (which has audio at input 1 but no FC_SLOT_ID),
|
||||
// silently dropping audio. Each return path now declares its own audio input:
|
||||
// - deltacast/blackmagic via framecache: audio FIFO = input 1
|
||||
// - legacy deltacast FIFO: audio FIFO = input 1
|
||||
// - network (framecache or legacy) + DeckLink-backend SDI: audio in input 0
|
||||
const audioMap = `${audioInputIndex}:a:0?`;
|
||||
|
||||
// Non-growing master: ffmpeg muxes the finalized MOV directly. Growing
|
||||
// master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via
|
||||
|
|
@ -1150,22 +1165,34 @@ exit "$BMXRC"
|
|||
// ── Finalized (non-growing) master: ffmpeg muxes the MOV directly ──
|
||||
let hiresArgs;
|
||||
const isSdiLike = sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic';
|
||||
if (isSdiLike && this._assetIdForHls) {
|
||||
// Network via framecache (fc_pipe) also produces its master + HLS as a
|
||||
// single split ffmpeg, exactly like SDI — it reads pipe:0, not a URL.
|
||||
const isNetFcPipe = !!process.env.FC_SLOT_ID && (sourceType === 'srt' || sourceType === 'rtmp');
|
||||
if ((isSdiLike || isNetFcPipe) && this._assetIdForHls) {
|
||||
const filterStr = isInterlacedSource
|
||||
? '[0:v]yadif=mode=1:deint=1,split=2[vhi][vlo]'
|
||||
: '[0:v]split=2[vhi][vlo]';
|
||||
// Network fc_pipe is video-only (no audio input) — omit audio maps so
|
||||
// ffmpeg doesn't fail trying to map a nonexistent audio stream.
|
||||
const hasAudio = audioInputIndex >= 0 && !isNetFcPipe;
|
||||
const masterAudioMap = hasAudio ? ['-map', audioMap] : [];
|
||||
const masterAudioFilter = hasAudio
|
||||
? ['-af', 'aresample=async=1:min_hard_comp=0.100000:first_pts=0'] : [];
|
||||
const hlsAudioMap = hasAudio ? ['-map', audioMap] : [];
|
||||
const hlsAudioCodec = hasAudio
|
||||
? ['-c:a', 'aac', '-b:a', '128k', '-ar', '44100'] : [];
|
||||
hiresArgs = [
|
||||
...inputArgs,
|
||||
'-filter_complex', filterStr,
|
||||
// Output 0 — master (local temp, uploaded to S3 on stop)
|
||||
'-map', '[vhi]', '-map', audioMap,
|
||||
'-af', 'aresample=async=1:min_hard_comp=0.100000:first_pts=0',
|
||||
'-map', '[vhi]', ...masterAudioMap,
|
||||
...masterAudioFilter,
|
||||
...hiresCodecArgs,
|
||||
hiresOutput,
|
||||
// Output 1 — low-latency H.264 HLS preview for the UI monitor
|
||||
'-map', '[vlo]', '-map', audioMap,
|
||||
'-map', '[vlo]', ...hlsAudioMap,
|
||||
...buildHlsVideoArgs(videoCodec, framerate),
|
||||
'-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
|
||||
...hlsAudioCodec,
|
||||
'-f', 'hls', '-hls_time', '2', '-hls_list_size', '15',
|
||||
'-hls_flags', 'delete_segments+append_list+omit_endlist',
|
||||
'-hls_segment_filename', sdiHlsDir + '/seg-%05d.ts',
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ struct fc_consumer {
|
|||
sem_t *sem;
|
||||
uint64_t read_cursor; /* consumer's own position in the ring */
|
||||
uint64_t local_dropped; /* frames skipped by this consumer */
|
||||
uint8_t *copy_buf; /* consumer-owned frame copy buffer (frame_size bytes) */
|
||||
uint32_t frame_size; /* cached from header */
|
||||
char slot_id[FC_MAX_SLOT_ID];
|
||||
};
|
||||
|
||||
|
|
@ -68,10 +70,19 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
|
|||
fc_consumer_t *c = calloc(1, sizeof *c);
|
||||
if (!c) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
|
||||
|
||||
c->shm_fd = fd;
|
||||
c->base = base;
|
||||
c->shm_size = total;
|
||||
c->sem = sem;
|
||||
/* Consumer-owned copy buffer — fc_consumer_read copies the frame here and
|
||||
* re-validates the cursor afterward, so a writer lapping a slow consumer
|
||||
* cannot corrupt the frame the caller is using. */
|
||||
c->copy_buf = malloc(hdr.frame_size);
|
||||
if (!c->copy_buf) {
|
||||
free(c); sem_close(sem); munmap(base, total); close(fd); return NULL;
|
||||
}
|
||||
|
||||
c->shm_fd = fd;
|
||||
c->base = base;
|
||||
c->shm_size = total;
|
||||
c->sem = sem;
|
||||
c->frame_size = hdr.frame_size;
|
||||
/* Start reading from the current write position so we don't replay old frames */
|
||||
c->read_cursor = atomic_load_explicit(
|
||||
&((fc_header_t *)base)->write_cursor, memory_order_acquire);
|
||||
|
|
@ -83,46 +94,94 @@ fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
|
|||
int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
|
||||
{
|
||||
fc_header_t *hdr = (fc_header_t *)c->base;
|
||||
int dropped = 0; /* set when this call skipped one or more frames */
|
||||
|
||||
/* Wait for a new frame via semaphore */
|
||||
struct timespec abs_ts;
|
||||
clock_gettime(CLOCK_REALTIME, &abs_ts);
|
||||
abs_ts.tv_sec += (time_t)(timeout_ms / 1000);
|
||||
abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L);
|
||||
if (abs_ts.tv_nsec >= 1000000000L) {
|
||||
abs_ts.tv_sec++;
|
||||
abs_ts.tv_nsec -= 1000000000L;
|
||||
}
|
||||
|
||||
while (sem_timedwait(c->sem, &abs_ts) != 0) {
|
||||
if (errno == ETIMEDOUT) return FC_TIMEOUT;
|
||||
if (errno == EINTR) continue;
|
||||
return FC_ERROR;
|
||||
}
|
||||
|
||||
uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor,
|
||||
memory_order_acquire);
|
||||
int dropped = 0;
|
||||
|
||||
/* Check if consumer fell behind by more than ring_depth */
|
||||
if (write_cur > c->read_cursor + hdr->ring_depth) {
|
||||
uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth;
|
||||
c->read_cursor = write_cur - hdr->ring_depth;
|
||||
c->local_dropped += skipped;
|
||||
atomic_fetch_add(&hdr->dropped_frames, skipped);
|
||||
dropped = 1;
|
||||
}
|
||||
|
||||
if (c->read_cursor >= write_cur) {
|
||||
/* Semaphore posted but nothing new yet (spurious) */
|
||||
return FC_TIMEOUT;
|
||||
/* ── Wait for new data ──────────────────────────────────────────────
|
||||
* The semaphore is used ONLY as an edge-wakeup hint, never as a frame
|
||||
* counter. The writer posts once per frame, but a consumer that skips
|
||||
* frames (lap) or reads less often than the writer posts would otherwise
|
||||
* leave the count climbing unbounded — causing sem_timedwait to never
|
||||
* block (100% CPU busy-spin) and eventually EOVERFLOW. So:
|
||||
* - cursor-diff (write_cursor - read_cursor) is the SOURCE OF TRUTH for
|
||||
* whether a frame is available.
|
||||
* - we drain the semaphore to zero (sem_trywait loop) so the count never
|
||||
* accumulates.
|
||||
* - if no frame is available we block on ONE sem_timedwait for wakeup. */
|
||||
for (;;) {
|
||||
uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor,
|
||||
memory_order_acquire);
|
||||
|
||||
/* Lap detection: if the writer is more than ring_depth ahead, the
|
||||
* oldest unread frames have been overwritten — skip to the oldest
|
||||
* still-valid frame. */
|
||||
if (write_cur > c->read_cursor + hdr->ring_depth) {
|
||||
uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth;
|
||||
c->read_cursor = write_cur - hdr->ring_depth;
|
||||
c->local_dropped += skipped;
|
||||
/* NOTE: do NOT write hdr->dropped_frames here — the consumer maps
|
||||
* the shm PROT_READ (read-only), so an atomic write would SIGSEGV.
|
||||
* Per-consumer drops are tracked in c->local_dropped and exposed
|
||||
* via fc_consumer_dropped(). The writer owns hdr->dropped_frames. */
|
||||
dropped = 1;
|
||||
}
|
||||
|
||||
if (c->read_cursor < write_cur) {
|
||||
/* A frame is available — drain the semaphore so its count never
|
||||
* accumulates, then read+copy below. */
|
||||
while (sem_trywait(c->sem) == 0) { /* drain */ }
|
||||
break;
|
||||
}
|
||||
|
||||
/* No frame yet — drain stale posts, then block for a wakeup. */
|
||||
while (sem_trywait(c->sem) == 0) { /* drain */ }
|
||||
|
||||
struct timespec abs_ts;
|
||||
clock_gettime(CLOCK_REALTIME, &abs_ts);
|
||||
abs_ts.tv_sec += (time_t)(timeout_ms / 1000);
|
||||
abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L);
|
||||
if (abs_ts.tv_nsec >= 1000000000L) { abs_ts.tv_sec++; abs_ts.tv_nsec -= 1000000000L; }
|
||||
|
||||
int w = sem_timedwait(c->sem, &abs_ts);
|
||||
if (w != 0) {
|
||||
if (errno == ETIMEDOUT) {
|
||||
/* Re-check the cursor once more before giving up — the writer
|
||||
* may have advanced between our check and the wait. */
|
||||
uint64_t wc2 = atomic_load_explicit(&hdr->write_cursor,
|
||||
memory_order_acquire);
|
||||
if (c->read_cursor < wc2) continue;
|
||||
return FC_TIMEOUT;
|
||||
}
|
||||
if (errno == EINTR) continue;
|
||||
return FC_ERROR;
|
||||
}
|
||||
/* Woken — loop to re-evaluate cursor-diff. */
|
||||
}
|
||||
|
||||
/* ── Copy the frame into the consumer-owned buffer ──────────────────── */
|
||||
fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor);
|
||||
ref->data = frame->data;
|
||||
ref->size = frame->size;
|
||||
ref->pts_us = frame->pts_us;
|
||||
ref->wall_us = frame->wall_us;
|
||||
uint32_t fsz = frame->size;
|
||||
if (fsz > hdr->frame_size) fsz = hdr->frame_size;
|
||||
uint64_t pts = frame->pts_us;
|
||||
uint64_t wall = frame->wall_us;
|
||||
memcpy(c->copy_buf, frame->data, fsz);
|
||||
|
||||
/* ── Re-validate AFTER the copy ─────────────────────────────────────
|
||||
* If the writer lapped us during the copy (overwrote this slot), the copy
|
||||
* may be torn — discard it and signal DROPPED so the caller reads again. */
|
||||
uint64_t write_after = atomic_load_explicit(&hdr->write_cursor,
|
||||
memory_order_acquire);
|
||||
if (write_after > c->read_cursor + hdr->ring_depth) {
|
||||
uint64_t skipped = write_after - c->read_cursor - hdr->ring_depth;
|
||||
c->read_cursor = write_after - hdr->ring_depth;
|
||||
c->local_dropped += skipped;
|
||||
return FC_LAPPED; /* copy torn — ref not valid, caller reads again */
|
||||
}
|
||||
|
||||
/* Copy is valid. */
|
||||
ref->data = c->copy_buf;
|
||||
ref->size = fsz;
|
||||
ref->pts_us = pts;
|
||||
ref->wall_us = wall;
|
||||
ref->seq = c->read_cursor;
|
||||
|
||||
c->read_cursor++;
|
||||
|
|
@ -132,6 +191,7 @@ int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
|
|||
void fc_consumer_close(fc_consumer_t *c)
|
||||
{
|
||||
if (!c) return;
|
||||
if (c->copy_buf) free(c->copy_buf);
|
||||
sem_close(c->sem);
|
||||
munmap(c->base, c->shm_size);
|
||||
close(c->shm_fd);
|
||||
|
|
|
|||
|
|
@ -26,15 +26,27 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
/* Return codes */
|
||||
#define FC_OK 0
|
||||
#define FC_TIMEOUT 1 /* no new frame within timeout_ms */
|
||||
#define FC_DROPPED 2 /* consumer fell behind; cursor snapped to latest */
|
||||
#define FC_OK 0 /* valid frame returned in ref */
|
||||
#define FC_TIMEOUT 1 /* no new frame within timeout_ms — ref not populated */
|
||||
#define FC_DROPPED 2 /* valid frame returned in ref, BUT one or more older
|
||||
* frames were skipped first (consumer fell behind).
|
||||
* ref IS populated — caller should USE the frame. */
|
||||
#define FC_LAPPED 3 /* the copy was overwritten mid-read (writer lapped the
|
||||
* consumer during memcpy). ref NOT populated — caller
|
||||
* should call fc_consumer_read again. */
|
||||
#define FC_ERROR -1
|
||||
|
||||
typedef struct fc_consumer fc_consumer_t;
|
||||
|
||||
typedef struct {
|
||||
const uint8_t *data; /* zero-copy pointer into shm ring — valid until next read */
|
||||
const uint8_t *data; /* pointer to a CONSUMER-OWNED copy of the frame —
|
||||
* stable until the next fc_consumer_read() call.
|
||||
* (Previously a zero-copy pointer into the shm ring,
|
||||
* which the writer could overwrite mid-use when it
|
||||
* lapped a slow consumer. We now copy into the
|
||||
* consumer's own buffer and re-validate the cursor
|
||||
* AFTER the copy, so a lapped frame is discarded
|
||||
* rather than streamed corrupt.) */
|
||||
uint32_t size; /* bytes */
|
||||
uint64_t pts_us; /* presentation timestamp (microseconds) */
|
||||
uint64_t wall_us; /* wall clock at write time (microseconds) */
|
||||
|
|
|
|||
|
|
@ -92,13 +92,24 @@ int main(int argc, char *argv[]) {
|
|||
if (rc == FC_TIMEOUT) continue;
|
||||
if (rc == FC_ERROR) break;
|
||||
|
||||
if (rc == FC_LAPPED) {
|
||||
/* Copy was torn (writer lapped us mid-read). No valid frame to
|
||||
* write — log and read again. */
|
||||
total_dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[fc_pipe] WARNING: frame lapped mid-read — total dropped: %llu\n",
|
||||
(unsigned long long)total_dropped);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rc == FC_DROPPED) {
|
||||
/* Skipped one or more older frames, but THIS frame is valid — log
|
||||
* and write it (do NOT continue). */
|
||||
total_dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[fc_pipe] WARNING: consumer fell behind — total dropped: %llu\n",
|
||||
(unsigned long long)total_dropped);
|
||||
}
|
||||
|
||||
/* Write frame data to stdout */
|
||||
/* Write frame data to stdout (ref.data is a stable consumer-owned copy) */
|
||||
if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) {
|
||||
if (!g_stop)
|
||||
fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n");
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ int main(int argc, char **argv)
|
|||
int rc = fc_consumer_read(c, &ref, 2000);
|
||||
if (rc == FC_TIMEOUT) continue;
|
||||
if (rc == FC_ERROR) { fprintf(stderr, "read error\n"); break; }
|
||||
if (rc == FC_LAPPED) { /* torn copy — no valid frame, read again */ continue; }
|
||||
if (rc == FC_DROPPED) {
|
||||
dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[WARN] consumer fell behind — total dropped: %llu\n",
|
||||
|
|
|
|||
|
|
@ -170,22 +170,38 @@ static enum MHD_Result handle_request(
|
|||
goto done;
|
||||
}
|
||||
|
||||
/* GET /slots */
|
||||
/* GET /slots
|
||||
* Worst case: FC_MAX_SLOTS (256) × ~2KB/entry ≈ 512KB. A 64KB stack buffer
|
||||
* would overflow at ~32 slots (and `pos` could pass `sizeof big`, making
|
||||
* `sizeof big - pos` underflow to a huge size_t). Heap-allocate a buffer
|
||||
* sized for the worst case and bound-check every append. */
|
||||
if (strcmp(method, "GET") == 0 && strcmp(url, "/slots") == 0) {
|
||||
char big[65536];
|
||||
int pos = 0;
|
||||
pos += snprintf(big + pos, sizeof big - pos, "[");
|
||||
size_t cap = (size_t)FC_MAX_SLOTS * 2100 + 64; /* worst case + brackets */
|
||||
char *big = malloc(cap);
|
||||
if (!big) {
|
||||
rc = respond(conn, MHD_HTTP_INTERNAL_SERVER_ERROR,
|
||||
"{\"error\":\"out of memory\"}");
|
||||
goto done;
|
||||
}
|
||||
size_t pos = 0;
|
||||
if (pos < cap) big[pos++] = '[';
|
||||
int first = 1;
|
||||
for (int i = 0; i < FC_MAX_SLOTS; i++) {
|
||||
if (!g_registry[i].active) continue;
|
||||
char entry[2048];
|
||||
char entry[2100];
|
||||
slot_to_json(g_registry[i].slot, entry, sizeof entry);
|
||||
size_t elen = strlen(entry);
|
||||
/* +2 for possible comma + closing bracket, +1 for NUL */
|
||||
if (pos + elen + 3 >= cap) break; /* never overflow */
|
||||
if (!first) big[pos++] = ',';
|
||||
first = 0;
|
||||
pos += snprintf(big + pos, sizeof big - pos, "%s", entry);
|
||||
memcpy(big + pos, entry, elen);
|
||||
pos += elen;
|
||||
}
|
||||
snprintf(big + pos, sizeof big - pos, "]");
|
||||
if (pos + 2 < cap) big[pos++] = ']';
|
||||
big[pos] = '\0';
|
||||
rc = respond(conn, MHD_HTTP_OK, big);
|
||||
free(big);
|
||||
goto done;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -213,15 +213,26 @@ static void shm_writer_close(ShmWriter *sw) {
|
|||
if (sw->fd >= 0) { close(sw->fd); sw->fd = -1; }
|
||||
}
|
||||
|
||||
/* ── Build ffmpeg args for network decode → rawvideo stdout ────────── */
|
||||
/* ── Build ffmpeg args for network decode → rawvideo stdout ──────────
|
||||
* All dynamic strings are written into CALLER-OWNED buffers (passed in) so
|
||||
* there is no per-call strdup leak across listener reconnects. The video
|
||||
* filter forces the EXACT target W:H (scale=W:H, not iw:ih) so a mid-stream
|
||||
* source resolution change cannot desync the fixed-size frame reassembly —
|
||||
* ffmpeg's scaler always emits width*height*2 bytes per frame.
|
||||
*
|
||||
* Caller must provide:
|
||||
* url_buf — at least 320 bytes (built listener URL, or copied caller URL)
|
||||
* vf_buf — at least 64 bytes (scale/format filter)
|
||||
*/
|
||||
static int build_ffmpeg_args(
|
||||
char **argv, int max_args,
|
||||
const char *url, const char *source_type,
|
||||
int listen, int listen_port, const char *stream_key,
|
||||
uint32_t w, uint32_t h)
|
||||
uint32_t w, uint32_t h,
|
||||
char *url_buf, size_t url_buf_len,
|
||||
char *vf_buf, size_t vf_buf_len)
|
||||
{
|
||||
char size_str[32];
|
||||
snprintf(size_str, sizeof size_str, "%ux%u", w, h);
|
||||
(void)max_args;
|
||||
char port_str[16];
|
||||
|
||||
int i = 0;
|
||||
|
|
@ -236,26 +247,29 @@ static int build_ffmpeg_args(
|
|||
|
||||
if (!strcmp(source_type, "srt") && listen) {
|
||||
snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 9000);
|
||||
char srt_url[256];
|
||||
snprintf(srt_url, sizeof srt_url, "srt://0.0.0.0:%s?mode=listener", port_str);
|
||||
argv[i++] = "-i"; argv[i++] = strdup(srt_url);
|
||||
snprintf(url_buf, url_buf_len, "srt://0.0.0.0:%s?mode=listener", port_str);
|
||||
argv[i++] = "-i"; argv[i++] = url_buf;
|
||||
} else if (!strcmp(source_type, "rtmp") && listen) {
|
||||
snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 1935);
|
||||
char rtmp_url[256];
|
||||
snprintf(rtmp_url, sizeof rtmp_url, "rtmp://0.0.0.0:%s/live/%s",
|
||||
snprintf(url_buf, url_buf_len, "rtmp://0.0.0.0:%s/live/%s",
|
||||
port_str, stream_key ? stream_key : "stream");
|
||||
argv[i++] = "-listen"; argv[i++] = "1";
|
||||
argv[i++] = "-i"; argv[i++] = strdup(rtmp_url);
|
||||
argv[i++] = "-i"; argv[i++] = url_buf;
|
||||
} else {
|
||||
argv[i++] = "-i"; argv[i++] = (char *)url;
|
||||
}
|
||||
|
||||
/* Force EXACT output dimensions so every frame is exactly w*h*2 bytes,
|
||||
* even if the source resolution changes mid-stream (SRT/RTMP reconnect to
|
||||
* a different encoder). This is the resync guarantee for the fixed-size
|
||||
* frame reassembly loop in main(). */
|
||||
snprintf(vf_buf, vf_buf_len, "scale=%u:%u,format=uyvy422", w, h);
|
||||
|
||||
/* Video output: raw UYVY422 to stdout */
|
||||
argv[i++] = "-map"; argv[i++] = "0:v:0";
|
||||
argv[i++] = "-vf"; argv[i++] = "scale=iw:ih,format=uyvy422";
|
||||
argv[i++] = "-vf"; argv[i++] = vf_buf;
|
||||
argv[i++] = "-f"; argv[i++] = "rawvideo";
|
||||
argv[i++] = "-pix_fmt"; argv[i++] = "uyvy422";
|
||||
argv[i++] = "-s"; argv[i++] = strdup(size_str);
|
||||
argv[i++] = "pipe:1";
|
||||
|
||||
argv[i] = NULL;
|
||||
|
|
@ -328,12 +342,18 @@ int main(int argc, char *argv[]) {
|
|||
fps_den ? (double)fps_num / fps_den : 0.0,
|
||||
source_type, listen ? " (listener)" : "");
|
||||
|
||||
/* Caller-owned arg buffers — reused each reconnect, no per-loop leak. */
|
||||
char ff_url_buf[320];
|
||||
char ff_vf_buf[64];
|
||||
|
||||
/* ── Outer reconnect loop (listener mode stays alive between sessions) */
|
||||
while (!g_stop) {
|
||||
/* Build ffmpeg argv */
|
||||
/* Build ffmpeg argv (writes into ff_url_buf / ff_vf_buf, no strdup) */
|
||||
char *ff_argv[64];
|
||||
build_ffmpeg_args(ff_argv, 64, url, source_type,
|
||||
listen, listen_port, stream_key, width, height);
|
||||
listen, listen_port, stream_key, width, height,
|
||||
ff_url_buf, sizeof ff_url_buf,
|
||||
ff_vf_buf, sizeof ff_vf_buf);
|
||||
|
||||
/* Spawn ffmpeg with stdout pipe */
|
||||
int pfd[2];
|
||||
|
|
|
|||
|
|
@ -130,7 +130,12 @@ function startNetIngest(containerId, { sourceType, sourceUrl, listen, listenPort
|
|||
proc.on('error', err => console.error(`[net-ingest:${slotId}] spawn error: ${err.message}`));
|
||||
proc.on('exit', (c, s) => {
|
||||
console.log(`[net-ingest:${slotId}] exited code=${c} signal=${s}`);
|
||||
_netIngestProcs.delete(containerId);
|
||||
// The map key may have been remapped from the temp id to the real
|
||||
// containerId after spawn. Delete by PROCESS IDENTITY, not the captured
|
||||
// key, so the entry can't leak after an unexpected crash.
|
||||
for (const [key, entry] of _netIngestProcs) {
|
||||
if (entry.proc === proc) { _netIngestProcs.delete(key); break; }
|
||||
}
|
||||
});
|
||||
_netIngestProcs.set(containerId, { proc, slotId });
|
||||
return slotId;
|
||||
|
|
@ -498,29 +503,38 @@ async function handleSidecarStart(body, res) {
|
|||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount++;
|
||||
startDeltacastBridge();
|
||||
// Inject per-port signal format so capture-manager uses real dimensions/fps
|
||||
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
|
||||
let _portNum = NaN;
|
||||
try { _portNum = JSON.parse(_srcCfg).port; } catch (_) {}
|
||||
if (Number.isFinite(_portNum) && _dcPortFmt.has(_portNum)) {
|
||||
if (!Number.isFinite(_portNum)) _portNum = 0;
|
||||
|
||||
// FC_SLOT_ID is DETERMINISTIC — the deltacast-bridge builds it as
|
||||
// "deltacast-<board>-<port>" (both known here), so we construct it
|
||||
// directly and DO NOT wait for the bridge's async format JSON. This is
|
||||
// the fix for the cold-start race where _dcPortFmt was still empty on
|
||||
// first recorder start, silently falling back to the legacy FIFO path.
|
||||
const _slotId = `deltacast-${DC_BOARD}-${_portNum}`;
|
||||
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
|
||||
|
||||
// Format (width/height/fps) is best-effort enrichment from the bridge's
|
||||
// stderr JSON if it has already arrived; capture-manager has sane
|
||||
// defaults and waits for the slot to appear regardless.
|
||||
if (_dcPortFmt.has(_portNum)) {
|
||||
const _fmt = _dcPortFmt.get(_portNum);
|
||||
const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num);
|
||||
sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`);
|
||||
sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`);
|
||||
sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`);
|
||||
// Pass slot_id so capture-manager knows which framecache slot to read
|
||||
if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`);
|
||||
console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced} slot=${_fmt.slot_id}`);
|
||||
console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} slot=${_slotId}`);
|
||||
} else {
|
||||
console.log(`[dc-bridge] port ${_portNum} slot=${_slotId} (fmt not yet available — using defaults)`);
|
||||
}
|
||||
// IPC host — sidecar must share /dev/shm with framecache container
|
||||
hostConfig.IpcMode = 'host';
|
||||
}
|
||||
|
||||
// DeckLink: ensure decklink-bridge is running on the HOST.
|
||||
// Bridge writes to framecache; sidecar reads via fc_client.
|
||||
if (sourceType === 'sdi' || sourceType === 'blackmagic') {
|
||||
_dlSidecarCount++;
|
||||
// Determine which device indices are active on this node
|
||||
const _bmdDevices = [];
|
||||
try {
|
||||
const _bmdDir = '/dev/blackmagic';
|
||||
|
|
@ -528,26 +542,57 @@ async function handleSidecarStart(body, res) {
|
|||
_bmdEntries.forEach((_, i) => _bmdDevices.push(i));
|
||||
} catch (_) { _bmdDevices.push(0); }
|
||||
startDecklinkBridge(_bmdDevices);
|
||||
// Inject fmt if available
|
||||
|
||||
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
|
||||
let _devIdx = NaN;
|
||||
try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {}
|
||||
if (!Number.isFinite(_devIdx)) _devIdx = parseInt((env.find(e => e.startsWith('BMD_DEVICE_INDEX=')) || '=0').split('=')[1]) || 0;
|
||||
if (Number.isFinite(_devIdx) && _dlDevFmt.has(_devIdx)) {
|
||||
|
||||
// FC_SLOT_ID is DETERMINISTIC — decklink-bridge builds it as
|
||||
// "decklink-<NODE_ID>-<device_idx>". Construct it directly (no wait on
|
||||
// async fmt JSON). FC_NODE_ID matches what node-agent passes to the
|
||||
// bridge via the NODE_ID env var.
|
||||
const _slotId = `decklink-${FC_NODE_ID}-${_devIdx}`;
|
||||
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
|
||||
|
||||
if (_dlDevFmt.has(_devIdx)) {
|
||||
const _fmt = _dlDevFmt.get(_devIdx);
|
||||
if (_fmt.slot_id) sidecarEnv.push(`FC_SLOT_ID=${_fmt.slot_id}`);
|
||||
const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num);
|
||||
sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`);
|
||||
sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`);
|
||||
sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`);
|
||||
console.log(`[dl-bridge] device ${_devIdx} fmt: ${_fmt.width}x${_fmt.height} ${_fps} slot=${_slotId}`);
|
||||
} else {
|
||||
console.log(`[dl-bridge] device ${_devIdx} slot=${_slotId} (fmt not yet available — using defaults)`);
|
||||
}
|
||||
hostConfig.IpcMode = 'host';
|
||||
}
|
||||
|
||||
// Single cleanup for ALL failure paths (create fail, start fail, throw):
|
||||
// decrements the right bridge counter (stopping the bridge when it hits 0)
|
||||
// AND stops any net_ingest started for this request. Previously only the
|
||||
// deltacast counter was decremented — blackmagic count and net_ingest leaked
|
||||
// on every failed start, eventually stranding the bridge / ingest forever.
|
||||
const _cleanupOnFailure = () => {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
} else if (sourceType === 'sdi' || sourceType === 'blackmagic') {
|
||||
_dlSidecarCount--;
|
||||
if (_dlSidecarCount <= 0) { _dlSidecarCount = 0; stopDecklinkBridge(); }
|
||||
} else if (sourceType === 'srt' || sourceType === 'rtmp') {
|
||||
// net_ingest may be keyed by the temp id (create not yet succeeded) or
|
||||
// the real containerId (remapped). Stop whichever exists.
|
||||
if (body._netIngestTempId) stopNetIngest(body._netIngestTempId);
|
||||
if (containerId) stopNetIngest(containerId);
|
||||
}
|
||||
};
|
||||
|
||||
let containerId;
|
||||
try {
|
||||
const createRes = await dockerApi('POST', '/containers/create', spec);
|
||||
if (createRes.status !== 201) {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
}
|
||||
_cleanupOnFailure();
|
||||
return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data });
|
||||
}
|
||||
|
||||
|
|
@ -557,11 +602,8 @@ async function handleSidecarStart(body, res) {
|
|||
console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`);
|
||||
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
|
||||
if (startRes.status !== 204) {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
}
|
||||
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||
_cleanupOnFailure();
|
||||
return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data });
|
||||
}
|
||||
|
||||
|
|
@ -578,14 +620,7 @@ async function handleSidecarStart(body, res) {
|
|||
}
|
||||
jsonResponse(res, 201, { containerId, capturePort });
|
||||
} catch (err) {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
}
|
||||
if (sourceType === 'sdi' || sourceType === 'blackmagic') {
|
||||
_dlSidecarCount--;
|
||||
if (_dlSidecarCount <= 0) { _dlSidecarCount = 0; stopDecklinkBridge(); }
|
||||
}
|
||||
_cleanupOnFailure();
|
||||
throw err;
|
||||
}
|
||||
} catch (err) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue