import { spawn } from 'child_process';
import { mkdirSync } from 'node:fs';
import { v4 as uuidv4 } from 'uuid';
import { createUploadStream } from './s3/client.js';

const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';

// Demuxer options prepended to every network (SRT/RTMP) input.
const NETWORK_PROBE_ARGS = ['-probesize', '32M', '-analyzeduration', '10M', '-fflags', '+genpts'];

// ffmpeg prints "frame= 123 fps= 30 ..." progress lines on stderr.
const PROGRESS_PATTERN = /frame=\s*(\d+)\s+fps=\s*([\d.]+)/;

// Fatal-looking stderr fragments that are surfaced through getStatus().lastError.
const FATAL_STDERR_PATTERN =
  /Connection refused|No route to host|Connection failed|Input\/output error|Server returned|404 Not Found|Connection timed out/i;

// Maximum number of characters kept in state.lastError.
const LAST_ERROR_MAX_LENGTH = 240;

// A signal is reported as 'lost' when no progress line arrived within this window.
const SIGNAL_TIMEOUT_MS = 5000;

// Asset ids become a directory name under /live, so only a single plain path
// segment is accepted (no separators, no "." / "..").
const SAFE_PATH_SEGMENT = /^[\w.-]+$/;

/**
 * Drives the ffmpeg child processes of a single capture session (SDI via
 * DeckLink, or SRT/RTMP network input), pipes their output to S3 through
 * createUploadStream, and tracks live signal metrics parsed from ffmpeg stderr.
 *
 * Only one session can be active at a time.
 */
class CaptureManager {
  constructor() {
    this.state = {
      recording: false,
      sessionId: null,
      processes: {},
      currentSession: {},
      // Live signal metrics derived from ffmpeg stderr
      framesReceived: 0,
      currentFps: 0,
      lastFrameAt: null,
      lastError: null,
    };
    this._assetIdForHls = null;
  }

  /**
   * Return sourceUrl unchanged, or throw when it is missing/blank.
   * @private
   * @param {string} sourceUrl
   * @param {string} protocol - Label used in the error message ('SRT' | 'RTMP').
   * @returns {string}
   * @throws {Error} When sourceUrl is not a non-empty string.
   */
  _requireSourceUrl(sourceUrl, protocol) {
    if (typeof sourceUrl !== 'string' || sourceUrl.trim() === '') {
      throw new Error(`sourceUrl is required for ${protocol} caller mode`);
    }
    return sourceUrl;
  }

  /**
   * Build FFmpeg input arguments based on source type.
   * @private
   * @param {Object} params
   * @param {string} params.sourceType - 'srt' | 'rtmp'; anything else is treated as SDI.
   * @param {number|string} [params.device] - DeckLink device (SDI only).
   * @param {string} [params.sourceUrl] - Remote URL, required for SRT/RTMP caller mode.
   * @param {boolean} [params.listen] - true to bind and wait for an incoming stream.
   * @param {number} [params.listenPort] - Listener port (default 9000 for SRT, 1935 for RTMP).
   * @param {string} [params.streamKey] - RTMP listener stream key (default 'stream').
   * @returns {{ inputArgs: string[], isNetwork: boolean }}
   * @throws {Error} When caller mode is requested without a sourceUrl.
   */
  _buildInputArgs({ sourceType, device, sourceUrl, listen, listenPort, streamKey }) {
    if (sourceType === 'srt') {
      let url;
      if (listen) {
        const port = listenPort || 9000;
        url = `srt://0.0.0.0:${port}?mode=listener`;
      } else {
        // Caller mode — ensure mode=caller is appended if not already present
        url = this._requireSourceUrl(sourceUrl, 'SRT');
        if (!url.includes('mode=')) {
          const separator = url.includes('?') ? '&' : '?';
          url = `${url}${separator}mode=caller`;
        }
      }
      return { inputArgs: [...NETWORK_PROBE_ARGS, '-i', url], isNetwork: true };
    }

    if (sourceType === 'rtmp') {
      if (listen) {
        const port = listenPort || 1935;
        const key = streamKey || 'stream';
        return {
          inputArgs: [...NETWORK_PROBE_ARGS, '-listen', '1', '-i', `rtmp://0.0.0.0:${port}/live/${key}`],
          isNetwork: true,
        };
      }
      return {
        inputArgs: [...NETWORK_PROBE_ARGS, '-i', this._requireSourceUrl(sourceUrl, 'RTMP')],
        isNetwork: true,
      };
    }

    // Default: SDI via DeckLink
    return {
      inputArgs: ['-f', 'decklink', '-i', String(device)],
      isNetwork: false,
    };
  }

  /**
   * Attach an 'error' listener to a spawned child. Without one, a failed spawn
   * (for example ffmpeg missing from PATH) emits an unhandled 'error' event
   * that terminates the whole Node process.
   * @private
   * @param {string} label - Log prefix, e.g. 'HIRES'.
   * @param {import('child_process').ChildProcess} child
   * @param {string} sessionId - Session the child belongs to.
   * @param {boolean} [surface=false] - Also record the failure in state.lastError.
   */
  _watchProcessError(label, child, sessionId, surface = false) {
    child.on('error', (err) => {
      console.error(`[${label}] ffmpeg process error:`, err.message);
      if (surface && this.state.sessionId === sessionId) {
        this.state.lastError = `${label} ffmpeg process error: ${err.message}`.slice(0, LAST_ERROR_MAX_LENGTH);
      }
    });
  }

  /**
   * Observe an upload so that a failure before stop() is logged instead of
   * becoming an unhandled rejection. The original value is left untouched, so
   * stop() still sees the rejection when it awaits the upload.
   * NOTE(review): assumes createUploadStream returns a promise (stop() awaits
   * it via Promise.all) — confirm against ./s3/client.js.
   * @private
   * @param {string} label - Log prefix, e.g. 'HIRES'.
   * @param {*} upload - Value returned by createUploadStream.
   */
  _observeUpload(label, upload) {
    Promise.resolve(upload).catch((err) => {
      console.error(`[${label}] upload failed:`, err);
    });
  }

  /**
   * Start a new capture session.
   *
   * Runs synchronously up to the point where the session is marked as
   * recording, so two overlapping calls cannot both pass the
   * "already in progress" guard.
   *
   * @param {Object} params
   * @param {string} [params.assetId] - When set on a network source, an HLS preview is written to /live/<assetId>.
   * @param {string} params.projectId - Always required.
   * @param {string} params.binId - Always required.
   * @param {string} params.clipName - Always required.
   * @param {number|string} [params.device] - DeckLink device index (SDI only).
   * @param {string} [params.sourceType='sdi'] - 'sdi' | 'srt' | 'rtmp'.
   * @param {string} [params.sourceUrl] - URL for caller mode (SRT/RTMP caller).
   * @param {boolean} [params.listen=false] - true for listener/server mode.
   * @param {number} [params.listenPort] - Port to bind in listener mode.
   * @param {string} [params.streamKey] - RTMP stream key for listener mode.
   * @returns {Promise<Object>} Session info (see _formatSessionResponse).
   * @throws {Error} When a capture is already running or caller mode lacks a sourceUrl.
   */
  async start({
    assetId,
    projectId,
    binId,
    clipName,
    device,
    sourceType = 'sdi',
    sourceUrl,
    listen = false,
    listenPort,
    streamKey,
  }) {
    // Guard first so that a rejected call leaves the manager untouched.
    if (this.state.recording) {
      throw new Error('Capture already in progress');
    }
    this._assetIdForHls = assetId || null;

    // Validate the input description before any process is spawned.
    const { inputArgs, isNetwork } = this._buildInputArgs({
      sourceType,
      device,
      sourceUrl,
      listen,
      listenPort,
      streamKey,
    });

    const sessionId = uuidv4();
    const hiresKey = `projects/${projectId}/masters/${clipName}.mov`;
    // Network sources cannot be opened by two FFmpeg processes simultaneously.
    // proxyKey is null for SRT/RTMP — the BullMQ worker generates the proxy
    // after the recording stops (same pipeline used for uploaded files).
    const proxyKey = sourceType === 'sdi' ? `projects/${projectId}/proxies/${clipName}.mp4` : null;
    const startedAt = new Date().toISOString();

    // ProRes hires — fragmented moov for pipe-safe output on network sources
    const hiresCodecArgs = isNetwork
      ? [
          '-map', '0:v:0?',
          '-map', '0:a:0?',
          '-c:v', 'prores_ks',
          '-profile:v', '3',
          '-c:a', 'pcm_s24le',
          '-movflags', '+frag_keyframe+empty_moov',
          '-f', 'mov',
        ]
      : [
          '-c:v', 'prores_ks',
          '-profile:v', '3',
          '-c:a', 'pcm_s24le',
          '-f', 'mov',
        ];

    // Spawn hires FFmpeg process
    const hiresProcess = spawn('ffmpeg', [...inputArgs, ...hiresCodecArgs, 'pipe:1'], {
      stdio: ['ignore', 'pipe', 'pipe'],
    });
    this._watchProcessError('HIRES', hiresProcess, sessionId, true);

    const hiresUpload = createUploadStream(S3_BUCKET, hiresKey, hiresProcess.stdout);
    this._observeUpload('HIRES', hiresUpload);

    const processes = { hires: hiresProcess };
    const uploads = { hires: hiresUpload };

    // Optional live HLS preview for network sources. Failures here are logged
    // and never abort the recording itself.
    // NOTE(review): this opens the same network input a second time, which
    // conflicts with the statement above that network sources cannot be opened
    // by two FFmpeg processes — confirm this works for listener mode.
    if (isNetwork && this._assetIdForHls) {
      try {
        const assetSegment = String(this._assetIdForHls);
        if (!SAFE_PATH_SEGMENT.test(assetSegment) || assetSegment === '.' || assetSegment === '..') {
          throw new Error(`unsafe asset id for HLS directory: ${assetSegment}`);
        }
        const hlsDir = '/live/' + assetSegment;
        mkdirSync(hlsDir, { recursive: true });

        const hlsArgs = [
          ...inputArgs,
          '-map', '0:v:0?',
          '-map', '0:a:0?',
          '-c:v', 'libx264',
          '-preset', 'veryfast',
          '-tune', 'zerolatency',
          '-pix_fmt', 'yuv420p',
          '-b:v', '2M',
          '-g', '60',
          '-sc_threshold', '0',
          '-c:a', 'aac',
          '-b:a', '128k',
          '-ar', '44100',
          '-f', 'hls',
          '-hls_time', '2',
          '-hls_list_size', '15',
          '-hls_flags', 'delete_segments+append_list+omit_endlist',
          '-hls_segment_filename', hlsDir + '/seg-%05d.ts',
          hlsDir + '/index.m3u8',
        ];
        const hlsProcess = spawn('ffmpeg', hlsArgs, { stdio: ['ignore', 'pipe', 'pipe'] });
        this._watchProcessError('HLS', hlsProcess, sessionId);
        hlsProcess.stderr.on('data', (d) => {
          console.error('[HLS] ' + d);
        });
        hlsProcess.on('exit', (c) => console.log('[HLS] exited ' + c));
        processes.hls = hlsProcess;
        console.log('[HLS] tee started -> ' + hlsDir);
      } catch (err) {
        console.error('[HLS] tee failed:', err.message);
      }
    }

    hiresProcess.stderr.on('data', (data) => {
      const text = data.toString();
      console.error(`[HIRES] ${text}`);

      // Trailing output of an already-replaced session must not overwrite the
      // metrics of the session that is active now.
      if (this.state.sessionId !== sessionId) {
        return;
      }

      // Track stream signal: ffmpeg prints "frame= 123 fps= 30 ..." every ~1s
      const progress = text.match(PROGRESS_PATTERN);
      if (progress) {
        this.state.framesReceived = Number.parseInt(progress[1], 10);
        this.state.currentFps = Number.parseFloat(progress[2]);
        this.state.lastFrameAt = new Date().toISOString();
      }

      // Surface fatal-looking lines for the status endpoint
      if (FATAL_STDERR_PATTERN.test(text)) {
        this.state.lastError = text.trim().slice(0, LAST_ERROR_MAX_LENGTH);
      }
    });

    // SDI only: spawn a second FFmpeg process for the proxy.
    // DeckLink cards can be opened simultaneously by multiple processes;
    // network streams cannot.
    if (!isNetwork) {
      const proxyProcess = spawn(
        'ffmpeg',
        [
          ...inputArgs,
          '-c:v', 'libx264',
          '-preset', 'fast',
          '-b:v', '10M',
          '-c:a', 'aac',
          '-b:a', '192k',
          '-movflags', '+frag_keyframe+empty_moov',
          '-f', 'mp4',
          'pipe:1',
        ],
        { stdio: ['ignore', 'pipe', 'pipe'] },
      );
      this._watchProcessError('PROXY', proxyProcess, sessionId);

      const proxyUpload = createUploadStream(S3_BUCKET, proxyKey, proxyProcess.stdout);
      this._observeUpload('PROXY', proxyUpload);

      processes.proxy = proxyProcess;
      uploads.proxy = proxyUpload;

      proxyProcess.stderr.on('data', (data) => {
        console.error(`[PROXY] ${data}`);
      });
    }

    // Everything above ran synchronously, so no child-process event can have
    // fired yet; the metrics reset below cannot discard fresh data.
    this.state.recording = true;
    this.state.sessionId = sessionId;
    this.state.processes = processes;
    this.state.framesReceived = 0;
    this.state.currentFps = 0;
    this.state.lastFrameAt = null;
    this.state.lastError = null;
    this.state.currentSession = {
      sessionId,
      projectId,
      binId,
      clipName,
      device,
      sourceType,
      sourceUrl,
      hiresKey,
      proxyKey,
      startedAt,
      duration: 0,
      uploads,
    };

    return this._formatSessionResponse();
  }

  /**
   * Stop the current capture session.
   *
   * Sends SIGINT to every ffmpeg process of the session, waits for the hires
   * (and, for SDI, proxy) uploads, then resets the recording state. Upload
   * failures are logged and do not make stop() reject.
   *
   * @param {string} sessionId - Session ID to stop
   * @returns {Promise<Object>} Completed session info
   * @throws {Error} When nothing is recording or sessionId does not match.
   */
  async stop(sessionId) {
    if (!this.state.recording || this.state.sessionId !== sessionId) {
      throw new Error('No active capture session or session ID mismatch');
    }

    const { processes, currentSession } = this.state;

    // Gracefully terminate all FFmpeg processes. Each kill is attempted
    // independently so one failure cannot leave the others running.
    for (const [label, child] of Object.entries(processes)) {
      if (!child) {
        continue;
      }
      try {
        child.kill('SIGINT');
      } catch (err) {
        console.error(`Failed to signal ${label} process:`, err);
      }
    }

    try {
      // Wait for all in-flight S3 uploads to complete
      const pendingUploads = [currentSession.uploads.hires];
      if (currentSession.uploads.proxy) {
        pendingUploads.push(currentSession.uploads.proxy);
      }
      await Promise.all(pendingUploads);
    } catch (error) {
      console.error('Error during upload completion:', error);
    }

    const stoppedAt = new Date().toISOString();
    const elapsedMs = new Date(stoppedAt) - new Date(currentSession.startedAt);
    const duration = Math.round(elapsedMs / 1000);

    // Reset state
    this.state.recording = false;
    this.state.sessionId = null;
    this.state.processes = {};

    return {
      sessionId,
      projectId: currentSession.projectId,
      binId: currentSession.binId,
      clipName: currentSession.clipName,
      sourceType: currentSession.sourceType,
      hiresKey: currentSession.hiresKey,
      proxyKey: currentSession.proxyKey, // null for SRT/RTMP
      startedAt: currentSession.startedAt,
      stoppedAt,
      duration,
    };
  }

  /**
   * Get current capture status.
   *
   * `signal` is 'receiving' when a progress line arrived within the last
   * SIGNAL_TIMEOUT_MS, 'lost' when frames were seen but not recently, 'error'
   * when no frame was ever seen and an error was recorded, and 'connecting'
   * otherwise.
   *
   * @returns {Object} `{ recording: false }` when idle, otherwise the session
   *   fields plus the live signal metrics.
   */
  getStatus() {
    if (!this.state.recording) {
      return {
        recording: false,
      };
    }

    const { currentSession, framesReceived, currentFps, lastFrameAt, lastError } = this.state;

    const duration = Math.round((new Date() - new Date(currentSession.startedAt)) / 1000);
    const msSinceFrame = lastFrameAt ? Date.now() - new Date(lastFrameAt).getTime() : null;

    let signal = 'connecting';
    if (framesReceived > 0) {
      signal = msSinceFrame !== null && msSinceFrame < SIGNAL_TIMEOUT_MS ? 'receiving' : 'lost';
    } else if (lastError) {
      signal = 'error';
    }

    return {
      recording: true,
      sessionId: this.state.sessionId,
      sourceType: currentSession.sourceType,
      device: currentSession.device,
      clipName: currentSession.clipName,
      projectId: currentSession.projectId,
      binId: currentSession.binId,
      duration,
      startedAt: currentSession.startedAt,
      signal,
      framesReceived,
      currentFps,
      lastFrameAt,
      msSinceFrame,
      lastError,
    };
  }

  /**
   * Format session response: the public subset of the current session
   * (the upload handles are deliberately left out).
   * @private
   * @returns {Object}
   */
  _formatSessionResponse() {
    const { currentSession, sessionId } = this.state;
    return {
      sessionId,
      projectId: currentSession.projectId,
      binId: currentSession.binId,
      clipName: currentSession.clipName,
      device: currentSession.device,
      sourceType: currentSession.sourceType,
      hiresKey: currentSession.hiresKey,
      proxyKey: currentSession.proxyKey,
      startedAt: currentSession.startedAt,
    };
  }
}

export default new CaptureManager();