From 209f9fda523305bbb33f39f475f1937218b49ce9 Mon Sep 17 00:00:00 2001 From: Zac Date: Sat, 30 May 2026 13:17:40 +0000 Subject: [PATCH] =?UTF-8?q?feat(worker):=20playout-stage=20job=20=E2=80=94?= =?UTF-8?q?=20S3=20=E2=86=92=20/media=20+=20EBU=20R128=20loudnorm?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stages playlist items from S3 to the shared CasparCG media volume. Pass 1 measures, pass 2 applies linear loudnorm (I=-23 LUFS, TP=-1 dBTP, LRA=11); output is AAC 192k @ 48 kHz, video stream copied. Atomic rename on success so CasparCG never sees a partial file. Per-item audio_normalized flag means re-stages of the same asset skip the loudnorm pass. Wired into worker/src/index.js behind WORKER_QUEUES=playout-stage so capability-routed deploys can pin it to nodes that already have ffmpeg + the media mount. --- services/worker/src/index.js | 4 + services/worker/src/workers/playout-stage.js | 137 +++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 services/worker/src/workers/playout-stage.js diff --git a/services/worker/src/index.js b/services/worker/src/index.js index 9152e76..31476a4 100644 --- a/services/worker/src/index.js +++ b/services/worker/src/index.js @@ -7,6 +7,7 @@ import { conformWorker } from './workers/conform.js'; import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/youtube-import.js'; import { trimWorker } from './workers/trimWorker.js'; import { hlsWorker } from './workers/hls.js'; +import { playoutStageWorker } from './workers/playout-stage.js'; import { startPromotionWorker } from './workers/promotion.js'; const parseRedisUrl = (url) => { @@ -94,6 +95,9 @@ const workers = [ lockDuration: 10 * 60 * 1000, lockRenewTime: 60000, }), + // playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound; + // colocate with workers that already have ffmpeg + the media mount. 
+ want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }), ].filter(Boolean); console.log(`WORKER_QUEUES=${_wq || '(all)'}`); diff --git a/services/worker/src/workers/playout-stage.js b/services/worker/src/workers/playout-stage.js new file mode 100644 index 0000000..b65066a --- /dev/null +++ b/services/worker/src/workers/playout-stage.js @@ -0,0 +1,137 @@ +import { join, extname } from 'path'; +import { mkdir, stat, rename, unlink } from 'fs/promises'; +import { spawn } from 'child_process'; +import { query } from '../db/client.js'; +import { downloadFromS3 } from '../s3/client.js'; + +// Playout media staging — copy an asset from S3 into the shared CasparCG media +// volume so a playout channel can play it. CasparCG plays from a local folder +// (/media), not from S3, so every playlist item must be staged to 'ready' +// before it can go on air. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md §4. +// +// Two passes: +// 1. download from S3 to a scratch file beside the final path (final path + +// ".raw" + the media extension) +// 2. ffmpeg loudnorm (EBU R128, target I=-23 LUFS, TP=-1 dBTP, LRA=11) to the +// final path, then atomic rename. Nothing is re-staged when the final file +// already exists; the per-item playout_items.audio_normalized flag is +// handled in playoutStageWorker. +// +// The media volume is mounted into BOTH this worker and the playout sidecars at +// the same path (PLAYOUT_MEDIA_DIR, default /media). We stage under a per-asset +// filename so re-staging is idempotent and multiple items referencing the same +// asset share one file. + +// Bucket name and media root come from the environment; the literal defaults +// apply when the variable is unset or empty. +const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon'; +const MEDIA_DIR = process.env.PLAYOUT_MEDIA_DIR || '/media'; + +// Resolves to true only when stat(p) succeeds and reports a size above zero. +// Any stat failure (not just a missing file) and a zero-byte file both yield +// false. +async function fileExists(p) { + try { const s = await stat(p); return s.size > 0; } catch { return false; } +} + +// Two-pass loudnorm: pass 1 measures, pass 2 applies linear normalization with +// the measured values. Linear mode preserves dynamics at the cost of accuracy +// vs the target — fine for broadcast playout where transparent levels matter +// more than hitting -23 LUFS to the decibel.
// Only this many trailing characters of ffmpeg's stderr are retained. loudnorm
// prints its report at the very end of a run, so the tail is enough for both
// the JSON parse and the error excerpt, and a noisy input cannot grow the
// buffer without bound.
const FFMPEG_STDERR_TAIL_CHARS = 256 * 1024;

// EBU R128 broadcast targets shared by both loudnorm passes.
const LOUDNORM_TARGET = 'loudnorm=I=-23:TP=-1:LRA=11';

// Fields of the pass-1 report that pass 2 feeds back as measured_* / offset.
const LOUDNORM_MEASURED_KEYS = ['input_i', 'input_tp', 'input_lra', 'input_thresh', 'target_offset'];

/**
 * Run the `ffmpeg` binary with an argument vector (no shell) and capture stderr.
 *
 * @param {string[]} args - Arguments handed to ffmpeg verbatim.
 * @returns {Promise} Resolves with the captured stderr tail (a string) when
 *   ffmpeg exits with code 0.
 * @throws {Error} Rejects if the process cannot be spawned, or exits non-zero;
 *   in the latter case the message carries the last 500 characters of stderr.
 */
function runFfmpeg(args) {
  return new Promise((resolve, reject) => {
    const proc = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'] });
    let stderr = '';
    // stdout is piped but nobody reads it; drain it so ffmpeg can never block
    // on a full pipe.
    proc.stdout.resume();
    // Decode in the stream so a multi-byte character split across two chunks
    // is not corrupted.
    proc.stderr.setEncoding('utf8');
    proc.stderr.on('data', (chunk) => {
      stderr += chunk;
      if (stderr.length > FFMPEG_STDERR_TAIL_CHARS) {
        stderr = stderr.slice(-FFMPEG_STDERR_TAIL_CHARS);
      }
    });
    proc.on('error', reject);
    proc.on('close', (code) => {
      if (code === 0) resolve(stderr);
      else reject(new Error(`ffmpeg exited ${code}: ${stderr.slice(-500)}`));
    });
  });
}

/**
 * Extract the loudnorm measurement report from ffmpeg's stderr.
 *
 * The report is a flat JSON object, so only brace-free object bodies are
 * matched. That keeps an unrelated '{' earlier in stderr (for example inside
 * echoed container metadata) from being pulled into the match and breaking
 * JSON.parse. The last report wins because loudnorm prints at end of run.
 *
 * @param {string} stderr - Captured ffmpeg stderr.
 * @returns {object} Parsed report (input_i, input_tp, input_lra, ...).
 * @throws {Error} When no report is present, or the matched text is not JSON.
 */
function parseLoudnormJson(stderr) {
  const reports = stderr.match(/\{[^{}]*"input_i"[^{}]*\}/g);
  if (!reports) throw new Error('loudnorm pass 1 produced no JSON');
  return JSON.parse(reports[reports.length - 1]);
}

/**
 * Loudnorm pass 1: analyse the file and return the measured loudness values.
 *
 * @param {string} inputPath - Local media file to analyse.
 * @returns {Promise} Resolves with the parsed loudnorm report object.
 * @throws {Error} When ffmpeg fails or prints no loudnorm report.
 */
async function measureLoudness(inputPath) {
  // Output goes to the null muxer: this pass only exists for the JSON block
  // loudnorm prints to stderr once analysis finishes.
  const stderr = await runFfmpeg([
    '-hide_banner', '-nostats', '-i', inputPath,
    '-af', `${LOUDNORM_TARGET}:print_format=json`,
    '-f', 'null', '-',
  ]);
  return parseLoudnormJson(stderr);
}

/**
 * Build the pass-2 loudnorm filter string from pass-1 measurements.
 *
 * Silent or very short audio makes pass 1 report "-inf" / "inf", which ffmpeg
 * rejects as measured_* values. In that case the measured parameters are left
 * out and loudnorm runs in its single-pass mode instead of failing the item.
 *
 * @param {object} m - Report returned by measureLoudness.
 * @returns {string} Value for ffmpeg's -af option.
 */
function buildLoudnormFilter(m) {
  const usable = LOUDNORM_MEASURED_KEYS.every(
    (key) => Number.isFinite(Number.parseFloat(m?.[key])),
  );
  if (!usable) return `${LOUDNORM_TARGET}:print_format=summary`;
  return `${LOUDNORM_TARGET}:measured_I=${m.input_i}:measured_TP=${m.input_tp}:measured_LRA=${m.input_lra}:measured_thresh=${m.input_thresh}:offset=${m.target_offset}:linear=true:print_format=summary`;
}

/**
 * Loudnorm pass 2: write a normalized copy of inputPath to outputPath.
 *
 * The video stream is copied untouched; only audio is re-encoded (AAC, 192k,
 * 48 kHz). An existing outputPath is overwritten (-y).
 *
 * @param {string} inputPath - Local source file.
 * @param {string} outputPath - Destination; ffmpeg infers the container from
 *   its extension because no -f is given.
 * @param {object} m - Pass-1 report from measureLoudness.
 * @returns {Promise} Resolves when ffmpeg exits successfully.
 * @throws {Error} When ffmpeg fails.
 */
async function applyLoudnorm(inputPath, outputPath, m) {
  await runFfmpeg([
    '-hide_banner', '-nostats', '-y', '-i', inputPath,
    '-af', buildLoudnormFilter(m),
    '-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k', '-ar', '48000',
    outputPath,
  ]);
}

/**
 * Download s3Key and publish a loudness-normalized copy at absPath.
 *
 * The final file only appears via rename, so a reader of the media folder never
 * observes a half-written absPath. Scratch files are removed on every exit
 * path so a failed job leaves nothing behind on the shared volume.
 *
 * @param {string} s3Key - Object key inside S3_BUCKET.
 * @param {string} absPath - Final staged path.
 * @param {string} ext - Media extension including the dot; kept at the end of
 *   the scratch names so ffmpeg can still infer the container.
 */
async function stageNormalized(s3Key, absPath, ext) {
  const rawPath = `${absPath}.raw${ext}`;
  const tmpOut = `${absPath}.tmp${ext}`;
  try {
    console.log(`[playout-stage] downloading ${s3Key} -> ${rawPath}`);
    await downloadFromS3(S3_BUCKET, s3Key, rawPath);

    console.log(`[playout-stage] loudnorm pass 1: ${rawPath}`);
    const measured = await measureLoudness(rawPath);
    console.log(`[playout-stage] loudnorm pass 2: I=${measured.input_i} TP=${measured.input_tp} -> ${tmpOut}`);
    await applyLoudnorm(rawPath, tmpOut, measured);
    await rename(tmpOut, absPath);
  } finally {
    // Best-effort cleanup: after a successful rename tmpOut no longer exists,
    // and after an early failure either file may be missing, so unlink errors
    // are expected here and intentionally ignored.
    await Promise.all([rawPath, tmpOut].map((p) => unlink(p).catch(() => {})));
  }
}

/**
 * Queue handler: stage one playout item's asset onto the media volume.
 *
 * Marks the item 'staging', ensures a normalized file exists at the per-asset
 * path under MEDIA_DIR/playout, then marks the item 'ready' with media_path and
 * audio_normalized = TRUE. On any failure after the 'staging' update the item
 * is marked 'error' and the original error is rethrown so the queue can apply
 * its retry policy.
 *
 * Media downloaded from S3 is always normalized: the bytes in S3 are the
 * un-normalized source, and this worker is what sets audio_normalized, so the
 * flag cannot be used as evidence that a fresh download is already compliant.
 *
 * @param {object} job - Queue job; job.data must carry itemId and assetId.
 * @returns {Promise} Resolves with an object holding itemId and mediaPath.
 * @throws {Error} On missing ids, unknown asset, missing S3 key, download or
 *   ffmpeg failure, or a failing database call.
 */
export async function playoutStageWorker(job) {
  const { itemId, assetId } = job.data ?? {};
  if (!itemId || !assetId) throw new Error('playout-stage requires itemId + assetId');

  await query("UPDATE playout_items SET media_status = 'staging', updated_at = NOW() WHERE id = $1", [itemId]);

  try {
    const a = await query(
      'SELECT id, filename, original_s3_key, proxy_s3_key FROM assets WHERE id = $1', [assetId]);
    if (a.rows.length === 0) throw new Error(`asset ${assetId} not found`);
    const asset = a.rows[0];

    // Prefer the master for air quality; fall back to proxy if no master key.
    const s3Key = asset.original_s3_key || asset.proxy_s3_key;
    if (!s3Key) throw new Error(`asset ${assetId} has no S3 media key to stage`);

    const ext = extname(s3Key) || extname(asset.filename || '') || '.mp4';
    // Stable per-asset path: every item that references the asset shares this
    // one file, which is what makes re-staging idempotent.
    const absDir = join(MEDIA_DIR, 'playout');
    const absPath = join(absDir, `${assetId}${ext}`);
    // media_path records the same absolute location the file is written to.
    const mediaPath = absPath;

    await mkdir(absDir, { recursive: true });

    if (await fileExists(absPath)) {
      // A prior stage of this asset already produced the final file.
      console.log(`[playout-stage] already staged: ${absPath}`);
    } else {
      await stageNormalized(s3Key, absPath, ext);
    }

    await query(
      "UPDATE playout_items SET media_status = 'ready', media_path = $1, audio_normalized = TRUE, updated_at = NOW() WHERE id = $2",
      [mediaPath, itemId]);
    console.log(`[playout-stage] item ${itemId} ready at ${mediaPath}`);
    return { itemId, mediaPath };
  } catch (err) {
    // Recording the failure is best-effort: if it fails too, log it and still
    // surface the original error rather than masking it.
    await query("UPDATE playout_items SET media_status = 'error', updated_at = NOW() WHERE id = $1", [itemId])
      .catch((statusErr) => {
        console.error(`[playout-stage] could not mark item ${itemId} as error: ${statusErr.message}`);
      });
    throw err;
  }
}