executor.js: - transcodeVideo() now accepts videoMinRate, videoMaxRate, videoBufSize - When set, passes -minrate/-maxrate/-bufsize to FFmpeg for ABR/VBR mode - libx264 operates with per-scene quality variation within the envelope proxy.js: - Target average: 750k (gpu_bitrate_mbps=0.75) - Min: 375k (50% of target), Max: 998k (~133%), Buffer: 2× max - Gives effective range of ~500k-1M depending on scene complexity - Log now shows VBR min-max-avg - GPU fallback also passes VBR params - Default videoBitrate changed from 10M to 750k in executor.js
250 lines
10 KiB
JavaScript
250 lines
10 KiB
JavaScript
import { join } from 'path';
|
||
import { stat, unlink } from 'fs/promises';
|
||
import { tmpdir } from 'os';
|
||
import { Queue } from 'bullmq';
|
||
import { query } from '../db/client.js';
|
||
import { downloadFromS3, uploadToS3 } from '../s3/client.js';
|
||
import { transcodeVideo, transcodeImage, getMediaInfo, isHwCodec } from '../ffmpeg/executor.js';
|
||
|
||
// Read the global proxy-encoder settings from the DB. These are written by
|
||
// Settings → Proxy encoding in the GUI. Falls back to libx264 defaults if
|
||
// nothing is configured (first-run / fresh install).
|
||
async function loadProxyEncodingSettings() {
|
||
const result = await query(
|
||
`SELECT key, value FROM settings
|
||
WHERE key IN ('gpu_transcode_enabled','gpu_codec','gpu_preset','gpu_bitrate_mbps',
|
||
'gpu_rc_mode','gpu_audio_codec','gpu_audio_bitrate_kbps')`
|
||
);
|
||
const map = {};
|
||
for (const { key, value } of result.rows) map[key] = value;
|
||
|
||
const gpuEnabled = map.gpu_transcode_enabled === 'true';
|
||
const codec = map.gpu_codec || (gpuEnabled ? 'h264_nvenc' : 'libx264');
|
||
const preset = map.gpu_preset || (gpuEnabled ? 'p4' : 'fast');
|
||
const rcMode = map.gpu_rc_mode || null;
|
||
const audioCodec = map.gpu_audio_codec || 'aac';
|
||
const audioKbps = parseInt(map.gpu_audio_bitrate_kbps || '128', 10);
|
||
|
||
// VBR 500k–1M: target average 750k, hard cap 1M, buffer 2M.
|
||
// libx264 ABR mode — quality varies per-scene within the envelope.
|
||
// These are stored in the DB as the bitrate field; min/max derived from it.
|
||
const bitrateM = parseFloat(map.gpu_bitrate_mbps || '0.75');
|
||
const targetBps = Math.round(bitrateM * 1000); // kbps
|
||
const minKbps = Math.round(targetBps * 0.5); // 50% of target
|
||
const maxKbps = Math.round(targetBps * 1.33); // 133% of target, capped at ~1M for 750k target
|
||
const bufKbps = maxKbps * 2; // 2× maxrate recommended
|
||
|
||
return {
|
||
videoCodec: codec,
|
||
videoPreset: preset,
|
||
videoBitrate: `${targetBps}k`,
|
||
videoMinRate: `${minKbps}k`,
|
||
videoMaxRate: `${maxKbps}k`,
|
||
videoBufSize: `${bufKbps}k`,
|
||
rateControl: rcMode,
|
||
audioCodec,
|
||
audioBitrate: `${audioKbps}k`,
|
||
_gpu: gpuEnabled && isHwCodec(codec),
|
||
};
|
||
}
|
||
|
||
// Codec names ffprobe reports for still-image inputs. These bypass the video
|
||
// transcode entirely — see proxyWorker below.
|
||
const IMAGE_CODECS = new Set([
|
||
'png', 'mjpeg', 'jpeg', 'webp', 'gif', 'tiff', 'bmp', 'jpegls', 'svg',
|
||
]);
|
||
|
||
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
|
||
|
||
// BUG FIX #7: Keep thumbnailQueue as a module-level singleton so it is only
|
||
// opened once and can be closed during SIGTERM (via the exported closer).
|
||
// Previously a new Queue was created inside the worker function; BullMQ Queue
|
||
// instances hold an open Redis connection that prevents clean shutdown.
|
||
const parseRedisUrl = (url) => {
|
||
const parsed = new URL(url);
|
||
return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
|
||
};
|
||
|
||
export const thumbnailQueue = new Queue('thumbnail', {
|
||
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
|
||
});
|
||
|
||
export const proxyWorker = async (job) => {
|
||
const { assetId, inputKey, outputKey } = job.data;
|
||
|
||
const tmpDir = tmpdir();
|
||
const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`);
|
||
const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`);
|
||
|
||
try {
|
||
// Download original from S3
|
||
await job.updateProgress(10);
|
||
console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`);
|
||
await downloadFromS3(S3_BUCKET, inputKey, inputPath);
|
||
|
||
// Look up the asset row early — we want media_type before deciding how
|
||
// to process. That lets us route 'image' assets to the poster path even
|
||
// when ffprobe doesn't return a codec name in IMAGE_CODECS (e.g. future
|
||
// formats like AVIF / HEIF / JPEG-XL).
|
||
const assetRow = await query(
|
||
'SELECT media_type FROM assets WHERE id = $1',
|
||
[assetId]
|
||
);
|
||
const dbMediaType = assetRow.rows[0]?.media_type || null;
|
||
|
||
// Reject obviously-empty inputs before handing them to ffmpeg. Aborted
|
||
// SRT/RTMP recordings end up as 0-byte (or ftyp-only ~1KB) objects in S3
|
||
// when the source disconnects before any frame is received; the proxy
|
||
// pipeline used to bomb on "moov atom not found", which buried the
|
||
// real reason. Bail with a clear message and let the asset go to 'error'.
|
||
//
|
||
// Skip this check for image assets — a single PNG icon can legitimately
|
||
// be a few hundred bytes.
|
||
const { size: inputBytes } = await stat(inputPath);
|
||
if (dbMediaType !== 'image' && inputBytes < 4096) {
|
||
throw new Error(
|
||
`Source is empty or truncated (${inputBytes} bytes). The recording ` +
|
||
`likely ended before any frames were received — check the source ` +
|
||
`URL / SDI signal and re-record.`
|
||
);
|
||
}
|
||
|
||
// Extract source metadata (fps, codec, resolution, duration, size, audio)
|
||
await job.updateProgress(20);
|
||
let mediaInfo = {};
|
||
let hasAudio = true;
|
||
try {
|
||
mediaInfo = await getMediaInfo(inputPath);
|
||
hasAudio = !!mediaInfo.hasAudio;
|
||
console.log(`[proxy] Metadata for ${assetId}: ${JSON.stringify(mediaInfo)}`);
|
||
} catch (err) {
|
||
console.warn(`[proxy] getMediaInfo failed for ${assetId}: ${err.message}`);
|
||
}
|
||
|
||
// Still images skip the video proxy — they have no temporal stream and
|
||
// x264 with a one-frame PNG input fails (Could not open encoder before EOF).
|
||
// Generate a scaled JPEG poster instead; the thumbnail job will downsize it.
|
||
//
|
||
// We treat the input as an image if EITHER the DB says so (media_type =
|
||
// 'image', set by upload.js based on Content-Type or extension), OR
|
||
// ffprobe reports a codec we know is a still-image format.
|
||
const codecLower = mediaInfo.codec ? mediaInfo.codec.toLowerCase() : null;
|
||
const isImage = dbMediaType === 'image' || (codecLower && IMAGE_CODECS.has(codecLower));
|
||
|
||
if (isImage) {
|
||
const imageOutputKey = outputKey.replace(/\.mp4$/, '.jpg');
|
||
const imageOutputPath = outputPath.replace(/\.mp4$/, '.jpg');
|
||
console.log(`[proxy] Image asset ${assetId} (codec=${codecLower || 'unknown'}, db_media_type=${dbMediaType}) — emitting poster instead of video proxy`);
|
||
await job.updateProgress(40);
|
||
await transcodeImage(inputPath, imageOutputPath);
|
||
await job.updateProgress(70);
|
||
await uploadToS3(S3_BUCKET, imageOutputKey, imageOutputPath);
|
||
await job.updateProgress(90);
|
||
await query(
|
||
`UPDATE assets
|
||
SET thumbnail_s3_key = $1,
|
||
proxy_s3_key = NULL,
|
||
resolution = COALESCE($2, resolution),
|
||
file_size = COALESCE($3, file_size),
|
||
status = 'ready',
|
||
updated_at = NOW()
|
||
WHERE id = $4`,
|
||
[imageOutputKey, mediaInfo.resolution ?? null, mediaInfo.fileSizeBytes ?? null, assetId]
|
||
);
|
||
await unlink(imageOutputPath).catch(() => {});
|
||
await job.updateProgress(100);
|
||
return { assetId, outputKey: imageOutputKey };
|
||
}
|
||
|
||
// Empty/truncated capture: probe returned a video stream but ffmpeg can't
|
||
// read any frames. Bail with a clear message instead of dumping ~3KB of
|
||
// ffmpeg stderr into the failed-jobs list.
|
||
if (mediaInfo.durationMs === null && mediaInfo.codec) {
|
||
throw new Error(
|
||
`Empty or truncated source: codec=${mediaInfo.codec}, ` +
|
||
`resolution=${mediaInfo.resolution || 'unknown'}, no readable frames.`
|
||
);
|
||
}
|
||
|
||
// Transcode using the operator-configured encoder. The proxy worker is
|
||
// the only consumer of these settings; recorders manage their own codecs.
|
||
await job.updateProgress(30);
|
||
const encSettings = await loadProxyEncodingSettings();
|
||
console.log(
|
||
`[proxy] Transcoding asset ${assetId} via ${encSettings._gpu ? 'GPU' : 'CPU'} ` +
|
||
`(${encSettings.videoCodec} ${encSettings.videoPreset} VBR ${encSettings.videoMinRate}-${encSettings.videoMaxRate} avg=${encSettings.videoBitrate})`
|
||
);
|
||
try {
|
||
await transcodeVideo(inputPath, outputPath, { ...encSettings, hasAudio });
|
||
} catch (err) {
|
||
if (encSettings._gpu) {
|
||
console.warn(`[proxy] GPU encode failed (${err.message}); falling back to libx264`);
|
||
await transcodeVideo(inputPath, outputPath, {
|
||
videoCodec: 'libx264', videoPreset: 'fast',
|
||
videoBitrate: encSettings.videoBitrate,
|
||
videoMinRate: encSettings.videoMinRate,
|
||
videoMaxRate: encSettings.videoMaxRate,
|
||
videoBufSize: encSettings.videoBufSize,
|
||
audioCodec: encSettings.audioCodec,
|
||
audioBitrate: encSettings.audioBitrate,
|
||
hasAudio,
|
||
});
|
||
} else { throw err; }
|
||
}
|
||
|
||
// Upload proxy to S3
|
||
await job.updateProgress(70);
|
||
console.log(`[proxy] Uploading to ${outputKey}`);
|
||
await uploadToS3(S3_BUCKET, outputKey, outputPath);
|
||
|
||
// Update asset record — store extracted metadata + proxy key
|
||
// Use COALESCE so we never overwrite fields the capture service already set
|
||
await job.updateProgress(90);
|
||
await query(
|
||
`UPDATE assets
|
||
SET proxy_s3_key = $1,
|
||
fps = COALESCE($2, fps),
|
||
codec = COALESCE($3, codec),
|
||
resolution = COALESCE($4, resolution),
|
||
duration_ms = COALESCE($5, duration_ms),
|
||
file_size = COALESCE($6, file_size),
|
||
updated_at = NOW()
|
||
WHERE id = $7`,
|
||
[
|
||
outputKey,
|
||
mediaInfo.fps ?? null,
|
||
mediaInfo.codec ?? null,
|
||
mediaInfo.resolution ?? null,
|
||
mediaInfo.durationMs ?? null,
|
||
mediaInfo.fileSizeBytes ?? null,
|
||
assetId,
|
||
]
|
||
);
|
||
|
||
// Now proxy exists in S3 — safe to queue thumbnail generation
|
||
const thumbnailKey = `thumbnails/${assetId}.jpg`;
|
||
await thumbnailQueue.add('generate', {
|
||
assetId,
|
||
proxyKey: outputKey,
|
||
outputKey: thumbnailKey,
|
||
});
|
||
|
||
console.log(`[proxy] Asset ${assetId} proxy complete, thumbnail job queued`);
|
||
await job.updateProgress(100);
|
||
|
||
return { assetId, outputKey };
|
||
} catch (error) {
|
||
console.error(`[proxy] Error processing asset ${assetId}:`, error);
|
||
await query(
|
||
`UPDATE assets SET status = 'error', updated_at = NOW()
|
||
WHERE id = $1 AND status NOT IN ('live', 'ingesting')`,
|
||
[assetId]
|
||
);
|
||
throw error;
|
||
} finally {
|
||
await Promise.all([
|
||
unlink(inputPath).catch(() => {}),
|
||
unlink(outputPath).catch(() => {}),
|
||
]);
|
||
}
|
||
};
|