feat(node-agent): containerized decklink-bridge + async bridge management

This commit is contained in:
Wild Dragon Dev 2026-06-04 00:46:19 +00:00
parent 8ca7c79acd
commit 6ee8dd5694

View file

@ -150,78 +150,111 @@ function stopNetIngest(containerId) {
} }
// ── DeckLink bridge ─────────────────────────────────────────────────────── // ── DeckLink bridge ───────────────────────────────────────────────────────
// One decklink-bridge process per node, managing all DeckLink devices. // One decklink-bridge container per node, managing all DeckLink devices.
// Mirrors the deltacast-bridge singleton pattern. // Mirrors the deltacast-bridge singleton pattern.
const DL_BRIDGE_BIN = process.env.DECKLINK_BRIDGE_BIN || 'decklink-bridge';
const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink'; const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
let _dlBridge = null; // ChildProcess | null let _dlBridgeId = null; // containerId | null
let _dlSidecarCount = 0; let _dlSidecarCount = 0;
// device_idx -> fmt JSON from bridge stderr // device_idx -> fmt JSON from bridge stderr
const _dlDevFmt = new Map(); const _dlDevFmt = new Map();
function _dlBridgeRunning() { async function _dlBridgeRunning() {
return _dlBridge !== null && _dlBridge.exitCode === null && _dlBridge.signalCode === null; if (!_dlBridgeId) return false;
try {
const res = await dockerApi('GET', `/containers/${_dlBridgeId}/json`);
return res.status === 200 && res.data.State?.Running;
} catch (_) { return false; }
} }
function startDecklinkBridge(deviceIndices) { /**
if (_dlBridgeRunning()) return; * Connect to container stderr stream and parse format JSONs.
*/
function _attachDlBridgeLogs(containerId) {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43/containers/${containerId}/attach?stderr=1&stream=1`,
method: 'POST',
};
const req = http.request(options, (res) => {
res.on('data', (chunk) => {
// Docker multiplexed stream header: [1/2, 0, 0, 0, size_32be]
let offset = 0;
while (offset + 8 <= chunk.length) {
const size = chunk.readUInt32BE(offset + 4);
const end = offset + 8 + size;
if (end > chunk.length) break;
const text = chunk.toString('utf8', offset + 8, end);
for (const line of text.split('\n')) {
const t = line.trim();
if (!t || !t.startsWith('{')) continue;
try {
const f = JSON.parse(t);
if (typeof f.device === 'number') _dlDevFmt.set(f.device, f);
} catch (_) {}
}
offset = end;
}
});
});
req.on('error', (err) => console.error(`[dl-bridge] log attach error: ${err.message}`));
req.end();
}
try { require('fs').mkdirSync(DL_AUDIO_DIR, { recursive: true }); } catch (_) {} async function startDecklinkBridge(deviceIndices) {
if (await _dlBridgeRunning()) return;
const devCsv = Array.isArray(deviceIndices) ? deviceIndices.join(',') : String(deviceIndices || '0'); const devCsv = Array.isArray(deviceIndices) ? deviceIndices.join(',') : String(deviceIndices || '0');
const args = [ const DL_IMAGE = 'wild-dragon-capture:latest';
'--devices', devCsv, const DL_BIN = '/usr/local/bin/decklink-bridge';
'--fc-url', FC_URL,
'--audio-pipe-dir', DL_AUDIO_DIR,
];
console.log(`[dl-bridge] launching: ${DL_BRIDGE_BIN} ${args.join(' ')}`);
const proc = spawn(DL_BRIDGE_BIN, args, { console.log(`[dl-bridge] spawning containerized bridge for devices: ${devCsv}`);
stdio: ['ignore', 'ignore', 'pipe'],
detached: false,
env: { ...process.env, NODE_ID: FC_NODE_ID, FC_URL },
});
proc.stderr.setEncoding('utf8'); const spec = {
proc.stderr.on('data', (chunk) => { Image: DL_IMAGE,
for (const line of chunk.split('\n')) { Entrypoint: [DL_BIN],
const t = line.trim(); Cmd: ['--devices', devCsv, '--fc-url', FC_URL, '--audio-pipe-dir', DL_AUDIO_DIR],
if (!t) continue; Env: [`NODE_ID=${FC_NODE_ID}`, `FC_URL=${FC_URL}`],
if (t.startsWith('{')) { HostConfig: {
console.log('[dl-bridge] ' + t); NetworkMode: 'host',
try { Privileged: true,
const f = JSON.parse(t); Binds: ['/dev:/dev', '/dev/shm:/dev/shm'],
if (typeof f.device === 'number') _dlDevFmt.set(f.device, f); RestartPolicy: { Name: 'unless-stopped' },
} catch (_) {} },
} else { };
console.error('[dl-bridge] ' + t);
} try {
const createRes = await dockerApi('POST', '/containers/create?name=decklink-bridge', spec);
if (createRes.status !== 201 && createRes.status !== 409) {
console.error('[dl-bridge] create failed:', createRes.data);
return;
} }
});
proc.on('error', (err) => { const containerId = createRes.status === 409 ? 'decklink-bridge' : createRes.data.Id;
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
if (startRes.status !== 204 && startRes.status !== 304) {
console.error('[dl-bridge] start failed:', startRes.data);
return;
}
_dlBridgeId = containerId;
_attachDlBridgeLogs(containerId);
console.log(`[dl-bridge] running in container ${containerId}`);
} catch (err) {
console.error(`[dl-bridge] spawn error: ${err.message}`); console.error(`[dl-bridge] spawn error: ${err.message}`);
_dlBridge = null; }
});
proc.on('exit', (code, sig) => {
console.error(`[dl-bridge] exited code=${code} signal=${sig}`);
_dlBridge = null;
});
_dlBridge = proc;
console.log(`[dl-bridge] pid=${proc.pid} devices=${devCsv}`);
} }
function stopDecklinkBridge() { async function stopDecklinkBridge() {
if (!_dlBridgeRunning()) return; if (!_dlBridgeId) return;
console.log('[dl-bridge] stopping'); console.log('[dl-bridge] stopping container');
try { _dlBridge.kill('SIGTERM'); } catch (_) {} try {
const proc = _dlBridge; await dockerApi('POST', `/containers/${_dlBridgeId}/stop?t=5`);
setTimeout(() => { await dockerApi('DELETE', `/containers/${_dlBridgeId}?force=true`);
try { if (proc.exitCode === null) proc.kill('SIGKILL'); } catch (_) {} } catch (err) {
}, 5000); console.error(`[dl-bridge] stop error: ${err.message}`);
_dlBridge = null; }
_dlBridgeId = null;
} }
function _dcBridgeRunning() { function _dcBridgeRunning() {
@ -542,7 +575,7 @@ async function handleSidecarStart(body, res) {
const _bmdEntries = fs.readdirSync(_bmdDir).filter(n => /^(dv|io)\d+$/.test(n)); const _bmdEntries = fs.readdirSync(_bmdDir).filter(n => /^(dv|io)\d+$/.test(n));
_bmdEntries.forEach((_, i) => _bmdDevices.push(i)); _bmdEntries.forEach((_, i) => _bmdDevices.push(i));
} catch (_) { _bmdDevices.push(0); } } catch (_) { _bmdDevices.push(0); }
startDecklinkBridge(_bmdDevices); await startDecklinkBridge(_bmdDevices);
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
let _devIdx = NaN; let _devIdx = NaN;
@ -714,7 +747,8 @@ async function handleSidecarStandby(body, res) {
const _bmdEntries = fs.readdirSync('/dev/blackmagic').filter(n => /^(dv|io)\d+$/.test(n)); const _bmdEntries = fs.readdirSync('/dev/blackmagic').filter(n => /^(dv|io)\d+$/.test(n));
_bmdEntries.forEach((_, i) => _bmdDevices.push(i)); _bmdEntries.forEach((_, i) => _bmdDevices.push(i));
} catch (_) { _bmdDevices.push(0); } } catch (_) { _bmdDevices.push(0); }
startDecklinkBridge(_bmdDevices); await startDecklinkBridge(_bmdDevices);
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
let _devIdx = NaN; let _devIdx = NaN;
try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {} try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {}