From 5538683d7897c5e3e0acea9af33334d299ebf831 Mon Sep 17 00:00:00 2001 From: Zac Date: Sat, 30 May 2026 13:18:01 +0000 Subject: [PATCH] feat(mam-api): /playout control plane + auto-failover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Routes: channel + playlist CRUD, start/stop/play/pause/skip transport, as-run log. RBAC via assertProjectAccess on channel.project_id; null project ⇒ admin-only (recorder convention). Sidecar orchestration mirrors recorders.js: Docker socket for local node, node-agent /sidecar/start for remote. Channel start passes CHANNEL_ID env so the sidecar can write HLS preview to /media/live/. DeckLink port-contention guard: blocks starting a decklink channel when a recorder or another channel on the same node+device_index is active. restartChannel(id) helper picks another healthy cluster node and re-places non-decklink channels; decklink is alert-only. Exposed for the scheduler. Scheduler tick adds step 6: poll each running channel's sidecar /status, update last_heartbeat_at, and after ~3 misses trigger restartChannel + self-call /start. Reuses the existing PG advisory lock so multi-replica deploys don't double-fire failovers. 
--- services/mam-api/src/index.js | 2 + services/mam-api/src/routes/playout.js | 675 +++++++++++++++++++++++++ services/mam-api/src/scheduler.js | 72 +++ 3 files changed, 749 insertions(+) create mode 100644 services/mam-api/src/routes/playout.js diff --git a/services/mam-api/src/index.js b/services/mam-api/src/index.js index 51169fe..8831034 100644 --- a/services/mam-api/src/index.js +++ b/services/mam-api/src/index.js @@ -22,6 +22,7 @@ import jobsRouter from './routes/jobs.js'; import captureRouter from './routes/capture.js'; import uploadRouter from './routes/upload.js'; import recordersRouter from './routes/recorders.js'; +import playoutRouter from './routes/playout.js'; import settingsRouter from './routes/settings.js'; import amppRouter from './routes/ampp.js'; import groupsRouter from './routes/groups.js'; @@ -132,6 +133,7 @@ app.use('/api/v1/jobs', jobsRouter); app.use('/api/v1/capture', captureRouter); app.use('/api/v1/upload', uploadRouter); app.use('/api/v1/recorders', recordersRouter); +app.use('/api/v1/playout', playoutRouter); app.use('/api/v1/settings', settingsRouter); app.use('/api/v1/ampp', amppRouter); app.use('/api/v1/groups', requireAdmin, groupsRouter); diff --git a/services/mam-api/src/routes/playout.js b/services/mam-api/src/routes/playout.js new file mode 100644 index 0000000..161eb5f --- /dev/null +++ b/services/mam-api/src/routes/playout.js @@ -0,0 +1,675 @@ +// Playout / Master Control routes. +// +// Control plane for the CasparCG-backed playout subsystem. Channels are placed +// on cluster nodes and their engine containers spawned via the same Docker-socket +// / node-agent path recorders use; the channel's transport (play / pause / skip) +// is proxied through to the sidecar's HTTP shim, which drives CasparCG over AMCP. +// +// RBAC: every channel carries a project_id (NULL = admin-only, the recorder +// convention). List routes filter by accessible projects; mutating routes assert +// 'edit'. 
+// See docs/superpowers/specs/2026-05-30-playout-mcr-design.md.
+
+import express from 'express';
+import http from 'http';
+import { Queue } from 'bullmq';
+import pool from '../db/pool.js';
+import { validateUuid } from '../middleware/errors.js';
+import {
+  assertProjectAccess, accessibleProjectIds, isAdmin,
+} from '../auth/authz.js';
+
+const router = express.Router();
+
+// ── BullMQ: media staging queue (S3 -> /media volume) ────────────────────────
+
+/**
+ * Turn a redis:// or rediss:// URL into BullMQ (ioredis) connection options.
+ *
+ * @param {string} url - Redis connection URL.
+ * @returns {{host: string, port: number, username?: string, password?: string, tls?: object}}
+ */
+const parseRedisUrl = (url) => {
+  const parsed = new URL(url);
+  const connection = {
+    host: parsed.hostname,
+    // `URL#port` is '' when the URL omits it; parseInt('') would be NaN.
+    port: parsed.port ? Number.parseInt(parsed.port, 10) : 6379,
+  };
+  // Credentials and TLS used to be dropped silently, which breaks any
+  // password-protected or rediss:// deployment.
+  if (parsed.username) connection.username = decodeURIComponent(parsed.username);
+  if (parsed.password) connection.password = decodeURIComponent(parsed.password);
+  if (parsed.protocol === 'rediss:') connection.tls = {};
+  return connection;
+};
+const stageQueue = new Queue('playout-stage', {
+  connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
+});
+
+// ── Sidecar orchestration (mirrors recorders.js) ─────────────────────────────
+const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest';
+
+/**
+ * Issue one request to the local Docker Engine API over the unix socket.
+ *
+ * @param {string} method - HTTP verb.
+ * @param {string} path - API path without the version prefix (e.g. `/containers/create`).
+ * @param {object|null} [body] - JSON-serialisable request body, if any.
+ * @returns {Promise<{status: number, data: any}>} Status code plus the parsed
+ *   JSON body; `{}` for an empty body, the raw text when it is not JSON.
+ *   Rejects on socket errors or after the 10 s timeout.
+ */
+function dockerApi(method, path, body = null) {
+  return new Promise((resolve, reject) => {
+    const options = {
+      socketPath: '/var/run/docker.sock',
+      path: `/v1.43${path}`,
+      method,
+      headers: { 'Content-Type': 'application/json' },
+    };
+    const req = http.request(options, (res) => {
+      // Collect raw chunks and decode once: `str += chunk` can split a
+      // multi-byte UTF-8 sequence that straddles two chunks.
+      const chunks = [];
+      res.on('data', (chunk) => chunks.push(chunk));
+      res.on('error', reject);
+      res.on('end', () => {
+        const raw = Buffer.concat(chunks).toString('utf8');
+        let data = raw;
+        if (!raw) {
+          data = {};
+        } else {
+          try { data = JSON.parse(raw); } catch { /* not JSON — hand back the raw text */ }
+        }
+        resolve({ status: res.statusCode, data });
+      });
+    });
+    req.on('error', reject);
+    req.setTimeout(10000, () => req.destroy(new Error('Docker API timeout after 10s')));
+    if (body) req.write(JSON.stringify(body));
+    req.end();
+  });
+}
+
+/**
+ * Decide whether a channel's node is this host (Docker socket) or a remote
+ * node (node-agent HTTP API).
+ *
+ * @param {string|null} nodeId - cluster_nodes.id, or null for "local".
+ * @returns {Promise<{remote: false} | {remote: true, apiUrl: string, ip: string}>}
+ *   `remote: false` when there is no node, the node row is missing, it has no
+ *   api_url, or its hostname equals NODE_HOSTNAME.
+ */
+async function resolveNodeTarget(nodeId) {
+  if (!nodeId) return { remote: false };
+  const { rows } = await pool.query(
+    'SELECT hostname, ip_address, api_url FROM cluster_nodes WHERE id = $1', [nodeId]
+  );
+  const [node] = rows;
+  if (!node) return { remote: false };
+  const localHostname = process.env.NODE_HOSTNAME || '';
+  if (!node.api_url || node.hostname === localHostname) return { remote: false };
+  return { remote: true, apiUrl: node.api_url, ip: node.ip_address };
+}
+
+// The sidecar shim listens on this port inside the container. The mam-api talks
+// to it by container alias on the shared docker network (local) or via the
+// node-agent's returned host:port (remote).
+const SIDECAR_HTTP_PORT = 3002;
+
+/** Docker network alias (and hostname) of a channel's sidecar container. */
+function channelAlias(id) { return `playout-${id}`; }
+
+/** True while a channel owns, or is about to own, a sidecar container. */
+function isChannelActive(status) { return status === 'running' || status === 'starting'; }
+
+// Resolve the base URL the API uses to reach a running channel's sidecar shim.
+// Local: the docker-network alias. Remote: the node-agent reported the host the
+// container is published on (stored in container_meta.sidecar_url).
+function sidecarBaseUrl(channel) {
+  return channel.container_meta?.sidecar_url
+    || `http://${channelAlias(channel.id)}:${SIDECAR_HTTP_PORT}`;
+}
+
+/**
+ * Call the channel's sidecar HTTP shim.
+ *
+ * @param {object} channel - playout_channels row (id + container_meta are read).
+ * @param {string} path - Path on the shim, e.g. `/status`.
+ * @param {string} [method='POST']
+ * @param {object|null} [body] - JSON body, omitted when null.
+ * @returns {Promise<object>} Parsed JSON reply, or `{}` when the reply is not JSON.
+ * @throws {Error} On network failure, the 20 s timeout, or a non-2xx reply
+ *   (message carries the status and the first 200 chars of the body).
+ */
+async function callSidecar(channel, path, method = 'POST', body = null) {
+  const url = `${sidecarBaseUrl(channel)}${path}`;
+  const res = await fetch(url, {
+    method,
+    headers: { 'Content-Type': 'application/json' },
+    body: body ? JSON.stringify(body) : undefined,
+    signal: AbortSignal.timeout(20000),
+  });
+  if (!res.ok) {
+    const text = await res.text().catch(() => '');
+    throw new Error(`sidecar ${method} ${path} -> HTTP ${res.status}: ${text.slice(0, 200)}`);
+  }
+  return res.json().catch(() => ({}));
+}
+
+// ── Serialization ────────────────────────────────────────────────────────────
+/** Public JSON shape of a playout_channels row (container_meta stays internal). */
+function channelToJson(r) {
+  return {
+    id: r.id,
+    name: r.name,
+    node_id: r.node_id,
+    output_type: r.output_type,
+    output_config: r.output_config,
+    video_format: r.video_format,
+    status: r.status,
+    container_id: r.container_id,
+    error_message: r.error_message,
+    project_id: r.project_id,
+    restart_count: r.restart_count ?? 0,
+    last_restart_at: r.last_restart_at,
+    last_heartbeat_at: r.last_heartbeat_at,
+    created_at: r.created_at,
+    updated_at: r.updated_at,
+  };
+}
+
+const OUTPUT_TYPES = new Set(['decklink', 'ndi', 'srt', 'rtmp']);
+
+// ── Param resolver: scope every /:id route to the channel's project ──────────
+// Loads the channel into req.channel and asserts 'view'; mutating routes add
+// requireChannelEdit on top.
+router.param('id', async (req, res, next) => {
+  // validateUuid may either answer the request itself or forward an error;
+  // only continue when it explicitly passed the id.
+  let idIsValid = false;
+  validateUuid('id')(req, res, (err) => {
+    if (err) return next(err);
+    idIsValid = true;
+  });
+  if (!idIsValid || res.headersSent) return;
+  try {
+    const { rows } = await pool.query(
+      'SELECT * FROM playout_channels WHERE id = $1', [req.params.id]
+    );
+    if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
+    req.channel = rows[0];
+    await assertProjectAccess(req.user, req.channel.project_id, 'view');
+    next();
+  } catch (err) { next(err); }
+});
+
+/** Middleware: require 'edit' on the project of the already-loaded req.channel. */
+async function requireChannelEdit(req, res, next) {
+  try {
+    await assertProjectAccess(req.user, req.channel.project_id, 'edit');
+    next();
+  } catch (err) { next(err); }
+}
+
+// ── Channels ─────────────────────────────────────────────────────────────────
+
+// GET /playout/channels — list (filtered to accessible projects)
+router.get('/channels', async (req, res, next) => {
+  try {
+    if (isAdmin(req.user)) {
+      const all = await pool.query('SELECT * FROM playout_channels ORDER BY created_at DESC');
+      return res.json(all.rows.map(channelToJson));
+    }
+    const ids = await accessibleProjectIds(req.user);
+    if (ids.length === 0) return res.json([]);
+    const scoped = await pool.query(
+      'SELECT * FROM playout_channels WHERE project_id = ANY($1) ORDER BY created_at DESC', [ids]
+    );
+    res.json(scoped.rows.map(channelToJson));
+  } catch (err) { next(err); }
+});
+
+// POST /playout/channels — create
+router.post('/channels', async (req, res, next) => {
+  try {
+    const { name, node_id = null, output_type = 'srt', output_config = {},
+      video_format = '1080p5994', project_id = null } = req.body || {};
+    // Reject whitespace-only names too: they would be stored as ''.
+    if (typeof name !== 'string' || !name.trim()) {
+      return res.status(400).json({ error: 'name is required' });
+    }
+    if (!OUTPUT_TYPES.has(output_type)) {
+      return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` });
+    }
+    // Creating a project-scoped channel requires edit on that project; a
+    // null-project (admin-only) channel requires admin.
+    if (project_id) await assertProjectAccess(req.user, project_id, 'edit');
+    else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });
+
+    const { rows } = await pool.query(
+      `INSERT INTO playout_channels (name, node_id, output_type, output_config, video_format, project_id)
+       VALUES ($1,$2,$3,$4,$5,$6) RETURNING *`,
+      [name.trim(), node_id, output_type, JSON.stringify(output_config), video_format, project_id]
+    );
+    res.status(201).json(channelToJson(rows[0]));
+  } catch (err) { next(err); }
+});
+
+// PATCH /playout/channels/:id — update config (only while stopped)
+router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => {
+  try {
+    // 'starting' counts as active too: the sidecar is being spawned from the
+    // config we are about to change.
+    if (isChannelActive(req.channel.status)) {
+      return res.status(409).json({ error: `Cannot edit a ${req.channel.status} channel — stop it first` });
+    }
+    const body = req.body || {};
+    if (body.name !== undefined && (typeof body.name !== 'string' || !body.name.trim())) {
+      return res.status(400).json({ error: 'name must be a non-empty string' });
+    }
+    if (body.output_type !== undefined && !OUTPUT_TYPES.has(body.output_type)) {
+      return res.status(400).json({ error: 'invalid output_type' });
+    }
+    // Re-homing a channel is subject to the same rule as creating one: edit on
+    // the destination project, or admin for the unassigned (NULL) bucket.
+    // Without this, edit rights on one project were enough to move a channel
+    // into any other project.
+    if (body.project_id !== undefined && body.project_id !== req.channel.project_id) {
+      if (body.project_id) await assertProjectAccess(req.user, body.project_id, 'edit');
+      else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });
+    }
+
+    // Column names come from this fixed whitelist, never from the request.
+    const allowed = ['name', 'node_id', 'output_type', 'output_config', 'video_format', 'project_id'];
+    const sets = [];
+    const vals = [];
+    for (const column of allowed) {
+      const value = body[column];
+      if (value === undefined) continue;
+      vals.push(
+        column === 'output_config' ? JSON.stringify(value)
+          : column === 'name' ? value.trim()
+            : value
+      );
+      sets.push(`${column} = $${vals.length}`);
+    }
+    if (sets.length === 0) return res.json(channelToJson(req.channel));
+    vals.push(req.channel.id);
+    const { rows } = await pool.query(
+      `UPDATE playout_channels SET ${sets.join(', ')}, updated_at = NOW() WHERE id = $${vals.length} RETURNING *`, vals
+    );
+    res.json(channelToJson(rows[0]));
+  } catch (err) { next(err); }
+});
+
+// DELETE /playout/channels/:id
+router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => {
+  try {
+    // Deleting while a sidecar is running or being spawned would orphan it.
+    if (isChannelActive(req.channel.status)) {
+      return res.status(409).json({ error: 'Stop the channel before deleting it' });
+    }
+    await pool.query('DELETE FROM playout_channels WHERE id = $1', [req.channel.id]);
+    res.json({ deleted: true });
+  } catch (err) { next(err); }
+});
+
+// ── Port-contention guard (DeckLink) ─────────────────────────────────────────
+// A DeckLink device on a node is exclusive: an active recorder OR another active
+// channel on the same node+index blocks a new SDI channel. NDI/SRT/RTMP have no
+// hardware contention.
+//
+// Throws an Error with httpStatus 409 when the device is taken; resolves
+// otherwise (always, for non-decklink channels).
+async function assertDeckLinkFree(channel) {
+  if (channel.output_type !== 'decklink') return;
+  // Unset (or 0) falls back to device 1, the same default applied in SQL below.
+  const idx = channel.output_config?.device_index || 1;
+  // Another active channel on the same node + device index? COALESCE mirrors
+  // the JS default so a channel without an explicit device_index still counts
+  // as occupying device 1; 'starting' channels are about to grab the device.
+  const chan = await pool.query(
+    `SELECT id FROM playout_channels
+      WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2
+        AND status IN ('running', 'starting')
+        AND output_type = 'decklink'
+        AND COALESCE((output_config->>'device_index')::int, 1) = $3`,
+    [channel.id, channel.node_id, idx]
+  );
+  if (chan.rows.length > 0) {
+    throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 });
+  }
+  // An active recorder using the same device index on the same node?
+  const rec = await pool.query(
+    `SELECT id FROM recorders
+      WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2
+        AND status = 'recording' AND source_type = 'sdi'`,
+    [channel.node_id, idx]
+  );
+  if (rec.rows.length > 0) {
+    throw Object.assign(new Error(`DeckLink device ${idx} is in use by a recorder on this node`), { httpStatus: 409 });
+  }
+}
+
+// ── Sidecar spawn helpers (used by POST /channels/:id/start) ─────────────────
+
+// Error carrying the HTTP reply for the caller plus the short message that is
+// persisted in playout_channels.error_message.
+function startFailure(httpStatus, message, dbMessage) {
+  return Object.assign(new Error(message), { httpStatus, dbMessage });
+}
+
+// Ask a remote node-agent to start the sidecar. Resolves to
+// { containerId, containerMeta }; containerMeta.sidecar_url is set when the
+// agent reports where the shim is reachable.
+async function spawnRemoteSidecar(channel, apiUrl, env) {
+  const sidecarRes = await fetch(`${apiUrl}/sidecar/start`, {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify({
+      image: PLAYOUT_SIDECAR_IMAGE,
+      env,
+      capturePort: SIDECAR_HTTP_PORT,
+      sourceType: channel.output_type,
+      useGpu: false,
+      publishHttp: true,
+    }),
+    signal: AbortSignal.timeout(20000),
+  });
+  if (!sidecarRes.ok) {
+    const details = await sidecarRes.json().catch(() => ({}));
+    console.error('[playout] remote sidecar start failed:', JSON.stringify(details));
+    throw startFailure(502, 'Remote node failed to start sidecar', 'remote node failed to start sidecar');
+  }
+  const data = await sidecarRes.json();
+  const containerMeta = {};
+  // node-agent returns the reachable host:port the shim is published on.
+  if (data.sidecarUrl || data.host) {
+    containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`;
+  }
+  return { containerId: data.containerId, containerMeta };
+}
+
+// Create + start the sidecar through the local Docker socket. Resolves to
+// { containerId, containerMeta: {} }; a container that was created but failed
+// to start is force-removed before the error is thrown.
+async function spawnLocalSidecar(channel, env) {
+  const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon';
+  const alias = channelAlias(channel.id);
+  const hostBinds = ['/mnt/NVME/MAM/wild-dragon-media:/media'];
+  if (channel.output_type === 'decklink') hostBinds.push('/dev/blackmagic:/dev/blackmagic');
+
+  const createRes = await dockerApi('POST', '/containers/create', {
+    Image: PLAYOUT_SIDECAR_IMAGE,
+    Env: env,
+    HostConfig: {
+      // NOTE(review): privileged is granted to every output type, not just
+      // decklink — confirm NDI/SRT/RTMP sidecars actually need it.
+      Privileged: true,
+      NetworkMode: dockerNetwork,
+      Binds: hostBinds,
+    },
+    NetworkingConfig: { EndpointsConfig: { [dockerNetwork]: { Aliases: [alias] } } },
+    Hostname: alias,
+  });
+  if (createRes.status !== 201) {
+    console.error('[playout] container create failed:', JSON.stringify(createRes.data));
+    throw startFailure(500, 'Failed to create container', 'container create failed');
+  }
+  const containerId = createRes.data.Id;
+  const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
+  if (startRes.status !== 204) {
+    console.error('[playout] container start failed:', JSON.stringify(startRes.data));
+    await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
+    throw startFailure(500, 'Failed to start container', 'container start failed');
+  }
+  return { containerId, containerMeta: {} };
+}
+
+// POST /playout/channels/:id/start — spawn the CasparCG sidecar + bring up output
+router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => {
+  const channel = req.channel;
+  // True once this request owns the 'starting' state and must resolve it.
+  let claimed = false;
+  try {
+    if (isChannelActive(channel.status)) {
+      return res.status(409).json({ error: `Channel already ${channel.status}` });
+    }
+    await assertDeckLinkFree(channel);
+
+    // Claim the channel atomically: req.channel was read earlier, so two
+    // concurrent /start calls could both pass the check above.
+    const claim = await pool.query(
+      `UPDATE playout_channels SET status = 'starting', error_message = NULL
+        WHERE id = $1
+          AND status IS DISTINCT FROM 'running' AND status IS DISTINCT FROM 'starting'
+        RETURNING id`,
+      [channel.id]
+    );
+    if (claim.rows.length === 0) {
+      return res.status(409).json({ error: 'Channel is already being started' });
+    }
+    claimed = true;
+
+    const env = [
+      `OUTPUT_TYPE=${channel.output_type}`,
+      `OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`,
+      `VIDEO_FORMAT=${channel.video_format}`,
+      `PORT=${SIDECAR_HTTP_PORT}`,
+      // Drives the HLS preview path (/media/live//index.m3u8) and
+      // the per-channel resource naming inside the sidecar.
+      `CHANNEL_ID=${channel.id}`,
+    ];
+
+    const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(channel.node_id);
+    const { containerId, containerMeta } = isRemote
+      ? await spawnRemoteSidecar(channel, targetNodeApiUrl, env)
+      : await spawnLocalSidecar(channel, env);
+
+    // Seed last_heartbeat_at so health checks measure silence from this start,
+    // not from a heartbeat left over from an earlier run.
+    const { rows } = await pool.query(
+      `UPDATE playout_channels
+          SET status = 'running', container_id = $1, container_meta = $2,
+              last_heartbeat_at = NOW(), updated_at = NOW()
+        WHERE id = $3 RETURNING *`,
+      [containerId, JSON.stringify(containerMeta), channel.id]
+    );
+    res.json(channelToJson(rows[0]));
+  } catch (err) {
+    // Never leave the row in 'starting': that state blocks every later /start
+    // with a 409 and is invisible to the health tick.
+    if (claimed) {
+      await pool.query(
+        'UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
+        ['error', err.dbMessage || `sidecar start failed: ${err.message}`, channel.id]
+      ).catch((dbErr) => console.error('[playout] failed to record start error:', dbErr.message));
+    }
+    if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
+    next(err);
+  }
+});
+
+// POST /playout/channels/:id/stop — tear down the sidecar
+// Teardown is best-effort: the channel is marked stopped even when the
+// container could not be reached, so a dead node cannot wedge the channel.
+router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => {
+  try {
+    const channel = req.channel;
+    if (channel.container_id) {
+      const { remote: isRemote, apiUrl } = await resolveNodeTarget(channel.node_id);
+      if (isRemote) {
+        try {
+          const stopRes = await fetch(`${apiUrl}/sidecar/stop`, {
+            method: 'POST',
+            headers: { 'Content-Type': 'application/json' },
+            body: JSON.stringify({ containerId: channel.container_id }),
+            signal: AbortSignal.timeout(20000),
+          });
+          // fetch only rejects on transport errors; surface agent-side failures too.
+          if (!stopRes.ok) console.error(`[playout] remote stop failed: HTTP ${stopRes.status}`);
+        } catch (e) {
+          console.error('[playout] remote stop failed:', e.message);
+        }
+      } else {
+        await dockerApi('POST', `/containers/${channel.container_id}/stop?t=10`)
+          .catch((e) => console.error('[playout] container stop failed:', e.message));
+        await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`)
+          .catch((e) => console.error('[playout] container remove failed:', e.message));
+      }
+    }
+    const { rows } = await pool.query(
+      `UPDATE playout_channels SET status = 'stopped', container_id = NULL, updated_at = NOW()
+        WHERE id = $1 RETURNING *`, [channel.id]
+    );
+    res.json(channelToJson(rows[0]));
+  } catch (err) { next(err); }
+});
+
+// GET /playout/channels/:id/status — live engine status (proxied to sidecar)
+// Always answers 200: an unreachable engine is reported through engine_error.
+router.get('/channels/:id/status', async (req, res) => {
+  const { status } = req.channel;
+  if (status !== 'running') {
+    return res.json({ running: false, status });
+  }
+  try {
+    const engine = await callSidecar(req.channel, '/status', 'GET');
+    res.json({ running: true, status, engine });
+  } catch (err) {
+    res.json({ running: true, status, engine: null, engine_error: err.message });
+  }
+});
+
+// ── Transport ────────────────────────────────────────────────────────────────
+// Proxy one transport action to the sidecar: 409 when the channel is not
+// running, 502 when the sidecar call fails, otherwise the sidecar's JSON reply.
+async function transport(req, res, action, body = null) {
+  if (req.channel.status !== 'running') {
+    return res.status(409).json({ error: 'Channel is not running' });
+  }
+  try {
+    res.json(await callSidecar(req.channel, action, 'POST', body));
+  } catch (err) {
+    res.status(502).json({ error: err.message });
+  }
+}
+
+// POST 
+// /playout/channels/:id/play — resolve the channel's playlist, stage-check,
+// and hand the engine the ordered list of ready clips.
+//
+// Replies: 400 missing/malformed playlist_id · 404 playlist not on this channel
+// · 409 channel not running or items not staged · 502 sidecar rejected the load.
+router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => {
+  try {
+    if (req.channel.status !== 'running') {
+      return res.status(409).json({ error: 'Start the channel before playing' });
+    }
+    const { playlist_id } = req.body || {};
+    if (!playlist_id) return res.status(400).json({ error: 'playlist_id is required' });
+
+    const pl = await pool.query(
+      'SELECT * FROM playout_playlists WHERE id = $1 AND channel_id = $2',
+      [playlist_id, req.channel.id]
+    );
+    if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found for this channel' });
+
+    const items = await pool.query(
+      `SELECT i.*, a.filename AS clip_name
+         FROM playout_items i JOIN assets a ON a.id = i.asset_id
+        WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [playlist_id]
+    );
+
+    const notReady = items.rows.filter((i) => i.media_status !== 'ready' || !i.media_path);
+    if (notReady.length > 0) {
+      return res.status(409).json({
+        error: 'Some items are not staged yet',
+        pending: notReady.map((i) => i.id),
+      });
+    }
+
+    // Null-check rather than truthiness: a numeric 0 is a real in/out point
+    // and must not be turned into "unset".
+    const toNumberOrNull = (value) => (value == null ? null : Number(value));
+    const payload = {
+      loop: pl.rows[0].loop,
+      items: items.rows.map((i) => ({
+        id: i.id,
+        asset_id: i.asset_id,
+        media_path: i.media_path,
+        in_point: toNumberOrNull(i.in_point),
+        out_point: toNumberOrNull(i.out_point),
+        transition: i.transition,
+        transition_ms: i.transition_ms,
+        clip_name: i.clip_name,
+      })),
+    };
+
+    // A sidecar failure is an upstream error (502), matching the other
+    // transport routes, not an internal 500.
+    let out;
+    try {
+      out = await callSidecar(req.channel, '/playlist/load', 'POST', payload);
+    } catch (err) {
+      return res.status(502).json({ error: err.message });
+    }
+    res.json(out);
+  } catch (err) {
+    // 22P02 = invalid_text_representation: playlist_id was not a valid id.
+    if (err.code === '22P02') return res.status(400).json({ error: 'playlist_id is not a valid id' });
+    next(err);
+  }
+});
+
+router.post('/channels/:id/pause', requireChannelEdit, (req, res) => transport(req, res, '/transport/pause'));
+router.post('/channels/:id/resume', requireChannelEdit, (req, res) => transport(req, res, '/transport/resume'));
+router.post('/channels/:id/skip', requireChannelEdit, (req, res) => transport(req, res, '/transport/skip'));
+router.post('/channels/:id/stop-playback', requireChannelEdit, (req, res) => transport(req, res, '/channel/stop'));
+
+// GET /playout/channels/:id/asrun — as-run log (newest first, capped at 500 rows)
+router.get('/channels/:id/asrun', async (req, res, next) => {
+  try {
+    const { rows } = await pool.query(
+      `SELECT * FROM playout_as_run WHERE channel_id = $1 ORDER BY started_at DESC LIMIT 500`,
+      [req.channel.id]
+    );
+    res.json(rows);
+  } catch (err) { next(err); }
+});
+
+// ── Playlists ────────────────────────────────────────────────────────────────
+// For playlist/item routes the channel is referenced indirectly; resolve it
+// and assert edit. Used on create/mutate routes that carry channel_id (body
+// first, query string as fallback). Sets req.channel on success.
+async function loadChannelForBody(req, res, next) {
+  try {
+    // req.body is undefined when no body parser matched the request; reading
+    // it outside the try used to reject this middleware's promise unhandled.
+    const channelId = req.body?.channel_id || req.query.channel_id;
+    if (!channelId) return res.status(400).json({ error: 'channel_id is required' });
+    const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
+    if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
+    req.channel = rows[0];
+    await assertProjectAccess(req.user, req.channel.project_id, 'edit');
+    next();
+  } catch (err) {
+    // 22P02 = invalid_text_representation: channel_id was not a valid id.
+    if (err.code === '22P02') return res.status(400).json({ error: 'channel_id is not a valid id' });
+    next(err);
+  }
+}
+
+// GET /playout/playlists?channel_id=...
+// Lists a channel's playlists, oldest first. Requires 'view' on the channel's
+// project.
+router.get('/playlists', async (req, res, next) => {
+  try {
+    const channelId = req.query.channel_id;
+    // A repeated ?channel_id= arrives as an array; only a single value is valid.
+    if (!channelId || typeof channelId !== 'string') {
+      return res.status(400).json({ error: 'channel_id is required' });
+    }
+    const ch = await pool.query('SELECT project_id FROM playout_channels WHERE id = $1', [channelId]);
+    if (ch.rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
+    await assertProjectAccess(req.user, ch.rows[0].project_id, 'view');
+    const { rows } = await pool.query(
+      'SELECT * FROM playout_playlists WHERE channel_id = $1 ORDER BY created_at ASC', [channelId]
+    );
+    res.json(rows);
+  } catch (err) {
+    // 22P02 = invalid_text_representation: channel_id was not a valid id.
+    if (err.code === '22P02') return res.status(400).json({ error: 'channel_id is not a valid id' });
+    next(err);
+  }
+});
+
+// POST /playout/playlists — body { channel_id, name, loop? }
+// loadChannelForBody has already resolved req.channel and asserted 'edit'.
+router.post('/playlists', loadChannelForBody, async (req, res, next) => {
+  try {
+    const { name, loop = false } = req.body || {};
+    // Same rule as channel creation: a non-string name would throw on .trim().
+    if (typeof name !== 'string' || !name.trim()) {
+      return res.status(400).json({ error: 'name is required' });
+    }
+    const { rows } = await pool.query(
+      'INSERT INTO playout_playlists (channel_id, name, loop) VALUES ($1,$2,$3) RETURNING *',
+      [req.channel.id, name.trim(), Boolean(loop)]
+    );
+    res.status(201).json(rows[0]);
+  } catch (err) { next(err); }
+});
+
+// GET /playout/playlists/:plid/items — items in play order, joined with the
+// asset's filename (clip_name) and duration. Requires 'view' on the project of
+// the playlist's channel.
+router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
+  try {
+    const { plid } = req.params;
+    const pl = await pool.query(
+      `SELECT p.*, c.project_id FROM playout_playlists p
+         JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]
+    );
+    if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found' });
+    await assertProjectAccess(req.user, pl.rows[0].project_id, 'view');
+    const { rows } = await pool.query(
+      `SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms
+         FROM playout_items i JOIN assets a ON a.id = i.asset_id
+        WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [plid]
+    );
+    res.json(rows);
+  } catch (err) { next(err); }
+});
+
+// Helper: load a playlist + assert edit on its channel's project.
+// Resolves to the playlist row (plus the channel's project_id). Throws an
+// Error with httpStatus 404 when the playlist does not exist; whatever
+// assertProjectAccess throws propagates unchanged.
+async function loadPlaylistEdit(plid, user) {
+  const { rows } = await pool.query(
+    `SELECT p.*, c.project_id FROM playout_playlists p
+       JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]
+  );
+  const [playlist] = rows;
+  if (!playlist) {
+    throw Object.assign(new Error('Playlist not found'), { httpStatus: 404 });
+  }
+  await assertProjectAccess(user, playlist.project_id, 'edit');
+  return playlist;
+}
+
+// POST /playout/playlists/:plid/items — add an asset to a playlist
+// Body: { asset_id, in_point?, out_point?, transition? = 'cut', transition_ms? = 0 }.
+// Replies 201 with the new item; 400 for a missing, malformed or unknown asset_id.
+router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
+  try {
+    const { plid } = req.params;
+    await loadPlaylistEdit(plid, req.user);
+    const { asset_id, in_point = null, out_point = null,
+      transition = 'cut', transition_ms = 0 } = req.body || {};
+    if (!asset_id) return res.status(400).json({ error: 'asset_id is required' });
+
+    // Append at the end of the playlist.
+    const ord = await pool.query(
+      'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1',
+      [plid]
+    );
+    const { rows } = await pool.query(
+      `INSERT INTO playout_items (playlist_id, asset_id, sort_order, in_point, out_point, transition, transition_ms)
+       VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`,
+      [plid, asset_id, ord.rows[0].next, in_point, out_point, transition, transition_ms]
+    );
+    const [item] = rows;
+
+    // Kick staging immediately so the clip is air-ready by the time the operator
+    // hits play. Enqueue failure is logged, not fatal: the item exists and can
+    // be re-staged through POST /items/:itemId/stage.
+    await stageQueue.add('stage', { itemId: item.id, assetId: asset_id }).catch((e) =>
+      console.error('[playout] failed to enqueue stage job:', e.message));
+
+    res.status(201).json(item);
+  } catch (err) {
+    if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
+    // Bad client input used to surface as a 500:
+    //   22P02 invalid_text_representation — a value has the wrong format
+    //   23503 foreign_key_violation       — asset_id references no asset
+    if (err.code === '22P02') return res.status(400).json({ error: 'request contains a malformed value' });
+    if (err.code === '23503') return res.status(400).json({ error: 'asset_id does not reference an existing asset' });
+    next(err);
+  }
+});
+
+// PUT /playout/playlists/:plid/reorder — body { order: [itemId, itemId, ...] 
+// }
+// Item ids are written in array order (sort_order = array index) inside one
+// transaction. Ids that do not belong to this playlist match no row and are
+// skipped; the reply reports how many items were actually moved.
+router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => {
+  let client = null;
+  let inTransaction = false;
+  try {
+    await loadPlaylistEdit(req.params.plid, req.user);
+    const { order } = req.body || {};
+    if (!Array.isArray(order)) return res.status(400).json({ error: 'order must be an array of item ids' });
+
+    // Take a pooled connection only once the request is known to be valid.
+    client = await pool.connect();
+    await client.query('BEGIN');
+    inTransaction = true;
+    let moved = 0;
+    for (const [position, itemId] of order.entries()) {
+      const result = await client.query(
+        'UPDATE playout_items SET sort_order = $1, updated_at = NOW() WHERE id = $2 AND playlist_id = $3',
+        [position, itemId, req.params.plid]
+      );
+      moved += result.rowCount;
+    }
+    await client.query('COMMIT');
+    inTransaction = false;
+    res.json({ reordered: moved });
+  } catch (err) {
+    if (inTransaction) await client.query('ROLLBACK').catch(() => {});
+    if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
+    // 22P02 = invalid_text_representation: an entry in `order` is not a valid id.
+    if (err.code === '22P02') return res.status(400).json({ error: 'order must be an array of item ids' });
+    next(err);
+  } finally {
+    if (client) client.release();
+  }
+});
+
+// DELETE /playout/items/:itemId — requires 'edit' on the project of the item's channel
+router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => {
+  try {
+    const { itemId } = req.params;
+    const it = await pool.query(
+      `SELECT i.id, c.project_id FROM playout_items i
+         JOIN playout_playlists p ON p.id = i.playlist_id
+         JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [itemId]
+    );
+    if (it.rows.length === 0) return res.status(404).json({ error: 'Item not found' });
+    await assertProjectAccess(req.user, it.rows[0].project_id, 'edit');
+    await pool.query('DELETE FROM playout_items WHERE id = $1', [itemId]);
+    res.json({ deleted: true });
+  } catch (err) { next(err); }
+});
+
+// POST /playout/items/:itemId/stage — (re)kick staging for one item
+// Marks the item 'pending' first, then enqueues the job; an enqueue failure
+// propagates as an error and leaves the item pending.
+router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => {
+  try {
+    const { itemId } = req.params;
+    const it = await pool.query(
+      `SELECT i.id, i.asset_id, c.project_id FROM playout_items i
+         JOIN playout_playlists p ON p.id = i.playlist_id
+         JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [itemId]
+    );
+    const [item] = it.rows;
+    if (!item) return res.status(404).json({ error: 'Item not found' });
+    await assertProjectAccess(req.user, item.project_id, 'edit');
+    await pool.query("UPDATE playout_items SET media_status = 'pending' WHERE id = $1", [itemId]);
+    await stageQueue.add('stage', { itemId: item.id, assetId: item.asset_id });
+    res.json({ queued: true });
+  } catch (err) { next(err); }
+});
+
+// ── Failover (called by scheduler tick) ──────────────────────────────────────
+// Tear down a (presumed dead) sidecar and re-home the channel on another
+// cluster node. DeckLink channels are excluded — the device-index pinning makes
+// blind re-placement risky, so they alert only.
+//
+// This helper only moves the DB row. It does NOT spawn the new sidecar: the
+// caller must follow up with POST /playout/channels/:id/start.
+//
+// Returns { restarted: true, new_node_id } on success, or { restarted: false,
+// reason } when the channel is missing, is decklink, or no eligible node exists.
+export async function restartChannel(channelId) {
+  const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
+  const [channel] = rows;
+  if (!channel) return { restarted: false, reason: 'channel not found' };
+
+  if (channel.output_type === 'decklink') {
+    return { restarted: false, reason: 'decklink channels are alert-only' };
+  }
+
+  // Best-effort teardown of the old container — it may already be dead.
+  if (channel.container_id) {
+    const { remote, apiUrl } = await resolveNodeTarget(channel.node_id);
+    if (remote && apiUrl) {
+      await fetch(`${apiUrl}/sidecar/stop`, {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify({ containerId: channel.container_id }),
+        signal: AbortSignal.timeout(10000),
+      }).catch(() => {});
+    } else {
+      await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`).catch(() => {});
+    }
+  }
+
+  // Pick a different node that reported in within the last 60 s (most recent
+  // first). For NDI/SRT/RTMP every online node is eligible (no hardware
+  // contention). IS DISTINCT FROM instead of <>: node_id is NULL for channels
+  // on the local node, and `id <> NULL` is never true, so those channels could
+  // never find a failover target.
+  const nodes = await pool.query(
+    `SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes
+      WHERE id IS DISTINCT FROM $1 AND last_seen_at > NOW() - INTERVAL '60 seconds'
+      ORDER BY last_seen_at DESC LIMIT 1`,
+    [channel.node_id]
+  );
+  if (nodes.rows.length === 0) {
+    await pool.query(
+      "UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
+      ['no healthy node available for failover', channel.id]
+    );
+    return { restarted: false, reason: 'no eligible node' };
+  }
+  const newNodeId = nodes.rows[0].id;
+
+  // Move the channel to the new node + bump the counters; the operator UI
+  // surfaces these to flag restarts.
+  //
+  // The row is parked in 'error', not 'starting': POST /channels/:id/start
+  // answers 409 for 'running' and 'starting', so a 'starting' row could never
+  // be brought back up by the follow-up /start call. /start clears the message
+  // once it takes over; if that call never lands, the operator still sees why
+  // the channel is down.
+  await pool.query(
+    `UPDATE playout_channels
+        SET node_id = $1, status = 'error', container_id = NULL, container_meta = '{}'::jsonb,
+            restart_count = restart_count + 1, last_restart_at = NOW(),
+            error_message = $2, updated_at = NOW()
+      WHERE id = $3`,
+    [newNodeId, 'failover in progress: awaiting sidecar start on the new node', channel.id]
+  );
+
+  // The sidecar spawn itself re-uses the /start route (called by the scheduler
+  // after this helper returns) rather than duplicating the docker/agent code
+  // here; going through the route also keeps the RBAC path consistent.
+  return { restarted: true, new_node_id: newNodeId };
+}
+
+export default router;
diff --git a/services/mam-api/src/scheduler.js b/services/mam-api/src/scheduler.js
index 8b6b305..ef0577d 100644
--- a/services/mam-api/src/scheduler.js
+++ b/services/mam-api/src/scheduler.js
@@ -9,6 +9,7 @@
 
 import pool from './db/pool.js';
 import { syncToAmpp } from './routes/upload.js';
+import { restartChannel } from './routes/playout.js';
 
 const TICK_INTERVAL_MS = parseInt(process.env.SCHEDULER_TICK_MS || '15000', 10);
 const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;
@@ -175,6 +176,13 @@ async function tick() {
     for (const row of ampps.rows) {
       await syncToAmpp(row.id, row.project_id, row.bin_id);
     }
+
+    // 6) Playout channel health checks. Ping each running channel's sidecar
+    //    /status; on success bump playout_channels.last_heartbeat_at. Misses
+    //    are not counted separately: they are inferred from that timestamp's
+    //    age. Roughly three missed ticks → auto-restart on a healthy node
+    //    (non-decklink), or alert-only for decklink.
+    await playoutHealthTick(client);
   } catch (err) {
     console.error('[scheduler] tick error:', err);
   } finally {
@@ -201,6 +209,70 @@ async function enqueueNextOccurrence(schedule, client) {
   console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
 }
 
+// ── Playout channel health + failover ────────────────────────────────────────
+// Tick step 6. Reuses the same advisory lock so only one replica probes the
+// sidecars; multi-replica pings would just waste cycles. A missed probe is
+// counted via last_heartbeat_at age: > 3 * TICK_INTERVAL means 3 consecutive
+// misses.
+//
+// `client` is the pg client the tick already holds. Per running channel:
+//   probe OK               → last_heartbeat_at = NOW()
+//   probe failed, in grace → nothing (retried next tick)
+//   probe failed, expired  → decklink: status 'error' (manual recovery)
+//                            others:   restartChannel() + self-call /start
+async function playoutHealthTick(client) {
+  let channels;
+  try {
+    // last_alive_at is the newer of the last heartbeat and updated_at (which
+    // /start sets), and GREATEST ignores NULLs. A channel that has never sent
+    // a heartbeat, or carries a stale one from an earlier run, therefore gets
+    // a full grace window after starting instead of failing over on its first
+    // missed probe.
+    ({ rows: channels } = await client.query(
+      `SELECT id, output_type, container_meta, node_id, restart_count,
+              GREATEST(last_heartbeat_at, updated_at) AS last_alive_at
+         FROM playout_channels WHERE status = 'running'`
+    ));
+  } catch (err) {
+    // Migration 029 may not be applied yet — bail silently rather than crash.
+    if (err.code === '42P01') return;
+    throw err;
+  }
+
+  const TIMEOUT_MS = TICK_INTERVAL_MS * 3 + 5000;
+  for (const ch of channels) {
+    const sidecarUrl = ch.container_meta?.sidecar_url || `http://playout-${ch.id}:3002`;
+
+    // Keep the probe apart from the bookkeeping below, so a failed DB write
+    // is never mistaken for an unreachable sidecar.
+    let probeError = null;
+    try {
+      const r = await fetch(`${sidecarUrl}/status`, { signal: AbortSignal.timeout(5000) });
+      if (!r.ok) probeError = new Error(`status HTTP ${r.status}`);
+    } catch (err) {
+      probeError = err;
+    }
+
+    if (!probeError) {
+      await client.query(
+        'UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [ch.id]
+      ).catch((dbErr) => {
+        console.error(`[scheduler] heartbeat update failed for ${ch.id}: ${dbErr.message}`);
+      });
+      continue;
+    }
+
+    const lastAlive = ch.last_alive_at ? new Date(ch.last_alive_at).getTime() : 0;
+    if (Date.now() - lastAlive < TIMEOUT_MS) continue; // not yet 3 misses
+
+    if (ch.output_type === 'decklink') {
+      await client.query(
+        "UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
+        [`sidecar unreachable (${probeError.message}); decklink channels require manual recovery`, ch.id]
+      );
+      console.error(`[scheduler] decklink channel ${ch.id} unreachable — alert-only, no auto-failover`);
+      continue;
+    }
+
+    // restart_count may be NULL on rows that predate the column default.
+    const attempt = (ch.restart_count ?? 0) + 1;
+    console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${probeError.message}), restart #${attempt}`);
+    try {
+      const res = await restartChannel(ch.id);
+      if (!res.restarted) {
+        console.error(`[scheduler] failover: channel ${ch.id} restart skipped — ${res.reason}`);
+        continue;
+      }
+      console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`);
+      // Kick the new sidecar via the /start route — the helper updates the
+      // DB but the actual docker spawn lives on the start endpoint.
+      await callSelf(`/api/v1/playout/channels/${ch.id}/start`).catch((e) => {
+        console.error(`[scheduler] failover: /start call failed: ${e.message}`);
+      });
+    } catch (err2) {
+      console.error(`[scheduler] failover error for ${ch.id}: ${err2.message}`);
+    }
+  }
+}
+
 export function startSchedulerLoop() {
   if (_interval) return;
   console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);