dragonflight/services/mam-api/src/scheduler.js
Zac 5538683d78 feat(mam-api): /playout control plane + auto-failover
Routes: channel + playlist CRUD, start/stop/play/pause/skip transport, as-run
log. RBAC via assertProjectAccess on channel.project_id; null project ⇒
admin-only (recorder convention).

Sidecar orchestration mirrors recorders.js: Docker socket for local node,
node-agent /sidecar/start for remote. Channel start passes CHANNEL_ID env so
the sidecar can write HLS preview to /media/live/<id>.

DeckLink port-contention guard: blocks starting a decklink channel when a
recorder or another channel on the same node+device_index is active.

restartChannel(id) helper picks another healthy cluster node and re-places
non-decklink channels; decklink is alert-only. Exposed for the scheduler.

Scheduler tick adds step 6: poll each running channel's sidecar /status,
update last_heartbeat_at, and after ~3 misses trigger restartChannel +
self-call /start. Reuses the existing PG advisory lock so multi-replica
deploys don't double-fire failovers.
2026-05-30 14:02:25 +00:00

287 lines
12 KiB
JavaScript

// Scheduler tick — every TICK_INTERVAL_MS scan the recorder_schedules table
// and transition any rows whose window opens or closes. The actual recorder
// start/stop is delegated to the existing /recorders/:id/start|stop routes
// via an in-process HTTP call, so we reuse all of the existing container
// orchestration, growing-files handling, asset row creation, etc.
//
// On schedule completion: a 'daily' or 'weekly' recurring schedule is cloned
// forward by 1 day / 7 days into a new 'pending' row.
import pool from './db/pool.js';
import { syncToAmpp } from './routes/upload.js';
import { restartChannel } from './routes/playout.js';
// Scheduler cadence in milliseconds; override with SCHEDULER_TICK_MS (default 15000).
const TICK_INTERVAL_MS = Number.parseInt(process.env.SCHEDULER_TICK_MS || '15000', 10);
// Base URL used for HTTP calls back into this API; override with
// MAM_API_SELF_URL, otherwise loopback on PORT (default 3000).
const SELF_URL =
  process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;
// True while a tick is in flight; tick() returns immediately when it is set.
let _tickRunning = false;
// Handle returned by setInterval for the tick loop; null while the loop is stopped.
let _interval = null;
/**
 * Call one of this API's own routes over HTTP (base URL: SELF_URL).
 *
 * @param {string} path - Route path appended verbatim to SELF_URL.
 * @param {string} [method='POST'] - HTTP method.
 * @returns {Promise<object>} Parsed JSON body, or `{}` when the body cannot
 *   be parsed as JSON.
 * @throws {Error} On a non-2xx response; the message carries the status and
 *   the first 200 characters of the response text. Also rejects if the
 *   request fails or exceeds the 30 s timeout.
 */
async function callSelf(path, method = 'POST') {
  const response = await fetch(`${SELF_URL}${path}`, {
    method,
    headers: { 'Content-Type': 'application/json' },
    signal: AbortSignal.timeout(30000),
  });

  if (response.ok) {
    try {
      return await response.json();
    } catch {
      // Empty or non-JSON success bodies are treated as "no payload".
      return {};
    }
  }

  let detail;
  try {
    detail = await response.text();
  } catch {
    // The body is only used for diagnostics; fall back to an empty string.
    detail = '';
  }
  throw new Error(`${method} ${path} → HTTP ${response.status}: ${detail.slice(0, 200)}`);
}
// Issue #103 — every mam-api replica runs the same tick on the same interval,
// so a multi-node deploy would double-fire recorder starts/stops. We guard
// the whole tick with a PG advisory lock (keyed by SCHEDULER_LOCK_KEY) so
// exactly one replica processes a given interval. Pure-Postgres, no extra infra.
// Advisory-lock key for the scheduler tick.
const SCHEDULER_LOCK_KEY = 8210301; // arbitrary, must be stable across replicas

/**
 * Try to take the scheduler advisory lock without blocking.
 *
 * @param {{ query: Function }} client - Connection the lock is requested on;
 *   the same connection must later be passed to releaseSchedulerLock.
 * @returns {Promise<boolean>} true when this connection now holds the lock;
 *   false when the query reports otherwise or returns no row.
 */
async function tryAcquireSchedulerLock(client) {
  const { rows } = await client.query(
    'SELECT pg_try_advisory_lock($1) AS got',
    [SCHEDULER_LOCK_KEY],
  );
  const firstRow = rows[0];
  return Boolean(firstRow?.got);
}
/**
 * Release the scheduler advisory lock on the connection that acquired it.
 *
 * Best-effort: never throws, so it is safe to call from a `finally` block.
 * A failed unlock is logged rather than silently dropped, because a lock that
 * stays attached to a still-open pooled connection would keep other ticks
 * from acquiring it.
 *
 * @param {{ query: Function }} client - Connection that holds the lock.
 * @returns {Promise<void>}
 */
async function releaseSchedulerLock(client) {
  try {
    await client.query('SELECT pg_advisory_unlock($1)', [SCHEDULER_LOCK_KEY]);
  } catch (err) {
    console.warn(`[scheduler] advisory unlock failed: ${err?.message ?? err}`);
  }
}
/**
 * Run one scheduler pass.
 *
 * Only one pass runs at a time per process (the `_tickRunning` flag) and per
 * cluster (the Postgres advisory lock). While holding the lock the pass:
 *   1. starts recorders for pending schedules whose window has opened,
 *   2. stops recorders for running schedules whose window has closed and
 *      queues the next occurrence,
 *   3. stops recorders whose schedule was cancelled in the last 5 minutes,
 *   4. marks long-lived 'live' assets as 'error',
 *   5. retries pending/failed AMPP syncs (at most 25 per pass),
 *   6. probes running playout channels via playoutHealthTick.
 *
 * Errors raised while connecting or while running the steps are logged, not
 * thrown; an error in one step skips the remaining steps of this pass.
 *
 * @returns {Promise<void>}
 */
async function tick() {
  // In-process guard: a slow pass must not overlap the next interval.
  if (_tickRunning) return;
  _tickRunning = true;
  let client = null;
  let haveLock = false;
  try {
    // Connect inside the try: if the pool cannot hand out a connection the
    // finally block still clears _tickRunning, so later ticks keep running.
    client = await pool.connect();
    haveLock = await tryAcquireSchedulerLock(client);
    if (!haveLock) {
      // Another replica is processing this interval — bail silently.
      return;
    }

    // 1) Atomically claim pending schedules whose window has opened. The
    //    UPDATE...RETURNING flips status to 'starting' in the same statement
    //    so even if another replica got past the lock (it can't, but
    //    belt-and-braces) each row can only be claimed once.
    const dueStart = await client.query(
      `UPDATE recorder_schedules
SET status = 'starting', updated_at = NOW()
WHERE id IN (
SELECT id FROM recorder_schedules
WHERE status = 'pending' AND start_at <= NOW() AND end_at > NOW()
ORDER BY start_at ASC
FOR UPDATE SKIP LOCKED
)
RETURNING *`
    );
    for (const s of dueStart.rows) {
      try {
        const result = await callSelf(`/api/v1/recorders/${s.recorder_id}/start`);
        await client.query(
          `UPDATE recorder_schedules
SET status = 'running', last_asset_id = NULL, updated_at = NOW()
WHERE id = $1`,
          [s.id]
        );
        console.log(`[scheduler] started schedule "${s.name}" on recorder ${s.recorder_id} (session=${result.current_session_id || '?'})`);
      } catch (err) {
        await client.query(
          `UPDATE recorder_schedules
SET status = 'failed', error_message = $2, updated_at = NOW()
WHERE id = $1`,
          [s.id, err.message.slice(0, 500)]
        );
        console.error(`[scheduler] start failed for schedule ${s.id}: ${err.message}`);
      }
    }

    // 2) Atomically claim running schedules whose window has closed.
    const dueStop = await client.query(
      `UPDATE recorder_schedules
SET status = 'stopping', updated_at = NOW()
WHERE id IN (
SELECT id FROM recorder_schedules
WHERE status = 'running' AND end_at <= NOW()
ORDER BY end_at ASC
FOR UPDATE SKIP LOCKED
)
RETURNING *`
    );
    for (const s of dueStop.rows) {
      try {
        await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
        // Use the held client like every other statement in the tick, so the
        // pass never needs a second pool connection while it holds this one.
        await client.query(
          `UPDATE recorder_schedules SET status = 'completed', updated_at = NOW()
WHERE id = $1`,
          [s.id]
        );
        console.log(`[scheduler] stopped schedule "${s.name}" on recorder ${s.recorder_id}`);
        await enqueueNextOccurrence(s, client);
      } catch (err) {
        // Stop failed — flag as failed but don't keep trying forever.
        await client.query(
          `UPDATE recorder_schedules
SET status = 'failed', error_message = $2, updated_at = NOW()
WHERE id = $1`,
          [s.id, ('stop: ' + err.message).slice(0, 500)]
        );
        console.error(`[scheduler] stop failed for schedule ${s.id}: ${err.message}`);
      }
    }

    // 3) If a schedule was cancelled while running, stop the recorder.
    const cancelledRunning = await client.query(
      `SELECT s.* FROM recorder_schedules s
JOIN recorders r ON r.id = s.recorder_id
WHERE s.status = 'cancelled' AND r.status = 'recording'
AND s.updated_at > NOW() - INTERVAL '5 minutes'`
    );
    for (const s of cancelledRunning.rows) {
      try {
        await callSelf(`/api/v1/recorders/${s.recorder_id}/stop`);
        console.log(`[scheduler] cancelled schedule "${s.name}" — stopped recorder ${s.recorder_id}`);
      } catch (err) {
        console.warn(`[scheduler] cancel-stop failed for ${s.id}: ${err.message}`);
      }
    }

    // 4) Mark stale live assets as 'error' (#66).
    //    If a capture container crashes without calling mark-empty/mark-complete,
    //    the asset row stays status='live' indefinitely. Timeout after 2 hours
    //    by default (LIVE_ASSET_TIMEOUT_MINUTES).
    const LIVE_TIMEOUT_MINUTES = parseInt(process.env.LIVE_ASSET_TIMEOUT_MINUTES || '120', 10);
    const staleResult = await client.query(
      `UPDATE assets
SET status = 'error',
updated_at = NOW()
WHERE status = 'live'
AND created_at < NOW() - ($1 || ' minutes')::INTERVAL
RETURNING id, display_name`,
      [LIVE_TIMEOUT_MINUTES]
    );
    for (const row of staleResult.rows) {
      console.warn(`[scheduler] marked stale live asset as error: ${row.id} (${row.display_name})`);
    }

    // 5) AMPP sync retry (#77). Pick up any pending/failed rows whose
    //    next-attempt time has arrived and retry them. Cap per tick so we
    //    don't burn budget on a single rough interval.
    const ampps = await client.query(
      `SELECT id, project_id, bin_id FROM assets
WHERE ampp_sync_status IN ('pending', 'failed')
AND (ampp_sync_next_attempt_at IS NULL OR ampp_sync_next_attempt_at <= NOW())
AND ampp_sync_attempts < 8
ORDER BY ampp_sync_next_attempt_at NULLS FIRST
LIMIT 25`
    );
    for (const row of ampps.rows) {
      await syncToAmpp(row.id, row.project_id, row.bin_id);
    }

    // 6) Playout channel health checks. Ping each running channel's sidecar
    //    /status; on success bump last_heartbeat_at. Missed probes are
    //    inferred from the age of playout_channels.last_heartbeat_at: once it
    //    is older than roughly three intervals the channel is auto-restarted
    //    on a healthy node (non-decklink) or flagged alert-only (decklink).
    await playoutHealthTick(client);
  } catch (err) {
    console.error('[scheduler] tick error:', err);
  } finally {
    try {
      if (client) {
        if (haveLock) await releaseSchedulerLock(client);
        client.release();
      }
    } finally {
      // Always re-arm the loop, even if cleanup itself failed.
      _tickRunning = false;
    }
  }
}
/**
 * Clone a finished recurring schedule forward into a new 'pending' row.
 *
 * Only 'daily' (+1 day) and 'weekly' (+7 days) recur; any other recurrence
 * value — 'none', null/undefined, or something unrecognised — queues nothing
 * instead of being treated as daily.
 *
 * The window is shifted in whole UTC days, so the UTC time of day is kept.
 * If the shifted window has already ended (e.g. the stop was processed late
 * after an outage) it keeps advancing to the first window that ends in the
 * future: the tick only starts schedules whose end_at is still ahead, so a
 * window entirely in the past would sit in 'pending' forever and end the
 * recurrence.
 *
 * @param {object} schedule - Row with name, recorder_id, start_at, end_at
 *   and recurrence.
 * @param {{ query: Function }} [client] - Connection to insert with; falls
 *   back to the shared pool when omitted.
 * @returns {Promise<void>}
 */
async function enqueueNextOccurrence(schedule, client) {
  let stepDays;
  if (schedule.recurrence === 'daily') {
    stepDays = 1;
  } else if (schedule.recurrence === 'weekly') {
    stepDays = 7;
  } else {
    return;
  }

  const start = new Date(schedule.start_at);
  const end = new Date(schedule.end_at);
  const now = Date.now();
  // Always advance at least once; keep going while the window is already over.
  do {
    start.setUTCDate(start.getUTCDate() + stepDays);
    end.setUTCDate(end.getUTCDate() + stepDays);
  } while (end.getTime() <= now);

  const q = client || pool;
  await q.query(
    `INSERT INTO recorder_schedules
(name, recorder_id, start_at, end_at, recurrence, status)
VALUES ($1, $2, $3, $4, $5, 'pending')`,
    [schedule.name, schedule.recorder_id, start.toISOString(), end.toISOString(), schedule.recurrence]
  );
  console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
}
// ── Playout channel health + failover ────────────────────────────────────────
// Tick step 6. Reuses the same advisory lock so only one replica probes the
// sidecars; multi-replica pings would just waste cycles. A missed probe is
// counted via last_heartbeat_at age: > 3 * TICK_INTERVAL means 3 consecutive
// misses.
/**
 * Probe the sidecar of every playout channel whose status is 'running'.
 *
 * A probe that answers with a 2xx refreshes the channel's last_heartbeat_at.
 * Any failure in the probe step is handed to handleUnreachableChannel, which
 * decides whether the channel has been silent long enough to act on.
 *
 * @param {{ query: Function }} client - Connection used for all statements.
 * @returns {Promise<void>} Resolves early, doing nothing, when the
 *   playout_channels table does not exist (Postgres error code 42P01).
 */
async function playoutHealthTick(client) {
  let runningChannels;
  try {
    const result = await client.query(
      `SELECT id, output_type, container_meta, node_id, last_heartbeat_at, restart_count
FROM playout_channels WHERE status = 'running'`
    );
    runningChannels = result.rows;
  } catch (queryErr) {
    // Migration 029 may not be applied yet — bail silently rather than crash.
    if (queryErr.code === '42P01') return;
    throw queryErr;
  }

  // Three tick intervals plus the 5 s probe timeout.
  const staleAfterMs = TICK_INTERVAL_MS * 3 + 5000;

  for (const channel of runningChannels) {
    const baseUrl = channel.container_meta?.sidecar_url || `http://playout-${channel.id}:3002`;
    try {
      const reply = await fetch(`${baseUrl}/status`, { signal: AbortSignal.timeout(5000) });
      if (!reply.ok) throw new Error(`status HTTP ${reply.status}`);
      await client.query(
        'UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [channel.id]
      );
    } catch (probeErr) {
      await handleUnreachableChannel(client, channel, probeErr, staleAfterMs);
    }
  }
}

// React to a failed probe: ignore it while the last heartbeat is recent,
// otherwise mark decklink channels as 'error' or fail other channels over.
async function handleUnreachableChannel(client, channel, probeErr, staleAfterMs) {
  // A channel that has never reported a heartbeat counts as silent since epoch.
  const lastSeenMs = channel.last_heartbeat_at
    ? new Date(channel.last_heartbeat_at).getTime()
    : 0;
  const silentForMs = Date.now() - lastSeenMs;
  if (silentForMs < staleAfterMs) return; // not yet 3 misses

  if (channel.output_type === 'decklink') {
    await client.query(
      "UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
      [`sidecar unreachable (${probeErr.message}); decklink channels require manual recovery`, channel.id]
    );
    console.error(`[scheduler] decklink channel ${channel.id} unreachable — alert-only, no auto-failover`);
    return;
  }

  console.warn(`[scheduler] failover: channel ${channel.id} unreachable (${probeErr.message}), restart #${channel.restart_count + 1}`);
  try {
    const outcome = await restartChannel(channel.id);
    if (!outcome.restarted) {
      console.error(`[scheduler] failover: channel ${channel.id} restart skipped — ${outcome.reason}`);
      return;
    }
    console.log(`[scheduler] failover: channel ${channel.id} re-placed on node ${outcome.new_node_id}`);
    // Kick the new sidecar via the /start route — the helper updates the
    // DB but the actual docker spawn lives on the start endpoint.
    try {
      await callSelf(`/api/v1/playout/channels/${channel.id}/start`);
    } catch (startErr) {
      console.error(`[scheduler] failover: /start call failed: ${startErr.message}`);
    }
  } catch (failoverErr) {
    console.error(`[scheduler] failover error for ${channel.id}: ${failoverErr.message}`);
  }
}
/**
 * Start the periodic scheduler loop. Calling it again while the loop is
 * already running does nothing.
 *
 * Schedules one tick 2 s after start-up and then one every TICK_INTERVAL_MS.
 */
export function startSchedulerLoop() {
  const alreadyRunning = Boolean(_interval);
  if (alreadyRunning) {
    return;
  }

  // A rejected tick is deliberately ignored here so it cannot surface as an
  // unhandled rejection from a timer callback.
  const runTickQuietly = () => {
    tick().catch(() => {});
  };

  console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
  // Fire once on startup so a window that opened while the API was down
  // doesn't have to wait a full interval.
  setTimeout(runTickQuietly, 2000);
  _interval = setInterval(runTickQuietly, TICK_INTERVAL_MS);
}
/**
 * Stop the periodic scheduler loop started by startSchedulerLoop.
 * Does nothing when the loop is not running.
 */
export function stopSchedulerLoop() {
  if (!_interval) {
    return;
  }
  clearInterval(_interval);
  _interval = null;
}