dragonflight/services/capture/src/capture-manager.js
Zac Gaetano 96f0f58e68 capture: yadif deint=1 so progressive SDI passes through unchanged
Bug: yadif=mode=1 unconditionally doubled output framerate for SDI input. On 1080p29.97 progressive sources the encoder produced zero frames (time advanced, size stayed at 1KiB MOV header).

Fix: deint=1 makes yadif only process frames flagged as interlaced; progressive frames pass through at the source rate.
2026-05-22 00:14:02 +00:00

433 lines
16 KiB
JavaScript

import { spawn } from 'child_process';
import { v4 as uuidv4 } from 'uuid';
import { createUploadStream } from './s3/client.js';
// Destination bucket for every upload; overridable through the environment.
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';

// ── Codec catalogue ──────────────────────────────────────────────────────
// Every row is [UI value, base ffmpeg flags, bitrateControl, pixFmt]. The
// bitrate / framerate / pix_fmt flags are layered on top of the base flags
// from the per-recorder configuration.
const VIDEO_CODECS = Object.fromEntries(
  [
    ['prores_hq', ['-c:v', 'prores_ks', '-profile:v', '3'], false, 'yuv422p10le'],
    ['prores_422', ['-c:v', 'prores_ks', '-profile:v', '2'], false, 'yuv422p10le'],
    ['prores_lt', ['-c:v', 'prores_ks', '-profile:v', '1'], false, 'yuv422p10le'],
    ['prores_proxy', ['-c:v', 'prores_ks', '-profile:v', '0'], false, 'yuv422p10le'],
    ['dnxhd', ['-c:v', 'dnxhd'], true, 'yuv422p'],
    ['dnxhr_hq', ['-c:v', 'dnxhd', '-profile:v', 'dnxhr_hq'], false, 'yuv422p'],
    ['dnxhr_sq', ['-c:v', 'dnxhd', '-profile:v', 'dnxhr_sq'], false, 'yuv422p'],
    ['h264', ['-c:v', 'libx264', '-preset', 'medium'], true, 'yuv420p'],
    ['h264_nvenc', ['-c:v', 'h264_nvenc', '-preset', 'p5'], true, 'yuv420p'],
    ['h265', ['-c:v', 'libx265', '-preset', 'medium'], true, 'yuv420p'],
    ['hevc_nvenc', ['-c:v', 'hevc_nvenc', '-preset', 'p5'], true, 'yuv420p'],
  ].map(([uiValue, args, bitrateControl, pixFmt]) => [uiValue, { args, bitrateControl, pixFmt }]),
);

// Every row is [UI value, ffmpeg audio encoder name, bitrateControl].
const AUDIO_CODECS = Object.fromEntries(
  [
    ['pcm_s16le', 'pcm_s16le', false],
    ['pcm_s24le', 'pcm_s24le', false],
    ['pcm_s32le', 'pcm_s32le', false],
    ['aac', 'aac', true],
    ['ac3', 'ac3', true],
    ['opus', 'libopus', true],
    ['flac', 'flac', false],
  ].map(([uiValue, encoder, bitrateControl]) => [uiValue, { args: ['-c:a', encoder], bitrateControl }]),
);

// UI container value -> ffmpeg muxer name (the value passed to `-f`).
const CONTAINER_FMT = { mov: 'mov', mp4: 'mp4', mkv: 'matroska', mxf: 'mxf', ts: 'mpegts' };

// UI container value -> file extension used when building object keys.
const CONTAINER_EXT = {
  mov: 'mov',
  mp4: 'mp4',
  mkv: 'mkv',
  mxf: 'mxf',
  ts: 'ts',
};
/**
 * Build the ffmpeg output-side arguments (stream mapping, video/audio
 * encoder flags and muxer selection) for one encode.
 *
 * Unknown codec / container values fall back to defaults: H.264 + AAC in MP4
 * when `isProxy` is true, ProRes HQ + 24-bit PCM in MOV otherwise.
 *
 * @param {object} opts
 * @param {string} [opts.codec] - Key of VIDEO_CODECS.
 * @param {string|number|null} [opts.videoBitrate] - Passed to `-b:v`; only used when the codec has bitrateControl.
 * @param {string|number|null} [opts.framerate] - Passed to `-r`; skipped when falsy or 'native'.
 * @param {string} [opts.audioCodec] - Key of AUDIO_CODECS.
 * @param {string|number|null} [opts.audioBitrate] - Passed to `-b:a`; only used when the codec has bitrateControl.
 * @param {number|string|null} [opts.audioChannels] - Passed to `-ac` when truthy.
 * @param {string} [opts.container] - Key of CONTAINER_FMT.
 * @param {boolean} [opts.isNetwork] - When true, explicit optional stream maps are emitted first.
 * @param {boolean} [opts.isProxy=false] - Selects the proxy fallback defaults.
 * @returns {string[]} Arguments ending with `-f <muxer>`; the caller appends the output target.
 */
function buildEncodeArgs({
  codec, videoBitrate, framerate,
  audioCodec, audioBitrate, audioChannels,
  container, isNetwork, isProxy = false,
}) {
  // Own-property lookup: names such as "toString" or "constructor" must not
  // resolve to inherited Object.prototype members and bypass the fallback.
  const pick = (table, key, fallback) =>
    (Object.prototype.hasOwnProperty.call(table, key) && table[key]) || fallback;

  const video = pick(VIDEO_CODECS, codec, isProxy ? VIDEO_CODECS.h264 : VIDEO_CODECS.prores_hq);
  const audio = pick(AUDIO_CODECS, audioCodec, isProxy ? AUDIO_CODECS.aac : AUDIO_CODECS.pcm_s24le);
  const fmt = pick(CONTAINER_FMT, container, isProxy ? 'mp4' : 'mov');

  const args = [];

  // The trailing "?" makes each map optional, so a missing stream is not an error.
  if (isNetwork) args.push('-map', '0:v:0?', '-map', '0:a:0?');

  args.push(...video.args);
  if (video.pixFmt) args.push('-pix_fmt', video.pixFmt);
  // Every value is stringified so the returned argv is uniformly string-typed
  // even when the configuration supplies numbers.
  if (video.bitrateControl && videoBitrate) args.push('-b:v', String(videoBitrate));
  if (framerate && framerate !== 'native') args.push('-r', String(framerate));

  args.push(...audio.args);
  if (audio.bitrateControl && audioBitrate) args.push('-b:a', String(audioBitrate));
  if (audioChannels) args.push('-ac', String(audioChannels));

  // MOV/MP4 get fragmented output with an empty initial moov atom.
  if (fmt === 'mov' || fmt === 'mp4') {
    args.push('-movflags', '+frag_keyframe+empty_moov');
  }
  args.push('-f', fmt);
  return args;
}
/**
 * Runs at most one capture session at a time. A session is a "hires" ffmpeg
 * process whose stdout is streamed to S3, plus (for non-network sources) an
 * optional proxy ffmpeg process and (for network sources with an asset id) an
 * HLS preview ffmpeg process.
 */
class CaptureManager {
  constructor() {
    this.state = {
      recording: false,
      sessionId: null,
      processes: {},
      currentSession: {},
      framesReceived: 0,
      currentFps: 0,
      lastFrameAt: null,
      lastError: null,
    };
    // True while start() is between its guard and the moment state.recording
    // becomes true, so overlapping start() calls cannot both spawn ffmpeg.
    this._starting = false;
  }

  /**
   * Build FFmpeg input arguments based on source type.
   *
   * - 'srt': listener on 0.0.0.0 (default port 9000) or caller to `sourceUrl`
   *   ("mode=caller" is appended when the URL carries no "mode=" parameter).
   * - 'rtmp': listener on 0.0.0.0 (default port 1935, default key "stream")
   *   or pull from `sourceUrl`.
   * - anything else: DeckLink input; an integer `device` is resolved to a
   *   device name via `ffmpeg -sources decklink`.
   *
   * @returns {Promise<{inputArgs: string[], isNetwork: boolean}>}
   * @throws {Error} When `sourceUrl` is missing for SRT caller / RTMP pull mode.
   * @private
   */
  async _buildInputArgs({ sourceType, device, sourceUrl, listen, listenPort, streamKey }) {
    // Probe settings shared by every network input.
    const probeArgs = ['-probesize', '32M', '-analyzeduration', '10M', '-fflags', '+genpts'];

    // Fail early with a clear message instead of a TypeError on `.includes`
    // or an `undefined` entry in ffmpeg's argv.
    const requireSourceUrl = (modeLabel) => {
      if (typeof sourceUrl !== 'string' || sourceUrl === '') {
        throw new Error(`sourceUrl is required for ${modeLabel}`);
      }
      return sourceUrl;
    };

    if (sourceType === 'srt') {
      let url;
      if (listen) {
        const port = listenPort || 9000;
        url = `srt://0.0.0.0:${port}?mode=listener`;
      } else {
        url = requireSourceUrl('SRT caller mode');
        if (!url.includes('mode=')) {
          url += (url.includes('?') ? '&' : '?') + 'mode=caller';
        }
      }
      return { inputArgs: [...probeArgs, '-i', url], isNetwork: true };
    }

    if (sourceType === 'rtmp') {
      if (listen) {
        const port = listenPort || 1935;
        const key = streamKey || 'stream';
        return {
          inputArgs: [...probeArgs, '-listen', '1', '-i', `rtmp://0.0.0.0:${port}/live/${key}`],
          isNetwork: true,
        };
      }
      return {
        inputArgs: [...probeArgs, '-i', requireSourceUrl('RTMP pull mode')],
        isNetwork: true,
      };
    }

    // Default: SDI via DeckLink.
    // device may be an integer index (0-based) or a full device name string.
    // FFmpeg 7.x DeckLink requires the full name (e.g. 'DeckLink Duo (2)').
    // Map integer index -> name using ffmpeg -sources decklink at runtime.
    let deckLinkName = String(device);
    if (typeof device === 'number' || /^\d+$/.test(deckLinkName)) {
      const idx = Number.parseInt(device, 10);
      // Used when enumeration fails or lists fewer devices than `idx`.
      const fallbackName = `DeckLink Duo (${idx + 1})`;
      try {
        const { execSync } = await import('child_process');
        const out = execSync('ffmpeg -sources decklink 2>&1', { encoding: 'utf-8', timeout: 5000 });
        const names = [];
        for (const line of out.split('\n')) {
          // Matches an indented "<hex id> [<device name>]" row.
          const m = line.match(/^\s+[0-9a-f:]+\s+\[([^\]]+)\]/);
          if (m) names.push(m[1]);
        }
        deckLinkName = names[idx] || fallbackName;
      } catch (err) {
        console.warn(`[capture] DeckLink enumeration failed, using "${fallbackName}": ${err.message}`);
        deckLinkName = fallbackName;
      }
    }
    return {
      inputArgs: ['-f', 'decklink', '-i', deckLinkName],
      isNetwork: false,
    };
  }

  /**
   * Start a new capture session.
   *
   * Codec parameters all have sensible defaults so legacy callers (no codec
   * args) still produce ProRes HQ master + H.264 proxy.
   *
   * @returns {Promise<object>} The session summary from _formatSessionResponse().
   * @throws {Error} 'Capture already in progress' when a session is active or
   *   another start() call is still in flight; also rethrows any failure while
   *   building arguments or spawning, after killing processes already spawned.
   */
  async start({
    assetId,
    projectId,
    binId,
    clipName,
    device,
    sourceType = 'sdi',
    sourceUrl,
    listen = false,
    listenPort,
    streamKey,
    // ── Recording codec ─────────────────────────────────────────────
    videoCodec = 'prores_hq',
    videoBitrate = null,
    framerate = null,
    audioCodec = 'pcm_s24le',
    audioBitrate = null,
    audioChannels = 2,
    container = 'mov',
    // ── Proxy codec ─────────────────────────────────────────────────
    proxyEnabled = true,
    proxyVideoCodec = 'h264',
    proxyVideoBitrate = '8M',
    proxyFramerate = null,
    proxyAudioCodec = 'aac',
    proxyAudioBitrate = '192k',
    proxyAudioChannels = 2,
    proxyContainer = 'mp4',
  }) {
    // Guard first: a rejected call must not touch any state of the active session.
    if (this.state.recording || this._starting) {
      throw new Error('Capture already in progress');
    }
    this._starting = true;

    const processes = {};
    const uploads = {};
    const stdio = ['ignore', 'pipe', 'pipe'];

    // A ChildProcess emits 'error' when it cannot be spawned; with no listener
    // that event is thrown as an uncaught exception.
    const watchProcessError = (child, label, recordInState) => {
      child.on('error', (err) => {
        console.error(`[${label}] ffmpeg process error: ${err.message}`);
        if (recordInState) {
          this.state.lastError = `ffmpeg process error: ${err.message}`.slice(0, 240);
        }
      });
    };

    try {
      this._assetIdForHls = assetId || null;
      const sessionId = uuidv4();

      // Own-property lookups so inherited keys cannot leak into file names.
      const extFor = (name, fallback) =>
        (Object.prototype.hasOwnProperty.call(CONTAINER_EXT, name) ? CONTAINER_EXT[name] : fallback);
      const hiresExt = extFor(container, 'mov');
      const proxyExt = extFor(proxyContainer, 'mp4');
      const hiresKey = `projects/${projectId}/masters/${clipName}.${hiresExt}`;
      const startedAt = new Date().toISOString();

      const { inputArgs, isNetwork } = await this._buildInputArgs({
        sourceType, device, sourceUrl, listen, listenPort, streamKey,
      });

      // Network sources cannot be opened by two FFmpeg processes simultaneously
      // (one socket = one consumer). For SRT/RTMP the BullMQ worker generates
      // the proxy after the recording stops.
      // Keyed off isNetwork, the same condition that decides whether the proxy
      // process is spawned, so a spawned proxy always has a non-null key.
      const proxyKey = (!isNetwork && proxyEnabled)
        ? `projects/${projectId}/proxies/${clipName}.${proxyExt}`
        : null;

      const hiresCodecArgs = buildEncodeArgs({
        codec: videoCodec, videoBitrate, framerate,
        audioCodec, audioBitrate, audioChannels,
        container,
        isNetwork,
        isProxy: false,
      });
      console.log('[capture] hires ffmpeg args:', hiresCodecArgs.join(' '));

      // deint=1 makes yadif only process frames flagged as interlaced, so
      // progressive input passes through at the source rate.
      const sdiFilterArgs = isNetwork ? [] : ['-vf', 'yadif=mode=1:deint=1'];

      // Reset telemetry before spawning so nothing reported by the new
      // processes is wiped afterwards.
      this.state.framesReceived = 0;
      this.state.currentFps = 0;
      this.state.lastFrameAt = null;
      this.state.lastError = null;

      const hiresProcess = spawn('ffmpeg', [
        ...inputArgs,
        ...sdiFilterArgs,
        ...hiresCodecArgs,
        'pipe:1',
      ], { stdio });
      processes.hires = hiresProcess;
      watchProcessError(hiresProcess, 'HIRES', true);
      hiresProcess.stderr.on('data', (data) => {
        const text = data.toString();
        console.error(`[HIRES] ${text}`);
        // ffmpeg progress lines carry "frame=<n> fps=<x>".
        const m = text.match(/frame=\s*(\d+)\s+fps=\s*([\d.]+)/);
        if (m) {
          this.state.framesReceived = parseInt(m[1], 10);
          this.state.currentFps = parseFloat(m[2]);
          this.state.lastFrameAt = new Date().toISOString();
        }
        if (/Connection refused|No route to host|Connection failed|Input\/output error|Server returned|404 Not Found|Connection timed out/i.test(text)) {
          this.state.lastError = text.trim().slice(0, 240);
        }
      });
      uploads.hires = createUploadStream(S3_BUCKET, hiresKey, hiresProcess.stdout);

      // ── HLS tee for network sources (live preview in the UI) ──────────
      // NOTE(review): this opens the same network input a second time, which
      // the "one socket = one consumer" note above says is not possible —
      // confirm this works for the sources in use.
      if (isNetwork && this._assetIdForHls) {
        try {
          const fs = await import('node:fs');
          const hlsDir = '/live/' + this._assetIdForHls;
          fs.mkdirSync(hlsDir, { recursive: true });
          const hlsArgs = [
            ...inputArgs,
            '-map', '0:v:0?', '-map', '0:a:0?',
            '-c:v', 'libx264', '-preset', 'veryfast', '-tune', 'zerolatency',
            '-pix_fmt', 'yuv420p', '-b:v', '2M', '-g', '60', '-sc_threshold', '0',
            '-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
            '-f', 'hls', '-hls_time', '2', '-hls_list_size', '15',
            '-hls_flags', 'delete_segments+append_list+omit_endlist',
            '-hls_segment_filename', hlsDir + '/seg-%05d.ts',
            hlsDir + '/index.m3u8',
          ];
          const hlsProcess = spawn('ffmpeg', hlsArgs, { stdio });
          watchProcessError(hlsProcess, 'HLS', false);
          hlsProcess.stderr.on('data', (d) => { console.error('[HLS] ' + d); });
          hlsProcess.on('exit', (c) => console.log('[HLS] exited ' + c));
          processes.hls = hlsProcess;
          console.log('[HLS] tee started -> ' + hlsDir);
        } catch (err) {
          // The preview is best-effort; the recording continues without it.
          console.error('[HLS] tee failed:', err.message);
        }
      }

      // SDI only: spawn a second ffmpeg for the proxy.
      // DeckLink cards allow concurrent reads; network sockets do not.
      if (proxyKey) {
        const proxyCodecArgs = buildEncodeArgs({
          codec: proxyVideoCodec,
          videoBitrate: proxyVideoBitrate,
          framerate: proxyFramerate,
          audioCodec: proxyAudioCodec,
          audioBitrate: proxyAudioBitrate,
          audioChannels: proxyAudioChannels,
          container: proxyContainer,
          isNetwork: false,
          isProxy: true,
        });
        console.log('[capture] proxy ffmpeg args:', proxyCodecArgs.join(' '));
        // buildEncodeArgs already adds -movflags for MOV/MP4 output, so it is
        // not repeated here.
        const proxyProcess = spawn('ffmpeg', [
          ...inputArgs,
          ...sdiFilterArgs,
          ...proxyCodecArgs,
          'pipe:1',
        ], { stdio });
        processes.proxy = proxyProcess;
        watchProcessError(proxyProcess, 'PROXY', false);
        proxyProcess.stderr.on('data', (data) => {
          console.error(`[PROXY] ${data}`);
        });
        uploads.proxy = createUploadStream(S3_BUCKET, proxyKey, proxyProcess.stdout);
      }

      this.state.sessionId = sessionId;
      this.state.processes = processes;
      this.state.currentSession = {
        sessionId,
        projectId,
        binId,
        clipName,
        device,
        sourceType,
        sourceUrl,
        hiresKey,
        proxyKey,
        startedAt,
        duration: 0,
        uploads,
        codecs: {
          videoCodec, videoBitrate, framerate,
          audioCodec, audioBitrate, audioChannels, container,
          proxyEnabled, proxyVideoCodec, proxyVideoBitrate,
          proxyAudioCodec, proxyAudioBitrate, proxyAudioChannels, proxyContainer,
        },
      };
      this.state.recording = true;
      return this._formatSessionResponse();
    } catch (err) {
      // Do not leave ffmpeg processes running for a session that never
      // became active.
      for (const child of Object.values(processes)) {
        try {
          child.kill('SIGINT');
        } catch (killErr) {
          console.error('[capture] failed to stop ffmpeg after start error:', killErr.message);
        }
      }
      throw err;
    } finally {
      this._starting = false;
    }
  }

  /**
   * Stop the active session: signal every ffmpeg process with SIGINT, wait
   * for all uploads to settle, then clear the recording state.
   *
   * Upload failures are logged, not thrown.
   *
   * @param {string} sessionId - Must equal the active session's id.
   * @returns {Promise<object>} Session summary including stoppedAt and the
   *   duration in whole seconds.
   * @throws {Error} When nothing is recording or the id does not match.
   */
  async stop(sessionId) {
    if (!this.state.recording || this.state.sessionId !== sessionId) {
      throw new Error('No active capture session or session ID mismatch');
    }
    const { processes, currentSession } = this.state;
    if (processes.hires) processes.hires.kill('SIGINT');
    if (processes.proxy) processes.proxy.kill('SIGINT');
    if (processes.hls) { try { processes.hls.kill('SIGINT'); } catch (_) {} }

    // allSettled (not all): one failed upload must not make stop() return
    // while the other upload is still in flight.
    const sessionUploads = currentSession.uploads || {};
    const pendingUploads = [sessionUploads.hires];
    if (sessionUploads.proxy) pendingUploads.push(sessionUploads.proxy);
    const outcomes = await Promise.allSettled(pendingUploads);
    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        console.error('Error during upload completion:', outcome.reason);
      }
    }

    const stoppedAt = new Date().toISOString();
    const startTime = new Date(currentSession.startedAt);
    const stopTime = new Date(stoppedAt);
    const duration = Math.round((stopTime - startTime) / 1000);

    this.state.recording = false;
    this.state.sessionId = null;
    this.state.processes = {};

    return {
      sessionId,
      projectId: currentSession.projectId,
      binId: currentSession.binId,
      clipName: currentSession.clipName,
      sourceType: currentSession.sourceType,
      hiresKey: currentSession.hiresKey,
      proxyKey: currentSession.proxyKey,
      startedAt: currentSession.startedAt,
      stoppedAt,
      duration,
    };
  }

  /**
   * Snapshot of the current session.
   *
   * `signal` is 'receiving' when a progress line was seen less than 5 s ago,
   * 'lost' when frames were seen but not that recently, 'error' when no frame
   * was seen and an error was recorded, and 'connecting' otherwise.
   *
   * @returns {object} `{ recording: false }` when idle, otherwise the
   *   session details plus signal telemetry.
   */
  getStatus() {
    if (!this.state.recording) return { recording: false };

    const startTime = new Date(this.state.currentSession.startedAt);
    const now = new Date();
    const duration = Math.round((now - startTime) / 1000);
    const lastFrameAt = this.state.lastFrameAt;
    const msSinceFrame = lastFrameAt ? (Date.now() - new Date(lastFrameAt).getTime()) : null;

    let signal = 'connecting';
    if (this.state.framesReceived > 0) {
      signal = (msSinceFrame !== null && msSinceFrame < 5000) ? 'receiving' : 'lost';
    } else if (this.state.lastError) {
      signal = 'error';
    }

    return {
      recording: true,
      sessionId: this.state.sessionId,
      sourceType: this.state.currentSession.sourceType,
      device: this.state.currentSession.device,
      clipName: this.state.currentSession.clipName,
      projectId: this.state.currentSession.projectId,
      binId: this.state.currentSession.binId,
      duration,
      startedAt: this.state.currentSession.startedAt,
      signal,
      framesReceived: this.state.framesReceived,
      currentFps: this.state.currentFps,
      lastFrameAt,
      msSinceFrame,
      lastError: this.state.lastError,
      codecs: this.state.currentSession.codecs,
    };
  }

  /**
   * Public summary of the current session as returned by start(); the
   * internal upload handles are left out.
   * @private
   */
  _formatSessionResponse() {
    const { currentSession, sessionId } = this.state;
    return {
      sessionId,
      projectId: currentSession.projectId,
      binId: currentSession.binId,
      clipName: currentSession.clipName,
      device: currentSession.device,
      sourceType: currentSession.sourceType,
      hiresKey: currentSession.hiresKey,
      proxyKey: currentSession.proxyKey,
      startedAt: currentSession.startedAt,
      codecs: currentSession.codecs,
    };
  }
}
export default new CaptureManager();
export { VIDEO_CODECS, AUDIO_CODECS, CONTAINER_FMT, CONTAINER_EXT };