// dragonflight/services/mam-api/src/routes/cluster.js
//
// 462 lines
// 17 KiB
// JavaScript
// Raw Normal View History

import express from 'express';
import http from 'http';
import pool from '../db/pool.js';
const router = express.Router();
// Choose the IP address to record for a node.
//
// When the agent reports Docker's default bridge address (172.17.x) but the
// request itself arrived from some other, non-bridge address, the request
// source is used instead. Only 172.17.x — the default docker0 bridge — is
// treated as a bridge address, not the full RFC1918 172.16/12 block, because
// real LANs (e.g. 172.18.91.x) fall in that range.
//
// reportedIp: address supplied by the agent (may be empty).
// reqIp:      address the request came from; a leading "::ffff:" is stripped.
// Returns the chosen address, or null when neither one is available.
function pickIp(reportedIp, reqIp) {
  const DOCKER_BRIDGE = /^172\.17\./;
  const sourceIp = (reqIp || '').replace(/^::ffff:/, '');

  if (!reportedIp) {
    return sourceIp === '' ? null : sourceIp;
  }

  const reportedIsBridge = DOCKER_BRIDGE.test(reportedIp);
  const sourceIsUsable = sourceIp !== '' && !DOCKER_BRIDGE.test(sourceIp);
  return reportedIsBridge && sourceIsUsable ? sourceIp : reportedIp;
}
/**
 * Send a request to the Docker Engine API (v1.41 path prefix) over the Unix
 * socket at /var/run/docker.sock.
 *
 * @param {string} path - API path beginning with "/", e.g. "/containers/json".
 * @param {string} [method='GET'] - HTTP method.
 * @param {object|null} [body=null] - Optional payload; sent JSON-encoded.
 * @returns {Promise<any>} The parsed JSON response body, or null when the body
 *   is empty or is not valid JSON. The HTTP status code is NOT inspected, so
 *   callers that care about failures must look at the returned payload.
 *   Rejects on request/response stream errors and when the socket has been
 *   idle for 5 seconds.
 */
function dockerRequest(path, method = 'GET', body = null) {
  return new Promise((resolve, reject) => {
    const opts = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.41${path}`,
      method,
      headers: { 'Accept': 'application/json', 'Content-Type': 'application/json' },
    };
    const req = http.request(opts, (res) => {
      // Decode as UTF-8 at the stream level so a multi-byte character that is
      // split across two chunks is not corrupted by string concatenation.
      res.setEncoding('utf8');
      let data = '';
      res.on('data', (chunk) => { data += chunk; });
      // An aborted response emits 'error' on the response stream; without a
      // listener the promise would never settle.
      res.on('error', reject);
      res.on('end', () => {
        if (!data.trim()) return resolve(null);
        try {
          resolve(JSON.parse(data));
        } catch (parseErr) {
          // Non-JSON bodies are deliberately reported as "no data".
          resolve(null);
        }
      });
    });
    req.on('error', reject);
    req.setTimeout(5000, () => {
      const timeoutErr = new Error('Docker socket timeout');
      // Tear the request down with the timeout as its cause, and reject
      // explicitly so the caller always sees this error (a second reject from
      // the 'error' listener is a no-op).
      req.destroy(timeoutErr);
      reject(timeoutErr);
    });
    if (body) req.write(JSON.stringify(body));
    req.end();
  });
}
// GET / — list every registered cluster node, oldest registration first.
// Each row is returned as stored plus `stale_seconds` (seconds since
// last_seen) and `online`, which is true while stale_seconds is below 120.
router.get('/', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      `SELECT *,
EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
FROM cluster_nodes
ORDER BY registered_at ASC`
    );
    const nodes = rows.map((node) => {
      const online = Number(node.stale_seconds) < 120;
      return { ...node, online };
    });
    res.json(nodes);
  } catch (err) {
    next(err);
  }
});
// Reduce one entry of Docker's container listing to the summary object
// returned by GET /containers.
function summarizeContainer(c) {
  const names = Array.isArray(c.Names) ? c.Names : [];
  const rawName = (names[0] || '').replace(/^\//, '');
  // Drop the "wild-dragon-" prefix and a trailing "-<digits>" suffix.
  const name = rawName.replace(/^wild-dragon-/, '').replace(/-\d+$/, '');
  const status = c.Status || '';

  // Label each published port as "public:private". A Set collapses mappings
  // that are listed more than once (presumably one per bound host address).
  const portLabels = (c.Ports || [])
    .filter((p) => p.PublicPort)
    .map((p) => `${p.PublicPort}:${p.PrivatePort}`);
  const ports = [...new Set(portLabels)].join(', ');

  return {
    id: (c.Id || '').slice(0, 12),
    name,
    image: (c.Image || '').replace(/^sha256:/, '').slice(0, 40),
    state: c.State,
    // Status without its parenthesised health suffix.
    uptime: status.replace(/\s*\(.*\)/, '').trim(),
    // Match the exact "(healthy)" marker: a plain substring test for
    // "healthy" is also true for "unhealthy".
    healthy: status.includes('(healthy)'),
    ports,
    // Resource usage is not collected by this endpoint; always zero.
    cpu: 0,
    mem: 0,
  };
}

// GET /containers — list all containers on the local Docker host.
// Responds with an array of container summaries; an empty array is returned
// when Docker's reply is not an array or the Docker socket is missing or
// inaccessible.
router.get('/containers', async (req, res, next) => {
  try {
    const containers = await dockerRequest('/containers/json?all=true');
    if (!Array.isArray(containers)) return res.json([]);
    res.json(containers.map((c) => summarizeContainer(c)));
  } catch (err) {
    // No socket (ENOENT) or no permission (EACCES): report "no containers"
    // rather than failing the request.
    if (err.code === 'ENOENT' || err.code === 'EACCES') return res.json([]);
    next(err);
  }
});
// POST /containers/:nameOrId/restart — ask Docker to restart a container.
// Responds { ok: true } on success. When Docker answers with an error payload
// the request fails with 404 (unknown container) or 502 (any other Docker
// error) instead of claiming success.
router.post('/containers/:nameOrId/restart', async (req, res, next) => {
  try {
    const target = encodeURIComponent(req.params.nameOrId);
    const reply = await dockerRequest(`/containers/${target}/restart`, 'POST');

    // dockerRequest does not expose the HTTP status. A successful restart has
    // an empty body (null here); Docker errors arrive as { message: "..." }.
    if (reply && typeof reply.message === 'string') {
      const status = /no such container/i.test(reply.message) ? 404 : 502;
      return res.status(status).json({ error: reply.message });
    }

    res.json({ ok: true });
  } catch (err) {
    next(err);
  }
});
// POST /heartbeat — upsert the calling node's registration, including its
// hardware capabilities.
//
// Body fields: hostname (required), ip_address, role (default 'worker'),
// version, api_url, cpu_usage, mem_used_mb, mem_total_mb, capabilities,
// metadata. Responds with the stored cluster_nodes row.
//
// Errors: 400 when hostname is missing or not a string; 403 when auth is
// enabled and the caller is neither bound to this hostname nor an admin.
router.post('/heartbeat', async (req, res, next) => {
  try {
    // Tolerate a missing body (e.g. no JSON payload) so it yields a 400
    // instead of a destructuring TypeError.
    const {
      hostname, ip_address,
      role = 'worker', version, api_url,
      cpu_usage, mem_used_mb, mem_total_mb,
      capabilities, metadata,
    } = req.body ?? {};
    if (typeof hostname !== 'string' || hostname.trim() === '') {
      return res.status(400).json({ error: 'hostname is required' });
    }

    // Issue #106 — any authenticated user used to be able to POST a heartbeat
    // for an arbitrary hostname and overwrite the primary node's `api_url`,
    // effectively hijacking job dispatch. Now: if the caller's token is bound
    // to a hostname (node-agent tokens are bound at issue time), the body
    // hostname must match. Admin users with no binding are allowed for ops.
    if (process.env.AUTH_ENABLED === 'true') {
      const bound = req.tokenBoundHostname;
      if (bound && bound !== hostname) {
        return res.status(403).json({
          error: `Token is bound to "${bound}" but heartbeat reported "${hostname}"`,
        });
      }
      if (!bound && req.user?.role !== 'admin') {
        return res.status(403).json({
          error: 'Heartbeat requires a node-bound token or admin session',
        });
      }
    }

    // Prefer the request's source address when the agent reported Docker's
    // default bridge IP.
    const effectiveIp = pickIp(ip_address, req.ip || req.socket?.remoteAddress);

    const r = await pool.query(
      `INSERT INTO cluster_nodes
         (hostname, ip_address, role, version, api_url,
          cpu_usage, mem_used_mb, mem_total_mb, last_seen, capabilities, metadata)
       VALUES ($1,$2,$3,$4,$5,$6,$7,$8,NOW(),$9,$10)
       ON CONFLICT (hostname) DO UPDATE SET
         ip_address = EXCLUDED.ip_address,
         role = EXCLUDED.role,
         version = EXCLUDED.version,
         api_url = EXCLUDED.api_url,
         cpu_usage = EXCLUDED.cpu_usage,
         mem_used_mb = EXCLUDED.mem_used_mb,
         mem_total_mb = EXCLUDED.mem_total_mb,
         last_seen = NOW(),
         capabilities = EXCLUDED.capabilities,
         metadata = EXCLUDED.metadata
       RETURNING *`,
      [
        hostname,
        effectiveIp,
        role,
        version || null,
        api_url || null,
        // `??` keeps a legitimate 0 reading; only null/undefined become NULL.
        cpu_usage ?? null,
        mem_used_mb ?? null,
        mem_total_mb ?? null,
        // Capabilities default to an empty object, metadata to NULL.
        capabilities != null ? JSON.stringify(capabilities) : '{}',
        metadata != null ? JSON.stringify(metadata) : null,
      ]
    );
    res.json(r.rows[0]);
  } catch (err) {
    next(err);
  }
});
// GET /devices/blackmagic/signal — live video-presence state for every
// DeckLink port across the cluster. For each port we check whether there is
// an SDI recorder assigned to it and, if it is recording, query the capture
// container for its real signal state (receiving / lost / connecting /
// error).
//
// signal values set by this handler:
//   'no-recorder' — no recorder is assigned to the port
//   'idle'        — a recorder is assigned but is not recording
//   'connecting'  — a recorder is recording but no live state could be read
//   otherwise     — whatever the capture container reports
//
// Response shape (array):
//   { node_id, hostname, index, device, model, node_online,
//     recorder_id, recorder_name, recorder_status,
//     signal, framesReceived, currentFps }
router.get('/devices/blackmagic/signal', async (req, res, next) => {
  try {
    // 1. The two lookups are independent, so run them concurrently.
    const [nodesResult, recResult] = await Promise.all([
      pool.query(
        `SELECT id, hostname, ip_address, api_url, capabilities,
                EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
         FROM cluster_nodes
         WHERE capabilities IS NOT NULL`
      ),
      pool.query(
        `SELECT id, name, status, container_id, node_id, device_index,
                source_config
         FROM recorders
         WHERE source_type = 'sdi' AND node_id IS NOT NULL`
      ),
    ]);

    // 2. Lookup "${node_id}:${device_index}" → recorder row. When several
    //    recorders point at the same port, one that is recording is never
    //    replaced, so an idle row cannot mask the active capture.
    const recByPort = new Map();
    for (const row of recResult.rows) {
      const devIdx = row.device_index ?? row.source_config?.device ?? 0;
      const key = `${row.node_id}:${devIdx}`;
      const current = recByPort.get(key);
      if (!current || current.status !== 'recording') recByPort.set(key, row);
    }

    // 3. One task per port; all capture-container fetches run concurrently so
    //    the endpoint stays fast even with many ports.
    const localHostname = process.env.NODE_HOSTNAME || '';
    const tasks = [];
    for (const node of nodesResult.rows) {
      const nodeOnline = Number(node.stale_seconds) < 120;
      // Capabilities are stored as reported by heartbeats; ignore anything
      // that is not a list rather than failing the whole endpoint.
      const reported = node.capabilities?.blackmagic;
      const bm = Array.isArray(reported) ? reported : [];
      const model = node.capabilities?.blackmagic_model || null;
      const isRemote = Boolean(node.api_url) && node.hostname !== localHostname;

      bm.forEach((entry, idx) => {
        const d = entry ?? {};
        const portIndex = d.index !== undefined ? d.index : idx;
        const rec = recByPort.get(`${node.id}:${portIndex}`);

        tasks.push((async () => {
          const base = {
            node_id: node.id,
            hostname: node.hostname,
            index: portIndex,
            device: d.device || null,
            model,
            node_online: nodeOnline,
            recorder_id: rec ? rec.id : null,
            recorder_name: rec ? rec.name : null,
            recorder_status: rec ? rec.status : null,
            signal: 'no-recorder',
            framesReceived: null,
            currentFps: null,
          };

          if (!rec) return base;
          if (rec.status !== 'recording') {
            base.signal = 'idle';
            return base;
          }

          // From here on a recorder is capturing, so 'no-recorder' would be
          // wrong; 'connecting' is the fallback until live state is read.
          base.signal = 'connecting';
          if (!rec.container_id) return base;

          try {
            let live = null;
            if (isRemote) {
              // Remote node: go through its sidecar, which wraps the state
              // in a `live` field.
              const r = await fetch(
                `${node.api_url}/sidecar/${rec.container_id}/status`,
                { signal: AbortSignal.timeout(2500) }
              );
              if (r.ok) live = (await r.json()).live;
            } else {
              // Local node: talk to the capture container directly.
              const r = await fetch(
                `http://recorder-${rec.id}:3001/capture/status`,
                { signal: AbortSignal.timeout(2000) }
              );
              if (r.ok) live = await r.json();
            }
            if (live && live.signal) {
              base.signal = live.signal;
              base.framesReceived = live.framesReceived ?? null;
              base.currentFps = live.currentFps ?? null;
            }
          } catch (_) {
            // Best effort: an unreachable or slow container stays 'connecting'.
          }
          return base;
        })());
      });
    }

    res.json(await Promise.all(tasks));
  } catch (err) {
    next(err);
  }
});
// GET /devices/blackmagic — flatten every node's DeckLink cards for the
// recorder picker. Returns one entry per device together with its host node:
//   { node_id, hostname, ip_address, role, online, model, index, device }
// `online` is true while the node's last_seen is less than 120 seconds old.
router.get('/devices/blackmagic', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      `SELECT id, hostname, ip_address, role, capabilities,
              EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
       FROM cluster_nodes
       WHERE capabilities IS NOT NULL`
    );

    const out = rows.flatMap((row) => {
      const online = Number(row.stale_seconds) < 120;
      // Capabilities are stored as reported by heartbeats; a value that is
      // not a list is skipped so one odd node cannot break the picker.
      const reported = row.capabilities?.blackmagic;
      const cards = Array.isArray(reported) ? reported : [];
      const model = row.capabilities?.blackmagic_model || null;

      return cards.map((entry, idx) => {
        const d = entry ?? {};
        return {
          node_id: row.id,
          hostname: row.hostname,
          ip_address: row.ip_address,
          role: row.role,
          online,
          model,
          // Fall back to the array position when the card has no index.
          index: d.index !== undefined ? d.index : idx,
          device: d.device,
        };
      });
    });

    res.json(out);
  } catch (err) {
    next(err);
  }
});
// History — feat(capture): Deltacast SDI framework — test-card capture,
// cluster detection, UI (2026-05-28 19:12:40 -04:00)
//
// capture service
// - capture-manager.js: add 'deltacast' source_type to _buildInputArgs. Uses
//   'deltacast://<index>' with ffmpeg deltacast demuxer when /dev/deltacast<N>
//   exists; falls back to lavfi testsrc2 + sine test card (matching
//   deltacast-sdi-recorder standalone app) when hardware absent.
// - routes/capture.js: add GET /devices/deltacast endpoint (enumerates
//   /dev/deltacast* + DELTACAST_PORT_COUNT env fallback). Extend /probe to
//   handle source_type=deltacast.
//
// node-agent
// - detectHardware(): add 'deltacast' array to capabilities payload.
//   Enumerates /dev/deltacast* nodes; falls back to DELTACAST_PORT_COUNT env.
//   Adds DELTACAST_MODEL env support. Logs dc= count in heartbeat line.
// - sidecar /start: bind /dev/deltacast* device nodes into capture containers
//   when sourceType='deltacast'.
//
// mam-api
// - cluster.js: add GET /cluster/devices/deltacast and
//   GET /cluster/devices/deltacast/signal endpoints — same shape as blackmagic
//   equivalents for UI parity.
// - recorders.js /start: pass DELTACAST_PORT_COUNT env to capture container;
//   bind /dev/deltacast* device nodes on local spawn.
// - migration 024: ALTER TYPE source_type ADD VALUE 'deltacast' (idempotent).
// - schema.sql: add 'deltacast' to source_type ENUM for fresh installs.
//
// web-ui
// - modal-new-recorder.jsx: add 'Deltacast' source type card; fetch
//   /cluster/devices/deltacast on selection; port picker with TEST CARD badge
//   when hardware absent; falls through to manual index entry if no devices
//   detected.
// GET /devices/deltacast — flatten every node's Deltacast cards for the
// recorder picker. Mirrors the /devices/blackmagic shape so the UI can treat
// both card types uniformly, with two extra fields per entry:
//   present    — false only when the node reported `present: false`
//   port_count — number of Deltacast ports reported by that node
// Only ports present in a node's stored capabilities are listed; nothing is
// synthesised for nodes that have not reported any.
router.get('/devices/deltacast', async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      `SELECT id, hostname, ip_address, role, capabilities,
              EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
       FROM cluster_nodes
       WHERE capabilities IS NOT NULL`
    );

    const out = rows.flatMap((row) => {
      const online = Number(row.stale_seconds) < 120;
      // Capabilities are stored as reported by heartbeats; a value that is
      // not a list is skipped so one odd node cannot break the picker.
      const reported = row.capabilities?.deltacast;
      const ports = Array.isArray(reported) ? reported : [];
      const model = row.capabilities?.deltacast_model || 'Deltacast';

      return ports.map((entry, idx) => {
        const d = entry ?? {};
        return {
          node_id: row.id,
          hostname: row.hostname,
          ip_address: row.ip_address,
          role: row.role,
          online,
          model,
          // Fall back to the array position when the port has no index.
          index: d.index !== undefined ? d.index : idx,
          device: d.device,
          present: d.present !== false,
          port_count: ports.length,
        };
      });
    });

    res.json(out);
  } catch (err) {
    next(err);
  }
});
// GET /devices/deltacast/signal — live signal state for Deltacast ports.
// Same pattern as /devices/blackmagic/signal.
//
// signal values set by this handler:
//   'no-recorder' — no recorder is assigned to the port
//   'idle'        — a recorder is assigned but is not recording
//   'connecting'  — a recorder is recording but no live state could be read
//   otherwise     — whatever the capture container reports
//
// Response shape (array, in node/port order):
//   { node_id, hostname, ip_address, online, model, index, device, present,
//     recorder_id, recorder_status, signal, framesReceived, currentFps }
router.get('/devices/deltacast/signal', async (req, res, next) => {
  try {
    const [nodesRes, recordersRes] = await Promise.all([
      pool.query(
        `SELECT id, hostname, ip_address, api_url, capabilities,
                EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds
         FROM cluster_nodes
         WHERE capabilities IS NOT NULL`
      ),
      pool.query(
        `SELECT id, node_id, device_index, status, source_type, container_id
         FROM recorders WHERE source_type = 'deltacast'`
      ),
    ]);

    // Lookup "${node_id}:${device_index}" → recorder row. A recorder that is
    // recording is never replaced, so an idle row on the same port cannot
    // mask the active capture.
    const recByNodePort = new Map();
    for (const rec of recordersRes.rows) {
      const key = `${rec.node_id}:${rec.device_index}`;
      const current = recByNodePort.get(key);
      if (!current || current.status !== 'recording') recByNodePort.set(key, rec);
    }

    const localHostname = process.env.NODE_HOSTNAME || '';
    // One slot per port, in order. Slots are either a finished entry or a
    // promise for one; Promise.all accepts both and preserves order.
    const slots = [];

    for (const node of nodesRes.rows) {
      const online = Number(node.stale_seconds) < 120;
      // Capabilities are stored as reported by heartbeats; ignore anything
      // that is not a list rather than failing the whole endpoint.
      const reported = node.capabilities?.deltacast;
      const dc = Array.isArray(reported) ? reported : [];
      const model = node.capabilities?.deltacast_model || 'Deltacast';
      // Same local/remote rule as the blackmagic signal endpoint: only nodes
      // other than this one are reached through their sidecar.
      const isRemote = Boolean(node.api_url) && node.hostname !== localHostname;

      dc.forEach((entry, position) => {
        const port = entry ?? {};
        const idx = port.index !== undefined ? port.index : position;
        const rec = recByNodePort.get(`${node.id}:${idx}`);
        const base = {
          node_id: node.id,
          hostname: node.hostname,
          ip_address: node.ip_address,
          online,
          model,
          index: idx,
          device: port.device,
          present: port.present !== false,
          recorder_id: rec ? rec.id : null,
          recorder_status: rec ? rec.status : null,
          signal: 'no-recorder',
          framesReceived: null,
          currentFps: null,
        };

        if (!rec) {
          slots.push(base);
          return;
        }
        if (rec.status !== 'recording') {
          base.signal = 'idle';
          slots.push(base);
          return;
        }

        // Active recording — query the capture container for real signal.
        slots.push((async () => {
          // A recorder is capturing, so 'no-recorder' would be wrong;
          // 'connecting' is the fallback until live state is read.
          base.signal = 'connecting';
          // The sidecar URL needs the container id; skip the probe without it.
          if (isRemote && !rec.container_id) return base;

          try {
            const url = isRemote
              ? `${node.api_url}/sidecar/${rec.container_id}/status`
              : `http://recorder-${rec.id}:3001/capture/status`;
            const r = await fetch(url, { signal: AbortSignal.timeout(2500) });
            if (r.ok) {
              const payload = await r.json();
              // The blackmagic endpoint reads the sidecar's state from a
              // `live` field; accept that and a bare state object.
              const live = isRemote ? (payload?.live ?? payload) : payload;
              if (live && live.signal) {
                base.signal = live.signal;
                base.framesReceived = live.framesReceived ?? null;
                base.currentFps = live.currentFps ?? null;
              }
            }
          } catch (_) {
            // Best effort: an unreachable or slow container stays 'connecting'.
          }
          return base;
        })());
      });
    }

    res.json(await Promise.all(slots));
  } catch (err) {
    next(err);
  }
});
// GET /:id/ping — probe the node's `${api_url}/health` endpoint directly.
// 404 when the node id is unknown. Otherwise always answers 200 with
// `reachable`: false (with a `reason`) when no api_url is registered or the
// probe fails or exceeds 4 seconds, else the upstream's ok flag together with
// latency_ms, the upstream status code and its JSON body as `agent` ({} when
// the body is not JSON).
router.get('/:id/ping', async (req, res, next) => {
  try {
    const lookup = await pool.query(
      'SELECT id, hostname, api_url FROM cluster_nodes WHERE id = $1',
      [req.params.id]
    );
    if (lookup.rowCount === 0) {
      return res.status(404).json({ error: 'Node not found' });
    }

    const target = lookup.rows[0];
    if (!target.api_url) {
      return res.json({ reachable: false, reason: 'no api_url registered' });
    }

    const startedAt = Date.now();
    try {
      const upstream = await fetch(`${target.api_url}/health`, {
        signal: AbortSignal.timeout(4000),
      });
      // Latency covers the time until response headers arrive, not the body.
      const latency_ms = Date.now() - startedAt;
      const agent = await upstream.json().catch(() => ({}));
      res.json({
        reachable: upstream.ok,
        latency_ms,
        status: upstream.status,
        agent,
      });
    } catch (probeErr) {
      res.json({
        reachable: false,
        latency_ms: Date.now() - startedAt,
        reason: probeErr.message,
      });
    }
  } catch (err) {
    next(err);
  }
});
// DELETE /:id — deregister a node by deleting its cluster_nodes row.
// Responds { ok: true } when a row was removed, 404 when no row has that id.
router.delete('/:id', async (req, res, next) => {
  try {
    const { rowCount } = await pool.query(
      'DELETE FROM cluster_nodes WHERE id = $1 RETURNING id',
      [req.params.id]
    );
    if (rowCount === 0) {
      return res.status(404).json({ error: 'Node not found' });
    }
    res.json({ ok: true });
  } catch (err) {
    next(err);
  }
});
export default router;