feat(worker): playout-stage job — S3 → /media + EBU R128 loudnorm

Stages playlist items from S3 to the shared CasparCG media volume. Pass 1
measures, pass 2 applies linear loudnorm (I=-23 LUFS, TP=-1 dBTP, LRA=11);
output is AAC 192k @ 48 kHz, video stream copied. Atomic rename on success
so CasparCG never sees a partial file. Per-item audio_normalized flag means
re-stages of the same asset skip the loudnorm pass.

Wired into worker/src/index.js behind WORKER_QUEUES=playout-stage so
capability-routed deploys can pin it to nodes that already have ffmpeg +
the media mount.
This commit is contained in:
Zac 2026-05-30 13:17:40 +00:00
parent 29187a90df
commit 209f9fda52
2 changed files with 141 additions and 0 deletions

View file

@ -7,6 +7,7 @@ import { conformWorker } from './workers/conform.js';
import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/youtube-import.js'; import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/youtube-import.js';
import { trimWorker } from './workers/trimWorker.js'; import { trimWorker } from './workers/trimWorker.js';
import { hlsWorker } from './workers/hls.js'; import { hlsWorker } from './workers/hls.js';
import { playoutStageWorker } from './workers/playout-stage.js';
import { startPromotionWorker } from './workers/promotion.js'; import { startPromotionWorker } from './workers/promotion.js';
const parseRedisUrl = (url) => { const parseRedisUrl = (url) => {
@ -94,6 +95,9 @@ const workers = [
lockDuration: 10 * 60 * 1000, lockDuration: 10 * 60 * 1000,
lockRenewTime: 60000, lockRenewTime: 60000,
}), }),
// playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound;
// colocate with workers that already have ffmpeg + the media mount.
want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }),
].filter(Boolean); ].filter(Boolean);
console.log(`WORKER_QUEUES=${_wq || '(all)'}`); console.log(`WORKER_QUEUES=${_wq || '(all)'}`);

View file

@ -0,0 +1,137 @@
import { join, extname } from 'path';
import { mkdir, stat, rename, unlink } from 'fs/promises';
import { spawn } from 'child_process';
import { query } from '../db/client.js';
import { downloadFromS3 } from '../s3/client.js';
// Playout media staging — copy an asset from S3 into the shared CasparCG media
// volume so a playout channel can play it. CasparCG plays from a local folder
// (/media), not from S3, so every playlist item must be staged to 'ready'
// before it can go on air. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md §4.
//
// Two passes:
// 1. download from S3 to /media/playout/<assetId><ext>.raw
// 2. ffmpeg loudnorm (EBU R128, target I=-23 LUFS, TP=-1 dBTP, LRA=11) to the
// final path, then atomic rename. Skipped when items.audio_normalized=true.
//
// The media volume is mounted into BOTH this worker and the playout sidecars at
// the same path (PLAYOUT_MEDIA_DIR, default /media). We stage under a per-asset
// filename so re-staging is idempotent and multiple items referencing the same
// asset share one file.
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
const MEDIA_DIR = process.env.PLAYOUT_MEDIA_DIR || '/media';
async function fileExists(p) {
try { const s = await stat(p); return s.size > 0; } catch { return false; }
}
// Two-pass loudnorm: pass 1 measures, pass 2 applies linear normalization with
// the measured values. Linear mode preserves dynamics at the cost of accuracy
// vs the target — fine for broadcast playout where transparent levels matter
// more than hitting -23 LUFS to the decibel.
function runFfmpeg(args) {
return new Promise((resolve, reject) => {
const proc = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'] });
let stderr = '';
proc.stderr.on('data', (d) => { stderr += d.toString(); });
proc.on('error', reject);
proc.on('close', (code) => {
if (code === 0) resolve(stderr);
else reject(new Error(`ffmpeg exited ${code}: ${stderr.slice(-500)}`));
});
});
}
async function measureLoudness(inputPath) {
// -23 / -1 / 11 are the EBU R128 broadcast targets; loudnorm prints a JSON
// block to stderr after the analysis pass which feeds pass 2's measured_*
// params.
const stderr = await runFfmpeg([
'-hide_banner', '-nostats', '-i', inputPath,
'-af', 'loudnorm=I=-23:TP=-1:LRA=11:print_format=json',
'-f', 'null', '-',
]);
const match = stderr.match(/\{[\s\S]*?"input_i"[\s\S]*?\}/);
if (!match) throw new Error('loudnorm pass 1 produced no JSON');
return JSON.parse(match[0]);
}
async function applyLoudnorm(inputPath, outputPath, m) {
// Pass 2: linear normalization using pass 1's measurements. -c:v copy keeps
// the video stream intact so we only re-encode audio (target AAC stereo, the
// common-denominator CasparCG ffmpeg producer accepts).
await runFfmpeg([
'-hide_banner', '-nostats', '-y', '-i', inputPath,
'-af', `loudnorm=I=-23:TP=-1:LRA=11:measured_I=${m.input_i}:measured_TP=${m.input_tp}:measured_LRA=${m.input_lra}:measured_thresh=${m.input_thresh}:offset=${m.target_offset}:linear=true:print_format=summary`,
'-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k', '-ar', '48000',
outputPath,
]);
}
export async function playoutStageWorker(job) {
const { itemId, assetId } = job.data;
if (!itemId || !assetId) throw new Error('playout-stage requires itemId + assetId');
await query("UPDATE playout_items SET media_status = 'staging', updated_at = NOW() WHERE id = $1", [itemId]);
try {
const a = await query(
'SELECT id, filename, original_s3_key, proxy_s3_key FROM assets WHERE id = $1', [assetId]);
if (a.rows.length === 0) throw new Error(`asset ${assetId} not found`);
const asset = a.rows[0];
// Prefer the master for air quality; fall back to proxy if no master key.
const s3Key = asset.original_s3_key || asset.proxy_s3_key;
if (!s3Key) throw new Error(`asset ${assetId} has no S3 media key to stage`);
const ext = extname(s3Key) || extname(asset.filename || '') || '.mp4';
// Stable per-asset path under the media volume; CasparCG resolves the token
// "playout/<assetId>" against MEDIA_DIR.
const relDir = 'playout';
const fileName = `${assetId}${ext}`;
const absDir = join(MEDIA_DIR, relDir);
const absPath = join(absDir, fileName);
const mediaPath = join(MEDIA_DIR, relDir, fileName);
await mkdir(absDir, { recursive: true });
// Skip the whole pipeline when the final file already exists from a prior
// stage of the same asset. The audio_normalized flag is per-item so a
// second item referencing the same staged file gets flipped to true below.
const itemRow = await query('SELECT audio_normalized FROM playout_items WHERE id = $1', [itemId]);
const alreadyNormalized = itemRow.rows[0]?.audio_normalized === true;
if (!(await fileExists(absPath))) {
const rawPath = `${absPath}.raw${ext}`;
console.log(`[playout-stage] downloading ${s3Key} -> ${rawPath}`);
await downloadFromS3(S3_BUCKET, s3Key, rawPath);
if (alreadyNormalized) {
// Asset was previously normalized for another item — keep the bytes
// as-is. Atomic rename so CasparCG never sees a partial file.
await rename(rawPath, absPath);
} else {
console.log(`[playout-stage] loudnorm pass 1: ${rawPath}`);
const measured = await measureLoudness(rawPath);
const tmpOut = `${absPath}.tmp${ext}`;
console.log(`[playout-stage] loudnorm pass 2: I=${measured.input_i} TP=${measured.input_tp} -> ${tmpOut}`);
await applyLoudnorm(rawPath, tmpOut, measured);
await rename(tmpOut, absPath);
await unlink(rawPath).catch(() => {});
}
} else {
console.log(`[playout-stage] already staged: ${absPath}`);
}
await query(
"UPDATE playout_items SET media_status = 'ready', media_path = $1, audio_normalized = TRUE, updated_at = NOW() WHERE id = $2",
[mediaPath, itemId]);
console.log(`[playout-stage] item ${itemId} ready at ${mediaPath}`);
return { itemId, mediaPath };
} catch (err) {
await query("UPDATE playout_items SET media_status = 'error', updated_at = NOW() WHERE id = $1", [itemId])
.catch(() => {});
throw err;
}
}