dragonflight/services/mam-api/src/routes/cluster.js

662 lines
28 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import express from 'express';
import http from 'http';
import os from 'os';
import pool from '../db/pool.js';
import { requireAdmin } from '../middleware/auth.js';
// Express router that every route in this file is registered on; it is the
// module's default export.
const router = express.Router();
// Hostname the primary mam-api self-registers as (mirrors selfHeartbeat()).
// Resolved once at module load: NODE_HOSTNAME when set, otherwise os.hostname().
const SELF_HOSTNAME = process.env.NODE_HOSTNAME || os.hostname();
// Render an uptime given in seconds as the short string the Cluster UI shows.
// Only the two most significant units are kept: "3d 4h", "2h 5m", or just
// "12m" when under an hour. Fractional seconds are floored first.
function formatUptime(seconds) {
  const total = Math.floor(seconds);
  const days = Math.floor(total / 86400);
  const hours = Math.floor((total % 86400) / 3600);
  const minutes = Math.floor((total % 3600) / 60);
  if (days > 0) {
    return `${days}d ${hours}h`;
  }
  return hours > 0 ? `${hours}h ${minutes}m` : `${minutes}m`;
}
// GET /onboard-info — admin-only. Gives the Add Node wizard what it needs to
// assemble a `curl … | bash` onboarding command:
//   apiUrl    – PUBLIC_API_URL when set, otherwise a guess built from the
//               incoming request's protocol/hostname plus API_PORT (47432 by
//               default). The UI lets the operator edit it.
//   scriptUrl – raw URL of deploy/onboard-node.sh on the deploy branch.
//   branch    – DEPLOY_BRANCH, defaulting to 'main'.
router.get('/onboard-info', requireAdmin, (req, res) => {
  const { DEPLOY_BRANCH, PUBLIC_API_URL, API_PORT } = process.env;
  const branch = DEPLOY_BRANCH || 'main';
  let apiUrl = PUBLIC_API_URL;
  if (!apiUrl) {
    apiUrl = `${req.protocol}://${req.hostname}:${API_PORT || 47432}`;
  }
  const scriptUrl = `https://forge.wilddragon.net/zgaetano/wild-dragon/raw/branch/${branch}/deploy/onboard-node.sh`;
  res.json({ apiUrl, scriptUrl, branch });
});
// Choose which IP to store for a heartbeating node.
// The agent-reported address wins unless it is missing or sits on Docker's
// default docker0 bridge (172.17.x); in that case the request's source address
// is used, with any IPv4-mapped "::ffff:" prefix removed. Only 172.17.x is
// treated as the bridge — not the whole 172.16/12 block — because real LANs
// (e.g. 172.18.91.x) live in that range. Returns null when neither is usable.
function pickIp(reportedIp, reqIp) {
  const requestIp = (reqIp || '').replace(/^::ffff:/, '');
  const reportedIsUsable = Boolean(reportedIp) && !/^172\.17\./.test(reportedIp || '');
  if (reportedIsUsable) {
    return reportedIp;
  }
  return requestIp === '' ? null : requestIp;
}
// Send a request to the local Docker Engine API (v1.41) over the unix socket
// at /var/run/docker.sock.
//   path   – API path appended to /v1.41 (include any query string).
//   method – HTTP method, 'GET' by default.
//   body   – optional value; when truthy it is JSON-serialised as the request body.
// Resolves with the parsed JSON response, or null when the response body is
// empty or is not valid JSON. The HTTP status code is NOT inspected, so a
// Docker error payload resolves like any other body — callers must check it.
// Rejects on a request error, a response-stream error, or the 5s socket timeout.
function dockerRequest(path, method = 'GET', body = null) {
  return new Promise((resolve, reject) => {
    const opts = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.41${path}`,
      method,
      headers: { 'Accept': 'application/json', 'Content-Type': 'application/json' },
    };
    const req = http.request(opts, (res) => {
      // Decode at the stream level: concatenating raw chunks with `+=` would
      // stringify each chunk on its own and corrupt a multi-byte UTF-8
      // character that happens to straddle a chunk boundary.
      res.setEncoding('utf8');
      let data = '';
      res.on('data', (chunk) => { data += chunk; });
      // A connection that closes after the headers arrived surfaces as an
      // 'error' on the response, not on the request; without this listener
      // the promise would never settle.
      res.on('error', reject);
      res.on('end', () => {
        if (!data.trim()) return resolve(null);
        try { resolve(JSON.parse(data)); }
        catch (e) { resolve(null); }
      });
    });
    req.on('error', reject);
    req.setTimeout(5000, () => { req.destroy(); reject(new Error('Docker socket timeout')); });
    if (body) req.write(JSON.stringify(body));
    req.end();
  });
}
// Fetch a container's logs via the Docker socket and return PLAIN TEXT.
// The /logs endpoint does not return JSON, so dockerRequest() cannot be used:
// for non-TTY containers it returns a multiplexed stream in which every chunk
// is prefixed by an 8-byte stdcopy header. The raw bytes are collected here and
// handed to demuxDockerStream() so the UI gets readable log lines.
//   containerId – container id or name (URL-encoded before use).
//   tail        – number of trailing lines to request (default 200).
// Resolves with the log text ('' if demuxing throws). The HTTP status code is
// not inspected. Rejects on a request error, a response-stream error, or the
// 6s socket timeout.
function dockerLogs(containerId, tail = 200) {
  return new Promise((resolve, reject) => {
    const opts = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.41/containers/${encodeURIComponent(containerId)}/logs?stdout=1&stderr=1&tail=${tail}&timestamps=1`,
      method: 'GET',
    };
    const req = http.request(opts, (res) => {
      const chunks = [];
      res.on('data', (chunk) => chunks.push(chunk));
      // A connection dropped mid-body is reported on the response object; with
      // no listener here neither 'end' nor the request's 'error' fires and the
      // caller would wait forever.
      res.on('error', reject);
      res.on('end', () => {
        try {
          const buf = Buffer.concat(chunks);
          resolve(demuxDockerStream(buf));
        } catch (e) { resolve(''); }
      });
    });
    req.on('error', reject);
    req.setTimeout(6000, () => { req.destroy(); reject(new Error('Docker socket timeout')); });
    req.end();
  });
}
// Turn a raw Docker log buffer into a UTF-8 string.
// Non-TTY containers send stdcopy frames: an 8-byte header ([stream type,
// 0, 0, 0, big-endian uint32 payload length]) followed by the payload. TTY
// containers send bare text. The first header is sniffed (type byte 0..2 and
// three zero bytes) to tell the two apart; unframed input is passed through.
// A final frame whose payload is cut short is emitted as far as it goes.
function demuxDockerStream(buf) {
  if (!buf || buf.length === 0) return '';
  const HEADER_BYTES = 8;
  const framed =
    buf.length >= HEADER_BYTES &&
    buf[0] <= 2 &&
    [1, 2, 3].every((i) => buf[i] === 0);
  if (!framed) {
    return buf.toString('utf8');
  }
  let text = '';
  let cursor = 0;
  while (cursor + HEADER_BYTES <= buf.length) {
    const size = buf.readUInt32BE(cursor + 4);
    cursor += HEADER_BYTES;
    if (size > 0) {
      const end = Math.min(cursor + size, buf.length);
      text += buf.toString('utf8', cursor, end);
      cursor += size;
    }
  }
  return text;
}
// GET / — list every cluster node, oldest registration first. Each row gains
// `online` (last_seen under 120 seconds ago) alongside the raw columns and the
// computed stale_seconds.
router.get('/', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
`SELECT *,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes
ORDER BY registered_at ASC`
    );
    const payload = rows.map((row) => {
      const node = { ...row, online: Number(row.stale_seconds) < 120 };
      // The primary (this mam-api host) does not heartbeat via the node-agent,
      // so its version/uptime columns are never filled in. Populate them from
      // this process so the Cluster screen shows values instead of dashes.
      const isSelfPrimary = row.role === 'primary' && row.hostname === SELF_HOSTNAME;
      if (isSelfPrimary) {
        node.version = process.env.npm_package_version || row.version || null;
        node.uptime = formatUptime(process.uptime());
      }
      return node;
    });
    res.json(payload);
  } catch (err) {
    next(err);
  }
});
// GET /containers — flat list of containers across all online nodes.
// A node is queried through the local Docker socket when it is this host (or
// has no api_url), otherwise through its agent's /containers endpoint. Offline
// nodes and nodes whose lookup fails contribute nothing; one bad node never
// fails the whole response.
router.get('/containers', async (req, res, next) => {
  try {
    const { rows: nodes } = await pool.query(
`SELECT id, hostname, api_url,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes
ORDER BY registered_at ASC`
    );

    // Shape one Docker container record into the UI's summary row.
    const summarize = (c, node) => {
      const rawName = ((c.Names && c.Names[0]) || '').replace(/^\//, '');
      const status = c.Status || '';
      return {
        id: c.Id.slice(0, 12),
        // Drop the compose project prefix and the replica suffix.
        name: rawName.replace(/^wild-dragon-/, '').replace(/-\d+$/, ''),
        image: (c.Image || '').replace(/^sha256:/, '').slice(0, 40),
        state: c.State,
        status: c.Status,
        uptime: status.replace(/\s*\(.*\)/, '').trim(),
        healthy: status.includes('healthy'),
        node_hostname: node.hostname,
        node_id: node.id,
      };
    };

    const listForNode = async (node) => {
      if (!(Number(node.stale_seconds) < 120)) {
        return [];
      }
      const selfName = process.env.NODE_HOSTNAME || os.hostname();
      const runsHere = node.hostname === selfName || !node.api_url;
      try {
        let listing = [];
        if (runsHere) {
          listing = (await dockerRequest('/containers/json?all=true')) || [];
        } else {
          const upstream = await fetch(`${node.api_url}/containers`, {
            headers: agentAuthHeaders(),
            signal: AbortSignal.timeout(4000),
          });
          if (upstream.ok) {
            listing = await upstream.json();
          }
        }
        return Array.isArray(listing) ? listing.map((c) => summarize(c, node)) : [];
      } catch (err) {
        console.warn(`[cluster] failed to fetch containers from ${node.hostname}:`, err.message);
        return [];
      }
    };

    const perNode = await Promise.all(nodes.map((node) => listForNode(node)));
    res.json(perNode.flat());
  } catch (err) {
    next(err);
  }
});
// GET /containers/:nodeId/:containerId/logs — admin-only container log viewer.
// Local nodes (this host, or no api_url) are read through the Docker socket
// and honour ?tail=N (default 200, clamped to 1..2000). Remote nodes are
// proxied to the agent's /sidecar/:id/logs endpoint and its JSON is relayed.
// Responds 404 for an unknown node and mirrors the agent's status on failure.
router.get('/containers/:nodeId/:containerId/logs', requireAdmin, async (req, res, next) => {
  try {
    const { nodeId, containerId } = req.params;
    const node = await resolveNode(nodeId);
    if (!node) return res.status(404).json({ error: 'Node not found' });
    const localHostname = process.env.NODE_HOSTNAME || os.hostname();
    const isLocal = node.hostname === localHostname || !node.api_url;
    if (isLocal) {
      // Clamp on both sides: without the lower bound a negative ?tail= would
      // pass Math.min untouched and reach Docker outside the intended cap.
      const requested = parseInt(req.query.tail, 10) || 200;
      const tail = Math.min(Math.max(requested, 1), 2000);
      const logs = await dockerLogs(containerId, tail);
      res.json({ logs: logs || '(no logs)' });
    } else {
      // Encode the id exactly like the local path does, so a crafted value
      // (e.g. containing "../") cannot rewrite the path sent to the agent.
      const resp = await fetch(`${node.api_url}/sidecar/${encodeURIComponent(containerId)}/logs`, {
        headers: agentAuthHeaders(),
        signal: AbortSignal.timeout(6000),
      });
      if (!resp.ok) return res.status(resp.status).json({ error: 'Failed to fetch remote logs' });
      const data = await resp.json();
      res.json(data);
    }
  } catch (err) { next(err); }
});
// POST /containers/:nameOrId/restart — restart a container on THIS host via
// the Docker socket. Responds { ok: true } on success, or 502 with Docker's
// error message when the engine rejects the request.
// NOTE(review): unlike the logs route, this mutating route has no requireAdmin
// guard in this file — confirm auth is enforced where the router is mounted.
router.post('/containers/:nameOrId/restart', async (req, res, next) => {
  try {
    const result = await dockerRequest(
      `/containers/${encodeURIComponent(req.params.nameOrId)}/restart`,
      'POST'
    );
    // dockerRequest() ignores HTTP status codes. A successful restart has an
    // empty body (null here), whereas Docker reports failures such as an
    // unknown container as a JSON object carrying `message` — surface that
    // instead of claiming success.
    if (result && typeof result.message === 'string') {
      return res.status(502).json({ ok: false, error: result.message });
    }
    res.json({ ok: true });
  } catch (err) { next(err); }
});
// POST /heartbeat — upsert the calling node's row in cluster_nodes (keyed by
// hostname) and refresh its last_seen timestamps. Responds with the stored row.
// When AUTH_ENABLED is 'true' the caller must hold a token bound to the same
// hostname it reports, or be an admin session.
router.post('/heartbeat', async (req, res, next) => {
  try {
    const {
      hostname,
      ip_address,
      role = 'worker',
      version,
      api_url,
      cpu_usage,
      mem_used_mb,
      mem_total_mb,
      capabilities,
      metadata,
      metrics,
    } = req.body;

    if (!hostname) {
      return res.status(400).json({ error: 'hostname is required' });
    }

    if (process.env.AUTH_ENABLED === 'true') {
      const bound = req.tokenBoundHostname;
      if (bound) {
        // A node-bound token may only heartbeat for its own hostname.
        if (bound !== hostname) {
          return res.status(403).json({
            error: `Token is bound to "${bound}" but heartbeat reported "${hostname}"`,
          });
        }
      } else if (req.user?.role !== 'admin') {
        return res.status(403).json({
          error: 'Heartbeat requires a node-bound token or admin session',
        });
      }
    }

    const effectiveIp = pickIp(ip_address, req.ip || req.socket?.remoteAddress);

    // JSON columns are passed as text; `fallback` is what gets stored when the
    // field is absent from the payload.
    const jsonOr = (value, fallback) => (value != null ? JSON.stringify(value) : fallback);

    // Order must match $1..$11 in the statement below.
    const params = [
      hostname,
      effectiveIp,
      role,
      version || null,
      api_url || null,
      cpu_usage ?? null,
      mem_used_mb ?? null,
      mem_total_mb ?? null,
      jsonOr(capabilities, '{}'),
      jsonOr(metadata, null),
      jsonOr(metrics, null),
    ];

    // metrics is the only column that keeps its previous value when the
    // heartbeat omits it (COALESCE); every other column is overwritten.
    const { rows } = await pool.query(
`INSERT INTO cluster_nodes
(hostname, ip_address, role, version, api_url,
cpu_usage, mem_used_mb, mem_total_mb, last_seen, last_seen_at, capabilities, metadata, metrics)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,NOW(),NOW(),$9,$10,$11)
ON CONFLICT (hostname) DO UPDATE SET
ip_address = EXCLUDED.ip_address,
role = EXCLUDED.role,
version = EXCLUDED.version,
api_url = EXCLUDED.api_url,
cpu_usage = EXCLUDED.cpu_usage,
mem_used_mb = EXCLUDED.mem_used_mb,
mem_total_mb = EXCLUDED.mem_total_mb,
last_seen = NOW(),
last_seen_at = NOW(),
capabilities = EXCLUDED.capabilities,
metadata = EXCLUDED.metadata,
metrics = COALESCE(EXCLUDED.metrics, cluster_nodes.metrics)
RETURNING *`,
      params
    );
    const saved = rows[0];

    // Auto-provision recorder rows from this node's capture hardware. This is
    // deliberately not awaited and its failure is only logged, so a reconcile
    // hiccup can never cost us a heartbeat.
    reconcileRecordersForNode(saved).catch((e) =>
      console.warn(`[recorders] auto-provision for ${hostname} failed (non-fatal): ${e.message}`));

    res.json(saved);
  } catch (err) {
    next(err);
  }
});
// Create one recorder row per capture port advertised in a node's heartbeat
// capabilities. The INSERT is create-once (ON CONFLICT … DO NOTHING on
// (node_id, device_index)), so existing rows — including operator-set label,
// enabled flag and codec config — are never touched, and ports that disappear
// are NOT deleted here. New rows start disabled and flagged auto_provisioned.
// Does nothing when the node has no id or advertises no integer-indexed port.
async function reconcileRecordersForNode(node) {
  if (!node || !node.id) return;
  const cap = node.capabilities || {};

  // Collect { source_type, device_index } for every port with an integer index.
  const ports = [];
  const collect = (entries, sourceType, indexOfEntry) => {
    for (const entry of (entries || [])) {
      const idx = indexOfEntry(entry);
      if (Number.isInteger(idx)) {
        ports.push({ source_type: sourceType, device_index: idx });
      }
    }
  };
  // Deltacast entries may carry 'index' or 'port'; DeckLink entries use 'index'.
  collect(cap.deltacast, 'deltacast', (d) => d.index ?? d.port);
  collect(cap.blackmagic, 'blackmagic', (b) => b.index);
  if (ports.length === 0) return;

  // Default master codec for newly discovered ports: nodes reporting at least
  // one GPU get GPU HEVC, GPU-less nodes fall back to CPU ProRes (realtime
  // 1080p59.94 SDI capture is too heavy for CPU encoding).
  const hasGpu = Array.isArray(cap.gpus) && cap.gpus.length > 0;
  const defaultCodec = hasGpu ? 'hevc_nvenc' : 'prores_hq';

  for (const { source_type: sourceType, device_index: deviceIndex } of ports) {
    const isDeltacast = sourceType === 'deltacast';
    // source_config keeps the legacy {port}/{device} shape the capture
    // pipeline already reads.
    const srcCfg = isDeltacast ? { port: deviceIndex } : { device: deviceIndex };
    // Deterministic hardware name; the operator can set a friendly `label`.
    const hardwareName = `${node.hostname}-${isDeltacast ? 'dc' : 'bmd'}${deviceIndex}`;
    await pool.query(
`INSERT INTO recorders
(node_id, device_index, source_type, source_config, name, enabled, auto_provisioned,
recording_codec, recording_container, recording_video_bitrate, recording_audio_channels)
VALUES ($1, $2, $3::source_type, $4, $5, false, true, $6, 'mov', '25M', 2)
ON CONFLICT (node_id, device_index) WHERE node_id IS NOT NULL AND device_index IS NOT NULL
DO NOTHING`,
      [
        node.id,
        deviceIndex,
        sourceType,
        JSON.stringify(srcCfg),
        hardwareName,
        defaultCodec,
      ]
    );
  }
}
// GET /devices/blackmagic/signal — one entry per Blackmagic port advertised by
// any node, joined with its recorder (if any) and that recorder's live capture
// status. `signal` is:
//   'no-recorder' – no recorder row matches the port
//   'idle'        – a recorder exists but is not recording
//   'connecting'  – recording, but live status could not be obtained
//   otherwise     – whatever the capture sidecar reports
// NOTE(review): recorders are selected with source_type = 'sdi', while
// reconcileRecordersForNode() in this file inserts 'blackmagic' — confirm
// auto-provisioned rows are meant to be matched here.
router.get('/devices/blackmagic/signal', async (req, res, next) => {
  try {
    const nodesResult = await pool.query(
`SELECT id, hostname, ip_address, api_url, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes
WHERE capabilities IS NOT NULL`
    );
    const recResult = await pool.query(
`SELECT id, name, status, container_id, node_id, device_index,
source_config
FROM recorders
WHERE source_type = 'sdi' AND node_id IS NOT NULL`
    );
    // Index recorders by "<node_id>:<port>"; older rows may only carry the port
    // inside source_config.device.
    const recByPort = new Map();
    for (const r of recResult.rows) {
      const devIdx = r.device_index ?? r.source_config?.device ?? 0;
      recByPort.set(`${r.node_id}:${devIdx}`, r);
    }
    // Same local-host detection as the containers/logs routes. Previously this
    // fell back to '' when NODE_HOSTNAME was unset, so this host was never
    // recognised as local here.
    const localHostname = process.env.NODE_HOSTNAME || os.hostname();
    const tasks = [];
    for (const node of nodesResult.rows) {
      const nodeOnline = Number(node.stale_seconds) < 120;
      const bm = (node.capabilities && node.capabilities.blackmagic) || [];
      const model = (node.capabilities && node.capabilities.blackmagic_model) || null;
      const isRemote = node.api_url && node.hostname !== localHostname;
      bm.forEach((d, idx) => {
        const portIndex = d.index !== undefined ? d.index : idx;
        const rec = recByPort.get(`${node.id}:${portIndex}`);
        tasks.push((async () => {
          const base = {
            node_id: node.id, hostname: node.hostname, index: portIndex,
            device: d.device || null, model, node_online: nodeOnline,
            recorder_id: rec ? rec.id : null, recorder_name: rec ? rec.name : null,
            recorder_status: rec ? rec.status : null,
            signal: 'no-recorder', framesReceived: null, currentFps: null,
          };
          if (!rec || rec.status !== 'recording' || !rec.container_id) {
            if (rec && rec.status !== 'recording') base.signal = 'idle';
            return base;
          }
          try {
            let live = null;
            if (isRemote) {
              // Authenticate like every other agent call in this file; the
              // agent wraps the sidecar payload in { live }.
              const r = await fetch(`${node.api_url}/sidecar/${rec.container_id}/status`, {
                headers: agentAuthHeaders(),
                signal: AbortSignal.timeout(2500),
              });
              if (r.ok) live = (await r.json()).live;
            } else {
              // Local recorder: ask the capture sidecar directly by container hostname.
              const r = await fetch(`http://recorder-${rec.id}:3001/capture/status`, { signal: AbortSignal.timeout(2000) });
              if (r.ok) live = await r.json();
            }
            if (live && live.signal) {
              base.signal = live.signal;
              base.framesReceived = live.framesReceived ?? null;
              base.currentFps = live.currentFps ?? null;
            } else { base.signal = 'connecting'; }
          } catch (_) { base.signal = 'connecting'; }
          return base;
        })());
      });
    }
    const results = await Promise.all(tasks);
    res.json(results);
  } catch (err) { next(err); }
});
// GET /devices/blackmagic — flat inventory of Blackmagic ports across every
// node that has reported capabilities. A port's `index` is the value it
// advertises, or its position in the list when it advertises none.
router.get('/devices/blackmagic', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
`SELECT id, hostname, ip_address, role, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes WHERE capabilities IS NOT NULL`
    );
    const devices = rows.flatMap((node) => {
      const online = Number(node.stale_seconds) < 120;
      const caps = node.capabilities;
      const ports = (caps && caps.blackmagic) || [];
      const model = (caps && caps.blackmagic_model) || null;
      return ports.map((port, position) => ({
        node_id: node.id,
        hostname: node.hostname,
        ip_address: node.ip_address,
        role: node.role,
        online,
        model,
        index: port.index !== undefined ? port.index : position,
        device: port.device,
      }));
    });
    res.json(devices);
  } catch (err) {
    next(err);
  }
});
// GET /devices/deltacast — flat inventory of Deltacast ports across every node
// that has reported capabilities. `model` defaults to 'Deltacast', `present`
// is true unless the port explicitly reports false, and `port_count` is the
// number of ports the owning node advertises.
router.get('/devices/deltacast', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
`SELECT id, hostname, ip_address, role, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes WHERE capabilities IS NOT NULL`
    );
    const devices = rows.flatMap((node) => {
      const online = Number(node.stale_seconds) < 120;
      const caps = node.capabilities;
      const ports = (caps && caps.deltacast) || [];
      const model = (caps && caps.deltacast_model) || null;
      return ports.map((port, position) => ({
        node_id: node.id,
        hostname: node.hostname,
        ip_address: node.ip_address,
        role: node.role,
        online,
        model: model || 'Deltacast',
        index: port.index !== undefined ? port.index : position,
        device: port.device,
        present: port.present !== false,
        port_count: ports.length,
      }));
    });
    res.json(devices);
  } catch (err) {
    next(err);
  }
});
// GET /devices/deltacast/signal — one entry per Deltacast port advertised by
// any node, joined with its recorder (if any) and that recorder's live capture
// status. `signal` is:
//   'no-recorder' – no recorder row matches the port
//   'idle'        – a recorder exists but is not recording
//   'connecting'  – recording, but live status could not be obtained
//   otherwise     – whatever the capture sidecar reports
// Entries keep the order in which nodes and ports are enumerated.
router.get('/devices/deltacast/signal', async (req, res, next) => {
  try {
    const [nodesRes, recordersRes] = await Promise.all([
      pool.query(`SELECT id, hostname, ip_address, api_url, capabilities,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes WHERE capabilities IS NOT NULL`),
      pool.query(`SELECT id, node_id, device_index, status, source_type, container_id
FROM recorders WHERE source_type = 'deltacast'`),
    ]);
    // Index recorders by "<node_id>:<device_index>".
    const recByNodePort = {};
    for (const rec of recordersRes.rows) {
      recByNodePort[`${rec.node_id}:${rec.device_index}`] = rec;
    }
    const results = [];
    const fetchPromises = [];
    for (const node of nodesRes.rows) {
      const online = Number(node.stale_seconds) < 120;
      const dc = (node.capabilities && node.capabilities.deltacast) || [];
      const model = (node.capabilities && node.capabilities.deltacast_model) || 'Deltacast';
      // entries() yields the position directly, replacing an indexOf() scan per port.
      for (const [position, port] of dc.entries()) {
        const idx = port.index !== undefined ? port.index : position;
        const rec = recByNodePort[`${node.id}:${idx}`];
        const base = { node_id: node.id, hostname: node.hostname, ip_address: node.ip_address,
          online, model, index: idx, device: port.device, present: port.present !== false,
          recorder_id: rec ? rec.id : null, recorder_status: rec ? rec.status : null,
          signal: 'no-recorder', framesReceived: null, currentFps: null };
        results.push(base);
        if (!rec) continue;
        if (rec.status !== 'recording') { base.signal = 'idle'; continue; }
        // `base` is already in `results`; the task below fills it in place.
        fetchPromises.push((async () => {
          try {
            const viaAgent = Boolean(node.api_url);
            const url = viaAgent ? `${node.api_url}/sidecar/${rec.container_id}/status`
              : `http://recorder-${rec.id}:3001/capture/status`;
            const r = await fetch(url, {
              // Authenticate agent calls like the other agent requests in this file.
              headers: viaAgent ? agentAuthHeaders() : {},
              signal: AbortSignal.timeout(2500),
            });
            let live = null;
            if (r.ok) {
              const body = await r.json();
              // The Blackmagic signal route reads the agent's payload from
              // `.live`, while the sidecar answers with the status object
              // itself — accept either shape.
              live = body && body.live ? body.live : body;
            }
            if (live && live.signal) {
              base.signal = live.signal;
              base.framesReceived = live.framesReceived ?? null;
              base.currentFps = live.currentFps ?? null;
            } else {
              // A recorder exists and is recording, so a non-OK or signal-less
              // answer must not be left reading 'no-recorder'.
              base.signal = 'connecting';
            }
          } catch (_) { base.signal = 'connecting'; }
        })());
      }
    }
    await Promise.all(fetchPromises);
    res.json(results);
  } catch (err) { next(err); }
});
// GET /:id/ping — probe a node's agent at <api_url>/health (4s timeout) and
// report reachability plus round-trip latency. Responds 404 for an unknown
// node; every other outcome — including an unreachable agent — is a 200 whose
// `reachable` flag carries the result.
router.get('/:id/ping', async (req, res, next) => {
  try {
    const lookup = await pool.query('SELECT id, hostname, api_url FROM cluster_nodes WHERE id = $1', [req.params.id]);
    if (lookup.rowCount === 0) {
      return res.status(404).json({ error: 'Node not found' });
    }
    const [node] = lookup.rows;
    if (!node.api_url) {
      return res.json({ reachable: false, reason: 'no api_url registered' });
    }
    const startedAt = Date.now();
    try {
      const upstream = await fetch(`${node.api_url}/health`, { signal: AbortSignal.timeout(4000) });
      const latency_ms = Date.now() - startedAt;
      // A non-JSON health body is tolerated and reported as {}.
      const agent = await upstream.json().catch(() => ({}));
      res.json({ reachable: upstream.ok, latency_ms, status: upstream.status, agent });
    } catch (err) {
      res.json({ reachable: false, latency_ms: Date.now() - startedAt, reason: err.message });
    }
  } catch (err) {
    next(err);
  }
});
// ── Capture-driver / SDK deployment ────────────────────────────────────────
// Admins install/update vendor capture-card drivers on a node from the UI.
// We resolve the node's api_url (like /:id/ping) and forward to its node-agent,
// which runs deploy/install-driver.sh <vendor> in a privileged one-shot
// container against the host kernel. Vendor is allowlisted here AND on the
// agent. We never echo the agent token or proprietary paths back to the client.
// Vendor allowlist: POST /:id/install-driver lower-cases the requested vendor
// and rejects anything not listed here before contacting the node.
const DRIVER_VENDORS = ['blackmagic', 'aja', 'deltacast', 'ndi'];
// Headers that authenticate this server to a node-agent. The bearer value is
// the agent's NODE_TOKEN, supplied server-side through NODE_AGENT_TOKEN; it is
// never derived from client input and never returned to the browser. Yields an
// empty object when no token is configured.
function agentAuthHeaders() {
  const token = process.env.NODE_AGENT_TOKEN || '';
  if (!token) {
    return {};
  }
  return { Authorization: `Bearer ${token}` };
}
// Look up a cluster node by id. Resolves with its id, hostname, api_url and
// capabilities, or null when no row matches.
async function resolveNode(id) {
  const result = await pool.query('SELECT id, hostname, api_url, capabilities FROM cluster_nodes WHERE id = $1', [id]);
  if (result.rowCount === 0) {
    return null;
  }
  return result.rows[0];
}
// GET /:id/driver-status — admin-only. Relays the node-agent's /driver/status
// JSON (6s timeout). Responds 404 for an unknown node, 409 when the node has
// no api_url, and 502 when the agent answers non-OK or cannot be reached.
router.get('/:id/driver-status', requireAdmin, async (req, res, next) => {
  try {
    const node = await resolveNode(req.params.id);
    if (!node) {
      return res.status(404).json({ error: 'Node not found' });
    }
    if (!node.api_url) {
      return res.status(409).json({ error: 'Node has no api_url registered' });
    }
    try {
      const upstream = await fetch(`${node.api_url}/driver/status`, {
        headers: agentAuthHeaders(),
        signal: AbortSignal.timeout(6000),
      });
      // An unparsable body is treated as {} rather than an error.
      const payload = await upstream.json().catch(() => ({}));
      if (upstream.ok) {
        res.json(payload);
      } else {
        return res.status(502).json({ error: 'Agent driver-status failed', status: upstream.status });
      }
    } catch (err) {
      res.status(502).json({ error: 'Node unreachable', reason: err.message });
    }
  } catch (err) {
    next(err);
  }
});
// POST /:id/install-driver — admin-only. Body: { vendor }. Forwards the install
// request to the node-agent's /driver/install and relays a fixed set of result
// fields. Responds 400 for a vendor outside DRIVER_VENDORS, 404 for an unknown
// node, 409 when the node has no api_url, and 502 when the agent answers
// non-OK, is unreachable, or the 10-minute timeout elapses.
router.post('/:id/install-driver', requireAdmin, async (req, res, next) => {
  try {
    const vendor = String(req.body?.vendor || '').toLowerCase();
    const vendorAllowed = DRIVER_VENDORS.includes(vendor);
    if (!vendorAllowed) {
      return res.status(400).json({ error: `Invalid vendor (allowed: ${DRIVER_VENDORS.join(', ')})` });
    }
    const node = await resolveNode(req.params.id);
    if (!node) {
      return res.status(404).json({ error: 'Node not found' });
    }
    if (!node.api_url) {
      return res.status(409).json({ error: 'Node has no api_url registered' });
    }
    try {
      // DKMS builds can take minutes, hence the generous timeout.
      const upstream = await fetch(`${node.api_url}/driver/install`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json', ...agentAuthHeaders() },
        body: JSON.stringify({ vendor }),
        signal: AbortSignal.timeout(600000),
      });
      const result = await upstream.json().catch(() => ({}));
      // Relay only these known fields instead of the agent's raw body, so
      // nothing else the agent might send is passed through to the client.
      const relayed = {
        ok: !!result.ok,
        vendor,
        exitCode: result.exitCode ?? null,
        rebootRequired: !!result.rebootRequired,
        status: result.status ?? null,
        logs: typeof result.logs === 'string' ? result.logs : '',
        error: result.ok ? undefined : (result.error || 'Install failed — see logs'),
      };
      res.status(upstream.ok ? 200 : 502).json(relayed);
    } catch (err) {
      res.status(502).json({ error: 'Node unreachable or install timed out', reason: err.message });
    }
  } catch (err) {
    next(err);
  }
});
// GET /metrics — per-node CPU/RAM/GPU snapshot for the dashboard.
// GPU rows merge the inventory in capabilities.gpus (name, memory size) with
// the live readings in metrics.gpus (utilisation, memory used). Live GPUs that
// have no inventory entry are appended after the inventory ones.
router.get('/metrics', async (req, res, next) => {
  try {
    const r = await pool.query(
`SELECT id, hostname, role, last_seen,
cpu_usage, mem_used_mb, mem_total_mb,
capabilities, metrics,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes ORDER BY registered_at ASC`
    );
    const orNull = (v) => (v != null ? v : null);
    const nodes = r.rows.map(row => {
      const capGpus = (row.capabilities && row.capabilities.gpus) || [];
      const liveGpus = (row.metrics && row.metrics.gpus) || [];
      // Live entries already merged into an inventory GPU, so the append pass
      // below cannot list the same reading twice.
      const paired = new Set();
      const gpus = capGpus.map((g, idx) => {
        // Match by index only when the inventory entry HAS one: comparing two
        // missing indexes (undefined === undefined) would pair every GPU with
        // the first live entry. Otherwise fall back to list position.
        const byIndex = g.index != null ? liveGpus.find(l => l.index === g.index) : undefined;
        const match = byIndex || liveGpus[idx];
        if (match) paired.add(match);
        const live = match || {};
        return { name: g.name || null, util_pct: orNull(live.util_pct),
          memory_used_mb: orNull(live.memory_used_mb),
          memory_total_mb: g.memory_mb != null ? g.memory_mb : (live.memory_total_mb ?? null) };
      });
      for (const lg of liveGpus) {
        if (paired.has(lg)) continue;
        // An unpaired live entry whose index duplicates an inventory GPU is
        // still skipped, as before.
        if (lg.index != null && capGpus.some(g => g.index === lg.index)) continue;
        gpus.push({ name: lg.name || null, util_pct: orNull(lg.util_pct),
          memory_used_mb: orNull(lg.memory_used_mb),
          memory_total_mb: orNull(lg.memory_total_mb) });
      }
      return { id: row.id, hostname: row.hostname, role: row.role,
        online: Number(row.stale_seconds) < 120, last_seen: row.last_seen,
        cpu_util_pct: row.cpu_usage != null ? Number(row.cpu_usage) : null,
        ram_used_mb: orNull(row.mem_used_mb),
        ram_total_mb: orNull(row.mem_total_mb), gpus };
    });
    res.json({ nodes });
  } catch (err) { next(err); }
});
// DELETE /:id — remove a node's row from cluster_nodes. Responds { ok: true }
// when a row was deleted and 404 when the id matched nothing.
router.delete('/:id', async (req, res, next) => {
  try {
    const { rowCount } = await pool.query('DELETE FROM cluster_nodes WHERE id = $1 RETURNING id', [req.params.id]);
    if (rowCount === 0) {
      return res.status(404).json({ error: 'Node not found' });
    }
    res.json({ ok: true });
  } catch (err) {
    next(err);
  }
});
// Default export: the router carrying every route registered in this file.
export default router;