// Scheduler tick — every TICK_INTERVAL_MS scan the recorder_schedules table
// and transition any rows whose window opens or closes. The actual recorder
// start/stop is delegated to the existing /recorders/:id/start|stop routes
// via an HTTP call back to this same API (SELF_URL), so we reuse all of the
// existing container orchestration, growing-files handling, asset row
// creation, etc.
//
// On schedule completion: a 'daily' or 'weekly' recurring schedule is cloned
// forward by 1 day / 7 days into a new 'pending' row.

import pool from './db/pool.js';
import { syncToAmpp } from './routes/upload.js';

// Tick period used when SCHEDULER_TICK_MS is unset or unusable.
const DEFAULT_TICK_INTERVAL_MS = 15000;

// Node timers coerce a delay that is NaN, < 1 or > 2^31-1 to 1 ms. An
// unvalidated env value could therefore turn the scheduler into a hot loop,
// so anything outside this range falls back to the default.
const MAX_TIMER_DELAY_MS = 2147483647;

// Upper bound for one loopback request to the recorder routes.
const SELF_CALL_TIMEOUT_MS = 30000;

/**
 * Parse the configured tick period.
 *
 * @param {string | undefined} raw - Value of SCHEDULER_TICK_MS.
 * @returns {number} A delay in milliseconds that is safe to hand to setInterval.
 */
function resolveTickIntervalMs(raw) {
  if (raw === undefined || raw === '') return DEFAULT_TICK_INTERVAL_MS;

  const parsed = Number.parseInt(raw, 10);
  if (Number.isInteger(parsed) && parsed >= 1 && parsed <= MAX_TIMER_DELAY_MS) {
    return parsed;
  }

  console.warn(
    `[scheduler] ignoring invalid SCHEDULER_TICK_MS=${JSON.stringify(raw)}; using ${DEFAULT_TICK_INTERVAL_MS}ms`
  );
  return DEFAULT_TICK_INTERVAL_MS;
}

const TICK_INTERVAL_MS = resolveTickIntervalMs(process.env.SCHEDULER_TICK_MS);
const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;

// Re-entrancy guard: true while a tick is in flight in this process.
let _tickRunning = false;
// Handle of the periodic timer; null while the loop is stopped.
let _interval = null;

/**
 * Call one of this API's own routes over HTTP.
 *
 * @param {string} path - Route path appended to SELF_URL (e.g. `/api/v1/recorders/1/start`).
 * @param {string} [method='POST'] - HTTP method.
 * @returns {Promise<any>} The parsed JSON body, or `{}` when the body is not valid JSON.
 * @throws {Error} On a non-2xx response (message carries the status and up to
 *   200 characters of the body); fetch/timeout errors propagate unchanged.
 */
async function callSelf(path, method = 'POST') {
  const res = await fetch(`${SELF_URL}${path}`, {
    method,
    headers: { 'Content-Type': 'application/json' },
    signal: AbortSignal.timeout(SELF_CALL_TIMEOUT_MS),
  });

  if (!res.ok) {
    // Best effort: the body is only used to enrich the error message.
    const text = await res.text().catch(() => '');
    throw new Error(`${method} ${path} → HTTP ${res.status}: ${text.slice(0, 200)}`);
  }

  return res.json().catch(() => ({}));
}

// Issue #103 — every mam-api replica runs the same tick on the same interval,
// so a multi-node deploy would double-fire recorder starts/stops. We guard
// the whole tick with a PG advisory lock (keyed by SCHEDULER_LOCK_KEY) so
// only one replica runs a tick at a time. Pure-Postgres, no extra infra.
const SCHEDULER_LOCK_KEY = 8210301; // arbitrary, must be stable across replicas

// Stale-live-asset timeout used when LIVE_ASSET_TIMEOUT_MINUTES is unset or invalid.
const DEFAULT_LIVE_TIMEOUT_MINUTES = 120;

// Days to shift a completed schedule forward, keyed by its recurrence value.
// Anything not listed here (including 'none' and NULL) is not recurring.
const RECURRENCE_STEP_DAYS = new Map([
  ['daily', 1],
  ['weekly', 7],
]);

// Handle of the one-shot startup tick so stopSchedulerLoop() can cancel it.
let _startupTimer = null;

/**
 * Try to take the scheduler advisory lock on the given connection.
 *
 * @param {{ query: Function }} client - A checked-out pg client; the lock is
 *   taken with pg_try_advisory_lock, so it must be released on this same client.
 * @returns {Promise<boolean>} true when this connection now holds the lock.
 */
async function tryAcquireSchedulerLock(client) {
  const r = await client.query('SELECT pg_try_advisory_lock($1) AS got', [SCHEDULER_LOCK_KEY]);
  return !!r.rows[0]?.got;
}

/**
 * Release the scheduler advisory lock. Never throws: a failed unlock is
 * logged so it is visible, but must not mask the outcome of the tick.
 *
 * @param {{ query: Function }} client - The client that acquired the lock.
 * @returns {Promise<void>}
 */
async function releaseSchedulerLock(client) {
  await client
    .query('SELECT pg_advisory_unlock($1)', [SCHEDULER_LOCK_KEY])
    .catch((err) => {
      console.warn(`[scheduler] failed to release advisory lock: ${errorText(err)}`);
    });
}

/** Best-effort human-readable text for anything that can be thrown. */
function errorText(err) {
  return String(err?.message ?? err);
}

/** Flag a schedule row as failed, storing at most 500 characters of the reason. */
async function markScheduleFailed(client, scheduleId, message) {
  await client.query(
    `UPDATE recorder_schedules
        SET status = 'failed', error_message = $2, updated_at = NOW()
      WHERE id = $1`,
    [scheduleId, message.slice(0, 500)]
  );
}

/**
 * Step 1 — claim pending schedules whose window has opened and start them.
 *
 * The UPDATE...RETURNING flips status to 'starting' in the same statement
 * that selects the rows, so a row can only be claimed once even if another
 * replica somehow got past the advisory lock.
 */
async function startDueSchedules(client) {
  const dueStart = await client.query(
    `UPDATE recorder_schedules
        SET status = 'starting', updated_at = NOW()
      WHERE id IN (
        SELECT id FROM recorder_schedules
         WHERE status = 'pending'
           AND start_at <= NOW()
           AND end_at > NOW()
         ORDER BY start_at ASC
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`
  );

  for (const s of dueStart.rows) {
    try {
      const result = await callSelf(`/api/v1/recorders/${s.recorder_id}/start`);
      await client.query(
        `UPDATE recorder_schedules
            SET status = 'running', last_asset_id = NULL, updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] started schedule "${s.name}" on recorder ${s.recorder_id} (session=${result?.current_session_id || '?'})`);
    } catch (err) {
      await markScheduleFailed(client, s.id, errorText(err));
      console.error(`[scheduler] start failed for schedule ${s.id}: ${errorText(err)}`);
    }
  }
}

/**
 * Step 2 — claim running schedules whose window has closed, stop their
 * recorder, mark them completed and queue the next occurrence if recurring.
 */
async function stopDueSchedules(client) {
  const dueStop = await client.query(
    `UPDATE recorder_schedules
        SET status = 'stopping', updated_at = NOW()
      WHERE id IN (
        SELECT id FROM recorder_schedules
         WHERE status = 'running'
           AND end_at <= NOW()
         ORDER BY end_at ASC
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`
  );

  for (const s of dueStop.rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      await client.query(
        `UPDATE recorder_schedules
            SET status = 'completed', updated_at = NOW()
          WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
    } catch (err) {
      // Stop failed — flag as failed but don't keep trying forever.
      await markScheduleFailed(client, s.id, `stop: ${errorText(err)}`);
      console.error(`[scheduler] stop failed for schedule ${s.id}: ${errorText(err)}`);
      continue;
    }

    // Queued outside the try above: the stop itself succeeded, so a failure
    // to clone the row must not re-label this schedule as a failed stop.
    try {
      await enqueueNextOccurrence(s, client);
    } catch (err) {
      console.error(`[scheduler] failed to queue next occurrence of schedule ${s.id}: ${errorText(err)}`);
    }
  }
}

/**
 * Step 3 — if a schedule was cancelled (within the last 5 minutes) while its
 * recorder is still recording, stop the recorder.
 */
async function stopCancelledRecorders(client) {
  const cancelledRunning = await client.query(
    `SELECT s.*
       FROM recorder_schedules s
       JOIN recorders r ON r.id = s.recorder_id
      WHERE s.status = 'cancelled'
        AND r.status = 'recording'
        AND s.updated_at > NOW() - INTERVAL '5 minutes'`
  );

  for (const s of cancelledRunning.rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      console.log(`[scheduler] cancelled schedule "${s.name}" — stopped recorder ${s.recorder_id}`);
    } catch (err) {
      console.warn(`[scheduler] cancel-stop failed for ${s.id}: ${errorText(err)}`);
    }
  }
}

/**
 * Read LIVE_ASSET_TIMEOUT_MINUTES, falling back to the default when it is
 * unset or not a positive integer (an unparsable value would otherwise make
 * the interval cast fail and abort every tick before the AMPP retry step).
 */
function liveAssetTimeoutMinutes() {
  const parsed = Number.parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '', 10);
  return Number.isInteger(parsed) && parsed >= 1 ? parsed : DEFAULT_LIVE_TIMEOUT_MINUTES;
}

/**
 * Step 4 — mark stale live assets as 'error' (#66).
 *
 * If a capture container crashes without calling mark-empty/mark-complete,
 * the asset row stays status='live' indefinitely. Time out after
 * LIVE_ASSET_TIMEOUT_MINUTES (default 2 hours).
 */
async function expireStaleLiveAssets(client) {
  const staleResult = await client.query(
    `UPDATE assets
        SET status = 'error', updated_at = NOW()
      WHERE status = 'live'
        AND created_at < NOW() - ($1 || ' minutes')::INTERVAL
      RETURNING id, display_name`,
    [liveAssetTimeoutMinutes()]
  );

  for (const row of staleResult.rows) {
    console.warn(`[scheduler] marked stale live asset as error: ${row.id} (${row.display_name})`);
  }
}

/**
 * Step 5 — AMPP sync retry (#77). Pick up pending/failed rows whose
 * next-attempt time has arrived and retry them, capped at 25 per tick.
 * Each row is isolated so one throwing sync cannot starve the rest.
 */
async function retryAmppSync(client) {
  const ampps = await client.query(
    `SELECT id, project_id, bin_id
       FROM assets
      WHERE ampp_sync_status IN ('pending', 'failed')
        AND (ampp_sync_next_attempt_at IS NULL OR ampp_sync_next_attempt_at <= NOW())
        AND ampp_sync_attempts < 8
      ORDER BY ampp_sync_next_attempt_at NULLS FIRST
      LIMIT 25`
  );

  for (const row of ampps.rows) {
    try {
      await syncToAmpp(row.id, row.project_id, row.bin_id);
    } catch (err) {
      console.warn(`[scheduler] AMPP sync retry failed for asset ${row.id}: ${errorText(err)}`);
    }
  }
}

/**
 * Run one scheduler pass. Skips if a pass is already running in this process
 * or if another connection holds the scheduler advisory lock. Errors are
 * logged, not thrown, and the re-entrancy flag, lock and pooled client are
 * always released — including when the pool cannot hand out a connection.
 *
 * @returns {Promise<void>}
 */
async function tick() {
  if (_tickRunning) return;
  _tickRunning = true;

  let client = null;
  let haveLock = false;
  try {
    // Inside the try: a failed connect must not leave _tickRunning stuck.
    client = await pool.connect();

    haveLock = await tryAcquireSchedulerLock(client);
    if (!haveLock) {
      // Another replica is processing this interval — bail silently.
      return;
    }

    await startDueSchedules(client);
    await stopDueSchedules(client);
    await stopCancelledRecorders(client);
    await expireStaleLiveAssets(client);
    await retryAmppSync(client);
  } catch (err) {
    console.error('[scheduler] tick error:', err);
  } finally {
    try {
      if (client) {
        if (haveLock) await releaseSchedulerLock(client);
        client.release();
      }
    } finally {
      _tickRunning = false;
    }
  }
}

/**
 * Insert the next 'pending' occurrence of a recurring schedule.
 *
 * Only 'daily' (+1 day) and 'weekly' (+7 days) recur; any other recurrence
 * value is treated as non-recurring. The window is shifted in whole UTC days
 * and, if the result has already ended (e.g. the API was down for several
 * days), rolled forward until it ends in the future — a 'pending' row whose
 * end_at is already past would never be picked up by the start query.
 *
 * @param {object} schedule - A recorder_schedules row (name, recorder_id,
 *   start_at, end_at, recurrence are read).
 * @param {{ query: Function }} [client] - Client to run the INSERT on;
 *   defaults to the shared pool.
 * @returns {Promise<void>}
 */
async function enqueueNextOccurrence(schedule, client) {
  const stepDays = RECURRENCE_STEP_DAYS.get(schedule.recurrence);
  if (stepDays === undefined) {
    if (schedule.recurrence != null && schedule.recurrence !== 'none') {
      console.warn(`[scheduler] schedule "${schedule.name}" has unknown recurrence "${schedule.recurrence}" — not re-queued`);
    }
    return;
  }

  const start = new Date(schedule.start_at);
  const end = new Date(schedule.end_at);
  const advance = () => {
    start.setUTCDate(start.getUTCDate() + stepDays);
    end.setUTCDate(end.getUTCDate() + stepDays);
  };

  advance();
  const now = Date.now();
  while (end.getTime() <= now) advance();

  const q = client || pool;
  await q.query(
    `INSERT INTO recorder_schedules (name, recorder_id, start_at, end_at, recurrence, status)
     VALUES ($1, $2, $3, $4, $5, 'pending')`,
    [schedule.name, schedule.recorder_id, start.toISOString(), end.toISOString(), schedule.recurrence]
  );
  console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
}

/** Run a tick from a timer; tick() handles its own errors, this is a last-resort log. */
function runTick() {
  tick().catch((err) => {
    console.error('[scheduler] unexpected tick failure:', err);
  });
}

/**
 * Start the periodic scheduler loop. Idempotent: a second call while the
 * loop is running does nothing.
 */
export function startSchedulerLoop() {
  if (_interval) return;
  console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
  // Fire once on startup so a window that opened while the API was down
  // doesn't have to wait a full interval.
  _startupTimer = setTimeout(() => {
    _startupTimer = null;
    runTick();
  }, 2000);
  _interval = setInterval(runTick, TICK_INTERVAL_MS);
}

/**
 * Stop the scheduler loop, including a startup tick that has not fired yet.
 * A tick that is already in flight is left to finish.
 */
export function stopSchedulerLoop() {
  if (_startupTimer) {
    clearTimeout(_startupTimer);
    _startupTimer = null;
  }
  if (_interval) {
    clearInterval(_interval);
    _interval = null;
  }
}