dragonflight/services/mam-api/src/routes/cluster.js
Zac Gaetano 43011bd794 Merge feat/playout-mcr into main
Playout/MCR, as-run log, redesigned dashboard, capture CIFS/growing-files,
SDI settings, cluster Add Node wizard, homepage refresh.

# Conflicts:
#	services/mam-api/src/routes/cluster.js
#	services/mam-api/src/routes/playout.js
#	services/mam-api/src/scheduler.js
#	services/playout/Dockerfile
#	services/playout/entrypoint.sh
#	services/web-ui/public/screens-home.jsx
2026-05-31 17:46:12 -04:00

396 lines
16 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import express from 'express';
import http from 'http';
import pool from '../db/pool.js';
import { requireAdmin } from '../middleware/auth.js';
const router = express.Router();
// GET /onboard-info admin-only. Supplies the Add Node wizard with the bits it
// needs to build a `curl … | bash` onboarding command: the primary API URL the
// remote node-agent should heartbeat to, the raw URL of onboard-node.sh, and
// the deploy branch. apiUrl is a best guess the UI lets the operator edit.
router.get('/onboard-info', requireAdmin, (req, res) => {
const branch = process.env.DEPLOY_BRANCH || 'main';
const apiUrl = process.env.PUBLIC_API_URL
|| `${req.protocol}://${req.hostname}:${process.env.API_PORT || 47432}`;
const scriptUrl =
`https://forge.wilddragon.net/zgaetano/wild-dragon/raw/branch/${branch}/deploy/onboard-node.sh`;
res.json({ apiUrl, scriptUrl, branch });
});
// If the agent reported Docker's default bridge IP (172.17.x) but the request
// itself came from a real LAN address, prefer the request source IP instead.
// We only check 172.17.x — the default docker0 bridge — not the full RFC1918
// 172.16/12 block, since real LANs (e.g. 172.18.91.x) fall in that range.
function pickIp(reportedIp, reqIp) {
const clean = (s) => (s || '').replace(/^::ffff:/, '');
const isDockerBridge = (ip) => /^172\.17\./.test(ip || '');
const r = clean(reqIp);
if (!reportedIp) return r || null;
if (isDockerBridge(reportedIp) && r && !isDockerBridge(r)) return r;
return reportedIp;
}
function dockerRequest(path, method = 'GET', body = null) {
return new Promise((resolve, reject) => {
const opts = {
socketPath: '/var/run/docker.sock',
path: `/v1.41${path}`,
method,
headers: { 'Accept': 'application/json', 'Content-Type': 'application/json' },
};
const req = http.request(opts, (res) => {
let data = '';
res.on('data', d => { data += d; });
res.on('end', () => {
if (!data.trim()) return resolve(null);
try { resolve(JSON.parse(data)); }
catch (e) { resolve(null); }
});
});
req.on('error', reject);
req.setTimeout(5000, () => { req.destroy(); reject(new Error('Docker socket timeout')); });
if (body) req.write(JSON.stringify(body));
req.end();
});
}
router.get('/', async (req, res, next) => {
try {
const r = await pool.query(
`SELECT *,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes
ORDER BY registered_at ASC`
);
res.json(r.rows.map(row => ({
...row,
online: Number(row.stale_seconds) < 120,
})));
} catch (err) { next(err); }
});
router.get('/containers', async (req, res, next) => {
try {
const containers = await dockerRequest('/containers/json?all=true');
if (!Array.isArray(containers)) return res.json([]);
const out = containers.map(c => {
const rawName = (c.Names[0] || '').replace(/^\//, '');
const name = rawName.replace(/^wild-dragon-/, '').replace(/-\d+$/, '');
const ports = (c.Ports || [])
.filter(p => p.PublicPort)
.map(p => `${p.PublicPort}${p.PrivatePort}`)
.join(', ');
return {
id: c.Id.slice(0, 12),
name,
image: (c.Image || '').replace(/^sha256:/, '').slice(0, 40),
state: c.State,
uptime: (c.Status || '').replace(/\s*\(.*\)/, '').trim(),
healthy: (c.Status || '').includes('healthy'),
ports,
cpu: 0,
mem: 0,
};
});
res.json(out);
} catch (err) {
if (err.code === 'ENOENT' || err.code === 'EACCES') return res.json([]);
next(err);
}
});
router.post('/containers/:nameOrId/restart', async (req, res, next) => {
try {
await dockerRequest(`/containers/${encodeURIComponent(req.params.nameOrId)}/restart`, 'POST');
res.json({ ok: true });
} catch (err) { next(err); }
});
router.post('/heartbeat', async (req, res, next) => {
try {
const {
hostname, ip_address,
role = 'worker', version, api_url,
cpu_usage, mem_used_mb, mem_total_mb,
capabilities, metadata, metrics,
} = req.body;
if (!hostname) return res.status(400).json({ error: 'hostname is required' });
if (process.env.AUTH_ENABLED === 'true') {
const bound = req.tokenBoundHostname;
if (bound && bound !== hostname) {
return res.status(403).json({
error: `Token is bound to "${bound}" but heartbeat reported "${hostname}"`,
});
}
if (!bound && req.user?.role !== 'admin') {
return res.status(403).json({
error: 'Heartbeat requires a node-bound token or admin session',
});
}
}
const effectiveIp = pickIp(ip_address, req.ip || req.socket?.remoteAddress);
const r = await pool.query(
`INSERT INTO cluster_nodes
(hostname, ip_address, role, version, api_url,
cpu_usage, mem_used_mb, mem_total_mb, last_seen, last_seen_at, capabilities, metadata, metrics)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,NOW(),NOW(),$9,$10,$11)
ON CONFLICT (hostname) DO UPDATE SET
ip_address = EXCLUDED.ip_address,
role = EXCLUDED.role,
version = EXCLUDED.version,
api_url = EXCLUDED.api_url,
cpu_usage = EXCLUDED.cpu_usage,
mem_used_mb = EXCLUDED.mem_used_mb,
mem_total_mb = EXCLUDED.mem_total_mb,
last_seen = NOW(),
last_seen_at = NOW(),
capabilities = EXCLUDED.capabilities,
metadata = EXCLUDED.metadata,
metrics = COALESCE(EXCLUDED.metrics, cluster_nodes.metrics)
RETURNING *`,
[
hostname,
effectiveIp,
role,
version || null,
api_url || null,
cpu_usage != null ? cpu_usage : null,
mem_used_mb != null ? mem_used_mb : null,
mem_total_mb != null ? mem_total_mb : null,
capabilities != null ? JSON.stringify(capabilities) : '{}',
metadata != null ? JSON.stringify(metadata) : null,
metrics != null ? JSON.stringify(metrics) : null,
]
);
res.json(r.rows[0]);
} catch (err) { next(err); }
});
router.get('/devices/blackmagic/signal', async (req, res, next) => {
try {
const nodesResult = await pool.query(
`SELECT id, hostname, ip_address, api_url, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes
WHERE capabilities IS NOT NULL`
);
const recResult = await pool.query(
`SELECT id, name, status, container_id, node_id, device_index,
source_config
FROM recorders
WHERE source_type = 'sdi' AND node_id IS NOT NULL`
);
const recByPort = new Map();
for (const r of recResult.rows) {
const devIdx = r.device_index ?? r.source_config?.device ?? 0;
recByPort.set(`${r.node_id}:${devIdx}`, r);
}
const tasks = [];
for (const node of nodesResult.rows) {
const nodeOnline = Number(node.stale_seconds) < 120;
const bm = (node.capabilities && node.capabilities.blackmagic) || [];
const model = (node.capabilities && node.capabilities.blackmagic_model) || null;
const localHostname = process.env.NODE_HOSTNAME || '';
const isRemote = node.api_url && node.hostname !== localHostname;
bm.forEach((d, idx) => {
const portIndex = d.index !== undefined ? d.index : idx;
const rec = recByPort.get(`${node.id}:${portIndex}`);
tasks.push((async () => {
const base = {
node_id: node.id, hostname: node.hostname, index: portIndex,
device: d.device || null, model, node_online: nodeOnline,
recorder_id: rec ? rec.id : null, recorder_name: rec ? rec.name : null,
recorder_status: rec ? rec.status : null,
signal: 'no-recorder', framesReceived: null, currentFps: null,
};
if (!rec || rec.status !== 'recording' || !rec.container_id) {
if (rec && rec.status !== 'recording') base.signal = 'idle';
return base;
}
try {
let live = null;
if (isRemote) {
const r = await fetch(`${node.api_url}/sidecar/${rec.container_id}/status`, { signal: AbortSignal.timeout(2500) });
if (r.ok) live = (await r.json()).live;
} else {
const r = await fetch(`http://recorder-${rec.id}:3001/capture/status`, { signal: AbortSignal.timeout(2000) });
if (r.ok) live = await r.json();
}
if (live && live.signal) {
base.signal = live.signal;
base.framesReceived = live.framesReceived ?? null;
base.currentFps = live.currentFps ?? null;
} else { base.signal = 'connecting'; }
} catch (_) { base.signal = 'connecting'; }
return base;
})());
});
}
const results = await Promise.all(tasks);
res.json(results);
} catch (err) { next(err); }
});
router.get('/devices/blackmagic', async (req, res, next) => {
try {
const r = await pool.query(
`SELECT id, hostname, ip_address, role, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes WHERE capabilities IS NOT NULL`
);
const out = [];
for (const row of r.rows) {
const online = Number(row.stale_seconds) < 120;
const bm = (row.capabilities && row.capabilities.blackmagic) || [];
const model = (row.capabilities && row.capabilities.blackmagic_model) || null;
bm.forEach((d, idx) => {
out.push({ node_id: row.id, hostname: row.hostname, ip_address: row.ip_address,
role: row.role, online, model, index: d.index !== undefined ? d.index : idx, device: d.device });
});
}
res.json(out);
} catch (err) { next(err); }
});
router.get('/devices/deltacast', async (req, res, next) => {
try {
const r = await pool.query(
`SELECT id, hostname, ip_address, role, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes WHERE capabilities IS NOT NULL`
);
const out = [];
for (const row of r.rows) {
const online = Number(row.stale_seconds) < 120;
const dc = (row.capabilities && row.capabilities.deltacast) || [];
const model = (row.capabilities && row.capabilities.deltacast_model) || null;
dc.forEach((d, idx) => {
out.push({ node_id: row.id, hostname: row.hostname, ip_address: row.ip_address,
role: row.role, online, model: model || 'Deltacast',
index: d.index !== undefined ? d.index : idx, device: d.device,
present: d.present !== false, port_count: dc.length });
});
}
res.json(out);
} catch (err) { next(err); }
});
router.get('/devices/deltacast/signal', async (req, res, next) => {
try {
const [nodesRes, recordersRes] = await Promise.all([
pool.query(`SELECT id, hostname, ip_address, api_url, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes WHERE capabilities IS NOT NULL`),
pool.query(`SELECT id, node_id, device_index, status, source_type, container_id
FROM recorders WHERE source_type = 'deltacast'`),
]);
const recByNodePort = {};
for (const rec of recordersRes.rows) {
recByNodePort[`${rec.node_id}:${rec.device_index}`] = rec;
}
const results = [];
const fetchPromises = [];
for (const node of nodesRes.rows) {
const online = Number(node.stale_seconds) < 120;
const dc = (node.capabilities && node.capabilities.deltacast) || [];
const model = (node.capabilities && node.capabilities.deltacast_model) || 'Deltacast';
for (const port of dc) {
const idx = port.index !== undefined ? port.index : dc.indexOf(port);
const rec = recByNodePort[`${node.id}:${idx}`];
const base = { node_id: node.id, hostname: node.hostname, ip_address: node.ip_address,
online, model, index: idx, device: port.device, present: port.present !== false,
recorder_id: rec ? rec.id : null, recorder_status: rec ? rec.status : null,
signal: 'no-recorder', framesReceived: null, currentFps: null };
if (!rec) { results.push(base); continue; }
if (rec.status !== 'recording') { base.signal = 'idle'; results.push(base); continue; }
const fetchIdx = results.length;
results.push(base);
fetchPromises.push((async () => {
try {
const url = node.api_url ? `${node.api_url}/sidecar/${rec.container_id}/status`
: `http://recorder-${rec.id}:3001/capture/status`;
const r = await fetch(url, { signal: AbortSignal.timeout(2500) });
if (r.ok) {
const live = await r.json();
if (live && live.signal) {
results[fetchIdx].signal = live.signal;
results[fetchIdx].framesReceived = live.framesReceived ?? null;
results[fetchIdx].currentFps = live.currentFps ?? null;
}
}
} catch (_) { results[fetchIdx].signal = 'connecting'; }
})());
}
}
await Promise.all(fetchPromises);
res.json(results);
} catch (err) { next(err); }
});
router.get('/:id/ping', async (req, res, next) => {
try {
const r = await pool.query('SELECT id, hostname, api_url FROM cluster_nodes WHERE id = $1', [req.params.id]);
if (r.rowCount === 0) return res.status(404).json({ error: 'Node not found' });
const node = r.rows[0];
if (!node.api_url) return res.json({ reachable: false, reason: 'no api_url registered' });
const start = Date.now();
try {
const upstream = await fetch(`${node.api_url}/health`, { signal: AbortSignal.timeout(4000) });
const latency_ms = Date.now() - start;
const body = await upstream.json().catch(() => ({}));
res.json({ reachable: upstream.ok, latency_ms, status: upstream.status, agent: body });
} catch (err) {
res.json({ reachable: false, latency_ms: Date.now() - start, reason: err.message });
}
} catch (err) { next(err); }
});
router.get('/metrics', async (req, res, next) => {
try {
const r = await pool.query(
`SELECT id, hostname, role, last_seen,
cpu_usage, mem_used_mb, mem_total_mb,
capabilities, metrics,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes ORDER BY registered_at ASC`
);
const nodes = r.rows.map(row => {
const capGpus = (row.capabilities && row.capabilities.gpus) || [];
const liveGpus = (row.metrics && row.metrics.gpus) || [];
const gpus = capGpus.map((g, idx) => {
const live = liveGpus.find(l => l.index === g.index) || liveGpus[idx] || {};
return { name: g.name || null, util_pct: live.util_pct != null ? live.util_pct : null,
memory_used_mb: live.memory_used_mb != null ? live.memory_used_mb : null,
memory_total_mb: g.memory_mb != null ? g.memory_mb : (live.memory_total_mb ?? null) };
});
for (const lg of liveGpus) {
if (!capGpus.some(g => g.index === lg.index)) {
gpus.push({ name: lg.name || null, util_pct: lg.util_pct != null ? lg.util_pct : null,
memory_used_mb: lg.memory_used_mb != null ? lg.memory_used_mb : null,
memory_total_mb: lg.memory_total_mb != null ? lg.memory_total_mb : null });
}
}
return { id: row.id, hostname: row.hostname, role: row.role,
online: Number(row.stale_seconds) < 120, last_seen: row.last_seen,
cpu_util_pct: row.cpu_usage != null ? Number(row.cpu_usage) : null,
ram_used_mb: row.mem_used_mb != null ? row.mem_used_mb : null,
ram_total_mb: row.mem_total_mb != null ? row.mem_total_mb : null, gpus };
});
res.json({ nodes });
} catch (err) { next(err); }
});
router.delete('/:id', async (req, res, next) => {
try {
const r = await pool.query('DELETE FROM cluster_nodes WHERE id = $1 RETURNING id', [req.params.id]);
if (r.rowCount === 0) return res.status(404).json({ error: 'Node not found' });
res.json({ ok: true });
} catch (err) { next(err); }
});
export default router;