fix(scheduler): use updated_at as grace anchor when last_heartbeat_at NULL
Without this, a freshly spawned channel whose last_heartbeat_at was still NULL was instantly failover-killed by playoutHealthTick: `0` was used as the lastSeen timestamp, so ageMs was huge and exceeded the timeout on the very first tick.
This commit is contained in:
parent
551af09dc7
commit
0e844c0fc3
1 changed file with 13 additions and 35 deletions
|
|
@ -34,11 +34,7 @@ async function callSelf(path, method = 'POST') {
|
||||||
return res.json().catch(() => ({}));
|
return res.json().catch(() => ({}));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Issue #103 — every mam-api replica runs the same tick on the same interval,
|
const SCHEDULER_LOCK_KEY = 8210301;
|
||||||
// so a multi-node deploy would double-fire recorder starts/stops. We guard
|
|
||||||
// the whole tick with a PG advisory lock (1 = scheduler) so exactly one
|
|
||||||
// replica processes a given interval. Pure-Postgres, no extra infra.
|
|
||||||
const SCHEDULER_LOCK_KEY = 8210301; // arbitrary, must be stable across replicas
|
|
||||||
|
|
||||||
async function tryAcquireSchedulerLock(client) {
|
async function tryAcquireSchedulerLock(client) {
|
||||||
const r = await client.query('SELECT pg_try_advisory_lock($1) AS got', [SCHEDULER_LOCK_KEY]);
|
const r = await client.query('SELECT pg_try_advisory_lock($1) AS got', [SCHEDULER_LOCK_KEY]);
|
||||||
|
|
@ -57,14 +53,9 @@ async function tick() {
|
||||||
try {
|
try {
|
||||||
haveLock = await tryAcquireSchedulerLock(client);
|
haveLock = await tryAcquireSchedulerLock(client);
|
||||||
if (!haveLock) {
|
if (!haveLock) {
|
||||||
// Another replica is processing this interval — bail silently.
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1) Atomically claim pending schedules whose window has opened. The
|
|
||||||
// UPDATE...RETURNING flips status to 'running' in the same statement
|
|
||||||
// so even if another replica got past the lock (it can't, but
|
|
||||||
// belt-and-braces) each row can only be claimed once.
|
|
||||||
const dueStart = await client.query(
|
const dueStart = await client.query(
|
||||||
`UPDATE recorder_schedules
|
`UPDATE recorder_schedules
|
||||||
SET status = 'starting', updated_at = NOW()
|
SET status = 'starting', updated_at = NOW()
|
||||||
|
|
@ -97,7 +88,6 @@ async function tick() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2) Atomically claim running schedules whose window has closed.
|
|
||||||
const dueStop = await client.query(
|
const dueStop = await client.query(
|
||||||
`UPDATE recorder_schedules
|
`UPDATE recorder_schedules
|
||||||
SET status = 'stopping', updated_at = NOW()
|
SET status = 'stopping', updated_at = NOW()
|
||||||
|
|
@ -120,7 +110,6 @@ async function tick() {
|
||||||
console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
|
console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
|
||||||
await enqueueNextOccurrence(s, client);
|
await enqueueNextOccurrence(s, client);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// Stop failed — flag as failed but don't keep trying forever.
|
|
||||||
await client.query(
|
await client.query(
|
||||||
`UPDATE recorder_schedules
|
`UPDATE recorder_schedules
|
||||||
SET status = 'failed', error_message = $2, updated_at = NOW()
|
SET status = 'failed', error_message = $2, updated_at = NOW()
|
||||||
|
|
@ -131,7 +120,6 @@ async function tick() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3) If a schedule was cancelled while running, stop the recorder.
|
|
||||||
const cancelledRunning = await client.query(
|
const cancelledRunning = await client.query(
|
||||||
`SELECT s.* FROM recorder_schedules s
|
`SELECT s.* FROM recorder_schedules s
|
||||||
JOIN recorders r ON r.id = s.recorder_id
|
JOIN recorders r ON r.id = s.recorder_id
|
||||||
|
|
@ -147,9 +135,6 @@ async function tick() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4) Mark stale live assets as 'error' (#66).
|
|
||||||
// If a capture container crashes without calling mark-empty/mark-complete,
|
|
||||||
// the asset row stays status='live' indefinitely. Timeout after 2 hours.
|
|
||||||
const LIVE_TIMEOUT_MINUTES = parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '120', 10);
|
const LIVE_TIMEOUT_MINUTES = parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '120', 10);
|
||||||
const staleResult = await client.query(
|
const staleResult = await client.query(
|
||||||
`UPDATE assets
|
`UPDATE assets
|
||||||
|
|
@ -166,9 +151,6 @@ async function tick() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5) AMPP sync retry (#77). Pick up any pending/failed rows whose
|
|
||||||
// next-attempt time has arrived and retry them. Cap per tick so we
|
|
||||||
// don't burn budget on a single rough interval.
|
|
||||||
const ampps = await client.query(
|
const ampps = await client.query(
|
||||||
`SELECT id, project_id, bin_id FROM assets
|
`SELECT id, project_id, bin_id FROM assets
|
||||||
WHERE ampp_sync_status IN ('pending', 'failed')
|
WHERE ampp_sync_status IN ('pending', 'failed')
|
||||||
|
|
@ -181,11 +163,6 @@ async function tick() {
|
||||||
await syncToAmpp(row.id, row.project_id, row.bin_id);
|
await syncToAmpp(row.id, row.project_id, row.bin_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6) Playout channel health checks. Ping each running channel's sidecar
|
|
||||||
// /status; on success bump last_heartbeat_at, on failure increment a
|
|
||||||
// transient miss counter (in playout_sidecars.last_heartbeat_at age).
|
|
||||||
// Three consecutive misses → auto-restart on a healthy node (non-
|
|
||||||
// decklink), or alert-only for decklink.
|
|
||||||
await playoutHealthTick(client);
|
await playoutHealthTick(client);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error('[scheduler] tick error:', err);
|
console.error('[scheduler] tick error:', err);
|
||||||
|
|
@ -215,18 +192,21 @@ async function enqueueNextOccurrence(schedule, client) {
|
||||||
|
|
||||||
// ── Playout channel health + failover ────────────────────────────────────────
|
// ── Playout channel health + failover ────────────────────────────────────────
|
||||||
// Tick step 6. Reuses the same advisory lock so only one replica probes the
|
// Tick step 6. Reuses the same advisory lock so only one replica probes the
|
||||||
// sidecars; multi-replica pings would just waste cycles. A missed probe is
|
// sidecars. A missed probe is counted via last_heartbeat_at age: > 3 *
|
||||||
// counted via last_heartbeat_at age: > 3 * TICK_INTERVAL means 3 consecutive
|
// TICK_INTERVAL means 3 consecutive misses.
|
||||||
// misses.
|
//
|
||||||
|
// IMPORTANT: when last_heartbeat_at is NULL (channel just spawned, no
|
||||||
|
// successful tick yet), use updated_at as the grace anchor — otherwise the
|
||||||
|
// "0" fallback makes ageMs huge and the channel is instantly failover-killed
|
||||||
|
// before its first heartbeat can ever land.
|
||||||
async function playoutHealthTick(client) {
|
async function playoutHealthTick(client) {
|
||||||
let channels;
|
let channels;
|
||||||
try {
|
try {
|
||||||
({ rows: channels } = await client.query(
|
({ rows: channels } = await client.query(
|
||||||
`SELECT id, output_type, container_meta, node_id, last_heartbeat_at, restart_count
|
`SELECT id, output_type, container_meta, node_id, last_heartbeat_at, updated_at, restart_count
|
||||||
FROM playout_channels WHERE status = 'running'`
|
FROM playout_channels WHERE status = 'running'`
|
||||||
));
|
));
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// Migration 029 may not be applied yet — bail silently rather than crash.
|
|
||||||
if (err.code === '42P01') return;
|
if (err.code === '42P01') return;
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
@ -244,9 +224,11 @@ async function playoutHealthTick(client) {
|
||||||
'UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [ch.id]
|
'UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [ch.id]
|
||||||
);
|
);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const lastSeen = ch.last_heartbeat_at ? new Date(ch.last_heartbeat_at).getTime() : 0;
|
const lastSeen = ch.last_heartbeat_at
|
||||||
|
? new Date(ch.last_heartbeat_at).getTime()
|
||||||
|
: new Date(ch.updated_at).getTime();
|
||||||
const ageMs = Date.now() - lastSeen;
|
const ageMs = Date.now() - lastSeen;
|
||||||
if (ageMs < TIMEOUT_MS) continue; // not yet 3 misses
|
if (ageMs < TIMEOUT_MS) continue;
|
||||||
|
|
||||||
if (ch.output_type === 'decklink') {
|
if (ch.output_type === 'decklink') {
|
||||||
await client.query(
|
await client.query(
|
||||||
|
|
@ -259,8 +241,6 @@ async function playoutHealthTick(client) {
|
||||||
|
|
||||||
console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${err.message}), restart #${ch.restart_count + 1}`);
|
console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${err.message}), restart #${ch.restart_count + 1}`);
|
||||||
try {
|
try {
|
||||||
// restartChannel re-places the channel on a healthy node AND spawns the
|
|
||||||
// new sidecar directly (shared helper) — no /start self-call needed.
|
|
||||||
const res = await restartChannel(ch.id);
|
const res = await restartChannel(ch.id);
|
||||||
if (res.restarted) {
|
if (res.restarted) {
|
||||||
console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`);
|
console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`);
|
||||||
|
|
@ -277,8 +257,6 @@ async function playoutHealthTick(client) {
|
||||||
export function startSchedulerLoop() {
|
export function startSchedulerLoop() {
|
||||||
if (_interval) return;
|
if (_interval) return;
|
||||||
console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
|
console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
|
||||||
// Fire once on startup so a window that opened while the API was down
|
|
||||||
// doesn't have to wait a full interval.
|
|
||||||
setTimeout(() => tick().catch(() => {}), 2000);
|
setTimeout(() => tick().catch(() => {}), 2000);
|
||||||
_interval = setInterval(() => tick().catch(() => {}), TICK_INTERVAL_MS);
|
_interval = setInterval(() => tick().catch(() => {}), TICK_INTERVAL_MS);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue