Drop in the redesigned timeline-centric Playout (PGM monitor, transport, SCTE-35 card, as-run drawer) from the on-node redesign, fully wired to the real playout API (channels/transport/HLS preview w/ error-recovery/as-run); no mock data. In-page ConfirmModal for destructive actions. SCTE-35: new playout_scte_breaks table (migration 033), endpoints to schedule/trigger/list/cancel breaks (POST/GET/DELETE /channels/:id/scte[/trigger]), scheduler due-break sweep, engine triggerScte + auto-return + as-run 'scte' rows + on-air SCTE-BREAK state and timeline AD markers. In-stream SCTE-35 cue injection is a documented stub (CasparCG FFMPEG consumer exposes no scte35 muxer) — scheduling/triggering/countdown/as-run are functional. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
777 lines
33 KiB
JavaScript
777 lines
33 KiB
JavaScript
// Playout / Master Control routes.
|
|
//
|
|
// Control plane for the CasparCG-backed playout subsystem. Channels are placed
|
|
// on cluster nodes and their engine containers spawned via the same Docker-socket
|
|
// / node-agent path recorders use; the channel's transport (play / pause / skip)
|
|
// is proxied through to the sidecar's HTTP shim, which drives CasparCG over AMCP.
|
|
//
|
|
// RBAC: every channel carries a project_id (NULL = admin-only, the recorder
|
|
// convention). List routes filter by accessible projects; mutating routes assert
|
|
// 'edit'. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md.
|
|
|
|
import express from 'express';
|
|
import http from 'http';
|
|
import { readFile } from 'fs/promises';
|
|
import { Queue } from 'bullmq';
|
|
import pool from '../db/pool.js';
|
|
import { validateUuid } from '../middleware/errors.js';
|
|
import {
|
|
assertProjectAccess, accessibleProjectIds, isAdmin,
|
|
} from '../auth/authz.js';
|
|
|
|
const router = express.Router();
|
|
|
|
/**
 * Convert a Redis connection URL into the connection options passed to BullMQ.
 *
 * @param {string} url - e.g. `redis://queue:6379` or `redis://user:pass@host`.
 * @returns {{host: string, port: number, username?: string, password?: string}}
 *   `username` / `password` are only present when the URL carries them.
 * @throws {TypeError} When `url` cannot be parsed as a URL.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);

  // Credentials are percent-encoded inside a URL; fall back to the raw text
  // when the value is not valid percent-encoding.
  const decode = (value) => {
    try { return decodeURIComponent(value); } catch { return value; }
  };

  // `URL#port` is '' when the URL omits the port, which previously produced
  // NaN. Fall back to the standard Redis port instead.
  const port = parsed.port === '' ? 6379 : Number.parseInt(parsed.port, 10);

  const connection = { host: parsed.hostname, port };
  // Carry credentials through instead of silently dropping them.
  if (parsed.username) connection.username = decode(parsed.username);
  if (parsed.password) connection.password = decode(parsed.password);
  return connection;
};
|
|
// BullMQ queue that receives the 'stage' jobs enqueued by the playlist-item
// routes below. The Redis endpoint comes from REDIS_URL.
const stageQueue = new Queue(
  'playout-stage',
  { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379') },
);

// Docker image used for playout engine containers; override with PLAYOUT_IMAGE.
const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest';
|
|
|
|
/**
 * Issue a request to the local Docker Engine API over its Unix socket.
 *
 * @param {string} method - HTTP verb.
 * @param {string} path - API path with leading slash; the `/v1.43` version
 *   prefix is added here.
 * @param {object|null} [body] - JSON-serialisable request payload, if any.
 * @returns {Promise<{status: number, data: any}>} Resolves for every HTTP
 *   status. `data` is the parsed JSON body, `{}` for an empty body, or the raw
 *   text when the body is not valid JSON. Rejects on request/response stream
 *   errors and after a 10 second timeout.
 */
function dockerApi(method, path, body = null) {
  return new Promise((resolve, reject) => {
    const options = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.43${path}`,
      method,
      headers: { 'Content-Type': 'application/json' },
    };

    const req = http.request(options, (res) => {
      // Decode as one UTF-8 stream so a multibyte character split across two
      // chunks is not corrupted by per-chunk Buffer-to-string conversion.
      res.setEncoding('utf8');
      let raw = '';
      res.on('data', (chunk) => { raw += chunk; });
      // Without this a mid-response failure would leave the promise pending.
      res.on('error', reject);
      res.on('end', () => {
        let data = raw;
        try {
          data = raw ? JSON.parse(raw) : {};
        } catch {
          // Not JSON: hand the raw text back unchanged.
        }
        resolve({ status: res.statusCode, data });
      });
    });

    req.on('error', reject);
    req.setTimeout(10000, () => req.destroy(new Error('Docker API timeout after 10s')));
    if (body) req.write(JSON.stringify(body));
    req.end();
  });
}
|
|
|
|
/**
 * Decide whether a channel's node is this process's own host or a remote
 * cluster node that must be reached through its API.
 *
 * @param {string|null} nodeId - cluster_nodes.id, or a falsy value.
 * @returns {Promise<{remote: false}|{remote: true, apiUrl: string, ip: string}>}
 *   `{ remote: false }` when there is no node id, the node row is missing, the
 *   node has no api_url, or its hostname equals NODE_HOSTNAME.
 */
async function resolveNodeTarget(nodeId) {
  const local = () => ({ remote: false });
  if (!nodeId) return local();

  const { rows: [node] } = await pool.query(
    'SELECT hostname, ip_address, api_url FROM cluster_nodes WHERE id = $1', [nodeId]
  );
  if (node === undefined) return local();

  const thisHost = process.env.NODE_HOSTNAME || '';
  const runsHere = !node.api_url || node.hostname === thisHost;
  return runsHere
    ? local()
    : { remote: true, apiUrl: node.api_url, ip: node.ip_address };
}
|
|
|
|
// Port the sidecar's HTTP shim is reached on (also passed to it as PORT).
const SIDECAR_HTTP_PORT = 3002;

/**
 * Name used as a channel container's network alias and hostname.
 * @param {string} id - Channel id.
 * @returns {string}
 */
function channelAlias(id) {
  return `playout-${id}`;
}

/**
 * Base URL of a channel's sidecar HTTP shim.
 *
 * @param {{id: string, container_meta?: {sidecar_url?: string}}} channel
 * @returns {string} `container_meta.sidecar_url` when set, otherwise the
 *   alias-based URL on SIDECAR_HTTP_PORT.
 */
function sidecarBaseUrl(channel) {
  const override = channel.container_meta?.sidecar_url;
  if (override) return override;
  return `http://${channelAlias(channel.id)}:${SIDECAR_HTTP_PORT}`;
}
|
|
|
|
/**
 * Send a JSON request to a channel's sidecar HTTP shim.
 *
 * @param {object} channel - playout_channels row.
 * @param {string} path - Path on the sidecar, with leading slash.
 * @param {string} [method='POST']
 * @param {object|null} [body] - Sent as JSON when truthy.
 * @returns {Promise<object>} Parsed JSON response, or `{}` when the body is
 *   not JSON.
 * @throws {Error} On a non-2xx response (message carries the status and the
 *   first 200 characters of the body); fetch/timeout errors propagate as-is.
 */
async function callSidecar(channel, path, method = 'POST', body = null) {
  const url = `${sidecarBaseUrl(channel)}${path}`;
  const init = {
    method,
    headers: { 'Content-Type': 'application/json' },
    body: body ? JSON.stringify(body) : undefined,
    signal: AbortSignal.timeout(20000),
  };

  const res = await fetch(url, init);
  if (res.ok) return res.json().catch(() => ({}));

  const text = await res.text().catch(() => '');
  throw new Error(`sidecar ${method} ${path} -> HTTP ${res.status}: ${text.slice(0, 200)}`);
}
|
|
|
|
/**
 * Project a playout_channels row onto the fields exposed by the API.
 * Columns not listed here (for example container_meta) are left out.
 *
 * @param {object} r - Database row.
 * @returns {object} Plain object with the listed fields; `restart_count`
 *   falls back to 0 when null/undefined.
 */
function channelToJson(r) {
  const exposed = [
    'id', 'name', 'node_id', 'output_type', 'output_config', 'video_format',
    'status', 'container_id', 'error_message', 'project_id', 'restart_count',
    'last_restart_at', 'last_heartbeat_at', 'created_at', 'updated_at',
  ];
  const json = {};
  for (const field of exposed) {
    json[field] = field === 'restart_count' ? (r.restart_count ?? 0) : r[field];
  }
  return json;
}
|
|
|
|
// Output types accepted by the create/update channel routes. Insertion order
// is what the validation error message lists.
const OUTPUT_TYPES = new Set([
  'decklink',
  'ndi',
  'srt',
  'rtmp',
]);
|
|
|
|
// Param loader for every `:id` route: validates the id, loads the channel row
// onto `req.channel`, and asserts at least 'view' access on its project.
router.param('id', async (req, res, next) => {
  try {
    // validateUuid is an ordinary middleware, run inline here. It may answer
    // the request itself or hand an error to `next`; capture the latter so a
    // rejected id is never silently passed on to the database query.
    let validationError;
    validateUuid('id')(req, res, (err) => { validationError = err; });
    if (res.headersSent) return;
    if (validationError) return next(validationError);

    const { rows } = await pool.query(
      'SELECT * FROM playout_channels WHERE id = $1', [req.params.id]
    );
    if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });

    req.channel = rows[0];
    await assertProjectAccess(req.user, req.channel.project_id, 'view');
    next();
  } catch (err) { next(err); }
});
|
|
|
|
/**
 * Route middleware: require 'edit' access on the project of `req.channel`
 * (set by the `:id` param loader). Access failures go to `next(err)`.
 */
async function requireChannelEdit(req, res, next) {
  try {
    const { user, channel } = req;
    await assertProjectAccess(user, channel.project_id, 'edit');
    next();
  } catch (err) {
    next(err);
  }
}
|
|
|
|
// GET /channels — list channels, newest first. Admins see every channel;
// other users only those in projects they can access.
router.get('/channels', async (req, res, next) => {
  try {
    const admin = isAdmin(req.user);
    const projectIds = admin ? null : await accessibleProjectIds(req.user);
    if (!admin && projectIds.length === 0) return res.json([]);

    const result = admin
      ? await pool.query('SELECT * FROM playout_channels ORDER BY created_at DESC')
      : await pool.query(
        'SELECT * FROM playout_channels WHERE project_id = ANY($1) ORDER BY created_at DESC', [projectIds]
      );
    res.json(result.rows.map(channelToJson));
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /channels — create a channel.
// Body: { name, node_id?, output_type?, output_config?, video_format?, project_id? }.
// A channel with a project requires 'edit' on that project; one without a
// project can only be created by an admin.
router.post('/channels', async (req, res, next) => {
  try {
    const { name, node_id = null, output_type = 'srt', output_config = {},
      video_format = '1080p5994', project_id = null } = req.body || {};

    // Validate the trimmed value: a whitespace-only name used to pass the
    // check and was then stored as an empty string.
    const cleanName = typeof name === 'string' ? name.trim() : '';
    if (!cleanName) {
      return res.status(400).json({ error: 'name is required' });
    }
    if (!OUTPUT_TYPES.has(output_type)) {
      return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` });
    }
    if (project_id) await assertProjectAccess(req.user, project_id, 'edit');
    else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });

    const { rows } = await pool.query(
      `INSERT INTO playout_channels (name, node_id, output_type, output_config, video_format, project_id)
       VALUES ($1,$2,$3,$4,$5,$6) RETURNING *`,
      [cleanName, node_id, output_type, JSON.stringify(output_config), video_format, project_id]
    );
    res.status(201).json(channelToJson(rows[0]));
  } catch (err) { next(err); }
});
|
|
|
|
// PATCH /channels/:id — update editable fields of a channel that is not
// running. Only the fields in `allowed` are considered; absent fields are left
// untouched.
router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => {
  try {
    if (req.channel.status === 'running') {
      return res.status(409).json({ error: 'Cannot edit a running channel — stop it first' });
    }
    const body = req.body || {};

    // requireChannelEdit only covers the channel's CURRENT project. Moving it
    // elsewhere needs the same check the create route applies to the target:
    // 'edit' on the destination project, or admin for an unassigned channel.
    if (body.project_id !== undefined && body.project_id !== req.channel.project_id) {
      if (body.project_id) {
        await assertProjectAccess(req.user, body.project_id, 'edit');
      } else if (!isAdmin(req.user)) {
        return res.status(403).json({ error: 'admin required for unassigned channel' });
      }
    }

    const allowed = ['name', 'node_id', 'output_type', 'output_config', 'video_format', 'project_id'];
    const sets = [];
    const vals = [];
    for (const k of allowed) {
      let value = body[k];
      if (value === undefined) continue;
      if (k === 'output_type' && !OUTPUT_TYPES.has(value)) {
        return res.status(400).json({ error: 'invalid output_type' });
      }
      if (k === 'name') {
        // Same rule as the create route: a non-empty string after trimming.
        value = typeof value === 'string' ? value.trim() : '';
        if (!value) return res.status(400).json({ error: 'name is required' });
      }
      if (k === 'output_config') value = JSON.stringify(value);
      vals.push(value);
      // Column names come from the fixed `allowed` list, never from the client.
      sets.push(`${k} = $${vals.length}`);
    }
    if (sets.length === 0) return res.json(channelToJson(req.channel));

    vals.push(req.channel.id);
    const { rows } = await pool.query(
      `UPDATE playout_channels SET ${sets.join(', ')}, updated_at = NOW() WHERE id = $${vals.length} RETURNING *`, vals
    );
    res.json(channelToJson(rows[0]));
  } catch (err) { next(err); }
});
|
|
|
|
// DELETE /channels/:id — remove a channel row. Refused with 409 while the
// channel is running.
router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => {
  try {
    const { id, status } = req.channel;
    if (status === 'running') {
      return res.status(409).json({ error: 'Stop the channel before deleting it' });
    }
    await pool.query('DELETE FROM playout_channels WHERE id = $1', [id]);
    res.json({ deleted: true });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
/**
 * Guard against two users of the same DeckLink device on one node.
 * No-op for channels whose output_type is not 'decklink'.
 *
 * @param {object} channel - playout_channels row about to be started.
 * @throws {Error} With `httpStatus: 409` when another running DeckLink channel
 *   or an SDI recorder in 'recording' state uses the same device index on the
 *   same node.
 */
async function assertDeckLinkFree(channel) {
  if (channel.output_type !== 'decklink') return;
  // A channel without an explicit device_index uses device 1.
  const idx = channel.output_config?.device_index || 1;

  // COALESCE applies the same default to the OTHER channels. Without it a
  // running channel that omits device_index compared as NULL and was never
  // detected as a conflict on device 1.
  const chan = await pool.query(
    `SELECT id FROM playout_channels
      WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2 AND status = 'running'
        AND output_type = 'decklink'
        AND COALESCE((output_config->>'device_index')::int, 1) = $3`,
    [channel.id, channel.node_id, idx]
  );
  if (chan.rows.length > 0) {
    throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 });
  }

  const rec = await pool.query(
    `SELECT id FROM recorders
      WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2
        AND status = 'recording' AND source_type = 'sdi'`,
    [channel.node_id, idx]
  );
  if (rec.rows.length > 0) {
    throw Object.assign(new Error(`DeckLink device ${idx} is in use by a recorder on this node`), { httpStatus: 409 });
  }
}
|
|
|
|
// Build an Error for a failed sidecar start. `httpStatus` is what the route
// layer answers with; `channelMessage` is the short text stored in
// playout_channels.error_message.
function sidecarStartError(message, httpStatus, channelMessage) {
  return Object.assign(new Error(message), { httpStatus, channelMessage });
}

// Ask a remote node's API to start the engine container.
// Returns { containerId, containerMeta }.
async function startRemoteSidecar(channel, apiUrl, env) {
  const sidecarRes = await fetch(`${apiUrl}/sidecar/start`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      image: PLAYOUT_SIDECAR_IMAGE, env,
      capturePort: SIDECAR_HTTP_PORT,
      sourceType: channel.output_type,
      useGpu: false,
      publishHttp: true,
    }),
    signal: AbortSignal.timeout(20000),
  });
  if (!sidecarRes.ok) {
    const details = await sidecarRes.json().catch(() => ({}));
    console.error('[playout] remote sidecar start failed:', JSON.stringify(details));
    throw sidecarStartError('Remote node failed to start sidecar', 502, 'remote node failed to start sidecar');
  }
  const data = await sidecarRes.json();
  const containerMeta = {};
  if (data.sidecarUrl || data.host) {
    containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`;
  }
  return { containerId: data.containerId, containerMeta };
}

// Create and start the engine container through the local Docker socket.
// Returns { containerId, containerMeta }.
async function startLocalSidecar(channel, env) {
  const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon';
  const alias = channelAlias(channel.id);
  const hostBinds = ['/mnt/NVME/MAM/wild-dragon-media:/media'];
  if (channel.output_type === 'decklink') hostBinds.push('/dev/blackmagic:/dev/blackmagic');

  const containerConfig = {
    Image: PLAYOUT_SIDECAR_IMAGE,
    Env: env,
    HostConfig: {
      // DeckLink SDI needs raw /dev access (privileged). SRT/NDI/RTMP/HLS run
      // unprivileged — privileged exposes host GPUs to CasparCG, and the
      // missing in-container NVIDIA driver crashes the engine within seconds.
      Privileged: channel.output_type === 'decklink',
      NetworkMode: dockerNetwork,
      Binds: hostBinds,
    },
    NetworkingConfig: { EndpointsConfig: { [dockerNetwork]: { Aliases: [alias] } } },
    Hostname: alias,
  };

  const createRes = await dockerApi('POST', '/containers/create', containerConfig);
  if (createRes.status !== 201) {
    console.error('[playout] container create failed:', JSON.stringify(createRes.data));
    throw sidecarStartError('Failed to create container', 500, 'container create failed');
  }
  const containerId = createRes.data.Id;
  const removeContainer = () =>
    dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});

  let startRes;
  try {
    startRes = await dockerApi('POST', `/containers/${containerId}/start`);
  } catch (err) {
    // The start call itself failed (socket error / timeout): do not leave
    // the freshly created container behind.
    await removeContainer();
    throw err;
  }
  if (startRes.status !== 204) {
    console.error('[playout] container start failed:', JSON.stringify(startRes.data));
    await removeContainer();
    throw sidecarStartError('Failed to start container', 500, 'container start failed');
  }
  return { containerId, containerMeta: {} };
}

/**
 * Start the engine container for a channel, locally or on its remote node,
 * and mark the channel 'running'.
 *
 * The channel is set to 'starting' first. On ANY failure it is set to 'error'
 * with an error_message before the error is rethrown, so a network error or
 * Docker timeout can no longer leave it stuck in 'starting' (which the start
 * route refuses to act on).
 *
 * @param {object} channel - playout_channels row.
 * @returns {Promise<object>} The updated playout_channels row.
 * @throws {Error} Errors for known failure modes carry `httpStatus`
 *   (502 remote start, 500 container create/start); others propagate as-is.
 */
async function spawnChannelSidecar(channel) {
  await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]);

  const env = [
    `OUTPUT_TYPE=${channel.output_type}`,
    `OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`,
    `VIDEO_FORMAT=${channel.video_format}`,
    `PORT=${SIDECAR_HTTP_PORT}`,
    `CHANNEL_ID=${channel.id}`,
  ];

  let started;
  try {
    const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(channel.node_id);
    started = isRemote
      ? await startRemoteSidecar(channel, targetNodeApiUrl, env)
      : await startLocalSidecar(channel, env);
  } catch (err) {
    const note = err.channelMessage || `sidecar start failed: ${String(err.message).slice(0, 200)}`;
    await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
      ['error', note, channel.id])
      .catch((dbErr) => console.error('[playout] failed to record start error:', dbErr.message));
    throw err;
  }

  // Set last_heartbeat_at = NOW() so the scheduler health tick treats this
  // channel as freshly alive. Without this, last_heartbeat_at starts as NULL
  // (epoch=0), and the very first tick sees ageMs >> TIMEOUT_MS and triggers
  // failover immediately — before the sidecar has had a chance to respond.
  const { rows } = await pool.query(
    `UPDATE playout_channels
        SET status = 'running', container_id = $1, container_meta = $2,
            last_heartbeat_at = NOW(), updated_at = NOW()
      WHERE id = $3 RETURNING *`,
    [started.containerId, JSON.stringify(started.containerMeta), channel.id]
  );
  return rows[0];
}
|
|
|
|
// POST /channels/:id/start — spawn the engine container for a channel that is
// neither running nor starting. Errors tagged with `httpStatus` are answered
// with that status; anything else goes to the error middleware.
router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => {
  try {
    const { channel } = req;
    const alreadyActive = ['running', 'starting'].includes(channel.status);
    if (alreadyActive) {
      return res.status(409).json({ error: `Channel already ${channel.status}` });
    }
    await assertDeckLinkFree(channel);
    const started = await spawnChannelSidecar(channel);
    res.json(channelToJson(started));
  } catch (err) {
    if (!err.httpStatus) {
      next(err);
      return;
    }
    return res.status(err.httpStatus).json({ error: err.message });
  }
});
|
|
|
|
// POST /channels/:id/stop — tear down the channel's container (best effort)
// and mark the channel 'stopped'. Failures while stopping the container are
// tolerated; the row is updated regardless.
router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => {
  try {
    const { id: channelId, container_id: containerId, node_id: nodeId } = req.channel;

    if (containerId) {
      const target = await resolveNodeTarget(nodeId);
      if (target.remote) {
        await fetch(`${target.apiUrl}/sidecar/stop`, {
          method: 'POST', headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ containerId }),
          signal: AbortSignal.timeout(20000),
        }).catch((e) => console.error('[playout] remote stop failed:', e.message));
      } else {
        // Graceful stop with a 10s grace period, then force-remove.
        await dockerApi('POST', `/containers/${containerId}/stop?t=10`).catch(() => {});
        await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
      }
    }

    const result = await pool.query(
      `UPDATE playout_channels SET status = 'stopped', container_id = NULL, updated_at = NOW()
        WHERE id = $1 RETURNING *`, [channelId]
    );
    res.json(channelToJson(result.rows[0]));
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /channels/:id/status — channel status plus the engine's own status when
// the channel is running. A sidecar failure is reported inside a 200 response
// (`engine: null`, `engine_error`) rather than as an HTTP error.
router.get('/channels/:id/status', async (req, res, next) => {
  try {
    const { status } = req.channel;
    if (status !== 'running') {
      return res.json({ running: false, status });
    }
    const engine = await callSidecar(req.channel, '/status', 'GET');
    res.json({ running: true, status, engine });
  } catch (err) {
    res.json({ running: true, status: req.channel.status, engine: null, engine_error: err.message });
  }
});
|
|
|
|
// GET /playout/channels/:id/hls/index.m3u8 — the live preview playlist, served
|
|
// through the API (not the static /media/live path) so it bypasses the public
|
|
// reverse proxy's static cache. That proxy caches the .m3u8 by path with a
|
|
// multi-second TTL and ignores the origin's no-store, so hls.js's ~1s reloads
|
|
// always got a STALE playlist ("MISSED" forever → monitor stayed black). The
|
|
// /api/ path is not proxy-cached (the status poll updates fine), so this always
|
|
// returns the fresh live edge. Segment (.ts) lines are rewritten to absolute
|
|
// /media/live/<id>/ URLs so they still load from the static path (immutable,
|
|
// caching them is fine). mam-api shares the same /media volume the sidecars
|
|
// write to.
|
|
// Root of the media volume the playlists are read from; override with
// PLAYOUT_MEDIA_DIR.
const HLS_MEDIA_DIR = process.env.PLAYOUT_MEDIA_DIR || '/media';

// Serves <HLS_MEDIA_DIR>/live/<channel id>/index.m3u8 with no-cache headers.
// Any failure to read the file is answered with 404.
router.get('/channels/:id/hls/index.m3u8', async (req, res, next) => {
  try {
    const { id: cid } = req.channel;
    const playlist = await readFile(`${HLS_MEDIA_DIR}/live/${cid}/index.m3u8`, 'utf8')
      .catch(() => null);
    if (playlist === null) {
      return res.status(404).json({ error: 'No live preview for this channel yet' });
    }

    // Rewrite bare segment names to absolute static URLs; tag lines (those
    // starting with '#') and everything else pass through unchanged.
    const segmentLine = /^[^#].*\.ts\s*$/;
    const outLines = [];
    for (const line of playlist.split('\n')) {
      outLines.push(segmentLine.test(line) ? `/media/live/${cid}/${line.trim()}` : line);
    }

    res.setHeader('Content-Type', 'application/vnd.apple.mpegurl');
    res.setHeader('Cache-Control', 'no-store, no-cache, must-revalidate');
    res.setHeader('Pragma', 'no-cache');
    res.send(outLines.join('\n'));
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// ── Transport ────────────────────────────────────────────────────────────────
|
|
/**
 * Shared handler body for simple transport commands: forward a POST to the
 * channel's sidecar and relay its JSON answer.
 *
 * Responds 409 when the channel is not running and 502 (with the error
 * message) when the sidecar call fails.
 *
 * @param {object} req - Express request with `req.channel` loaded.
 * @param {object} res - Express response.
 * @param {string} action - Sidecar path, e.g. '/transport/pause'.
 * @param {object|null} [body] - Optional JSON payload for the sidecar.
 */
async function transport(req, res, action, body = null) {
  const { channel } = req;
  if (channel.status !== 'running') {
    return res.status(409).json({ error: 'Channel is not running' });
  }
  try {
    const reply = await callSidecar(channel, action, 'POST', body);
    res.json(reply);
  } catch (err) {
    res.status(502).json({ error: err.message });
  }
}
|
|
|
|
// POST /channels/:id/play — load a playlist onto a running channel.
// Body: { playlist_id }. Every item must be staged (media_status 'ready' with
// a media_path) or the request is refused with 409 and the pending item ids.
router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => {
  try {
    if (req.channel.status !== 'running') {
      return res.status(409).json({ error: 'Start the channel before playing' });
    }
    const { playlist_id } = req.body || {};
    if (!playlist_id) return res.status(400).json({ error: 'playlist_id is required' });

    const pl = await pool.query('SELECT * FROM playout_playlists WHERE id = $1 AND channel_id = $2',
      [playlist_id, req.channel.id]);
    if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found for this channel' });

    const items = await pool.query(
      `SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms
         FROM playout_items i JOIN assets a ON a.id = i.asset_id
        WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [playlist_id]);

    const notReady = items.rows.filter((i) => i.media_status !== 'ready' || !i.media_path);
    if (notReady.length > 0) {
      return res.status(409).json({
        error: 'Some items are not staged yet',
        pending: notReady.map((i) => i.id),
      });
    }

    // Null-check rather than truthiness: a numeric 0 in/out point used to be
    // sent as null, while asset_duration_ms was already null-checked.
    const toNumberOrNull = (value) => (value != null ? Number(value) : null);

    const payload = {
      loop: pl.rows[0].loop,
      items: items.rows.map((i) => ({
        id: i.id, asset_id: i.asset_id, media_path: i.media_path,
        in_point: toNumberOrNull(i.in_point),
        out_point: toNumberOrNull(i.out_point),
        transition: i.transition, transition_ms: i.transition_ms,
        clip_name: i.clip_name,
        asset_duration_ms: toNumberOrNull(i.asset_duration_ms),
      })),
    };

    // callSidecar throws on network/timeout errors. Return 502 (not 409) so
    // the UI and operators know it's a gateway problem, not a state conflict.
    let out;
    try {
      out = await callSidecar(req.channel, '/playlist/load', 'POST', payload);
    } catch (err) {
      return res.status(502).json({ error: 'Sidecar unreachable: ' + err.message });
    }
    res.json(out);
  } catch (err) { next(err); }
});
|
|
|
|
// Simple transport commands: each route forwards a POST to the matching
// sidecar path through `transport`. Registered in this order.
for (const [route, sidecarPath] of [
  ['pause', '/transport/pause'],
  ['resume', '/transport/resume'],
  ['skip', '/transport/skip'],
  ['stop-playback', '/channel/stop'],
]) {
  router.post(`/channels/:id/${route}`, requireChannelEdit, (req, res) => transport(req, res, sidecarPath));
}
|
|
|
|
// GET /channels/:id/asrun — the channel's as-run log, most recent first,
// capped at 500 rows.
router.get('/channels/:id/asrun', async (req, res, next) => {
  try {
    const asRun = await pool.query(
      `SELECT * FROM playout_as_run WHERE channel_id = $1 ORDER BY started_at DESC LIMIT 500`,
      [req.channel.id],
    );
    res.json(asRun.rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// ── SCTE-35 ad-break splices ───────────────────────────────────────────────
|
|
// Schedule, trigger, and list SCTE-35 ad breaks on a channel. A break can be
|
|
// scheduled (after a playlist position, or at a wall-clock time) or fired
|
|
// immediately. Firing tells the sidecar to splice the live output, marks the
|
|
// break 'fired', and stamps a row in the as-run compliance log.
|
|
// Break types accepted by the SCTE-35 routes. Insertion order is what the
// validation error message lists.
const SCTE_TYPES = new Set([
  'splice_insert',
  'immediate',
  'splice_out',
  'splice_in',
]);
|
|
|
|
/**
 * Fire a break row on the channel's sidecar and record it. Exported so it can
 * be used outside the immediate-trigger route as well.
 *
 * Steps, in order: POST /scte/trigger on the sidecar, mark the break 'fired',
 * then insert a 'scte' row into the as-run log.
 *
 * NOTE: this function does not catch anything. A sidecar or database failure
 * rejects the returned promise and the break row is left as it was; callers
 * must handle the error (the trigger route cancels the break and answers 502).
 *
 * @param {object} channel - playout_channels row.
 * @param {object} brk - playout_scte_breaks row (id, event_id, type, duration_s).
 * @returns {Promise<object>} The sidecar's JSON response.
 */
export async function fireScteBreak(channel, brk) {
  const { id: breakId, event_id: eventId, duration_s: durationS } = brk;
  // An 'immediate' break is sent to the engine as a splice_insert.
  const spliceType = brk.type === 'immediate' ? 'splice_insert' : brk.type;

  const out = await callSidecar(channel, '/scte/trigger', 'POST', {
    eventId,
    type: spliceType,
    durationS,
  });

  await pool.query(
    `UPDATE playout_scte_breaks SET status = 'fired', fired_at = NOW(), updated_at = NOW() WHERE id = $1`,
    [breakId]
  );

  // Stamp the compliance log. ended_at/duration are known up front for a
  // fixed-duration break, so the row is written closed; a zero duration
  // leaves ended_at NULL.
  const clipName = `SCTE-35 ${brk.type} (${durationS}s)`;
  await pool.query(
    `INSERT INTO playout_as_run
       (channel_id, item_id, clip_name, started_at, ended_at, duration_s, result)
     VALUES ($1, $2, $3, NOW(),
       CASE WHEN $4 > 0 THEN NOW() + ($4 || ' seconds')::interval ELSE NULL END,
       $4, 'scte')`,
    [channel.id, breakId, clipName, durationS]
  );
  return out;
}
|
|
|
|
// GET /channels/:id/scte — the channel's SCTE-35 breaks, newest first,
// capped at 200 rows.
router.get('/channels/:id/scte', async (req, res, next) => {
  try {
    const breaks = await pool.query(
      `SELECT * FROM playout_scte_breaks WHERE channel_id = $1 ORDER BY created_at DESC LIMIT 200`,
      [req.channel.id],
    );
    res.json(breaks.rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// Schedule a break. Body: { type, duration_s, playlist_pos?, scheduled_at? }.
// This route only INSERTS the break row (it is returned as stored); it never
// contacts the sidecar. Firing happens elsewhere — use the /scte/trigger route
// to splice immediately.
// NOTE(review): how a row with neither playlist_pos nor scheduled_at is
// treated depends on the scheduler sweep, which is not in this file — confirm.
router.post('/channels/:id/scte', requireChannelEdit, async (req, res, next) => {
  try {
    const { type = 'splice_insert', duration_s = 30,
      playlist_pos = null, scheduled_at = null } = req.body || {};
    if (!SCTE_TYPES.has(type)) {
      return res.status(400).json({ error: `type must be one of: ${[...SCTE_TYPES].join(', ')}` });
    }
    // Non-numeric or negative durations collapse to 0.
    const dur = Math.max(0, parseInt(duration_s, 10) || 0);

    // The next per-channel splice_event_id is computed inside the INSERT
    // itself. The previous separate SELECT-then-INSERT let two concurrent
    // requests read the same MAX and store duplicate event ids; a single
    // statement narrows that window (a unique constraint on
    // (channel_id, event_id) would be needed to close it entirely).
    const { rows } = await pool.query(
      `INSERT INTO playout_scte_breaks
         (channel_id, playlist_pos, scheduled_at, duration_s, event_id, type, created_by)
       VALUES ($1, $2, $3, $4,
         (SELECT COALESCE(MAX(event_id), 0) + 1 FROM playout_scte_breaks WHERE channel_id = $1),
         $5, $6)
       RETURNING *`,
      [req.channel.id, playlist_pos, scheduled_at, dur, type, req.user?.id || null]);
    res.status(201).json(rows[0]);
  } catch (err) { next(err); }
});
|
|
|
|
// Fire an ad break immediately ("splice now"). Body: { type?, duration_s? }.
// Creates the break row and triggers the splice on the live output in one shot.
// Responds 409 when the channel is not running. If firing fails, the new row
// is marked 'cancelled' and the response is 502.
router.post('/channels/:id/scte/trigger', requireChannelEdit, async (req, res, next) => {
  try {
    if (req.channel.status !== 'running') {
      return res.status(409).json({ error: 'Channel is not running' });
    }
    const { type = 'immediate', duration_s = 30 } = req.body || {};
    if (!SCTE_TYPES.has(type)) {
      return res.status(400).json({ error: `type must be one of: ${[...SCTE_TYPES].join(', ')}` });
    }
    // Non-numeric or negative durations collapse to 0.
    const dur = Math.max(0, parseInt(duration_s, 10) || 0);

    // Allocate the next per-channel event id inside the INSERT instead of a
    // separate SELECT, so two concurrent triggers are far less likely to
    // store the same splice_event_id.
    const { rows } = await pool.query(
      `INSERT INTO playout_scte_breaks (channel_id, duration_s, event_id, type, status, created_by)
       VALUES ($1, $2,
         (SELECT COALESCE(MAX(event_id), 0) + 1 FROM playout_scte_breaks WHERE channel_id = $1),
         $3, 'pending', $4)
       RETURNING *`,
      [req.channel.id, dur, type, req.user?.id || null]);
    const created = rows[0];

    try {
      const out = await fireScteBreak(req.channel, created);
      // Re-read the row so the response reflects the status written by the fire.
      const updated = await pool.query('SELECT * FROM playout_scte_breaks WHERE id = $1', [created.id]);
      res.json({ break: updated.rows[0], engine: out });
    } catch (err) {
      // Best effort: a failure to mark the row must not mask the 502.
      await pool.query(`UPDATE playout_scte_breaks SET status = 'cancelled', updated_at = NOW() WHERE id = $1`,
        [created.id]).catch(() => {});
      return res.status(502).json({ error: 'Sidecar unreachable: ' + err.message });
    }
  } catch (err) { next(err); }
});
|
|
|
|
// DELETE /channels/:id/scte/:scteId — cancel a break that is still 'pending'.
// Responds 404 when the break does not exist on this channel or is no longer
// pending. `scteId` is validated like the other id params in this file, so a
// malformed id is rejected by the validator instead of reaching the query.
router.delete('/channels/:id/scte/:scteId', validateUuid('scteId'), requireChannelEdit, async (req, res, next) => {
  try {
    const { rows } = await pool.query(
      `UPDATE playout_scte_breaks SET status = 'cancelled', updated_at = NOW()
        WHERE id = $1 AND channel_id = $2 AND status = 'pending' RETURNING id`,
      [req.params.scteId, req.channel.id]);
    if (rows.length === 0) return res.status(404).json({ error: 'Pending break not found' });
    res.json({ cancelled: true });
  } catch (err) { next(err); }
});
|
|
|
|
/**
 * Middleware for routes that name their channel via `channel_id` in the body
 * or query string rather than in the path. Loads the row onto `req.channel`
 * and requires 'edit' access on its project.
 *
 * Responds 400 when no channel_id is supplied and 404 when the channel does
 * not exist; access and database errors go to `next(err)`.
 */
async function loadChannelForBody(req, res, next) {
  // `req.body` is undefined when the request carries no parsed body. Reading
  // `.channel_id` from it used to throw outside the try block, leaving an
  // unhandled rejection and no response.
  const channelId = req.body?.channel_id || req.query?.channel_id;
  if (!channelId) return res.status(400).json({ error: 'channel_id is required' });
  try {
    const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
    if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
    req.channel = rows[0];
    await assertProjectAccess(req.user, req.channel.project_id, 'edit');
    next();
  } catch (err) { next(err); }
}
|
|
|
|
// GET /playlists?channel_id=… — playlists of one channel, oldest first.
// Requires 'view' access on the channel's project.
router.get('/playlists', async (req, res, next) => {
  try {
    const { channel_id: channelId } = req.query;
    if (!channelId) {
      return res.status(400).json({ error: 'channel_id is required' });
    }

    const { rows: channels } = await pool.query(
      'SELECT project_id FROM playout_channels WHERE id = $1', [channelId]);
    if (channels.length === 0) {
      return res.status(404).json({ error: 'Channel not found' });
    }
    await assertProjectAccess(req.user, channels[0].project_id, 'view');

    const playlists = await pool.query(
      'SELECT * FROM playout_playlists WHERE channel_id = $1 ORDER BY created_at ASC', [channelId]);
    res.json(playlists.rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /playlists — create a playlist on the channel named by `channel_id`.
// Body: { channel_id, name, loop? }. loadChannelForBody has already loaded the
// channel and checked 'edit' access.
router.post('/playlists', loadChannelForBody, async (req, res, next) => {
  try {
    const { name, loop = false } = req.body || {};
    // Same rule as channel names: a non-empty string after trimming. A
    // non-string name used to throw on `.trim()` (500) and a whitespace-only
    // name was stored as ''.
    const cleanName = typeof name === 'string' ? name.trim() : '';
    if (!cleanName) return res.status(400).json({ error: 'name is required' });

    const { rows } = await pool.query(
      'INSERT INTO playout_playlists (channel_id, name, loop) VALUES ($1,$2,$3) RETURNING *',
      [req.channel.id, cleanName, !!loop]);
    res.status(201).json(rows[0]);
  } catch (err) { next(err); }
});
|
|
|
|
// GET /playlists/:plid/items — items of a playlist in sort order, each joined
// with its asset's filename and duration. Requires 'view' access on the
// owning channel's project.
router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
  try {
    const { plid } = req.params;

    const { rows: playlists } = await pool.query(
      `SELECT p.*, c.project_id FROM playout_playlists p
         JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]);
    if (playlists.length === 0) {
      return res.status(404).json({ error: 'Playlist not found' });
    }
    await assertProjectAccess(req.user, playlists[0].project_id, 'view');

    const items = await pool.query(
      `SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms
         FROM playout_items i JOIN assets a ON a.id = i.asset_id
        WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [plid]);
    res.json(items.rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
/**
 * Load a playlist (with its channel's project_id) and require 'edit' access.
 *
 * @param {string} plid - Playlist id.
 * @param {object} user - Requesting user.
 * @returns {Promise<object>} The playlist row plus `project_id`.
 * @throws {Error} With `httpStatus: 404` when the playlist does not exist;
 *   access errors from assertProjectAccess propagate unchanged.
 */
async function loadPlaylistEdit(plid, user) {
  const { rows: [playlist] } = await pool.query(
    `SELECT p.*, c.project_id FROM playout_playlists p
       JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]);

  if (playlist === undefined) {
    const notFound = new Error('Playlist not found');
    notFound.httpStatus = 404;
    throw notFound;
  }

  await assertProjectAccess(user, playlist.project_id, 'edit');
  return playlist;
}
|
|
|
|
// POST /playlists/:plid/items — append an asset to the end of a playlist and
// enqueue a 'stage' job for it.
// Body: { asset_id, in_point?, out_point?, transition?, transition_ms? }.
// A failure to enqueue is logged but does not fail the request.
router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
  try {
    const { plid } = req.params;
    await loadPlaylistEdit(plid, req.user);

    const {
      asset_id,
      in_point = null,
      out_point = null,
      transition = 'cut',
      transition_ms = 0,
    } = req.body || {};
    if (!asset_id) {
      return res.status(400).json({ error: 'asset_id is required' });
    }

    // New items go after the current last position (0 for an empty playlist).
    const { rows: [{ next: sortOrder }] } = await pool.query(
      'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1',
      [plid]);

    const inserted = await pool.query(
      `INSERT INTO playout_items (playlist_id, asset_id, sort_order, in_point, out_point, transition, transition_ms)
       VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`,
      [plid, asset_id, sortOrder, in_point, out_point, transition, transition_ms]);
    const item = inserted.rows[0];

    await stageQueue.add('stage', { itemId: item.id, assetId: asset_id }).catch((e) =>
      console.error('[playout] failed to enqueue stage job:', e.message));

    res.status(201).json(item);
  } catch (err) {
    if (!err.httpStatus) {
      next(err);
      return;
    }
    return res.status(err.httpStatus).json({ error: err.message });
  }
});
|
|
|
|
// PUT /playlists/:plid/reorder — rewrite sort_order for a playlist.
// Body: { order: [itemId, …] }; each id gets its array index as sort_order.
// Ids that do not belong to this playlist match no row and are ignored.
// All updates run in one transaction.
router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => {
  // The client is acquired only after auth and validation pass, and inside
  // the try block: previously a failed pool.connect() rejected outside any
  // handler, and every rejected request still held a pooled connection.
  let client = null;
  let inTransaction = false;
  try {
    await loadPlaylistEdit(req.params.plid, req.user);
    const { order } = req.body || {};
    if (!Array.isArray(order)) return res.status(400).json({ error: 'order must be an array of item ids' });

    client = await pool.connect();
    await client.query('BEGIN');
    inTransaction = true;
    for (let i = 0; i < order.length; i++) {
      await client.query(
        'UPDATE playout_items SET sort_order = $1, updated_at = NOW() WHERE id = $2 AND playlist_id = $3',
        [i, order[i], req.params.plid]);
    }
    await client.query('COMMIT');
    inTransaction = false;
    res.json({ reordered: order.length });
  } catch (err) {
    // Only roll back when a transaction was actually opened; a rollback
    // failure must not mask the original error.
    if (inTransaction) await client.query('ROLLBACK').catch(() => {});
    if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
    next(err);
  } finally {
    client?.release();
  }
});
|
|
|
|
// DELETE /items/:itemId — remove a playlist item. Requires 'edit' access on
// the project of the channel that owns the item's playlist.
router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => {
  try {
    const { itemId } = req.params;

    const { rows: [item] } = await pool.query(
      `SELECT i.id, c.project_id FROM playout_items i
         JOIN playout_playlists p ON p.id = i.playlist_id
         JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [itemId]);
    if (item === undefined) {
      return res.status(404).json({ error: 'Item not found' });
    }

    await assertProjectAccess(req.user, item.project_id, 'edit');
    await pool.query('DELETE FROM playout_items WHERE id = $1', [itemId]);
    res.json({ deleted: true });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /items/:itemId/stage — (re)queue staging for one item: reset its
// media_status to 'pending' and push a 'stage' job. Requires 'edit' access on
// the owning channel's project. Unlike item creation, an enqueue failure here
// fails the request.
router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => {
  try {
    const { itemId } = req.params;

    const { rows: [item] } = await pool.query(
      `SELECT i.id, i.asset_id, c.project_id FROM playout_items i
         JOIN playout_playlists p ON p.id = i.playlist_id
         JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [itemId]);
    if (item === undefined) {
      return res.status(404).json({ error: 'Item not found' });
    }

    await assertProjectAccess(req.user, item.project_id, 'edit');
    await pool.query("UPDATE playout_items SET media_status = 'pending' WHERE id = $1", [itemId]);
    await stageQueue.add('stage', { itemId: item.id, assetId: item.asset_id });
    res.json({ queued: true });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
/**
 * Fail a channel over to another node: remove its current container (best
 * effort), pick the most recently seen other node, move the channel there and
 * respawn its sidecar.
 *
 * DeckLink channels are never moved (returned as alert-only). When no other
 * node has been seen in the last 60 seconds the channel is set to 'error'.
 *
 * @param {string} channelId - playout_channels.id.
 * @returns {Promise<{restarted: true, new_node_id: string}
 *   |{restarted: false, reason: string}>} Never rejects for a failed respawn;
 *   that is reported through `reason`. Database errors still propagate.
 */
export async function restartChannel(channelId) {
  const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
  if (rows.length === 0) return { restarted: false, reason: 'channel not found' };
  const channel = rows[0];

  if (channel.output_type === 'decklink') {
    return { restarted: false, reason: 'decklink channels are alert-only' };
  }

  // Best-effort removal of the old container; the node may be unreachable.
  if (channel.container_id) {
    const { remote, apiUrl } = await resolveNodeTarget(channel.node_id);
    if (remote && apiUrl) {
      await fetch(`${apiUrl}/sidecar/stop`, {
        method: 'POST', headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ containerId: channel.container_id }),
        signal: AbortSignal.timeout(10000),
      }).catch(() => {});
    } else {
      await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`).catch(() => {});
    }
  }

  // IS DISTINCT FROM instead of <>: a channel with node_id NULL made
  // `id <> NULL` evaluate to NULL for every row, so no candidate was ever
  // found and such channels could not fail over.
  const nodes = await pool.query(
    `SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes
      WHERE id IS DISTINCT FROM $1 AND last_seen_at > NOW() - INTERVAL '60 seconds'
      ORDER BY last_seen_at DESC LIMIT 1`,
    [channel.node_id]
  );
  if (nodes.rows.length === 0) {
    await pool.query(
      "UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
      ['no healthy node available for failover', channel.id]
    );
    return { restarted: false, reason: 'no eligible node' };
  }
  const newNodeId = nodes.rows[0].id;

  // Re-home the channel and clear the old container details before respawning.
  const { rows: moved } = await pool.query(
    `UPDATE playout_channels
        SET node_id = $1, status = 'stopped', container_id = NULL, container_meta = '{}'::jsonb,
            restart_count = restart_count + 1, last_restart_at = NOW(),
            error_message = NULL, updated_at = NOW()
      WHERE id = $2 RETURNING *`,
    [newNodeId, channel.id]
  );

  try {
    await spawnChannelSidecar(moved[0]);
    return { restarted: true, new_node_id: newNodeId };
  } catch (err) {
    return { restarted: false, reason: `respawn failed: ${err.message}` };
  }
}
|
|
|
|
export default router;
|