dragonflight/services/worker/src/workers/proxy.js
ZGaetano c48c7e6d7d feat(audio-tab): full audio track inspector with meters, mute/solo, faders
Issue #80 — replaces the stub AudioTab (two static waveforms) with a
broadcast-ops-grade audio panel:

- DB: add audio_metadata JSONB column to assets (migration 022)
- Worker: getMediaInfo now extracts per-stream audio metadata
  (codec, channels, channel_layout, sample_rate, bit_depth, bit_rate,
  language, title, disposition)
- Worker: proxy job persists audio_metadata into the assets row
- API: new GET /assets/:id/audio returns structured track list
- Frontend AudioTab: per-track rows with:
  - Track name/index with language badge
  - SVG waveform per track (color-coded)
  - L/R level meters via Web Audio API AnalyserNode
  - Per-track metadata row (codec, layout, sample rate, bit depth, bitrate)
  - Mute / Solo buttons with proper solo-logic
  - Per-track volume fader
  - Master section with summed L/R meters and master fader
- MetadataTab: show audio track summary when audio_metadata present
- CSS: full audio-tab layout, responsive collapse at 900px
2026-05-27 04:53:52 +00:00

252 lines
10 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { join } from 'path';
import { stat, unlink } from 'fs/promises';
import { tmpdir } from 'os';
import { Queue } from 'bullmq';
import { query } from '../db/client.js';
import { downloadFromS3, uploadToS3 } from '../s3/client.js';
import { transcodeVideo, transcodeImage, getMediaInfo, isHwCodec } from '../ffmpeg/executor.js';
// Read the global proxy-encoder settings from the DB. These are written by
// Settings → Proxy encoding in the GUI. Falls back to libx264 defaults if
// nothing is configured (first-run / fresh install).
//
// Returns the option object proxyWorker passes to transcodeVideo(). `_gpu`
// is true only when GPU transcoding is enabled AND the codec is a hardware
// codec; proxyWorker uses it to decide whether a failed encode is retried
// on libx264.
async function loadProxyEncodingSettings() {
  const result = await query(
    `SELECT key, value FROM settings
     WHERE key IN ('gpu_transcode_enabled','gpu_codec','gpu_preset','gpu_bitrate_mbps',
                   'gpu_rc_mode','gpu_audio_codec','gpu_audio_bitrate_kbps')`
  );
  const settings = Object.fromEntries(result.rows.map(({ key, value }) => [key, value]));

  // Accept a parsed numeric setting only if it is a finite positive number;
  // otherwise use the fallback. Without this a malformed value reaches
  // ffmpeg as "NaNk" (or "0k" for an explicit 0).
  const positiveOr = (parsed, fallback) =>
    (Number.isFinite(parsed) && parsed > 0 ? parsed : fallback);

  const gpuEnabled = settings.gpu_transcode_enabled === 'true';
  let codec = settings.gpu_codec || (gpuEnabled ? 'h264_nvenc' : 'libx264');
  let preset = settings.gpu_preset || (gpuEnabled ? 'p4' : 'fast');
  let rcMode = settings.gpu_rc_mode || null;
  if (!gpuEnabled && isHwCodec(codec)) {
    // GPU transcoding is switched off but gpu_codec still names a hardware
    // encoder (e.g. a value saved before the toggle was turned off). Honour
    // the toggle: otherwise _gpu would be false and a failing hardware
    // encode would get no libx264 fallback in proxyWorker. The stored
    // preset / rc mode presumably belong to that hardware encoder, so they
    // are reset as well.
    codec = 'libx264';
    preset = 'fast';
    rcMode = null;
  }

  const audioCodec = settings.gpu_audio_codec || 'aac';
  const audioKbps = positiveOr(Number.parseInt(settings.gpu_audio_bitrate_kbps, 10), 128);

  // Video rate control: gpu_bitrate_mbps is the average (target) rate and
  // min / max / buffer are derived from it. With the 0.75 Mbps default this
  // yields 750k average, 375k min, ~1M (998k) max and a 1996k buffer.
  const targetKbps = Math.round(positiveOr(Number.parseFloat(settings.gpu_bitrate_mbps), 0.75) * 1000);
  const minKbps = Math.round(targetKbps * 0.5); // 50% of target
  const maxKbps = Math.round(targetKbps * 1.33); // 133% of target
  const bufKbps = maxKbps * 2; // 2× maxrate

  return {
    videoCodec: codec,
    videoPreset: preset,
    videoBitrate: `${targetKbps}k`,
    videoMinRate: `${minKbps}k`,
    videoMaxRate: `${maxKbps}k`,
    videoBufSize: `${bufKbps}k`,
    rateControl: rcMode,
    audioCodec,
    audioBitrate: `${audioKbps}k`,
    _gpu: gpuEnabled && isHwCodec(codec),
  };
}
// ffprobe codec names that identify a still-image input. proxyWorker sends
// any asset whose probed codec is in this set down the poster path instead
// of the video transcode.
const IMAGE_CODECS = new Set('png mjpeg jpeg webp gif tiff bmp jpegls svg'.split(' '));
// Bucket used for originals, proxies and posters; an unset or empty
// S3_BUCKET env var falls back to the default.
const S3_BUCKET = process.env.S3_BUCKET ? process.env.S3_BUCKET : 'wild-dragon';
// BUG FIX #7: Keep thumbnailQueue as a module-level singleton so it is opened
// only once and can be closed during SIGTERM (the queue is exported so the
// shutdown handler can close it). Previously a new Queue was created inside
// the worker function; BullMQ Queue instances hold an open Redis connection
// that prevents a clean shutdown.
// Convert a redis:// URL into the connection options object passed to the
// BullMQ Queue below. host and port are always set; username, password and
// a numeric db index are added only when the URL carries them.
const parseRedisUrl = (url) => {
  const parsed = new URL(url);
  // Userinfo stays percent-encoded in URL; decode it, but keep the raw text
  // if it is not valid percent-encoding rather than crashing at import time.
  const decode = (part) => {
    try {
      return decodeURIComponent(part);
    } catch {
      return part;
    }
  };
  const connection = {
    // WHATWG URL keeps brackets around IPv6 literals ("[::1]"); strip them.
    host: parsed.hostname.replace(/^\[(.*)\]$/, '$1'),
    // An omitted port is '' (parseInt → NaN); use the Redis default instead.
    port: parsed.port ? Number.parseInt(parsed.port, 10) : 6379,
  };
  // Forward credentials and db index so an authenticated or non-default-db
  // REDIS_URL works instead of silently connecting without them.
  if (parsed.username) connection.username = decode(parsed.username);
  if (parsed.password) connection.password = decode(parsed.password);
  const dbIndex = parsed.pathname.replace(/^\//, '');
  if (/^\d+$/.test(dbIndex)) connection.db = Number.parseInt(dbIndex, 10);
  return connection;
};
// Shared 'thumbnail' queue; proxyWorker adds a 'generate' job to it after a
// video proxy has been uploaded. Connection defaults to redis://queue:6379
// when REDIS_URL is unset or empty.
export const thumbnailQueue = new Queue(
  'thumbnail',
  { connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379') },
);
/**
 * Job processor for proxy generation. Downloads the original from S3,
 * probes it with getMediaInfo, then either:
 *   - still images: renders a JPEG poster, stores it as thumbnail_s3_key and
 *     marks the asset 'ready'; or
 *   - everything else: transcodes an MP4 proxy with the configured encoder
 *     (retrying on libx264 if a GPU encode fails), uploads it, writes the
 *     probed metadata to the assets row and queues a thumbnail job.
 *
 * @param {object} job - presumably a BullMQ Job; uses job.id, job.data
 *   ({ assetId, inputKey, outputKey }) and job.updateProgress().
 * @returns {Promise<{ assetId: *, outputKey: string }>} outputKey is the S3
 *   key actually written (the .jpg poster key for images).
 * @throws Re-throws any processing error after trying to mark the asset
 *   'error' (skipped for assets whose status is 'live' or 'ingesting').
 */
export const proxyWorker = async (job) => {
  const { assetId, inputKey, outputKey } = job.data;
  const tmpDir = tmpdir();
  const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`);
  const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`);
  // Scratch path for the still-image poster. Declared here rather than in
  // the image branch so the finally block removes it even when
  // transcodeImage or the upload throws.
  const imageOutputPath = outputPath.replace(/\.mp4$/, '.jpg');
  try {
    // Download original from S3
    await job.updateProgress(10);
    console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`);
    await downloadFromS3(S3_BUCKET, inputKey, inputPath);

    // Look up the asset row early — we want media_type before deciding how
    // to process. That lets us route 'image' assets to the poster path even
    // when ffprobe doesn't return a codec name in IMAGE_CODECS (e.g. future
    // formats like AVIF / HEIF / JPEG-XL).
    const assetRow = await query(
      'SELECT media_type FROM assets WHERE id = $1',
      [assetId]
    );
    const dbMediaType = assetRow.rows[0]?.media_type || null;

    // Reject obviously-empty inputs before handing them to ffmpeg. Aborted
    // SRT/RTMP recordings end up as 0-byte (or ftyp-only ~1KB) objects in S3
    // when the source disconnects before any frame is received; the proxy
    // pipeline used to bomb on "moov atom not found", which buried the
    // real reason. Bail with a clear message and let the asset go to 'error'.
    //
    // Skip this check for image assets — a single PNG icon can legitimately
    // be a few hundred bytes.
    const { size: inputBytes } = await stat(inputPath);
    if (dbMediaType !== 'image' && inputBytes < 4096) {
      throw new Error(
        `Source is empty or truncated (${inputBytes} bytes). The recording ` +
        `likely ended before any frames were received — check the source ` +
        `URL / SDI signal and re-record.`
      );
    }

    // Extract source metadata (fps, codec, resolution, duration, size, audio).
    // A probe failure is logged and tolerated: processing continues with an
    // empty mediaInfo and audio assumed present.
    await job.updateProgress(20);
    let mediaInfo = {};
    let hasAudio = true;
    try {
      mediaInfo = await getMediaInfo(inputPath);
      hasAudio = !!mediaInfo.hasAudio;
      console.log(`[proxy] Metadata for ${assetId}: ${JSON.stringify(mediaInfo)}`);
    } catch (err) {
      console.warn(`[proxy] getMediaInfo failed for ${assetId}: ${err.message}`);
    }

    // Still images skip the video proxy — they have no temporal stream and
    // x264 with a one-frame PNG input fails (Could not open encoder before EOF).
    // Generate a scaled JPEG poster instead; the thumbnail job will downsize it.
    //
    // We treat the input as an image if EITHER the DB says so (media_type =
    // 'image', set by upload.js based on Content-Type or extension), OR
    // ffprobe reports a codec we know is a still-image format.
    const codecLower = mediaInfo.codec ? mediaInfo.codec.toLowerCase() : null;
    const isImage = dbMediaType === 'image' || (codecLower && IMAGE_CODECS.has(codecLower));
    if (isImage) {
      const imageOutputKey = outputKey.replace(/\.mp4$/, '.jpg');
      console.log(`[proxy] Image asset ${assetId} (codec=${codecLower || 'unknown'}, db_media_type=${dbMediaType}) — emitting poster instead of video proxy`);
      await job.updateProgress(40);
      await transcodeImage(inputPath, imageOutputPath);
      await job.updateProgress(70);
      await uploadToS3(S3_BUCKET, imageOutputKey, imageOutputPath);
      await job.updateProgress(90);
      await query(
        `UPDATE assets
         SET thumbnail_s3_key = $1,
             proxy_s3_key = NULL,
             resolution = COALESCE($2, resolution),
             file_size = COALESCE($3, file_size),
             status = 'ready',
             updated_at = NOW()
         WHERE id = $4`,
        [imageOutputKey, mediaInfo.resolution ?? null, mediaInfo.fileSizeBytes ?? null, assetId]
      );
      // The scratch poster file is removed in the finally block.
      await job.updateProgress(100);
      return { assetId, outputKey: imageOutputKey };
    }

    // Empty/truncated capture: probe returned a video stream but ffmpeg can't
    // read any frames. Bail with a clear message instead of dumping ~3KB of
    // ffmpeg stderr into the failed-jobs list.
    if (mediaInfo.durationMs === null && mediaInfo.codec) {
      throw new Error(
        `Empty or truncated source: codec=${mediaInfo.codec}, ` +
        `resolution=${mediaInfo.resolution || 'unknown'}, no readable frames.`
      );
    }

    // Transcode using the operator-configured encoder. The proxy worker is
    // the only consumer of these settings; recorders manage their own codecs.
    await job.updateProgress(30);
    const encSettings = await loadProxyEncodingSettings();
    console.log(
      `[proxy] Transcoding asset ${assetId} via ${encSettings._gpu ? 'GPU' : 'CPU'} ` +
      `(${encSettings.videoCodec} ${encSettings.videoPreset} VBR ${encSettings.videoMinRate}-${encSettings.videoMaxRate} avg=${encSettings.videoBitrate})`
    );
    try {
      await transcodeVideo(inputPath, outputPath, { ...encSettings, hasAudio });
    } catch (err) {
      if (!encSettings._gpu) throw err;
      // GPU encode failed: retry once on libx264 with the same bitrate
      // envelope. rateControl is deliberately not forwarded.
      console.warn(`[proxy] GPU encode failed (${err.message}); falling back to libx264`);
      await transcodeVideo(inputPath, outputPath, {
        videoCodec: 'libx264',
        videoPreset: 'fast',
        videoBitrate: encSettings.videoBitrate,
        videoMinRate: encSettings.videoMinRate,
        videoMaxRate: encSettings.videoMaxRate,
        videoBufSize: encSettings.videoBufSize,
        audioCodec: encSettings.audioCodec,
        audioBitrate: encSettings.audioBitrate,
        hasAudio,
      });
    }

    // Upload proxy to S3
    await job.updateProgress(70);
    console.log(`[proxy] Uploading to ${outputKey}`);
    await uploadToS3(S3_BUCKET, outputKey, outputPath);

    // Update asset record — store extracted metadata + proxy key.
    // Use COALESCE so we never overwrite fields the capture service already set.
    // Note the parameter order: $7 is the asset id, $8 the audio metadata JSON.
    await job.updateProgress(90);
    await query(
      `UPDATE assets
       SET proxy_s3_key = $1,
           fps = COALESCE($2, fps),
           codec = COALESCE($3, codec),
           resolution = COALESCE($4, resolution),
           duration_ms = COALESCE($5, duration_ms),
           file_size = COALESCE($6, file_size),
           audio_metadata = COALESCE($8, audio_metadata),
           updated_at = NOW()
       WHERE id = $7`,
      [
        outputKey,
        mediaInfo.fps ?? null,
        mediaInfo.codec ?? null,
        mediaInfo.resolution ?? null,
        mediaInfo.durationMs ?? null,
        mediaInfo.fileSizeBytes ?? null,
        assetId,
        mediaInfo.audioMetadata ? JSON.stringify(mediaInfo.audioMetadata) : null,
      ]
    );

    // Now proxy exists in S3 — safe to queue thumbnail generation
    const thumbnailKey = `thumbnails/${assetId}.jpg`;
    await thumbnailQueue.add('generate', {
      assetId,
      proxyKey: outputKey,
      outputKey: thumbnailKey,
    });
    console.log(`[proxy] Asset ${assetId} proxy complete, thumbnail job queued`);
    await job.updateProgress(100);
    return { assetId, outputKey };
  } catch (error) {
    console.error(`[proxy] Error processing asset ${assetId}:`, error);
    try {
      await query(
        `UPDATE assets SET status = 'error', updated_at = NOW()
         WHERE id = $1 AND status NOT IN ('live', 'ingesting')`,
        [assetId]
      );
    } catch (dbErr) {
      // A DB failure here must not replace the original processing error,
      // which is what the job should fail with.
      console.error(`[proxy] Failed to mark asset ${assetId} as error: ${dbErr.message}`);
    }
    throw error;
  } finally {
    // Best-effort removal of every scratch file; missing files are ignored.
    await Promise.all([
      unlink(inputPath).catch(() => {}),
      unlink(outputPath).catch(() => {}),
      unlink(imageOutputPath).catch(() => {}),
    ]);
  }
};