From 92b460f5037d64f5df145e9dd0cafc6651caf204 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Fri, 29 May 2026 03:20:02 +0000 Subject: [PATCH] fix(recorder): finalise live asset on stop + add live SDI monitor Stuck-live fix: capture sidecar now finalises the pre-created live asset by id (new POST /assets/:id/finalize) instead of POSTing a new asset (409 collision); node-agent gives the sidecar a 180s stop grace so the S3 upload + callback complete; node-agent logs sidecar start/stop for diagnostics. Live SDI monitor: HLS preview is now a 2nd output of the hires ffmpeg (single DeckLink read, split to ProRes/S3 + H.264/HLS); node-agent serves /live over HTTP; mam-api proxies GET /recorders/:id/live/* to the recorder node; web-ui HlsPreview loads from the proxied URL. Co-Authored-By: Claude Opus 4.8 --- docker-compose.worker.yml | 1 + services/capture/src/capture-manager.js | 38 +++++-- services/capture/src/index.js | 16 +++ services/mam-api/src/routes/assets.js | 58 +++++++++++ services/mam-api/src/routes/recorders.js | 34 ++++++ services/node-agent/index.js | 121 +++++++++++++++++++++- services/web-ui/public/screens-home.jsx | 2 +- services/web-ui/public/screens-ingest.jsx | 10 +- 8 files changed, 267 insertions(+), 13 deletions(-) diff --git a/docker-compose.worker.yml b/docker-compose.worker.yml index 688866d..ebb4222 100644 --- a/docker-compose.worker.yml +++ b/docker-compose.worker.yml @@ -58,6 +58,7 @@ services: volumes: - /var/run/docker.sock:/var/run/docker.sock - /dev:/dev:ro + - /mnt/NVME/MAM/wild-dragon-live:/mnt/NVME/MAM/wild-dragon-live:ro devices: - /dev/blackmagic:/dev/blackmagic diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index 1dd177e..4e503e7 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -301,12 +301,38 @@ class CaptureManager { const hiresOutput = growingPath ? growingPath : 'pipe:1'; const hiresStdio = growingPath ? ['ignore', 'ignore', 'pipe'] : ['ignore', 'pipe', 'pipe']; - const hiresProcess = spawn('ffmpeg', [ - ...inputArgs, - ...sdiFilterArgs, - ...hiresCodecArgs, - hiresOutput, - ], { stdio: hiresStdio }); + // For SDI we cannot open the DeckLink device a second time for a preview + // tee, so the live HLS preview is produced as a SECOND OUTPUT of the hires + // ffmpeg: one decklink read -> yadif -> split -> [ProRes/S3] + [H.264/HLS]. + let sdiHlsDir = null; + let hiresArgs; + if (sourceType === 'sdi' && this._assetIdForHls) { + const fsMod = await import('node:fs'); + sdiHlsDir = '/live/' + this._assetIdForHls; + try { fsMod.mkdirSync(sdiHlsDir, { recursive: true }); } catch (_) {} + hiresArgs = [ + ...inputArgs, + '-filter_complex', '[0:v]yadif=mode=1:deint=1,split=2[vhi][vlo]', + // Output 0 — ProRes master (S3 pipe or growing file) + '-map', '[vhi]', '-map', '0:a:0?', + ...hiresCodecArgs, + hiresOutput, + // Output 1 — low-latency H.264 HLS preview for the UI monitor + '-map', '[vlo]', '-map', '0:a:0?', + '-c:v', 'libx264', '-preset', 'veryfast', '-tune', 'zerolatency', + '-pix_fmt', 'yuv420p', '-b:v', '2M', '-g', '60', '-sc_threshold', '0', + '-c:a', 'aac', '-b:a', '128k', '-ar', '44100', + '-f', 'hls', '-hls_time', '2', '-hls_list_size', '15', + '-hls_flags', 'delete_segments+append_list+omit_endlist', + '-hls_segment_filename', sdiHlsDir + '/seg-%05d.ts', + sdiHlsDir + '/index.m3u8', + ]; + console.log('[HLS] SDI preview as 2nd output -> ' + sdiHlsDir); + } else { + hiresArgs = [ ...inputArgs, ...sdiFilterArgs, ...hiresCodecArgs, hiresOutput ]; + } + + const hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio }); const hiresUpload = growingPath ? Promise.resolve({ growingPath }) diff --git a/services/capture/src/index.js b/services/capture/src/index.js index 010c4fd..59d0cbe 100644 --- a/services/capture/src/index.js +++ b/services/capture/src/index.js @@ -135,6 +135,22 @@ async function gracefulShutdown(signal) { console.error('[shutdown] failed to flag empty asset:', e.message); } } + } else if (liveAssetId) { + // Finalise the pre-created live asset by id (avoids POST / 409 collision). + try { + const res = await fetch(`${MAM_API_URL}/api/v1/assets/${liveAssetId}/finalize`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...(MAM_API_TOKEN ? { 'Authorization': `Bearer ${MAM_API_TOKEN}` } : {}) }, + body: JSON.stringify({ hiresKey: completed.hiresKey, proxyKey: completed.proxyKey, duration: completed.duration }), + }); + if (!res.ok) { + console.warn(`[shutdown] mam-api finalize returned ${res.status}: ${await res.text()}`); + } else { + console.log('[shutdown] live asset finalised with mam-api'); + } + } catch (mamErr) { + console.error('[shutdown] failed to finalise asset:', mamErr.message); + } } else { try { const res = await fetch(`${MAM_API_URL}/api/v1/assets`, { diff --git a/services/mam-api/src/routes/assets.js b/services/mam-api/src/routes/assets.js index a815245..b7c216a 100644 --- a/services/mam-api/src/routes/assets.js +++ b/services/mam-api/src/routes/assets.js @@ -373,6 +373,64 @@ router.post('/:id/mark-empty', async (req, res, next) => { } catch (err) { next(err); } }); +// POST /:id/finalize +// Capture sidecar calls this on a SUCCESSFUL recording stop to finalise the +// pre-created 'live' asset (created at recorder start, id passed as ASSET_ID). +// Previously the sidecar did POST / to create a NEW asset, which collided with +// the existing live row -> 409 -> asset stuck 'live', no jobs. Finalising by id +// flips it out of 'live', records duration + S3 keys, and kicks off the +// proxy -> thumbnail -> filmstrip job chain. +router.post('/:id/finalize', async (req, res, next) => { + try { + const { id } = req.params; + const { hiresKey, proxyKey, duration } = req.body; + + const check = await pool.query(`SELECT * FROM assets WHERE id = $1`, [id]); + if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' }); + const current = check.rows[0].status; + // Already terminal — idempotent no-op (handles shutdown retries). + if (current === 'ready' || current === 'error') { + return res.status(200).json({ ...check.rows[0], skipped: true }); + } + + const durationNum = duration !== undefined && duration !== null ? Number(duration) : null; + const durationMs = (durationNum !== null && Number.isFinite(durationNum)) ? Math.round(durationNum * 1000) : null; + + const upd = await pool.query( + `UPDATE assets + SET status = 'processing', + original_s3_key = COALESCE($2, original_s3_key), + proxy_s3_key = COALESCE($3, proxy_s3_key), + duration_ms = COALESCE($4, duration_ms), + updated_at = NOW() + WHERE id = $1 + RETURNING *`, + [id, hiresKey || null, proxyKey || null, durationMs] + ); + const asset = upd.rows[0]; + const thumbnailKey = `thumbnails/${id}.jpg`; + + if (asset.proxy_s3_key) { + // Proxy already produced by the capture sidecar — just build the + // thumbnail (which then chains filmstrip). Worker flips status->ready. + await thumbnailQueue.add('generate', { assetId: id, proxyKey: asset.proxy_s3_key, outputKey: thumbnailKey }); + console.log(`[assets] finalize ${id}: queued thumbnail (proxy present)`); + } else if (asset.original_s3_key) { + // No proxy yet — generate it from the hi-res master. The proxy worker + // chains thumbnail -> filmstrip on completion. + const generatedProxyKey = `proxies/${id}.mp4`; + await proxyQueue.add('generate', { assetId: id, inputKey: asset.original_s3_key, outputKey: generatedProxyKey }); + console.log(`[assets] finalize ${id}: queued proxy from master`); + } else { + await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [id]); + asset.status = 'ready'; + } + + console.log(`[assets] finalized live asset ${id} (${asset.display_name}) -> ${asset.status}`); + res.json(asset); + } catch (err) { next(err); } +}); + // POST /:id/generate-proxy router.post('/:id/generate-proxy', async (req, res, next) => { try { diff --git a/services/mam-api/src/routes/recorders.js b/services/mam-api/src/routes/recorders.js index d6e66cb..1af2f05 100644 --- a/services/mam-api/src/routes/recorders.js +++ b/services/mam-api/src/routes/recorders.js @@ -1,5 +1,6 @@ import express from 'express'; import http from 'http'; +import fs from 'fs'; import net from 'net'; import dgram from 'dgram'; import pool from '../db/pool.js'; @@ -870,4 +871,37 @@ function probeUdp(host, port) { }); } + +// GET /:id/live/* — reverse-proxy the live HLS preview from the recorder's node. +// Remote recorders: segments live on the worker node, served by its node-agent +// (/live/...). Local recorders: served from this host's /live mount. Browser +// media requests carry the session cookie (same-origin) so auth passes. +router.get('/:id/live/:rest(*)', async (req, res, next) => { + try { + const { id } = req.params; + const rest = req.params.rest; + if (!rest || rest.includes('..')) return res.status(400).end(); + const rec = await pool.query('SELECT node_id FROM recorders WHERE id = $1', [id]); + if (rec.rows.length === 0) return res.status(404).json({ error: 'Recorder not found' }); + + const ct = rest.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl' + : rest.endsWith('.ts') ? 'video/mp2t' + : 'application/octet-stream'; + res.set('Cache-Control', 'no-cache'); + res.set('Content-Type', ct); + + const target = await resolveNodeTarget(rec.rows[0].node_id); + if (!target.remote) { + return fs.readFile('/live/' + rest, (err, data) => { + if (err) return res.status(404).end(); + res.end(data); + }); + } + const base = String(target.apiUrl).replace(/\/$/, ''); + const upstream = await fetch(`${base}/live/${rest}`).catch(() => null); + if (!upstream || !upstream.ok) return res.status(upstream ? upstream.status : 502).end(); + res.end(Buffer.from(await upstream.arrayBuffer())); + } catch (err) { next(err); } +}); + export default router; diff --git a/services/node-agent/index.js b/services/node-agent/index.js index 5d132eb..441d509 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -8,7 +8,7 @@ const NODE_ROLE = process.env.NODE_ROLE || 'worker'; const AGENT_PORT = parseInt(process.env.AGENT_PORT || '7436', 10); const HEARTBEAT_MS = parseInt(process.env.HEARTBEAT_MS || '30000', 10); const LIVE_DIR = process.env.LIVE_DIR || '/mnt/NVME/MAM/wild-dragon-live'; -const VERSION = '1.2.0'; +const VERSION = '1.3.0'; // Pick the host's LAN IP. Inside a bridge-mode container, // os.networkInterfaces() returns the container's docker-bridge IP (172.x), @@ -116,6 +116,9 @@ async function handleSidecarStart(body, res) { } const containerId = createRes.data.Id; + const _u = (env.find(e => e.startsWith('MAM_API_URL=')) || '').slice(12); + const _tok = env.some(e => e.startsWith('MAM_API_TOKEN=') && e.length > 14); + console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`); const startRes = await dockerApi('POST', `/containers/${containerId}/start`); if (startRes.status !== 204) { await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); @@ -128,12 +131,40 @@ async function handleSidecarStart(body, res) { } } +async function fetchContainerLogs(containerId) { + return await new Promise((resolve) => { + const options = { + socketPath: '/var/run/docker.sock', + path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=1&tail=200`, + method: 'GET', + }; + const req = http.request(options, res => { + const chunks = []; + res.on('data', c => chunks.push(c)); + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8').replace(/[\x00-\x08]/g, ''))); + }); + req.on('error', () => resolve('(log fetch failed)')); + req.end(); + }); +} + async function handleSidecarStop(containerId, res) { try { - await dockerApi('POST', `/containers/${containerId}/stop`).catch(() => {}); + console.log(`[sidecar-stop] stopping ${containerId} (grace 180s)...`); + // Grace period must exceed the capture container's shutdown work + // (finalise ffmpeg session + register asset via callback). Default + // docker stop is only 10s, which SIGKILLs capture mid-finalise and + // loses the POST /assets callback -> asset stuck 'live', no jobs. + await dockerApi('POST', `/containers/${containerId}/stop?t=180`).catch(() => {}); + // Dump the capture container's shutdown logs into our persistent log + // BEFORE removing it, so failed callbacks are diagnosable. + const logs = await fetchContainerLogs(containerId); + console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`); + // Container has now exited gracefully (or hit the 180s cap); remove it. await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); jsonResponse(res, 200, { ok: true }); } catch (err) { + console.error(`[sidecar-stop] error: ${err.message}`); jsonResponse(res, 500, { error: err.message }); } } @@ -188,6 +219,71 @@ function sampleCpu() { }); } + +// -- Live GPU utilization sampling ----------------------------------------- +// Spawns a short-lived nvidia container via Docker API on each heartbeat call. +// Returns array of { index, util_pct, mem_used_mb, mem_total_mb } per GPU, +// or [] if no GPUs / nvidia runtime unavailable. +async function sampleGpuUtil() { + if (!_gpuCache || _gpuCache.length === 0) return []; + + const QUERY = '--query-gpu=index,utilization.gpu,memory.used,memory.total'; + const FMT = '--format=csv,noheader,nounits'; + + let containerId; + try { + const createRes = await dockerApi('POST', '/containers/create', { + Image: 'ubuntu:22.04', + Cmd: ['nvidia-smi', QUERY, FMT], + HostConfig: { + AutoRemove: false, + Runtime: 'nvidia', + DeviceRequests: [{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] }], + }, + }); + if (createRes.status !== 201) return []; + containerId = createRes.data.Id; + + await dockerApi('POST', `/containers/${containerId}/start`); + + for (let i = 0; i < 10; i++) { + await new Promise(r => setTimeout(r, 400)); + const inspect = await dockerApi('GET', `/containers/${containerId}/json`); + if (!inspect.data?.State?.Running) break; + } + + const logRes = await new Promise((resolve, reject) => { + const options = { + socketPath: '/var/run/docker.sock', + path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=0`, + method: 'GET', + }; + const req = http.request(options, res => { + const chunks = []; + res.on('data', c => chunks.push(c)); + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + }); + req.on('error', reject); + req.end(); + }); + + const text = logRes.replace(/[\x00-\x07].{7}/g, '').trim(); + const lines = text.split('\n').filter(l => /^\d+,/.test(l.trim())); + + return lines.map(line => { + const [idx, util, memUsed, memTotal] = line.split(',').map(s => parseInt(s.trim(), 10)); + return { index: idx, util_pct: util, mem_used_mb: memUsed, mem_total_mb: memTotal }; + }); + } catch (err) { + console.warn('[gpu-util] sampling failed:', err.message); + return []; + } finally { + if (containerId) { + await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); + } + } +} + // ── Hardware detection ──────────────────────────────────────────────────── // GPU_COUNT / BMD_COUNT env vars override filesystem detection when /dev isn't mapped // Cached GPU info from nvidia-smi — populated once at startup via Docker API. @@ -351,6 +447,7 @@ function detectHardware() { // ── Heartbeat ───────────────────────────────────────────────────────────── async function heartbeat() { const cpu_usage = await sampleCpu(); + const gpu_util = await sampleGpuUtil(); const totalMem = os.totalmem(); const freeMem = os.freemem(); const ip_address = getIp(); @@ -366,6 +463,7 @@ async function heartbeat() { mem_used_mb: Math.round((totalMem - freeMem) / 1048576), mem_total_mb: Math.round(totalMem / 1048576), capabilities, + gpu_util, }; const headers = { 'Content-Type': 'application/json' }; @@ -402,6 +500,22 @@ probeGpusViaSmi().then(() => { setInterval(heartbeat, HEARTBEAT_MS); }); +// Serve the local HLS live-preview files (written by the capture sidecar to +// LIVE_DIR) so the primary can reverse-proxy them to the browser. Read-only. +function serveLiveFile(pathname, res) { + const rel = decodeURIComponent(pathname.slice('/live/'.length)); + if (!rel || rel.includes('..') || rel.startsWith('/')) { res.writeHead(403); return res.end(); } + const filePath = LIVE_DIR + '/' + rel; + fs.readFile(filePath, (err, data) => { + if (err) { res.writeHead(404); return res.end(); } + const ct = filePath.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl' + : filePath.endsWith('.ts') ? 'video/mp2t' + : 'application/octet-stream'; + res.writeHead(200, { 'Content-Type': ct, 'Cache-Control': 'no-cache', 'Access-Control-Allow-Origin': '*' }); + res.end(data); + }); +} + // ── HTTP server ─────────────────────────────────────────────────────────── const server = http.createServer((req, res) => { const { pathname } = new URL(req.url, 'http://localhost'); @@ -431,6 +545,9 @@ const server = http.createServer((req, res) => { const id = pathname.slice('/sidecar/'.length, -'/status'.length); handleSidecarStatus(id, res); + } else if (req.method === 'GET' && pathname.startsWith('/live/')) { + serveLiveFile(pathname, res); + } else { res.writeHead(404); res.end(); diff --git a/services/web-ui/public/screens-home.jsx b/services/web-ui/public/screens-home.jsx index 8b14e02..de45bcf 100644 --- a/services/web-ui/public/screens-home.jsx +++ b/services/web-ui/public/screens-home.jsx @@ -660,7 +660,7 @@ function OnAirTile({ recorder, onClick }) {
{recorder.live_asset_id - ? + ? : } diff --git a/services/web-ui/public/screens-ingest.jsx b/services/web-ui/public/screens-ingest.jsx index 1ed3be1..ef2d7f0 100644 --- a/services/web-ui/public/screens-ingest.jsx +++ b/services/web-ui/public/screens-ingest.jsx @@ -384,13 +384,15 @@ function YouTubeImport({ navigate }) { and nginx.conf); we attach hls.js to a