// Scheduler tick — every TICK_INTERVAL_MS scan the recorder_schedules table
// and transition any rows whose window opens or closes. The actual recorder
// start/stop is delegated to the existing /recorders/:id/start|stop routes
// via an in-process HTTP call, so we reuse all of the existing container
// orchestration, growing-files handling, asset row creation, etc.
//
// On schedule completion: a 'daily' or 'weekly' recurring schedule is cloned
// forward by 1 day / 7 days into a new 'pending' row.
//
// The same tick also runs housekeeping: expiring stale 'live' assets,
// retrying AMPP syncs and health-checking running playout channels.
import pool from './db/pool.js';
import { syncToAmpp } from './routes/upload.js';
import { restartChannel } from './routes/playout.js';
import { INTERNAL_TOKEN } from './middleware/auth.js';

const DEFAULT_TICK_INTERVAL_MS = 15000;
// Node timers treat delays outside [1, 2^31 - 1] (and NaN) as 1 ms.
const MAX_TIMER_DELAY_MS = 2147483647;

// Tick period from SCHEDULER_TICK_MS. A missing, non-numeric, non-positive or
// out-of-range value falls back to the default instead of letting setInterval
// silently turn it into a ~1 ms loop that would hammer the database.
const TICK_INTERVAL_MS = (() => {
  const raw = process.env.SCHEDULER_TICK_MS;
  const parsed = Number.parseInt(raw ?? '', 10);
  if (Number.isFinite(parsed) && parsed > 0 && parsed <= MAX_TIMER_DELAY_MS) return parsed;
  if (raw) {
    console.warn(
      `[scheduler] ignoring invalid SCHEDULER_TICK_MS=${JSON.stringify(raw)}; using ${DEFAULT_TICK_INTERVAL_MS}ms`
    );
  }
  return DEFAULT_TICK_INTERVAL_MS;
})();

const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;

let _tickRunning = false;
let _interval = null;

/**
 * Call one of this API's own routes, authenticated with the internal token.
 *
 * @param {string} path - Route path appended verbatim to SELF_URL (e.g. `/api/v1/recorders/1/start`).
 * @param {string} [method='POST'] - HTTP method.
 * @returns {Promise<object>} Parsed JSON body, or `{}` when the body is not valid JSON.
 * @throws {Error} On a non-2xx response (message includes status and up to 200
 *   chars of the body), on network failure, or after the 30 s timeout.
 */
async function callSelf(path, method = 'POST') {
  const res = await fetch(`${SELF_URL}${path}`, {
    method,
    headers: {
      'Content-Type': 'application/json',
      'x-internal-token': INTERNAL_TOKEN,
    },
    signal: AbortSignal.timeout(30000),
  });
  if (!res.ok) {
    const text = await res.text().catch(() => '');
    throw new Error(`${method} ${path} → HTTP ${res.status}: ${text.slice(0, 200)}`);
  }
  return res.json().catch(() => ({}));
}

// Issue #103 — every mam-api replica runs the same tick on the same interval,
// so a multi-node deploy would double-fire recorder starts/stops. We guard
// the whole tick with a PG advisory lock (key SCHEDULER_LOCK_KEY) so exactly
// one replica processes a given interval. Pure-Postgres, no extra infra.
const SCHEDULER_LOCK_KEY = 8210301; // arbitrary, must be stable across replicas

// Days to shift a finished schedule forward, keyed by its `recurrence` value.
// Anything not listed ('none', null, unknown strings) is non-recurring.
const RECURRENCE_STEP_DAYS = new Map([
  ['daily', 1],
  ['weekly', 7],
]);

const DAY_MS = 24 * 60 * 60 * 1000;

// Handle of the one-shot startup tick, kept so stopSchedulerLoop can cancel it.
let _startupTimer = null;

/**
 * Try to take the scheduler advisory lock on `client`'s database session.
 * Non-blocking: resolves false at once when another session holds it.
 *
 * @param {object} client - Pooled DB client exposing `query()`.
 * @returns {Promise<boolean>} true when the lock was acquired.
 */
async function tryAcquireSchedulerLock(client) {
  const r = await client.query('SELECT pg_try_advisory_lock($1) AS got', [SCHEDULER_LOCK_KEY]);
  return !!r.rows[0]?.got;
}

/**
 * Release the scheduler advisory lock. Never throws.
 *
 * pg_try_advisory_lock takes a session-level lock, so a connection whose
 * unlock did not succeed must not go back into the pool: it would keep the
 * lock (and block every other replica) for as long as it lives.
 *
 * @param {object} client - The client that acquired the lock.
 * @returns {Promise<boolean>} true when Postgres confirmed the unlock; false
 *   when the query failed or reported the lock was not held.
 */
async function releaseSchedulerLock(client) {
  try {
    const r = await client.query('SELECT pg_advisory_unlock($1) AS released', [SCHEDULER_LOCK_KEY]);
    return !!r.rows[0]?.released;
  } catch (err) {
    console.warn(`[scheduler] advisory unlock failed: ${err.message}`);
    return false;
  }
}

// Run one tick step and log (not rethrow) its failure, so an error in one
// step — e.g. a bad query against the assets table — cannot skip the steps
// after it, notably the playout health/failover check.
async function runTickStep(label, step) {
  try {
    await step();
  } catch (err) {
    console.error(`[scheduler] tick step "${label}" failed:`, err);
  }
}

/**
 * One scheduler pass. Skips if a previous pass is still running in this
 * process, or if another replica holds the advisory lock.
 */
async function tick() {
  if (_tickRunning) return;
  _tickRunning = true;
  let client = null;
  let haveLock = false;
  try {
    // Connect inside the try: if connect() rejects, `finally` must still
    // clear _tickRunning, or this process would never tick again.
    client = await pool.connect();
    haveLock = await tryAcquireSchedulerLock(client);
    if (!haveLock) {
      // Another replica is processing this interval — bail silently.
      return;
    }

    await runTickStep('start due schedules', () => startDueSchedules(client));
    await runTickStep('stop due schedules', () => stopDueSchedules(client));
    await runTickStep('stop cancelled schedules', () => stopCancelledSchedules(client));
    await runTickStep('expire stale live assets', () => expireStaleLiveAssets(client));
    await runTickStep('AMPP sync retry', () => retryAmppSyncs(client));
    await runTickStep('playout health', () => playoutHealthTick(client));
  } catch (err) {
    console.error('[scheduler] tick error:', err);
  } finally {
    let keepClient = true;
    if (haveLock) keepClient = await releaseSchedulerLock(client);
    // release(true) destroys the connection instead of pooling it, which
    // drops any session-level lock it may still hold.
    if (client) client.release(!keepClient);
    _tickRunning = false;
  }
}

// Step 1: atomically claim pending schedules whose window has opened and start
// their recorders. The UPDATE ... RETURNING flips status to 'starting' in the
// same statement (FOR UPDATE SKIP LOCKED), so even if another replica got past
// the lock (it can't, but belt-and-braces) each row can only be claimed once.
// NOTE(review): a crash between the claim and the follow-up UPDATE leaves the
// row in 'starting'; nothing in this file recovers such rows.
async function startDueSchedules(client) {
  const { rows } = await client.query(
    `UPDATE recorder_schedules
        SET status = 'starting', updated_at = NOW()
      WHERE id IN (
        SELECT id FROM recorder_schedules
         WHERE status = 'pending' AND start_at <= NOW() AND end_at > NOW()
         ORDER BY start_at ASC
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`
  );
  for (const s of rows) {
    try {
      const result = await callSelf(`/api/v1/recorders/${s.recorder_id}/start`);
      await client.query(
        `UPDATE recorder_schedules SET status = 'running', last_asset_id = NULL, updated_at = NOW() WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] started schedule "${s.name}" on recorder ${s.recorder_id} (session=${result.current_session_id || '?'})`);
    } catch (err) {
      await client.query(
        `UPDATE recorder_schedules SET status = 'failed', error_message = $2, updated_at = NOW() WHERE id = $1`,
        [s.id, err.message.slice(0, 500)]
      );
      console.error(`[scheduler] start failed for schedule ${s.id}: ${err.message}`);
    }
  }
}

// Step 2: atomically claim running schedules whose window has closed, stop
// their recorders and queue the next occurrence of recurring ones.
// NOTE(review): a failed start/stop does not queue the next occurrence, so one
// failure ends a recurring series — confirm that is intended.
async function stopDueSchedules(client) {
  const { rows } = await client.query(
    `UPDATE recorder_schedules
        SET status = 'stopping', updated_at = NOW()
      WHERE id IN (
        SELECT id FROM recorder_schedules
         WHERE status = 'running' AND end_at <= NOW()
         ORDER BY end_at ASC
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`
  );
  for (const s of rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      await client.query(
        `UPDATE recorder_schedules SET status = 'completed', updated_at = NOW() WHERE id = $1`,
        [s.id]
      );
      console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
    } catch (err) {
      // Stop failed — flag as failed but don't keep trying forever.
      await client.query(
        `UPDATE recorder_schedules SET status = 'failed', error_message = $2, updated_at = NOW() WHERE id = $1`,
        [s.id, ('stop: ' + err.message).slice(0, 500)]
      );
      console.error(`[scheduler] stop failed for schedule ${s.id}: ${err.message}`);
      continue;
    }
    // Kept out of the try above: the stop succeeded and the row is already
    // 'completed', so a queueing error must not rewrite it as a failed stop.
    try {
      await enqueueNextOccurrence(s, client);
    } catch (err) {
      console.error(`[scheduler] failed to queue next occurrence of schedule ${s.id}: ${err.message}`);
    }
  }
}

// Step 3: if a schedule was cancelled while running, stop the recorder. Only
// a schedule whose window has opened can have been started, so the
// start_at <= NOW() filter keeps a cancelled *future* schedule from stopping a
// recording that belongs to another schedule on the same recorder. The stop is
// re-sent each tick while the recorder still reports 'recording', for up to
// 5 minutes after the cancellation.
async function stopCancelledSchedules(client) {
  const { rows } = await client.query(
    `SELECT s.* FROM recorder_schedules s
       JOIN recorders r ON r.id = s.recorder_id
      WHERE s.status = 'cancelled'
        AND r.status = 'recording'
        AND s.start_at <= NOW()
        AND s.updated_at > NOW() - INTERVAL '5 minutes'`
  );
  for (const s of rows) {
    try {
      await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
      console.log(`[scheduler] cancelled schedule "${s.name}" — stopped recorder ${s.recorder_id}`);
    } catch (err) {
      console.warn(`[scheduler] cancel-stop failed for ${s.id}: ${err.message}`);
    }
  }
}

// Step 4 (#66): mark stale live assets as 'error'. If a capture container
// crashes without calling mark-empty/mark-complete, the asset row stays
// status='live' indefinitely. Timeout: LIVE_ASSET_TIMEOUT_MINUTES (default 120).
async function expireStaleLiveAssets(client) {
  const parsed = Number.parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '120', 10);
  // A malformed value would otherwise become 'NaN minutes'::INTERVAL and fail
  // this step on every tick.
  const timeoutMinutes = Number.isFinite(parsed) && parsed > 0 ? parsed : 120;
  const { rows } = await client.query(
    `UPDATE assets SET status = 'error', updated_at = NOW()
      WHERE status = 'live'
        AND created_at < NOW() - ($1 || ' minutes')::INTERVAL
      RETURNING id, display_name`,
    [timeoutMinutes]
  );
  for (const row of rows) {
    console.warn(`[scheduler] marked stale live asset as error: ${row.id} (${row.display_name})`);
  }
}

// Step 5 (#77): AMPP sync retry. Pick up pending/failed rows whose next-attempt
// time has arrived (at most 8 attempts, 25 rows per tick) and retry them. Each
// retry is isolated so one failing asset doesn't block the rest of the batch.
async function retryAmppSyncs(client) {
  const { rows } = await client.query(
    `SELECT id, project_id, bin_id FROM assets
      WHERE ampp_sync_status IN ('pending', 'failed')
        AND (ampp_sync_next_attempt_at IS NULL OR ampp_sync_next_attempt_at <= NOW())
        AND ampp_sync_attempts < 8
      ORDER BY ampp_sync_next_attempt_at NULLS FIRST
      LIMIT 25`
  );
  for (const row of rows) {
    try {
      await syncToAmpp(row.id, row.project_id, row.bin_id);
    } catch (err) {
      console.error(`[scheduler] AMPP sync retry failed for asset ${row.id}: ${err.message}`);
    }
  }
}

/**
 * Queue the next occurrence of a finished recurring schedule as a new
 * 'pending' row. 'daily' shifts the window by 1 day, 'weekly' by 7 days (UTC);
 * any other recurrence value queues nothing.
 *
 * Normally the window moves one period. If this runs after the next window
 * has already ended (e.g. the API was down), one period would create a row
 * that step 1 never starts (it requires end_at > NOW()), silently ending the
 * series. So whole periods are added until the window ends in the future.
 *
 * @param {object} schedule - Row with name, recorder_id, start_at, end_at, recurrence.
 * @param {object} [client] - DB client; falls back to the shared pool.
 */
async function enqueueNextOccurrence(schedule, client) {
  const stepDays = RECURRENCE_STEP_DAYS.get(schedule.recurrence);
  if (!stepDays) return;

  const stepMs = stepDays * DAY_MS;
  const startMs = new Date(schedule.start_at).getTime();
  const endMs = new Date(schedule.end_at).getTime();

  // Smallest k >= 1 with endMs + k * stepMs > now.
  let periods = 1;
  const overdueMs = Date.now() - (endMs + stepMs);
  if (overdueMs >= 0) periods += Math.floor(overdueMs / stepMs) + 1;

  const start = new Date(startMs + periods * stepMs);
  const end = new Date(endMs + periods * stepMs);

  const q = client || pool;
  await q.query(
    `INSERT INTO recorder_schedules (name, recorder_id, start_at, end_at, recurrence, status)
     VALUES ($1, $2, $3, $4, $5, 'pending')`,
    [schedule.name, schedule.recorder_id, start.toISOString(), end.toISOString(), schedule.recurrence]
  );
  console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
}

// ── Playout channel health + failover ────────────────────────────────────────
// Tick step 6. Runs under the same advisory lock so only one replica probes
// the sidecars; multi-replica pings would just waste cycles. A missed probe is
// counted via last_heartbeat_at age: > 3 * TICK_INTERVAL means 3 consecutive
// misses.

/**
 * Persist the as-run compliance log for one channel from a sidecar /status
 * payload. The sidecar reports the on-air item via currentItemId /
 * currentClip / currentItemStartedAt. At most one "open" row
 * (ended_at IS NULL) is kept per channel: when the on-air item changes (or
 * playout stops) the open row is closed — stamping ended_at and a computed
 * duration_s — and, if a new clip is on air, a fresh row is opened.
 *
 * playout_as_run columns (migration 029): id, channel_id, asset_id, item_id,
 * clip_name, started_at, ended_at, duration_s, result.
 *
 * @param {object} client - DB client.
 * @param {*} channelId - playout_channels.id.
 * @param {object} engine - Parsed sidecar /status JSON.
 */
async function writeAsRun(client, channelId, engine) {
  const currentItemId = (engine && engine.currentItemId) || null;

  // The currently-open as-run row for this channel, if any.
  const { rows: openRows } = await client.query(
    `SELECT id, item_id, started_at
       FROM playout_as_run
      WHERE channel_id = $1 AND ended_at IS NULL
      ORDER BY started_at DESC
      LIMIT 1`,
    [channelId]
  );
  const open = openRows[0] || null;

  // Same clip still on air → nothing to do. Compared as strings because the
  // DB driver and the sidecar JSON may represent the same id with different
  // JS types (e.g. a bigint column read back as a string).
  if (open && currentItemId && String(open.item_id) === String(currentItemId)) return;
  // Nothing on air and nothing open → nothing to do.
  if (!open && !currentItemId) return;

  // Close the previous open row (clip changed, or playout stopped).
  if (open) {
    await client.query(
      `UPDATE playout_as_run
          SET ended_at = NOW(),
              duration_s = EXTRACT(EPOCH FROM (NOW() - started_at))
        WHERE id = $1`,
      [open.id]
    );
  }

  if (!currentItemId) return;

  // Open a new row for the clip now on air. Resolve the item's asset_id so the
  // compliance log links back to the source asset even after the playlist item
  // is later deleted.
  let assetId = null;
  try {
    const { rows } = await client.query('SELECT asset_id FROM playout_items WHERE id = $1', [currentItemId]);
    if (rows.length > 0) assetId = rows[0].asset_id;
  } catch (_) {
    /* item may have been deleted; log without asset link */
  }
  await client.query(
    `INSERT INTO playout_as_run
       (channel_id, asset_id, item_id, clip_name, started_at, result)
     VALUES ($1, $2, $3, $4, COALESCE($5::timestamptz, NOW()), 'played')`,
    [channelId, assetId, currentItemId, engine.currentClip || null, engine.currentItemStartedAt || null]
  );
}

/**
 * Tick step 6: probe every running playout channel's sidecar /status. On
 * success, bump playout_channels.last_heartbeat_at and update the as-run log.
 * After three missed intervals, auto-restart the channel on a healthy node
 * (non-decklink) or mark it 'error' for manual recovery (decklink). Each
 * channel is handled independently; one channel's error does not stop the rest.
 */
async function playoutHealthTick(client) {
  let channels;
  try {
    ({ rows: channels } = await client.query(
      `SELECT id, output_type, container_meta, node_id, last_heartbeat_at, updated_at, restart_count
         FROM playout_channels
        WHERE status = 'running'`
    ));
  } catch (err) {
    // Migration 029 may not be applied yet — bail silently rather than crash.
    if (err.code === '42P01') return;
    throw err;
  }

  const TIMEOUT_MS = TICK_INTERVAL_MS * 3 + 5000;
  for (const ch of channels) {
    try {
      await checkChannelHealth(client, ch, TIMEOUT_MS);
    } catch (err) {
      console.error(`[scheduler] health check error for channel ${ch.id}: ${err.message}`);
    }
  }
}

// Probe one channel's sidecar. Only a failed probe (network error, timeout or
// non-2xx) counts as a miss; a DB error while recording a successful probe is
// thrown to the caller instead of being mistaken for an unreachable sidecar.
async function checkChannelHealth(client, ch, timeoutMs) {
  const sidecarUrl = (ch.container_meta && ch.container_meta.sidecar_url) || `http://playout-${ch.id}:3002`;

  let res;
  try {
    res = await fetch(`${sidecarUrl}/status`, { signal: AbortSignal.timeout(5000) });
    if (!res.ok) throw new Error(`status HTTP ${res.status}`);
  } catch (err) {
    await handleMissedHeartbeat(client, ch, timeoutMs, err);
    return;
  }

  await client.query('UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [ch.id]);

  // As-run compliance log: the sidecar only tracks the on-air clip locally.
  // On every successful status poll we detect a clip change here and persist
  // it to playout_as_run. Failures are swallowed so a logging hiccup never
  // knocks a healthy channel into failover.
  try {
    const engine = await res.json().catch(() => null);
    if (engine) await writeAsRun(client, ch.id, engine);
  } catch (e) {
    console.warn(`[scheduler] as-run write failed for ${ch.id}: ${e.message}`);
  }
}

// React to a failed probe once the channel has been silent for timeoutMs.
async function handleMissedHeartbeat(client, ch, timeoutMs, err) {
  // When last_heartbeat_at is NULL (channel just spawned), fall back to
  // updated_at (set to NOW() by spawnChannelSidecar). This prevents a
  // brand-new channel from being failed over on the very first tick because
  // epoch-0 age always exceeds the timeout.
  const baseline = ch.last_heartbeat_at || ch.updated_at;
  const lastSeen = baseline ? new Date(baseline).getTime() : Date.now();
  if (Date.now() - lastSeen < timeoutMs) return; // not yet 3 misses

  if (ch.output_type === 'decklink') {
    await client.query(
      "UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
      [`sidecar unreachable (${err.message}); decklink channels require manual recovery`, ch.id]
    );
    console.error(`[scheduler] decklink channel ${ch.id} unreachable — alert-only, no auto-failover`);
    return;
  }

  console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${err.message}), restart #${(ch.restart_count ?? 0) + 1}`);
  try {
    // restartChannel re-places the channel on a healthy node AND spawns the
    // new sidecar directly (shared helper) — no /start self-call needed.
    const res = await restartChannel(ch.id);
    if (res.restarted) {
      console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`);
    } else {
      console.error(`[scheduler] failover: channel ${ch.id} restart skipped — ${res.reason}`);
    }
  } catch (err2) {
    console.error(`[scheduler] failover error for ${ch.id}: ${err2.message}`);
  }
}

/**
 * Start the periodic tick (idempotent). The first tick fires after 2 s so a
 * window that opened while the API was down doesn't wait a full interval.
 */
export function startSchedulerLoop() {
  if (_interval) return;
  console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
  const runTick = () => tick().catch((err) => console.error('[scheduler] tick error:', err));
  _startupTimer = setTimeout(() => {
    _startupTimer = null;
    runTick();
  }, 2000);
  _interval = setInterval(runTick, TICK_INTERVAL_MS);
}

/** Stop the periodic tick, including a still-pending startup tick. */
export function stopSchedulerLoop() {
  if (_startupTimer) {
    clearTimeout(_startupTimer);
    _startupTimer = null;
  }
  if (_interval) {
    clearInterval(_interval);
    _interval = null;
  }
}