Routes: channel + playlist CRUD, start/stop/play/pause/skip transport, as-run log. RBAC via assertProjectAccess on channel.project_id; null project ⇒ admin-only (recorder convention). Sidecar orchestration mirrors recorders.js: Docker socket for local node, node-agent /sidecar/start for remote. Channel start passes CHANNEL_ID env so the sidecar can write HLS preview to /media/live/<id>. DeckLink port-contention guard: blocks starting a decklink channel when a recorder or another channel on the same node+device_index is active. restartChannel(id) helper picks another healthy cluster node and re-places non-decklink channels; decklink is alert-only. Exposed for the scheduler. Scheduler tick adds step 6: poll each running channel's sidecar /status, update last_heartbeat_at, and after ~3 misses trigger restartChannel + self-call /start. Reuses the existing PG advisory lock so multi-replica deploys don't double-fire failovers.
344 lines
15 KiB
JavaScript
344 lines
15 KiB
JavaScript
import 'dotenv/config';
|
|
import express from 'express';
|
|
import cors from 'cors';
|
|
import session from 'express-session';
|
|
import connectPgSimple from 'connect-pg-simple';
|
|
const PgStore = connectPgSimple(session);
|
|
import os from 'node:os';
|
|
import { exec } from 'node:child_process';
|
|
import pool from './db/pool.js';
|
|
import { errorHandler } from './middleware/errors.js';
|
|
import { requireAuth, requireUiHeader, requireAdmin } from './middleware/auth.js';
|
|
import { loadS3ConfigFromDb } from './s3/client.js';
|
|
|
|
import authRouter from './routes/auth.js';
|
|
import tokensRouter from './routes/tokens.js';
|
|
import usersRouter from './routes/users.js';
|
|
// Routes
|
|
import assetsRouter from './routes/assets.js';
|
|
import projectsRouter from './routes/projects.js';
|
|
import binsRouter from './routes/bins.js';
|
|
import jobsRouter from './routes/jobs.js';
|
|
import captureRouter from './routes/capture.js';
|
|
import uploadRouter from './routes/upload.js';
|
|
import recordersRouter from './routes/recorders.js';
|
|
import playoutRouter from './routes/playout.js';
|
|
import settingsRouter from './routes/settings.js';
|
|
import amppRouter from './routes/ampp.js';
|
|
import groupsRouter from './routes/groups.js';
|
|
import sequencesRouter from './routes/sequences.js';
|
|
import systemRouter from './routes/system.js';
|
|
import clusterRouter from './routes/cluster.js';
|
|
import sdkRouter from './routes/sdk.js';
|
|
import schedulesRouter from './routes/schedules.js';
|
|
import metricsRouter from './routes/metrics.js';
|
|
import commentsRouter from './routes/comments.js';
|
|
import importsRouter from './routes/imports.js';
|
|
import storageRouter from './routes/storage.js';
|
|
import { startSchedulerLoop, stopSchedulerLoop } from './scheduler.js';
|
|
import { startCleanupLoop } from './tasks/cleanupTempSegments.js';
|
|
|
|
const app = express();
const PORT = process.env.PORT || 3000;

// ── Middleware ────────────────────────────────────────────────────────────────
// CORS allowlist, driven by the comma-separated ALLOWED_ORIGINS env var.
// Session cookies carry authority, so reflecting arbitrary origins would let
// any site issue credentialed requests.
// NOTE: when ALLOWED_ORIGINS is empty or unset the list is empty and every
// origin is accepted (see resolveCorsOrigin below).
const allowedOrigins = [];
for (const rawEntry of (process.env.ALLOWED_ORIGINS || '').split(',')) {
  const entry = rawEntry.trim();
  if (entry) {
    allowedOrigins.push(entry);
  }
}

/**
 * Origin callback for the cors middleware.
 *
 * Accepts requests without an Origin header (same-origin or curl), any origin
 * while the allowlist is empty, and origins present in the allowlist.
 * Everything else is declined by answering `false` rather than passing an
 * Error, so the Allow-Origin header is simply omitted and the browser reports
 * a CORS error instead of the client seeing a 500.
 *
 * @param {string|undefined} origin - Value of the request's Origin header.
 * @param {(err: Error|null, allow: boolean) => void} done - cors callback.
 */
function resolveCorsOrigin(origin, done) {
  const permitted =
    !origin || allowedOrigins.length === 0 || allowedOrigins.includes(origin);
  if (!permitted) {
    console.warn('[cors] rejected origin:', origin);
  }
  return done(null, permitted);
}

app.use(cors({ origin: resolveCorsOrigin, credentials: true }));
app.use(express.json({ limit: '50mb' }));
|
|
|
|
// Proxy headers are trusted only on explicit opt-in (production HTTPS).
if (process.env.TRUST_PROXY === 'true') {
  app.set('trust proxy', 1);
}

/**
 * Adds a one-year HSTS header (subdomains included) to HTTPS responses.
 *
 * Cookies are Secure-only when TRUST_PROXY is on and the CORS allowlist
 * rejects plain-HTTP origins, so a user who lands on http:// could not log in;
 * HSTS makes browsers that have seen the header upgrade to https:// on their
 * own. The header is emitted only when req.secure is true, which honors
 * trust proxy + X-Forwarded-Proto.
 */
function sendHstsHeader(req, res, next) {
  if (req.secure) {
    res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains');
  }
  next();
}

if (process.env.AUTH_ENABLED === 'true') {
  app.use(sendHstsHeader);
}
|
|
|
|
// Refuse to boot with auth enabled but no SESSION_SECRET: sessions need a
// stable secret to survive restarts and to be valid on every node of a
// multi-node deployment.
// NOTE(review): when AUTH_ENABLED is not 'true' and SESSION_SECRET is unset,
// `secret` below is undefined — confirm express-session tolerates that in dev.
const sessionSecret = process.env.SESSION_SECRET;
if (process.env.AUTH_ENABLED === 'true' && !sessionSecret) {
  console.error('[fatal] SESSION_SECRET is required when AUTH_ENABLED=true');
  process.exit(1);
}

// Session wiring. See specs/2026-05-27-auth-system-design.md.
const SESSION_MAX_AGE_MS = 8 * 60 * 60 * 1000; // 8 hours

// Sessions live in the Postgres `sessions` table; pruneSessionInterval is in
// seconds, so 15 * 60 = every 15 minutes.
const sessionStore = new PgStore({
  pool,
  tableName: 'sessions',
  pruneSessionInterval: 15 * 60,
});

const sessionCookie = {
  httpOnly: true,
  sameSite: 'lax',
  // Secure flag follows TRUST_PROXY, i.e. the "behind an HTTPS proxy" switch.
  secure: process.env.TRUST_PROXY === 'true',
  path: '/',
  maxAge: SESSION_MAX_AGE_MS,
};

app.use(
  session({
    store: sessionStore,
    secret: sessionSecret,
    name: 'dragonflight.sid',
    cookie: sessionCookie,
    // Sliding renewal is handled in requireAuth so idle and absolute limits
    // can be enforced separately.
    rolling: false,
    resave: false,
    saveUninitialized: false,
  }),
);
|
|
|
|
// ── Health ────────────────────────────────────────────────────────────────────
// Registered before the /api/v1 gate below, so it needs no UI header or login.
app.get('/health', (_req, res) => res.json({ status: 'ok' }));

// ── Auth gate ─────────────────────────────────────────────────────────────────
// Paths reachable without a session. They are compared against req.path, which
// is relative to the /api/v1 mount: '/auth/login', not '/api/v1/auth/login'.
const UNAUTH_PATHS = new Set([
  '/auth/login',
  '/auth/login/totp',
  '/auth/setup',
  '/auth/setup-required',
  '/auth/google',
  '/auth/google/callback',
  '/auth/google/enabled',
]);

/**
 * Lets the public auth endpoints through and sends everything else under
 * /api/v1 to requireAuth.
 *
 * /cluster/heartbeat is deliberately NOT in the public set: node-agent
 * authenticates it with a bound api_token (migration 019 + bound_hostname on
 * the token). requireAuth does the bearer lookup and sets
 * req.tokenBoundHostname; the heartbeat handler in routes/cluster.js verifies
 * body.hostname against that binding.
 */
function apiAuthGate(req, res, next) {
  return UNAUTH_PATHS.has(req.path) ? next() : requireAuth(req, res, next);
}

app.use('/api/v1', requireUiHeader);
app.use('/api/v1', apiAuthGate);
|
|
|
|
// ── API Routes ────────────────────────────────────────────────────────────────
// Each row is [mountPath, ...handlers]; rows are mounted top to bottom, so the
// order of this table IS the Express matching order — do not sort it.
// User and group administration is admin-only (RBAC v2): the auth gate has
// already established req.user by the time requireAdmin runs.
const apiMounts = [
  ['/api/v1/auth', authRouter],
  ['/api/v1/auth/users', requireAdmin, usersRouter],
  ['/api/v1/users', requireAdmin, usersRouter], // alias for the SPA Users page
  ['/api/v1/auth/tokens', requireAuth, tokensRouter],
  ['/api/v1/assets', assetsRouter],
  ['/api/v1/projects', projectsRouter],
  ['/api/v1/bins', binsRouter],
  ['/api/v1/jobs', jobsRouter],
  ['/api/v1/capture', captureRouter],
  ['/api/v1/upload', uploadRouter],
  ['/api/v1/recorders', recordersRouter],
  ['/api/v1/playout', playoutRouter],
  ['/api/v1/settings', settingsRouter],
  ['/api/v1/ampp', amppRouter],
  ['/api/v1/groups', requireAdmin, groupsRouter],
  ['/api/v1/sequences', sequencesRouter],
  ['/api/v1/system', systemRouter],
  ['/api/v1/cluster', clusterRouter],
  ['/api/v1/sdk', sdkRouter],
  ['/api/v1/schedules', schedulesRouter],
  ['/api/v1/metrics', metricsRouter],
  ['/api/v1/assets/:assetId/comments', commentsRouter],
  ['/api/v1/imports', importsRouter],
  ['/api/v1/storage', storageRouter],
];

for (const [mountPath, ...handlers] of apiMounts) {
  app.use(mountPath, ...handlers);
}

// ── Error handler ─────────────────────────────────────────────────────────────
// Mounted after every route so errors passed to next() end up here.
app.use(errorHandler);
|
|
|
|
// ── Start ────────────────────────────────────────────────────────────────────
|
|
import { readdirSync, readFileSync } from 'node:fs';
|
|
import { fileURLToPath } from 'node:url';
|
|
import { dirname, join } from 'node:path';
|
|
|
|
// Absolute directory of this module (ESM has no __dirname); runMigrations()
// resolves db/migrations relative to it.
const __dirnameMig = dirname(fileURLToPath(import.meta.url));
|
|
/**
 * Applies every pending `.sql` file from `<module dir>/db/migrations`, in
 * lexicographic filename order.
 *
 * Issue #107 — the previous loop swallowed errors and let the server boot on a
 * half-migrated schema. Applied files are now tracked in `schema_migrations`,
 * each pending file runs inside its own transaction, and a failure terminates
 * the process with exit code 1 so the orchestrator restarts it and an operator
 * notices.
 *
 * Environment switches:
 *   MIGRATIONS_FORCE=1           re-run files already recorded as applied
 *                                (handy when iterating locally).
 *   MIGRATIONS_ALLOW_FAILURES=1  log a failed file and carry on with the next
 *                                one instead of exiting.
 *
 * The `checksum_sha` column is created but never written by this function.
 *
 * @returns {Promise<void>} Resolves once all pending files were processed, or
 *   immediately when the migrations directory cannot be read.
 */
async function runMigrations() {
  const dir = join(__dirnameMig, 'db', 'migrations');

  let files;
  try {
    files = readdirSync(dir).filter((name) => name.endsWith('.sql')).sort();
  } catch (err) {
    // Still best-effort (no migrations dir ⇒ nothing to do), but say so: a
    // silently skipped migration run is very hard to diagnose after the fact.
    console.warn(`[migration] cannot read ${dir} (${err.message}) — skipping migrations`);
    return;
  }

  await pool.query(`
    CREATE TABLE IF NOT EXISTS schema_migrations (
      filename     TEXT PRIMARY KEY,
      applied_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      checksum_sha TEXT
    )
  `);

  const force = process.env.MIGRATIONS_FORCE === '1';
  const allowFailures = process.env.MIGRATIONS_ALLOW_FAILURES === '1';

  const appliedRes = await pool.query('SELECT filename FROM schema_migrations');
  const applied = new Set(appliedRes.rows.map((row) => row.filename));

  for (const f of files) {
    if (!force && applied.has(f)) continue;

    const sql = readFileSync(join(dir, f), 'utf8');
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      await client.query(sql);
      // Upsert so a forced re-run refreshes applied_at instead of colliding
      // with the primary key.
      await client.query(
        `INSERT INTO schema_migrations (filename) VALUES ($1)
         ON CONFLICT (filename) DO UPDATE SET applied_at = NOW()`,
        [f],
      );
      await client.query('COMMIT');
      console.log('[migration] applied ' + f);
    } catch (err) {
      // A failed ROLLBACK must not mask the original error.
      await client.query('ROLLBACK').catch(() => {});
      console.error('[migration] FAILED ' + f + ': ' + err.message);
      if (!allowFailures) {
        // Hard fail — better to crash now than serve traffic on a broken schema.
        console.error('[migration] aborting startup. Set MIGRATIONS_ALLOW_FAILURES=1 to override.');
        process.exit(1);
      }
    } finally {
      // Single release point: the client goes back to the pool exactly once on
      // both the success and the tolerated-failure path.
      client.release();
    }
  }
}
|
|
// Apply pending schema migrations first; the statements below only run once
// this top-level await has settled.
await runMigrations();
|
|
|
|
// Load S3 config from DB so any settings saved via the Settings page override env vars
|
|
await loadS3ConfigFromDb();
|
|
|
|
// ── Cluster self-heartbeat ────────────────────────────────────────────────────
|
|
/**
 * Picks the IPv4 address this node advertises to the cluster.
 *
 * Resolution order:
 *   1. NODE_IP env var — explicit override, useful inside Docker where
 *      os.networkInterfaces() returns container bridge IPs, not the host LAN IP.
 *   2. The first non-internal IPv4 address reported by os.networkInterfaces().
 *   3. '127.0.0.1' when no such interface exists.
 *
 * @returns {string} An IPv4 address (or whatever NODE_IP contains).
 */
function getLocalIp() {
  if (process.env.NODE_IP) return process.env.NODE_IP;

  const external = Object.values(os.networkInterfaces())
    .flat()
    .find(
      (iface) =>
        // `family` is the string 'IPv4' on most Node versions but the number 4
        // on Node 18.0–18.3; accept both so those runtimes don't fall back to
        // loopback.
        iface && !iface.internal && (iface.family === 'IPv4' || iface.family === 4),
    );

  return external ? external.address : '127.0.0.1';
}
|
|
|
|
/**
 * Parses `nvidia-smi --format=csv,noheader,nounits` output for the columns
 * index, name, memory.total.
 *
 * Lines whose first column is not an integer are dropped. A missing name
 * becomes 'Unknown GPU' and an unparsable memory value becomes 0.
 *
 * @param {string} stdout - Raw nvidia-smi output, one GPU per line.
 * @returns {{ index: number, name: string, memory_mb: number }[]}
 */
function parseGpuCsv(stdout) {
  return stdout
    .trim()
    .split('\n')
    .map((line) => {
      const [index, name, memory] = line.split(',').map((field) => field.trim());
      return {
        index: Number.parseInt(index, 10),
        name: name || 'Unknown GPU',
        memory_mb: Number.parseInt(memory, 10) || 0,
      };
    })
    .filter((gpu) => !Number.isNaN(gpu.index));
}

/**
 * Detects NVIDIA GPUs available to this container via nvidia-smi.
 *
 * Never rejects: if nvidia-smi is unavailable, fails, exceeds the 5 s timeout
 * or prints nothing, the promise resolves to an empty array.
 *
 * @returns {Promise<{ index: number, name: string, memory_mb: number }[]>}
 *   e.g. [{ index: 0, name: 'Tesla P4', memory_mb: 7680 }, ...]
 */
function detectGpus() {
  return new Promise((resolve) => {
    exec(
      'nvidia-smi --query-gpu=index,name,memory.total --format=csv,noheader,nounits',
      { timeout: 5000 },
      (err, stdout) => {
        if (err || !stdout.trim()) {
          resolve([]);
          return;
        }
        resolve(parseGpuCsv(stdout));
      },
    );
  });
}
|
|
|
|
/**
 * Upserts this process's row in `cluster_nodes` (keyed by hostname) with its
 * current 1-minute load average, memory usage, detected GPUs and a fresh
 * last_seen timestamp.
 *
 * The hostname comes from NODE_HOSTNAME, falling back to os.hostname().
 * role ('primary'), version and api_url are written only when the row is first
 * inserted; the ON CONFLICT branch refreshes ip_address, cpu_usage, memory,
 * capabilities and last_seen.
 * NOTE(review): api_url embeds the IP but is not refreshed on conflict while
 * ip_address is — confirm that is intended.
 *
 * Never rejects: it is fired from setInterval with nobody awaiting it, so any
 * failure is logged instead of surfacing as an unhandled rejection. The
 * returned promise settles only after the database write has finished.
 *
 * @returns {Promise<void>}
 */
async function selfHeartbeat() {
  try {
    const load = os.loadavg()[0];
    const total = os.totalmem();
    const used = total - os.freemem();
    const gpus = await detectGpus();

    const capabilities = { gpus, blackmagic: [] };

    // Resolve the address once so ip_address and api_url cannot disagree.
    const ip = getLocalIp();

    await pool.query(
      `INSERT INTO cluster_nodes
         (hostname, ip_address, role, version, api_url,
          cpu_usage, mem_used_mb, mem_total_mb, capabilities, last_seen)
       VALUES ($1,$2,'primary',$3,$4,$5,$6,$7,$8,NOW())
       ON CONFLICT (hostname) DO UPDATE SET
         ip_address   = EXCLUDED.ip_address,
         cpu_usage    = EXCLUDED.cpu_usage,
         mem_used_mb  = EXCLUDED.mem_used_mb,
         mem_total_mb = EXCLUDED.mem_total_mb,
         capabilities = EXCLUDED.capabilities,
         last_seen    = NOW()`,
      [
        process.env.NODE_HOSTNAME || os.hostname(),
        ip,
        process.env.npm_package_version || null,
        `http://${ip}:${PORT}`,
        // cpu_usage carries the 1-minute load average, rounded to 2 decimals.
        parseFloat(load.toFixed(2)),
        Math.round(used / 1024 / 1024),
        Math.round(total / 1024 / 1024),
        JSON.stringify(capabilities),
      ],
    );
  } catch (err) {
    console.error('[cluster] heartbeat failed:', err.message);
  }
}
|
|
|
|
// Publish a cluster heartbeat every 30 s, plus one right away so the node is
// registered without waiting for the first tick.
setInterval(selfHeartbeat, 30_000);
selfHeartbeat();

/**
 * Listen callback: logs the port and auth mode, warns about a risky
 * auth/proxy combination, then starts the background loops.
 */
function onServerListening() {
  const authEnabled = process.env.AUTH_ENABLED === 'true';

  let authMode;
  if (authEnabled) {
    authMode = 'ENABLED';
  } else {
    authMode = 'DISABLED — dev mode (dev user attached to every request)';
  }

  console.log(`MAM API listening on port ${PORT}`);
  console.log(`Authentication: ${authMode}`);

  if (authEnabled && process.env.TRUST_PROXY !== 'true') {
    console.warn('[auth] WARNING: AUTH_ENABLED=true but TRUST_PROXY=false — req.ip will be the proxy IP, login rate-limit will throttle all clients together. Set TRUST_PROXY=true when behind nginx/HTTPS.');
  }

  // The scheduler is started only once the HTTP server is live, so its
  // self-calls to /recorders/:id/start|stop reach a ready socket.
  startSchedulerLoop();

  // Temp-segment cleanup loop (runs hourly).
  startCleanupLoop();
}

const server = app.listen(PORT, onServerListening);
|
|
|
|
// Issue #100 — graceful shutdown. Without this, `docker stop` (SIGTERM) killed
// the process mid-scheduler-tick, leaving connections dangling and producing
// partial DB writes.

// Set on the first shutdown request so repeated signals are ignored.
let _shuttingDown = false;

/**
 * Shuts the process down in order: stop the scheduler loop, stop the HTTP
 * server (letting in-flight requests finish), close the Postgres pool, exit.
 *
 * A 25 s watchdog force-exits with code 1 if any step hangs. Calls made after
 * the first one return immediately.
 *
 * @param {string} signal - What triggered the shutdown ('SIGTERM', 'SIGINT' or
 *   'uncaughtException'); used for logging and to choose the exit code.
 * @returns {Promise<void>} In practice the process exits before this settles.
 */
async function gracefulShutdown(signal) {
  if (_shuttingDown) return;
  _shuttingDown = true;
  console.log(`[shutdown] received ${signal} — closing gracefully…`);

  // A shutdown caused by a crash must not look like a clean stop: supervisors
  // (e.g. restart-on-failure policies) key off the exit code.
  const exitCode = signal === 'uncaughtException' ? 1 : 0;

  // Wind down the scheduler tick. Best-effort, but log instead of swallowing.
  try {
    stopSchedulerLoop();
  } catch (err) {
    console.warn('[shutdown] stopSchedulerLoop:', err?.message ?? err);
  }

  // Force-exit watchdog so a hung connection can't keep us alive forever.
  // unref() keeps the timer itself from holding the event loop open.
  const killSwitch = setTimeout(() => {
    console.error('[shutdown] forced exit after 25s timeout');
    process.exit(1);
  }, 25_000);
  killSwitch.unref();

  // Stop accepting connections and wait for in-flight requests to finish.
  await new Promise((resolve) => {
    server.close(resolve);
    // Idle keep-alive sockets would otherwise hold close() open on Node
    // versions where close() does not drop them itself; the optional call is
    // a no-op where the method does not exist.
    server.closeIdleConnections?.();
  });

  // Close the DB pool. Best-effort.
  try {
    await pool.end();
  } catch (e) {
    console.warn('[shutdown] pool.end:', e.message);
  }

  console.log(exitCode === 0 ? '[shutdown] clean exit' : '[shutdown] exiting after fatal error');
  process.exit(exitCode);
}
|
|
|
|
// Termination signals trigger a graceful shutdown, tagged with the signal name.
for (const signalName of ['SIGTERM', 'SIGINT']) {
  process.on(signalName, () => gracefulShutdown(signalName));
}

// An uncaught exception is logged and then goes through the same shutdown path.
process.on('uncaughtException', (err) => {
  console.error('[fatal] uncaughtException:', err);
  gracefulShutdown('uncaughtException');
});

// Unhandled rejections are logged only; no shutdown is started from here.
process.on('unhandledRejection', (reason) => {
  console.error('[fatal] unhandledRejection:', reason);
});
|