From 6ee8dd5694f8833706b238e03b25be5aa9db117d Mon Sep 17 00:00:00 2001 From: Wild Dragon Dev Date: Thu, 4 Jun 2026 00:46:19 +0000 Subject: [PATCH] feat(node-agent): containerized decklink-bridge + async bridge management --- services/node-agent/index.js | 144 ++++++++++++++++++++++------------- 1 file changed, 89 insertions(+), 55 deletions(-) diff --git a/services/node-agent/index.js b/services/node-agent/index.js index 3d6dac9..569e88f 100644 --- a/services/node-agent/index.js +++ b/services/node-agent/index.js @@ -150,78 +150,111 @@ function stopNetIngest(containerId) { } // ── DeckLink bridge ─────────────────────────────────────────────────────── -// One decklink-bridge process per node, managing all DeckLink devices. +// One decklink-bridge container per node, managing all DeckLink devices. // Mirrors the deltacast-bridge singleton pattern. -const DL_BRIDGE_BIN = process.env.DECKLINK_BRIDGE_BIN || 'decklink-bridge'; const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink'; -let _dlBridge = null; // ChildProcess | null +let _dlBridgeId = null; // containerId | null let _dlSidecarCount = 0; // device_idx -> fmt JSON from bridge stderr const _dlDevFmt = new Map(); -function _dlBridgeRunning() { - return _dlBridge !== null && _dlBridge.exitCode === null && _dlBridge.signalCode === null; +async function _dlBridgeRunning() { + if (!_dlBridgeId) return false; + try { + const res = await dockerApi('GET', `/containers/${_dlBridgeId}/json`); + return res.status === 200 && res.data.State?.Running; + } catch (_) { return false; } } -function startDecklinkBridge(deviceIndices) { - if (_dlBridgeRunning()) return; +/** + * Connect to container stderr stream and parse format JSONs. + */ +function _attachDlBridgeLogs(containerId) { + const options = { + socketPath: '/var/run/docker.sock', + path: `/v1.43/containers/${containerId}/attach?stderr=1&stream=1`, + method: 'POST', + }; + const req = http.request(options, (res) => { + res.on('data', (chunk) => { + // Docker multiplexed stream header: [1/2, 0, 0, 0, size_32be] + let offset = 0; + while (offset + 8 <= chunk.length) { + const size = chunk.readUInt32BE(offset + 4); + const end = offset + 8 + size; + if (end > chunk.length) break; + const text = chunk.toString('utf8', offset + 8, end); + for (const line of text.split('\n')) { + const t = line.trim(); + if (!t || !t.startsWith('{')) continue; + try { + const f = JSON.parse(t); + if (typeof f.device === 'number') _dlDevFmt.set(f.device, f); + } catch (_) {} + } + offset = end; + } + }); + }); + req.on('error', (err) => console.error(`[dl-bridge] log attach error: ${err.message}`)); + req.end(); +} - try { require('fs').mkdirSync(DL_AUDIO_DIR, { recursive: true }); } catch (_) {} +async function startDecklinkBridge(deviceIndices) { + if (await _dlBridgeRunning()) return; const devCsv = Array.isArray(deviceIndices) ? deviceIndices.join(',') : String(deviceIndices || '0'); - const args = [ - '--devices', devCsv, - '--fc-url', FC_URL, - '--audio-pipe-dir', DL_AUDIO_DIR, - ]; - console.log(`[dl-bridge] launching: ${DL_BRIDGE_BIN} ${args.join(' ')}`); + const DL_IMAGE = 'wild-dragon-capture:latest'; + const DL_BIN = '/usr/local/bin/decklink-bridge'; + + console.log(`[dl-bridge] spawning containerized bridge for devices: ${devCsv}`); - const proc = spawn(DL_BRIDGE_BIN, args, { - stdio: ['ignore', 'ignore', 'pipe'], - detached: false, - env: { ...process.env, NODE_ID: FC_NODE_ID, FC_URL }, - }); + const spec = { + Image: DL_IMAGE, + Entrypoint: [DL_BIN], + Cmd: ['--devices', devCsv, '--fc-url', FC_URL, '--audio-pipe-dir', DL_AUDIO_DIR], + Env: [`NODE_ID=${FC_NODE_ID}`, `FC_URL=${FC_URL}`], + HostConfig: { + NetworkMode: 'host', + Privileged: true, + Binds: ['/dev:/dev', '/dev/shm:/dev/shm'], + RestartPolicy: { Name: 'unless-stopped' }, + }, + }; - proc.stderr.setEncoding('utf8'); - proc.stderr.on('data', (chunk) => { - for (const line of chunk.split('\n')) { - const t = line.trim(); - if (!t) continue; - if (t.startsWith('{')) { - console.log('[dl-bridge] ' + t); - try { - const f = JSON.parse(t); - if (typeof f.device === 'number') _dlDevFmt.set(f.device, f); - } catch (_) {} - } else { - console.error('[dl-bridge] ' + t); - } + try { + const createRes = await dockerApi('POST', '/containers/create?name=decklink-bridge', spec); + if (createRes.status !== 201 && createRes.status !== 409) { + console.error('[dl-bridge] create failed:', createRes.data); + return; } - }); - proc.on('error', (err) => { + const containerId = createRes.status === 409 ? 'decklink-bridge' : createRes.data.Id; + const startRes = await dockerApi('POST', `/containers/${containerId}/start`); + if (startRes.status !== 204 && startRes.status !== 304) { + console.error('[dl-bridge] start failed:', startRes.data); + return; + } + + _dlBridgeId = containerId; + _attachDlBridgeLogs(containerId); + console.log(`[dl-bridge] running in container ${containerId}`); + } catch (err) { console.error(`[dl-bridge] spawn error: ${err.message}`); - _dlBridge = null; - }); - proc.on('exit', (code, sig) => { - console.error(`[dl-bridge] exited code=${code} signal=${sig}`); - _dlBridge = null; - }); - - _dlBridge = proc; - console.log(`[dl-bridge] pid=${proc.pid} devices=${devCsv}`); + } } -function stopDecklinkBridge() { - if (!_dlBridgeRunning()) return; - console.log('[dl-bridge] stopping'); - try { _dlBridge.kill('SIGTERM'); } catch (_) {} - const proc = _dlBridge; - setTimeout(() => { - try { if (proc.exitCode === null) proc.kill('SIGKILL'); } catch (_) {} - }, 5000); - _dlBridge = null; +async function stopDecklinkBridge() { + if (!_dlBridgeId) return; + console.log('[dl-bridge] stopping container'); + try { + await dockerApi('POST', `/containers/${_dlBridgeId}/stop?t=5`); + await dockerApi('DELETE', `/containers/${_dlBridgeId}?force=true`); + } catch (err) { + console.error(`[dl-bridge] stop error: ${err.message}`); + } + _dlBridgeId = null; } function _dcBridgeRunning() { @@ -542,7 +575,7 @@ async function handleSidecarStart(body, res) { const _bmdEntries = fs.readdirSync(_bmdDir).filter(n => /^(dv|io)\d+$/.test(n)); _bmdEntries.forEach((_, i) => _bmdDevices.push(i)); } catch (_) { _bmdDevices.push(0); } - startDecklinkBridge(_bmdDevices); + await startDecklinkBridge(_bmdDevices); const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); let _devIdx = NaN; @@ -714,7 +747,8 @@ async function handleSidecarStandby(body, res) { const _bmdEntries = fs.readdirSync('/dev/blackmagic').filter(n => /^(dv|io)\d+$/.test(n)); _bmdEntries.forEach((_, i) => _bmdDevices.push(i)); } catch (_) { _bmdDevices.push(0); } - startDecklinkBridge(_bmdDevices); + await startDecklinkBridge(_bmdDevices); + const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14); let _devIdx = NaN; try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {}