diff --git a/services/mam-api/src/db/migrations/036-recorder-hardware-identity.sql b/services/mam-api/src/db/migrations/036-recorder-hardware-identity.sql new file mode 100644 index 0000000..ca454e7 --- /dev/null +++ b/services/mam-api/src/db/migrations/036-recorder-hardware-identity.sql @@ -0,0 +1,38 @@ +-- Migration 036: Recorders become physical hardware, not user-created rows. +-- +-- A recorder now maps 1:1 to a physical capture port: (node_id, device_index). +-- mam-api auto-provisions one row per port from each node-agent heartbeat's +-- capabilities (deltacast/blackmagic arrays). Rows are NEVER deleted by the +-- operator — they're discovered, enabled/disabled, and configured in place. +-- This removes the delete/create churn that orphaned standby sidecars and +-- caused capture-port (EADDRINUSE) collisions. +-- +-- New columns: +-- label : optional friendly name overlaid on the hardware identity +-- (e.g. "Aurora" for zampp3-dc0). NULL → UI shows node+port name. +-- enabled : operator opt-in. false (default) = no standby sidecar, port idle. +-- true = persistent standby sidecar kept up (idle-preview), ready +-- to record. Toggled by the Enable/Disable button. +-- auto_provisioned : true when the row was created by heartbeat discovery +-- (vs a legacy manually-created recorder). Informational. +-- +-- Identity: +-- UNIQUE(node_id, device_index) is the structural guarantee that two +-- recorders can never share a capture port — the root-cause fix for the +-- collisions. Partial unique index (WHERE both are non-null) so any legacy +-- rows without a node/device don't violate it. + +ALTER TABLE recorders + ADD COLUMN IF NOT EXISTS label TEXT DEFAULT NULL, + ADD COLUMN IF NOT EXISTS enabled BOOLEAN NOT NULL DEFAULT false, + ADD COLUMN IF NOT EXISTS auto_provisioned BOOLEAN NOT NULL DEFAULT false; + +-- One recorder per physical port. Partial so pre-existing rows lacking a +-- node_id/device_index (e.g. network sources) are unaffected. +CREATE UNIQUE INDEX IF NOT EXISTS recorders_node_device_uniq + ON recorders (node_id, device_index) + WHERE node_id IS NOT NULL AND device_index IS NOT NULL; + +-- Fast lookup of a node's ports during heartbeat reconciliation. +CREATE INDEX IF NOT EXISTS recorders_node_id_idx + ON recorders (node_id); diff --git a/services/mam-api/src/routes/cluster.js b/services/mam-api/src/routes/cluster.js index f94d9fa..1e45fb7 100644 --- a/services/mam-api/src/routes/cluster.js +++ b/services/mam-api/src/routes/cluster.js @@ -242,10 +242,65 @@ router.post('/heartbeat', async (req, res, next) => { metrics != null ? JSON.stringify(metrics) : null, ] ); + + // Auto-provision recorder rows from this node's capture hardware. One row + // per physical port, keyed (node_id, device_index). Discovery only — it + // never enables, records, or deletes; the operator opts a port in via the + // Enable button. Non-fatal so a reconcile hiccup never drops a heartbeat. + reconcileRecordersForNode(r.rows[0]).catch(e => + console.warn(`[recorders] auto-provision for ${hostname} failed (non-fatal): ${e.message}`)); + res.json(r.rows[0]); } catch (err) { next(err); } }); +// Discover capture ports from a node's heartbeat capabilities and upsert one +// recorder row per port. Idempotent via UNIQUE(node_id, device_index): a row +// is created the first time a port is seen (disabled, no sidecar) and left +// untouched on every subsequent heartbeat — operator config/label/enabled +// state is preserved. Ports that vanish are NOT deleted (node may be briefly +// offline); the UI greys them via the node's last_seen. +async function reconcileRecordersForNode(node) { + if (!node || !node.id) return; + const cap = node.capabilities || {}; + // Each entry: { source_type, device_index }. Deltacast uses 'port', DeckLink + // uses 'index'; both become device_index (the capture-port offset). + const ports = []; + for (const d of (cap.deltacast || [])) { + const idx = d.index ?? d.port; + if (Number.isInteger(idx)) ports.push({ source_type: 'deltacast', device_index: idx }); + } + for (const b of (cap.blackmagic || [])) { + const idx = b.index; + if (Number.isInteger(idx)) ports.push({ source_type: 'blackmagic', device_index: idx }); + } + if (ports.length === 0) return; + + for (const p of ports) { + // INSERT … ON CONFLICT DO NOTHING: create-once. Never overwrite an existing + // row (preserves label, enabled, codec config, status). source_config keeps + // the legacy {port}/{device} shape the capture pipeline already reads. + const srcCfg = p.source_type === 'deltacast' + ? { port: p.device_index } + : { device: p.device_index }; + await pool.query( + `INSERT INTO recorders + (node_id, device_index, source_type, source_config, name, enabled, auto_provisioned) + VALUES ($1, $2, $3::source_type, $4, $5, false, true) + ON CONFLICT (node_id, device_index) WHERE node_id IS NOT NULL AND device_index IS NOT NULL + DO NOTHING`, + [ + node.id, + p.device_index, + p.source_type, + JSON.stringify(srcCfg), + // Deterministic hardware name; the operator can set a friendly `label`. + `${node.hostname}-${p.source_type === 'deltacast' ? 'dc' : 'bmd'}${p.device_index}`, + ] + ); + } +} + router.get('/devices/blackmagic/signal', async (req, res, next) => { try { const nodesResult = await pool.query( diff --git a/services/mam-api/src/routes/recorders.js b/services/mam-api/src/routes/recorders.js index 0244588..afce8fd 100644 --- a/services/mam-api/src/routes/recorders.js +++ b/services/mam-api/src/routes/recorders.js @@ -154,7 +154,7 @@ const RECORDER_FIELDS = [ 'proxy_audio_codec', 'proxy_audio_bitrate', 'proxy_audio_channels', 'proxy_container', 'project_id', 'node_id', 'device_index', - 'growing_enabled', + 'growing_enabled', 'label', ]; function pickRecorderFields(body) { @@ -329,6 +329,31 @@ async function ensureStandbySidecar(recorder) { return { ok: true, containerId }; } +// Tear down a recorder's standby sidecar (Disable). Asks the node-agent to +// remove the container, then clears container_id and sets status='stopped'. +// Best-effort on the node-agent call — even if the delete fails we still clear +// the row so the operator isn't stuck; the force-free-port logic on the next +// Enable will reclaim a stray container. Returns { ok, reason? }. +async function teardownStandbySidecar(recorder) { + if (recorder.node_id && recorder.container_id) { + const { remote: isRemote, apiUrl: targetNodeApiUrl } = + await resolveNodeTarget(recorder.node_id).catch(() => ({ remote: false })); + if (isRemote && targetNodeApiUrl) { + await fetch(`${targetNodeApiUrl}/sidecar/${recorder.container_id}`, { + method: 'DELETE', + signal: AbortSignal.timeout(15000), + }).catch(e => console.warn(`[recorders] sidecar teardown for ${recorder.id} failed (clearing anyway): ${e.message}`)); + } + } + await pool.query( + `UPDATE recorders SET container_id = NULL, status = 'stopped', current_session_id = NULL, updated_at = NOW() WHERE id = $1`, + [recorder.id] + ); + recorder.container_id = null; + recorder.status = 'stopped'; + return { ok: true }; +} + // Issue #162 — after a local-spawn stop, wait for the capture container to // finalize its master. The asset row was pre-created at start with // status='live' (display_name = current_session_id); the ingest/finalize step @@ -532,6 +557,60 @@ router.post('/reconcile-standby', requireRecorderEdit, async (req, res, next) => } }); +// POST /:id/enable - Operator opt-in. Brings up the persistent standby sidecar +// (idle-preview, kept up 24/7) so the port is ready to record in <1s. Sets +// enabled=true. Idempotent: if already enabled with a live container the +// node-agent's force-free-port logic replaces any stale container cleanly. +router.post('/:id/enable', requireRecorderEdit, async (req, res, next) => { + try { + const { id } = req.params; + const { rows } = await pool.query('SELECT * FROM recorders WHERE id = $1', [id]); + if (rows.length === 0) return res.status(404).json({ error: 'Recorder not found' }); + const recorder = rows[0]; + + if (!STANDBY_SOURCE_TYPES.includes(recorder.source_type)) { + return res.status(400).json({ error: `Source type "${recorder.source_type}" does not support standby/enable` }); + } + if (!recorder.node_id) { + return res.status(409).json({ error: 'Recorder has no assigned node (hardware offline?) — cannot enable' }); + } + + const r = await ensureStandbySidecar(recorder); + if (!r.ok) { + return res.status(502).json({ error: `Could not start standby sidecar: ${r.reason || 'unknown'}` }); + } + await pool.query('UPDATE recorders SET enabled = true, updated_at = NOW() WHERE id = $1', [id]); + recorder.enabled = true; + res.json(recorder); + } catch (err) { + next(err); + } +}); + +// POST /:id/disable - Operator opt-out. Stops & removes the standby sidecar, +// freeing the capture port, and sets enabled=false. Config (codec, label, +// growing) is preserved on the row for the next enable. Refuses while the +// recorder is actively recording — stop it first. +router.post('/:id/disable', requireRecorderEdit, async (req, res, next) => { + try { + const { id } = req.params; + const { rows } = await pool.query('SELECT * FROM recorders WHERE id = $1', [id]); + if (rows.length === 0) return res.status(404).json({ error: 'Recorder not found' }); + const recorder = rows[0]; + + if (recorder.status === 'recording') { + return res.status(409).json({ error: 'Recorder is recording — stop it before disabling' }); + } + + await teardownStandbySidecar(recorder); + await pool.query('UPDATE recorders SET enabled = false, updated_at = NOW() WHERE id = $1', [id]); + recorder.enabled = false; + res.json(recorder); + } catch (err) { + next(err); + } +}); + // GET /:id - Get single recorder router.get('/:id', async (req, res, next) => { try { diff --git a/services/node-agent/index.js b/services/node-agent/index.js index 21a7365..01e911d 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -451,6 +451,10 @@ async function handleSidecarStart(body, res) { gpuUuid = null, } = body; + // Reclaim the capture port before spawning, so an on-demand start can never + // collide (EADDRINUSE) with a stale/standby container already on that port. + await freeCapturePort(capturePort); + const binds = [`${LIVE_DIR}:/live`]; // Always mount /dev/shm so the sidecar can access framecache slots. if (fs.existsSync('/dev/shm')) binds.push('/dev/shm:/dev/shm'); @@ -696,6 +700,37 @@ async function fetchContainerLogs(containerId) { // The bridge is started here (warms it up for zero-lag on first /start call). // Per-session params (CLIP_NAME, ASSET_ID, PROJECT_ID) are NOT in the env — // they arrive via HTTP POST /capture/start when the user hits record. +// Force-free a capture port before binding a new sidecar to it. With +// NetworkMode=host, two capture containers requesting the same PORT collide +// with EADDRINUSE — the exact failure that orphaned/duplicated sidecars caused. +// We enumerate ALL capture containers (running or not), read each one's PORT +// env, and force-remove any bound to this capturePort. Idempotent and safe: +// the only thing on that port should be a sidecar we're about to replace. +async function freeCapturePort(capturePort) { + try { + // all=1 so we also catch Exited/Created stragglers still holding the name. + const listRes = await dockerApi('GET', '/containers/json?all=1'); + if (listRes.status !== 200 || !Array.isArray(listRes.data)) return; + for (const c of listRes.data) { + const img = c.Image || ''; + if (!/wild-dragon-capture/.test(img)) continue; + // Inspect to read the PORT env (list payload doesn't include env). + try { + const insp = await dockerApi('GET', `/containers/${c.Id}/json`); + const cenv = (insp.status === 200 && insp.data?.Config?.Env) || []; + const portEnv = cenv.find(e => e.startsWith('PORT=')); + const p = portEnv ? parseInt(portEnv.split('=')[1], 10) : NaN; + if (p === capturePort) { + console.log(`[sidecar] force-freeing capture port ${capturePort}: removing stale container ${c.Id.slice(0, 12)}`); + await dockerApi('DELETE', `/containers/${c.Id}?force=true`).catch(() => {}); + } + } catch (_) { /* container vanished mid-scan — fine */ } + } + } catch (e) { + console.warn(`[sidecar] freeCapturePort(${capturePort}) scan failed (continuing): ${e.message}`); + } +} + async function handleSidecarStandby(body, res) { try { const { @@ -707,6 +742,10 @@ async function handleSidecarStandby(body, res) { gpuUuid = null, } = body; + // Reclaim the port first so a re-Enable (or a stale container surviving a + // node-agent restart) can never collide on bind. + await freeCapturePort(capturePort); + const binds = [`${LIVE_DIR}:/live`]; if (fs.existsSync('/dev/shm')) binds.push('/dev/shm:/dev/shm'); if (sourceType === 'sdi' || sourceType === 'blackmagic') binds.unshift('/dev/blackmagic:/dev/blackmagic'); diff --git a/services/web-ui/public/screens-ingest.jsx b/services/web-ui/public/screens-ingest.jsx index 854d4cc..ccfde15 100644 --- a/services/web-ui/public/screens-ingest.jsx +++ b/services/web-ui/public/screens-ingest.jsx @@ -601,39 +601,201 @@ function HlsPreviewUrl({ url }) { } /* ===== Recorders ===== */ + +// Per-recorder config editor. Recorders are physical ports — this PATCHes the +// existing row in place (never delete/recreate), so codec/growing/label/project +// changes persist across enable/disable. If the recorder is currently ENABLED, +// saving bounces its standby sidecar (disable→enable) so the new env takes +// effect; the operator is told. Refuses while recording. +function RecorderConfigModal({ recorder, onClose, onSaved }) { + const PROJECTS = window.ZAMPP_DATA?.PROJECTS || []; + const GROWING_CODEC = 'hevc_nvenc'; + const BITRATE_CODECS = new Set(['hevc_nvenc', 'h264_nvenc', 'libx264', 'libx265', 'dnxhd', 'dnxhr_hq']); + + const [label, setLabel] = React.useState(recorder.label || ''); + const [codec, setCodec] = React.useState(recorder.recording_codec || 'hevc_nvenc'); + const [bitrate, setBitrate] = React.useState((recorder.recording_video_bitrate || '25').replace(/M$/i, '')); + const [growing, setGrowing] = React.useState(recorder.growing_enabled === true); + const [projectId, setProjectId] = React.useState(recorder.project_id || PROJECTS[0]?.id || ''); + const [saving, setSaving] = React.useState(false); + const [err, setErr] = React.useState(null); + + const isRec = recorder.status === 'recording'; + const showBitrate = growing || BITRATE_CODECS.has(codec); + + const submit = () => { + if (saving || isRec) return; + setSaving(true); setErr(null); + // Growing forces the XDCAM/HEVC master path on the backend; send the GPU + // master codec so the row is coherent if growing is later turned off. + const effCodec = growing ? GROWING_CODEC : codec; + const body = { + label: label.trim() || null, + recording_codec: effCodec, + growing_enabled: growing, + project_id: projectId || null, + }; + if (showBitrate && bitrate) body.recording_video_bitrate = String(bitrate).replace(/M$/i, '') + 'M'; + + window.ZAMPP_API.fetch('/recorders/' + recorder.id, { method: 'PATCH', body: JSON.stringify(body) }) + .then(async () => { + // If enabled, bounce the standby sidecar so the new env is applied. + if (recorder.enabled) { + await window.ZAMPP_API.fetch('/recorders/' + recorder.id + '/disable', { method: 'POST' }).catch(() => {}); + await window.ZAMPP_API.fetch('/recorders/' + recorder.id + '/enable', { method: 'POST' }).catch(() => {}); + } + setSaving(false); + onSaved(); + }) + .catch(e => { setSaving(false); setErr(e.message || 'Save failed'); }); + }; + + return ( +