dragonflight/services/mam-api/src/scheduler.js
opencode 04ce096e67 chore: 1.2 ship-prep sweep — close 38 issues
Frontend / UX / a11y
- Sidebar collapse/expand toggle with localStorage persistence (#142)
- Settings sections wrap inputs in <form> with Enter-to-submit + native
  validation; password autocomplete=new-password (#141, #138)
- Asset thumbnails get descriptive alt text (#140)
- Production deploy now precompiles JSX via esbuild and loads the
  production React UMD instead of dev builds + in-browser Babel (#139,
  #122)
- Search wrapper gets role=search; global search input gets aria-label,
  role=combobox, aria-controls/aria-expanded/aria-activedescendant
  wiring (#137, #135)
- Dashboard and Library no longer share the same nav icon (#136)
- Sidebar collapses off-canvas with a topbar menu button below 768 px;
  mobile default is collapsed (#134)
- --text-3 bumped to #8B92A0 for WCAG AA contrast on --bg-0 (#133)
- Schedule and Library routes were rendering empty inside the .main
  flex container — switched to flex:1 + min-height:0 (#131, #132,
  editor + asset detail get the same fix)
- Jobs nav badge now polls /jobs?status=active every 10 s and reflects
  the live count (#130, #113)
- aria-label sweep on every icon-only button (#126)
- Premiere panel release list moved to window.PREMIERE_RELEASES in
  data.jsx; Editor + Settings read from the same source (#125)
- Typo setPgMclips → setPgmClips (#124)
- Stray console.error / console.warn calls gated behind
  window.DF_LOG.{warn,error} (#123)
- Hardcoded /api/v1 paths route through window.ZAMPP_API_PREFIX (#115)
- Schedule rows no longer crash on null recorder_id (#117)
- EditorKeyboard guards against document.activeElement === null (#116)
- Unmount-safe timers for PasswordResetModal, Containers, Editor (#111)
- Player seek clamps below totalMs, server-side range clamping +
  uncached 416 on EOF, client-side EOF-stall watchdog (#143)
- Duration badge overlap fix on narrow asset cards (#52)

Backend / security / reliability
- GET /recorders fixed N+1: single LATERAL JOIN for live_asset_id;
  Docker inspects bounded to actually-recording rows (#121)
- Upload disk-storage (multer.diskStorage) streams parts to S3 instead
  of buffering 500 MB in RAM (#120)
- /assets list clamps limit to MAX_LIMIT=500 to prevent OOM (#119)
- SDK upload archive listing + post-extract sanitize block zip-slip /
  tar-slip and symlink escapes (#118)
- Migrations track applied state in schema_migrations, run in a
  transaction, and exit non-zero on failure (#107)
- node-agent BMD_COUNT override uses BMD_DEVICE_PREFIX; filesystem
  detection wins (#109, #127)
- GPU_COUNT override now merges with nvidia-smi enrichment (#108)
- /cluster/heartbeat requires a node-bound token or admin user;
  tokens carry bound_hostname (#106)
- /recorders/:id/start error responses no longer echo the Docker
  create payload — env vars stay out of client responses (#105)
- /recorders/probe restricts schemes (srt/rtmp/rtsp/udp/rtp), blocks
  private + loopback hosts for non-admins, denies common service
  ports (#104)
- Scheduler tick guarded by a Postgres advisory lock; pending/running
  rows claimed via UPDATE...RETURNING + FOR UPDATE SKIP LOCKED to
  survive multi-node deploys (#103)
- UUID validateUuid('id') param middleware on every /:id route (#102)
- Error handler scrubs Postgres error messages and 5xx detail (#101)
- Graceful SIGTERM/SIGINT shutdown — stops scheduler, drains the HTTP
  server, ends the pool, 25 s force-exit watchdog (#100)
- AMPP sync moved from fire-and-forget to a persisted retry queue
  (ampp_sync_status / attempts / next_attempt_at + scheduler retry
  loop with exponential backoff) (#77)

Migrations
- 019: api_tokens.bound_hostname (#106)
- 020: assets.ampp_sync_status + retry bookkeeping (#77)

Other
- Deferred to v1.3: #92 Growing-files per-upload toggle, #80 Audio tab,
  #57 Dashboard redesign, #56 Editor SPA polish phase 3, #114 S3
  migration tool
2026-05-27 02:06:14 +00:00

215 lines
8.3 KiB
JavaScript

// Scheduler tick — every TICK_INTERVAL_MS scan the recorder_schedules table
// and transition any rows whose window opens or closes. The actual recorder
// start/stop is delegated to the existing /recorders/:id/start|stop routes
// via a loopback HTTP request to this API (SELF_URL), so we reuse all of the
// existing container orchestration, growing-files handling, asset row
// creation, etc. Each tick also stops recorders whose schedule was cancelled
// while running, flags stale 'live' assets as 'error', and retries pending
// AMPP syncs.
//
// On schedule completion: a 'daily' or 'weekly' recurring schedule is cloned
// forward by 1 day / 7 days into a new 'pending' row.
import pool from './db/pool.js';
import { syncToAmpp } from './routes/upload.js';
/**
 * Parse a positive integer from an env-style string.
 *
 * Falls back when the value is missing, empty, non-numeric, not > 0, or above
 * `max`. This matters for timer delays: Node clamps a NaN, < 1, or
 * > 2147483647 delay to 1 ms, which would turn the scheduler into a busy loop.
 *
 * @param {string|undefined} raw - raw value, e.g. from process.env.
 * @param {number} fallback - value used when `raw` is unusable.
 * @param {number} [max=Number.MAX_SAFE_INTEGER] - largest accepted value.
 * @returns {number}
 */
function parsePositiveInt(raw, fallback, max = Number.MAX_SAFE_INTEGER) {
  const parsed = Number.parseInt(raw ?? '', 10);
  return Number.isFinite(parsed) && parsed > 0 && parsed <= max ? parsed : fallback;
}

// 2147483647 is the largest delay Node timers honour.
const TICK_INTERVAL_MS = parsePositiveInt(process.env.SCHEDULER_TICK_MS, 15000, 2147483647);
const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;
// Per-process re-entrancy guard: true while a tick() is in flight.
let _tickRunning = false;
// Handle of the recurring tick timer; null while the loop is stopped.
let _interval = null;
/**
 * Issue an HTTP request back into this API (base URL: SELF_URL).
 *
 * @param {string} path - API path appended to SELF_URL, e.g.
 *   `/api/v1/recorders/<id>/start`.
 * @param {string} [method='POST'] - HTTP method.
 * @returns {Promise<object>} the parsed JSON body, or `{}` if the body cannot
 *   be parsed as JSON.
 * @throws {Error} on a non-2xx status (message includes up to 200 chars of
 *   the response body), or when fetch fails / the 30 s timeout aborts it.
 */
async function callSelf(path, method = 'POST') {
  const url = `${SELF_URL}${path}`;
  const requestInit = {
    method,
    headers: { 'Content-Type': 'application/json' },
    signal: AbortSignal.timeout(30000),
  };
  const response = await fetch(url, requestInit);

  if (response.ok) {
    return response.json().catch(() => ({}));
  }

  const bodyText = await response.text().catch(() => '');
  const snippet = bodyText.slice(0, 200);
  throw new Error(`${method} ${path} → HTTP ${response.status}: ${snippet}`);
}
// Issue #103 — every mam-api replica runs the same tick on the same interval,
// so a multi-node deploy would double-fire recorder starts/stops. We guard
// the whole tick with a session-level PG advisory lock keyed on
// SCHEDULER_LOCK_KEY so at most one replica processes a given interval.
// Pure-Postgres, no extra infra.
const SCHEDULER_LOCK_KEY = 8210301; // arbitrary, must be stable across replicas

/**
 * Try to take the scheduler advisory lock without blocking.
 *
 * The lock belongs to the database session of `client`, so the same client
 * must be used to release it.
 *
 * @param {{query: Function}} client - a checked-out pool client.
 * @returns {Promise<boolean>} true when this session now holds the lock.
 */
async function tryAcquireSchedulerLock(client) {
  const r = await client.query('SELECT pg_try_advisory_lock($1) AS got', [SCHEDULER_LOCK_KEY]);
  return !!r.rows[0]?.got;
}

/**
 * Release the scheduler advisory lock held by `client`'s session.
 *
 * Best-effort and never throws, so it is safe in a `finally`. A failure is
 * logged rather than silently dropped, because a lock left on a pooled
 * session keeps other replicas from ticking.
 *
 * @param {{query: Function}} client - the client that acquired the lock.
 * @returns {Promise<void>}
 */
async function releaseSchedulerLock(client) {
  try {
    await client.query('SELECT pg_advisory_unlock($1)', [SCHEDULER_LOCK_KEY]);
  } catch (err) {
    console.warn(`[scheduler] advisory unlock failed: ${err?.message ?? err}`);
  }
}
/**
 * One scheduler pass.
 *
 * Guards:
 *  - `_tickRunning` prevents overlapping ticks within this process;
 *  - a Postgres advisory lock (#103) prevents concurrent ticks across replicas.
 *
 * Steps run in order on one dedicated pool client. An unexpected error in a
 * step is logged and ends this pass; later steps run on the next tick.
 *  1. start pending schedules whose window has opened,
 *  2. stop running schedules whose window has closed and queue the next
 *     occurrence of recurring ones,
 *  3. stop recorders whose schedule was cancelled while running,
 *  4. flag stale 'live' assets as 'error' (#66),
 *  5. retry pending/failed AMPP syncs (#77).
 *
 * Connection and step errors are logged rather than rethrown, so the
 * interval keeps firing.
 */
async function tick() {
  if (_tickRunning) return;
  _tickRunning = true;
  let client = null;
  let haveLock = false;
  try {
    // Connect inside the try. Otherwise a transient DB outage would leave
    // _tickRunning stuck at true and silently disable the scheduler until
    // restart.
    client = await pool.connect();
    haveLock = await tryAcquireSchedulerLock(client);
    if (!haveLock) {
      // Another replica is processing this interval — bail silently.
      return;
    }
    await startDueSchedules(client);
    await stopDueSchedules(client);
    await stopCancelledRecorders(client);
    await expireStaleLiveAssets(client);
    await retryAmppSyncs(client);
  } catch (err) {
    console.error('[scheduler] tick error:', err);
  } finally {
    try {
      if (client && haveLock) await releaseSchedulerLock(client);
    } finally {
      if (client) client.release();
      _tickRunning = false;
    }
  }
}

// Flag a claimed schedule as 'failed', storing at most 500 chars of message.
async function markScheduleFailed(client, scheduleId, message) {
  await client.query(
    `UPDATE recorder_schedules
        SET status = 'failed', error_message = $2, updated_at = NOW()
      WHERE id = $1`,
    [scheduleId, String(message).slice(0, 500)]
  );
}

// Step 1: claim pending schedules whose window has opened and start them.
async function startDueSchedules(client) {
  // UPDATE...RETURNING flips status to 'starting' in the same statement, so
  // even if another replica got past the advisory lock (it can't, but
  // belt-and-braces) each row can only be claimed once.
  // NOTE(review): if the process dies between this claim and the follow-up
  // UPDATE, the row stays in 'starting'; nothing in this file recovers it.
  const due = await client.query(
    `UPDATE recorder_schedules
        SET status = 'starting', updated_at = NOW()
      WHERE id IN (
        SELECT id FROM recorder_schedules
         WHERE status = 'pending' AND start_at <= NOW() AND end_at > NOW()
         ORDER BY start_at ASC
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`
  );
  for (const s of due.rows) {
    try {
      const result = await callSelf(`/api/v1/recorders/${s.recorder_id}/start`);
      await client.query(
        `UPDATE recorder_schedules
            SET status = 'running', last_asset_id = NULL, updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] started schedule "${s.name}" on recorder ${s.recorder_id} (session=${result?.current_session_id || '?'})`);
    } catch (err) {
      await markScheduleFailed(client, s.id, err.message);
      console.error(`[scheduler] start failed for schedule ${s.id}: ${err.message}`);
    }
  }
}

// Step 2: claim running schedules whose window has closed, stop them, and
// queue the next occurrence of recurring schedules.
async function stopDueSchedules(client) {
  const due = await client.query(
    `UPDATE recorder_schedules
        SET status = 'stopping', updated_at = NOW()
      WHERE id IN (
        SELECT id FROM recorder_schedules
         WHERE status = 'running' AND end_at <= NOW()
         ORDER BY end_at ASC
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`
  );
  for (const s of due.rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      await client.query(
        `UPDATE recorder_schedules SET status = 'completed', updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
    } catch (err) {
      // Stop failed — flag as failed but don't keep trying forever.
      await markScheduleFailed(client, s.id, `stop: ${err.message}`);
      console.error(`[scheduler] stop failed for schedule ${s.id}: ${err.message}`);
      continue;
    }
    // Queue the next occurrence outside the stop try/catch. The recording did
    // complete, so a failure here must not overwrite 'completed' with a
    // misleading 'stop:' failure; record why the recurrence chain broke instead.
    try {
      await enqueueNextOccurrence(s, client);
    } catch (err) {
      console.error(`[scheduler] could not queue next occurrence of schedule ${s.id}: ${err.message}`);
      await client.query(
        `UPDATE recorder_schedules SET error_message = $2 WHERE id = $1`,
        [s.id, `next occurrence: ${err.message}`.slice(0, 500)]
      );
    }
  }
}

// Step 3: if a schedule was cancelled while running, stop its recorder.
// Nothing here updates the schedule row, so the stop is retried on every
// tick while the recorder still reports 'recording' and the cancellation
// is less than 5 minutes old.
async function stopCancelledRecorders(client) {
  const cancelled = await client.query(
    `SELECT s.* FROM recorder_schedules s
       JOIN recorders r ON r.id = s.recorder_id
      WHERE s.status = 'cancelled' AND r.status = 'recording'
        AND s.updated_at > NOW() - INTERVAL '5 minutes'`
  );
  for (const s of cancelled.rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      console.log(`[scheduler] cancelled schedule "${s.name}" — stopped recorder ${s.recorder_id}`);
    } catch (err) {
      console.warn(`[scheduler] cancel-stop failed for ${s.id}: ${err.message}`);
    }
  }
}

// Step 4 (#66): mark stale live assets as 'error'. If a capture container
// crashes without calling mark-empty/mark-complete, the asset row would stay
// status='live' indefinitely. The timeout defaults to 120 minutes.
// NOTE(review): age is measured from created_at. If assets stay 'live' for
// the whole recording, recordings longer than the timeout get flagged too —
// confirm.
async function expireStaleLiveAssets(client) {
  const parsed = Number.parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '120', 10);
  // A non-numeric override would otherwise make this query fail every tick.
  const timeoutMinutes = Number.isFinite(parsed) && parsed > 0 ? parsed : 120;
  const stale = await client.query(
    `UPDATE assets
        SET status = 'error',
            updated_at = NOW()
      WHERE status = 'live'
        AND created_at < NOW() - ($1 || ' minutes')::INTERVAL
      RETURNING id, display_name`,
    [timeoutMinutes]
  );
  for (const row of stale.rows) {
    console.warn(`[scheduler] marked stale live asset as error: ${row.id} (${row.display_name})`);
  }
}

// Step 5 (#77): retry pending/failed AMPP syncs whose next-attempt time has
// arrived. Capped at 25 rows and 8 attempts so one rough interval can't burn
// the whole budget.
async function retryAmppSyncs(client) {
  const due = await client.query(
    `SELECT id, project_id, bin_id FROM assets
      WHERE ampp_sync_status IN ('pending', 'failed')
        AND (ampp_sync_next_attempt_at IS NULL OR ampp_sync_next_attempt_at <= NOW())
        AND ampp_sync_attempts < 8
      ORDER BY ampp_sync_next_attempt_at NULLS FIRST
      LIMIT 25`
  );
  for (const row of due.rows) {
    try {
      await syncToAmpp(row.id, row.project_id, row.bin_id);
    } catch (err) {
      // One bad asset must not starve the rest of the batch.
      // NOTE(review): if syncToAmpp throws before bumping attempts /
      // next_attempt_at, this row is retried on every tick — confirm.
      console.error(`[scheduler] AMPP sync retry failed for asset ${row.id}: ${err?.message ?? err}`);
    }
  }
}
// Recurrence value → period in whole UTC days. Values not listed here
// ('none', NULL, anything unexpected) never produce a next occurrence.
const RECURRENCE_PERIOD_DAYS = new Map([
  ['daily', 1],
  ['weekly', 7],
]);
const MS_PER_DAY = 24 * 60 * 60 * 1000;

/**
 * Insert the next 'pending' occurrence of a recurring schedule.
 *
 * The new window keeps the original duration and is shifted by whole periods
 * (1 day for 'daily', 7 days for 'weekly'). If the naive next window has
 * already ended (e.g. the API was down for longer than one period), it is
 * rolled forward to the first window whose end is still in the future. A
 * pending row whose end_at is already past would never match the scheduler's
 * start query (`end_at > NOW()`), so the recurrence chain would silently stop.
 *
 * NOTE(review): shifting by UTC days means a schedule meant to follow local
 * wall-clock time drifts by an hour across DST changes — confirm intent.
 *
 * @param {object} schedule - recorder_schedules row (uses name, recorder_id,
 *   start_at, end_at, recurrence).
 * @param {{query: Function}} [client] - DB client; defaults to the pool.
 * @returns {Promise<void>}
 */
async function enqueueNextOccurrence(schedule, client) {
  if (schedule.recurrence === 'none') return;
  const periodDays = RECURRENCE_PERIOD_DAYS.get(schedule.recurrence);
  if (periodDays === undefined) {
    console.warn(`[scheduler] not re-queuing "${schedule.name}": unknown recurrence ${JSON.stringify(schedule.recurrence)}`);
    return;
  }
  const periodMs = periodDays * MS_PER_DAY;
  const now = Date.now();
  let startMs = new Date(schedule.start_at).getTime() + periodMs;
  let endMs = new Date(schedule.end_at).getTime() + periodMs;
  while (endMs <= now) {
    startMs += periodMs;
    endMs += periodMs;
  }
  const start = new Date(startMs).toISOString();
  const end = new Date(endMs).toISOString();
  const q = client || pool;
  await q.query(
    `INSERT INTO recorder_schedules
       (name, recorder_id, start_at, end_at, recurrence, status)
     VALUES ($1, $2, $3, $4, $5, 'pending')`,
    [schedule.name, schedule.recorder_id, start, end, schedule.recurrence]
  );
  console.log(`[scheduler] queued next "${schedule.name}" → ${start}`);
}
// Handle of the one-shot startup tick; null once it has fired or been cancelled.
let _startupTimer = null;

/**
 * Start the scheduler: one tick ~2 s after startup, then one every
 * TICK_INTERVAL_MS. Does nothing if the loop is already running.
 */
export function startSchedulerLoop() {
  if (_interval) return;
  console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
  // Last-resort handler: an unexpected rejection is logged instead of
  // silently dropped.
  const runTick = () => tick().catch((err) => console.error('[scheduler] tick crashed:', err));
  // Fire once on startup so a window that opened while the API was down
  // doesn't have to wait a full interval.
  _startupTimer = setTimeout(() => {
    _startupTimer = null;
    runTick();
  }, 2000);
  _interval = setInterval(runTick, TICK_INTERVAL_MS);
}

/**
 * Stop scheduling further ticks. This cancels both the pending startup tick
 * (so it cannot fire after shutdown has torn down the DB pool) and the
 * interval. A tick already in flight is not awaited.
 */
export function stopSchedulerLoop() {
  if (_startupTimer) {
    clearTimeout(_startupTimer);
    _startupTimer = null;
  }
  if (_interval) {
    clearInterval(_interval);
    _interval = null;
  }
}