dragonflight/services/node-agent/index.js
Zac Gaetano f2542bc929 feat(nvenc): GPU sidecar passthrough + All-Intra HEVC capture codec
Phase 0.2 of the NVENC All-Intra HEVC ingest plan.

node-agent/handleSidecarStart:
- Accept useGpu: true in the sidecar start body
- When useGpu: adds Runtime=nvidia, DeviceRequests=[gpu], and injects
  NVIDIA_VISIBLE_DEVICES=all + NVIDIA_DRIVER_CAPABILITIES=video,compute,utility
  into the container env. CPU-codec recorders are unaffected (useGpu defaults false).

mam-api/recorders (start endpoint):
- Derive useGpu from recorder.recording_codec — true for hevc_nvenc/h264_nvenc
- Pass useGpu to remote sidecar start body
- Apply same Runtime/DeviceRequests to the local Docker spawn path

capture/capture-manager:
- Update hevc_nvenc codec entry with all-intra flags:
  -g 1 -bf 0 (every frame IDR, no B-frames — required for growing-file
  edit-while-record), -rc vbr, -profile:v main10, pixFmt p010le (10-bit 4:2:0)

Next: validation gate (§8) — test MXF OP1a then fragmented MOV on one
DeckLink channel, mount in Premiere while recording.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 12:35:23 -04:00

591 lines
23 KiB
JavaScript

import http from 'http';
import os from 'os';
import fs from 'fs';
const MAM_API_URL = (process.env.MAM_API_URL || 'http://localhost:3000').replace(/\/$/, '');
const NODE_TOKEN = process.env.NODE_TOKEN || '';
const NODE_ROLE = process.env.NODE_ROLE || 'worker';
const AGENT_PORT = parseInt(process.env.AGENT_PORT || '7436', 10);
const HEARTBEAT_MS = parseInt(process.env.HEARTBEAT_MS || '30000', 10);
const LIVE_DIR = process.env.LIVE_DIR || '/mnt/NVME/MAM/wild-dragon-live';
const VERSION = '1.3.0';
// Pick the host's LAN IP. Inside a bridge-mode container,
// os.networkInterfaces() returns the container's docker-bridge IP (172.x),
// not the host's LAN address. Two strategies:
// 1. honour an explicit NODE_IP override (set by onboard-node.sh)
// 2. filter out interfaces that obviously belong to container/virtual
// bridges and down-rank the docker bridge range so real LAN IPs win
//
// Combine with `network_mode: host` in docker-compose.worker.yml for the
// most reliable result — that lets os.networkInterfaces() see the host's
// real adapters directly.
function getIp() {
if (process.env.NODE_IP) return process.env.NODE_IP;
const SKIP_IFACE = /^(lo|docker\d*|br-|veth|cni|flannel|cali|tun|tap|virbr|kube)/i;
const isContainerBridge = (ip) => /^172\.(1[6-9]|2\d|3[01])\./.test(ip);
const candidates = [];
for (const [name, addrs] of Object.entries(os.networkInterfaces())) {
if (SKIP_IFACE.test(name)) continue;
for (const a of (addrs || [])) {
if (a.family !== 'IPv4' || a.internal) continue;
candidates.push({ name, address: a.address, rank: isContainerBridge(a.address) ? 1 : 0 });
}
}
candidates.sort((a, b) => a.rank - b.rank);
return candidates.length ? candidates[0].address : null;
}
// ── Docker API helper (talks to local /var/run/docker.sock) ───────────────
function dockerApi(method, path, body = null) {
return new Promise((resolve, reject) => {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43${path}`,
method,
headers: { 'Content-Type': 'application/json' },
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', chunk => data += chunk);
res.on('end', () => {
try { resolve({ status: res.statusCode, data: data ? JSON.parse(data) : {} }); }
catch { resolve({ status: res.statusCode, data }); }
});
});
req.on('error', reject);
if (body) req.write(JSON.stringify(body));
req.end();
});
}
function readBody(req) {
return new Promise((resolve, reject) => {
let data = '';
req.on('data', chunk => data += chunk);
req.on('end', () => {
try { resolve(JSON.parse(data || '{}')); }
catch { reject(new Error('Invalid JSON')); }
});
req.on('error', reject);
});
}
function jsonResponse(res, status, body) {
res.writeHead(status, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(body));
}
// ── Sidecar: spawn a capture container on this node ───────────────────────
async function handleSidecarStart(body, res) {
try {
const {
image = 'wild-dragon-capture:latest',
env = [],
capturePort = 3001,
sourceType = 'sdi',
// useGpu: true → attach NVIDIA runtime + NVIDIA_VISIBLE_DEVICES so the
// sidecar can call hevc_nvenc / h264_nvenc inside capture ffmpeg.
// Only set this when the recorder codec is GPU-accelerated; CPU codecs
// (ProRes, DNxHR, libx264) don't need it and it avoids a hard dep on the
// NVIDIA container runtime on nodes that have no GPU.
useGpu = false,
} = body;
const binds = [`${LIVE_DIR}:/live`];
if (sourceType === 'sdi') binds.unshift('/dev/blackmagic:/dev/blackmagic');
if (sourceType === 'deltacast') {
// Bind each /dev/deltacast* node that exists on the host into the container.
// If none exist the capture container falls back to test-card (lavfi) mode.
try {
const dcEntries = fs.readdirSync('/dev').filter(n => /^deltacast\d+$/.test(n));
for (const d of dcEntries) binds.push(`/dev/${d}:/dev/${d}`);
} catch (_) { /* /dev always exists */ }
}
// Build the sidecar environment, injecting NVIDIA vars when GPU is requested.
const sidecarEnv = [...env, `PORT=${capturePort}`];
if (useGpu) {
// NVIDIA_VISIBLE_DEVICES=all exposes every GPU on the host.
// For a single-GPU node (zampp2 / L4) this is equivalent to pinning GPU 0.
// When we later store per-recorder GPU affinity in the DB we can pass a
// specific UUID here instead.
sidecarEnv.push('NVIDIA_VISIBLE_DEVICES=all');
sidecarEnv.push('NVIDIA_DRIVER_CAPABILITIES=video,compute,utility');
}
const hostConfig = {
NetworkMode: 'host',
Privileged: true,
Binds: binds,
};
if (useGpu) {
// Tell Docker to use the NVIDIA container runtime for this container.
// Equivalent to `docker run --gpus all` / `--runtime=nvidia`.
hostConfig.Runtime = 'nvidia';
hostConfig.DeviceRequests = [
{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] },
];
}
const spec = {
Image: image,
Env: sidecarEnv,
HostConfig: hostConfig,
};
const createRes = await dockerApi('POST', '/containers/create', spec);
if (createRes.status !== 201) {
return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data });
}
const containerId = createRes.data.Id;
const _u = (env.find(e => e.startsWith('MAM_API_URL=')) || '').slice(12);
const _tok = env.some(e => e.startsWith('MAM_API_TOKEN=') && e.length > 14);
console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`);
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
if (startRes.status !== 204) {
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data });
}
jsonResponse(res, 201, { containerId, capturePort });
} catch (err) {
jsonResponse(res, 500, { error: err.message });
}
}
async function fetchContainerLogs(containerId) {
return await new Promise((resolve) => {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=1&tail=200`,
method: 'GET',
};
const req = http.request(options, res => {
const chunks = [];
res.on('data', c => chunks.push(c));
res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8').replace(/[\x00-\x08]/g, '')));
});
req.on('error', () => resolve('(log fetch failed)'));
req.end();
});
}
async function handleSidecarStop(containerId, res) {
try {
console.log(`[sidecar-stop] stopping ${containerId} (grace 180s)...`);
// Grace period must exceed the capture container's shutdown work
// (finalise ffmpeg session + register asset via callback). Default
// docker stop is only 10s, which SIGKILLs capture mid-finalise and
// loses the POST /assets callback -> asset stuck 'live', no jobs.
await dockerApi('POST', `/containers/${containerId}/stop?t=180`).catch(() => {});
// Dump the capture container's shutdown logs into our persistent log
// BEFORE removing it, so failed callbacks are diagnosable.
const logs = await fetchContainerLogs(containerId);
console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`);
// Container has now exited gracefully (or hit the 180s cap); remove it.
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
jsonResponse(res, 200, { ok: true });
} catch (err) {
console.error(`[sidecar-stop] error: ${err.message}`);
jsonResponse(res, 500, { error: err.message });
}
}
async function handleSidecarStatus(containerId, res) {
try {
const inspectRes = await dockerApi('GET', `/containers/${containerId}/json`);
if (inspectRes.status !== 200) {
return jsonResponse(res, 200, { running: false });
}
const container = inspectRes.data;
const running = container.State?.Running || false;
const startedAt = container.State?.StartedAt;
// Derive capturePort from the container's env so we know where to poll.
const portEnv = (container.Config?.Env || []).find(e => e.startsWith('PORT='));
const capturePort = portEnv ? parseInt(portEnv.split('=')[1], 10) : 3001;
let live = null;
if (running) {
try {
const captureRes = await fetch(`http://127.0.0.1:${capturePort}/capture/status`, {
signal: AbortSignal.timeout(2000),
});
if (captureRes.ok) live = await captureRes.json();
} catch (_) {}
}
jsonResponse(res, 200, { running, startedAt, live });
} catch (err) {
jsonResponse(res, 500, { error: err.message });
}
}
// ── CPU sampling (500ms window) ───────────────────────────────────────────
function sampleCpu() {
return new Promise(resolve => {
const s1 = os.cpus();
setTimeout(() => {
const s2 = os.cpus();
let idle = 0, total = 0;
s2.forEach((cpu, i) => {
for (const t of Object.keys(cpu.times)) {
const d = cpu.times[t] - (s1[i]?.times[t] ?? 0);
total += d;
if (t === 'idle') idle += d;
}
});
resolve(total > 0 ? Math.round((1 - idle / total) * 10000) / 100 : 0);
}, 500);
});
}
// -- Live GPU utilization sampling -----------------------------------------
// Spawns a short-lived nvidia container via Docker API on each heartbeat call.
// Returns array of { index, util_pct, mem_used_mb, mem_total_mb } per GPU,
// or [] if no GPUs / nvidia runtime unavailable.
async function sampleGpuUtil() {
if (!_gpuCache || _gpuCache.length === 0) return [];
const QUERY = '--query-gpu=index,utilization.gpu,memory.used,memory.total';
const FMT = '--format=csv,noheader,nounits';
let containerId;
try {
const createRes = await dockerApi('POST', '/containers/create', {
Image: 'ubuntu:22.04',
Cmd: ['nvidia-smi', QUERY, FMT],
HostConfig: {
AutoRemove: false,
Runtime: 'nvidia',
DeviceRequests: [{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] }],
},
});
if (createRes.status !== 201) return [];
containerId = createRes.data.Id;
await dockerApi('POST', `/containers/${containerId}/start`);
for (let i = 0; i < 10; i++) {
await new Promise(r => setTimeout(r, 400));
const inspect = await dockerApi('GET', `/containers/${containerId}/json`);
if (!inspect.data?.State?.Running) break;
}
const logRes = await new Promise((resolve, reject) => {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=0`,
method: 'GET',
};
const req = http.request(options, res => {
const chunks = [];
res.on('data', c => chunks.push(c));
res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
req.on('error', reject);
req.end();
});
const text = logRes.replace(/[\x00-\x07].{7}/g, '').trim();
const lines = text.split('\n').filter(l => /^\d+,/.test(l.trim()));
return lines.map(line => {
const [idx, util, memUsed, memTotal] = line.split(',').map(s => parseInt(s.trim(), 10));
return { index: idx, util_pct: util, mem_used_mb: memUsed, mem_total_mb: memTotal };
});
} catch (err) {
console.warn('[gpu-util] sampling failed:', err.message);
return [];
} finally {
if (containerId) {
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
}
}
}
// ── Hardware detection ────────────────────────────────────────────────────
// GPU_COUNT / BMD_COUNT env vars override filesystem detection when /dev isn't mapped
// Cached GPU info from nvidia-smi — populated once at startup via Docker API.
// null = not yet probed; [] = probed but no GPUs or no runtime.
let _gpuCache = null;
async function probeGpusViaSmi() {
// Use Docker API (socket) to run nvidia-smi inside a GPU-enabled container.
// The NVIDIA Container Runtime injects nvidia-smi + libs into ubuntu:22.04.
// This sidesteps the Alpine/glibc incompatibility in the node-agent image.
const QUERY = '--query-gpu=index,name,memory.total,driver_version';
const FMT = '--format=csv,noheader,nounits';
let containerId;
try {
// Create container with nvidia runtime and GPU access
const createRes = await dockerApi('POST', '/containers/create', {
Image: 'ubuntu:22.04',
Cmd: ['nvidia-smi', QUERY, FMT],
HostConfig: {
AutoRemove: false,
Runtime: 'nvidia',
DeviceRequests: [{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] }],
},
});
if (createRes.status !== 201) return;
containerId = createRes.data.Id;
await dockerApi('POST', `/containers/${containerId}/start`);
// Wait for it to finish (poll status)
for (let i = 0; i < 20; i++) {
await new Promise(r => setTimeout(r, 500));
const inspect = await dockerApi('GET', `/containers/${containerId}/json`);
if (!inspect.data?.State?.Running) break;
}
// Grab stdout logs
const logRes = await new Promise((resolve, reject) => {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=0`,
method: 'GET',
};
const req = http.request(options, res => {
const chunks = [];
res.on('data', c => chunks.push(c));
res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
req.on('error', reject);
req.end();
});
// Docker log frames have an 8-byte header — strip them
const text = logRes.replace(/[\x00-\x07].{7}/g, '').trim();
const lines = text.split('\n').filter(l => /^\d+,/.test(l.trim()));
if (lines.length) {
_gpuCache = lines.map(line => {
const [idx, name, memMiB, driver] = line.split(',').map(s => s.trim());
return {
index: parseInt(idx, 10),
name: name,
memory_mb: parseInt(memMiB, 10),
driver: driver,
device: `/dev/nvidia${idx}`,
type: 'nvidia',
};
});
console.log(`[gpu] detected ${_gpuCache.length} GPU(s) via nvidia-smi: ${_gpuCache.map(g => g.name).join(', ')}`);
} else {
_gpuCache = [];
}
} catch (err) {
console.warn('[gpu] nvidia-smi probe failed:', err.message);
_gpuCache = [];
} finally {
if (containerId) {
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
}
}
}
function detectHardware() {
const capabilities = { gpus: [], blackmagic: [], deltacast: [] };
// Issue #108 — previously GPU_COUNT short-circuited the entire detection
// path, throwing away the nvidia-smi enrichment (model, memory, driver
// version). Now: override sets the *count*, but if nvidia-smi successfully
// probed at startup we keep its rich entries for the overridden indexes.
const gpuOverride = parseInt(process.env.GPU_COUNT || '-1', 10);
if (gpuOverride >= 0) {
for (let i = 0; i < gpuOverride; i++) {
const enriched = (_gpuCache || []).find(g => g.index === i);
capabilities.gpus.push(enriched || { device: `/dev/nvidia${i}`, type: 'nvidia', index: i });
}
} else if (_gpuCache !== null && _gpuCache.length > 0) {
capabilities.gpus = _gpuCache;
} else {
for (let i = 0; i < 16; i++) {
try {
fs.accessSync(`/dev/nvidia${i}`, fs.constants.F_OK);
capabilities.gpus.push({ device: `/dev/nvidia${i}`, type: 'nvidia', index: i });
} catch (_) { break; }
}
}
// Issue #109 — DeckLink device naming differs by model: capture-only cards
// expose /dev/blackmagic/dv${i}, while DeckLink IO / Quad cards expose
// /dev/blackmagic/io${i}. The previous code hardcoded `dv` which broke
// capability reporting on every IO / Quad node. Detect what's actually
// present and only synthesize names when /dev/blackmagic isn't mounted.
const bmdOverride = parseInt(process.env.BMD_COUNT || '-1', 10);
try {
const bmdEntries = fs.readdirSync('/dev/blackmagic');
// Filesystem wins — devices speak for themselves.
capabilities.blackmagic = bmdEntries.map((d, i) => ({ device: `/dev/blackmagic/${d}`, index: i }));
} catch (_) {
// No /dev/blackmagic mount — fall back to the BMD_COUNT override.
if (bmdOverride >= 0) {
const namePrefix = (process.env.BMD_DEVICE_PREFIX || 'dv').replace(/[^a-z]/gi, '');
for (let i = 0; i < bmdOverride; i++) {
capabilities.blackmagic.push({ device: `/dev/blackmagic/${namePrefix}${i}`, index: i });
}
}
}
// Best-effort model name from BMD_MODEL env (set manually) — used by the UI
// to render the correct card layout (Duo 2, Quad 2, Mini Recorder, ...).
if (process.env.BMD_MODEL) capabilities.blackmagic_model = process.env.BMD_MODEL;
// Deltacast SDI cards — enumerate /dev/deltacast* device nodes.
// DELTACAST_PORT_COUNT env overrides when devices aren't mapped (test/dev mode).
const dcOverride = parseInt(process.env.DELTACAST_PORT_COUNT || '-1', 10);
try {
const dcEntries = fs.readdirSync('/dev').filter(n => /^deltacast\d+$/.test(n)).sort();
if (dcEntries.length > 0) {
capabilities.deltacast = dcEntries.map((d, i) => ({
device: `/dev/${d}`,
index: i,
present: true,
}));
} else if (dcOverride >= 0) {
// No device nodes but count is configured — test-card mode.
for (let i = 0; i < dcOverride; i++) {
capabilities.deltacast.push({ device: `/dev/deltacast${i}`, index: i, present: false });
}
}
} catch (_) {
if (dcOverride >= 0) {
for (let i = 0; i < dcOverride; i++) {
capabilities.deltacast.push({ device: `/dev/deltacast${i}`, index: i, present: false });
}
}
}
if (process.env.DELTACAST_MODEL) capabilities.deltacast_model = process.env.DELTACAST_MODEL;
return capabilities;
}
// ── Heartbeat ─────────────────────────────────────────────────────────────
async function heartbeat() {
const cpu_usage = await sampleCpu();
const gpu_util = await sampleGpuUtil();
const totalMem = os.totalmem();
const freeMem = os.freemem();
const ip_address = getIp();
const capabilities = detectHardware();
const payload = {
hostname: os.hostname(),
ip_address,
role: NODE_ROLE,
version: VERSION,
api_url: `http://${ip_address || os.hostname()}:${AGENT_PORT}`,
cpu_usage,
mem_used_mb: Math.round((totalMem - freeMem) / 1048576),
mem_total_mb: Math.round(totalMem / 1048576),
capabilities,
gpu_util,
};
const headers = { 'Content-Type': 'application/json' };
if (NODE_TOKEN) headers['Authorization'] = `Bearer ${NODE_TOKEN}`;
try {
const res = await fetch(`${MAM_API_URL}/api/v1/cluster/heartbeat`, {
method: 'POST',
headers,
body: JSON.stringify(payload),
signal: AbortSignal.timeout(8000),
});
if (res.ok) {
const gpuStr = capabilities.gpus.length ? ` gpu=${capabilities.gpus.length}` : '';
const bmdStr = capabilities.blackmagic.length ? ` bmd=${capabilities.blackmagic.length}` : '';
const dcStr = capabilities.deltacast.length ? ` dc=${capabilities.deltacast.length}` : '';
process.stdout.write(
`[hb] ${payload.hostname} ip=${ip_address || '?'} cpu=${cpu_usage}% ` +
`mem=${payload.mem_used_mb}/${payload.mem_total_mb}MB${gpuStr}${bmdStr}${dcStr}\n`
);
} else {
const txt = await res.text().catch(() => '');
console.error(`[hb] ${res.status} ${txt.slice(0, 120)}`);
}
} catch (err) {
console.error(`[hb] failed — ${err.message}`);
}
}
// Probe GPU info once at startup (async, before first heartbeat).
// _gpuCache is populated here; heartbeats use the cached value.
probeGpusViaSmi().then(() => {
heartbeat();
setInterval(heartbeat, HEARTBEAT_MS);
});
// Serve the local HLS live-preview files (written by the capture sidecar to
// LIVE_DIR) so the primary can reverse-proxy them to the browser. Read-only.
function serveLiveFile(pathname, res) {
const rel = decodeURIComponent(pathname.slice('/live/'.length));
if (!rel || rel.includes('..') || rel.startsWith('/')) { res.writeHead(403); return res.end(); }
const filePath = LIVE_DIR + '/' + rel;
fs.readFile(filePath, (err, data) => {
if (err) { res.writeHead(404); return res.end(); }
const ct = filePath.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl'
: filePath.endsWith('.ts') ? 'video/mp2t'
: 'application/octet-stream';
res.writeHead(200, { 'Content-Type': ct, 'Cache-Control': 'no-cache', 'Access-Control-Allow-Origin': '*' });
res.end(data);
});
}
// ── HTTP server ───────────────────────────────────────────────────────────
const server = http.createServer((req, res) => {
const { pathname } = new URL(req.url, 'http://localhost');
if (req.method === 'GET' && pathname === '/health') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
ok: true,
hostname: os.hostname(),
uptime: Math.round(process.uptime()),
version: VERSION,
role: NODE_ROLE,
ip: getIp(),
}));
} else if (req.method === 'POST' && pathname === '/sidecar/start') {
readBody(req)
.then(body => handleSidecarStart(body, res))
.catch(() => jsonResponse(res, 400, { error: 'Invalid request body' }));
} else if (req.method === 'DELETE' && pathname.startsWith('/sidecar/')) {
const id = pathname.slice('/sidecar/'.length);
if (!id || id.includes('/')) { res.writeHead(404); return res.end(); }
handleSidecarStop(id, res);
} else if (req.method === 'GET' && /^\/sidecar\/[^/]+\/status$/.test(pathname)) {
const id = pathname.slice('/sidecar/'.length, -'/status'.length);
handleSidecarStatus(id, res);
} else if (req.method === 'GET' && pathname.startsWith('/live/')) {
serveLiveFile(pathname, res);
} else {
res.writeHead(404);
res.end();
}
});
server.listen(AGENT_PORT, () => {
console.log(`wild-dragon-node-agent v${VERSION}`);
console.log(` Listening :${AGENT_PORT}`);
console.log(` Host IP ${getIp() || '(unresolved)'}`);
console.log(` Primary ${MAM_API_URL}`);
console.log(` Role ${NODE_ROLE}`);
console.log(` Heartbeat every ${HEARTBEAT_MS / 1000}s`);
});