// dragonflight/services/capture/src/capture-manager.js

import { spawn } from 'child_process';
import { v4 as uuidv4 } from 'uuid';
import { createUploadStream } from './s3/client.js';
// Bucket passed to createUploadStream for capture uploads; the S3_BUCKET
// environment variable overrides the built-in default when it is non-empty.
const S3_BUCKET = process.env.S3_BUCKET ? process.env.S3_BUCKET : 'wild-dragon';
/**
 * Runs at most one FFmpeg-based capture session at a time.
 *
 * A session spawns a "hires" FFmpeg process (ProRes in a MOV container) and,
 * for SDI sources only, a second "proxy" process (H.264 in MP4). Each
 * process writes to stdout, which is handed to createUploadStream together
 * with the target bucket and object key. Progress and error lines printed on
 * the hires process's stderr feed the live signal metrics reported by
 * getStatus().
 */
class CaptureManager {
  constructor() {
    this.state = {
      recording: false,
      sessionId: null,
      processes: {},
      currentSession: {},
      // Live signal metrics derived from ffmpeg stderr
      framesReceived: 0,
      currentFps: 0,
      lastFrameAt: null,
      lastError: null,
    };
  }

  /**
   * Return sourceUrl unchanged when it is a usable string, otherwise throw.
   * Caller-mode network inputs have nothing to connect to without it, and
   * failing here gives a clearer message than a later failure inside
   * String#includes or child_process.spawn.
   * @param {*} sourceUrl - Value supplied by the caller of start()
   * @param {string} protocolLabel - Protocol name used in the error message
   * @returns {string} The validated URL
   * @throws {TypeError} If sourceUrl is not a non-empty string
   * @private
   */
  _requireSourceUrl(sourceUrl, protocolLabel) {
    if (typeof sourceUrl !== 'string' || sourceUrl.length === 0) {
      throw new TypeError(`sourceUrl is required for ${protocolLabel} caller mode`);
    }
    return sourceUrl;
  }

  /**
   * Build FFmpeg input arguments based on source type.
   * @param {Object} params
   * @param {string} params.sourceType - 'srt', 'rtmp'; anything else is treated as SDI
   * @param {number|string} [params.device] - DeckLink device (SDI only), passed as String(device)
   * @param {string} [params.sourceUrl] - Remote URL, required for SRT/RTMP caller mode
   * @param {boolean} [params.listen] - true to bind locally instead of connecting out
   * @param {number} [params.listenPort] - Port for listener mode (default 9000 SRT, 1935 RTMP)
   * @param {string} [params.streamKey] - RTMP listener stream key (default 'stream')
   * @returns {{inputArgs: string[], isNetwork: boolean}}
   * @throws {TypeError} If caller mode is requested without a usable sourceUrl
   * @private
   */
  _buildInputArgs({ sourceType, device, sourceUrl, listen, listenPort, streamKey }) {
    // Probe/timestamp flags shared by every network input.
    const networkProbeArgs = [
      '-probesize', '32M',
      '-analyzeduration', '10M',
      '-fflags', '+genpts',
    ];

    if (sourceType === 'srt') {
      let url;
      if (listen) {
        const port = listenPort || 9000;
        url = `srt://0.0.0.0:${port}?mode=listener`;
      } else {
        // Caller mode — ensure mode=caller is appended if not already present
        url = this._requireSourceUrl(sourceUrl, 'SRT');
        if (!url.includes('mode=')) {
          url += (url.includes('?') ? '&' : '?') + 'mode=caller';
        }
      }
      return { inputArgs: [...networkProbeArgs, '-i', url], isNetwork: true };
    }

    if (sourceType === 'rtmp') {
      if (listen) {
        const port = listenPort || 1935;
        const key = streamKey || 'stream';
        return {
          inputArgs: [...networkProbeArgs, '-listen', '1', '-i', `rtmp://0.0.0.0:${port}/live/${key}`],
          isNetwork: true,
        };
      }
      return {
        inputArgs: [...networkProbeArgs, '-i', this._requireSourceUrl(sourceUrl, 'RTMP')],
        isNetwork: true,
      };
    }

    // Default: SDI via DeckLink
    return {
      inputArgs: ['-f', 'decklink', '-i', String(device)],
      isNetwork: false,
    };
  }

  /**
   * Update the live signal metrics from one chunk of hires ffmpeg stderr text.
   * @param {string} text - Decoded stderr chunk
   * @private
   */
  _recordFfmpegOutput(text) {
    // Track stream signal: ffmpeg prints "frame= 123 fps= 30 ..." every ~1s
    const progress = text.match(/frame=\s*(\d+)\s+fps=\s*([\d.]+)/);
    if (progress) {
      this.state.framesReceived = parseInt(progress[1], 10);
      this.state.currentFps = parseFloat(progress[2]);
      this.state.lastFrameAt = new Date().toISOString();
    }
    // Surface fatal-looking lines for the status endpoint
    if (/Connection refused|No route to host|Connection failed|Input\/output error|Server returned|404 Not Found|Connection timed out/i.test(text)) {
      this.state.lastError = text.trim().slice(0, 240);
    }
  }

  /**
   * Handle a child-process 'error' event (e.g. the ffmpeg binary could not be
   * spawned). Without a listener Node treats the event as an uncaught
   * exception; here it is logged and surfaced through lastError instead.
   * @param {string} label - Log tag for the process ('HIRES' or 'PROXY')
   * @param {Error} err - Error emitted by the ChildProcess
   * @private
   */
  _recordProcessError(label, err) {
    console.error(`[${label}] ffmpeg process error:`, err);
    const detail = err && err.message ? err.message : String(err);
    this.state.lastError = `[${label}] ffmpeg process error: ${detail}`.slice(0, 240);
  }

  /**
   * Start a new capture session
   * @param {Object} params
   * - projectId, binId, clipName always required
   * - device DeckLink device index (SDI only)
   * - sourceType 'sdi' | 'srt' | 'rtmp' (default: 'sdi')
   * - sourceUrl URL for caller mode (SRT/RTMP caller)
   * - listen true for listener/server mode
   * - listenPort port to bind in listener mode
   * - streamKey RTMP stream key for listener mode
   * @returns {Object} Session info
   * @throws {Error} If a capture is already in progress
   * @throws {TypeError} If SRT/RTMP caller mode is requested without sourceUrl
   */
  async start({
    projectId,
    binId,
    clipName,
    device,
    sourceType = 'sdi',
    sourceUrl,
    listen = false,
    listenPort,
    streamKey,
  }) {
    if (this.state.recording) {
      throw new Error('Capture already in progress');
    }

    const sessionId = uuidv4();
    const hiresKey = `projects/${projectId}/masters/${clipName}.mov`;
    // Network sources cannot be opened by two FFmpeg processes simultaneously.
    // proxyKey is null for SRT/RTMP — the BullMQ worker generates the proxy
    // after the recording stops (same pipeline used for uploaded files).
    const proxyKey = sourceType === 'sdi'
      ? `projects/${projectId}/proxies/${clipName}.mp4`
      : null;
    const startedAt = new Date().toISOString();

    const { inputArgs, isNetwork } = this._buildInputArgs({
      sourceType,
      device,
      sourceUrl,
      listen,
      listenPort,
      streamKey,
    });

    // ProRes hires — fragmented moov for pipe-safe output on network sources
    const hiresCodecArgs = isNetwork
      ? [
        '-map', '0:v:0?',
        '-map', '0:a:0?',
        '-c:v', 'prores_ks',
        '-profile:v', '3',
        '-c:a', 'pcm_s24le',
        '-movflags', '+frag_keyframe+empty_moov',
        '-f', 'mov',
      ]
      : [
        '-c:v', 'prores_ks',
        '-profile:v', '3',
        '-c:a', 'pcm_s24le',
        '-f', 'mov',
      ];

    // Spawn hires FFmpeg process
    const hiresProcess = spawn('ffmpeg', [
      ...inputArgs,
      ...hiresCodecArgs,
      'pipe:1',
    ], {
      stdio: ['ignore', 'pipe', 'pipe'],
    });
    // Must be attached: an unhandled 'error' event would crash the process.
    hiresProcess.on('error', (err) => this._recordProcessError('HIRES', err));

    const hiresUpload = createUploadStream(S3_BUCKET, hiresKey, hiresProcess.stdout);
    const processes = { hires: hiresProcess };
    const uploads = { hires: hiresUpload };

    hiresProcess.stderr.on('data', (data) => {
      const text = data.toString();
      console.error(`[HIRES] ${text}`);
      this._recordFfmpegOutput(text);
    });

    // SDI only: spawn a second FFmpeg process for the proxy.
    // DeckLink cards can be opened simultaneously by multiple processes;
    // network streams cannot.
    if (!isNetwork) {
      const proxyProcess = spawn('ffmpeg', [
        ...inputArgs,
        '-c:v', 'libx264',
        '-preset', 'fast',
        '-b:v', '10M',
        '-c:a', 'aac',
        '-b:a', '192k',
        '-movflags', '+frag_keyframe+empty_moov',
        '-f', 'mp4',
        'pipe:1',
      ], {
        stdio: ['ignore', 'pipe', 'pipe'],
      });
      proxyProcess.on('error', (err) => this._recordProcessError('PROXY', err));

      const proxyUpload = createUploadStream(S3_BUCKET, proxyKey, proxyProcess.stdout);
      processes.proxy = proxyProcess;
      uploads.proxy = proxyUpload;

      proxyProcess.stderr.on('data', (data) => {
        console.error(`[PROXY] ${data}`);
      });
    }

    // Process events are delivered asynchronously, so resetting the metrics
    // here cannot wipe anything reported by the processes spawned above.
    this.state.recording = true;
    this.state.sessionId = sessionId;
    this.state.processes = processes;
    this.state.framesReceived = 0;
    this.state.currentFps = 0;
    this.state.lastFrameAt = null;
    this.state.lastError = null;
    this.state.currentSession = {
      sessionId,
      projectId,
      binId,
      clipName,
      device,
      sourceType,
      sourceUrl,
      hiresKey,
      proxyKey,
      startedAt,
      duration: 0,
      uploads,
    };

    return this._formatSessionResponse();
  }

  /**
   * Stop the current capture session
   * @param {string} sessionId - Session ID to stop
   * @returns {Object} Completed session info
   * @throws {Error} If nothing is recording or sessionId does not match
   */
  async stop(sessionId) {
    if (!this.state.recording || this.state.sessionId !== sessionId) {
      throw new Error('No active capture session or session ID mismatch');
    }

    const { processes, currentSession } = this.state;

    // Gracefully terminate all FFmpeg processes
    if (processes.hires) {
      processes.hires.kill('SIGINT');
    }
    if (processes.proxy) {
      processes.proxy.kill('SIGINT');
    }

    try {
      // Wait for all in-flight S3 uploads to complete
      const uploadPromises = [currentSession.uploads.hires];
      if (currentSession.uploads.proxy) {
        uploadPromises.push(currentSession.uploads.proxy);
      }
      await Promise.all(uploadPromises);
    } catch (error) {
      // Best effort: an upload failure is logged but the session is still
      // closed out so a new capture can be started.
      console.error('Error during upload completion:', error);
    }

    const stoppedAt = new Date().toISOString();
    const startTime = new Date(currentSession.startedAt);
    const stopTime = new Date(stoppedAt);
    const duration = Math.round((stopTime - startTime) / 1000);

    // Reset state
    this.state.recording = false;
    this.state.sessionId = null;
    this.state.processes = {};

    return {
      sessionId,
      projectId: currentSession.projectId,
      binId: currentSession.binId,
      clipName: currentSession.clipName,
      sourceType: currentSession.sourceType,
      hiresKey: currentSession.hiresKey,
      proxyKey: currentSession.proxyKey, // null for SRT/RTMP
      startedAt: currentSession.startedAt,
      stoppedAt,
      duration,
    };
  }

  /**
   * Get current capture status
   *
   * While recording, `signal` is:
   * - 'receiving' when frames have been seen and the last one is under 5 s old
   * - 'lost' when frames have been seen but the last one is 5 s old or more
   * - 'error' when no frame has been seen and an error has been recorded
   * - 'connecting' otherwise
   * @returns {Object} `{ recording: false }` when idle, otherwise session
   *   fields plus the live signal metrics
   */
  getStatus() {
    if (!this.state.recording) {
      return {
        recording: false,
      };
    }

    const startTime = new Date(this.state.currentSession.startedAt);
    const now = new Date();
    const duration = Math.round((now - startTime) / 1000);

    const lastFrameAt = this.state.lastFrameAt;
    const msSinceFrame = lastFrameAt ? (Date.now() - new Date(lastFrameAt).getTime()) : null;

    let signal = 'connecting';
    if (this.state.framesReceived > 0) {
      signal = (msSinceFrame !== null && msSinceFrame < 5000) ? 'receiving' : 'lost';
    } else if (this.state.lastError) {
      signal = 'error';
    }

    return {
      recording: true,
      sessionId: this.state.sessionId,
      sourceType: this.state.currentSession.sourceType,
      device: this.state.currentSession.device,
      clipName: this.state.currentSession.clipName,
      projectId: this.state.currentSession.projectId,
      binId: this.state.currentSession.binId,
      duration,
      startedAt: this.state.currentSession.startedAt,
      signal,
      framesReceived: this.state.framesReceived,
      currentFps: this.state.currentFps,
      lastFrameAt,
      msSinceFrame,
      lastError: this.state.lastError,
    };
  }

  /**
   * Format session response: the public subset of the current session
   * (the uploads and duration fields of currentSession are not included).
   * @returns {Object} Session info returned by start()
   * @private
   */
  _formatSessionResponse() {
    const { currentSession, sessionId } = this.state;
    return {
      sessionId,
      projectId: currentSession.projectId,
      binId: currentSession.binId,
      clipName: currentSession.clipName,
      device: currentSession.device,
      sourceType: currentSession.sourceType,
      hiresKey: currentSession.hiresKey,
      proxyKey: currentSession.proxyKey,
      startedAt: currentSession.startedAt,
    };
  }
}
// The module's default export is a single CaptureManager instance created
// when the module is first loaded.
const captureManager = new CaptureManager();
export default captureManager;