import express from 'express'; import http from 'http'; import pool from '../db/pool.js'; import { requireAuth } from '../middleware/auth.js'; const router = express.Router(); router.use(requireAuth); // If the agent reported Docker's default bridge IP (172.17.x) but the request // itself came from a real LAN address, prefer the request source IP instead. // We only check 172.17.x — the default docker0 bridge — not the full RFC1918 // 172.16/12 block, since real LANs (e.g. 172.18.91.x) fall in that range. function pickIp(reportedIp, reqIp) { const clean = (s) => (s || '').replace(/^::ffff:/, ''); const isDockerBridge = (ip) => /^172\.17\./.test(ip || ''); const r = clean(reqIp); if (!reportedIp) return r || null; if (isDockerBridge(reportedIp) && r && !isDockerBridge(r)) return r; return reportedIp; } function dockerRequest(path, method = 'GET', body = null) { return new Promise((resolve, reject) => { const opts = { socketPath: '/var/run/docker.sock', path: `/v1.41${path}`, method, headers: { 'Accept': 'application/json', 'Content-Type': 'application/json' }, }; const req = http.request(opts, (res) => { let data = ''; res.on('data', d => { data += d; }); res.on('end', () => { if (!data.trim()) return resolve(null); try { resolve(JSON.parse(data)); } catch (e) { resolve(null); } }); }); req.on('error', reject); req.setTimeout(5000, () => { req.destroy(); reject(new Error('Docker socket timeout')); }); if (body) req.write(JSON.stringify(body)); req.end(); }); } // GET / – list all registered cluster nodes with online status router.get('/', async (req, res, next) => { try { const r = await pool.query( `SELECT *, EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds FROM cluster_nodes ORDER BY registered_at ASC` ); res.json(r.rows.map(row => ({ ...row, online: Number(row.stale_seconds) < 120, }))); } catch (err) { next(err); } }); // GET /containers – list all containers on the local Docker host router.get('/containers', async (req, res, next) => { try { const containers = await dockerRequest('/containers/json?all=true'); if (!Array.isArray(containers)) return res.json([]); const out = containers.map(c => { const rawName = (c.Names[0] || '').replace(/^\//, ''); const name = rawName.replace(/^wild-dragon-/, '').replace(/-\d+$/, ''); const ports = (c.Ports || []) .filter(p => p.PublicPort) .map(p => `${p.PublicPort}→${p.PrivatePort}`) .join(', '); return { id: c.Id.slice(0, 12), name, image: (c.Image || '').replace(/^sha256:/, '').slice(0, 40), state: c.State, uptime: (c.Status || '').replace(/\s*\(.*\)/, '').trim(), healthy: (c.Status || '').includes('healthy'), ports, cpu: 0, mem: 0, }; }); res.json(out); } catch (err) { if (err.code === 'ENOENT' || err.code === 'EACCES') return res.json([]); next(err); } }); // POST /containers/:nameOrId/restart router.post('/containers/:nameOrId/restart', async (req, res, next) => { try { await dockerRequest(`/containers/${encodeURIComponent(req.params.nameOrId)}/restart`, 'POST'); res.json({ ok: true }); } catch (err) { next(err); } }); // POST /heartbeat – upsert this node's registration (includes hardware capabilities) router.post('/heartbeat', async (req, res, next) => { try { const { hostname, ip_address, role = 'worker', version, api_url, cpu_usage, mem_used_mb, mem_total_mb, capabilities, metadata, } = req.body; if (!hostname) return res.status(400).json({ error: 'hostname is required' }); const effectiveIp = pickIp(ip_address, req.ip || req.socket?.remoteAddress); const r = await pool.query( `INSERT INTO cluster_nodes (hostname, ip_address, role, version, api_url, cpu_usage, mem_used_mb, mem_total_mb, last_seen, capabilities, metadata) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,NOW(),$9,$10) ON CONFLICT (hostname) DO UPDATE SET ip_address = EXCLUDED.ip_address, role = EXCLUDED.role, version = EXCLUDED.version, api_url = EXCLUDED.api_url, cpu_usage = EXCLUDED.cpu_usage, mem_used_mb = EXCLUDED.mem_used_mb, mem_total_mb = EXCLUDED.mem_total_mb, last_seen = NOW(), capabilities = EXCLUDED.capabilities, metadata = EXCLUDED.metadata RETURNING *`, [ hostname, effectiveIp, role, version || null, api_url || null, cpu_usage != null ? cpu_usage : null, mem_used_mb != null ? mem_used_mb : null, mem_total_mb != null ? mem_total_mb : null, capabilities != null ? JSON.stringify(capabilities) : '{}', metadata != null ? JSON.stringify(metadata) : null, ] ); res.json(r.rows[0]); } catch (err) { next(err); } }); // GET /devices/blackmagic/signal – live video-presence state for every // DeckLink port across the cluster. For each port we check whether there is // an active SDI recorder assigned to it and, if so, query the capture // container for its real signal state (receiving / lost / connecting / // error). Ports without a recorder get signal = 'no-recorder'. // // Response shape (array): // { node_id, hostname, index, device, model, // signal, framesReceived, currentFps, recorder_id, recorder_status } router.get('/devices/blackmagic/signal', async (req, res, next) => { try { // 1. Fetch all cluster nodes with DeckLink capabilities. const nodesResult = await pool.query( `SELECT id, hostname, ip_address, api_url, capabilities, EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds FROM cluster_nodes WHERE capabilities IS NOT NULL` ); // 2. Fetch all SDI recorders that are pinned to a node+device_index. const recResult = await pool.query( `SELECT id, name, status, container_id, node_id, device_index, source_config FROM recorders WHERE source_type = 'sdi' AND node_id IS NOT NULL` ); // Build a fast lookup: "${node_id}:${device_index}" → recorder row. const recByPort = new Map(); for (const r of recResult.rows) { const devIdx = r.device_index ?? r.source_config?.device ?? 0; recByPort.set(`${r.node_id}:${devIdx}`, r); } // 3. For each port, determine signal state. We fire all capture-container // fetches concurrently so the endpoint stays fast even with many ports. const tasks = []; for (const node of nodesResult.rows) { const nodeOnline = Number(node.stale_seconds) < 120; const bm = (node.capabilities && node.capabilities.blackmagic) || []; const model = (node.capabilities && node.capabilities.blackmagic_model) || null; const localHostname = process.env.NODE_HOSTNAME || ''; const isRemote = node.api_url && node.hostname !== localHostname; bm.forEach((d, idx) => { const portIndex = d.index !== undefined ? d.index : idx; const rec = recByPort.get(`${node.id}:${portIndex}`); tasks.push((async () => { const base = { node_id: node.id, hostname: node.hostname, index: portIndex, device: d.device || null, model, node_online: nodeOnline, recorder_id: rec ? rec.id : null, recorder_name: rec ? rec.name : null, recorder_status: rec ? rec.status : null, signal: 'no-recorder', framesReceived: null, currentFps: null, }; if (!rec || rec.status !== 'recording' || !rec.container_id) { // No active capture — if there's a recorder but it's not recording, // report that; otherwise the port is unassigned. if (rec && rec.status !== 'recording') base.signal = 'idle'; return base; } // Active recording — query the capture container for real signal. try { let live = null; if (isRemote) { const r = await fetch( `${node.api_url}/sidecar/${rec.container_id}/status`, { signal: AbortSignal.timeout(2500) } ); if (r.ok) live = (await r.json()).live; } else { const r = await fetch( `http://recorder-${rec.id}:3001/capture/status`, { signal: AbortSignal.timeout(2000) } ); if (r.ok) live = await r.json(); } if (live && live.signal) { base.signal = live.signal; base.framesReceived = live.framesReceived ?? null; base.currentFps = live.currentFps ?? null; } else { base.signal = 'connecting'; } } catch (_) { base.signal = 'connecting'; } return base; })()); }); } const results = await Promise.all(tasks); res.json(results); } catch (err) { next(err); } }); // GET /devices/blackmagic – flatten every node's DeckLink cards for the // recorder picker. Returns one entry per device with the host node info. router.get('/devices/blackmagic', async (req, res, next) => { try { const r = await pool.query( `SELECT id, hostname, ip_address, role, capabilities, EXTRACT(EPOCH FROM (NOW() - last_seen)) AS stale_seconds FROM cluster_nodes WHERE capabilities IS NOT NULL` ); const out = []; for (const row of r.rows) { const online = Number(row.stale_seconds) < 120; const bm = (row.capabilities && row.capabilities.blackmagic) || []; const model = (row.capabilities && row.capabilities.blackmagic_model) || null; bm.forEach((d, idx) => { out.push({ node_id: row.id, hostname: row.hostname, ip_address: row.ip_address, role: row.role, online, model, index: d.index !== undefined ? d.index : idx, device: d.device, }); }); } res.json(out); } catch (err) { next(err); } }); // GET /:id/ping – probe the node's api_url/health endpoint directly router.get('/:id/ping', async (req, res, next) => { try { const r = await pool.query( 'SELECT id, hostname, api_url FROM cluster_nodes WHERE id = $1', [req.params.id] ); if (r.rowCount === 0) return res.status(404).json({ error: 'Node not found' }); const node = r.rows[0]; if (!node.api_url) return res.json({ reachable: false, reason: 'no api_url registered' }); const start = Date.now(); try { const upstream = await fetch(`${node.api_url}/health`, { signal: AbortSignal.timeout(4000), }); const latency_ms = Date.now() - start; const body = await upstream.json().catch(() => ({})); res.json({ reachable: upstream.ok, latency_ms, status: upstream.status, agent: body }); } catch (err) { res.json({ reachable: false, latency_ms: Date.now() - start, reason: err.message }); } } catch (err) { next(err); } }); // DELETE /:id – deregister a node router.delete('/:id', async (req, res, next) => { try { const r = await pool.query( 'DELETE FROM cluster_nodes WHERE id = $1 RETURNING id', [req.params.id] ); if (r.rowCount === 0) return res.status(404).json({ error: 'Node not found' }); res.json({ ok: true }); } catch (err) { next(err); } }); export default router;