From ea48e9846542115ebcddd571de9d2efc4a258600 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sat, 16 May 2026 08:19:41 -0400 Subject: [PATCH] feat(capture): add SRT/RTMP source type support - Add _buildInputArgs() to build FFmpeg input args per source type - SRT caller: srt://host:port?mode=caller - SRT listener: srt://0.0.0.0:PORT?mode=listener - RTMP caller: -i rtmp://host/app/key - RTMP listener: -listen 1 -i rtmp://0.0.0.0:PORT/live/key - Network sources spawn hires-only FFmpeg process (can't open stream twice) - proxyKey is null for network sources; proxy generated by worker post-stop - SDI keeps existing dual-process behavior unchanged --- services/capture/src/capture-manager.js | 194 +++++++++++++++++------- 1 file changed, 143 insertions(+), 51 deletions(-) diff --git a/services/capture/src/capture-manager.js b/services/capture/src/capture-manager.js index a55e9b6..6a56d3d 100644 --- a/services/capture/src/capture-manager.js +++ b/services/capture/src/capture-manager.js @@ -14,83 +14,173 @@ class CaptureManager { }; } + /** + * Build FFmpeg input arguments based on source type. + * Returns { inputArgs, isNetwork } + * @private + */ + _buildInputArgs({ sourceType, device, sourceUrl, listen, listenPort, streamKey }) { + if (sourceType === 'srt') { + let url; + if (listen) { + const port = listenPort || 9000; + url = `srt://0.0.0.0:${port}?mode=listener`; + } else { + // Caller mode — ensure mode=caller is appended if not already present + url = sourceUrl; + if (!url.includes('mode=')) { + url += (url.includes('?') ? '&' : '?') + 'mode=caller'; + } + } + return { inputArgs: ['-i', url], isNetwork: true }; + } + + if (sourceType === 'rtmp') { + if (listen) { + const port = listenPort || 1935; + const key = streamKey || 'stream'; + return { + inputArgs: ['-listen', '1', '-i', `rtmp://0.0.0.0:${port}/live/${key}`], + isNetwork: true, + }; + } + return { inputArgs: ['-i', sourceUrl], isNetwork: true }; + } + + // Default: SDI via DeckLink + return { + inputArgs: ['-f', 'decklink', '-i', String(device)], + isNetwork: false, + }; + } + /** * Start a new capture session - * @param {Object} params - { projectId, binId, clipName, device } + * @param {Object} params + * - projectId, binId, clipName — always required + * - device — DeckLink device index (SDI only) + * - sourceType — 'sdi' | 'srt' | 'rtmp' (default: 'sdi') + * - sourceUrl — URL for caller mode (SRT/RTMP caller) + * - listen — true for listener/server mode + * - listenPort — port to bind in listener mode + * - streamKey — RTMP stream key for listener mode * @returns {Object} Session info */ - async start({ projectId, binId, clipName, device }) { + async start({ + projectId, + binId, + clipName, + device, + sourceType = 'sdi', + sourceUrl, + listen = false, + listenPort, + streamKey, + }) { if (this.state.recording) { throw new Error('Capture already in progress'); } const sessionId = uuidv4(); const hiresKey = `projects/${projectId}/masters/${clipName}.mov`; - const proxyKey = `projects/${projectId}/proxies/${clipName}.mp4`; + + // Network sources cannot be opened by two FFmpeg processes simultaneously. + // proxyKey is null for SRT/RTMP — the BullMQ worker generates the proxy + // after the recording stops (same pipeline used for uploaded files). + const proxyKey = sourceType === 'sdi' + ? `projects/${projectId}/proxies/${clipName}.mp4` + : null; + const startedAt = new Date().toISOString(); - // Spawn FFmpeg processes + const { inputArgs, isNetwork } = this._buildInputArgs({ + sourceType, + device, + sourceUrl, + listen, + listenPort, + streamKey, + }); + + // ProRes hires — fragmented moov for pipe-safe output on network sources + const hiresCodecArgs = isNetwork + ? [ + '-c:v', 'prores_ks', + '-profile:v', '3', + '-c:a', 'pcm_s24le', + '-movflags', '+frag_keyframe+empty_moov', + '-f', 'mov', + ] + : [ + '-c:v', 'prores_ks', + '-profile:v', '3', + '-c:a', 'pcm_s24le', + '-f', 'mov', + ]; + + // Spawn hires FFmpeg process const hiresProcess = spawn('ffmpeg', [ - '-f', 'decklink', - '-i', device, - '-c:v', 'prores_ks', - '-profile:v', '3', - '-c:a', 'pcm_s24le', - '-f', 'mov', + ...inputArgs, + ...hiresCodecArgs, 'pipe:1', ], { stdio: ['ignore', 'pipe', 'pipe'], }); - const proxyProcess = spawn('ffmpeg', [ - '-f', 'decklink', - '-i', device, - '-c:v', 'libx264', - '-preset', 'fast', - '-b:v', '10M', - '-c:a', 'aac', - '-b:a', '192k', - '-movflags', '+frag_keyframe+empty_moov', - '-f', 'mp4', - 'pipe:1', - ], { - stdio: ['ignore', 'pipe', 'pipe'], - }); - - // Start S3 uploads from FFmpeg stdout const hiresUpload = createUploadStream(S3_BUCKET, hiresKey, hiresProcess.stdout); - const proxyUpload = createUploadStream(S3_BUCKET, proxyKey, proxyProcess.stdout); + + const processes = { hires: hiresProcess }; + const uploads = { hires: hiresUpload }; + + hiresProcess.stderr.on('data', (data) => { + console.error(`[HIRES] ${data}`); + }); + + // SDI only: spawn a second FFmpeg process for the proxy. + // DeckLink cards can be opened simultaneously by multiple processes; + // network streams cannot. + if (!isNetwork) { + const proxyProcess = spawn('ffmpeg', [ + ...inputArgs, + '-c:v', 'libx264', + '-preset', 'fast', + '-b:v', '10M', + '-c:a', 'aac', + '-b:a', '192k', + '-movflags', '+frag_keyframe+empty_moov', + '-f', 'mp4', + 'pipe:1', + ], { + stdio: ['ignore', 'pipe', 'pipe'], + }); + + const proxyUpload = createUploadStream(S3_BUCKET, proxyKey, proxyProcess.stdout); + processes.proxy = proxyProcess; + uploads.proxy = proxyUpload; + + proxyProcess.stderr.on('data', (data) => { + console.error(`[PROXY] ${data}`); + }); + } this.state.recording = true; this.state.sessionId = sessionId; - this.state.processes = { - hires: hiresProcess, - proxy: proxyProcess, - }; + this.state.processes = processes; this.state.currentSession = { sessionId, projectId, binId, clipName, device, + sourceType, + sourceUrl, hiresKey, proxyKey, startedAt, duration: 0, - uploads: { - hires: hiresUpload, - proxy: proxyUpload, - }, + uploads, }; - // Handle process errors - hiresProcess.stderr.on('data', (data) => { - console.error(`[HIRES] ${data}`); - }); - proxyProcess.stderr.on('data', (data) => { - console.error(`[PROXY] ${data}`); - }); - return this._formatSessionResponse(); } @@ -106,7 +196,7 @@ class CaptureManager { const { processes, currentSession } = this.state; - // Send SIGINT to both processes + // Gracefully terminate all FFmpeg processes if (processes.hires) { processes.hires.kill('SIGINT'); } @@ -115,13 +205,12 @@ class CaptureManager { } try { - // Wait for uploads to complete - if (currentSession.uploads) { - await Promise.all([ - currentSession.uploads.hires, - currentSession.uploads.proxy, - ]); + // Wait for all in-flight S3 uploads to complete + const uploadPromises = [currentSession.uploads.hires]; + if (currentSession.uploads.proxy) { + uploadPromises.push(currentSession.uploads.proxy); } + await Promise.all(uploadPromises); } catch (error) { console.error('Error during upload completion:', error); } @@ -141,8 +230,9 @@ class CaptureManager { projectId: currentSession.projectId, binId: currentSession.binId, clipName: currentSession.clipName, + sourceType: currentSession.sourceType, hiresKey: currentSession.hiresKey, - proxyKey: currentSession.proxyKey, + proxyKey: currentSession.proxyKey, // null for SRT/RTMP startedAt: currentSession.startedAt, stoppedAt, duration, @@ -167,6 +257,7 @@ class CaptureManager { return { recording: true, sessionId: this.state.sessionId, + sourceType: this.state.currentSession.sourceType, device: this.state.currentSession.device, clipName: this.state.currentSession.clipName, projectId: this.state.currentSession.projectId, @@ -188,6 +279,7 @@ class CaptureManager { binId: currentSession.binId, clipName: currentSession.clipName, device: currentSession.device, + sourceType: currentSession.sourceType, hiresKey: currentSession.hiresKey, proxyKey: currentSession.proxyKey, startedAt: currentSession.startedAt,