From 3578c7b4e93a58a2b0f6354a4b89b2fa35c54e92 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sat, 30 May 2026 18:59:27 -0400 Subject: [PATCH] fix(playout): Privileged only for decklink (SRT/NDI/RTMP/HLS crashed when GPU exposed without driver) --- services/mam-api/src/routes/playout.js | 78 ++------------------------ 1 file changed, 4 insertions(+), 74 deletions(-) diff --git a/services/mam-api/src/routes/playout.js b/services/mam-api/src/routes/playout.js index 4b68e0c..de1dcbb 100644 --- a/services/mam-api/src/routes/playout.js +++ b/services/mam-api/src/routes/playout.js @@ -20,7 +20,6 @@ import { const router = express.Router(); -// ── BullMQ: media staging queue (S3 -> /media volume) ──────────────────────── const parseRedisUrl = (url) => { const parsed = new URL(url); return { host: parsed.hostname, port: parseInt(parsed.port, 10) }; @@ -29,7 +28,6 @@ const stageQueue = new Queue('playout-stage', { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'), }); -// ── Sidecar orchestration (mirrors recorders.js) ───────────────────────────── const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest'; function dockerApi(method, path, body = null) { @@ -67,16 +65,10 @@ async function resolveNodeTarget(nodeId) { return { remote: true, apiUrl: node.api_url, ip: node.ip_address }; } -// The sidecar shim listens on this port inside the container. The mam-api talks -// to it by container alias on the shared docker network (local) or via the -// node-agent's returned host:port (remote). const SIDECAR_HTTP_PORT = 3002; function channelAlias(id) { return `playout-${id}`; } -// Resolve the base URL the API uses to reach a running channel's sidecar shim. -// Local: the docker-network alias. Remote: the node-agent reported the host the -// container is published on (stored in container_meta.sidecar_url). function sidecarBaseUrl(channel) { if (channel.container_meta && channel.container_meta.sidecar_url) { return channel.container_meta.sidecar_url; @@ -99,7 +91,6 @@ async function callSidecar(channel, path, method = 'POST', body = null) { return res.json().catch(() => ({})); } -// ── Serialization ──────────────────────────────────────────────────────────── function channelToJson(r) { return { id: r.id, @@ -122,7 +113,6 @@ function channelToJson(r) { const OUTPUT_TYPES = new Set(['decklink', 'ndi', 'srt', 'rtmp']); -// ── Param resolver: scope every /:id route to the channel's project ────────── router.param('id', async (req, res, next) => { validateUuid('id')(req, res, () => {}); if (res.headersSent) return; @@ -142,9 +132,6 @@ async function requireChannelEdit(req, res, next) { catch (err) { next(err); } } -// ── Channels ───────────────────────────────────────────────────────────────── - -// GET /playout/channels — list (filtered to accessible projects) router.get('/channels', async (req, res, next) => { try { let rows; @@ -161,7 +148,6 @@ router.get('/channels', async (req, res, next) => { } catch (err) { next(err); } }); -// POST /playout/channels — create router.post('/channels', async (req, res, next) => { try { const { name, node_id = null, output_type = 'srt', output_config = {}, @@ -172,8 +158,6 @@ router.post('/channels', async (req, res, next) => { if (!OUTPUT_TYPES.has(output_type)) { return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` }); } - // Creating a project-scoped channel requires edit on that project; a - // null-project (admin-only) channel requires admin. if (project_id) await assertProjectAccess(req.user, project_id, 'edit'); else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' }); @@ -186,7 +170,6 @@ router.post('/channels', async (req, res, next) => { } catch (err) { next(err); } }); -// PATCH /playout/channels/:id — update config (only while stopped) router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => { try { if (req.channel.status === 'running') { @@ -213,7 +196,6 @@ router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => { } catch (err) { next(err); } }); -// DELETE /playout/channels/:id router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => { try { if (req.channel.status === 'running') { @@ -224,14 +206,9 @@ router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => { } catch (err) { next(err); } }); -// ── Port-contention guard (DeckLink) ───────────────────────────────────────── -// A DeckLink device on a node is exclusive: an active recorder OR another active -// channel on the same node+index blocks a new SDI channel. NDI/SRT/RTMP have no -// hardware contention. async function assertDeckLinkFree(channel) { if (channel.output_type !== 'decklink') return; const idx = (channel.output_config && channel.output_config.device_index) || 1; - // Another running channel on the same node + device index? const chan = await pool.query( `SELECT id FROM playout_channels WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2 AND status = 'running' @@ -241,7 +218,6 @@ async function assertDeckLinkFree(channel) { if (chan.rows.length > 0) { throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 }); } - // An active recorder using the same device index on the same node? const rec = await pool.query( `SELECT id FROM recorders WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2 @@ -253,13 +229,6 @@ async function assertDeckLinkFree(channel) { } } -// Spawn the CasparCG sidecar for a channel and flip it to 'running'. Shared by -// the /start route and the scheduler failover path (restartChannel) so neither -// duplicates the docker/node-agent orchestration. Caller is responsible for the -// pre-flight guards (status check, DeckLink contention) appropriate to its path. -// -// On any spawn failure the channel is left status='error' with a message and an -// Error carrying { httpStatus } is thrown. On success returns the updated row. async function spawnChannelSidecar(channel) { await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]); @@ -268,8 +237,6 @@ async function spawnChannelSidecar(channel) { `OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`, `VIDEO_FORMAT=${channel.video_format}`, `PORT=${SIDECAR_HTTP_PORT}`, - // Drives the HLS preview path (/media/live//index.m3u8) and - // the per-channel resource naming inside the sidecar. `CHANNEL_ID=${channel.id}`, ]; @@ -300,7 +267,6 @@ async function spawnChannelSidecar(channel) { } const data = await sidecarRes.json(); containerId = data.containerId; - // node-agent returns the reachable host:port the shim is published on. if (data.sidecarUrl || data.host) { containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`; } @@ -313,7 +279,10 @@ async function spawnChannelSidecar(channel) { Image: PLAYOUT_SIDECAR_IMAGE, Env: env, HostConfig: { - Privileged: true, + // DeckLink SDI needs raw /dev access (privileged). SRT/NDI/RTMP/HLS run + // unprivileged — privileged exposes host GPUs to CasparCG, and the + // missing in-container NVIDIA driver crashes the engine within seconds. + Privileged: channel.output_type === 'decklink', NetworkMode: dockerNetwork, Binds: hostBinds, }, @@ -347,7 +316,6 @@ async function spawnChannelSidecar(channel) { return rows[0]; } -// POST /playout/channels/:id/start — spawn the CasparCG sidecar + bring up output router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => { try { const channel = req.channel; @@ -363,7 +331,6 @@ router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => } }); -// POST /playout/channels/:id/stop — tear down the sidecar router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => { try { const channel = req.channel; @@ -388,7 +355,6 @@ router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => } catch (err) { next(err); } }); -// GET /playout/channels/:id/status — live engine status (proxied to sidecar) router.get('/channels/:id/status', async (req, res, next) => { try { if (req.channel.status !== 'running') { @@ -401,7 +367,6 @@ router.get('/channels/:id/status', async (req, res, next) => { } }); -// ── Transport ──────────────────────────────────────────────────────────────── async function transport(req, res, action, body = null) { if (req.channel.status !== 'running') { return res.status(409).json({ error: 'Channel is not running' }); @@ -410,8 +375,6 @@ async function transport(req, res, action, body = null) { catch (err) { res.status(502).json({ error: err.message }); } } -// POST /playout/channels/:id/play — resolve the channel's playlist, stage-check, -// and hand the engine the ordered list of ready clips. router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => { try { if (req.channel.status !== 'running') { @@ -458,7 +421,6 @@ router.post('/channels/:id/resume', requireChannelEdit, (req, res) => transport( router.post('/channels/:id/skip', requireChannelEdit, (req, res) => transport(req, res, '/transport/skip')); router.post('/channels/:id/stop-playback', requireChannelEdit, (req, res) => transport(req, res, '/channel/stop')); -// GET /playout/channels/:id/asrun — as-run log router.get('/channels/:id/asrun', async (req, res, next) => { try { const { rows } = await pool.query( @@ -468,10 +430,7 @@ router.get('/channels/:id/asrun', async (req, res, next) => { } catch (err) { next(err); } }); -// ── Playlists ──────────────────────────────────────────────────────────────── async function loadChannelForBody(req, res, next) { - // For playlist/item routes the channel is referenced indirectly; resolve it - // and assert edit. Used on create/mutate routes that carry channel_id. const channelId = req.body.channel_id || req.query.channel_id; if (!channelId) return res.status(400).json({ error: 'channel_id is required' }); try { @@ -483,7 +442,6 @@ async function loadChannelForBody(req, res, next) { } catch (err) { next(err); } } -// GET /playout/playlists?channel_id=... router.get('/playlists', async (req, res, next) => { try { const channelId = req.query.channel_id; @@ -497,7 +455,6 @@ router.get('/playlists', async (req, res, next) => { } catch (err) { next(err); } }); -// POST /playout/playlists router.post('/playlists', loadChannelForBody, async (req, res, next) => { try { const { name, loop = false } = req.body || {}; @@ -509,7 +466,6 @@ router.post('/playlists', loadChannelForBody, async (req, res, next) => { } catch (err) { next(err); } }); -// GET /playout/playlists/:plid/items router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => { try { const pl = await pool.query( @@ -525,7 +481,6 @@ router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next } catch (err) { next(err); } }); -// Helper: load a playlist + assert edit on its channel's project. async function loadPlaylistEdit(plid, user) { const pl = await pool.query( `SELECT p.*, c.project_id FROM playout_playlists p @@ -535,7 +490,6 @@ async function loadPlaylistEdit(plid, user) { return pl.rows[0]; } -// POST /playout/playlists/:plid/items — add an asset to a playlist router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => { try { await loadPlaylistEdit(req.params.plid, req.user); @@ -543,7 +497,6 @@ router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, nex transition = 'cut', transition_ms = 0 } = req.body || {}; if (!asset_id) return res.status(400).json({ error: 'asset_id is required' }); - // Append at the end of the playlist. const ord = await pool.query( 'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1', [req.params.plid]); @@ -552,8 +505,6 @@ router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, nex VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`, [req.params.plid, asset_id, ord.rows[0].next, in_point, out_point, transition, transition_ms]); - // Kick staging immediately so the clip is air-ready by the time the operator - // hits play. await stageQueue.add('stage', { itemId: rows[0].id, assetId: asset_id }).catch((e) => console.error('[playout] failed to enqueue stage job:', e.message)); @@ -564,7 +515,6 @@ router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, nex } }); -// PUT /playout/playlists/:plid/reorder — body { order: [itemId, itemId, ...] } router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => { const client = await pool.connect(); try { @@ -586,7 +536,6 @@ router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, ne } finally { client.release(); } }); -// DELETE /playout/items/:itemId router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => { try { const it = await pool.query( @@ -600,7 +549,6 @@ router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) = } catch (err) { next(err); } }); -// POST /playout/items/:itemId/stage — (re)kick staging for one item router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => { try { const it = await pool.query( @@ -615,13 +563,6 @@ router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, nex } catch (err) { next(err); } }); -// ── Failover (called by scheduler tick) ────────────────────────────────────── -// Tear down a (presumed dead) sidecar and re-spawn it on another cluster node -// matching the original capability. DeckLink channels are excluded — the -// device-index pinning makes blind re-placement risky, so they alert only. -// -// Returns { restarted: true, new_node_id } on success, or { restarted: false, -// reason } when no eligible node exists or the channel is decklink. export async function restartChannel(channelId) { const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]); if (rows.length === 0) return { restarted: false, reason: 'channel not found' }; @@ -631,7 +572,6 @@ export async function restartChannel(channelId) { return { restarted: false, reason: 'decklink channels are alert-only' }; } - // Best-effort teardown of the old container — it may already be dead. if (channel.container_id) { const { remote, apiUrl } = await resolveNodeTarget(channel.node_id); if (remote && apiUrl) { @@ -645,9 +585,6 @@ export async function restartChannel(channelId) { } } - // Pick a different healthy node. For NDI/SRT/RTMP every online node is - // eligible (no hardware contention). Prefer the original if it's still - // online — the failure may have been transient. const nodes = await pool.query( `SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes WHERE id <> $1 AND last_seen_at > NOW() - INTERVAL '60 seconds' @@ -663,9 +600,6 @@ export async function restartChannel(channelId) { } const newNodeId = nodes.rows[0].id; - // Move the channel to the new node + bump the restart counters; the operator - // UI surfaces these to flag restarts. container_meta is cleared so the new - // spawn re-derives the sidecar URL. const { rows: moved } = await pool.query( `UPDATE playout_channels SET node_id = $1, status = 'stopped', container_id = NULL, container_meta = '{}'::jsonb, @@ -675,10 +609,6 @@ export async function restartChannel(channelId) { [newNodeId, channel.id] ); - // Spawn the sidecar directly via the shared helper. We do NOT route through - // the HTTP /start endpoint: its guard rejects status 'starting'/'running' and - // would deadlock the failover. spawnChannelSidecar flips the channel to - // running (or leaves it 'error' and throws on spawn failure). try { await spawnChannelSidecar(moved[0]); return { restarted: true, new_node_id: newNodeId };