dragonflight/services/worker/src/workers/proxy.js

201 lines
7.8 KiB
JavaScript
Raw Normal View History

import { join } from 'path';
import { unlink } from 'fs/promises'
import { tmpdir } from 'os';
import { Queue } from 'bullmq';
import { query } from '../db/client.js';
import { downloadFromS3, uploadToS3 } from '../s3/client.js';
import { transcodeVideo, transcodeImage, getMediaInfo, isHwCodec } from '../ffmpeg/executor.js';
// Read the global proxy-encoder settings from the DB. These are written by
// Settings → Proxy encoding in the GUI. Falls back to libx264 defaults if
// nothing is configured (first-run / fresh install).
async function loadProxyEncodingSettings() {
const result = await query(
`SELECT key, value FROM settings
WHERE key IN ('gpu_transcode_enabled','gpu_codec','gpu_preset','gpu_bitrate_mbps',
'gpu_rc_mode','gpu_audio_codec','gpu_audio_bitrate_kbps')`
);
const map = {};
for (const { key, value } of result.rows) map[key] = value;
const gpuEnabled = map.gpu_transcode_enabled === 'true';
const codec = map.gpu_codec || (gpuEnabled ? 'h264_nvenc' : 'libx264');
const preset = map.gpu_preset || (gpuEnabled ? 'p4' : 'fast');
const bitrateM = parseInt(map.gpu_bitrate_mbps || '10', 10);
const rcMode = map.gpu_rc_mode || null;
const audioCodec = map.gpu_audio_codec || 'aac';
const audioKbps = parseInt(map.gpu_audio_bitrate_kbps || '192', 10);
return {
videoCodec: codec,
videoPreset: preset,
videoBitrate: `${bitrateM}M`,
rateControl: rcMode,
audioCodec,
audioBitrate: `${audioKbps}k`,
_gpu: gpuEnabled && isHwCodec(codec),
};
}
// Codec names ffprobe reports for still-image inputs. These bypass the video
// transcode entirely — see proxyWorker below.
const IMAGE_CODECS = new Set(['png', 'mjpeg', 'jpeg', 'webp', 'gif', 'tiff', 'bmp', 'jpegls']);
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
// Dispatch thumbnail job once proxy is ready — same Redis connection as the worker
const parseRedisUrl = (url) => {
const parsed = new URL(url);
return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
};
const thumbnailQueue = new Queue('thumbnail', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});
export const proxyWorker = async (job) => {
const { assetId, inputKey, outputKey } = job.data;
const tmpDir = tmpdir();
const inputPath = join(tmpDir, `proxy-input-${job.id}.mov`);
const outputPath = join(tmpDir, `proxy-output-${job.id}.mp4`);
try {
// Download original from S3
await job.updateProgress(10);
console.log(`[proxy] Downloading ${inputKey} for asset ${assetId}`);
await downloadFromS3(S3_BUCKET, inputKey, inputPath);
// Extract source metadata (fps, codec, resolution, duration, size, audio)
await job.updateProgress(20);
let mediaInfo = {};
let hasAudio = true;
try {
mediaInfo = await getMediaInfo(inputPath);
hasAudio = !!mediaInfo.hasAudio;
console.log(`[proxy] Metadata for ${assetId}: ${JSON.stringify(mediaInfo)}`);
} catch (err) {
console.warn(`[proxy] getMediaInfo failed for ${assetId}: ${err.message}`);
}
// Still images skip the video proxy — they have no temporal stream and
// x264 with a one-frame PNG input fails (Could not open encoder before EOF).
// Generate a scaled JPEG poster instead; the thumbnail job will downsize it.
const isImage = mediaInfo.codec && IMAGE_CODECS.has(mediaInfo.codec.toLowerCase());
if (isImage) {
const imageOutputKey = outputKey.replace(/\.mp4$/, '.jpg');
const imageOutputPath = outputPath.replace(/\.mp4$/, '.jpg');
console.log(`[proxy] Image asset ${assetId} (${mediaInfo.codec}) — emitting poster instead of video proxy`);
await job.updateProgress(40);
await transcodeImage(inputPath, imageOutputPath);
await job.updateProgress(70);
await uploadToS3(S3_BUCKET, imageOutputKey, imageOutputPath);
await job.updateProgress(90);
await query(
`UPDATE assets
SET thumbnail_s3_key = $1,
proxy_s3_key = NULL,
resolution = COALESCE($2, resolution),
file_size = COALESCE($3, file_size),
status = 'ready',
updated_at = NOW()
WHERE id = $4`,
[imageOutputKey, mediaInfo.resolution ?? null, mediaInfo.fileSizeBytes ?? null, assetId]
);
await unlink(imageOutputPath).catch(() => {});
await job.updateProgress(100);
return { assetId, outputKey: imageOutputKey };
}
// Empty/truncated capture: probe returned a video stream but ffmpeg can't
// read any frames. Bail with a clear message instead of dumping ~3KB of
// ffmpeg stderr into the failed-jobs list.
if (mediaInfo.durationMs === null && mediaInfo.codec) {
throw new Error(
`Empty or truncated source: codec=${mediaInfo.codec}, ` +
`resolution=${mediaInfo.resolution || 'unknown'}, no readable frames.`
);
}
// Transcode using the operator-configured encoder. The proxy worker is
// the only consumer of these settings; recorders manage their own codecs.
await job.updateProgress(30);
const encSettings = await loadProxyEncodingSettings();
console.log(
`[proxy] Transcoding asset ${assetId} via ${encSettings._gpu ? 'GPU' : 'CPU'} ` +
`(${encSettings.videoCodec} ${encSettings.videoPreset} ${encSettings.videoBitrate})`
);
try {
await transcodeVideo(inputPath, outputPath, { ...encSettings, hasAudio });
} catch (err) {
if (encSettings._gpu) {
// Hardware encoder failed — typically "no NVIDIA driver" or "VAAPI
// device not found". Fall back to libx264 so the job doesn't fail
// when the worker host has no GPU.
console.warn(`[proxy] GPU encode failed (${err.message}); falling back to libx264`);
await transcodeVideo(inputPath, outputPath, {
videoCodec: 'libx264', videoPreset: 'fast',
videoBitrate: encSettings.videoBitrate,
audioCodec: encSettings.audioCodec, audioBitrate: encSettings.audioBitrate,
hasAudio,
});
} else { throw err; }
}
// Upload proxy to S3
await job.updateProgress(70);
console.log(`[proxy] Uploading to ${outputKey}`);
await uploadToS3(S3_BUCKET, outputKey, outputPath);
// Update asset record — store extracted metadata + proxy key
// Use COALESCE so we never overwrite fields the capture service already set
await job.updateProgress(90);
await query(
`UPDATE assets
SET proxy_s3_key = $1,
fps = COALESCE($2, fps),
codec = COALESCE($3, codec),
resolution = COALESCE($4, resolution),
duration_ms = COALESCE($5, duration_ms),
file_size = COALESCE($6, file_size),
updated_at = NOW()
WHERE id = $7`,
[
outputKey,
mediaInfo.fps ?? null,
mediaInfo.codec ?? null,
mediaInfo.resolution ?? null,
mediaInfo.durationMs ?? null,
mediaInfo.fileSizeBytes ?? null,
assetId,
]
);
// Now proxy exists in S3 — safe to queue thumbnail generation
const thumbnailKey = `thumbnails/${assetId}.jpg`;
await thumbnailQueue.add('generate', {
assetId,
proxyKey: outputKey,
outputKey: thumbnailKey,
});
console.log(`[proxy] Asset ${assetId} proxy complete, thumbnail job queued`);
await job.updateProgress(100);
return { assetId, outputKey };
} catch (error) {
console.error(`[proxy] Error processing asset ${assetId}:`, error);
await query(
`UPDATE assets SET status = 'error', updated_at = NOW() WHERE id = $1`,
[assetId]
);
throw error;
} finally {
await Promise.all([
unlink(inputPath).catch(() => {}),
unlink(outputPath).catch(() => {}),
]);
}
};