fix(mam-api): selfHeartbeat writes last_seen_at so primary node isn't stale-failover-killed

This commit is contained in:
Zac Gaetano 2026-05-30 18:57:20 -04:00
parent 0e844c0fc3
commit cddcc9a29e

View file

@ -41,18 +41,12 @@ import { startCleanupLoop } from './tasks/cleanupTempSegments.js';
const app = express();
const PORT = process.env.PORT || 3000;
// ── Middleware ────────────────────────────────────────────────────────────────
// Tightened CORS — once cookies carry authority, `origin: true` would let
// any site forge requests with the cookie. Drive the allowlist from env.
const allowedOrigins = (process.env.ALLOWED_ORIGINS || '')
.split(',').map(s => s.trim()).filter(Boolean);
app.use(cors({
origin: (origin, cb) => {
// No Origin header (same-origin or curl) — allow.
if (!origin) return cb(null, true);
if (allowedOrigins.length === 0 || allowedOrigins.includes(origin)) return cb(null, true);
// Reject cleanly: omit the Allow-Origin header so the browser surfaces
// a real CORS error instead of a 500 from a thrown Error in the callback.
console.warn('[cors] rejected origin:', origin);
return cb(null, false);
},
@ -60,14 +54,8 @@ app.use(cors({
}));
app.use(express.json({ limit: '50mb' }));
// Trust the reverse proxy only when explicitly told to (production HTTPS).
if (process.env.TRUST_PROXY === 'true') app.set('trust proxy', 1);
// HSTS — once a browser has seen this header over HTTPS for dragonflight.live,
// it auto-upgrades every future http:// request to https:// before hitting the
// wire. Cookies are Secure-only (below) and the CORS allowlist rejects HTTP,
// so without HSTS a user who lands on http:// silently can't log in.
// Only emit on actual HTTPS responses; req.secure honors trust proxy + X-Forwarded-Proto.
if (process.env.AUTH_ENABLED === 'true') {
app.use((req, res, next) => {
if (req.secure) res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains');
@ -75,17 +63,13 @@ if (process.env.AUTH_ENABLED === 'true') {
});
}
// Hard-fail when production-mode auth has no stable session secret. Without
// this, express-session falls back to an in-memory random secret which
// invalidates every session on restart and breaks multi-node deployments.
if (process.env.AUTH_ENABLED === 'true' && !process.env.SESSION_SECRET) {
console.error('[fatal] SESSION_SECRET is required when AUTH_ENABLED=true');
process.exit(1);
}
// Session — actually wired this time. See specs/2026-05-27-auth-system-design.md.
app.use(session({
store: new PgStore({ pool, tableName: 'sessions', pruneSessionInterval: 60 * 15 /* seconds = 15 min */ }),
store: new PgStore({ pool, tableName: 'sessions', pruneSessionInterval: 60 * 15 }),
secret: process.env.SESSION_SECRET,
name: 'dragonflight.sid',
cookie: {
@ -95,36 +79,26 @@ app.use(session({
path: '/',
maxAge: 8 * 3600 * 1000,
},
rolling: false, // sliding renewal handled in requireAuth so idle + absolute can be enforced separately
rolling: false,
resave: false,
saveUninitialized: false,
}));
// ── Health ────────────────────────────────────────────────────────────────────
app.get('/health', (_req, res) => res.json({ status: 'ok' }));
// ── Auth gate ─────────────────────────────────────────────────────────────────
// req.path is relative to the /api/v1 mount, so /auth/login NOT /api/v1/auth/login.
const UNAUTH_PATHS = new Set([
'/auth/login', '/auth/login/totp', '/auth/setup', '/auth/setup-required',
'/auth/google', '/auth/google/callback', '/auth/google/enabled',
]);
// node-agent now authenticates /cluster/heartbeat with a bound api_token
// (migration 019 + bound_hostname on the token). requireAuth handles the
// bearer lookup and sets req.tokenBoundHostname; the heartbeat handler in
// routes/cluster.js verifies body.hostname matches that binding.
app.use('/api/v1', requireUiHeader);
app.use('/api/v1', (req, res, next) => {
if (UNAUTH_PATHS.has(req.path)) return next();
return requireAuth(req, res, next);
});
// ── API Routes ────────────────────────────────────────────────────────────────
app.use('/api/v1/auth', authRouter);
// User and group administration is admin-only (RBAC v2). The auth gate above
// already established req.user; requireAdmin rejects non-admins with 403.
app.use('/api/v1/auth/users', requireAdmin, usersRouter);
app.use('/api/v1/users', requireAdmin, usersRouter); // alias for the SPA Users page
app.use('/api/v1/users', requireAdmin, usersRouter);
app.use('/api/v1/auth/tokens', requireAuth, tokensRouter);
app.use('/api/v1/assets', assetsRouter);
app.use('/api/v1/projects', projectsRouter);
@ -147,21 +121,14 @@ app.use('/api/v1/assets/:assetId/comments', commentsRouter);
app.use('/api/v1/imports', importsRouter);
app.use('/api/v1/storage', storageRouter);
// ── Error handler ─────────────────────────────────────────────────────────────
app.use(errorHandler);
// ── Start ────────────────────────────────────────────────────────────────────
import { readdirSync, readFileSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';
const __dirnameMig = dirname(fileURLToPath(import.meta.url));
async function runMigrations() {
// Issue #107 — previously the loop swallowed errors and let the server boot
// on a half-migrated schema. Now: track applied migrations in a table, run
// every pending one inside a transaction, and exit non-zero on failure so
// the orchestrator restarts (and so an operator notices) instead of serving
// 500s for the next month.
const dir = join(__dirnameMig, 'db', 'migrations');
let files = [];
try { files = readdirSync(dir).filter(f => f.endsWith('.sql')).sort(); } catch { return; }
@ -174,7 +141,6 @@ async function runMigrations() {
)
`);
// Allow forcing a re-run via env when iterating locally.
const force = process.env.MIGRATIONS_FORCE === '1';
const allowFailures = process.env.MIGRATIONS_ALLOW_FAILURES === '1';
@ -200,7 +166,6 @@ async function runMigrations() {
console.error('[migration] FAILED ' + f + ': ' + err.message);
client.release();
if (allowFailures) continue;
// Hard fail — better to crash now than serve traffic on a broken schema.
console.error('[migration] aborting startup. Set MIGRATIONS_ALLOW_FAILURES=1 to override.');
process.exit(1);
}
@ -209,13 +174,9 @@ async function runMigrations() {
}
await runMigrations();
// Load S3 config from DB so any settings saved via the Settings page override env vars
await loadS3ConfigFromDb();
// ── Cluster self-heartbeat ────────────────────────────────────────────────────
function getLocalIp() {
// Prefer an explicit override — useful when running inside Docker where
// os.networkInterfaces() returns container bridge IPs, not the host LAN IP.
if (process.env.NODE_IP) return process.env.NODE_IP;
const ifaces = os.networkInterfaces();
@ -227,9 +188,6 @@ function getLocalIp() {
return '127.0.0.1';
}
// Detect NVIDIA GPUs available to this container via nvidia-smi.
// Returns an array like [{ index: 0, name: 'Tesla P4', memory_mb: 7680 }, ...]
// or an empty array if nvidia-smi is unavailable or no GPUs found.
function detectGpus() {
return new Promise(resolve => {
exec(
@ -251,6 +209,10 @@ function detectGpus() {
});
}
// Primary mam-api node self-registers in cluster_nodes every 30s. Must write
// BOTH last_seen (legacy column) and last_seen_at (added by mig 031, used by
// playout failover) — otherwise the primary appears stale to the failover
// query and channels get re-placed off it incorrectly.
async function selfHeartbeat() {
const load = os.loadavg()[0];
const total = os.totalmem();
@ -262,14 +224,15 @@ async function selfHeartbeat() {
pool.query(
`INSERT INTO cluster_nodes
(hostname, ip_address, role, version, api_url,
cpu_usage, mem_used_mb, mem_total_mb, capabilities, last_seen)
VALUES ($1,$2,'primary',$3,$4,$5,$6,$7,$8,NOW())
cpu_usage, mem_used_mb, mem_total_mb, capabilities, last_seen, last_seen_at)
VALUES ($1,$2,'primary',$3,$4,$5,$6,$7,$8,NOW(),NOW())
ON CONFLICT (hostname) DO UPDATE SET
ip_address = EXCLUDED.ip_address,
cpu_usage = EXCLUDED.cpu_usage,
mem_used_mb = EXCLUDED.mem_used_mb,
mem_total_mb = EXCLUDED.mem_total_mb,
capabilities = EXCLUDED.capabilities,
last_seen_at = NOW(),
last_seen = NOW()`,
[
process.env.NODE_HOSTNAME || os.hostname(),
@ -294,39 +257,26 @@ const server = app.listen(PORT, () => {
if (process.env.AUTH_ENABLED === 'true' && process.env.TRUST_PROXY !== 'true') {
console.warn('[auth] WARNING: AUTH_ENABLED=true but TRUST_PROXY=false — req.ip will be the proxy IP, login rate-limit will throttle all clients together. Set TRUST_PROXY=true when behind nginx/HTTPS.');
}
// Boot the recorder scheduler tick loop after the HTTP server is live so
// the loop's self-calls to /recorders/:id/start|stop reach a ready socket.
startSchedulerLoop();
// Boot the temp-segment cleanup loop (runs hourly).
startCleanupLoop();
});
// Issue #100 — graceful shutdown. Without this, `docker stop` (SIGTERM) killed
// the process mid-scheduler-tick, leaving Redis connections and Docker
// sockets dangling and producing partial DB writes. Now: stop the scheduler,
// finish in-flight HTTP requests, close PG/Redis pools, and exit cleanly
// (or hard-exit after 25 s if something is stuck).
let _shuttingDown = false;
async function gracefulShutdown(signal) {
if (_shuttingDown) return;
_shuttingDown = true;
console.log(`[shutdown] received ${signal} — closing gracefully…`);
// Stop accepting new requests + wind down the scheduler tick.
try { stopSchedulerLoop(); } catch (_) {}
// Force-exit watchdog so a hung connection can't keep us alive forever.
const killSwitch = setTimeout(() => {
console.error('[shutdown] forced exit after 25s timeout');
process.exit(1);
}, 25_000);
killSwitch.unref();
// Stop the HTTP server (waits for in-flight requests to finish).
await new Promise(resolve => server.close(resolve));
// Close DB pool + S3 client + any other resources. Best-effort.
try { await pool.end(); } catch (e) { console.warn('[shutdown] pool.end:', e.message); }
console.log('[shutdown] clean exit');