fix(decklink): mount /dev/blackmagic in sidecar + remote node routing via node-agent
Two bugs fixed:
1. SDI capture sidecar never had /dev/blackmagic bound — ffmpeg opened the
decklink input inside a container with no device nodes, so frame=0.
Fix: local spawns now push '/dev/blackmagic:/dev/blackmagic' onto Binds
when source_type='sdi'.
2. recorders.js always spawned sidecars against the local Docker socket
(zampp1), even when a recorder's node_id pointed at zampp2 (where the
card is). Fix: resolveNodeTarget() looks up the recorder's cluster node;
if it's a different hostname the sidecar is spawned via a new
POST /sidecar/start endpoint on the remote node-agent.
node-agent gains three new routes (all talk to the local Docker socket):
POST /sidecar/start — create + start container (host network,
privileged, /dev/blackmagic bind for sdi)
DELETE /sidecar/:id — stop + remove
GET /sidecar/:id/status — inspect + poll capture service
docker-compose.worker.yml: add /var/run/docker.sock and LIVE_DIR to
node-agent so it can spawn sidecars, and document build-capture prerequisite.: index.js
This commit is contained in:
parent
539429c058
commit
8186b181cc
1 changed files with 158 additions and 16 deletions
|
|
@ -7,7 +7,8 @@ const NODE_TOKEN = process.env.NODE_TOKEN || '';
|
||||||
const NODE_ROLE = process.env.NODE_ROLE || 'worker';
|
const NODE_ROLE = process.env.NODE_ROLE || 'worker';
|
||||||
const AGENT_PORT = parseInt(process.env.AGENT_PORT || '7436', 10);
|
const AGENT_PORT = parseInt(process.env.AGENT_PORT || '7436', 10);
|
||||||
const HEARTBEAT_MS = parseInt(process.env.HEARTBEAT_MS || '30000', 10);
|
const HEARTBEAT_MS = parseInt(process.env.HEARTBEAT_MS || '30000', 10);
|
||||||
const VERSION = '1.1.0';
|
const LIVE_DIR = process.env.LIVE_DIR || '/mnt/NVME/MAM/wild-dragon-live';
|
||||||
|
const VERSION = '1.2.0';
|
||||||
|
|
||||||
// Pick the host's LAN IP. Inside a bridge-mode container,
|
// Pick the host's LAN IP. Inside a bridge-mode container,
|
||||||
// os.networkInterfaces() returns the container's docker-bridge IP (172.x),
|
// os.networkInterfaces() returns the container's docker-bridge IP (172.x),
|
||||||
|
|
@ -37,22 +38,128 @@ function getIp() {
|
||||||
return candidates.length ? candidates[0].address : null;
|
return candidates.length ? candidates[0].address : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
const server = http.createServer((req, res) => {
|
// ── Docker API helper (talks to local /var/run/docker.sock) ───────────────
|
||||||
if (req.method === 'GET' && req.url === '/health') {
|
function dockerApi(method, path, body = null) {
|
||||||
res.writeHead(200, { 'Content-Type': 'application/json' });
|
return new Promise((resolve, reject) => {
|
||||||
res.end(JSON.stringify({
|
const options = {
|
||||||
ok: true,
|
socketPath: '/var/run/docker.sock',
|
||||||
hostname: os.hostname(),
|
path: `/v1.43${path}`,
|
||||||
uptime: Math.round(process.uptime()),
|
method,
|
||||||
version: VERSION,
|
headers: { 'Content-Type': 'application/json' },
|
||||||
role: NODE_ROLE,
|
};
|
||||||
ip: getIp(),
|
const req = http.request(options, (res) => {
|
||||||
}));
|
let data = '';
|
||||||
} else {
|
res.on('data', chunk => data += chunk);
|
||||||
res.writeHead(404);
|
res.on('end', () => {
|
||||||
res.end();
|
try { resolve({ status: res.statusCode, data: data ? JSON.parse(data) : {} }); }
|
||||||
|
catch { resolve({ status: res.statusCode, data }); }
|
||||||
|
});
|
||||||
|
});
|
||||||
|
req.on('error', reject);
|
||||||
|
if (body) req.write(JSON.stringify(body));
|
||||||
|
req.end();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function readBody(req) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let data = '';
|
||||||
|
req.on('data', chunk => data += chunk);
|
||||||
|
req.on('end', () => {
|
||||||
|
try { resolve(JSON.parse(data || '{}')); }
|
||||||
|
catch { reject(new Error('Invalid JSON')); }
|
||||||
|
});
|
||||||
|
req.on('error', reject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function jsonResponse(res, status, body) {
|
||||||
|
res.writeHead(status, { 'Content-Type': 'application/json' });
|
||||||
|
res.end(JSON.stringify(body));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Sidecar: spawn a capture container on this node ───────────────────────
|
||||||
|
|
||||||
|
async function handleSidecarStart(body, res) {
|
||||||
|
try {
|
||||||
|
const {
|
||||||
|
image = 'wild-dragon-capture:latest',
|
||||||
|
env = [],
|
||||||
|
capturePort = 3001,
|
||||||
|
sourceType = 'sdi',
|
||||||
|
} = body;
|
||||||
|
|
||||||
|
const binds = [`${LIVE_DIR}:/live`];
|
||||||
|
if (sourceType === 'sdi') binds.unshift('/dev/blackmagic:/dev/blackmagic');
|
||||||
|
|
||||||
|
const spec = {
|
||||||
|
Image: image,
|
||||||
|
Env: [...env, `PORT=${capturePort}`],
|
||||||
|
HostConfig: {
|
||||||
|
NetworkMode: 'host',
|
||||||
|
Privileged: true,
|
||||||
|
Binds: binds,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const createRes = await dockerApi('POST', '/containers/create', spec);
|
||||||
|
if (createRes.status !== 201) {
|
||||||
|
return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data });
|
||||||
|
}
|
||||||
|
|
||||||
|
const containerId = createRes.data.Id;
|
||||||
|
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
|
||||||
|
if (startRes.status !== 204) {
|
||||||
|
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||||
|
return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data });
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonResponse(res, 201, { containerId, capturePort });
|
||||||
|
} catch (err) {
|
||||||
|
jsonResponse(res, 500, { error: err.message });
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
|
async function handleSidecarStop(containerId, res) {
|
||||||
|
try {
|
||||||
|
await dockerApi('POST', `/containers/${containerId}/stop`).catch(() => {});
|
||||||
|
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||||
|
jsonResponse(res, 200, { ok: true });
|
||||||
|
} catch (err) {
|
||||||
|
jsonResponse(res, 500, { error: err.message });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleSidecarStatus(containerId, res) {
|
||||||
|
try {
|
||||||
|
const inspectRes = await dockerApi('GET', `/containers/${containerId}/json`);
|
||||||
|
if (inspectRes.status !== 200) {
|
||||||
|
return jsonResponse(res, 200, { running: false });
|
||||||
|
}
|
||||||
|
|
||||||
|
const container = inspectRes.data;
|
||||||
|
const running = container.State?.Running || false;
|
||||||
|
const startedAt = container.State?.StartedAt;
|
||||||
|
|
||||||
|
// Derive capturePort from the container's env so we know where to poll.
|
||||||
|
const portEnv = (container.Config?.Env || []).find(e => e.startsWith('PORT='));
|
||||||
|
const capturePort = portEnv ? parseInt(portEnv.split('=')[1], 10) : 3001;
|
||||||
|
|
||||||
|
let live = null;
|
||||||
|
if (running) {
|
||||||
|
try {
|
||||||
|
const captureRes = await fetch(`http://127.0.0.1:${capturePort}/capture/status`, {
|
||||||
|
signal: AbortSignal.timeout(2000),
|
||||||
|
});
|
||||||
|
if (captureRes.ok) live = await captureRes.json();
|
||||||
|
} catch (_) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonResponse(res, 200, { running, startedAt, live });
|
||||||
|
} catch (err) {
|
||||||
|
jsonResponse(res, 500, { error: err.message });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── CPU sampling (500ms window) ───────────────────────────────────────────
|
// ── CPU sampling (500ms window) ───────────────────────────────────────────
|
||||||
function sampleCpu() {
|
function sampleCpu() {
|
||||||
|
|
@ -160,6 +267,41 @@ async function heartbeat() {
|
||||||
heartbeat();
|
heartbeat();
|
||||||
setInterval(heartbeat, HEARTBEAT_MS);
|
setInterval(heartbeat, HEARTBEAT_MS);
|
||||||
|
|
||||||
|
// ── HTTP server ───────────────────────────────────────────────────────────
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
const { pathname } = new URL(req.url, 'http://localhost');
|
||||||
|
|
||||||
|
if (req.method === 'GET' && pathname === '/health') {
|
||||||
|
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||||
|
res.end(JSON.stringify({
|
||||||
|
ok: true,
|
||||||
|
hostname: os.hostname(),
|
||||||
|
uptime: Math.round(process.uptime()),
|
||||||
|
version: VERSION,
|
||||||
|
role: NODE_ROLE,
|
||||||
|
ip: getIp(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
} else if (req.method === 'POST' && pathname === '/sidecar/start') {
|
||||||
|
readBody(req)
|
||||||
|
.then(body => handleSidecarStart(body, res))
|
||||||
|
.catch(() => jsonResponse(res, 400, { error: 'Invalid request body' }));
|
||||||
|
|
||||||
|
} else if (req.method === 'DELETE' && pathname.startsWith('/sidecar/')) {
|
||||||
|
const id = pathname.slice('/sidecar/'.length);
|
||||||
|
if (!id || id.includes('/')) { res.writeHead(404); return res.end(); }
|
||||||
|
handleSidecarStop(id, res);
|
||||||
|
|
||||||
|
} else if (req.method === 'GET' && /^\/sidecar\/[^/]+\/status$/.test(pathname)) {
|
||||||
|
const id = pathname.slice('/sidecar/'.length, -'/status'.length);
|
||||||
|
handleSidecarStatus(id, res);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
res.writeHead(404);
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
server.listen(AGENT_PORT, () => {
|
server.listen(AGENT_PORT, () => {
|
||||||
console.log(`wild-dragon-node-agent v${VERSION}`);
|
console.log(`wild-dragon-node-agent v${VERSION}`);
|
||||||
console.log(` Listening :${AGENT_PORT}`);
|
console.log(` Listening :${AGENT_PORT}`);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue