From 5668c03615dc743fb64dbb94199abd8fa6fbe146 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Thu, 4 Jun 2026 02:50:57 +0000 Subject: [PATCH 1/2] chore(capture): remove stale legacy FIFO path + pin capture profile - capture-manager: remove dead legacy deltacast FIFO video path (FC_SLOT_ID is now always set by node-agent, framecache mandatory on all SDI nodes) - node-agent: correct stale comment about legacy FIFO fallback - onboard-node.sh: harden detect_sdi (device-node checks, not just lspci) and persist COMPOSE_PROFILES so framecache survives every redeploy on SDI nodes - remove committed capture.js.bak Root cause of this session's outage: zampp3 came up without the capture compose profile, so framecache never started; the bridge published to shm with no consumer and recorders showed 'receiving' with no real capture. --- deploy/onboard-node.sh | 23 +- services/capture/src/capture-manager.js | 71 +---- services/capture/src/routes/capture.js.bak | 330 --------------------- services/node-agent/index.js | 4 +- 4 files changed, 27 insertions(+), 401 deletions(-) delete mode 100644 services/capture/src/routes/capture.js.bak diff --git a/deploy/onboard-node.sh b/deploy/onboard-node.sh index 4ea5aa6..254e36e 100644 --- a/deploy/onboard-node.sh +++ b/deploy/onboard-node.sh @@ -95,12 +95,21 @@ detect_gpu() { return 1 } -# SDI capture card present? Blackmagic DeckLink or Deltacast, via lspci. +# SDI capture card present? Blackmagic DeckLink or Deltacast. 
+# Checks (any hit ⇒ present), so a driver/PCI-enumeration race at onboard time +# can't silently drop the capture profile and break recorders (each glob is probed on its own with `compgen -G`, because `ls a* b*` exits non-zero as soon as ONE glob is unmatched): +# 1) lspci vendor match +# 2) Deltacast device nodes (/dev/deltacast*, /dev/delta-*) +# 3) Blackmagic device nodes (/dev/blackmagic*, /dev/decklink*) detect_sdi() { - if command -v lspci &>/dev/null; then - if lspci 2>/dev/null | grep -iE 'blackmagic|deltacast' &>/dev/null; then - return 0 - fi + if command -v lspci &>/dev/null && lspci 2>/dev/null | grep -iE 'blackmagic|deltacast' &>/dev/null; then + return 0 + fi + if compgen -G '/dev/deltacast*' &>/dev/null || compgen -G '/dev/delta-*' &>/dev/null; then + return 0 + fi + if compgen -G '/dev/blackmagic*' &>/dev/null || compgen -G '/dev/decklink*' &>/dev/null; then + return 0 fi return 1 } @@ -209,6 +218,10 @@ info "Writing $ENV_FILE" echo "NODE_IP=$NODE_IP" echo "AGENT_PORT=$AGENT_PORT" echo "HEARTBEAT_MS=30000" + # Persist detected compose profiles so every subsequent `docker compose up` + # (manual or scripted) brings up the right services — capture/framecache must + # always run on SDI nodes or recorders silently fail. Comma-sep for COMPOSE_PROFILES. + echo "COMPOSE_PROFILES=$(echo $PROFILES | tr ' ' ',')" [[ -n "$BMD_MODEL" ]] && echo "BMD_MODEL=$BMD_MODEL" for v in REDIS_URL DATABASE_URL S3_ENDPOINT S3_BUCKET S3_ACCESS_KEY S3_SECRET_KEY S3_REGION; do val="${!v:-}" diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index 5ab6e93..b0fc06c 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -611,11 +611,13 @@ class CaptureManager { // their own cursor, enabling simultaneous growing + proxy + HLS from one // SDI input without any frame splitting. // - // Audio stays on the named FIFO path (same as before — audio fan-out via - // shm is a roadmap item). + // Audio stays on the named FIFO path (audio fan-out via shm is a roadmap + // item). // - // Falls back to the legacy FIFO path when FC_SLOT_ID is not set (e.g. 
on - // nodes running an older node-agent or without framecache deployed). + // node-agent ALWAYS injects FC_SLOT_ID for SDI sidecars (deterministic + // `deltacast--` / `decklink--`), so this is the sole + // SDI path. The old FC_SLOT_ID-absent legacy FIFO fallback was removed once + // framecache became mandatory on every capture node. if ((sourceType === 'deltacast' || sourceType === 'sdi' || sourceType === 'blackmagic') && process.env.FC_SLOT_ID) { @@ -712,67 +714,6 @@ class CaptureManager { }; } - // ── Legacy FIFO path for deltacast ─────────────────────────────────────── - // Used when FC_SLOT_ID is not set (framecache not deployed on this node, - // or older node-agent). Will be removed once framecache is everywhere. - if (sourceType === 'deltacast') { - const idx = (typeof device === 'number' || /^\d+$/.test(String(device))) - ? parseInt(device, 10) : 0; - const portIdx = (typeof port === 'number' || /^\d+$/.test(String(port))) - ? parseInt(port, 10) : idx; - - const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast'; - const videoFifo = `${DC_PIPE_DIR}/video-${portIdx}.fifo`; - const audioFifo = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`; - - const { existsSync: _exists } = await import('node:fs'); - const WAIT_MS = 30_000; - const POLL_MS = 500; - const deadline = Date.now() + WAIT_MS; - let videoReady = false; - let audioReady = false; - while (Date.now() < deadline) { - videoReady = _exists(videoFifo); - audioReady = _exists(audioFifo); - if (videoReady && audioReady) break; - await new Promise(r => setTimeout(r, POLL_MS)); - } - if (!videoReady || !audioReady) { - throw new Error( - `deltacast bridge FIFOs not ready after ${WAIT_MS / 1000}s ` + - `(video=${videoReady} audio=${audioReady}) — is deltacast-bridge running?` - ); - } - console.log(`[deltacast] port ${portIdx} FIFOs ready (legacy): ${videoFifo}, ${audioFifo}`); - - const dcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080'; - const dcFps = process.env.DELTACAST_FRAMERATE 
|| '60000/1001'; - const dcInterlaced = process.env.DELTACAST_INTERLACED === '1'; - - return { - inputArgs: [ - '-use_wallclock_as_timestamps', '1', - '-thread_queue_size', '512', - '-f', 'rawvideo', - '-pix_fmt', 'uyvy422', - '-video_size', dcSize, - '-framerate', dcFps, - '-i', videoFifo, - '-use_wallclock_as_timestamps', '1', - '-thread_queue_size', '512', - '-f', 's16le', - '-ar', '48000', - '-ac', '2', - '-i', audioFifo, - ], - isNetwork: false, - bridgeProcess: null, - audioFifo: null, - interlaced: dcInterlaced, - audioInputIndex: 1, /* legacy deltacast: video FIFO=0, audio FIFO=1 */ - }; - } - // Default: SDI via a pluggable source backend (issue #168). The backend // selection defaults to `blackmagic` (DeckLink) so existing SDI recorders // behave exactly as before. Deltacast/AJA backends throw until their diff --git a/services/capture/src/routes/capture.js.bak b/services/capture/src/routes/capture.js.bak deleted file mode 100644 index 78ccae4..0000000 --- a/services/capture/src/routes/capture.js.bak +++ /dev/null @@ -1,330 +0,0 @@ -import express from 'express'; -import { execSync, spawn } from 'child_process'; -import captureManager from '../capture-manager.js'; - -import dgram from 'dgram'; -import net from 'net'; - -function parseUrl(u) { - try { - const m = String(u).match(/^[a-z]+:\/\/([^:\/?#]+)(?::(\d+))?/i); - if (!m) return null; - return { host: m[1], port: parseInt(m[2] || '0', 10) }; - } catch (_) { return null; } -} - -async function checkReachable(host, port, sourceType) { - if (!port) return { ok: true }; - if (sourceType === 'srt') return await udpSendProbe(host, port); - if (sourceType === 'rtmp') return await tcpConnectProbe(host, port); - return { ok: true }; -} - -function udpSendProbe(host, port) { - return new Promise((resolve) => { - const sock = dgram.createSocket('udp4'); - let done = false; - const finish = (result) => { if (done) return; done = true; try { sock.close(); } catch (_) {} resolve(result); }; - sock.on('error', (err) 
=> { - const msg = String(err && err.message || err); - if (/EHOSTUNREACH|ENETUNREACH/i.test(msg)) { - finish({ ok: false, error: 'Host ' + host + ' is unreachable from the capture container (no route). Confirm the IP is correct and the machine is online.', diagnostic: msg }); - } else if (/ECONNREFUSED|EPORTUNREACH/i.test(msg)) { - finish({ ok: false, error: 'Nothing is listening on UDP ' + host + ':' + port + '. In vMix, confirm the SRT output is Started and the port matches.', diagnostic: msg }); - } else { - finish({ ok: false, error: 'UDP probe failed: ' + msg, diagnostic: msg }); - } - }); - sock.send(Buffer.from('Z-AMPP-PROBE'), port, host, () => {}); - setTimeout(() => finish({ ok: true }), 1500); - }); -} - -function tcpConnectProbe(host, port) { - return new Promise((resolve) => { - const sock = new net.Socket(); - let done = false; - const finish = (r) => { if (done) return; done = true; try { sock.destroy(); } catch (_) {} resolve(r); }; - sock.setTimeout(2500); - sock.once('connect', () => finish({ ok: true })); - sock.once('timeout', () => finish({ ok: false, error: 'TCP connect to ' + host + ':' + port + ' timed out. Confirm the host is reachable and a TCP listener is running.' })); - sock.once('error', (err) => { - const msg = String(err && err.message || err); - if (/EHOSTUNREACH|ENETUNREACH/i.test(msg)) finish({ ok: false, error: 'Host ' + host + ' unreachable (no route).', diagnostic: msg }); - else if (/ECONNREFUSED/i.test(msg)) finish({ ok: false, error: 'Nothing is listening on TCP ' + host + ':' + port + '.', diagnostic: msg }); - else finish({ ok: false, error: 'TCP probe failed: ' + msg, diagnostic: msg }); - }); - sock.connect(port, host); - }); -} - -function classifyProbeError(raw, sourceType) { - const r = (raw || '').toLowerCase(); - if (sourceType === 'srt') { - if (/connection .* failed: (input\/output|timer expired|connection setup failure)/i.test(raw)) { - return 'SRT handshake failed. 
In vMix: confirm the External Output is Started, Type=SRT, Mode=Listener, port matches, and any passphrase / stream ID is empty (or copied exactly).'; - } - } - if (sourceType === 'rtmp') { - if (/connection refused/i.test(r)) return 'Nothing is listening on RTMP at this address. Start your RTMP source.'; - if (/end-of-file|invalid data found/i.test(r)) return 'Got a TCP connection but no RTMP stream. Confirm the source is publishing and the path / stream-key match.'; - } - return raw; -} - - -const router = express.Router(); - -const MAM_API_URL = process.env.MAM_API_URL || 'http://mam-api:3000'; - -/** - * GET /devices - * List available DeckLink devices - */ -router.get('/devices', (req, res) => { - try { - const devices = []; - let output = ''; - - try { - output = execSync('ffmpeg -f decklink -list_devices 1 -i dummy 2>&1', { - encoding: 'utf-8', - }); - } catch (error) { - // ffmpeg returns non-zero, but stderr is still captured - output = error.stderr ? error.stderr.toString() : error.toString(); - } - - // Parse ffmpeg output for DeckLink device names - // Format: [decklink @ ...] 
"DeckLink Quad 2" (input #0) - const lines = output.split('\n'); - let deviceIndex = 0; - - for (const line of lines) { - const match = line.match(/^\s*\[decklink[^\]]*\]\s+"([^"]+)"/); - if (match) { - devices.push({ - index: deviceIndex, - name: match[1], - }); - deviceIndex++; - } - } - - res.json({ devices }); - } catch (error) { - console.error('Error listing devices:', error); - res.status(500).json({ error: 'Failed to list devices' }); - } -}); - -/** - * GET /status - * Get current capture status - */ -router.get('/status', (req, res) => { - try { - const status = captureManager.getStatus(); - res.json(status); - } catch (error) { - console.error('Error getting status:', error); - res.status(500).json({ error: 'Failed to get status' }); - } -}); -router.post('/probe', async (req, res) => { - try { - const { source_type = 'sdi', source_url, listen = false } = req.body || {}; - - if (source_type === 'sdi') { - try { - const raw = execSync('ffmpeg -hide_banner -f decklink -list_devices 1 -i dummy 2>&1', { encoding: 'utf-8', timeout: 5000 }); - const devices = []; - for (const line of raw.split('\n')) { - const m = line.match(/\[decklink[^\]]*\]\s+"([^"]+)"/); - if (m) devices.push(m[1]); - } - return res.json({ ok: true, source_type, devices }); - } catch (err) { - const out = (err.stderr || err.stdout || err.toString()).toString(); - return res.json({ ok: false, source_type, error: out.slice(0, 800) }); - } - } - - if (listen) { - return res.json({ ok: false, source_type, error: 'Listener-mode sources cannot be probed standalone. Start the recorder and watch the signal indicator.' }); - } - - if (!source_url) return res.status(400).json({ error: 'source_url is required' }); - - // Pre-flight: parse host:port and check L3/L4 reachability so we can give - // an actionable error instead of the opaque libsrt "Input/output error". 
- const parsed = parseUrl(source_url); - if (!parsed) { - return res.json({ ok: false, source_type, source_url, error: 'Could not parse host:port from URL.' }); - } - const reach = await checkReachable(parsed.host, parsed.port, source_type); - if (!reach.ok) { - return res.json({ ok: false, source_type, source_url, error: reach.error, diagnostic: reach.diagnostic }); - } - - let url = source_url; - if (source_type === 'srt' && !/mode=/.test(url)) { - url += (url.includes('?') ? '&' : '?') + 'mode=caller'; - } - - const args = ['-hide_banner','-v','error','-probesize','32M','-analyzeduration','8M','-rw_timeout','7000000','-i', url, '-show_streams','-show_format','-of','json']; - const ff = spawn('ffprobe', args); - let stdout = '', stderr = ''; - ff.stdout.on('data', (c) => { stdout += c; }); - ff.stderr.on('data', (c) => { stderr += c; }); - const killer = setTimeout(() => { try { ff.kill('SIGKILL'); } catch (_) {} }, 10000); - ff.on('close', (code) => { - clearTimeout(killer); - if (code !== 0) { - const rawErr = (stderr || 'ffprobe failed').slice(0, 800); - const friendly = classifyProbeError(rawErr, source_type); - return res.json({ ok: false, source_type, source_url, error: friendly, diagnostic: rawErr }); - } - try { - const parsed = JSON.parse(stdout); - const streams = (parsed.streams || []).map(s => ({ - index: s.index, codec_type: s.codec_type, codec_name: s.codec_name, - width: s.width, height: s.height, pix_fmt: s.pix_fmt, - r_frame_rate: s.r_frame_rate, avg_frame_rate: s.avg_frame_rate, - sample_rate: s.sample_rate, channels: s.channels, - channel_layout: s.channel_layout, bit_rate: s.bit_rate, - })); - return res.json({ ok: true, source_type, source_url, - format: { format_name: parsed.format && parsed.format.format_name, duration: parsed.format && parsed.format.duration, bit_rate: parsed.format && parsed.format.bit_rate }, - streams }); - } catch (err) { - return res.json({ ok: false, source_type, source_url, error: 'Could not parse ffprobe output: ' 
+ err.message }); - } - }); - } catch (error) { - console.error('Probe error:', error); - res.status(500).json({ error: error.message }); - } -}); - -/** - * POST /start - * Start a new capture session - * - * Body (SDI): - * { project_id, clip_name, device, bin_id?, source_type? } - * - * Body (SRT/RTMP caller): - * { project_id, clip_name, source_type, source_url, bin_id? } - * - * Body (SRT/RTMP listener): - * { project_id, clip_name, source_type, listen: true, listen_port?, stream_key?, bin_id? } - */ -router.post('/start', async (req, res) => { - try { - const { - project_id, - bin_id, - clip_name, - device, - source_type = 'sdi', - source_url, - listen = false, - listen_port, - stream_key, - } = req.body; - - if (!project_id || !clip_name) { - return res.status(400).json({ - error: 'Missing required fields: project_id, clip_name', - }); - } - - // Source-specific validation - if (source_type === 'sdi') { - if (device === undefined || device === null) { - return res.status(400).json({ error: 'SDI source requires: device' }); - } - } else if (source_type === 'srt' || source_type === 'rtmp') { - if (!listen && !source_url) { - return res.status(400).json({ - error: `${source_type.toUpperCase()} caller mode requires: source_url`, - }); - } - } else { - return res.status(400).json({ - error: `Unknown source_type: ${source_type}. 
Must be sdi, srt, or rtmp`, - }); - } - - const session = await captureManager.start({ - projectId: project_id, - binId: bin_id || null, - clipName: clip_name, - device, - sourceType: source_type, - sourceUrl: source_url, - listen, - listenPort: listen_port, - streamKey: stream_key, - }); - - res.json(session); - } catch (error) { - console.error('Error starting capture:', error); - res.status(500).json({ error: error.message }); - } -}); - -/** - * POST /stop - * Stop the current capture session - * Body: { session_id } - */ -router.post('/stop', async (req, res) => { - try { - const { session_id } = req.body; - - if (!session_id) { - return res.status(400).json({ error: 'Missing required field: session_id' }); - } - - const completedSession = await captureManager.stop(session_id); - - // Register asset with mam-api. - // If proxyKey is null (SRT/RTMP source), set needsProxy=true so the - // worker generates a proxy from the hires file asynchronously. - try { - const mamResponse = await fetch(`${MAM_API_URL}/api/v1/assets`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - projectId: completedSession.projectId, - binId: completedSession.binId, - clipName: completedSession.clipName, - sourceType: completedSession.sourceType, - hiresKey: completedSession.hiresKey, - proxyKey: completedSession.proxyKey, - needsProxy: completedSession.proxyKey === null, - duration: completedSession.duration, - capturedAt: completedSession.startedAt, - }), - }); - - if (!mamResponse.ok) { - console.warn( - `MAM API registration returned ${mamResponse.status}: ${await mamResponse.text()}`, - ); - } - } catch (mamError) { - console.warn('Failed to register asset with MAM API:', mamError.message); - } - - res.json(completedSession); - } catch (error) { - console.error('Error stopping capture:', error); - res.status(500).json({ error: error.message }); - } -}); - -export default router; diff --git a/services/node-agent/index.js 
b/services/node-agent/index.js index 838a219..21a7365 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -555,7 +555,9 @@ async function handleSidecarStart(body, res) { // "deltacast--" (both known here), so we construct it // directly and DO NOT wait for the bridge's async format JSON. This is // the fix for the cold-start race where _dcPortFmt was still empty on - // first recorder start, silently falling back to the legacy FIFO path. + // first recorder start. FC_SLOT_ID is now MANDATORY — the legacy + // FIFO-video fallback in capture-manager was removed, so a missing slot + // id would hard-fail rather than silently degrade. const _slotId = `deltacast-${DC_BOARD}-${_portNum}`; sidecarEnv.push(`FC_SLOT_ID=${_slotId}`); From bf4632b9119010a42bbf6a864e6c25f78c59e7b8 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Thu, 4 Jun 2026 03:05:00 +0000 Subject: [PATCH 2/2] feat(mam-api): extract ensureStandbySidecar + add POST /recorders/reconcile-standby Re-provisions the persistent standby sidecar for SDI/deltacast recorders that lost theirs (manual cleanup, node redeploy, wiped /dev/shm). Without this the recorder falls back to slow on-demand spawn on /start, which can collide on the capture port (EADDRINUSE). Idempotent; { force:true } recreates even when a container_id is already set. --- services/mam-api/src/routes/recorders.js | 131 ++++++++++++++++------- 1 file changed, 93 insertions(+), 38 deletions(-) diff --git a/services/mam-api/src/routes/recorders.js b/services/mam-api/src/routes/recorders.js index d44f4cc..0244588 100644 --- a/services/mam-api/src/routes/recorders.js +++ b/services/mam-api/src/routes/recorders.js @@ -280,6 +280,55 @@ function buildStandbyEnv(recorder) { ]; } +// Source types that run a long-lived standby sidecar (idle-preview container +// kept up 24/7 so `record` is a sub-second HTTP call, not a Docker cold start). 
+const STANDBY_SOURCE_TYPES = ['deltacast', 'sdi', 'blackmagic']; + +// Provision (or re-provision) the single persistent standby sidecar for one +// recorder by asking its node's agent to create the idle container. Idempotent +// at the node-agent layer (one container per capture port). Updates the +// recorder row with the new container_id + status='standby'. Returns: +// { ok, containerId?, reason? } +// Non-fatal by contract — the caller logs/aggregates; a recorder is still +// usable via the on-demand spawn fallback in /start if this fails. +async function ensureStandbySidecar(recorder) { + if (!recorder.node_id || !STANDBY_SOURCE_TYPES.includes(recorder.source_type)) { + return { ok: false, reason: 'not a standby source / no node' }; + } + const { remote: isRemote, apiUrl: targetNodeApiUrl } = + await resolveNodeTarget(recorder.node_id).catch(() => ({ remote: false })); + if (!isRemote || !targetNodeApiUrl) { + return { ok: false, reason: 'node not remote/reachable' }; + } + const capturePort = SIDECAR_PORT_BASE + (recorder.device_index || 0); + const useGpu = GPU_CODECS.includes(recorder.recording_codec); + const standbyRes = await fetch(`${targetNodeApiUrl}/sidecar/standby`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + image: 'wild-dragon-capture:latest', + env: buildStandbyEnv(recorder), + capturePort, + sourceType: recorder.source_type, + useGpu, + gpuUuid: recorder.gpu_uuid || null, + }), + signal: AbortSignal.timeout(15000), + }); + if (!standbyRes.ok) { + return { ok: false, reason: `node-agent returned ${standbyRes.status}` }; + } + const { containerId } = await standbyRes.json(); + await pool.query( + `UPDATE recorders SET container_id = $1, status = 'standby', updated_at = NOW() WHERE id = $2`, + [containerId, recorder.id] + ); + recorder.container_id = containerId; + recorder.status = 'standby'; + console.log(`[recorders] standby sidecar spawned for ${recorder.id}: ${containerId}`); + return { 
ok: true, containerId }; +} + // Issue #162 — after a local-spawn stop, wait for the capture container to // finalize its master. The asset row was pre-created at start with // status='live' (display_name = current_session_id); the ingest/finalize step @@ -432,43 +481,8 @@ router.post('/', async (req, res, next) => { // Spawn a standby sidecar immediately for SDI/deltacast/blackmagic recorders // that have an assigned node, so the container + bridge are ready before the // user hits record. Non-fatal — recorder is still usable if this fails. - const STANDBY_SOURCE_TYPES = ['deltacast', 'sdi', 'blackmagic']; - if (recorder.node_id && STANDBY_SOURCE_TYPES.includes(recorder.source_type)) { - const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id).catch(() => ({ remote: false })); - if (isRemote && targetNodeApiUrl) { - const capturePort = SIDECAR_PORT_BASE + (recorder.device_index || 0); - const useGpu = GPU_CODECS.includes(recorder.recording_codec); - try { - const standbyRes = await fetch(`${targetNodeApiUrl}/sidecar/standby`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - image: 'wild-dragon-capture:latest', - env: buildStandbyEnv(recorder), - capturePort, - sourceType: recorder.source_type, - useGpu, - gpuUuid: recorder.gpu_uuid || null, - }), - signal: AbortSignal.timeout(15000), - }); - if (standbyRes.ok) { - const { containerId } = await standbyRes.json(); - await pool.query( - `UPDATE recorders SET container_id = $1, status = 'standby', updated_at = NOW() WHERE id = $2`, - [containerId, recorder.id] - ); - recorder.container_id = containerId; - recorder.status = 'standby'; - console.log(`[recorders] standby sidecar spawned for ${recorder.id}: ${containerId}`); - } else { - console.warn(`[recorders] standby spawn returned ${standbyRes.status} for ${recorder.id} — will spawn on start`); - } - } catch (e) { - console.warn(`[recorders] standby spawn failed for 
${recorder.id} (non-fatal): ${e.message}`); - } - } - } + await ensureStandbySidecar(recorder).catch(e => + console.warn(`[recorders] standby spawn failed for ${recorder.id} (non-fatal): ${e.message}`)); res.status(201).json(recorder); } catch (err) { @@ -476,6 +490,48 @@ router.post('/', async (req, res, next) => { } }); +// POST /reconcile-standby - (re)provision the persistent standby sidecar for +// every SDI/deltacast recorder that should have one. Standby sidecars are +// created on recorder-create and kept up 24/7 (RestartPolicy=unless-stopped), +// but if they're externally removed (manual cleanup, node redeploy, a wiped +// /dev/shm) nothing recreates them — the recorder then falls back to the slow +// on-demand spawn on /start, which can collide on the capture port. This +// endpoint re-warms them so all recorders return to the fast standby path. +// +// Optional body: { force: true } recreates even recorders that currently claim +// a container_id (the node-agent is idempotent per capture port, so a stale id +// is replaced cleanly). Without force, only recorders with no container_id are +// (re)provisioned. 
+router.post('/reconcile-standby', requireRecorderEdit, async (req, res, next) => { + try { + const force = !!(req.body && req.body.force); + const { rows } = await pool.query( + `SELECT * FROM recorders + WHERE source_type = ANY($1) + AND node_id IS NOT NULL + ORDER BY name`, + [STANDBY_SOURCE_TYPES] + ); + const results = []; + for (const recorder of rows) { + if (!force && recorder.container_id) { + results.push({ id: recorder.id, name: recorder.name, ok: true, skipped: 'already has container_id' }); + continue; + } + try { + const r = await ensureStandbySidecar(recorder); + results.push({ id: recorder.id, name: recorder.name, ...r }); + } catch (e) { + results.push({ id: recorder.id, name: recorder.name, ok: false, reason: e.message }); + } + } + const provisioned = results.filter(r => r.ok && r.containerId).length; + res.json({ provisioned, total: rows.length, results }); + } catch (err) { + next(err); + } +}); + // GET /:id - Get single recorder router.get('/:id', async (req, res, next) => { try { @@ -970,7 +1026,6 @@ router.post('/:id/stop', requireRecorderEdit, async (req, res, next) => { // /start call immediately. // // If NOT in standby (legacy on-demand spawn), use the old docker-stop path. - const STANDBY_SOURCE_TYPES = ['deltacast', 'sdi', 'blackmagic']; const isStandbySource = STANDBY_SOURCE_TYPES.includes(recorder.source_type); if (isStandbySource && recorder.container_id) {