From ca71e47035a363e938bff47a06f129eb22b49391 Mon Sep 17 00:00:00 2001 From: Zac Date: Sat, 30 May 2026 14:51:35 +0000 Subject: [PATCH] fix(playout): repair failover, authenticate scheduler self-calls, fix playlist walk + CasparCG consumer syntax Post-review fixes for the 8-commit playout-mcr drop: - Scheduler self-calls (callSelf -> /recorders, /playout) carried no auth, so under AUTH_ENABLED=true requireUiHeader 403'd every mutating POST. This broke playout failover AND scheduled recordings. Add a per-boot in-process service token (x-internal-token) the scheduler attaches; requireAuth/requireUiHeader treat it as the seeded admin. No env/compose config needed. - Failover deadlocked: restartChannel set status='starting' then the scheduler called the guarded /start route, which 409s on 'starting'. Extract the spawn body into spawnChannelSidecar() shared by /start and restartChannel; failover now spawns directly with no self-call. - Phase A playlist stalled after 2 clips: _scheduleAdvance cued the next clip via LOADBG AUTO but never advanced the pointer. Pass asset_duration_ms in the /play payload and arm a duration-based timer that advances currentIndex and cues subsequent clips, keeping as-run in sync for arbitrary-length playlists. - CasparCG consumer syntax was invalid: "ADD FFMPEG" is the producer name, not a consumer keyword, and old -vcodec/-acodec short args are rejected. Use STREAM/FILE with -codec:v / -codec:a / -preset:v / -tune:v and a format=yuv420p filter ahead of libx264 (channel output is RGBA). Co-Authored-By: Claude Opus 4.7 --- services/mam-api/src/middleware/auth.js | 26 +++ services/mam-api/src/routes/playout.js | 213 +++++++++++++----------- services/mam-api/src/scheduler.js | 13 +- services/playout/src/playout-manager.js | 65 ++++++-- 4 files changed, 199 insertions(+), 118 deletions(-) diff --git a/services/mam-api/src/middleware/auth.js b/services/mam-api/src/middleware/auth.js index f323e29..21f1d39 100644 --- a/services/mam-api/src/middleware/auth.js +++ b/services/mam-api/src/middleware/auth.js @@ -1,6 +1,23 @@ +import crypto from 'crypto'; import pool from '../db/pool.js'; import { parseBearer, hashToken } from '../auth/tokens.js'; +// In-process service token for the scheduler's loopback self-calls +// (scheduler.js -> /recorders|/playout). The scheduler runs in THIS process, so +// a per-boot random constant needs no env/compose config and is never exposed: +// it only travels over the loopback fetch inside the same process. Multi-replica +// is safe because each replica's scheduler only ever calls 127.0.0.1 (itself), +// matching that replica's token. Requests bearing it are treated as the seeded +// admin (DEV_USER) so RBAC + FK-bearing routes work. +export const INTERNAL_TOKEN = crypto.randomBytes(32).toString('hex'); +const INTERNAL_HEADER = 'x-internal-token'; + +function isInternalCall(req) { + const got = req.headers[INTERNAL_HEADER]; + if (typeof got !== 'string' || got.length !== INTERNAL_TOKEN.length) return false; + return crypto.timingSafeEqual(Buffer.from(got), Buffer.from(INTERNAL_TOKEN)); +} + // Stable UUID matching migration 023's seeded dev user. /** UUID of the seeded dev-mode placeholder. NOT a sentinel for "any unauthenticated user". */ export const DEV_USER_ID = '00000000-0000-4000-8000-000000000000'; @@ -25,6 +42,13 @@ async function loadUser(id) { } export async function requireAuth(req, res, next) { + // Internal loopback self-call (scheduler). Acts as the seeded admin so RBAC + // and FK-bearing routes work, regardless of AUTH_ENABLED. + if (isInternalCall(req)) { + req.user = DEV_USER; + return next(); + } + // Dev mode — attach the seeded dev user so FK-bearing routes work. if (process.env.AUTH_ENABLED !== 'true') { req.user = DEV_USER; @@ -98,6 +122,8 @@ const CSRF_EXEMPT_PATHS = new Set(['/cluster/heartbeat']); export function requireUiHeader(req, res, next) { if (!MUTATING.has(req.method)) return next(); + // Internal loopback self-call (scheduler) — not a browser, can't be drive-by'd. + if (isInternalCall(req)) return next(); // Bearer-authed requests (Premiere panel, scripts) are exempt — they're not // browsers and can't be drive-by'd from another origin. if (req.headers.authorization?.toLowerCase().startsWith('bearer ')) return next(); diff --git a/services/mam-api/src/routes/playout.js b/services/mam-api/src/routes/playout.js index 161eb5f..4b68e0c 100644 --- a/services/mam-api/src/routes/playout.js +++ b/services/mam-api/src/routes/playout.js @@ -253,6 +253,100 @@ async function assertDeckLinkFree(channel) { } } +// Spawn the CasparCG sidecar for a channel and flip it to 'running'. Shared by +// the /start route and the scheduler failover path (restartChannel) so neither +// duplicates the docker/node-agent orchestration. Caller is responsible for the +// pre-flight guards (status check, DeckLink contention) appropriate to its path. +// +// On any spawn failure the channel is left status='error' with a message and an +// Error carrying { httpStatus } is thrown. On success returns the updated row. +async function spawnChannelSidecar(channel) { + await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]); + + const env = [ + `OUTPUT_TYPE=${channel.output_type}`, + `OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`, + `VIDEO_FORMAT=${channel.video_format}`, + `PORT=${SIDECAR_HTTP_PORT}`, + // Drives the HLS preview path (/media/live//index.m3u8) and + // the per-channel resource naming inside the sidecar. + `CHANNEL_ID=${channel.id}`, + ]; + + const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(channel.node_id); + const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon'; + let containerId; + let containerMeta = {}; + + if (isRemote) { + const sidecarRes = await fetch(`${targetNodeApiUrl}/sidecar/start`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + image: PLAYOUT_SIDECAR_IMAGE, env, + capturePort: SIDECAR_HTTP_PORT, + sourceType: channel.output_type, + useGpu: false, + publishHttp: true, + }), + signal: AbortSignal.timeout(20000), + }); + if (!sidecarRes.ok) { + const details = await sidecarRes.json().catch(() => ({})); + console.error('[playout] remote sidecar start failed:', JSON.stringify(details)); + await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3', + ['error', 'remote node failed to start sidecar', channel.id]); + throw Object.assign(new Error('Remote node failed to start sidecar'), { httpStatus: 502 }); + } + const data = await sidecarRes.json(); + containerId = data.containerId; + // node-agent returns the reachable host:port the shim is published on. + if (data.sidecarUrl || data.host) { + containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`; + } + } else { + const alias = channelAlias(channel.id); + const hostBinds = ['/mnt/NVME/MAM/wild-dragon-media:/media']; + if (channel.output_type === 'decklink') hostBinds.push('/dev/blackmagic:/dev/blackmagic'); + + const containerConfig = { + Image: PLAYOUT_SIDECAR_IMAGE, + Env: env, + HostConfig: { + Privileged: true, + NetworkMode: dockerNetwork, + Binds: hostBinds, + }, + NetworkingConfig: { EndpointsConfig: { [dockerNetwork]: { Aliases: [alias] } } }, + Hostname: alias, + }; + const createRes = await dockerApi('POST', '/containers/create', containerConfig); + if (createRes.status !== 201) { + console.error('[playout] container create failed:', JSON.stringify(createRes.data)); + await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3', + ['error', 'container create failed', channel.id]); + throw Object.assign(new Error('Failed to create container'), { httpStatus: 500 }); + } + containerId = createRes.data.Id; + const startRes = await dockerApi('POST', `/containers/${containerId}/start`); + if (startRes.status !== 204) { + console.error('[playout] container start failed:', JSON.stringify(startRes.data)); + await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); + await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3', + ['error', 'container start failed', channel.id]); + throw Object.assign(new Error('Failed to start container'), { httpStatus: 500 }); + } + } + + const { rows } = await pool.query( + `UPDATE playout_channels + SET status = 'running', container_id = $1, container_meta = $2, updated_at = NOW() + WHERE id = $3 RETURNING *`, + [containerId, JSON.stringify(containerMeta), channel.id] + ); + return rows[0]; +} + // POST /playout/channels/:id/start — spawn the CasparCG sidecar + bring up output router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => { try { @@ -261,91 +355,8 @@ router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => return res.status(409).json({ error: `Channel already ${channel.status}` }); } await assertDeckLinkFree(channel); - - await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]); - - const env = [ - `OUTPUT_TYPE=${channel.output_type}`, - `OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`, - `VIDEO_FORMAT=${channel.video_format}`, - `PORT=${SIDECAR_HTTP_PORT}`, - // Drives the HLS preview path (/media/live//index.m3u8) and - // the per-channel resource naming inside the sidecar. - `CHANNEL_ID=${channel.id}`, - ]; - - const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(channel.node_id); - const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon'; - let containerId; - let containerMeta = {}; - - if (isRemote) { - const sidecarRes = await fetch(`${targetNodeApiUrl}/sidecar/start`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - image: PLAYOUT_SIDECAR_IMAGE, env, - capturePort: SIDECAR_HTTP_PORT, - sourceType: channel.output_type, - useGpu: false, - publishHttp: true, - }), - signal: AbortSignal.timeout(20000), - }); - if (!sidecarRes.ok) { - const details = await sidecarRes.json().catch(() => ({})); - console.error('[playout] remote sidecar start failed:', JSON.stringify(details)); - await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3', - ['error', 'remote node failed to start sidecar', channel.id]); - return res.status(502).json({ error: 'Remote node failed to start sidecar' }); - } - const data = await sidecarRes.json(); - containerId = data.containerId; - // node-agent returns the reachable host:port the shim is published on. - if (data.sidecarUrl || data.host) { - containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`; - } - } else { - const alias = channelAlias(channel.id); - const hostBinds = ['/mnt/NVME/MAM/wild-dragon-media:/media']; - if (channel.output_type === 'decklink') hostBinds.push('/dev/blackmagic:/dev/blackmagic'); - - const containerConfig = { - Image: PLAYOUT_SIDECAR_IMAGE, - Env: env, - HostConfig: { - Privileged: true, - NetworkMode: dockerNetwork, - Binds: hostBinds, - }, - NetworkingConfig: { EndpointsConfig: { [dockerNetwork]: { Aliases: [alias] } } }, - Hostname: alias, - }; - const createRes = await dockerApi('POST', '/containers/create', containerConfig); - if (createRes.status !== 201) { - console.error('[playout] container create failed:', JSON.stringify(createRes.data)); - await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3', - ['error', 'container create failed', channel.id]); - return res.status(500).json({ error: 'Failed to create container' }); - } - containerId = createRes.data.Id; - const startRes = await dockerApi('POST', `/containers/${containerId}/start`); - if (startRes.status !== 204) { - console.error('[playout] container start failed:', JSON.stringify(startRes.data)); - await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {}); - await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3', - ['error', 'container start failed', channel.id]); - return res.status(500).json({ error: 'Failed to start container' }); - } - } - - const { rows } = await pool.query( - `UPDATE playout_channels - SET status = 'running', container_id = $1, container_meta = $2, updated_at = NOW() - WHERE id = $3 RETURNING *`, - [containerId, JSON.stringify(containerMeta), channel.id] - ); - res.json(channelToJson(rows[0])); + const row = await spawnChannelSidecar(channel); + res.json(channelToJson(row)); } catch (err) { if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message }); next(err); @@ -414,7 +425,7 @@ router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found for this channel' }); const items = await pool.query( - `SELECT i.*, a.filename AS clip_name + `SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms FROM playout_items i JOIN assets a ON a.id = i.asset_id WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [playlist_id]); @@ -434,6 +445,7 @@ router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => out_point: i.out_point ? Number(i.out_point) : null, transition: i.transition, transition_ms: i.transition_ms, clip_name: i.clip_name, + asset_duration_ms: i.asset_duration_ms != null ? Number(i.asset_duration_ms) : null, })), }; const out = await callSidecar(req.channel, '/playlist/load', 'POST', payload); @@ -651,25 +663,28 @@ export async function restartChannel(channelId) { } const newNodeId = nodes.rows[0].id; - // Move the channel to the new node + bump the counters; the operator UI - // surfaces these to flag restarts. - await pool.query( + // Move the channel to the new node + bump the restart counters; the operator + // UI surfaces these to flag restarts. container_meta is cleared so the new + // spawn re-derives the sidecar URL. + const { rows: moved } = await pool.query( `UPDATE playout_channels - SET node_id = $1, status = 'starting', container_id = NULL, container_meta = '{}'::jsonb, + SET node_id = $1, status = 'stopped', container_id = NULL, container_meta = '{}'::jsonb, restart_count = restart_count + 1, last_restart_at = NOW(), error_message = NULL, updated_at = NOW() - WHERE id = $2`, + WHERE id = $2 RETURNING *`, [newNodeId, channel.id] ); - // The actual sidecar spawn re-uses the same path as /start. We POST to - // ourselves rather than duplicating the docker/agent code; scheduler runs - // in-process so this is a local function call shape, but going through the - // route keeps RBAC/permission paths consistent. - // NOTE: scheduler-driven restart bypasses HTTP — it imports startSidecar - // directly. Surfaced as a separate helper in a follow-up if the inline - // simple path proves insufficient. - return { restarted: true, new_node_id: newNodeId }; + // Spawn the sidecar directly via the shared helper. We do NOT route through + // the HTTP /start endpoint: its guard rejects status 'starting'/'running' and + // would deadlock the failover. spawnChannelSidecar flips the channel to + // running (or leaves it 'error' and throws on spawn failure). + try { + await spawnChannelSidecar(moved[0]); + return { restarted: true, new_node_id: newNodeId }; + } catch (err) { + return { restarted: false, reason: `respawn failed: ${err.message}` }; + } } export default router; diff --git a/services/mam-api/src/scheduler.js b/services/mam-api/src/scheduler.js index ef0577d..027fa5e 100644 --- a/services/mam-api/src/scheduler.js +++ b/services/mam-api/src/scheduler.js @@ -10,6 +10,7 @@ import pool from './db/pool.js'; import { syncToAmpp } from './routes/upload.js'; import { restartChannel } from './routes/playout.js'; +import { INTERNAL_TOKEN } from './middleware/auth.js'; const TICK_INTERVAL_MS = parseInt(process.env.SCHEDULER_TICK_MS || '15000', 10); const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`; @@ -20,7 +21,10 @@ let _interval = null; async function callSelf(path, method = 'POST') { const res = await fetch(`${SELF_URL}${path}`, { method, - headers: { 'Content-Type': 'application/json' }, + headers: { + 'Content-Type': 'application/json', + 'x-internal-token': INTERNAL_TOKEN, + }, signal: AbortSignal.timeout(30000), }); if (!res.ok) { @@ -255,14 +259,11 @@ async function playoutHealthTick(client) { console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${err.message}), restart #${ch.restart_count + 1}`); try { + // restartChannel re-places the channel on a healthy node AND spawns the + // new sidecar directly (shared helper) — no /start self-call needed. const res = await restartChannel(ch.id); if (res.restarted) { console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`); - // Kick the new sidecar via the /start route — the helper updates the - // DB but the actual docker spawn lives on the start endpoint. - await callSelf(`/api/v1/playout/channels/${ch.id}/start`).catch((e) => { - console.error(`[scheduler] failover: /start call failed: ${e.message}`); - }); } else { console.error(`[scheduler] failover: channel ${ch.id} restart skipped — ${res.reason}`); } diff --git a/services/playout/src/playout-manager.js b/services/playout/src/playout-manager.js index 9c2ba84..95b856c 100644 --- a/services/playout/src/playout-manager.js +++ b/services/playout/src/playout-manager.js @@ -90,16 +90,20 @@ export class PlayoutManager { return `NDI NAME "${name}"`; } if (outputType === 'srt' || outputType === 'rtmp') { - // CasparCG 2.3+ FFMPEG consumer streams the channel out via libavformat. - // SRT/RTMP both go through the ffmpeg mpegts/flv muxers. + // CasparCG 2.3 streams via the FFMPEG consumer, invoked with the STREAM + // keyword (FILE/STREAM are interchangeable aliases for it; the bare word + // "FFMPEG" is the PRODUCER and is NOT a valid consumer keyword). Args must + // use ffmpeg's -param:stream form (-codec:v, not -vcodec) or CasparCG + // rejects them. The channel feeds the consumer as RGBA, so a + // format=yuv420p filter is required before libx264. const url = cfg.url || ''; if (outputType === 'srt') { const latency = cfg.latency || 200; const full = url.includes('latency=') ? url : `${url}${url.includes('?') ? '&' : '?'}latency=${latency}`; - return `FFMPEG "${full}" -format mpegts -vcodec libx264 -preset veryfast -tune zerolatency -b:v 6M -acodec aac -b:a 192k`; + return `STREAM "${full}" -format mpegts -codec:v libx264 -preset:v veryfast -tune:v zerolatency -b:v 6M -codec:a aac -b:a 192k -filter:v format=yuv420p`; } const target = cfg.key ? `${url}/${cfg.key}` : url; - return `FFMPEG "${target}" -format flv -vcodec libx264 -preset veryfast -tune zerolatency -b:v 6M -acodec aac -b:a 192k`; + return `STREAM "${target}" -format flv -codec:v libx264 -preset:v veryfast -tune:v zerolatency -b:v 6M -codec:a aac -b:a 192k -filter:v format=yuv420p`; } throw new Error(`Unknown output_type: ${outputType}`); } @@ -144,16 +148,20 @@ export class PlayoutManager { // mkdir is done by the entrypoint; CasparCG's ffmpeg consumer creates the // playlist on first segment. 2s segments / 6-window list keeps lag low // without thrashing disk. + // FILE keyword (alias of the FFMPEG consumer) writing a segmented HLS + // playlist. Same arg rules as the STREAM consumer: -param:stream form and a + // format=yuv420p filter ahead of libx264 (channel output is RGBA). const out = `${HLS_DIR}/index.m3u8`; const args = [ - `FFMPEG "${out}"`, + `FILE "${out}"`, '-format hls', '-hls_time 2', '-hls_list_size 6', '-hls_flags delete_segments+append_list', - '-vcodec libx264 -preset veryfast -tune zerolatency -b:v 800k -maxrate 1M -bufsize 2M', + '-codec:v libx264 -preset:v veryfast -tune:v zerolatency -b:v 800k -maxrate 1M -bufsize 2M', '-g 60 -keyint_min 60 -sc_threshold 0', - '-acodec aac -b:a 96k', + '-codec:a aac -b:a 96k', + '-filter:v format=yuv420p', ].join(' '); await this.amcp.send(`ADD ${CHANNEL} ${args}`); } @@ -203,15 +211,26 @@ export class PlayoutManager { this._scheduleAdvance(item); } - // CasparCG's LOADBG ... AUTO automatically swaps the background layer to - // foreground when the current clip ends. To keep our bookkeeping (currentIndex - // / as-run) in sync we additionally poll INFO and advance our pointer when the - // foreground clip changes. For Phase A we use a simpler model: cue the next - // clip with AUTO and use a duration-based timer to move our pointer. + // Effective on-air duration of an item in milliseconds. Prefers an explicit + // in/out trim, else the asset's full duration. Returns null when unknown (no + // duration metadata + no out_point) so the caller can skip the timer. + _itemDurationMs(item) { + const inS = item.in_point || 0; + if (item.out_point && item.out_point > inS) return (item.out_point - inS) * 1000; + if (item.asset_duration_ms != null) return Math.max(0, item.asset_duration_ms - inS * 1000); + return null; + } + + // CasparCG's LOADBG ... AUTO swaps the cued background clip to foreground when + // the current clip ends, giving a gapless visual take. But CasparCG won't cue + // clip N+2 on its own and won't move OUR pointer / as-run bookkeeping. So we + // also arm a duration-based timer: when the current clip is due to end we + // advance currentIndex and cue the following clip. This keeps an arbitrary- + // length playlist walking, not just the first two items. _scheduleAdvance(item) { this._clearAdvance(); const next = this._nextIndex(); - if (next === null) return; + if (next === null) return; // end of a non-looping playlist const nextItem = this.state.playlist[next]; const nextToken = toCasparToken(nextItem.media_path); const fps = this.state.fps || fpsFor(this.state.videoFormat); @@ -219,6 +238,26 @@ export class PlayoutManager { // Cue next on background with AUTO so CasparCG performs the gapless take. this.amcp.send(`LOADBG ${CHANNEL}-${FG_LAYER} "${nextToken}" AUTO${trans}`) .catch((err) => console.warn(`[playout] LOADBG failed: ${err.message}`)); + + // Arm the pointer-advance timer. Without duration metadata we can't time the + // hand-off; leave AUTO to take clip N+1 visually but log a warning since the + // pointer (and thus clip N+2 cueing) will stall. + const durMs = this._itemDurationMs(item); + if (durMs == null) { + console.warn(`[playout] no duration for clip [${this.state.currentIndex}] — pointer advance stalled after this clip`); + return; + } + this._advanceTimer = setTimeout(() => { + this._advanceTimer = null; + // The AUTO take already happened in CasparCG; just move our pointer and + // cue the clip after next. _playIndex would re-PLAY and double-take, so we + // advance state directly and re-arm. + this.state.currentIndex = next; + this.state.currentClip = nextItem.clip_name || nextToken; + console.log(`[playout] advance -> [${next}] ${nextToken}`); + this._reportAsRunStart(nextItem); + this._scheduleAdvance(nextItem); + }, Math.max(250, durMs)); } _nextIndex() {