Root cause of the persistent black preview, fully isolated: ZAMPP1's nginx
serves the live .m3u8 fresh on every request (no-store works there), but
the PUBLIC reverse proxy (159.112.211.103 -> ZAMPP1) caches the static
.m3u8 by path with a multi-second TTL, ignoring both the origin's no-store
and query params. hls.js reloads the playlist ~every second, always landing
inside that TTL, so it sees the live playlist as never advancing
("live playlist MISSED" forever), never establishes the timeline, and never
loads a fragment -> readyState 0 (black). Proven: rapid reads via ZAMPP1
localhost advance (404->405); the same rapid reads via the public URL are
stuck; query-busting doesn't help (proxy caches by path).
Fix: serve the playlist through GET /api/v1/playout/channels/:id/hls/index.m3u8
instead of the static /media/live path. /api/ is not proxy-cached (the live
status poll already updates fine through it), so hls.js always gets the fresh
live edge. Segment (.ts) lines are rewritten to absolute /media/live/<id>/
URLs so they still load from the static path (immutable; caching them is
correct). ProgramMonitor points hls.js at the /api playlist and sends the
session cookie (xhrSetup withCredentials) since /api is auth-gated.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
735 lines
33 KiB
JavaScript
735 lines
33 KiB
JavaScript
// Playout / Master Control routes.
|
|
//
|
|
// Control plane for the CasparCG-backed playout subsystem. Channels are placed
|
|
// on cluster nodes and their engine containers spawned via the same Docker-socket
|
|
// / node-agent path recorders use; the channel's transport (play / pause / skip)
|
|
// is proxied through to the sidecar's HTTP shim, which drives CasparCG over AMCP.
|
|
//
|
|
// RBAC: every channel carries a project_id (NULL = admin-only, the recorder
|
|
// convention). List routes filter by accessible projects; mutating routes assert
|
|
// 'edit'. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md.
|
|
|
|
import express from 'express';
|
|
import http from 'http';
|
|
import { readFile } from 'fs/promises';
|
|
import { Queue } from 'bullmq';
|
|
import pool from '../db/pool.js';
|
|
import { validateUuid } from '../middleware/errors.js';
|
|
import {
|
|
assertProjectAccess, accessibleProjectIds, isAdmin,
|
|
} from '../auth/authz.js';
|
|
|
|
const router = express.Router();
|
|
|
|
// ── BullMQ: media staging queue (S3 -> /media volume) ────────────────────────
|
|
/**
 * Convert a Redis connection URL into the connection options object handed to
 * BullMQ.
 *
 * @param {string} url - e.g. `redis://queue:6379` or `redis://user:pass@host`.
 * @returns {{host: string, port: number, username?: string, password?: string}}
 *   `username` / `password` are only present when the URL carries them.
 * @throws {TypeError} when `url` is not a parseable URL.
 */
const parseRedisUrl = (url) => {
  const parsed = new URL(url);
  const port = Number.parseInt(parsed.port, 10);
  const connection = {
    host: parsed.hostname,
    // URL.port is '' when the URL omits it; fall back to the Redis default
    // rather than handing the client `NaN`.
    port: Number.isNaN(port) ? 6379 : port,
  };
  // Credentials are percent-encoded inside a URL; decode before passing on.
  if (parsed.username) connection.username = decodeURIComponent(parsed.username);
  if (parsed.password) connection.password = decodeURIComponent(parsed.password);
  return connection;
};
|
|
// Connection settings come from REDIS_URL, defaulting to the in-cluster queue host.
const stageQueueConnection = parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379');

// Queue that receives one 'stage' job per playlist item needing its media staged.
const stageQueue = new Queue('playout-stage', { connection: stageQueueConnection });

// ── Sidecar orchestration (mirrors recorders.js) ─────────────────────────────

// Image used for every playout engine container; override with PLAYOUT_IMAGE.
const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest';
|
|
|
|
/**
 * Issue one request to the local Docker Engine API over its unix socket.
 *
 * @param {string} method - HTTP method.
 * @param {string} path - API path without the version prefix (e.g. `/containers/create`).
 * @param {object|null} [body] - JSON-serialisable request body, if any.
 * @returns {Promise<{status: number, data: any}>} `data` is the parsed JSON
 *   body, `{}` for an empty body, or the raw text when it is not valid JSON.
 *   Rejects on socket errors or after the 10s timeout.
 */
function dockerApi(method, path, body = null) {
  return new Promise((resolve, reject) => {
    const options = {
      socketPath: '/var/run/docker.sock',
      path: `/v1.43${path}`,
      method,
      headers: { 'Content-Type': 'application/json' },
    };

    const req = http.request(options, (res) => {
      // Decode as a stream: concatenating raw Buffer chunks onto a string
      // corrupts any multi-byte UTF-8 character split across two chunks.
      res.setEncoding('utf8');
      let data = '';
      res.on('data', (chunk) => { data += chunk; });
      // A response stream that dies mid-body would otherwise leave the promise pending.
      res.on('error', reject);
      res.on('end', () => {
        let parsed = data;
        try {
          parsed = data ? JSON.parse(data) : {};
        } catch {
          // Non-JSON body (plain-text error): hand back the raw text.
        }
        resolve({ status: res.statusCode, data: parsed });
      });
    });

    req.on('error', reject);
    req.setTimeout(10000, () => req.destroy(new Error('Docker API timeout after 10s')));
    if (body) req.write(JSON.stringify(body));
    req.end();
  });
}
|
|
|
|
/**
 * Decide whether a channel's node must be reached through its remote API or
 * handled on this host.
 *
 * @param {string|null} nodeId - cluster_nodes.id, or a falsy value for "no node".
 * @returns {Promise<{remote: false} | {remote: true, apiUrl: string, ip: string}>}
 */
async function resolveNodeTarget(nodeId) {
  if (!nodeId) return { remote: false };

  const { rows } = await pool.query(
    'SELECT hostname, ip_address, api_url FROM cluster_nodes WHERE id = $1', [nodeId]
  );
  if (rows.length === 0) return { remote: false };

  const [{ hostname, ip_address: ip, api_url: apiUrl }] = rows;
  const thisHost = process.env.NODE_HOSTNAME || '';

  // Remote only when the node advertises an API URL and is not this host.
  const isRemote = Boolean(apiUrl) && hostname !== thisHost;
  return isRemote ? { remote: true, apiUrl, ip } : { remote: false };
}
|
|
|
|
// The sidecar shim listens on this port inside the container. The mam-api talks
|
|
// to it by container alias on the shared docker network (local) or via the
|
|
// node-agent's returned host:port (remote).
|
|
const SIDECAR_HTTP_PORT = 3002;

/**
 * Docker-network alias (also used as the container hostname) for a channel.
 *
 * @param {string} id - channel id.
 * @returns {string} `playout-<id>`.
 */
function channelAlias(id) {
  const prefix = 'playout-';
  return `${prefix}${id}`;
}
|
|
|
|
// Resolve the base URL the API uses to reach a running channel's sidecar shim.
|
|
// Local: the docker-network alias. Remote: the node-agent reported the host the
|
|
// container is published on (stored in container_meta.sidecar_url).
|
|
/**
 * @param {object} channel - playout_channels row.
 * @returns {string} base URL of the channel's sidecar shim, without a trailing path.
 */
function sidecarBaseUrl(channel) {
  // A stored sidecar_url wins over the local alias.
  const storedUrl = channel.container_meta?.sidecar_url;
  if (storedUrl) return storedUrl;

  const host = channelAlias(channel.id);
  return `http://${host}:${SIDECAR_HTTP_PORT}`;
}
|
|
|
|
/**
 * Send one JSON request to a channel's sidecar shim.
 *
 * @param {object} channel - playout_channels row.
 * @param {string} path - shim path, e.g. `/status`.
 * @param {string} [method='POST'] - HTTP method.
 * @param {object|null} [body] - JSON payload, omitted when falsy.
 * @returns {Promise<object>} parsed JSON response, or `{}` when the body is not JSON.
 * @throws {Error} on a non-2xx response (message carries up to 200 chars of the
 *   body), a network failure, or the 20s timeout.
 */
async function callSidecar(channel, path, method = 'POST', body = null) {
  const target = `${sidecarBaseUrl(channel)}${path}`;
  const payload = body ? JSON.stringify(body) : undefined;

  const response = await fetch(target, {
    method,
    headers: { 'Content-Type': 'application/json' },
    body: payload,
    signal: AbortSignal.timeout(20000),
  });

  if (response.ok) {
    return response.json().catch(() => ({}));
  }

  const detail = await response.text().catch(() => '');
  throw new Error(`sidecar ${method} ${path} -> HTTP ${response.status}: ${detail.slice(0, 200)}`);
}
|
|
|
|
// ── Serialization ────────────────────────────────────────────────────────────
|
|
/**
 * Project a playout_channels row onto the fields exposed by the API.
 * Columns not listed here (e.g. container_meta) are not exposed.
 *
 * @param {object} r - database row.
 * @returns {object} API representation; `restart_count` defaults to 0.
 */
function channelToJson(r) {
  const exposed = [
    'id', 'name', 'node_id', 'output_type', 'output_config', 'video_format',
    'status', 'container_id', 'error_message', 'project_id', 'restart_count',
    'last_restart_at', 'last_heartbeat_at', 'created_at', 'updated_at',
  ];
  const json = {};
  for (const field of exposed) {
    json[field] = field === 'restart_count' ? (r[field] ?? 0) : r[field];
  }
  return json;
}
|
|
|
|
// Accepted values for a channel's output_type.
const OUTPUT_TYPES = new Set([
  'decklink',
  'ndi',
  'srt',
  'rtmp',
]);
|
|
|
|
// ── Param resolver: scope every /:id route to the channel's project ──────────
|
|
// Validates the id, loads the channel onto req.channel and asserts 'view'
// access on its project before any /:id route handler runs.
router.param('id', (req, res, next) => {
  // Run the validator as real middleware. If it answers the request itself the
  // continuation is never invoked; if it forwards an error we propagate it
  // instead of swallowing it and querying with an unvalidated id.
  validateUuid('id')(req, res, async (validationErr) => {
    if (validationErr) return next(validationErr);
    if (res.headersSent) return;
    try {
      const { rows } = await pool.query(
        'SELECT * FROM playout_channels WHERE id = $1', [req.params.id]
      );
      if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
      req.channel = rows[0];
      await assertProjectAccess(req.user, req.channel.project_id, 'view');
      next();
    } catch (err) { next(err); }
  });
});
|
|
|
|
/**
 * Route middleware: require 'edit' access on the project of the channel
 * already loaded onto `req.channel`. Any failure is forwarded to `next`.
 */
async function requireChannelEdit(req, res, next) {
  try {
    const projectId = req.channel.project_id;
    await assertProjectAccess(req.user, projectId, 'edit');
    next();
  } catch (accessErr) {
    next(accessErr);
  }
}
|
|
|
|
// ── Channels ─────────────────────────────────────────────────────────────────
|
|
|
|
// GET /playout/channels — list (filtered to accessible projects)
|
|
router.get('/channels', async (req, res, next) => {
  try {
    // Admins see every channel, including those with no project.
    if (isAdmin(req.user)) {
      const all = await pool.query('SELECT * FROM playout_channels ORDER BY created_at DESC');
      return res.json(all.rows.map(channelToJson));
    }

    // Everyone else only sees channels in projects they can access.
    const projectIds = await accessibleProjectIds(req.user);
    if (projectIds.length === 0) return res.json([]);

    const scoped = await pool.query(
      'SELECT * FROM playout_channels WHERE project_id = ANY($1) ORDER BY created_at DESC', [projectIds]
    );
    res.json(scoped.rows.map(channelToJson));
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /playout/channels — create
|
|
router.post('/channels', async (req, res, next) => {
  try {
    const { name, node_id = null, output_type = 'srt', output_config = {},
            video_format = '1080p5994', project_id = null } = req.body || {};

    // Validate the value that is actually stored: a whitespace-only name would
    // otherwise pass the check and be persisted as ''.
    const trimmedName = typeof name === 'string' ? name.trim() : '';
    if (!trimmedName) {
      return res.status(400).json({ error: 'name is required' });
    }
    if (!OUTPUT_TYPES.has(output_type)) {
      return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` });
    }

    // Creating a project-scoped channel requires edit on that project; a
    // null-project (admin-only) channel requires admin.
    if (project_id) await assertProjectAccess(req.user, project_id, 'edit');
    else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });

    const { rows } = await pool.query(
      `INSERT INTO playout_channels (name, node_id, output_type, output_config, video_format, project_id)
       VALUES ($1,$2,$3,$4,$5,$6) RETURNING *`,
      [trimmedName, node_id, output_type, JSON.stringify(output_config), video_format, project_id]
    );
    res.status(201).json(channelToJson(rows[0]));
  } catch (err) { next(err); }
});
|
|
|
|
// PATCH /playout/channels/:id — update config (only while stopped)
|
|
router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => {
  try {
    if (req.channel.status === 'running') {
      return res.status(409).json({ error: 'Cannot edit a running channel — stop it first' });
    }

    // A request without a parsed body is treated as "no changes" instead of throwing.
    const body = req.body || {};

    if (body.output_type !== undefined && !OUTPUT_TYPES.has(body.output_type)) {
      return res.status(400).json({ error: 'invalid output_type' });
    }
    if (body.name !== undefined && (typeof body.name !== 'string' || !body.name.trim())) {
      return res.status(400).json({ error: 'name is required' });
    }

    // Moving a channel to another project needs the same rights as creating
    // one there: edit on the target project, or admin to make it unassigned.
    // requireChannelEdit only covers the channel's CURRENT project.
    if (body.project_id !== undefined && body.project_id !== req.channel.project_id) {
      if (body.project_id) await assertProjectAccess(req.user, body.project_id, 'edit');
      else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });
    }

    const allowed = ['name', 'node_id', 'output_type', 'output_config', 'video_format', 'project_id'];
    const sets = [];
    const vals = [];
    for (const k of allowed) {
      if (body[k] === undefined) continue;
      // Column names come from the fixed allow-list above; values are bound parameters.
      sets.push(`${k} = $${vals.length + 1}`);
      if (k === 'output_config') vals.push(JSON.stringify(body[k]));
      else if (k === 'name') vals.push(body[k].trim());
      else vals.push(body[k]);
    }
    if (sets.length === 0) return res.json(channelToJson(req.channel));

    vals.push(req.channel.id);
    const { rows } = await pool.query(
      `UPDATE playout_channels SET ${sets.join(', ')}, updated_at = NOW() WHERE id = $${vals.length} RETURNING *`, vals
    );
    res.json(channelToJson(rows[0]));
  } catch (err) { next(err); }
});
|
|
|
|
// DELETE /playout/channels/:id
|
|
router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => {
  try {
    const { id, status } = req.channel;

    // A running channel must be stopped before it can be deleted.
    const isLive = status === 'running';
    if (isLive) {
      return res.status(409).json({ error: 'Stop the channel before deleting it' });
    }

    await pool.query('DELETE FROM playout_channels WHERE id = $1', [id]);
    res.json({ deleted: true });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// ── Port-contention guard (DeckLink) ─────────────────────────────────────────
|
|
// A DeckLink device on a node is exclusive: an active recorder OR another active
|
|
// channel on the same node+index blocks a new SDI channel. NDI/SRT/RTMP have no
|
|
// hardware contention.
|
|
/**
 * @param {object} channel - playout_channels row about to be started.
 * @returns {Promise<void>} resolves when the device is free or the channel is
 *   not a DeckLink channel.
 * @throws {Error} with `httpStatus: 409` when the device is already in use.
 */
async function assertDeckLinkFree(channel) {
  if (channel.output_type !== 'decklink') return;

  // A missing device_index means device 1.
  const idx = channel.output_config?.device_index || 1;

  // Another running channel on the same node + device index? COALESCE applies
  // the same "missing means 1" default to the OTHER channels; without it a
  // running channel that relies on the default has a NULL index and is never
  // matched, so two channels could claim device 1.
  const chan = await pool.query(
    `SELECT id FROM playout_channels
     WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2 AND status = 'running'
       AND output_type = 'decklink'
       AND COALESCE((output_config->>'device_index')::int, 1) = $3`,
    [channel.id, channel.node_id, idx]
  );
  if (chan.rows.length > 0) {
    throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 });
  }

  // An active recorder using the same device index on the same node?
  const rec = await pool.query(
    `SELECT id FROM recorders
     WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2
       AND status = 'recording' AND source_type = 'sdi'`,
    [channel.node_id, idx]
  );
  if (rec.rows.length > 0) {
    throw Object.assign(new Error(`DeckLink device ${idx} is in use by a recorder on this node`), { httpStatus: 409 });
  }
}
|
|
|
|
// Spawn the CasparCG sidecar for a channel and flip it to 'running'. Shared by
|
|
// the /start route and the scheduler failover path (restartChannel) so neither
|
|
// duplicates the docker/node-agent orchestration. Caller is responsible for the
|
|
// pre-flight guards (status check, DeckLink contention) appropriate to its path.
|
|
//
|
|
// On any spawn failure the channel is left status='error' with a message and an
|
|
// Error carrying { httpStatus } is thrown. On success returns the updated row.
|
|
// Record a spawn failure on the channel row (status 'error' + message).
async function markChannelSpawnError(channelId, message) {
  await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
    ['error', message, channelId]);
}

// Ask a remote node-agent to start the sidecar; returns { containerId, containerMeta }.
async function startRemoteSidecar(channel, env, nodeApiUrl) {
  const sidecarRes = await fetch(`${nodeApiUrl}/sidecar/start`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      image: PLAYOUT_SIDECAR_IMAGE, env,
      capturePort: SIDECAR_HTTP_PORT,
      sourceType: channel.output_type,
      useGpu: false,
      publishHttp: true,
    }),
    signal: AbortSignal.timeout(20000),
  });
  if (!sidecarRes.ok) {
    const details = await sidecarRes.json().catch(() => ({}));
    console.error('[playout] remote sidecar start failed:', JSON.stringify(details));
    await markChannelSpawnError(channel.id, 'remote node failed to start sidecar');
    throw Object.assign(new Error('Remote node failed to start sidecar'), { httpStatus: 502 });
  }
  const data = await sidecarRes.json();
  const containerMeta = {};
  // node-agent returns the reachable host:port the shim is published on.
  if (data.sidecarUrl || data.host) {
    containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`;
  }
  return { containerId: data.containerId, containerMeta };
}

// Create + start the sidecar container on the local Docker daemon; returns
// { containerId, containerMeta }.
async function startLocalSidecar(channel, env) {
  const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon';
  const alias = channelAlias(channel.id);
  const hostBinds = ['/mnt/NVME/MAM/wild-dragon-media:/media'];
  if (channel.output_type === 'decklink') hostBinds.push('/dev/blackmagic:/dev/blackmagic');

  const containerConfig = {
    Image: PLAYOUT_SIDECAR_IMAGE,
    Env: env,
    HostConfig: {
      Privileged: true,
      NetworkMode: dockerNetwork,
      Binds: hostBinds,
    },
    NetworkingConfig: { EndpointsConfig: { [dockerNetwork]: { Aliases: [alias] } } },
    Hostname: alias,
  };
  const createRes = await dockerApi('POST', '/containers/create', containerConfig);
  if (createRes.status !== 201) {
    console.error('[playout] container create failed:', JSON.stringify(createRes.data));
    await markChannelSpawnError(channel.id, 'container create failed');
    throw Object.assign(new Error('Failed to create container'), { httpStatus: 500 });
  }
  const containerId = createRes.data.Id;

  let startRes;
  try {
    startRes = await dockerApi('POST', `/containers/${containerId}/start`);
  } catch (err) {
    // The start call itself failed (socket error / timeout): do not leave the
    // freshly created container behind.
    await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
    throw err;
  }
  if (startRes.status !== 204) {
    console.error('[playout] container start failed:', JSON.stringify(startRes.data));
    await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
    await markChannelSpawnError(channel.id, 'container start failed');
    throw Object.assign(new Error('Failed to start container'), { httpStatus: 500 });
  }
  return { containerId, containerMeta: {} };
}

/**
 * Spawn the engine sidecar for a channel (remote node-agent or local Docker)
 * and flip the channel to 'running'.
 *
 * @param {object} channel - playout_channels row.
 * @returns {Promise<object>} the updated playout_channels row.
 * @throws {Error} on spawn failure. Rejections reported by the node-agent or
 *   Docker carry `httpStatus`; unexpected errors (network failure, timeout)
 *   are rethrown as-is. In both cases the channel is left in status 'error'.
 */
async function spawnChannelSidecar(channel) {
  await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]);

  const env = [
    `OUTPUT_TYPE=${channel.output_type}`,
    `OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`,
    `VIDEO_FORMAT=${channel.video_format}`,
    `PORT=${SIDECAR_HTTP_PORT}`,
    // Drives the HLS preview path (/media/live/<channel_id>/index.m3u8) and
    // the per-channel resource naming inside the sidecar.
    `CHANNEL_ID=${channel.id}`,
  ];

  let spawned;
  try {
    const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(channel.node_id);
    spawned = isRemote
      ? await startRemoteSidecar(channel, env, targetNodeApiUrl)
      : await startLocalSidecar(channel, env);
  } catch (err) {
    // Errors carrying httpStatus were already recorded by the helper. Anything
    // else (fetch/Docker rejection, timeout) would otherwise leave the channel
    // stuck in 'starting', which the /start route refuses to restart.
    if (!err.httpStatus) {
      console.error('[playout] sidecar spawn failed:', err.message);
      await markChannelSpawnError(channel.id, `sidecar spawn failed: ${err.message}`)
        .catch((dbErr) => console.error('[playout] could not record spawn failure:', dbErr.message));
    }
    throw err;
  }

  // Set last_heartbeat_at = NOW() so the scheduler health tick treats this
  // channel as freshly alive. Without this, last_heartbeat_at starts as NULL
  // (epoch=0), and the very first tick sees ageMs >> TIMEOUT_MS and triggers
  // failover immediately — before the sidecar has had a chance to respond.
  const { rows } = await pool.query(
    `UPDATE playout_channels
     SET status = 'running', container_id = $1, container_meta = $2,
         last_heartbeat_at = NOW(), updated_at = NOW()
     WHERE id = $3 RETURNING *`,
    [spawned.containerId, JSON.stringify(spawned.containerMeta), channel.id]
  );
  return rows[0];
}
|
|
|
|
// POST /playout/channels/:id/start — spawn the CasparCG sidecar + bring up output
|
|
router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => {
  try {
    const { channel } = req;

    // Refuse a second start while one is in flight or already complete.
    const alreadyUp = ['running', 'starting'].includes(channel.status);
    if (alreadyUp) {
      return res.status(409).json({ error: `Channel already ${channel.status}` });
    }

    await assertDeckLinkFree(channel);
    const started = await spawnChannelSidecar(channel);
    res.json(channelToJson(started));
  } catch (err) {
    // Errors tagged with httpStatus are answered directly; the rest go to the
    // error middleware.
    if (!err.httpStatus) return next(err);
    return res.status(err.httpStatus).json({ error: err.message });
  }
});
|
|
|
|
// POST /playout/channels/:id/stop — tear down the sidecar
|
|
router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => {
  try {
    const { id, node_id: nodeId, container_id: containerId } = req.channel;

    // Teardown is best-effort: failures are logged or ignored and the channel
    // is marked stopped regardless.
    if (containerId) {
      const target = await resolveNodeTarget(nodeId);
      if (target.remote) {
        await fetch(`${target.apiUrl}/sidecar/stop`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ containerId }),
          signal: AbortSignal.timeout(20000),
        }).catch((e) => console.error('[playout] remote stop failed:', e.message));
      } else {
        await dockerApi('POST', `/containers/${containerId}/stop?t=10`).catch(() => {});
        await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
      }
    }

    const updated = await pool.query(
      `UPDATE playout_channels SET status = 'stopped', container_id = NULL, updated_at = NOW()
       WHERE id = $1 RETURNING *`, [id]
    );
    res.json(channelToJson(updated.rows[0]));
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /playout/channels/:id/status — live engine status (proxied to sidecar)
|
|
router.get('/channels/:id/status', async (req, res, next) => {
  // This route never forwards to `next`: a sidecar failure is reported inside
  // the JSON body so the poller still receives the channel's stored status.
  try {
    const { status } = req.channel;
    if (status !== 'running') {
      return res.json({ running: false, status });
    }
    const engine = await callSidecar(req.channel, '/status', 'GET');
    res.json({ running: true, status, engine });
  } catch (sidecarErr) {
    res.json({
      running: true,
      status: req.channel.status,
      engine: null,
      engine_error: sidecarErr.message,
    });
  }
});
|
|
|
|
// GET /playout/channels/:id/hls/index.m3u8 — the live preview playlist, served
|
|
// through the API (not the static /media/live path) so it bypasses the public
|
|
// reverse proxy's static cache. That proxy caches the .m3u8 by path with a
|
|
// multi-second TTL and ignores the origin's no-store, so hls.js's ~1s reloads
|
|
// always got a STALE playlist ("MISSED" forever → monitor stayed black). The
|
|
// /api/ path is not proxy-cached (the status poll updates fine), so this always
|
|
// returns the fresh live edge. Segment (.ts) lines are rewritten to absolute
|
|
// /media/live/<id>/ URLs so they still load from the static path (immutable,
|
|
// caching them is fine). mam-api shares the same /media volume the sidecars
|
|
// write to.
|
|
const HLS_MEDIA_DIR = process.env.PLAYOUT_MEDIA_DIR || '/media';

// Rewrite bare `.ts` segment names in a playlist to absolute
// /media/live/<channelId>/ URLs. Tag/comment lines and entries that are already
// absolute (leading '/', or scheme://) are left untouched so they are not
// double-prefixed.
function rewriteHlsSegmentLines(playlist, channelId) {
  const alreadyAbsolute = /^(?:[a-z][a-z0-9+.-]*:)?\//i;
  return playlist
    .split('\n')
    .map((line) => {
      const entry = line.trim();
      if (!/^[^#].*\.ts$/.test(entry) || alreadyAbsolute.test(entry)) return line;
      return `/media/live/${channelId}/${entry}`;
    })
    .join('\n');
}

router.get('/channels/:id/hls/index.m3u8', async (req, res, next) => {
  try {
    const cid = req.channel.id;
    let body;
    try {
      body = await readFile(`${HLS_MEDIA_DIR}/live/${cid}/index.m3u8`, 'utf8');
    } catch (readErr) {
      // Only a missing playlist means "no preview yet". Permission or I/O
      // errors are real faults and must not be disguised as a 404.
      if (readErr.code !== 'ENOENT') throw readErr;
      return res.status(404).json({ error: 'No live preview for this channel yet' });
    }

    res.setHeader('Content-Type', 'application/vnd.apple.mpegurl');
    res.setHeader('Cache-Control', 'no-store, no-cache, must-revalidate');
    res.setHeader('Pragma', 'no-cache');
    res.send(rewriteHlsSegmentLines(body, cid));
  } catch (err) { next(err); }
});
|
|
|
|
// ── Transport ────────────────────────────────────────────────────────────────
|
|
/**
 * Forward a transport command to the channel's sidecar and relay its answer.
 *
 * @param {object} req - request with `req.channel` loaded.
 * @param {object} res - response.
 * @param {string} action - sidecar path, e.g. `/transport/pause`.
 * @param {object|null} [body] - optional JSON payload.
 * Responds 409 when the channel is not running and 502 when the sidecar call fails.
 */
async function transport(req, res, action, body = null) {
  const isRunning = req.channel.status === 'running';
  if (!isRunning) {
    return res.status(409).json({ error: 'Channel is not running' });
  }

  try {
    const reply = await callSidecar(req.channel, action, 'POST', body);
    res.json(reply);
  } catch (sidecarErr) {
    res.status(502).json({ error: sidecarErr.message });
  }
}
|
|
|
|
// POST /playout/channels/:id/play — resolve the channel's playlist, stage-check,
|
|
// and hand the engine the ordered list of ready clips.
|
|
router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => {
  try {
    if (req.channel.status !== 'running') {
      return res.status(409).json({ error: 'Start the channel before playing' });
    }
    const { playlist_id } = req.body || {};
    if (!playlist_id) return res.status(400).json({ error: 'playlist_id is required' });

    // The playlist must belong to this channel.
    const pl = await pool.query('SELECT * FROM playout_playlists WHERE id = $1 AND channel_id = $2',
      [playlist_id, req.channel.id]);
    if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found for this channel' });

    const items = await pool.query(
      `SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms
       FROM playout_items i JOIN assets a ON a.id = i.asset_id
       WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [playlist_id]);

    // Every item must be staged before the list is handed to the engine.
    const notReady = items.rows.filter((i) => i.media_status !== 'ready' || !i.media_path);
    if (notReady.length > 0) {
      return res.status(409).json({
        error: 'Some items are not staged yet',
        pending: notReady.map((i) => i.id),
      });
    }

    // Null-check, not truthiness: a numeric 0 in/out point is a real value and
    // must not be turned into null (matches the asset_duration_ms handling).
    const numberOrNull = (value) => (value != null ? Number(value) : null);

    const payload = {
      loop: pl.rows[0].loop,
      items: items.rows.map((i) => ({
        id: i.id, asset_id: i.asset_id, media_path: i.media_path,
        in_point: numberOrNull(i.in_point),
        out_point: numberOrNull(i.out_point),
        transition: i.transition, transition_ms: i.transition_ms,
        clip_name: i.clip_name,
        asset_duration_ms: numberOrNull(i.asset_duration_ms),
      })),
    };

    // callSidecar throws on network/timeout errors. Return 502 (not 409) so
    // the UI and operators know it's a gateway problem, not a state conflict.
    let out;
    try {
      out = await callSidecar(req.channel, '/playlist/load', 'POST', payload);
    } catch (err) {
      return res.status(502).json({ error: 'Sidecar unreachable: ' + err.message });
    }
    res.json(out);
  } catch (err) { next(err); }
});
|
|
|
|
// Simple transport commands: each route forwards to one sidecar action and
// requires edit access on the channel.
const TRANSPORT_ROUTES = [
  ['pause', '/transport/pause'],
  ['resume', '/transport/resume'],
  ['skip', '/transport/skip'],
  ['stop-playback', '/channel/stop'],
];
for (const [verb, sidecarAction] of TRANSPORT_ROUTES) {
  router.post(`/channels/:id/${verb}`, requireChannelEdit, (req, res) => transport(req, res, sidecarAction));
}
|
|
|
|
// GET /playout/channels/:id/asrun — as-run log
|
|
router.get('/channels/:id/asrun', async (req, res, next) => {
  try {
    // Newest first, capped at 500 entries.
    const asRun = await pool.query(
      `SELECT * FROM playout_as_run WHERE channel_id = $1 ORDER BY started_at DESC LIMIT 500`,
      [req.channel.id]
    );
    res.json(asRun.rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// ── Playlists ────────────────────────────────────────────────────────────────
|
|
/**
 * Middleware for playlist/item routes that reference a channel indirectly via
 * `channel_id` (request body first, then query string). Loads the channel onto
 * `req.channel` and asserts 'edit' access on its project.
 *
 * Responds 400 when no channel_id is supplied and 404 when the channel does
 * not exist; any other failure is forwarded to `next`.
 */
async function loadChannelForBody(req, res, next) {
  try {
    // Optional chaining keeps a request without a parsed body on the 400 path.
    // An exception thrown outside this try would reject the async middleware
    // with nothing handling it.
    const channelId = req.body?.channel_id || req.query?.channel_id;
    if (!channelId) return res.status(400).json({ error: 'channel_id is required' });

    const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
    if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
    req.channel = rows[0];
    await assertProjectAccess(req.user, req.channel.project_id, 'edit');
    next();
  } catch (err) { next(err); }
}
|
|
|
|
// GET /playout/playlists?channel_id=...
|
|
router.get('/playlists', async (req, res, next) => {
  try {
    const { channel_id: channelId } = req.query;
    if (!channelId) return res.status(400).json({ error: 'channel_id is required' });

    // Listing requires 'view' on the project of the owning channel.
    const owner = await pool.query('SELECT project_id FROM playout_channels WHERE id = $1', [channelId]);
    if (owner.rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
    await assertProjectAccess(req.user, owner.rows[0].project_id, 'view');

    const playlists = await pool.query(
      'SELECT * FROM playout_playlists WHERE channel_id = $1 ORDER BY created_at ASC', [channelId]
    );
    res.json(playlists.rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /playout/playlists
|
|
router.post('/playlists', loadChannelForBody, async (req, res, next) => {
  try {
    const { name, loop = false } = req.body || {};

    // Same rule as channel creation: the name must be a non-blank string. A
    // non-string value would otherwise throw on .trim() and surface as a 500.
    const trimmedName = typeof name === 'string' ? name.trim() : '';
    if (!trimmedName) return res.status(400).json({ error: 'name is required' });

    const { rows } = await pool.query(
      'INSERT INTO playout_playlists (channel_id, name, loop) VALUES ($1,$2,$3) RETURNING *',
      [req.channel.id, trimmedName, !!loop]);
    res.status(201).json(rows[0]);
  } catch (err) { next(err); }
});
|
|
|
|
// GET /playout/playlists/:plid/items
|
|
router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
  const { plid } = req.params;
  try {
    // Resolve the playlist together with its channel's project for the access check.
    const found = await pool.query(
      `SELECT p.*, c.project_id FROM playout_playlists p
       JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]);
    const [playlist] = found.rows;
    if (!playlist) return res.status(404).json({ error: 'Playlist not found' });
    await assertProjectAccess(req.user, playlist.project_id, 'view');

    const listing = await pool.query(
      `SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms
       FROM playout_items i JOIN assets a ON a.id = i.asset_id
       WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [plid]);
    res.json(listing.rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// Helper: load a playlist + assert edit on its channel's project.
|
|
/**
 * @param {string} plid - playlist id.
 * @param {object} user - requesting user (req.user).
 * @returns {Promise<object>} the playlist row, extended with its channel's project_id.
 * @throws {Error} with `httpStatus: 404` when the playlist does not exist;
 *   whatever assertProjectAccess throws when 'edit' access is denied.
 */
async function loadPlaylistEdit(plid, user) {
  const { rows } = await pool.query(
    `SELECT p.*, c.project_id FROM playout_playlists p
     JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]);

  if (rows.length === 0) {
    const notFound = new Error('Playlist not found');
    notFound.httpStatus = 404;
    throw notFound;
  }

  const playlist = rows[0];
  await assertProjectAccess(user, playlist.project_id, 'edit');
  return playlist;
}
|
|
|
|
// POST /playout/playlists/:plid/items — add an asset to a playlist
|
|
router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
  const { plid } = req.params;
  try {
    await loadPlaylistEdit(plid, req.user);

    const input = req.body || {};
    const {
      asset_id,
      in_point = null,
      out_point = null,
      transition = 'cut',
      transition_ms = 0,
    } = input;
    if (!asset_id) return res.status(400).json({ error: 'asset_id is required' });

    // New items go after the current last item (position 0 for an empty playlist).
    const position = await pool.query(
      'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1',
      [plid]);
    const inserted = await pool.query(
      `INSERT INTO playout_items (playlist_id, asset_id, sort_order, in_point, out_point, transition, transition_ms)
       VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`,
      [plid, asset_id, position.rows[0].next, in_point, out_point, transition, transition_ms]);
    const item = inserted.rows[0];

    // Start staging right away. A queue failure is logged, not fatal: the item
    // row already exists and is still returned.
    await stageQueue.add('stage', { itemId: item.id, assetId: asset_id }).catch((e) =>
      console.error('[playout] failed to enqueue stage job:', e.message));

    res.status(201).json(item);
  } catch (err) {
    if (!err.httpStatus) return next(err);
    return res.status(err.httpStatus).json({ error: err.message });
  }
});
|
|
|
|
// PUT /playout/playlists/:plid/reorder — body { order: [itemId, itemId, ...] }
|
|
router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => {
  let client;
  try {
    // Authorise and validate BEFORE taking a pooled connection, so rejected
    // requests never hold one.
    await loadPlaylistEdit(req.params.plid, req.user);
    const { order } = req.body || {};
    if (!Array.isArray(order)) return res.status(400).json({ error: 'order must be an array of item ids' });

    // Acquired inside the try so a failed connect reaches next(err) instead of
    // becoming an unhandled rejection.
    client = await pool.connect();
    await client.query('BEGIN');
    try {
      // Each item's new sort_order is its index in `order`. Ids that do not
      // belong to this playlist match no row and are ignored.
      for (const [position, itemId] of order.entries()) {
        await client.query(
          'UPDATE playout_items SET sort_order = $1, updated_at = NOW() WHERE id = $2 AND playlist_id = $3',
          [position, itemId, req.params.plid]);
      }
      await client.query('COMMIT');
    } catch (txErr) {
      // Roll back only when a transaction was actually opened.
      await client.query('ROLLBACK').catch(() => {});
      throw txErr;
    }
    res.json({ reordered: order.length });
  } catch (err) {
    if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
    next(err);
  } finally {
    client?.release();
  }
});
|
|
|
|
// DELETE /playout/items/:itemId
|
|
router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => {
  const { itemId } = req.params;
  try {
    // Walk item -> playlist -> channel to find the project that governs access.
    const lookup = await pool.query(
      `SELECT i.id, c.project_id FROM playout_items i
       JOIN playout_playlists p ON p.id = i.playlist_id
       JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [itemId]);
    const [item] = lookup.rows;
    if (!item) return res.status(404).json({ error: 'Item not found' });

    await assertProjectAccess(req.user, item.project_id, 'edit');
    await pool.query('DELETE FROM playout_items WHERE id = $1', [itemId]);
    res.json({ deleted: true });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /playout/items/:itemId/stage — (re)kick staging for one item
|
|
router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => {
  const { itemId } = req.params;
  try {
    // Walk item -> playlist -> channel to find the project that governs access.
    const lookup = await pool.query(
      `SELECT i.id, i.asset_id, c.project_id FROM playout_items i
       JOIN playout_playlists p ON p.id = i.playlist_id
       JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [itemId]);
    const [item] = lookup.rows;
    if (!item) return res.status(404).json({ error: 'Item not found' });
    await assertProjectAccess(req.user, item.project_id, 'edit');

    // Reset the item to 'pending' first, then enqueue a fresh stage job.
    await pool.query("UPDATE playout_items SET media_status = 'pending' WHERE id = $1", [itemId]);
    await stageQueue.add('stage', { itemId: item.id, assetId: item.asset_id });
    res.json({ queued: true });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// ── Failover (called by scheduler tick) ──────────────────────────────────────
|
|
// Tear down a (presumed dead) sidecar and re-spawn it on another cluster node
|
|
// matching the original capability. DeckLink channels are excluded — the
|
|
// device-index pinning makes blind re-placement risky, so they alert only.
|
|
//
|
|
// Returns { restarted: true, new_node_id } on success, or { restarted: false,
|
|
// reason } when no eligible node exists or the channel is decklink.
|
|
/**
 * @param {string} channelId - playout_channels.id of the channel to fail over.
 * @returns {Promise<{restarted: true, new_node_id: string} | {restarted: false, reason: string}>}
 *   Never throws for spawn failures; they are reported through `reason`.
 */
export async function restartChannel(channelId) {
  const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
  if (rows.length === 0) return { restarted: false, reason: 'channel not found' };
  const channel = rows[0];

  if (channel.output_type === 'decklink') {
    return { restarted: false, reason: 'decklink channels are alert-only' };
  }

  // Best-effort teardown of the old container — it may already be dead.
  if (channel.container_id) {
    const { remote, apiUrl } = await resolveNodeTarget(channel.node_id);
    if (remote && apiUrl) {
      await fetch(`${apiUrl}/sidecar/stop`, {
        method: 'POST', headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ containerId: channel.container_id }),
        signal: AbortSignal.timeout(10000),
      }).catch(() => {});
    } else {
      await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`).catch(() => {});
    }
  }

  // Pick the most recently seen healthy node OTHER than the current one. For
  // NDI/SRT/RTMP every online node is eligible (no hardware contention).
  // IS DISTINCT FROM (not <>) so a channel with no node assigned (node_id
  // NULL) still matches candidates: `id <> NULL` is never true and would make
  // failover impossible for such channels.
  const nodes = await pool.query(
    `SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes
     WHERE id IS DISTINCT FROM $1 AND last_seen_at > NOW() - INTERVAL '60 seconds'
     ORDER BY last_seen_at DESC LIMIT 1`,
    [channel.node_id]
  );
  if (nodes.rows.length === 0) {
    await pool.query(
      "UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
      ['no healthy node available for failover', channel.id]
    );
    return { restarted: false, reason: 'no eligible node' };
  }
  const newNodeId = nodes.rows[0].id;

  // Move the channel to the new node + bump the restart counters; the operator
  // UI surfaces these to flag restarts. container_meta is cleared so the new
  // spawn re-derives the sidecar URL.
  const { rows: moved } = await pool.query(
    `UPDATE playout_channels
     SET node_id = $1, status = 'stopped', container_id = NULL, container_meta = '{}'::jsonb,
         restart_count = restart_count + 1, last_restart_at = NOW(),
         error_message = NULL, updated_at = NOW()
     WHERE id = $2 RETURNING *`,
    [newNodeId, channel.id]
  );

  // Spawn the sidecar directly via the shared helper. We do NOT route through
  // the HTTP /start endpoint: its guard rejects status 'starting'/'running' and
  // would deadlock the failover. spawnChannelSidecar flips the channel to
  // running (or leaves it 'error' and throws on spawn failure).
  try {
    await spawnChannelSidecar(moved[0]);
    return { restarted: true, new_node_id: newNodeId };
  } catch (err) {
    return { restarted: false, reason: `respawn failed: ${err.message}` };
  }
}
|
|
|
|
export default router;
|