feat(capture): add SRT/RTMP source type support

- Add _buildInputArgs() to build FFmpeg input args per source type
- SRT caller: srt://host:port?mode=caller
- SRT listener: srt://0.0.0.0:PORT?mode=listener
- RTMP caller: -i rtmp://host/app/key
- RTMP listener: -listen 1 -i rtmp://0.0.0.0:PORT/live/key
- Network sources spawn hires-only FFmpeg process (can't open stream twice)
- proxyKey is null for network sources; proxy generated by worker post-stop
- SDI keeps existing dual-process behavior unchanged
This commit is contained in:
Zac Gaetano 2026-05-16 08:19:41 -04:00
parent ed52dfcafb
commit ea48e98465

View file

@ -14,83 +14,173 @@ class CaptureManager {
};
}
/**
* Build FFmpeg input arguments based on source type.
* Returns { inputArgs, isNetwork }
* @private
*/
_buildInputArgs({ sourceType, device, sourceUrl, listen, listenPort, streamKey }) {
if (sourceType === 'srt') {
let url;
if (listen) {
const port = listenPort || 9000;
url = `srt://0.0.0.0:${port}?mode=listener`;
} else {
// Caller mode — ensure mode=caller is appended if not already present
url = sourceUrl;
if (!url.includes('mode=')) {
url += (url.includes('?') ? '&' : '?') + 'mode=caller';
}
}
return { inputArgs: ['-i', url], isNetwork: true };
}
if (sourceType === 'rtmp') {
if (listen) {
const port = listenPort || 1935;
const key = streamKey || 'stream';
return {
inputArgs: ['-listen', '1', '-i', `rtmp://0.0.0.0:${port}/live/${key}`],
isNetwork: true,
};
}
return { inputArgs: ['-i', sourceUrl], isNetwork: true };
}
// Default: SDI via DeckLink
return {
inputArgs: ['-f', 'decklink', '-i', String(device)],
isNetwork: false,
};
}
/**
* Start a new capture session
* @param {Object} params - { projectId, binId, clipName, device }
* @param {Object} params
* - projectId, binId, clipName always required
* - device DeckLink device index (SDI only)
* - sourceType 'sdi' | 'srt' | 'rtmp' (default: 'sdi')
* - sourceUrl URL for caller mode (SRT/RTMP caller)
* - listen true for listener/server mode
* - listenPort port to bind in listener mode
* - streamKey RTMP stream key for listener mode
* @returns {Object} Session info
*/
async start({ projectId, binId, clipName, device }) {
async start({
projectId,
binId,
clipName,
device,
sourceType = 'sdi',
sourceUrl,
listen = false,
listenPort,
streamKey,
}) {
if (this.state.recording) {
throw new Error('Capture already in progress');
}
const sessionId = uuidv4();
const hiresKey = `projects/${projectId}/masters/${clipName}.mov`;
const proxyKey = `projects/${projectId}/proxies/${clipName}.mp4`;
// Network sources cannot be opened by two FFmpeg processes simultaneously.
// proxyKey is null for SRT/RTMP — the BullMQ worker generates the proxy
// after the recording stops (same pipeline used for uploaded files).
const proxyKey = sourceType === 'sdi'
? `projects/${projectId}/proxies/${clipName}.mp4`
: null;
const startedAt = new Date().toISOString();
// Spawn FFmpeg processes
const { inputArgs, isNetwork } = this._buildInputArgs({
sourceType,
device,
sourceUrl,
listen,
listenPort,
streamKey,
});
// ProRes hires — fragmented moov for pipe-safe output on network sources
const hiresCodecArgs = isNetwork
? [
'-c:v', 'prores_ks',
'-profile:v', '3',
'-c:a', 'pcm_s24le',
'-movflags', '+frag_keyframe+empty_moov',
'-f', 'mov',
]
: [
'-c:v', 'prores_ks',
'-profile:v', '3',
'-c:a', 'pcm_s24le',
'-f', 'mov',
];
// Spawn hires FFmpeg process
const hiresProcess = spawn('ffmpeg', [
'-f', 'decklink',
'-i', device,
'-c:v', 'prores_ks',
'-profile:v', '3',
'-c:a', 'pcm_s24le',
'-f', 'mov',
...inputArgs,
...hiresCodecArgs,
'pipe:1',
], {
stdio: ['ignore', 'pipe', 'pipe'],
});
const proxyProcess = spawn('ffmpeg', [
'-f', 'decklink',
'-i', device,
'-c:v', 'libx264',
'-preset', 'fast',
'-b:v', '10M',
'-c:a', 'aac',
'-b:a', '192k',
'-movflags', '+frag_keyframe+empty_moov',
'-f', 'mp4',
'pipe:1',
], {
stdio: ['ignore', 'pipe', 'pipe'],
});
// Start S3 uploads from FFmpeg stdout
const hiresUpload = createUploadStream(S3_BUCKET, hiresKey, hiresProcess.stdout);
const proxyUpload = createUploadStream(S3_BUCKET, proxyKey, proxyProcess.stdout);
const processes = { hires: hiresProcess };
const uploads = { hires: hiresUpload };
hiresProcess.stderr.on('data', (data) => {
console.error(`[HIRES] ${data}`);
});
// SDI only: spawn a second FFmpeg process for the proxy.
// DeckLink cards can be opened simultaneously by multiple processes;
// network streams cannot.
if (!isNetwork) {
const proxyProcess = spawn('ffmpeg', [
...inputArgs,
'-c:v', 'libx264',
'-preset', 'fast',
'-b:v', '10M',
'-c:a', 'aac',
'-b:a', '192k',
'-movflags', '+frag_keyframe+empty_moov',
'-f', 'mp4',
'pipe:1',
], {
stdio: ['ignore', 'pipe', 'pipe'],
});
const proxyUpload = createUploadStream(S3_BUCKET, proxyKey, proxyProcess.stdout);
processes.proxy = proxyProcess;
uploads.proxy = proxyUpload;
proxyProcess.stderr.on('data', (data) => {
console.error(`[PROXY] ${data}`);
});
}
this.state.recording = true;
this.state.sessionId = sessionId;
this.state.processes = {
hires: hiresProcess,
proxy: proxyProcess,
};
this.state.processes = processes;
this.state.currentSession = {
sessionId,
projectId,
binId,
clipName,
device,
sourceType,
sourceUrl,
hiresKey,
proxyKey,
startedAt,
duration: 0,
uploads: {
hires: hiresUpload,
proxy: proxyUpload,
},
uploads,
};
// Handle process errors
hiresProcess.stderr.on('data', (data) => {
console.error(`[HIRES] ${data}`);
});
proxyProcess.stderr.on('data', (data) => {
console.error(`[PROXY] ${data}`);
});
return this._formatSessionResponse();
}
@ -106,7 +196,7 @@ class CaptureManager {
const { processes, currentSession } = this.state;
// Send SIGINT to both processes
// Gracefully terminate all FFmpeg processes
if (processes.hires) {
processes.hires.kill('SIGINT');
}
@ -115,13 +205,12 @@ class CaptureManager {
}
try {
// Wait for uploads to complete
if (currentSession.uploads) {
await Promise.all([
currentSession.uploads.hires,
currentSession.uploads.proxy,
]);
// Wait for all in-flight S3 uploads to complete
const uploadPromises = [currentSession.uploads.hires];
if (currentSession.uploads.proxy) {
uploadPromises.push(currentSession.uploads.proxy);
}
await Promise.all(uploadPromises);
} catch (error) {
console.error('Error during upload completion:', error);
}
@ -141,8 +230,9 @@ class CaptureManager {
projectId: currentSession.projectId,
binId: currentSession.binId,
clipName: currentSession.clipName,
sourceType: currentSession.sourceType,
hiresKey: currentSession.hiresKey,
proxyKey: currentSession.proxyKey,
proxyKey: currentSession.proxyKey, // null for SRT/RTMP
startedAt: currentSession.startedAt,
stoppedAt,
duration,
@ -167,6 +257,7 @@ class CaptureManager {
return {
recording: true,
sessionId: this.state.sessionId,
sourceType: this.state.currentSession.sourceType,
device: this.state.currentSession.device,
clipName: this.state.currentSession.clipName,
projectId: this.state.currentSession.projectId,
@ -188,6 +279,7 @@ class CaptureManager {
binId: currentSession.binId,
clipName: currentSession.clipName,
device: currentSession.device,
sourceType: currentSession.sourceType,
hiresKey: currentSession.hiresKey,
proxyKey: currentSession.proxyKey,
startedAt: currentSession.startedAt,