The cluster heartbeat upserts cluster_nodes ON CONFLICT (hostname), so two machines reporting the same os.hostname() clobber each other's row. A cloned capture VM whose /etc/hostname was "zampp1" (same as the primary) caused its 4 DeckLink cards to land on the primary's row, then get overwritten by the primary's cardless heartbeat — so the New Recorder modal showed "No SDI devices auto-detected" despite healthy hardware.

- node-agent now reports process.env.NODE_NAME || os.hostname() as its cluster identity, so node identity is explicit and, once every node has a distinct NODE_NAME, collision-proof.
- docker-compose.worker.yml exposes NODE_NAME to the container.
- onboard-node.sh always writes NODE_NAME to the node .env (defaulting to the OS hostname), so future onboarding pins identity even on cloned images.

Live remediation already applied to the zampp2 capture node: compose hostname pinned to zampp2 and its node token rebound to zampp2; the DB now reports bmd=4 for zampp2.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
841 lines
34 KiB
JavaScript
841 lines
34 KiB
JavaScript
import crypto from 'crypto';
import fs from 'fs';
import http from 'http';
import os from 'os';
|
|
|
|
const MAM_API_URL = (process.env.MAM_API_URL || 'http://localhost:3000').replace(/\/$/, '');
const NODE_TOKEN = process.env.NODE_TOKEN || '';
const NODE_ROLE = process.env.NODE_ROLE || 'worker';

// Read a positive integer from the environment variable `name`, returning
// `fallback` when it is unset, empty, non-numeric, zero or negative.
// A bare parseInt() yields NaN for a non-numeric value, and
// setInterval(fn, NaN) fires roughly every millisecond, which would flood the
// primary with heartbeats. A zero AGENT_PORT would make the agent listen on a
// random port.
function envPositiveInt(name, fallback) {
  const parsed = parseInt(process.env[name] ?? '', 10);
  return parsed > 0 ? parsed : fallback;
}

// Cluster identity. The heartbeat keys cluster_nodes on hostname (ON CONFLICT
// (hostname)), so two machines reporting the SAME os.hostname() clobber each
// other's row. That is exactly what happens with cloned VMs that share
// /etc/hostname (e.g. two boxes both named "zampp1"). The capture node's
// DeckLink capability then lands on the wrong row and gets overwritten by the
// primary's cardless heartbeat, so the recorder UI shows "No SDI devices
// auto-detected".
// NODE_NAME (set per-node by onboard-node.sh / the node's .env) overrides
// os.hostname() so identity is explicit. It falls back to the OS hostname when
// unset or blank, preserving existing single-host behaviour. Surrounding
// whitespace (e.g. a trailing CR from a CRLF-edited .env) is trimmed so it
// cannot silently register a second, phantom cluster_nodes row.
const NODE_NAME = (process.env.NODE_NAME || '').trim() || os.hostname();

const AGENT_PORT = envPositiveInt('AGENT_PORT', 7436);
const HEARTBEAT_MS = envPositiveInt('HEARTBEAT_MS', 30000);
const LIVE_DIR = process.env.LIVE_DIR || '/mnt/NVME/MAM/wild-dragon-live';

// Host path to the checked-out repo (onboard-node.sh clones to /opt/wild-dragon).
// The driver-install container bind-mounts this so install-driver.sh can read
// sdk/<vendor>/ and run from deploy/. Overridable for non-standard layouts.
const REPO_DIR = process.env.REPO_DIR || '/opt/wild-dragon';
const VERSION = '1.4.0';

// Capture-driver vendor allowlist. NOTHING outside this set is ever passed to
// the host installer. The value is only ever used to pick a script arg; it is
// never interpolated into a shell string.
const DRIVER_VENDORS = ['blackmagic', 'aja', 'deltacast', 'ndi'];
|
|
|
|
// Pick the host's LAN IP. Inside a bridge-mode container,
// os.networkInterfaces() returns the container's docker-bridge IP (172.x),
// not the host's LAN address. Two strategies:
//   1. honour an explicit NODE_IP override (set by onboard-node.sh)
//   2. filter out interfaces that obviously belong to container/virtual
//      bridges and down-rank the docker bridge range so real LAN IPs win
//
// Combine with `network_mode: host` in docker-compose.worker.yml for the
// most reliable result — that lets os.networkInterfaces() see the host's
// real adapters directly.
//
// Returns the chosen IPv4 address as a string, or null when none qualifies.
// Node 18.0–18.3 reported `family` as the number 4 instead of the string
// 'IPv4'; both forms are accepted so the agent doesn't lose its IP there.
function getIp() {
  if (process.env.NODE_IP) return process.env.NODE_IP;

  const SKIP_IFACE = /^(lo|docker\d*|br-|veth|cni|flannel|cali|tun|tap|virbr|kube)/i;
  const isContainerBridge = (ip) => /^172\.(1[6-9]|2\d|3[01])\./.test(ip);
  const isIPv4 = (family) => family === 'IPv4' || family === 4;

  const candidates = [];
  for (const [name, addrs] of Object.entries(os.networkInterfaces())) {
    if (SKIP_IFACE.test(name)) continue;
    for (const a of addrs ?? []) {
      if (!isIPv4(a.family) || a.internal) continue;
      candidates.push({ name, address: a.address, rank: isContainerBridge(a.address) ? 1 : 0 });
    }
  }
  // Array.prototype.sort is stable, so enumeration order is kept within a rank.
  candidates.sort((a, b) => a.rank - b.rank);
  return candidates.length ? candidates[0].address : null;
}
|
|
|
|
// ── Docker API helper (talks to local /var/run/docker.sock) ───────────────
// Sends one request to the Docker Engine API (pinned to v1.43) over the local
// Unix socket. It resolves with { status, data }, where `data` is one of:
//   - the parsed JSON body,
//   - {} for an empty body,
//   - the raw text when the body is not JSON.
// HTTP error statuses are resolved, so callers branch on `status`. The promise
// rejects only on transport errors (socket missing, connection reset, ...).
function dockerApi(method, path, body = null) {
  return new Promise((resolve, reject) => {
    const payload = body ? JSON.stringify(body) : null;
    const headers = { 'Content-Type': 'application/json' };
    // Send an explicit length rather than chunked transfer encoding.
    if (payload !== null) headers['Content-Length'] = Buffer.byteLength(payload);

    const options = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.43${path}`,
      method,
      headers,
    };
    const req = http.request(options, (res) => {
      // Keep raw Buffers and decode once at the end. Appending chunks to a
      // string decodes each chunk separately, which corrupts a multi-byte
      // UTF-8 character split across a chunk boundary.
      const chunks = [];
      res.on('data', (chunk) => chunks.push(chunk));
      res.on('error', reject);
      res.on('end', () => {
        const text = Buffer.concat(chunks).toString('utf8');
        try { resolve({ status: res.statusCode, data: text ? JSON.parse(text) : {} }); }
        catch { resolve({ status: res.statusCode, data: text }); }
      });
    });
    req.on('error', reject);
    if (payload !== null) req.write(payload);
    req.end();
  });
}
|
|
|
|
// Read an incoming request body and parse it as JSON. An empty body parses
// as {}.
// Rejects with 'Invalid JSON' on malformed input and 'Request body too large'
// once more than `maxBytes` bytes arrive. The limit matters because
// /sidecar/start is unauthenticated and these bodies are small JSON
// documents. Chunks are collected as Buffers and decoded once, so multi-byte
// UTF-8 characters split across chunks survive.
function readBody(req, maxBytes = 1024 * 1024) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let received = 0;
    let overflowed = false;

    req.on('data', (chunk) => {
      if (overflowed) return; // keep draining, but stop buffering
      const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
      received += buf.length;
      if (received > maxBytes) {
        overflowed = true;
        chunks.length = 0;
        reject(new Error('Request body too large'));
        return;
      }
      chunks.push(buf);
    });
    req.on('end', () => {
      if (overflowed) return;
      const text = Buffer.concat(chunks).toString('utf8');
      try { resolve(JSON.parse(text || '{}')); }
      catch { reject(new Error('Invalid JSON')); }
    });
    req.on('error', reject);
  });
}
|
|
|
|
// Send `body` as a JSON response with the given HTTP status code.
function jsonResponse(res, status, body) {
  const headers = { 'Content-Type': 'application/json' };
  res.writeHead(status, headers);
  const serialized = JSON.stringify(body);
  res.end(serialized);
}
|
|
|
|
// ── Sidecar: spawn a capture container on this node ───────────────────────

// Decide which GPUs a GPU-enabled sidecar may use (Issue #167). A non-blank
// gpuUuid (a GPU UUID such as "GPU-xxxx", or a numeric index) pins the
// sidecar to that one device. Otherwise every GPU is exposed.
// Returns the NVIDIA_VISIBLE_DEVICES value together with a Docker
// DeviceRequest that agrees with it. Pairing a pinned env var with a Count:-1
// ("all GPUs") request would leave the outcome dependent on which of the two
// the runtime honours. Count and DeviceIDs are mutually exclusive in a
// DeviceRequest.
function sidecarGpuSelection(gpuUuid) {
  const pinned = gpuUuid == null ? '' : String(gpuUuid).trim();
  if (pinned === '') {
    return {
      visibleDevices: 'all',
      deviceRequest: { Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] },
    };
  }
  return {
    visibleDevices: pinned,
    deviceRequest: { Driver: 'nvidia', DeviceIDs: [pinned], Capabilities: [['gpu']] },
  };
}

// Create and start a capture sidecar container from the JSON request body.
// Responds with one of:
//   - 201 { containerId, capturePort } on success,
//   - 400 for an unusable body,
//   - 502 when Docker refuses to create/start the container,
//   - 500 on unexpected errors.
// A container that was created but never started is removed on every
// failure path, so nothing is left orphaned.
async function handleSidecarStart(body, res) {
  let containerId = null;
  let started = false;
  try {
    if (!body || typeof body !== 'object') {
      return jsonResponse(res, 400, { error: 'Invalid request body' });
    }

    const {
      image = 'wild-dragon-capture:latest',
      env = [],
      capturePort = 3001,
      sourceType = 'sdi',
      // useGpu: true → attach NVIDIA runtime + NVIDIA_VISIBLE_DEVICES so the
      // sidecar can call hevc_nvenc / h264_nvenc inside capture ffmpeg.
      // Only set this when the recorder codec is GPU-accelerated. CPU codecs
      // (ProRes, DNxHR, libx264) don't need it, and leaving it off avoids a
      // hard dependency on the NVIDIA container runtime on nodes with no GPU.
      useGpu = false,
      // Issue #167 — optional per-recorder GPU affinity (see sidecarGpuSelection).
      // null / undefined keeps the legacy behaviour of exposing every GPU.
      gpuUuid = null,
    } = body;

    // env becomes the container's Env list and is scanned with startsWith()
    // below. Reject anything else up front, before a container exists.
    if (!Array.isArray(env) || !env.every((e) => typeof e === 'string')) {
      return jsonResponse(res, 400, { error: 'env must be an array of strings' });
    }

    const binds = [`${LIVE_DIR}:/live`];
    if (sourceType === 'sdi') binds.unshift('/dev/blackmagic:/dev/blackmagic');
    if (sourceType === 'deltacast') {
      // Bind each /dev/deltacast* node that exists on the host into the container.
      // If none exist, the capture container falls back to test-card (lavfi) mode.
      try {
        const dcEntries = fs.readdirSync('/dev').filter((n) => /^deltacast\d+$/.test(n));
        for (const d of dcEntries) binds.push(`/dev/${d}:/dev/${d}`);
      } catch (_) { /* /dev always exists */ }
    }

    const sidecarEnv = [...env, `PORT=${capturePort}`];
    const hostConfig = {
      NetworkMode: 'host',
      Privileged: true,
      Binds: binds,
    };
    if (useGpu) {
      // Equivalent to `docker run --runtime=nvidia --gpus ...`, with the env
      // var and the device request selecting the same GPU(s).
      const { visibleDevices, deviceRequest } = sidecarGpuSelection(gpuUuid);
      sidecarEnv.push(`NVIDIA_VISIBLE_DEVICES=${visibleDevices}`);
      sidecarEnv.push('NVIDIA_DRIVER_CAPABILITIES=video,compute,utility');
      hostConfig.Runtime = 'nvidia';
      hostConfig.DeviceRequests = [deviceRequest];
    }

    const spec = {
      Image: image,
      Env: sidecarEnv,
      HostConfig: hostConfig,
    };

    const createRes = await dockerApi('POST', '/containers/create', spec);
    if (createRes.status !== 201) {
      return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data });
    }
    containerId = createRes.data.Id;

    // Log where the sidecar will call back, and whether a token was supplied,
    // without printing the token itself.
    const apiUrl = (env.find((e) => e.startsWith('MAM_API_URL=')) || '').slice('MAM_API_URL='.length);
    const hasToken = env.some((e) => e.startsWith('MAM_API_TOKEN=') && e.length > 'MAM_API_TOKEN='.length);
    console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${apiUrl} token=${hasToken}`);

    const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
    if (startRes.status !== 204) {
      await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
      containerId = null; // already removed
      return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data });
    }
    started = true;

    jsonResponse(res, 201, { containerId, capturePort });
  } catch (err) {
    if (containerId && !started) {
      await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
    }
    jsonResponse(res, 500, { error: err.message });
  }
}
|
|
|
|
// Fetch the last 200 log lines (stdout + stderr) of a container as text.
// Never rejects: transport failures resolve to '(log fetch failed)'.
//
// Containers created without a TTY (every container this agent creates)
// return a multiplexed stream. Each frame is an 8-byte header
// [stream type, 0, 0, 0, payload size as uint32 big-endian] followed by the
// payload. The frames are decoded properly here. Merely deleting control
// bytes would leave the size bytes in the text whenever they fall in the
// printable range. Data that doesn't look framed (e.g. a JSON error body for
// an unknown container) is returned as-is.
async function fetchContainerLogs(containerId) {
  const decodeLogFrames = (buf) => {
    const parts = [];
    let offset = 0;
    while (offset < buf.length) {
      const framed = offset + 8 <= buf.length
        && buf[offset] <= 2
        && buf[offset + 1] === 0 && buf[offset + 2] === 0 && buf[offset + 3] === 0;
      if (!framed) {
        parts.push(buf.subarray(offset));
        break;
      }
      const size = buf.readUInt32BE(offset + 4);
      parts.push(buf.subarray(offset + 8, offset + 8 + size));
      offset += 8 + size;
    }
    return Buffer.concat(parts).toString('utf8');
  };

  const raw = await new Promise((resolve) => {
    const options = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=1&tail=200`,
      method: 'GET',
    };
    const req = http.request(options, (res) => {
      const chunks = [];
      res.on('data', (c) => chunks.push(c));
      res.on('end', () => resolve(Buffer.concat(chunks)));
      res.on('error', () => resolve(null));
    });
    req.on('error', () => resolve(null));
    req.end();
  });
  if (raw === null) return '(log fetch failed)';

  // Still drop stray low control bytes, as before, so log output stays printable.
  return decodeLogFrames(raw).replace(/[\x00-\x08]/g, '');
}
|
|
|
|
// Stop a capture sidecar gracefully, archive its logs, then remove it.
// Stop and remove failures are tolerated (best-effort). The caller gets
// { ok: true } once the sequence has run.
async function handleSidecarStop(containerId, res) {
  const removePath = `/containers/${containerId}?force=true`;
  try {
    console.log(`[sidecar-stop] stopping ${containerId} (grace 180s)...`);

    // The 180s grace must outlast the capture container's shutdown work
    // (finalising the ffmpeg session and registering the asset via callback).
    // Docker's default 10s stop SIGKILLs capture mid-finalise and loses the
    // POST /assets callback, leaving the asset stuck 'live' with no jobs.
    await dockerApi('POST', `/containers/${containerId}/stop?t=180`).catch(() => {});

    // Copy the shutdown logs into our own (persistent) log BEFORE removal so
    // failed callbacks remain diagnosable.
    const captured = await fetchContainerLogs(containerId);
    console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${captured}\n[sidecar-stop] ==== end logs ====`);

    // Exited gracefully or hit the 180s cap; either way, remove it now.
    await dockerApi('DELETE', removePath).catch(() => {});
    jsonResponse(res, 200, { ok: true });
  } catch (err) {
    console.error(`[sidecar-stop] error: ${err.message}`);
    jsonResponse(res, 500, { error: err.message });
  }
}
|
|
|
|
// Report whether a sidecar container is running and, when it is, the capture
// service's own /capture/status payload as `live`.
// A container that Docker can't inspect (any non-200) is reported as
// { running: false } rather than as an error.
async function handleSidecarStatus(containerId, res) {
  try {
    const inspected = await dockerApi('GET', `/containers/${containerId}/json`);
    if (inspected.status !== 200) {
      return jsonResponse(res, 200, { running: false });
    }

    const { data: container } = inspected;
    const running = container.State?.Running || false;
    const startedAt = container.State?.StartedAt;

    // The sidecar was started with PORT=<n> in its env; default to 3001.
    const envList = container.Config?.Env || [];
    const portEntry = envList.find((entry) => entry.startsWith('PORT='));
    const capturePort = portEntry === undefined ? 3001 : parseInt(portEntry.split('=')[1], 10);

    // Best-effort: `live` stays null if the capture service is slow (>2s),
    // unreachable, answers non-2xx, or returns non-JSON.
    const live = running
      ? await fetch(`http://127.0.0.1:${capturePort}/capture/status`, { signal: AbortSignal.timeout(2000) })
        .then((r) => (r.ok ? r.json() : null))
        .catch(() => null)
      : null;

    jsonResponse(res, 200, { running, startedAt, live });
  } catch (err) {
    jsonResponse(res, 500, { error: err.message });
  }
}
|
|
|
|
// ── Agent auth ────────────────────────────────────────────────────────────
// When NODE_TOKEN is configured, privileged control endpoints (driver status
// and install) require a matching `Authorization: Bearer <NODE_TOKEN>`.
// mam-api forwards the node's stored token. If NODE_TOKEN is empty (dev),
// auth is not enforced.
//
// Both tokens are hashed to fixed-length SHA-256 digests and compared with
// crypto.timingSafeEqual. timingSafeEqual requires equal-length inputs. A
// plain === can return at the first differing character, leaking through
// response timing how much of a guessed token is correct.
function checkAgentAuth(req) {
  if (!NODE_TOKEN) return true;
  const hdr = req.headers['authorization'] || '';
  const m = /^Bearer\s+(.+)$/i.exec(hdr);
  if (!m) return false;
  const digest = (value) => crypto.createHash('sha256').update(value, 'utf8').digest();
  return crypto.timingSafeEqual(digest(m[1]), digest(NODE_TOKEN));
}
|
|
|
|
// ── Driver/SDK install ────────────────────────────────────────────────────
// Probe host presence of each capture-driver vendor. Mirrors the detection the
// install script uses, so the UI can show "installed / not installed" without
// running the installer. Best-effort: every probe is guarded, and a missing
// file or directory simply reads as "not present".
function probeDriverStatus() {
  const out = {};

  // /proc/modules lists one loaded kernel module per line. It is read once so
  // every vendor check below sees the same snapshot.
  let moduleLines = [];
  try {
    moduleLines = fs.readFileSync('/proc/modules', 'utf8').split('\n');
  } catch (_) { /* no procfs — treat every module as not loaded */ }
  const moduleLoaded = (pattern) => moduleLines.some((line) => pattern.test(line));

  // blackmagic — kernel module + /dev/blackmagic device tree.
  const bmLoaded = moduleLoaded(/^blackmagic\b/);
  let bmDev = false;
  try { bmDev = fs.existsSync('/dev/blackmagic') && fs.readdirSync('/dev/blackmagic').length > 0; } catch (_) {}
  out.blackmagic = { installed: bmLoaded || bmDev, module_loaded: bmLoaded, device_present: bmDev };

  // aja — ajantv2 kernel module.
  const ajaLoaded = moduleLoaded(/ajantv2/);
  out.aja = { installed: ajaLoaded, module_loaded: ajaLoaded };

  // deltacast — videomaster module or /dev/deltacast* node.
  const dcLoaded = moduleLoaded(/videomaster/);
  let dcDev = false;
  try { dcDev = fs.readdirSync('/dev').some((n) => /^deltacast\d+$/.test(n)); } catch (_) {}
  out.deltacast = { installed: dcLoaded || dcDev, module_loaded: dcLoaded, device_present: dcDev };

  // ndi — user-space libs only. Look in the install target + common lib dirs.
  let ndiPresent = false;
  for (const dir of ['/opt/ndi-lib', '/usr/local/lib', '/usr/lib/x86_64-linux-gnu']) {
    let entries;
    try { entries = fs.readdirSync(dir); } catch (_) { continue; }
    if (entries.some((n) => /^libndi\.so/.test(n))) { ndiPresent = true; break; }
  }
  out.ndi = { installed: ndiPresent, libs_present: ndiPresent };

  return out;
}
|
|
|
|
// Respond with the running kernel release and per-vendor driver presence;
// any error during the probe becomes a 500 with its message.
async function handleDriverStatus(res) {
  try {
    const vendors = probeDriverStatus();
    const kernel = os.release();
    jsonResponse(res, 200, { kernel, vendors });
  } catch (err) {
    jsonResponse(res, 500, { error: err.message });
  }
}
|
|
|
|
// Run install-driver.sh <vendor> inside a one-shot PRIVILEGED ubuntu container.
// The repo is bind-mounted read-only at /repo. Host kernel paths are mounted so
// dkms/modprobe/ldconfig affect the host. Once the container exits, the tail
// of its log is returned to the caller.
//
// Responses:
//   - 400 for a vendor outside DRIVER_VENDORS
//   - 502 if the container can't be created or started
//   - 200 (exit code 0) or 500 otherwise, with body
//     { ok, vendor, exitCode, timedOut, rebootRequired, logs, status }
// `timedOut` is true when the installer was still running after `maxPolls`
// polls. It is then force-removed (killed) and exitCode is null.
// The container is always force-removed afterwards.
//
// The optional third argument tunes the wait loop. The defaults poll every
// 1000ms for up to 600 polls (about 10 minutes).
async function handleDriverInstall(body, res, { pollMs = 1000, maxPolls = 600 } = {}) {
  const vendor = String(body?.vendor || '').toLowerCase();
  if (!DRIVER_VENDORS.includes(vendor)) {
    return jsonResponse(res, 400, { error: `Invalid vendor (allowed: ${DRIVER_VENDORS.join(', ')})` });
  }

  let containerId;
  try {
    // Host paths the installer needs to reach the host kernel:
    //   /lib/modules,/usr/src,/boot → DKMS / module build + install
    //   /dev                        → device-node visibility + udev
    // The repo (sdk/<vendor>/ + deploy/install-driver.sh) is mounted read-only.
    const binds = [
      `${REPO_DIR}:/repo:ro`,
      '/lib/modules:/lib/modules',
      '/usr/src:/usr/src',
      '/boot:/boot',
      '/dev:/dev',
      // NDI install target lives under /opt; expose host /opt so libs land on host.
      '/opt:/opt',
    ];

    const spec = {
      Image: 'ubuntu:22.04',
      // NOTE: vendor is a value from DRIVER_VENDORS only — never arbitrary input.
      // It is passed as a distinct argv element (Cmd array), not a shell string.
      Cmd: ['bash', '/repo/deploy/install-driver.sh', vendor],
      Env: [`REPO_DIR=/repo`],
      WorkingDir: '/repo',
      HostConfig: {
        Privileged: true,
        NetworkMode: 'host',
        Binds: binds,
        AutoRemove: false,
      },
    };

    const createRes = await dockerApi('POST', '/containers/create', spec);
    if (createRes.status !== 201) {
      return jsonResponse(res, 502, { error: 'Failed to create install container', details: createRes.data });
    }
    containerId = createRes.data.Id;
    console.log(`[driver-install] ${containerId} vendor=${vendor}`);

    const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
    if (startRes.status !== 204) {
      // The finally block force-removes the container.
      return jsonResponse(res, 502, { error: 'Failed to start install container', details: startRes.data });
    }

    // Wait for the installer to exit (DKMS builds can take a minute+).
    let exitCode = null;
    let timedOut = true;
    for (let i = 0; i < maxPolls; i++) {
      await new Promise((r) => setTimeout(r, pollMs));
      const inspect = await dockerApi('GET', `/containers/${containerId}/json`);
      const state = inspect.data?.State;
      if (state && !state.Running) {
        exitCode = state.ExitCode;
        timedOut = false;
        break;
      }
    }
    if (timedOut) {
      console.warn(`[driver-install] ${containerId} vendor=${vendor} still running after ${maxPolls} polls; force-removing it`);
    }

    const logs = await fetchContainerLogs(containerId);
    const rebootRequired = /REBOOT_REQUIRED=1/.test(logs);
    const ok = exitCode === 0;
    jsonResponse(res, ok ? 200 : 500, {
      ok,
      vendor,
      exitCode,
      timedOut,
      rebootRequired,
      logs,
      status: probeDriverStatus()[vendor] || null,
    });
  } catch (err) {
    jsonResponse(res, 500, { error: err.message });
  } finally {
    if (containerId) {
      await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
    }
  }
}
|
|
|
|
// ── CPU sampling (500ms window by default) ────────────────────────────────
// Aggregate CPU utilisation across all cores, as a percentage rounded to two
// decimals. Two os.cpus() snapshots are taken `windowMs` apart, and the
// non-idle share of the ticks that elapsed in between is returned. Resolves 0
// when no ticks elapsed (or os.cpus() is empty).
function sampleCpu(windowMs = 500) {
  return new Promise((resolve) => {
    const before = os.cpus();
    setTimeout(() => {
      const after = os.cpus();
      let idle = 0;
      let total = 0;
      after.forEach((cpu, i) => {
        const prev = before[i];
        // A core that appeared between the snapshots (hot-plug) has no
        // baseline. Counting its lifetime ticks would swamp the window, so
        // skip it.
        if (!prev) return;
        for (const [kind, ticks] of Object.entries(cpu.times)) {
          const delta = ticks - (prev.times[kind] ?? 0);
          total += delta;
          if (kind === 'idle') idle += delta;
        }
      });
      resolve(total > 0 ? Math.round((1 - idle / total) * 10000) / 100 : 0);
    }, windowMs);
  });
}
|
|
|
|
|
|
// -- Live GPU / NVENC encode telemetry sampling -----------------------------
// Spawns a short-lived nvidia container via the Docker API on each heartbeat.
// Returns an array of { index, util_pct, enc_util_pct, mem_used_mb,
// mem_total_mb, nvenc_sessions } per GPU, or [] if the startup probe found
// no GPUs, the nvidia runtime is unavailable, or sampling fails.
//
// Three nvidia-smi queries run inside one container via `sh -c`. Each is
// guarded with `|| true`, so a query unsupported on a given driver/GPU (e.g.
// older cards that don't expose utilization.encoder) doesn't abort the whole
// sample:
//   1. --query-gpu → per-GPU gpu/encoder util + memory
//   2. --query-compute-apps → gpu_uuid,pid,used_memory for live processes.
//      Rows are counted per GPU as an NVENC/compute "session" approximation.
//   3. --query-gpu=index,uuid → maps the uuids from (2) back to GPU indexes.
// Marker lines (---SEP-APPS--- / ---SEP-UUID---) separate the three CSV blocks
// in the captured stdout.
async function sampleGpuUtil() {
  if (!_gpuCache || _gpuCache.length === 0) return [];

  const GPU_QUERY = '--query-gpu=index,utilization.gpu,utilization.encoder,memory.used,memory.total';
  const APP_QUERY = '--query-compute-apps=gpu_uuid,pid,used_memory';
  const FMT = '--format=csv,noheader,nounits';
  // Map GPU index → uuid so compute-app rows (keyed by uuid) attach to a GPU.
  const UUID_QUERY = '--query-gpu=index,uuid';
  const SCRIPT = [
    `nvidia-smi ${GPU_QUERY} ${FMT} || true`,
    `echo '---SEP-APPS---'`,
    `nvidia-smi ${APP_QUERY} ${FMT} 2>/dev/null || true`,
    `echo '---SEP-UUID---'`,
    `nvidia-smi ${UUID_QUERY} ${FMT} 2>/dev/null || true`,
  ].join('; ');

  // Decode Docker's multiplexed (non-TTY) log stream. Each frame is an 8-byte
  // header [stream, 0, 0, 0, size uint32 BE] followed by the payload. Bytes
  // that don't look framed are passed through unchanged.
  const decodeLogFrames = (buf) => {
    const parts = [];
    let offset = 0;
    while (offset < buf.length) {
      const framed = offset + 8 <= buf.length
        && buf[offset] <= 2
        && buf[offset + 1] === 0 && buf[offset + 2] === 0 && buf[offset + 3] === 0;
      if (!framed) {
        parts.push(buf.subarray(offset));
        break;
      }
      const size = buf.readUInt32BE(offset + 4);
      parts.push(buf.subarray(offset + 8, offset + 8 + size));
      offset += 8 + size;
    }
    return Buffer.concat(parts).toString('utf8');
  };

  let containerId;
  try {
    const createRes = await dockerApi('POST', '/containers/create', {
      Image: 'ubuntu:22.04',
      Cmd: ['sh', '-c', SCRIPT],
      HostConfig: {
        AutoRemove: false,
        Runtime: 'nvidia',
        DeviceRequests: [{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] }],
      },
    });
    if (createRes.status !== 201) return [];
    containerId = createRes.data.Id;

    await dockerApi('POST', `/containers/${containerId}/start`);

    // Wait (up to ~4s) for the one-shot script to exit.
    for (let i = 0; i < 10; i++) {
      await new Promise((r) => setTimeout(r, 400));
      const inspect = await dockerApi('GET', `/containers/${containerId}/json`);
      if (!inspect.data?.State?.Running) break;
    }

    const logBuf = await new Promise((resolve, reject) => {
      const options = {
        socketPath: '/var/run/docker.sock',
        path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=0`,
        method: 'GET',
      };
      const req = http.request(options, (res) => {
        const chunks = [];
        res.on('data', (c) => chunks.push(c));
        // A non-200 body is a Docker error message, not nvidia-smi output.
        res.on('end', () => resolve(res.statusCode === 200 ? Buffer.concat(chunks) : Buffer.alloc(0)));
        res.on('error', reject);
      });
      req.on('error', reject);
      req.end();
    });

    const text = decodeLogFrames(logBuf).trim();
    const [gpuBlock = '', appBlock = '', uuidBlock = ''] =
      text.split(/---SEP-(?:APPS|UUID)---/);

    // uuid → index map (for attributing compute-app rows to a GPU)
    const uuidToIndex = {};
    uuidBlock.split('\n').forEach((l) => {
      const m = l.trim().match(/^(\d+)\s*,\s*(GPU-[0-9a-fA-F-]+)/);
      if (m) uuidToIndex[m[2]] = parseInt(m[1], 10);
    });

    // NVENC/compute session count per GPU index (best-effort).
    const sessionsByIndex = {};
    appBlock.split('\n').forEach((l) => {
      const parts = l.split(',').map((s) => s.trim());
      const uuid = parts[0];
      if (!uuid || !uuid.startsWith('GPU-')) return;
      const idx = uuidToIndex[uuid];
      if (idx == null) return;
      sessionsByIndex[idx] = (sessionsByIndex[idx] || 0) + 1;
    });

    const lines = gpuBlock.split('\n').filter((l) => /^\s*\d+\s*,/.test(l));

    return lines.map((line) => {
      // utilization.encoder may report "[N/A]" on cards/drivers that don't
      // expose it — parseInt yields NaN there, which is coerced to null.
      const cols = line.split(',').map((s) => s.trim());
      const idx = parseInt(cols[0], 10);
      const util = parseInt(cols[1], 10);
      const encUtil = parseInt(cols[2], 10);
      const memUsed = parseInt(cols[3], 10);
      const memTotal = parseInt(cols[4], 10);
      return {
        index: idx,
        util_pct: Number.isNaN(util) ? null : util,
        enc_util_pct: Number.isNaN(encUtil) ? null : encUtil,
        mem_used_mb: Number.isNaN(memUsed) ? null : memUsed,
        mem_total_mb: Number.isNaN(memTotal) ? null : memTotal,
        nvenc_sessions: sessionsByIndex[idx] || 0,
      };
    });
  } catch (err) {
    console.warn('[gpu-util] sampling failed:', err.message);
    return [];
  } finally {
    if (containerId) {
      await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
    }
  }
}
|
|
|
|
// ── Hardware detection ────────────────────────────────────────────────────
// Env overrides used by detectHardware():
//   - GPU_COUNT sets how many GPUs are reported; entries are enriched from
//     _gpuCache where the index matches.
//   - BMD_COUNT and DELTACAST_PORT_COUNT can synthesize capture-card entries
//     when the device nodes are missing. See detectHardware() for the exact
//     conditions.
//
// _gpuCache: GPU info from nvidia-smi, filled once at startup by
// probeGpusViaSmi(), which runs nvidia-smi in a throwaway container via the
// Docker API. Read by detectHardware() and sampleGpuUtil().
//   null — not probed yet (an early probe failure may also leave it null)
//   []   — probed; no GPUs found, or the probe errored
let _gpuCache = null;
|
|
|
|
// Populate _gpuCache with one entry per GPU reported by nvidia-smi:
// { index, name, memory_mb, driver, device, type }.
// Called once at startup. Never throws. Every failure path leaves
// _gpuCache = [] ("probed, no GPUs"), so callers fall back to /dev scanning.
async function probeGpusViaSmi() {
  // Use the Docker API (socket) to run nvidia-smi inside a GPU-enabled
  // container. The NVIDIA Container Runtime injects nvidia-smi + libs into
  // ubuntu:22.04. This sidesteps the Alpine/glibc incompatibility in the
  // node-agent image.
  const QUERY = '--query-gpu=index,name,memory.total,driver_version';
  const FMT = '--format=csv,noheader,nounits';

  // Decode Docker's multiplexed (non-TTY) log stream. Each frame is an 8-byte
  // header [stream, 0, 0, 0, size uint32 BE] followed by the payload. Bytes
  // that don't look framed are passed through unchanged.
  const decodeLogFrames = (buf) => {
    const parts = [];
    let offset = 0;
    while (offset < buf.length) {
      const framed = offset + 8 <= buf.length
        && buf[offset] <= 2
        && buf[offset + 1] === 0 && buf[offset + 2] === 0 && buf[offset + 3] === 0;
      if (!framed) {
        parts.push(buf.subarray(offset));
        break;
      }
      const size = buf.readUInt32BE(offset + 4);
      parts.push(buf.subarray(offset + 8, offset + 8 + size));
      offset += 8 + size;
    }
    return Buffer.concat(parts).toString('utf8');
  };

  let containerId;
  try {
    // Create a container with the nvidia runtime and access to all GPUs.
    const createRes = await dockerApi('POST', '/containers/create', {
      Image: 'ubuntu:22.04',
      Cmd: ['nvidia-smi', QUERY, FMT],
      HostConfig: {
        AutoRemove: false,
        Runtime: 'nvidia',
        DeviceRequests: [{ Driver: 'nvidia', Count: -1, Capabilities: [['gpu']] }],
      },
    });
    if (createRes.status !== 201) {
      // For example: the ubuntu:22.04 image isn't present, or no nvidia
      // runtime is registered. That counts as "probed, no GPUs".
      console.warn(`[gpu] nvidia-smi probe container not created (HTTP ${createRes.status})`);
      _gpuCache = [];
      return;
    }
    containerId = createRes.data.Id;

    await dockerApi('POST', `/containers/${containerId}/start`);

    // Wait (up to ~10s) for nvidia-smi to exit.
    for (let i = 0; i < 20; i++) {
      await new Promise((r) => setTimeout(r, 500));
      const inspect = await dockerApi('GET', `/containers/${containerId}/json`);
      if (!inspect.data?.State?.Running) break;
    }

    // Grab stdout logs.
    const logBuf = await new Promise((resolve, reject) => {
      const options = {
        socketPath: '/var/run/docker.sock',
        path: `/v1.43/containers/${containerId}/logs?stdout=1&stderr=0`,
        method: 'GET',
      };
      const req = http.request(options, (res) => {
        const chunks = [];
        res.on('data', (c) => chunks.push(c));
        // A non-200 body is a Docker error message, not nvidia-smi output.
        res.on('end', () => resolve(res.statusCode === 200 ? Buffer.concat(chunks) : Buffer.alloc(0)));
        res.on('error', reject);
      });
      req.on('error', reject);
      req.end();
    });

    const text = decodeLogFrames(logBuf).trim();
    const lines = text.split('\n').filter((l) => /^\d+,/.test(l.trim()));

    if (lines.length) {
      _gpuCache = lines.map((line) => {
        const [idx, name, memMiB, driver] = line.split(',').map((s) => s.trim());
        return {
          index: parseInt(idx, 10),
          name,
          memory_mb: parseInt(memMiB, 10),
          driver,
          device: `/dev/nvidia${idx}`,
          type: 'nvidia',
        };
      });
      console.log(`[gpu] detected ${_gpuCache.length} GPU(s) via nvidia-smi: ${_gpuCache.map((g) => g.name).join(', ')}`);
    } else {
      _gpuCache = [];
    }
  } catch (err) {
    console.warn('[gpu] nvidia-smi probe failed:', err.message);
    _gpuCache = [];
  } finally {
    if (containerId) {
      await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
    }
  }
}
|
|
|
|
// Build the capabilities object reported in every heartbeat. It contains:
//   - gpus: from GPU_COUNT, the nvidia-smi cache, or /dev/nvidia* scanning
//   - blackmagic / deltacast: capture-card entries from device nodes or their
//     count overrides
//   - blackmagic_model / deltacast_model: optional model names from env
function detectHardware() {
  const capabilities = { gpus: [], blackmagic: [], deltacast: [] };

  // Natural ordering for device names, so dv2 sorts before dv10 and the
  // positional `index` lines up with the device number. readdir order is not
  // numeric (at best lexicographic: dv0, dv1, dv10, dv2, ...).
  const byDeviceNumber = (a, b) => a.localeCompare(b, 'en', { numeric: true });

  // Issue #108 — previously GPU_COUNT short-circuited the entire detection
  // path, throwing away the nvidia-smi enrichment (model, memory, driver
  // version). Now the override sets the *count*, but if nvidia-smi probed
  // successfully at startup its rich entries are kept for the overridden
  // indexes.
  const gpuOverride = parseInt(process.env.GPU_COUNT || '-1', 10);
  if (gpuOverride >= 0) {
    for (let i = 0; i < gpuOverride; i++) {
      const enriched = (_gpuCache || []).find((g) => g.index === i);
      capabilities.gpus.push(enriched || { device: `/dev/nvidia${i}`, type: 'nvidia', index: i });
    }
  } else if (_gpuCache !== null && _gpuCache.length > 0) {
    capabilities.gpus = _gpuCache;
  } else {
    for (let i = 0; i < 16; i++) {
      try {
        fs.accessSync(`/dev/nvidia${i}`, fs.constants.F_OK);
        capabilities.gpus.push({ device: `/dev/nvidia${i}`, type: 'nvidia', index: i });
      } catch (_) { break; }
    }
  }

  // Issue #109 — DeckLink device naming differs by model. Capture-only cards
  // expose /dev/blackmagic/dv${i}, while DeckLink IO / Quad cards expose
  // /dev/blackmagic/io${i}, so names are taken from what's actually present.
  // BMD_COUNT (with BMD_DEVICE_PREFIX) only synthesizes names when no device
  // is found. An empty /dev/blackmagic counts as "none found", matching the
  // Deltacast rule below: a bind mount of a host path that doesn't exist
  // typically shows up as an empty directory, which would otherwise silently
  // disable the override.
  const bmdOverride = parseInt(process.env.BMD_COUNT || '-1', 10);
  let bmdEntries = [];
  try {
    bmdEntries = fs.readdirSync('/dev/blackmagic').sort(byDeviceNumber);
  } catch (_) { /* not mounted / not readable */ }
  if (bmdEntries.length > 0) {
    // Filesystem wins — devices speak for themselves.
    capabilities.blackmagic = bmdEntries.map((d, i) => ({ device: `/dev/blackmagic/${d}`, index: i }));
  } else if (bmdOverride >= 0) {
    const namePrefix = (process.env.BMD_DEVICE_PREFIX || 'dv').replace(/[^a-z]/gi, '');
    for (let i = 0; i < bmdOverride; i++) {
      capabilities.blackmagic.push({ device: `/dev/blackmagic/${namePrefix}${i}`, index: i });
    }
  }

  // Best-effort model name from BMD_MODEL env (set manually) — used by the UI
  // to render the correct card layout (Duo 2, Quad 2, Mini Recorder, ...).
  if (process.env.BMD_MODEL) capabilities.blackmagic_model = process.env.BMD_MODEL;

  // Deltacast SDI cards — enumerate /dev/deltacast* device nodes.
  // DELTACAST_PORT_COUNT overrides when no node is present (test-card mode).
  const dcOverride = parseInt(process.env.DELTACAST_PORT_COUNT || '-1', 10);
  let dcEntries = [];
  try {
    dcEntries = fs.readdirSync('/dev').filter((n) => /^deltacast\d+$/.test(n)).sort(byDeviceNumber);
  } catch (_) { /* /dev unreadable — treat as no nodes */ }
  if (dcEntries.length > 0) {
    capabilities.deltacast = dcEntries.map((d, i) => ({
      device: `/dev/${d}`,
      index: i,
      present: true,
    }));
  } else if (dcOverride >= 0) {
    for (let i = 0; i < dcOverride; i++) {
      capabilities.deltacast.push({ device: `/dev/deltacast${i}`, index: i, present: false });
    }
  }
  if (process.env.DELTACAST_MODEL) capabilities.deltacast_model = process.env.DELTACAST_MODEL;

  return capabilities;
}
|
|
|
|
// ── Heartbeat ─────────────────────────────────────────────────────────────
// Collect CPU / memory / GPU telemetry and the detected capture hardware, then
// POST it to the primary's /api/v1/cluster/heartbeat. The primary keys the
// cluster_nodes row on `hostname`, which carries NODE_NAME.
//
// Never rejects. The function is fired from setInterval, where a rejected
// promise becomes an unhandled rejection, which is fatal by default on
// Node 15+. Any failure (sampling, hardware detection, or the request itself)
// is logged, and the next tick simply tries again.
async function heartbeat() {
  try {
    // Sampled one after the other rather than with Promise.all — presumably so
    // the GPU sampler's throwaway container doesn't add load to the CPU window.
    const cpu_usage = await sampleCpu();
    const gpu_util = await sampleGpuUtil();
    const totalMem = os.totalmem();
    const freeMem = os.freemem();
    const ip_address = getIp();
    const capabilities = detectHardware();

    // Issue #166 — fold live NVENC/GPU encode telemetry into capabilities.gpus
    // so the Cluster screen (which reads cluster_nodes.capabilities.gpus) can
    // render per-GPU util / encoder util / NVENC sessions alongside the static
    // name+VRAM. gpu_util is also sent verbatim below for any consumer reading
    // metrics.gpus.
    if (Array.isArray(capabilities.gpus) && gpu_util.length) {
      capabilities.gpus = capabilities.gpus.map((g) => {
        const live = gpu_util.find((u) => u.index === g.index);
        if (!live) return g;
        return {
          ...g,
          util_pct: live.util_pct,
          enc_util_pct: live.enc_util_pct,
          mem_used_mb: live.mem_used_mb,
          mem_total_mb: live.mem_total_mb ?? g.memory_mb ?? null,
          nvenc_sessions: live.nvenc_sessions,
        };
      });
    }

    const payload = {
      hostname: NODE_NAME,
      ip_address,
      role: NODE_ROLE,
      version: VERSION,
      api_url: `http://${ip_address || NODE_NAME}:${AGENT_PORT}`,
      cpu_usage,
      mem_used_mb: Math.round((totalMem - freeMem) / 1048576),
      mem_total_mb: Math.round(totalMem / 1048576),
      capabilities,
      gpu_util,
    };

    const headers = { 'Content-Type': 'application/json' };
    if (NODE_TOKEN) headers['Authorization'] = `Bearer ${NODE_TOKEN}`;

    const res = await fetch(`${MAM_API_URL}/api/v1/cluster/heartbeat`, {
      method: 'POST',
      headers,
      body: JSON.stringify(payload),
      signal: AbortSignal.timeout(8000),
    });
    if (res.ok) {
      const gpuStr = capabilities.gpus.length ? ` gpu=${capabilities.gpus.length}` : '';
      const bmdStr = capabilities.blackmagic.length ? ` bmd=${capabilities.blackmagic.length}` : '';
      const dcStr = capabilities.deltacast.length ? ` dc=${capabilities.deltacast.length}` : '';
      process.stdout.write(
        `[hb] ${payload.hostname} ip=${ip_address || '?'} cpu=${cpu_usage}% ` +
        `mem=${payload.mem_used_mb}/${payload.mem_total_mb}MB${gpuStr}${bmdStr}${dcStr}\n`
      );
    } else {
      const txt = await res.text().catch(() => '');
      console.error(`[hb] ${res.status} ${txt.slice(0, 120)}`);
    }
  } catch (err) {
    console.error(`[hb] failed — ${err.message}`);
  }
}
|
|
|
|
// Probe GPU info once at startup, then start the heartbeat loop. _gpuCache is
// populated by the probe, and heartbeats read the cached value.
//
// The loop starts whether the probe succeeds, fails, or hangs. A wedged Docker
// socket must not stop the node from ever reporting in. If the probe finishes
// after the fallback timer has fired, later heartbeats simply pick up the
// populated cache.
const GPU_PROBE_STARTUP_TIMEOUT_MS = 60000;
let heartbeatLoopStarted = false;

function startHeartbeatLoop() {
  if (heartbeatLoopStarted) return;
  heartbeatLoopStarted = true;
  // Each tick's promise is handled here, so a rejection can never become an
  // unhandled rejection (fatal by default on Node 15+).
  const beat = () => {
    heartbeat().catch((err) => console.error(`[hb] failed — ${err.message}`));
  };
  beat();
  setInterval(beat, HEARTBEAT_MS);
}

probeGpusViaSmi()
  .catch((err) => console.warn('[gpu] startup probe failed:', err.message))
  .finally(startHeartbeatLoop);

setTimeout(() => {
  if (!heartbeatLoopStarted) {
    console.warn(`[gpu] startup probe still pending after ${GPU_PROBE_STARTUP_TIMEOUT_MS / 1000}s — starting heartbeats without it`);
    startHeartbeatLoop();
  }
}, GPU_PROBE_STARTUP_TIMEOUT_MS).unref();
|
|
|
|
// Serve the local HLS live-preview files (written by the capture sidecar to
// LIVE_DIR) so the primary can reverse-proxy them to the browser. Read-only.
//
// `pathname` is the URL path ("/live/<file>"). The remainder is
// percent-decoded and must be a relative path without ".." or NUL bytes.
// Responses:
//   - 400 for malformed percent-encoding
//   - 403 for a rejected path
//   - 404 when the file can't be read
//   - 200 otherwise, with a content type chosen from the extension
function serveLiveFile(pathname, res) {
  let rel;
  try {
    rel = decodeURIComponent(pathname.slice('/live/'.length));
  } catch (_) {
    // decodeURIComponent throws URIError on malformed escapes (e.g.
    // "%E0%A4%A"). This runs synchronously inside the request listener, so
    // letting it escape would crash the agent.
    res.writeHead(400);
    return res.end();
  }
  // NUL can never appear in a valid path (Node's fs rejects it with an error).
  if (!rel || rel.includes('\0') || rel.includes('..') || rel.startsWith('/')) {
    res.writeHead(403);
    return res.end();
  }

  const filePath = `${LIVE_DIR}/${rel}`;
  fs.readFile(filePath, (err, data) => {
    if (err) { res.writeHead(404); return res.end(); }
    const ct = filePath.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl'
      : filePath.endsWith('.ts') ? 'video/mp2t'
      : 'application/octet-stream';
    res.writeHead(200, { 'Content-Type': ct, 'Cache-Control': 'no-cache', 'Access-Control-Allow-Origin': '*' });
    res.end(data);
  });
}
|
|
|
|
// ── HTTP server ───────────────────────────────────────────────────────────
// Container references accepted in /sidecar/<id> paths. Docker IDs and names
// start with an alphanumeric character followed by [A-Za-z0-9_.-]. Anything
// else is refused before it is spliced into a Docker API URL.
const CONTAINER_REF_RE = /^[a-zA-Z0-9][a-zA-Z0-9_.-]*$/;

const server = http.createServer((req, res) => {
  try {
    const { pathname } = new URL(req.url, 'http://localhost');

    if (req.method === 'GET' && pathname === '/health') {
      // `hostname` is the cluster identity (NODE_NAME), the same value the
      // heartbeat reports, so callers can match it to cluster_nodes.
      // `os_hostname` is the machine's own hostname, which can differ
      // (e.g. cloned VMs sharing /etc/hostname).
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({
        ok: true,
        hostname: NODE_NAME,
        os_hostname: os.hostname(),
        uptime: Math.round(process.uptime()),
        version: VERSION,
        role: NODE_ROLE,
        ip: getIp(),
      }));

    // NOTE(review): the /sidecar/* routes are not gated by checkAgentAuth, even
    // though /sidecar/start creates a privileged, host-network container from a
    // caller-supplied image. Enforcing auth here requires the primary to send
    // the node token on these calls as well — confirm before tightening.
    } else if (req.method === 'POST' && pathname === '/sidecar/start') {
      readBody(req)
        .then((body) => handleSidecarStart(body, res))
        .catch(() => jsonResponse(res, 400, { error: 'Invalid request body' }));

    } else if (req.method === 'DELETE' && pathname.startsWith('/sidecar/')) {
      const id = pathname.slice('/sidecar/'.length);
      if (!CONTAINER_REF_RE.test(id)) { res.writeHead(404); return res.end(); }
      handleSidecarStop(id, res);

    } else if (req.method === 'GET' && /^\/sidecar\/[^/]+\/status$/.test(pathname)) {
      const id = pathname.slice('/sidecar/'.length, -'/status'.length);
      if (!CONTAINER_REF_RE.test(id)) { res.writeHead(404); return res.end(); }
      handleSidecarStatus(id, res);

    } else if (req.method === 'GET' && pathname === '/driver/status') {
      if (!checkAgentAuth(req)) return jsonResponse(res, 401, { error: 'Unauthorized' });
      handleDriverStatus(res);

    } else if (req.method === 'POST' && pathname === '/driver/install') {
      if (!checkAgentAuth(req)) return jsonResponse(res, 401, { error: 'Unauthorized' });
      readBody(req)
        .then((body) => handleDriverInstall(body, res))
        .catch(() => jsonResponse(res, 400, { error: 'Invalid request body' }));

    } else if (req.method === 'GET' && pathname.startsWith('/live/')) {
      serveLiveFile(pathname, res);

    } else {
      res.writeHead(404);
      res.end();
    }
  } catch (err) {
    // A synchronous throw (e.g. an unparseable request URL) would otherwise
    // escape the 'request' listener and crash the agent.
    console.error(`[http] ${req.method} ${req.url} failed: ${err.message}`);
    if (res.headersSent) {
      res.end();
    } else {
      res.writeHead(500, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ error: err.message }));
    }
  }
});
|
|
|
|
// Start serving and print a startup banner. The banner includes the cluster
// identity (NODE_NAME, else the OS hostname) reported in heartbeats, so a node
// identity collision is visible straight from the agent's log.
server.listen(AGENT_PORT, () => {
  console.log(`wild-dragon-node-agent v${VERSION}`);
  console.log(` Listening :${AGENT_PORT}`);
  console.log(` Node ${NODE_NAME}`);
  console.log(` Host IP ${getIp() || '(unresolved)'}`);
  console.log(` Primary ${MAM_API_URL}`);
  console.log(` Role ${NODE_ROLE}`);
  console.log(` Heartbeat every ${HEARTBEAT_MS / 1000}s`);
});
|