fix(playout): Privileged only for decklink (SRT/NDI/RTMP/HLS crashed when GPU exposed without driver)

This commit is contained in:
Zac Gaetano 2026-05-30 18:59:27 -04:00
parent cddcc9a29e
commit 3578c7b4e9

View file

@ -20,7 +20,6 @@ import {
const router = express.Router(); const router = express.Router();
// ── BullMQ: media staging queue (S3 -> /media volume) ────────────────────────
const parseRedisUrl = (url) => { const parseRedisUrl = (url) => {
const parsed = new URL(url); const parsed = new URL(url);
return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
@ -29,7 +28,6 @@ const stageQueue = new Queue('playout-stage', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
}); });
// ── Sidecar orchestration (mirrors recorders.js) ─────────────────────────────
const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest'; const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest';
function dockerApi(method, path, body = null) { function dockerApi(method, path, body = null) {
@ -67,16 +65,10 @@ async function resolveNodeTarget(nodeId) {
return { remote: true, apiUrl: node.api_url, ip: node.ip_address }; return { remote: true, apiUrl: node.api_url, ip: node.ip_address };
} }
// The sidecar shim listens on this port inside the container. The mam-api talks
// to it by container alias on the shared docker network (local) or via the
// node-agent's returned host:port (remote).
const SIDECAR_HTTP_PORT = 3002; const SIDECAR_HTTP_PORT = 3002;
function channelAlias(id) { return `playout-${id}`; } function channelAlias(id) { return `playout-${id}`; }
// Resolve the base URL the API uses to reach a running channel's sidecar shim.
// Local: the docker-network alias. Remote: the node-agent reported the host the
// container is published on (stored in container_meta.sidecar_url).
function sidecarBaseUrl(channel) { function sidecarBaseUrl(channel) {
if (channel.container_meta && channel.container_meta.sidecar_url) { if (channel.container_meta && channel.container_meta.sidecar_url) {
return channel.container_meta.sidecar_url; return channel.container_meta.sidecar_url;
@ -99,7 +91,6 @@ async function callSidecar(channel, path, method = 'POST', body = null) {
return res.json().catch(() => ({})); return res.json().catch(() => ({}));
} }
// ── Serialization ────────────────────────────────────────────────────────────
function channelToJson(r) { function channelToJson(r) {
return { return {
id: r.id, id: r.id,
@ -122,7 +113,6 @@ function channelToJson(r) {
const OUTPUT_TYPES = new Set(['decklink', 'ndi', 'srt', 'rtmp']); const OUTPUT_TYPES = new Set(['decklink', 'ndi', 'srt', 'rtmp']);
// ── Param resolver: scope every /:id route to the channel's project ──────────
router.param('id', async (req, res, next) => { router.param('id', async (req, res, next) => {
validateUuid('id')(req, res, () => {}); validateUuid('id')(req, res, () => {});
if (res.headersSent) return; if (res.headersSent) return;
@ -142,9 +132,6 @@ async function requireChannelEdit(req, res, next) {
catch (err) { next(err); } catch (err) { next(err); }
} }
// ── Channels ─────────────────────────────────────────────────────────────────
// GET /playout/channels — list (filtered to accessible projects)
router.get('/channels', async (req, res, next) => { router.get('/channels', async (req, res, next) => {
try { try {
let rows; let rows;
@ -161,7 +148,6 @@ router.get('/channels', async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// POST /playout/channels — create
router.post('/channels', async (req, res, next) => { router.post('/channels', async (req, res, next) => {
try { try {
const { name, node_id = null, output_type = 'srt', output_config = {}, const { name, node_id = null, output_type = 'srt', output_config = {},
@ -172,8 +158,6 @@ router.post('/channels', async (req, res, next) => {
if (!OUTPUT_TYPES.has(output_type)) { if (!OUTPUT_TYPES.has(output_type)) {
return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` }); return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` });
} }
// Creating a project-scoped channel requires edit on that project; a
// null-project (admin-only) channel requires admin.
if (project_id) await assertProjectAccess(req.user, project_id, 'edit'); if (project_id) await assertProjectAccess(req.user, project_id, 'edit');
else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' }); else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });
@ -186,7 +170,6 @@ router.post('/channels', async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// PATCH /playout/channels/:id — update config (only while stopped)
router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => { router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => {
try { try {
if (req.channel.status === 'running') { if (req.channel.status === 'running') {
@ -213,7 +196,6 @@ router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// DELETE /playout/channels/:id
router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => { router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => {
try { try {
if (req.channel.status === 'running') { if (req.channel.status === 'running') {
@ -224,14 +206,9 @@ router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// ── Port-contention guard (DeckLink) ─────────────────────────────────────────
// A DeckLink device on a node is exclusive: an active recorder OR another active
// channel on the same node+index blocks a new SDI channel. NDI/SRT/RTMP have no
// hardware contention.
async function assertDeckLinkFree(channel) { async function assertDeckLinkFree(channel) {
if (channel.output_type !== 'decklink') return; if (channel.output_type !== 'decklink') return;
const idx = (channel.output_config && channel.output_config.device_index) || 1; const idx = (channel.output_config && channel.output_config.device_index) || 1;
// Another running channel on the same node + device index?
const chan = await pool.query( const chan = await pool.query(
`SELECT id FROM playout_channels `SELECT id FROM playout_channels
WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2 AND status = 'running' WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2 AND status = 'running'
@ -241,7 +218,6 @@ async function assertDeckLinkFree(channel) {
if (chan.rows.length > 0) { if (chan.rows.length > 0) {
throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 }); throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 });
} }
// An active recorder using the same device index on the same node?
const rec = await pool.query( const rec = await pool.query(
`SELECT id FROM recorders `SELECT id FROM recorders
WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2 WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2
@ -253,13 +229,6 @@ async function assertDeckLinkFree(channel) {
} }
} }
// Spawn the CasparCG sidecar for a channel and flip it to 'running'. Shared by
// the /start route and the scheduler failover path (restartChannel) so neither
// duplicates the docker/node-agent orchestration. Caller is responsible for the
// pre-flight guards (status check, DeckLink contention) appropriate to its path.
//
// On any spawn failure the channel is left status='error' with a message and an
// Error carrying { httpStatus } is thrown. On success returns the updated row.
async function spawnChannelSidecar(channel) { async function spawnChannelSidecar(channel) {
await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]); await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]);
@ -268,8 +237,6 @@ async function spawnChannelSidecar(channel) {
`OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`, `OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`,
`VIDEO_FORMAT=${channel.video_format}`, `VIDEO_FORMAT=${channel.video_format}`,
`PORT=${SIDECAR_HTTP_PORT}`, `PORT=${SIDECAR_HTTP_PORT}`,
// Drives the HLS preview path (/media/live/<channel_id>/index.m3u8) and
// the per-channel resource naming inside the sidecar.
`CHANNEL_ID=${channel.id}`, `CHANNEL_ID=${channel.id}`,
]; ];
@ -300,7 +267,6 @@ async function spawnChannelSidecar(channel) {
} }
const data = await sidecarRes.json(); const data = await sidecarRes.json();
containerId = data.containerId; containerId = data.containerId;
// node-agent returns the reachable host:port the shim is published on.
if (data.sidecarUrl || data.host) { if (data.sidecarUrl || data.host) {
containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`; containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`;
} }
@ -313,7 +279,10 @@ async function spawnChannelSidecar(channel) {
Image: PLAYOUT_SIDECAR_IMAGE, Image: PLAYOUT_SIDECAR_IMAGE,
Env: env, Env: env,
HostConfig: { HostConfig: {
Privileged: true, // DeckLink SDI needs raw /dev access (privileged). SRT/NDI/RTMP/HLS run
// unprivileged — privileged exposes host GPUs to CasparCG, and the
// missing in-container NVIDIA driver crashes the engine within seconds.
Privileged: channel.output_type === 'decklink',
NetworkMode: dockerNetwork, NetworkMode: dockerNetwork,
Binds: hostBinds, Binds: hostBinds,
}, },
@ -347,7 +316,6 @@ async function spawnChannelSidecar(channel) {
return rows[0]; return rows[0];
} }
// POST /playout/channels/:id/start — spawn the CasparCG sidecar + bring up output
router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => { router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => {
try { try {
const channel = req.channel; const channel = req.channel;
@ -363,7 +331,6 @@ router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) =>
} }
}); });
// POST /playout/channels/:id/stop — tear down the sidecar
router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => { router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => {
try { try {
const channel = req.channel; const channel = req.channel;
@ -388,7 +355,6 @@ router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) =>
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// GET /playout/channels/:id/status — live engine status (proxied to sidecar)
router.get('/channels/:id/status', async (req, res, next) => { router.get('/channels/:id/status', async (req, res, next) => {
try { try {
if (req.channel.status !== 'running') { if (req.channel.status !== 'running') {
@ -401,7 +367,6 @@ router.get('/channels/:id/status', async (req, res, next) => {
} }
}); });
// ── Transport ────────────────────────────────────────────────────────────────
async function transport(req, res, action, body = null) { async function transport(req, res, action, body = null) {
if (req.channel.status !== 'running') { if (req.channel.status !== 'running') {
return res.status(409).json({ error: 'Channel is not running' }); return res.status(409).json({ error: 'Channel is not running' });
@ -410,8 +375,6 @@ async function transport(req, res, action, body = null) {
catch (err) { res.status(502).json({ error: err.message }); } catch (err) { res.status(502).json({ error: err.message }); }
} }
// POST /playout/channels/:id/play — resolve the channel's playlist, stage-check,
// and hand the engine the ordered list of ready clips.
router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => { router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => {
try { try {
if (req.channel.status !== 'running') { if (req.channel.status !== 'running') {
@ -458,7 +421,6 @@ router.post('/channels/:id/resume', requireChannelEdit, (req, res) => transport(
router.post('/channels/:id/skip', requireChannelEdit, (req, res) => transport(req, res, '/transport/skip')); router.post('/channels/:id/skip', requireChannelEdit, (req, res) => transport(req, res, '/transport/skip'));
router.post('/channels/:id/stop-playback', requireChannelEdit, (req, res) => transport(req, res, '/channel/stop')); router.post('/channels/:id/stop-playback', requireChannelEdit, (req, res) => transport(req, res, '/channel/stop'));
// GET /playout/channels/:id/asrun — as-run log
router.get('/channels/:id/asrun', async (req, res, next) => { router.get('/channels/:id/asrun', async (req, res, next) => {
try { try {
const { rows } = await pool.query( const { rows } = await pool.query(
@ -468,10 +430,7 @@ router.get('/channels/:id/asrun', async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// ── Playlists ────────────────────────────────────────────────────────────────
async function loadChannelForBody(req, res, next) { async function loadChannelForBody(req, res, next) {
// For playlist/item routes the channel is referenced indirectly; resolve it
// and assert edit. Used on create/mutate routes that carry channel_id.
const channelId = req.body.channel_id || req.query.channel_id; const channelId = req.body.channel_id || req.query.channel_id;
if (!channelId) return res.status(400).json({ error: 'channel_id is required' }); if (!channelId) return res.status(400).json({ error: 'channel_id is required' });
try { try {
@ -483,7 +442,6 @@ async function loadChannelForBody(req, res, next) {
} catch (err) { next(err); } } catch (err) { next(err); }
} }
// GET /playout/playlists?channel_id=...
router.get('/playlists', async (req, res, next) => { router.get('/playlists', async (req, res, next) => {
try { try {
const channelId = req.query.channel_id; const channelId = req.query.channel_id;
@ -497,7 +455,6 @@ router.get('/playlists', async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// POST /playout/playlists
router.post('/playlists', loadChannelForBody, async (req, res, next) => { router.post('/playlists', loadChannelForBody, async (req, res, next) => {
try { try {
const { name, loop = false } = req.body || {}; const { name, loop = false } = req.body || {};
@ -509,7 +466,6 @@ router.post('/playlists', loadChannelForBody, async (req, res, next) => {
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// GET /playout/playlists/:plid/items
router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => { router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
try { try {
const pl = await pool.query( const pl = await pool.query(
@ -525,7 +481,6 @@ router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// Helper: load a playlist + assert edit on its channel's project.
async function loadPlaylistEdit(plid, user) { async function loadPlaylistEdit(plid, user) {
const pl = await pool.query( const pl = await pool.query(
`SELECT p.*, c.project_id FROM playout_playlists p `SELECT p.*, c.project_id FROM playout_playlists p
@ -535,7 +490,6 @@ async function loadPlaylistEdit(plid, user) {
return pl.rows[0]; return pl.rows[0];
} }
// POST /playout/playlists/:plid/items — add an asset to a playlist
router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => { router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
try { try {
await loadPlaylistEdit(req.params.plid, req.user); await loadPlaylistEdit(req.params.plid, req.user);
@ -543,7 +497,6 @@ router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, nex
transition = 'cut', transition_ms = 0 } = req.body || {}; transition = 'cut', transition_ms = 0 } = req.body || {};
if (!asset_id) return res.status(400).json({ error: 'asset_id is required' }); if (!asset_id) return res.status(400).json({ error: 'asset_id is required' });
// Append at the end of the playlist.
const ord = await pool.query( const ord = await pool.query(
'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1', 'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1',
[req.params.plid]); [req.params.plid]);
@ -552,8 +505,6 @@ router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, nex
VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`, VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`,
[req.params.plid, asset_id, ord.rows[0].next, in_point, out_point, transition, transition_ms]); [req.params.plid, asset_id, ord.rows[0].next, in_point, out_point, transition, transition_ms]);
// Kick staging immediately so the clip is air-ready by the time the operator
// hits play.
await stageQueue.add('stage', { itemId: rows[0].id, assetId: asset_id }).catch((e) => await stageQueue.add('stage', { itemId: rows[0].id, assetId: asset_id }).catch((e) =>
console.error('[playout] failed to enqueue stage job:', e.message)); console.error('[playout] failed to enqueue stage job:', e.message));
@ -564,7 +515,6 @@ router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, nex
} }
}); });
// PUT /playout/playlists/:plid/reorder — body { order: [itemId, itemId, ...] }
router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => { router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => {
const client = await pool.connect(); const client = await pool.connect();
try { try {
@ -586,7 +536,6 @@ router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, ne
} finally { client.release(); } } finally { client.release(); }
}); });
// DELETE /playout/items/:itemId
router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => { router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => {
try { try {
const it = await pool.query( const it = await pool.query(
@ -600,7 +549,6 @@ router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) =
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// POST /playout/items/:itemId/stage — (re)kick staging for one item
router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => { router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => {
try { try {
const it = await pool.query( const it = await pool.query(
@ -615,13 +563,6 @@ router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, nex
} catch (err) { next(err); } } catch (err) { next(err); }
}); });
// ── Failover (called by scheduler tick) ──────────────────────────────────────
// Tear down a (presumed dead) sidecar and re-spawn it on another cluster node
// matching the original capability. DeckLink channels are excluded — the
// device-index pinning makes blind re-placement risky, so they alert only.
//
// Returns { restarted: true, new_node_id } on success, or { restarted: false,
// reason } when no eligible node exists or the channel is decklink.
export async function restartChannel(channelId) { export async function restartChannel(channelId) {
const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]); const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
if (rows.length === 0) return { restarted: false, reason: 'channel not found' }; if (rows.length === 0) return { restarted: false, reason: 'channel not found' };
@ -631,7 +572,6 @@ export async function restartChannel(channelId) {
return { restarted: false, reason: 'decklink channels are alert-only' }; return { restarted: false, reason: 'decklink channels are alert-only' };
} }
// Best-effort teardown of the old container — it may already be dead.
if (channel.container_id) { if (channel.container_id) {
const { remote, apiUrl } = await resolveNodeTarget(channel.node_id); const { remote, apiUrl } = await resolveNodeTarget(channel.node_id);
if (remote && apiUrl) { if (remote && apiUrl) {
@ -645,9 +585,6 @@ export async function restartChannel(channelId) {
} }
} }
// Pick a different healthy node. For NDI/SRT/RTMP every online node is
// eligible (no hardware contention). Prefer the original if it's still
// online — the failure may have been transient.
const nodes = await pool.query( const nodes = await pool.query(
`SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes `SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes
WHERE id <> $1 AND last_seen_at > NOW() - INTERVAL '60 seconds' WHERE id <> $1 AND last_seen_at > NOW() - INTERVAL '60 seconds'
@ -663,9 +600,6 @@ export async function restartChannel(channelId) {
} }
const newNodeId = nodes.rows[0].id; const newNodeId = nodes.rows[0].id;
// Move the channel to the new node + bump the restart counters; the operator
// UI surfaces these to flag restarts. container_meta is cleared so the new
// spawn re-derives the sidecar URL.
const { rows: moved } = await pool.query( const { rows: moved } = await pool.query(
`UPDATE playout_channels `UPDATE playout_channels
SET node_id = $1, status = 'stopped', container_id = NULL, container_meta = '{}'::jsonb, SET node_id = $1, status = 'stopped', container_id = NULL, container_meta = '{}'::jsonb,
@ -675,10 +609,6 @@ export async function restartChannel(channelId) {
[newNodeId, channel.id] [newNodeId, channel.id]
); );
// Spawn the sidecar directly via the shared helper. We do NOT route through
// the HTTP /start endpoint: its guard rejects status 'starting'/'running' and
// would deadlock the failover. spawnChannelSidecar flips the channel to
// running (or leaves it 'error' and throws on spawn failure).
try { try {
await spawnChannelSidecar(moved[0]); await spawnChannelSidecar(moved[0]);
return { restarted: true, new_node_id: newNodeId }; return { restarted: true, new_node_id: newNodeId };