import 'dotenv/config'; import express from 'express'; import cors from 'cors'; import session from 'express-session'; import ConnectPgSimple from 'connect-pg-simple'; import os from 'node:os'; import { exec } from 'node:child_process'; import pool from './db/pool.js'; import { errorHandler } from './middleware/errors.js'; import { loadS3ConfigFromDb } from './s3/client.js'; // Routes import authRouter from './routes/auth.js'; import assetsRouter from './routes/assets.js'; import projectsRouter from './routes/projects.js'; import binsRouter from './routes/bins.js'; import jobsRouter from './routes/jobs.js'; import captureRouter from './routes/capture.js'; import uploadRouter from './routes/upload.js'; import recordersRouter from './routes/recorders.js'; import settingsRouter from './routes/settings.js'; import amppRouter from './routes/ampp.js'; import usersRouter from './routes/users.js'; import groupsRouter from './routes/groups.js'; import tokensRouter from './routes/tokens.js'; import sequencesRouter from './routes/sequences.js'; import systemRouter from './routes/system.js'; import clusterRouter from './routes/cluster.js'; import sdkRouter from './routes/sdk.js'; import schedulesRouter from './routes/schedules.js'; import metricsRouter from './routes/metrics.js'; import commentsRouter from './routes/comments.js'; import importsRouter from './routes/imports.js'; import storageRouter from './routes/storage.js'; import { startSchedulerLoop, stopSchedulerLoop } from './scheduler.js'; import { startCleanupLoop } from './tasks/cleanupTempSegments.js'; const app = express(); const PORT = process.env.PORT || 3000; // ── Middleware ──────────────────────────────────────────────────────────────── // Trust the first proxy (nginx in front of us) so req.ip, req.secure, and // req.protocol reflect the real client request — required for both the // login rate-limiter's IP keying and `cookie.secure` cookie issuance. 
app.set('trust proxy', 1);

// NOTE(review): `origin: true` reflects whatever Origin the browser sends and
// allows credentials. SameSite=lax cookies blunt this for cross-*site* callers,
// but any same-site origin can make credentialed requests and read the
// responses. Consider an explicit allow-list.
app.use(cors({ origin: true, credentials: true }));
app.use(express.json({ limit: '50mb' }));

const PgSession = ConnectPgSimple(session);

// Session security knobs.
//
// - `secure` is set from SESSION_COOKIE_SECURE (default: true when AUTH_ENABLED).
//   `trust proxy` above tells express-session that x-forwarded-proto can be
//   trusted, so it issues Secure cookies on HTTPS requests forwarded by
//   nginx/Cloudflare even though the proxy → mam-api hop is plain HTTP.
//   Set SESSION_COOKIE_SECURE=false explicitly for local-only HTTP testing.
// - `sameSite: 'lax'` ships the cookie on top-level navigations (including
//   the post-login redirect from /login.html) but blocks cross-site POSTs.
// - Renamed from default `connect.sid` to `df.sid` so it's obvious in DevTools.
// - `rolling: true` refreshes maxAge on every request so an active user
//   doesn't get bounced to login after the 7-day TTL.
const authEnabled = process.env.AUTH_ENABLED === 'true';

/**
 * Pick the session signing secret.
 * @returns {string} SESSION_SECRET when set, else a dev-only placeholder.
 * @throws {Error} when AUTH_ENABLED=true and SESSION_SECRET is unset/empty.
 */
function resolveSessionSecret() {
  if (process.env.SESSION_SECRET) return process.env.SESSION_SECRET;
  if (authEnabled) {
    throw new Error('SESSION_SECRET is required when AUTH_ENABLED=true');
  }
  return 'dev-only-not-for-production';
}

const SESSION_SECRET = resolveSessionSecret();
const SESSION_COOKIE_SECURE = process.env.SESSION_COOKIE_SECURE
  ? process.env.SESSION_COOKIE_SECURE === 'true'
  : authEnabled; // default: secure cookies whenever auth is on

app.use(
  session({
    name: 'df.sid',
    store: new PgSession({
      pool,
      tableName: 'sessions',
      pruneSessionInterval: 3600,
    }),
    secret: SESSION_SECRET,
    resave: false,
    saveUninitialized: false,
    rolling: true,
    cookie: {
      secure: SESSION_COOKIE_SECURE,
      httpOnly: true,
      sameSite: 'lax',
      maxAge: 1000 * 60 * 60 * 24 * 7, // 7 days
    },
  })
);

// ── Health (no auth) ──────────────────────────────────────────────────────────
app.get('/health', (_req, res) => res.json({ status: 'ok' }));

// ── API Routes ────────────────────────────────────────────────────────────────
app.use('/api/v1/auth', authRouter);
app.use('/api/v1/assets', assetsRouter);
app.use('/api/v1/projects', projectsRouter);
app.use('/api/v1/bins', binsRouter);
app.use('/api/v1/jobs', jobsRouter);
app.use('/api/v1/capture', captureRouter);
app.use('/api/v1/upload', uploadRouter);
app.use('/api/v1/recorders', recordersRouter);
app.use('/api/v1/settings', settingsRouter);
app.use('/api/v1/ampp', amppRouter);
app.use('/api/v1/users', usersRouter);
app.use('/api/v1/groups', groupsRouter);
app.use('/api/v1/tokens', tokensRouter);
app.use('/api/v1/sequences', sequencesRouter);
app.use('/api/v1/system', systemRouter);
app.use('/api/v1/cluster', clusterRouter);
app.use('/api/v1/sdk', sdkRouter);
app.use('/api/v1/schedules', schedulesRouter);
app.use('/api/v1/metrics', metricsRouter);
// NOTE(review): mounted after assetsRouter, which sees /api/v1/assets/* first.
// If assetsRouter has a route matching /:id/comments it will win. Reading
// :assetId here presumably relies on commentsRouter using `mergeParams: true`;
// confirm.
app.use('/api/v1/assets/:assetId/comments', commentsRouter);
app.use('/api/v1/imports', importsRouter);
app.use('/api/v1/storage', storageRouter);

// ── Error handler ─────────────────────────────────────────────────────────────
app.use(errorHandler);

// ── Start ─────────────────────────────────────────────────────────────────────
// (ES module imports are hoisted, so their position here is purely cosmetic.)
import { readdirSync, readFileSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';

const __dirnameMig = dirname(fileURLToPath(import.meta.url));

/**
 * Apply one migration file inside its own transaction and record it in
 * schema_migrations.
 * @param {string} filename - migration file name (the schema_migrations key).
 * @param {string} sql - full SQL text of the migration.
 * @returns {Promise<boolean>} true when committed, false when it failed and was
 *   rolled back (the failure is logged here).
 */
async function applyMigration(filename, sql) {
  const client = await pool.connect();
  // Set when ROLLBACK itself fails: the connection is then in an unknown
  // transaction state, so it is destroyed instead of returned to the pool
  // (node-postgres discards a client when release() gets a truthy error).
  let brokenConnection;
  try {
    await client.query('BEGIN');
    await client.query(sql);
    await client.query(
      `INSERT INTO schema_migrations (filename) VALUES ($1)
       ON CONFLICT (filename) DO UPDATE SET applied_at = NOW()`,
      [filename]
    );
    await client.query('COMMIT');
    console.log(`[migration] applied ${filename}`);
    return true;
  } catch (err) {
    try {
      await client.query('ROLLBACK');
    } catch (rollbackErr) {
      brokenConnection = rollbackErr;
    }
    console.error(`[migration] FAILED ${filename}: ${err.message}`);
    return false;
  } finally {
    client.release(brokenConnection);
  }
}

/**
 * Run every pending .sql file from db/migrations, in filename order.
 *
 * Issue #107 — previously the loop swallowed errors and let the server boot
 * on a half-migrated schema. Now: track applied migrations in a table, run
 * every pending one inside a transaction, and exit non-zero on failure so
 * the orchestrator restarts (and so an operator notices) instead of serving
 * 500s for the next month.
 *
 * Env overrides:
 * - MIGRATIONS_FORCE=1 re-runs already-applied files (local iteration).
 * - MIGRATIONS_ALLOW_FAILURES=1 logs failures and keeps going instead of exiting.
 */
async function runMigrations() {
  const dir = join(__dirnameMig, 'db', 'migrations');
  let files;
  try {
    files = readdirSync(dir).filter((f) => f.endsWith('.sql')).sort();
  } catch (err) {
    // Still best-effort (boot continues), but no longer silent: an unreadable
    // directory means *no* migrations ran, which an operator should see.
    console.warn(
      `[migration] cannot read ${dir} (${err.code || err.message}) — skipping migrations`
    );
    return;
  }

  await pool.query(`
    CREATE TABLE IF NOT EXISTS schema_migrations (
      filename     TEXT PRIMARY KEY,
      applied_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      checksum_sha TEXT
    )
  `);

  const force = process.env.MIGRATIONS_FORCE === '1';
  const allowFailures = process.env.MIGRATIONS_ALLOW_FAILURES === '1';

  const appliedRes = await pool.query('SELECT filename FROM schema_migrations');
  const applied = new Set(appliedRes.rows.map((r) => r.filename));

  for (const f of files) {
    if (!force && applied.has(f)) continue;
    const sql = readFileSync(join(dir, f), 'utf8');
    const ok = await applyMigration(f, sql);
    if (ok || allowFailures) continue;
    // Hard fail — better to crash now than serve traffic on a broken schema.
    console.error('[migration] aborting startup. Set MIGRATIONS_ALLOW_FAILURES=1 to override.');
    process.exit(1);
  }
}

await runMigrations();

// Load S3 config from DB so any settings saved via the Settings page override env vars
await loadS3ConfigFromDb();

// ── Cluster self-heartbeat ────────────────────────────────────────────────────

/**
 * Best-guess IPv4 address other nodes can reach this one on.
 * @returns {string} NODE_IP if set, else the first non-internal IPv4 address,
 *   else '127.0.0.1'.
 */
function getLocalIp() {
  // Prefer an explicit override — useful when running inside Docker where
  // os.networkInterfaces() returns container bridge IPs, not the host LAN IP.
  if (process.env.NODE_IP) return process.env.NODE_IP;
  const ifaces = os.networkInterfaces();
  for (const name of Object.keys(ifaces)) {
    for (const iface of ifaces[name] || []) {
      if (iface.family === 'IPv4' && !iface.internal) return iface.address;
    }
  }
  return '127.0.0.1';
}

/**
 * Detect NVIDIA GPUs visible to this container via nvidia-smi.
 * Never rejects: resolves to [] when nvidia-smi is missing, errors, exceeds
 * its 5 s timeout, or reports nothing.
 * @returns {Promise<Array<{index: number, name: string, memory_mb: number}>>}
 *   e.g. [{ index: 0, name: 'Tesla P4', memory_mb: 7680 }, ...]
 */
function detectGpus() {
  return new Promise((resolve) => {
    exec(
      'nvidia-smi --query-gpu=index,name,memory.total --format=csv,noheader,nounits',
      { timeout: 5000 },
      (err, stdout) => {
        if (err || !stdout.trim()) return resolve([]);
        const gpus = stdout
          .trim()
          .split('\n')
          .map((line) => {
            const parts = line.split(',').map((s) => s.trim());
            return {
              index: parseInt(parts[0], 10),
              name: parts[1] || 'Unknown GPU',
              memory_mb: parseInt(parts[2], 10) || 0,
            };
          })
          .filter((g) => !Number.isNaN(g.index));
        resolve(gpus);
      }
    );
  });
}

/**
 * Upsert this node's row in cluster_nodes (role 'primary') with current load,
 * memory and GPU capabilities. Failures are logged, never thrown.
 */
async function selfHeartbeat() {
  const load = os.loadavg()[0];
  const total = os.totalmem();
  const used = total - os.freemem();
  const gpus = await detectGpus();
  const capabilities = { gpus, blackmagic: [] };
  const ip = getLocalIp();

  try {
    // api_url is derived from the same IP as ip_address, so both are refreshed
    // together. version is refreshed too (so upgrades show up), but never
    // cleared when npm_package_version is unavailable.
    await pool.query(
      `INSERT INTO cluster_nodes
         (hostname, ip_address, role, version, api_url, cpu_usage, mem_used_mb, mem_total_mb, capabilities, last_seen)
       VALUES ($1,$2,'primary',$3,$4,$5,$6,$7,$8,NOW())
       ON CONFLICT (hostname) DO UPDATE SET
         ip_address   = EXCLUDED.ip_address,
         version      = COALESCE(EXCLUDED.version, cluster_nodes.version),
         api_url      = EXCLUDED.api_url,
         cpu_usage    = EXCLUDED.cpu_usage,
         mem_used_mb  = EXCLUDED.mem_used_mb,
         mem_total_mb = EXCLUDED.mem_total_mb,
         capabilities = EXCLUDED.capabilities,
         last_seen    = NOW()`,
      [
        process.env.NODE_HOSTNAME || os.hostname(),
        ip,
        process.env.npm_package_version || null,
        `http://${ip}:${PORT}`,
        parseFloat(load.toFixed(2)),
        Math.round(used / 1024 / 1024),
        Math.round(total / 1024 / 1024),
        JSON.stringify(capabilities),
      ]
    );
  } catch (err) {
    console.error('[cluster] heartbeat failed:', err.message);
  }
}

const heartbeatTimer = setInterval(selfHeartbeat, 30_000);
void selfHeartbeat();

const server = app.listen(PORT, () => {
  const authMode = process.env.AUTH_ENABLED === 'true'
    ? 'ENABLED'
    : 'DISABLED (set AUTH_ENABLED=true for production)';
  console.log(`MAM API listening on port ${PORT}`);
  console.log(`Authentication: ${authMode}`);
  // Boot the recorder scheduler tick loop after the HTTP server is live so
  // the loop's self-calls to /recorders/:id/start|stop reach a ready socket.
  startSchedulerLoop();
  // Boot the temp-segment cleanup loop (runs hourly).
  startCleanupLoop();
});

// Issue #100 — graceful shutdown. Without this, `docker stop` (SIGTERM) killed
// the process mid-scheduler-tick and produced partial DB writes. Now: stop the
// scheduler and heartbeat, finish in-flight HTTP requests, close the PG pool,
// and exit (or hard-exit after 25 s if something is stuck).
// NOTE(review): Redis/S3 clients are not closed here; if their modules hold
// sockets, they need their own close hooks.
let shuttingDown = false;
// Highest exit code requested by any shutdown trigger. A crash that arrives
// while a SIGTERM shutdown is already running still yields a non-zero exit.
let shutdownExitCode = 0;

/**
 * Shut the process down once, gracefully.
 * @param {string} signal - what triggered the shutdown (for logging).
 * @param {number} [exitCode=0] - process exit code. Non-zero for crashes, so
 *   restart policies such as Docker's `on-failure` treat them as failures.
 */
async function gracefulShutdown(signal, exitCode = 0) {
  shutdownExitCode = Math.max(shutdownExitCode, exitCode);
  if (shuttingDown) return;
  shuttingDown = true;
  console.log(`[shutdown] received ${signal} — closing gracefully…`);

  // Wind down background work first so it stops issuing new DB writes.
  try {
    stopSchedulerLoop();
  } catch (err) {
    console.warn('[shutdown] stopSchedulerLoop:', err.message);
  }
  clearInterval(heartbeatTimer);

  // Force-exit watchdog so a hung connection can't keep us alive forever.
  const killSwitch = setTimeout(() => {
    console.error('[shutdown] forced exit after 25s timeout');
    process.exit(1);
  }, 25_000);
  killSwitch.unref();

  // Stop accepting connections and wait for in-flight requests to finish.
  // Before Node 19, close() also waits for idle keep-alive sockets to time out,
  // so drop those explicitly (no-op where unsupported or already handled).
  await new Promise((resolve) => {
    server.close(resolve);
    server.closeIdleConnections?.();
  });

  // Close the DB pool. Best-effort.
  try {
    await pool.end();
  } catch (e) {
    console.warn('[shutdown] pool.end:', e.message);
  }

  if (shutdownExitCode === 0) {
    console.log('[shutdown] clean exit');
  } else {
    console.log(`[shutdown] exiting with code ${shutdownExitCode}`);
  }
  process.exit(shutdownExitCode);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('uncaughtException', (err) => {
  console.error('[fatal] uncaughtException:', err);
  gracefulShutdown('uncaughtException', 1);
});
process.on('unhandledRejection', (reason) => {
  console.error('[fatal] unhandledRejection:', reason);
});