fix(recorder): finalise live asset on stop + add live SDI monitor

Stuck-live fix: capture sidecar now finalises the pre-created live asset by id (new POST /assets/:id/finalize) instead of POSTing a new asset (409 collision); node-agent gives the sidecar a 180s stop grace so the S3 upload + callback complete; node-agent logs sidecar start/stop for diagnostics.

Live SDI monitor: HLS preview is now a 2nd output of the hires ffmpeg (single DeckLink read, split to ProRes/S3 + H.264/HLS); node-agent serves /live over HTTP; mam-api proxies GET /recorders/:id/live/* to the recorder node; web-ui HlsPreview loads from the proxied URL.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Zac Gaetano 2026-05-29 03:20:02 +00:00
parent 500599a955
commit 92b460f503
8 changed files with 267 additions and 13 deletions

View file

@ -58,6 +58,7 @@ services:
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /dev:/dev:ro
- /mnt/NVME/MAM/wild-dragon-live:/mnt/NVME/MAM/wild-dragon-live:ro
devices:
- /dev/blackmagic:/dev/blackmagic

View file

@ -301,12 +301,38 @@ class CaptureManager {
const hiresOutput = growingPath ? growingPath : 'pipe:1';
const hiresStdio = growingPath ? ['ignore', 'ignore', 'pipe'] : ['ignore', 'pipe', 'pipe'];
const hiresProcess = spawn('ffmpeg', [
...inputArgs,
...sdiFilterArgs,
...hiresCodecArgs,
hiresOutput,
], { stdio: hiresStdio });
// For SDI we cannot open the DeckLink device a second time for a preview
// tee, so the live HLS preview is produced as a SECOND OUTPUT of the hires
// ffmpeg: one decklink read -> yadif -> split -> [ProRes/S3] + [H.264/HLS].
let sdiHlsDir = null;
let hiresArgs;
if (sourceType === 'sdi' && this._assetIdForHls) {
const fsMod = await import('node:fs');
sdiHlsDir = '/live/' + this._assetIdForHls;
try { fsMod.mkdirSync(sdiHlsDir, { recursive: true }); } catch (_) {}
hiresArgs = [
...inputArgs,
'-filter_complex', '[0:v]yadif=mode=1:deint=1,split=2[vhi][vlo]',
// Output 0 — ProRes master (S3 pipe or growing file)
'-map', '[vhi]', '-map', '0:a:0?',
...hiresCodecArgs,
hiresOutput,
// Output 1 — low-latency H.264 HLS preview for the UI monitor
'-map', '[vlo]', '-map', '0:a:0?',
'-c:v', 'libx264', '-preset', 'veryfast', '-tune', 'zerolatency',
'-pix_fmt', 'yuv420p', '-b:v', '2M', '-g', '60', '-sc_threshold', '0',
'-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
'-f', 'hls', '-hls_time', '2', '-hls_list_size', '15',
'-hls_flags', 'delete_segments+append_list+omit_endlist',
'-hls_segment_filename', sdiHlsDir + '/seg-%05d.ts',
sdiHlsDir + '/index.m3u8',
];
console.log('[HLS] SDI preview as 2nd output -> ' + sdiHlsDir);
} else {
hiresArgs = [ ...inputArgs, ...sdiFilterArgs, ...hiresCodecArgs, hiresOutput ];
}
const hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio });
const hiresUpload = growingPath
? Promise.resolve({ growingPath })

View file

@ -135,6 +135,22 @@ async function gracefulShutdown(signal) {
console.error('[shutdown] failed to flag empty asset:', e.message);
}
}
} else if (liveAssetId) {
// Finalise the pre-created live asset by id (avoids POST / 409 collision).
try {
const res = await fetch(`${MAM_API_URL}/api/v1/assets/${liveAssetId}/finalize`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...(MAM_API_TOKEN ? { 'Authorization': `Bearer ${MAM_API_TOKEN}` } : {}) },
body: JSON.stringify({ hiresKey: completed.hiresKey, proxyKey: completed.proxyKey, duration: completed.duration }),
});
if (!res.ok) {
console.warn(`[shutdown] mam-api finalize returned ${res.status}: ${await res.text()}`);
} else {
console.log('[shutdown] live asset finalised with mam-api');
}
} catch (mamErr) {
console.error('[shutdown] failed to finalise asset:', mamErr.message);
}
} else {
try {
const res = await fetch(`${MAM_API_URL}/api/v1/assets`, {

View file

@ -373,6 +373,64 @@ router.post('/:id/mark-empty', async (req, res, next) => {
} catch (err) { next(err); }
});
// POST /:id/finalize
// Capture sidecar calls this on a SUCCESSFUL recording stop to finalise the
// pre-created 'live' asset (created at recorder start, id passed as ASSET_ID).
// Previously the sidecar did POST / to create a NEW asset, which collided with
// the existing live row -> 409 -> asset stuck 'live', no jobs. Finalising by id
// flips it out of 'live', records duration + S3 keys, and kicks off the
// proxy -> thumbnail -> filmstrip job chain.
router.post('/:id/finalize', async (req, res, next) => {
try {
const { id } = req.params;
const { hiresKey, proxyKey, duration } = req.body;
const check = await pool.query(`SELECT * FROM assets WHERE id = $1`, [id]);
if (check.rows.length === 0) return res.status(404).json({ error: 'Asset not found' });
const current = check.rows[0].status;
// Already terminal — idempotent no-op (handles shutdown retries).
if (current === 'ready' || current === 'error') {
return res.status(200).json({ ...check.rows[0], skipped: true });
}
const durationNum = duration !== undefined && duration !== null ? Number(duration) : null;
const durationMs = (durationNum !== null && Number.isFinite(durationNum)) ? Math.round(durationNum * 1000) : null;
const upd = await pool.query(
`UPDATE assets
SET status = 'processing',
original_s3_key = COALESCE($2, original_s3_key),
proxy_s3_key = COALESCE($3, proxy_s3_key),
duration_ms = COALESCE($4, duration_ms),
updated_at = NOW()
WHERE id = $1
RETURNING *`,
[id, hiresKey || null, proxyKey || null, durationMs]
);
const asset = upd.rows[0];
const thumbnailKey = `thumbnails/${id}.jpg`;
if (asset.proxy_s3_key) {
// Proxy already produced by the capture sidecar — just build the
// thumbnail (which then chains filmstrip). Worker flips status->ready.
await thumbnailQueue.add('generate', { assetId: id, proxyKey: asset.proxy_s3_key, outputKey: thumbnailKey });
console.log(`[assets] finalize ${id}: queued thumbnail (proxy present)`);
} else if (asset.original_s3_key) {
// No proxy yet — generate it from the hi-res master. The proxy worker
// chains thumbnail -> filmstrip on completion.
const generatedProxyKey = `proxies/${id}.mp4`;
await proxyQueue.add('generate', { assetId: id, inputKey: asset.original_s3_key, outputKey: generatedProxyKey });
console.log(`[assets] finalize ${id}: queued proxy from master`);
} else {
await pool.query(`UPDATE assets SET status = 'ready', updated_at = NOW() WHERE id = $1`, [id]);
asset.status = 'ready';
}
console.log(`[assets] finalized live asset ${id} (${asset.display_name}) -> ${asset.status}`);
res.json(asset);
} catch (err) { next(err); }
});
// POST /:id/generate-proxy
router.post('/:id/generate-proxy', async (req, res, next) => {
try {

View file

@ -1,5 +1,6 @@
import express from 'express';
import http from 'http';
import fs from 'fs';
import net from 'net';
import dgram from 'dgram';
import pool from '../db/pool.js';
@ -870,4 +871,37 @@ function probeUdp(host, port) {
});
}
// GET /:id/live/* — reverse-proxy the live HLS preview from the recorder's node.
// Remote recorders: segments live on the worker node, served by its node-agent
// (/live/...). Local recorders: served from this host's /live mount. Browser
// media requests carry the session cookie (same-origin) so auth passes.
router.get('/:id/live/:rest(*)', async (req, res, next) => {
try {
const { id } = req.params;
const rest = req.params.rest;
if (!rest || rest.includes('..')) return res.status(400).end();
const rec = await pool.query('SELECT node_id FROM recorders WHERE id = $1', [id]);
if (rec.rows.length === 0) return res.status(404).json({ error: 'Recorder not found' });
const ct = rest.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl'
: rest.endsWith('.ts') ? 'video/mp2t'
: 'application/octet-stream';
res.set('Cache-Control', 'no-cache');
res.set('Content-Type', ct);
const target = await resolveNodeTarget(rec.rows[0].node_id);
if (!target.remote) {
return fs.readFile('/live/' + rest, (err, data) => {
if (err) return res.status(404).end();
res.end(data);
});
}
const base = String(target.apiUrl).replace(/\/$/, '');
const upstream = await fetch(`${base}/live/${rest}`).catch(() => null);
if (!upstream || !upstream.ok) return res.status(upstream ? upstream.status : 502).end();
res.end(Buffer.from(await upstream.arrayBuffer()));
} catch (err) { next(err); }
});
export default router;

View file

@ -8,7 +8,7 @@ const NODE_ROLE = process.env.NODE_ROLE || 'worker';
const AGENT_PORT = parseInt(process.env.AGENT_PORT || '7436', 10);
const HEARTBEAT_MS = parseInt(process.env.HEARTBEAT_MS || '30000', 10);
const LIVE_DIR = process.env.LIVE_DIR || '/mnt/NVME/MAM/wild-dragon-live';
const VERSION = '1.2.0';
const VERSION = '1.3.0';
// Pick the host's LAN IP. Inside a bridge-mode container,
// os.networkInterfaces() returns the container's docker-bridge IP (172.x),
@ -116,6 +116,9 @@ async function handleSidecarStart(body, res) {
}
const containerId = createRes.data.Id;
const _u = (env.find(e => e.startsWith('MAM_API_URL=')) || '').slice(12);
const _tok = env.some(e => e.startsWith('MAM_API_TOKEN=') && e.length > 14);
console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`);
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
if (startRes.status !== 204) {
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
@ -128,12 +131,40 @@ async function handleSidecarStart(body, res) {
}
}
async function fetchContainerLogs(containerId) {
return await new Promise((resolve) => {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=1&tail=200`,
method: 'GET',
};
const req = http.request(options, res => {
const chunks = [];
res.on('data', c => chunks.push(c));
res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8').replace(/[\x00-\x08]/g, '')));
});
req.on('error', () => resolve('(log fetch failed)'));
req.end();
});
}
async function handleSidecarStop(containerId, res) {
try {
await dockerApi('POST', `/containers/${containerId}/stop`).catch(() => {});
console.log(`[sidecar-stop] stopping ${containerId} (grace 180s)...`);
// Grace period must exceed the capture container's shutdown work
// (finalise ffmpeg session + register asset via callback). Default
// docker stop is only 10s, which SIGKILLs capture mid-finalise and
// loses the POST /assets callback -> asset stuck 'live', no jobs.
await dockerApi('POST', `/containers/${containerId}/stop?t=180`).catch(() => {});
// Dump the capture container's shutdown logs into our persistent log
// BEFORE removing it, so failed callbacks are diagnosable.
const logs = await fetchContainerLogs(containerId);
console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`);
// Container has now exited gracefully (or hit the 180s cap); remove it.
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
jsonResponse(res, 200, { ok: true });
} catch (err) {
console.error(`[sidecar-stop] error: ${err.message}`);
jsonResponse(res, 500, { error: err.message });
}
}
@ -188,6 +219,71 @@ function sampleCpu() {
});
}
// -- Live GPU utilization sampling -----------------------------------------
// Spawns a short-lived nvidia container via Docker API on each heartbeat call.
// Returns array of { index, util_pct, mem_used_mb, mem_total_mb } per GPU,
// or [] if no GPUs / nvidia runtime unavailable.
async function sampleGpuUtil() {
if (!_gpuCache || _gpuCache.length === 0) return [];
const QUERY = '--query-gpu=index,utilization.gpu,memory.used,memory.total';
const FMT = '--format=csv,noheader,nounits';
let containerId;
try {
const createRes = await dockerApi('POST', '/containers/create', {
Image: 'ubuntu:22.04',
Cmd: ['nvidia-smi', QUERY, FMT],
HostConfig: {
AutoRemove: false,
Runtime: 'nvidia',
DeviceRequests: [{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] }],
},
});
if (createRes.status !== 201) return [];
containerId = createRes.data.Id;
await dockerApi('POST', `/containers/${containerId}/start`);
for (let i = 0; i < 10; i++) {
await new Promise(r => setTimeout(r, 400));
const inspect = await dockerApi('GET', `/containers/${containerId}/json`);
if (!inspect.data?.State?.Running) break;
}
const logRes = await new Promise((resolve, reject) => {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=0`,
method: 'GET',
};
const req = http.request(options, res => {
const chunks = [];
res.on('data', c => chunks.push(c));
res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
req.on('error', reject);
req.end();
});
const text = logRes.replace(/[\x00-\x07].{7}/g, '').trim();
const lines = text.split('\n').filter(l => /^\d+,/.test(l.trim()));
return lines.map(line => {
const [idx, util, memUsed, memTotal] = line.split(',').map(s => parseInt(s.trim(), 10));
return { index: idx, util_pct: util, mem_used_mb: memUsed, mem_total_mb: memTotal };
});
} catch (err) {
console.warn('[gpu-util] sampling failed:', err.message);
return [];
} finally {
if (containerId) {
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
}
}
}
// ── Hardware detection ────────────────────────────────────────────────────
// GPU_COUNT / BMD_COUNT env vars override filesystem detection when /dev isn't mapped
// Cached GPU info from nvidia-smi — populated once at startup via Docker API.
@ -351,6 +447,7 @@ function detectHardware() {
// ── Heartbeat ─────────────────────────────────────────────────────────────
async function heartbeat() {
const cpu_usage = await sampleCpu();
const gpu_util = await sampleGpuUtil();
const totalMem = os.totalmem();
const freeMem = os.freemem();
const ip_address = getIp();
@ -366,6 +463,7 @@ async function heartbeat() {
mem_used_mb: Math.round((totalMem - freeMem) / 1048576),
mem_total_mb: Math.round(totalMem / 1048576),
capabilities,
gpu_util,
};
const headers = { 'Content-Type': 'application/json' };
@ -402,6 +500,22 @@ probeGpusViaSmi().then(() => {
setInterval(heartbeat, HEARTBEAT_MS);
});
// Serve the local HLS live-preview files (written by the capture sidecar to
// LIVE_DIR) so the primary can reverse-proxy them to the browser. Read-only.
function serveLiveFile(pathname, res) {
const rel = decodeURIComponent(pathname.slice('/live/'.length));
if (!rel || rel.includes('..') || rel.startsWith('/')) { res.writeHead(403); return res.end(); }
const filePath = LIVE_DIR + '/' + rel;
fs.readFile(filePath, (err, data) => {
if (err) { res.writeHead(404); return res.end(); }
const ct = filePath.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl'
: filePath.endsWith('.ts') ? 'video/mp2t'
: 'application/octet-stream';
res.writeHead(200, { 'Content-Type': ct, 'Cache-Control': 'no-cache', 'Access-Control-Allow-Origin': '*' });
res.end(data);
});
}
// ── HTTP server ───────────────────────────────────────────────────────────
const server = http.createServer((req, res) => {
const { pathname } = new URL(req.url, 'http://localhost');
@ -431,6 +545,9 @@ const server = http.createServer((req, res) => {
const id = pathname.slice('/sidecar/'.length, -'/status'.length);
handleSidecarStatus(id, res);
} else if (req.method === 'GET' && pathname.startsWith('/live/')) {
serveLiveFile(pathname, res);
} else {
res.writeHead(404);
res.end();

View file

@ -660,7 +660,7 @@ function OnAirTile({ recorder, onClick }) {
<div className="dash-onair-tile" onClick={onClick}>
<div className="dash-onair-video">
{recorder.live_asset_id
? <HlsPreview assetId={recorder.live_asset_id} />
? <HlsPreview assetId={recorder.live_asset_id} recorderId={recorder.id} />
: <FauxFrame />}
<span className="dash-onair-rec-pip">
<span className="dash-onair-rec-dot" />

View file

@ -384,13 +384,15 @@ function YouTubeImport({ navigate }) {
and nginx.conf); we attach hls.js to a <video> when a recorder is
actively recording and has a live asset.
============================================================ */
function HlsPreview({ assetId, muted = true, controls = false, className }) {
function HlsPreview({ assetId, recorderId, muted = true, controls = false, className }) {
const videoRef = React.useRef(null);
const [err, setErr] = React.useState(null);
React.useEffect(() => {
if (!assetId || !videoRef.current) return;
const url = '/live/' + assetId + '/index.m3u8';
const url = recorderId
? '/api/v1/recorders/' + recorderId + '/live/' + assetId + '/index.m3u8'
: '/live/' + assetId + '/index.m3u8';
const v = videoRef.current;
let destroyed = false;
let retryTimer = 0;
@ -652,7 +654,7 @@ function RecorderRow({ recorder: initialRecorder, onRefresh }) {
<div className={'recorder-row ' + recorder.status}>
<div className="recorder-preview">
{isRec && recorder.live_asset_id
? <HlsPreview assetId={recorder.live_asset_id} />
? <HlsPreview assetId={recorder.live_asset_id} recorderId={recorder.id} />
: isRec
? <LiveStrip seed={recorder.id.length * 3} count={6} />
: <div className="recorder-empty"><Icon name={recorder.status === 'error' ? 'alert' : 'video'} size={20} style={{ opacity: 0.4 }} /></div>}
@ -1057,7 +1059,7 @@ function MonitorTile({ feed, seed }) {
return (
<div className="monitor-tile">
{isLive && feed.live_asset_id
? <HlsPreview assetId={feed.live_asset_id} />
? <HlsPreview assetId={feed.live_asset_id} recorderId={feed.id} />
: <FauxFrame />}
{isLive && <div style={{ position: 'absolute', inset: 0, border: '2px solid var(--live)', pointerEvents: 'none', borderRadius: 'inherit' }} />}
<div style={{ position: 'absolute', top: 8, left: 8, display: 'flex', gap: 6 }}>