// ── Dependencies ──────────────────────────────────────────────────────────────
// 'dotenv/config' stays first so environment variables are populated before any
// other module is evaluated.
import 'dotenv/config';
import express from 'express';
import cors from 'cors';
import session from 'express-session';
import connectPgSimple from 'connect-pg-simple';
import os from 'node:os';
import { exec } from 'node:child_process';
import pool from './db/pool.js';
import { errorHandler } from './middleware/errors.js';
import { requireAuth } from './middleware/auth.js';
import { loadS3ConfigFromDb } from './s3/client.js';

// Routes
import assetsRouter from './routes/assets.js';
import projectsRouter from './routes/projects.js';
import binsRouter from './routes/bins.js';
import jobsRouter from './routes/jobs.js';
import captureRouter from './routes/capture.js';
import uploadRouter from './routes/upload.js';
import recordersRouter from './routes/recorders.js';
import settingsRouter from './routes/settings.js';
import amppRouter from './routes/ampp.js';
import groupsRouter from './routes/groups.js';
import sequencesRouter from './routes/sequences.js';
import systemRouter from './routes/system.js';
import clusterRouter from './routes/cluster.js';
import sdkRouter from './routes/sdk.js';
import schedulesRouter from './routes/schedules.js';
import metricsRouter from './routes/metrics.js';
import commentsRouter from './routes/comments.js';
import importsRouter from './routes/imports.js';
import storageRouter from './routes/storage.js';

// Background loops
import { startSchedulerLoop, stopSchedulerLoop } from './scheduler.js';
import { startCleanupLoop } from './tasks/cleanupTempSegments.js';

// Session store class bound to express-session (imports are hoisted, so this
// runs after every module above has been evaluated either way).
const PgStore = connectPgSimple(session);

const app = express();
const PORT = process.env.PORT || 3000;

// ── Middleware ────────────────────────────────────────────────────────────────
// Tightened CORS — once cookies carry authority, `origin: true` would let
// any site forge requests with the cookie. Drive the allowlist from env.
// Comma-separated ALLOWED_ORIGINS → trimmed, non-empty entries.
const allowedOrigins = (process.env.ALLOWED_ORIGINS || '')
  .split(',')
  .map((entry) => entry.trim())
  .filter((entry) => entry.length > 0);

/**
 * Origin callback handed to the cors middleware.
 *
 * A request is accepted when it carries no Origin header (same-origin or a
 * non-browser client such as curl), when no allowlist is configured at all,
 * or when its Origin is on the allowlist. Anything else is rejected by passing
 * `false` — the Allow-Origin header is simply omitted so the browser reports a
 * genuine CORS error rather than a 500 caused by an Error raised in here.
 *
 * @param {string|undefined} origin - value of the request's Origin header
 * @param {(err: Error|null, allow?: boolean) => void} cb - cors decision callback
 */
function decideCorsOrigin(origin, cb) {
  const isPermitted =
    !origin || allowedOrigins.length === 0 || allowedOrigins.includes(origin);
  if (isPermitted) {
    return cb(null, true);
  }
  console.warn('[cors] rejected origin:', origin);
  return cb(null, false);
}

app.use(cors({ origin: decideCorsOrigin, credentials: true }));
app.use(express.json({ limit: '50mb' }));

// Only honour proxy headers when the deployment opts in (production HTTPS
// behind a reverse proxy).
if (process.env.TRUST_PROXY === 'true') {
  app.set('trust proxy', 1);
}

// Refuse to boot in auth mode without a configured SESSION_SECRET: session
// cookies need a stable signing secret that survives restarts and is shared by
// every node of a multi-node deployment.
const authEnabled = process.env.AUTH_ENABLED === 'true';
if (authEnabled && !process.env.SESSION_SECRET) {
  console.error('[fatal] SESSION_SECRET is required when AUTH_ENABLED=true');
  process.exit(1);
}

// Session — actually wired this time. See specs/2026-05-27-auth-system-design.md.
// Session tuning knobs, named so the units are obvious at the call site.
const SESSION_PRUNE_INTERVAL_SECONDS = 60 * 15; // 15 min
const SESSION_MAX_AGE_MS = 8 * 3600 * 1000; // 8 h

// Sessions are persisted in Postgres (table `sessions`) through the shared pool.
const sessionStore = new PgStore({
  pool,
  tableName: 'sessions',
  pruneSessionInterval: SESSION_PRUNE_INTERVAL_SECONDS,
});

const sessionCookie = {
  httpOnly: true,
  sameSite: 'lax',
  // Secure cookies are tied to the same switch that enables trust-proxy.
  secure: process.env.TRUST_PROXY === 'true',
  path: '/',
  maxAge: SESSION_MAX_AGE_MS,
};

app.use(
  session({
    store: sessionStore,
    secret: process.env.SESSION_SECRET,
    name: 'dragonflight.sid',
    cookie: sessionCookie,
    // Sliding renewal handled in requireAuth so idle + absolute can be
    // enforced separately.
    rolling: false,
    resave: false,
    saveUninitialized: false,
  })
);

// ── Health ────────────────────────────────────────────────────────────────────
// Liveness probe; registered before the /api/v1 auth gate and outside its mount.
app.get('/health', (_req, res) => {
  return res.json({ status: 'ok' });
});

// ── Auth gate ─────────────────────────────────────────────────────────────────
// req.path is relative to the /api/v1 mount, so /auth/login NOT /api/v1/auth/login.
const UNAUTH_PATHS = new Set([
  '/auth/login',
  '/auth/setup',
  '/auth/setup-required',
]);

// Service-auth carve-outs: node-agent uses migration 019's bound-hostname
// api_token mechanism, not user auth. Today only /cluster/heartbeat is
// reached without a user session — operator/UI endpoints in cluster.js
// (containers restart, DELETE /:id, blackmagic device queries) ARE expected
// to require auth. If node-agent grows another endpoint, add it here.
// TODO: long-term, issue node-agent a real bound api_token and drop this carve-out.
const SERVICE_PATHS = new Set(['/cluster/heartbeat']);

// Every /api/v1 request goes through requireAuth unless its mount-relative path
// is in UNAUTH_PATHS (public auth endpoints) or SERVICE_PATHS (service carve-outs).
app.use('/api/v1', (req, res, next) => {
  if (UNAUTH_PATHS.has(req.path) || SERVICE_PATHS.has(req.path)) return next();
  return requireAuth(req, res, next);
});

// ── API Routes ────────────────────────────────────────────────────────────────
app.use('/api/v1/assets', assetsRouter);
app.use('/api/v1/projects', projectsRouter);
app.use('/api/v1/bins', binsRouter);
app.use('/api/v1/jobs', jobsRouter);
app.use('/api/v1/capture', captureRouter);
app.use('/api/v1/upload', uploadRouter);
app.use('/api/v1/recorders', recordersRouter);
app.use('/api/v1/settings', settingsRouter);
app.use('/api/v1/ampp', amppRouter);
app.use('/api/v1/groups', groupsRouter);
app.use('/api/v1/sequences', sequencesRouter);
app.use('/api/v1/system', systemRouter);
app.use('/api/v1/cluster', clusterRouter);
app.use('/api/v1/sdk', sdkRouter);
app.use('/api/v1/schedules', schedulesRouter);
app.use('/api/v1/metrics', metricsRouter);
app.use('/api/v1/assets/:assetId/comments', commentsRouter);
app.use('/api/v1/imports', importsRouter);
app.use('/api/v1/storage', storageRouter);

// ── Error handler ─────────────────────────────────────────────────────────────
app.use(errorHandler);

// ── Start ─────────────────────────────────────────────────────────────────────
import { readdirSync, readFileSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';

const __dirnameMig = dirname(fileURLToPath(import.meta.url));

/**
 * Apply every pending `db/migrations/*.sql` file, in filename order.
 *
 * Issue #107 — previously the loop swallowed errors and let the server boot
 * on a half-migrated schema. Now: applied migrations are tracked in
 * `schema_migrations`, each pending file runs inside its own transaction, and
 * a failure exits the process non-zero so the orchestrator restarts (and an
 * operator notices) instead of serving 500s on a broken schema.
 *
 * Environment switches:
 *   MIGRATIONS_FORCE=1          re-run files already recorded as applied
 *   MIGRATIONS_ALLOW_FAILURES=1 log a failed file and carry on with the next
 *
 * @returns {Promise<void>}
 */
async function runMigrations() {
  const dir = join(__dirnameMig, 'db', 'migrations');
  let files = [];
  try {
    files = readdirSync(dir).filter((f) => f.endsWith('.sql')).sort();
  } catch (err) {
    // Still best-effort (no readable migrations directory → nothing to run),
    // but say why instead of skipping silently.
    console.warn(`[migration] skipped, cannot read ${dir}: ${err.message}`);
    return;
  }

  await pool.query(`
    CREATE TABLE IF NOT EXISTS schema_migrations (
      filename     TEXT PRIMARY KEY,
      applied_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      checksum_sha TEXT
    )
  `);

  // Allow forcing a re-run via env when iterating locally.
  const force = process.env.MIGRATIONS_FORCE === '1';
  const allowFailures = process.env.MIGRATIONS_ALLOW_FAILURES === '1';

  const appliedRes = await pool.query('SELECT filename FROM schema_migrations');
  const applied = new Set(appliedRes.rows.map((r) => r.filename));

  for (const f of files) {
    if (!force && applied.has(f)) continue;

    const sql = readFileSync(join(dir, f), 'utf8');
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      await client.query(sql);
      await client.query(
        `INSERT INTO schema_migrations (filename) VALUES ($1)
         ON CONFLICT (filename) DO UPDATE SET applied_at = NOW()`,
        [f]
      );
      await client.query('COMMIT');
      console.log('[migration] applied ' + f);
    } catch (err) {
      // Best-effort rollback; the connection may already be unusable.
      await client.query('ROLLBACK').catch(() => {});
      console.error('[migration] FAILED ' + f + ': ' + err.message);
      if (!allowFailures) {
        // Hard fail — better to crash now than serve traffic on a broken schema.
        console.error('[migration] aborting startup. Set MIGRATIONS_ALLOW_FAILURES=1 to override.');
        process.exit(1);
      }
    } finally {
      // Single release point for both the success and the allowed-failure path.
      client.release();
    }
  }
}

await runMigrations();

// Load S3 config from DB so any settings saved via the Settings page override env vars
await loadS3ConfigFromDb();

// ── Cluster self-heartbeat ────────────────────────────────────────────────────

/**
 * Pick the IP address this node advertises to the cluster.
 *
 * Prefers the NODE_IP override — useful when running inside Docker where
 * os.networkInterfaces() returns container bridge IPs, not the host LAN IP.
 * Otherwise returns the first non-internal IPv4 address, or '127.0.0.1' when
 * none is found.
 *
 * @returns {string}
 */
function getLocalIp() {
  if (process.env.NODE_IP) return process.env.NODE_IP;

  const ifaces = os.networkInterfaces();
  for (const name of Object.keys(ifaces)) {
    for (const iface of ifaces[name] || []) {
      // Node 18.0–18.3 reported `family` as the number 4 rather than 'IPv4'.
      const isIpv4 = iface.family === 'IPv4' || iface.family === 4;
      if (isIpv4 && !iface.internal) return iface.address;
    }
  }
  return '127.0.0.1';
}

/**
 * Detect NVIDIA GPUs available to this container via nvidia-smi.
 *
 * Never rejects: resolves to an empty array if nvidia-smi is unavailable,
 * errors, exceeds the 5 s timeout, or prints nothing.
 *
 * @returns {Promise<Array<{index: number, name: string, memory_mb: number}>>}
 *   e.g. [{ index: 0, name: 'Tesla P4', memory_mb: 7680 }, ...]
 */
function detectGpus() {
  return new Promise((resolve) => {
    exec(
      'nvidia-smi --query-gpu=index,name,memory.total --format=csv,noheader,nounits',
      { timeout: 5000 },
      (err, stdout) => {
        if (err || !stdout.trim()) return resolve([]);
        const gpus = stdout
          .trim()
          .split('\n')
          .map((line) => {
            const parts = line.split(',').map((s) => s.trim());
            return {
              index: Number.parseInt(parts[0], 10),
              name: parts[1] || 'Unknown GPU',
              memory_mb: Number.parseInt(parts[2], 10) || 0,
            };
          })
          // Drop rows whose first column is not a GPU index.
          .filter((g) => !Number.isNaN(g.index));
        resolve(gpus);
      }
    );
  });
}

/**
 * Upsert this process's row in `cluster_nodes` (role 'primary') with current
 * load, memory and GPU capabilities. Failures are logged, never thrown, so a
 * database hiccup cannot take the API down.
 *
 * @returns {Promise<void>}
 */
async function selfHeartbeat() {
  const load = os.loadavg()[0];
  const total = os.totalmem();
  const used = total - os.freemem();
  const gpus = await detectGpus();
  const capabilities = { gpus, blackmagic: [] };
  // Resolve once so ip_address and api_url can never disagree.
  const ip = getLocalIp();

  try {
    await pool.query(
      `INSERT INTO cluster_nodes
         (hostname, ip_address, role, version, api_url,
          cpu_usage, mem_used_mb, mem_total_mb, capabilities, last_seen)
       VALUES ($1,$2,'primary',$3,$4,$5,$6,$7,$8,NOW())
       ON CONFLICT (hostname) DO UPDATE SET
         ip_address   = EXCLUDED.ip_address,
         cpu_usage    = EXCLUDED.cpu_usage,
         mem_used_mb  = EXCLUDED.mem_used_mb,
         mem_total_mb = EXCLUDED.mem_total_mb,
         capabilities = EXCLUDED.capabilities,
         last_seen    = NOW()`,
      [
        process.env.NODE_HOSTNAME || os.hostname(),
        ip,
        process.env.npm_package_version || null,
        `http://${ip}:${PORT}`,
        parseFloat(load.toFixed(2)),
        Math.round(used / 1024 / 1024),
        Math.round(total / 1024 / 1024),
        JSON.stringify(capabilities),
      ]
    );
  } catch (err) {
    console.error('[cluster] heartbeat failed:', err.message);
  }
}

// Heartbeat every 30 s, plus once immediately at boot. The handle is kept so
// shutdown can stop the timer before the pool is closed.
const heartbeatTimer = setInterval(selfHeartbeat, 30_000);
selfHeartbeat();

const server = app.listen(PORT, () => {
  const authMode = process.env.AUTH_ENABLED === 'true'
    ? 'ENABLED'
    : 'DISABLED (set AUTH_ENABLED=true for production)';
  console.log(`MAM API listening on port ${PORT}`);
  console.log(`Authentication: ${authMode}`);
  // Boot the recorder scheduler tick loop after the HTTP server is live so
  // the loop's self-calls to /recorders/:id/start|stop reach a ready socket.
  startSchedulerLoop();
  // Boot the temp-segment cleanup loop (runs hourly).
  startCleanupLoop();
});

// Issue #100 — graceful shutdown. Without this, `docker stop` (SIGTERM) killed
// the process mid-scheduler-tick, leaving connections dangling and producing
// partial DB writes.
let _shuttingDown = false;

/**
 * Wind the process down: stop the scheduler and heartbeat, let in-flight HTTP
 * requests finish, close the PG pool, then exit. A 25 s watchdog forces exit
 * code 1 if anything hangs. Re-entrant calls are ignored.
 *
 * Exit code is 0 for an orderly signal (SIGTERM/SIGINT) and 1 when shutdown
 * was triggered by an uncaught exception, so supervisors can tell a crash
 * from a requested stop.
 *
 * @param {string} signal - what triggered shutdown ('SIGTERM', 'SIGINT', 'uncaughtException')
 * @returns {Promise<void>}
 */
async function gracefulShutdown(signal) {
  if (_shuttingDown) return;
  _shuttingDown = true;
  const exitCode = signal === 'uncaughtException' ? 1 : 0;
  console.log(`[shutdown] received ${signal} — closing gracefully…`);

  // Wind down the periodic work first so nothing new is started.
  clearInterval(heartbeatTimer);
  try {
    stopSchedulerLoop();
  } catch (err) {
    console.warn('[shutdown] stopSchedulerLoop:', err.message);
  }

  // Force-exit watchdog so a hung connection can't keep us alive forever.
  const killSwitch = setTimeout(() => {
    console.error('[shutdown] forced exit after 25s timeout');
    process.exit(1);
  }, 25_000);
  killSwitch.unref();

  // Stop the HTTP server (waits for in-flight requests to finish).
  await new Promise((resolve) => server.close(resolve));

  // Close the DB pool. Best-effort.
  try {
    await pool.end();
  } catch (e) {
    console.warn('[shutdown] pool.end:', e.message);
  }

  if (exitCode === 0) {
    console.log('[shutdown] clean exit');
  } else {
    console.error(`[shutdown] exiting with code ${exitCode} after ${signal}`);
  }
  process.exit(exitCode);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('uncaughtException', (err) => {
  console.error('[fatal] uncaughtException:', err);
  gracefulShutdown('uncaughtException');
});
// Rejections are logged only; the process deliberately keeps running.
process.on('unhandledRejection', (reason) => {
  console.error('[fatal] unhandledRejection:', reason);
});