diff --git a/services/mam-api/src/scheduler.js b/services/mam-api/src/scheduler.js index 027fa5e..91ba0bb 100644 --- a/services/mam-api/src/scheduler.js +++ b/services/mam-api/src/scheduler.js @@ -34,11 +34,7 @@ async function callSelf(path, method = 'POST') { return res.json().catch(() => ({})); } -// Issue #103 — every mam-api replica runs the same tick on the same interval, -// so a multi-node deploy would double-fire recorder starts/stops. We guard -// the whole tick with a PG advisory lock (1 = scheduler) so exactly one -// replica processes a given interval. Pure-Postgres, no extra infra. -const SCHEDULER_LOCK_KEY = 8210301; // arbitrary, must be stable across replicas +const SCHEDULER_LOCK_KEY = 8210301; async function tryAcquireSchedulerLock(client) { const r = await client.query('SELECT pg_try_advisory_lock($1) AS got', [SCHEDULER_LOCK_KEY]); @@ -57,14 +53,9 @@ async function tick() { try { haveLock = await tryAcquireSchedulerLock(client); if (!haveLock) { - // Another replica is processing this interval — bail silently. return; } - // 1) Atomically claim pending schedules whose window has opened. The - // UPDATE...RETURNING flips status to 'running' in the same statement - // so even if another replica got past the lock (it can't, but - // belt-and-braces) each row can only be claimed once. const dueStart = await client.query( `UPDATE recorder_schedules SET status = 'starting', updated_at = NOW() @@ -97,7 +88,6 @@ async function tick() { } } - // 2) Atomically claim running schedules whose window has closed. const dueStop = await client.query( `UPDATE recorder_schedules SET status = 'stopping', updated_at = NOW() @@ -120,7 +110,6 @@ async function tick() { console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`); await enqueueNextOccurrence(s, client); } catch (err) { - // Stop failed — flag as failed but don't keep trying forever. await client.query( `UPDATE recorder_schedules SET status = 'failed', error_message = $2, updated_at = NOW() @@ -131,7 +120,6 @@ async function tick() { } } - // 3) If a schedule was cancelled while running, stop the recorder. const cancelledRunning = await client.query( `SELECT s.* FROM recorder_schedules s JOIN recorders r ON r.id = s.recorder_id @@ -147,9 +135,6 @@ async function tick() { } } - // 4) Mark stale live assets as 'error' (#66). - // If a capture container crashes without calling mark-empty/mark-complete, - // the asset row stays status='live' indefinitely. Timeout after 2 hours. const LIVE_TIMEOUT_MINUTES = parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '120', 10); const staleResult = await client.query( `UPDATE assets @@ -166,9 +151,6 @@ async function tick() { } } - // 5) AMPP sync retry (#77). Pick up any pending/failed rows whose - // next-attempt time has arrived and retry them. Cap per tick so we - // don't burn budget on a single rough interval. const ampps = await client.query( `SELECT id, project_id, bin_id FROM assets WHERE ampp_sync_status IN ('pending', 'failed') @@ -181,11 +163,6 @@ async function tick() { await syncToAmpp(row.id, row.project_id, row.bin_id); } - // 6) Playout channel health checks. Ping each running channel's sidecar - // /status; on success bump last_heartbeat_at, on failure increment a - // transient miss counter (in playout_sidecars.last_heartbeat_at age). - // Three consecutive misses → auto-restart on a healthy node (non- - // decklink), or alert-only for decklink. await playoutHealthTick(client); } catch (err) { console.error('[scheduler] tick error:', err); @@ -215,18 +192,21 @@ async function enqueueNextOccurrence(schedule, client) { // ── Playout channel health + failover ──────────────────────────────────────── // Tick step 6. Reuses the same advisory lock so only one replica probes the -// sidecars; multi-replica pings would just waste cycles. A missed probe is -// counted via last_heartbeat_at age: > 3 * TICK_INTERVAL means 3 consecutive -// misses. +// sidecars. A missed probe is counted via last_heartbeat_at age: > 3 * +// TICK_INTERVAL means 3 consecutive misses. +// +// IMPORTANT: when last_heartbeat_at is NULL (channel just spawned, no +// successful tick yet), use updated_at as the grace anchor — otherwise the +// "0" fallback makes ageMs huge and the channel is instantly failover-killed +// before its first heartbeat can ever land. async function playoutHealthTick(client) { let channels; try { ({ rows: channels } = await client.query( - `SELECT id, output_type, container_meta, node_id, last_heartbeat_at, restart_count + `SELECT id, output_type, container_meta, node_id, last_heartbeat_at, updated_at, restart_count FROM playout_channels WHERE status = 'running'` )); } catch (err) { - // Migration 029 may not be applied yet — bail silently rather than crash. if (err.code === '42P01') return; throw err; } @@ -244,9 +224,11 @@ async function playoutHealthTick(client) { 'UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [ch.id] ); } catch (err) { - const lastSeen = ch.last_heartbeat_at ? new Date(ch.last_heartbeat_at).getTime() : 0; + const lastSeen = ch.last_heartbeat_at + ? new Date(ch.last_heartbeat_at).getTime() + : new Date(ch.updated_at).getTime(); const ageMs = Date.now() - lastSeen; - if (ageMs < TIMEOUT_MS) continue; // not yet 3 misses + if (ageMs < TIMEOUT_MS) continue; if (ch.output_type === 'decklink') { await client.query( @@ -259,8 +241,6 @@ async function playoutHealthTick(client) { console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${err.message}), restart #${ch.restart_count + 1}`); try { - // restartChannel re-places the channel on a healthy node AND spawns the - // new sidecar directly (shared helper) — no /start self-call needed. const res = await restartChannel(ch.id); if (res.restarted) { console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`); @@ -277,8 +257,6 @@ async function playoutHealthTick(client) { export function startSchedulerLoop() { if (_interval) return; console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`); - // Fire once on startup so a window that opened while the API was down - // doesn't have to wait a full interval. setTimeout(() => tick().catch(() => {}), 2000); _interval = setInterval(() => tick().catch(() => {}), TICK_INTERVAL_MS); }