fix(decklink): mount /dev/blackmagic in sidecar + remote node routing via node-agent

Two bugs fixed:
1. SDI capture sidecar never had /dev/blackmagic bound — ffmpeg opened the
   decklink input inside a container with no device nodes, so frame=0.
   Fix: local spawns now push '/dev/blackmagic:/dev/blackmagic' onto Binds
   when source_type='sdi'.

2. recorders.js always spawned sidecars against the local Docker socket
   (zampp1), even when a recorder's node_id pointed at zampp2 (where the
   card is). Fix: resolveNodeTarget() looks up the recorder's cluster node;
   if it's a different hostname the sidecar is spawned via a new
   POST /sidecar/start endpoint on the remote node-agent.

node-agent gains three new routes (all talk to the local Docker socket):
  POST   /sidecar/start         — create + start container (host network,
                                   privileged, /dev/blackmagic bind for sdi)
  DELETE /sidecar/:id           — stop + remove
  GET    /sidecar/:id/status    — inspect + poll capture service

docker-compose.worker.yml: add /var/run/docker.sock and LIVE_DIR to
node-agent so it can spawn sidecars, and document build-capture prerequisite.: recorders.js
This commit is contained in:
Zac Gaetano 2026-05-21 18:51:10 -04:00
parent 8186b181cc
commit bbed2a7059

View file

@ -8,6 +8,10 @@ const router = express.Router();
router.use(requireAuth); router.use(requireAuth);
// Base port for on-demand SDI sidecar containers on remote worker nodes.
// Device index 0 → 7438, index 1 → 7439, etc.
const SIDECAR_PORT_BASE = 7438;
// Docker API helper function // Docker API helper function
function dockerApi(method, path, body = null) { function dockerApi(method, path, body = null) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
@ -34,6 +38,22 @@ function dockerApi(method, path, body = null) {
}); });
} }
// Look up the cluster node for a recorder and decide if it is remote.
// Returns { remote: false } when the node is local or unset;
// { remote: true, apiUrl, ip } when it is a different host.
async function resolveNodeTarget(nodeId) {
if (!nodeId) return { remote: false };
const r = await pool.query(
'SELECT hostname, ip_address, api_url FROM cluster_nodes WHERE id = $1',
[nodeId]
);
if (r.rows.length === 0) return { remote: false };
const node = r.rows[0];
const localHostname = process.env.NODE_HOSTNAME || '';
if (!node.api_url || node.hostname === localHostname) return { remote: false };
return { remote: true, apiUrl: node.api_url, ip: node.ip_address };
}
// Helper function to generate clip name with timestamp // Helper function to generate clip name with timestamp
function generateClipName(recorderName) { function generateClipName(recorderName) {
const now = new Date(); const now = new Date();
@ -284,8 +304,6 @@ router.post('/:id/start', async (req, res, next) => {
const sourceType = recorder.source_type; const sourceType = recorder.source_type;
const deviceIndex = recorder.device_index ?? sourceConfig.device ?? 0; const deviceIndex = recorder.device_index ?? sourceConfig.device ?? 0;
const { portBindings, exposedPorts } = buildPortConfig(sourceType, sourceConfig);
// Build container env — all codec controls flow through here. // Build container env — all codec controls flow through here.
const env = [ const env = [
`S3_ENDPOINT=${s3Endpoint}`, `S3_ENDPOINT=${s3Endpoint}`,
@ -337,7 +355,34 @@ router.post('/:id/start', async (req, res, next) => {
} }
} }
// Determine whether to spawn locally or via a remote node-agent.
const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);
let containerId;
if (isRemote) {
// Remote node: delegate container lifecycle to that node's agent.
const capturePort = SIDECAR_PORT_BASE + (deviceIndex || 0);
const sidecarRes = await fetch(`${targetNodeApiUrl}/sidecar/start`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ image: 'wild-dragon-capture:latest', env, capturePort, sourceType }),
signal: AbortSignal.timeout(15000),
});
if (!sidecarRes.ok) {
const details = await sidecarRes.json().catch(() => ({}));
return res.status(502).json({ error: 'Remote node failed to start sidecar', details });
}
const sidecarData = await sidecarRes.json();
containerId = sidecarData.containerId;
} else {
// Local spawn via Docker socket.
const { portBindings, exposedPorts } = buildPortConfig(sourceType, sourceConfig);
const alias = `recorder-${id}`; const alias = `recorder-${id}`;
const hostBinds = ['/mnt/NVME/MAM/wild-dragon-live:/live'];
if (sourceType === 'sdi') hostBinds.push('/dev/blackmagic:/dev/blackmagic');
const containerConfig = { const containerConfig = {
Image: 'wild-dragon-capture:latest', Image: 'wild-dragon-capture:latest',
Env: env, Env: env,
@ -346,7 +391,7 @@ router.post('/:id/start', async (req, res, next) => {
Privileged: true, Privileged: true,
NetworkMode: dockerNetwork, NetworkMode: dockerNetwork,
PortBindings: Object.keys(portBindings).length > 0 ? portBindings : undefined, PortBindings: Object.keys(portBindings).length > 0 ? portBindings : undefined,
Binds: ['/mnt/NVME/MAM/wild-dragon-live:/live'], Binds: hostBinds,
}, },
NetworkingConfig: { NetworkingConfig: {
EndpointsConfig: { EndpointsConfig: {
@ -357,7 +402,6 @@ router.post('/:id/start', async (req, res, next) => {
}; };
const createRes = await dockerApi('POST', '/containers/create', containerConfig); const createRes = await dockerApi('POST', '/containers/create', containerConfig);
if (createRes.status !== 201) { if (createRes.status !== 201) {
return res.status(500).json({ return res.status(500).json({
error: 'Failed to create container', error: 'Failed to create container',
@ -365,15 +409,15 @@ router.post('/:id/start', async (req, res, next) => {
}); });
} }
const containerId = createRes.data.Id; containerId = createRes.data.Id;
const startRes = await dockerApi('POST', `/containers/${containerId}/start`); const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
if (startRes.status !== 204) { if (startRes.status !== 204) {
return res.status(500).json({ return res.status(500).json({
error: 'Failed to start container', error: 'Failed to start container',
details: startRes.data, details: startRes.data,
}); });
} }
}
const updateResult = await pool.query( const updateResult = await pool.query(
`UPDATE recorders `UPDATE recorders
@ -409,6 +453,17 @@ router.post('/:id/stop', async (req, res, next) => {
return res.status(400).json({ error: 'No container running' }); return res.status(400).json({ error: 'No container running' });
} }
const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);
if (isRemote) {
const stopRes = await fetch(`${targetNodeApiUrl}/sidecar/${recorder.container_id}`, {
method: 'DELETE',
signal: AbortSignal.timeout(15000),
});
if (!stopRes.ok && stopRes.status !== 404) {
return res.status(502).json({ error: 'Remote node failed to stop sidecar' });
}
} else {
const stopRes = await dockerApi( const stopRes = await dockerApi(
'POST', 'POST',
`/containers/${recorder.container_id}/stop` `/containers/${recorder.container_id}/stop`
@ -432,6 +487,7 @@ router.post('/:id/stop', async (req, res, next) => {
details: removeRes.data, details: removeRes.data,
}); });
} }
}
const updateResult = await pool.query( const updateResult = await pool.query(
`UPDATE recorders `UPDATE recorders
@ -471,6 +527,30 @@ router.get('/:id/status', async (req, res, next) => {
}); });
} }
const deviceIndex = recorder.device_index ?? (recorder.source_config?.device ?? 0);
const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);
let isRunning = false;
let duration = 0;
let signal = 'connecting';
let signalKnown = false;
let live = null;
if (isRemote) {
try {
const statusRes = await fetch(`${targetNodeApiUrl}/sidecar/${recorder.container_id}/status`, {
signal: AbortSignal.timeout(4000),
});
if (statusRes.ok) {
const data = await statusRes.json();
isRunning = data.running;
if (data.startedAt) {
duration = Math.floor((Date.now() - new Date(data.startedAt).getTime()) / 1000);
}
live = data.live;
}
} catch (_) { /* node unreachable */ }
} else {
const inspectRes = await dockerApi( const inspectRes = await dockerApi(
'GET', 'GET',
`/containers/${recorder.container_id}/json` `/containers/${recorder.container_id}/json`
@ -485,23 +565,21 @@ router.get('/:id/status', async (req, res, next) => {
} }
const container = inspectRes.data; const container = inspectRes.data;
const startedAt = new Date(container.State.StartedAt).getTime(); isRunning = container.State.Running;
const now = Date.now(); duration = Math.floor((Date.now() - new Date(container.State.StartedAt).getTime()) / 1000);
const duration = Math.floor((now - startedAt) / 1000);
let signal = container.State.Running ? 'receiving' : 'stopped';
let signalKnown = false;
let live = null;
try { try {
const captureRes = await fetch(`http://recorder-${id}:3001/capture/status`, { signal: AbortSignal.timeout(2000) }); const captureRes = await fetch(`http://recorder-${id}:3001/capture/status`, { signal: AbortSignal.timeout(2000) });
if (captureRes.ok) { if (captureRes.ok) live = await captureRes.json();
live = await captureRes.json();
if (live && live.signal) { signal = live.signal; signalKnown = true; }
}
} catch (_) { /* not ready yet */ } } catch (_) { /* not ready yet */ }
}
if (isRunning) signal = 'receiving';
if (!isRunning) signal = 'stopped';
if (live && live.signal) { signal = live.signal; signalKnown = true; }
res.json({ res.json({
status: container.State.Running ? 'recording' : 'stopped', status: isRunning ? 'recording' : 'stopped',
duration, duration,
containerId: recorder.container_id, containerId: recorder.container_id,
signal, signal,
@ -533,9 +611,17 @@ router.delete('/:id', async (req, res, next) => {
const recorder = recorderResult.rows[0]; const recorder = recorderResult.rows[0];
if (recorder.container_id) { if (recorder.container_id) {
const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id);
try { try {
if (isRemote) {
await fetch(`${targetNodeApiUrl}/sidecar/${recorder.container_id}`, {
method: 'DELETE',
signal: AbortSignal.timeout(10000),
});
} else {
await dockerApi('POST', `/containers/${recorder.container_id}/stop`); await dockerApi('POST', `/containers/${recorder.container_id}/stop`);
await dockerApi('DELETE', `/containers/${recorder.container_id}`); await dockerApi('DELETE', `/containers/${recorder.container_id}`);
}
} catch (err) { } catch (err) {
console.error('Error stopping container during delete:', err); console.error('Error stopping container during delete:', err);
} }