dragonflight/services/playout/src/amcp.js
Zac d62af34e98 feat(playout): CasparCG sidecar image + Node AMCP shim
One container per channel. Built like capture/build-with-decklink: NDI +
DeckLink SDKs fetched at build, runs --privileged with Xvfb for the GL
context where no real display is present.

Components:
- entrypoint.sh: Xvfb + CasparCG launch, creates /media/live/<CHANNEL_ID>
- src/amcp.js: TCP AMCP client
- src/playout-manager.js: channel lifecycle, playlist walk via LOADBG AUTO
  for gapless transitions; primary consumer (decklink/ndi/srt/rtmp) plus a
  second FFMPEG HLS consumer (~600 kbps, 2s segments) for the UI preview
- src/index.js: HTTP shim — /channel/start, /playlist/load, transport
- frame-rate helper picks fps from video_format (59.94 → 60000/1001) so
  SEEK / LENGTH frame math is correct
2026-05-30 14:02:25 +00:00

182 lines
6.4 KiB
JavaScript

import net from 'node:net';
// Minimal AMCP (Advanced Media Control Protocol) client for CasparCG.
//
// AMCP is a line-based TCP protocol: each command is a single CRLF-terminated
// line, and the server replies with a status line ("202 PLAY OK\r\n") optionally
// followed by data lines. We keep one persistent socket per CasparCG instance
// and serialize commands through a FIFO queue — CasparCG processes one command
// at a time per connection, so interleaving replies would otherwise be
// ambiguous.
//
// We only implement the subset the playout sidecar needs (PLAY / LOADBG / STOP /
// CLEAR / INFO / ADD / REMOVE). Responses are returned raw; callers parse the
// status code where they care.
// AMCP line terminator: every command and every reply line ends with CR LF.
const CRLF = '\r\n';

/**
 * Minimal AMCP client for one CasparCG instance.
 *
 * Keeps a single persistent TCP socket, writes one command at a time and
 * routes each reply to the promise of the command that is currently in
 * flight. When the socket closes unexpectedly, every pending command is
 * rejected and a reconnect is scheduled; `close()` shuts the client down
 * without reconnecting.
 */
export class AmcpClient {
  /**
   * @param {object} [options]
   * @param {string} [options.host='127.0.0.1'] Host to connect to.
   * @param {number} [options.port=5250] TCP port to connect to.
   */
  constructor({ host = '127.0.0.1', port = 5250 } = {}) {
    this.host = host;
    this.port = port;
    this.socket = null;
    this.connected = false;
    this._buffer = '';
    this._queue = []; // pending { command, resolve, reject, timer }
    this._active = null; // command currently awaiting a reply
    this._reconnectTimer = null;
  }

  /**
   * Open the TCP connection. No-op when a socket already exists.
   *
   * Every handler checks that its socket is still the current one, so a
   * socket that was replaced or released by `close()` cannot clobber the
   * state of a newer connection or trigger a reconnect.
   */
  connect() {
    if (this.socket) return;
    const socket = net.createConnection({ host: this.host, port: this.port });
    socket.setEncoding('utf8');
    socket.setKeepAlive(true, 10000);
    socket.on('connect', () => {
      if (this.socket !== socket) return;
      this.connected = true;
      console.log(`[amcp] connected to ${this.host}:${this.port}`);
    });
    socket.on('data', (chunk) => {
      if (this.socket === socket) this._onData(chunk);
    });
    socket.on('error', (err) => {
      console.error(`[amcp] socket error: ${err.message}`);
    });
    socket.on('close', () => {
      // close() detaches the socket before destroying it, so reaching this
      // point with a non-current socket means the shutdown was deliberate.
      if (this.socket !== socket) return;
      this.connected = false;
      this.socket = null;
      // Partial data from the dead connection must not prefix the first
      // reply of the next one.
      this._buffer = '';
      // Fail any in-flight + queued commands so callers don't hang.
      this._failPending();
      this._scheduleReconnect();
    });
    this.socket = socket;
  }

  /** Reject the in-flight command and every queued command. */
  _failPending() {
    const pending = this._active ? [this._active, ...this._queue] : [...this._queue];
    this._active = null;
    this._queue = [];
    for (const entry of pending) {
      clearTimeout(entry.timer);
      entry.reject(new Error('AMCP connection closed'));
    }
  }

  /** Arrange a single reconnect attempt in 2 s unless one is already pending. */
  _scheduleReconnect() {
    if (this._reconnectTimer) return;
    this._reconnectTimer = setTimeout(() => {
      this._reconnectTimer = null;
      console.log('[amcp] reconnecting...');
      this.connect();
    }, 2000);
  }

  /**
   * Wait until the socket is connected, polling every 250 ms and starting a
   * connection attempt whenever no socket exists.
   *
   * @param {number} [timeoutMs=30000] Maximum time to wait.
   * @returns {Promise<boolean>} Resolves with `true` once connected.
   * @throws {Error} When the connection is not up before the deadline.
   */
  async waitReady(timeoutMs = 30000) {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      if (this.connected) return true;
      if (!this.socket) this.connect();
      await new Promise((r) => setTimeout(r, 250));
    }
    throw new Error('AMCP not ready within timeout');
  }

  /**
   * Accumulate incoming data and complete the in-flight command once a whole
   * reply has arrived.
   *
   * Framing by status code of the first line:
   *  - 200: data lines follow, terminated by an empty line;
   *  - 201: exactly one data line follows;
   *  - anything else (202, 4xx, 5xx, unparsable): the status line alone.
   * Codes >= 400 reject the command; everything else resolves with the raw
   * reply text (without the trailing terminator).
   *
   * @param {string} chunk Decoded data from the socket.
   */
  _onData(chunk) {
    this._buffer += chunk;
    if (!this._active) {
      // Unsolicited data (e.g. connection banner) — discard.
      this._buffer = '';
      return;
    }

    const firstLineEnd = this._buffer.indexOf(CRLF);
    if (firstLineEnd === -1) return; // status line still incomplete
    const statusLine = this._buffer.slice(0, firstLineEnd);
    const code = Number.parseInt(statusLine, 10);

    let replyEnd = firstLineEnd;
    let terminatorLength = CRLF.length;
    if (code === 200) {
      replyEnd = this._buffer.indexOf(CRLF + CRLF);
      terminatorLength = 2 * CRLF.length;
    } else if (code === 201) {
      replyEnd = this._buffer.indexOf(CRLF, firstLineEnd + CRLF.length);
    }
    if (replyEnd === -1) return; // wait for more data

    const reply = this._buffer.slice(0, replyEnd);
    this._buffer = this._buffer.slice(replyEnd + terminatorLength);
    if (code >= 400) this._finishActive(new Error(`AMCP error: ${reply}`), reply);
    else this._finishActive(null, reply);
  }

  /**
   * Settle the in-flight command (if any) and start the next queued one.
   *
   * @param {Error|null} err Rejection reason, or null to resolve.
   * @param {string} data Raw reply text used when resolving.
   */
  _finishActive(err, data) {
    const active = this._active;
    this._active = null;
    if (active) {
      clearTimeout(active.timer);
      if (err) active.reject(err);
      else active.resolve(data);
    }
    this._pump();
  }

  /**
   * Write the next queued command if nothing is in flight. Commands that
   * cannot be written (no socket, or a synchronous write failure) are
   * rejected and the queue keeps draining.
   */
  _pump() {
    while (!this._active && this._queue.length > 0) {
      const next = this._queue.shift();
      if (!this.socket) {
        clearTimeout(next.timer);
        next.reject(new Error('AMCP not connected'));
        continue;
      }
      // Nothing received before a command is written can belong to its
      // reply, so drop leftovers instead of letting them corrupt framing.
      this._buffer = '';
      this._active = next;
      try {
        this.socket.write(`${next.command}${CRLF}`);
      } catch (err) {
        this._active = null;
        clearTimeout(next.timer);
        next.reject(err);
      }
    }
  }

  /**
   * Send a single AMCP command and resolve with the raw reply string.
   *
   * @param {string} command One AMCP command, without the trailing CRLF.
   * @param {object} [options]
   * @param {number} [options.timeoutMs=15000] Time allowed for the reply.
   * @returns {Promise<string>} Raw reply text.
   * Rejects when the command contains a line break, the client is not
   * connected, the server answers with a code >= 400, the connection closes,
   * or the timeout elapses.
   */
  send(command, { timeoutMs = 15000 } = {}) {
    return new Promise((resolve, reject) => {
      // An embedded line break would be executed as several commands and the
      // extra replies would be routed to the wrong callers.
      if (/[\r\n]/.test(String(command))) {
        reject(new Error('AMCP command must be a single line'));
        return;
      }
      const entry = { command, resolve, reject, timer: null };
      entry.timer = setTimeout(() => {
        const wasActive = this._active === entry;
        if (wasActive) this._active = null;
        else this._queue = this._queue.filter((e) => e !== entry);
        reject(new Error(`AMCP command timed out: ${command}`));
        // The reply to an in-flight command may still arrive and would be
        // delivered to whichever command is sent next. Dropping the socket
        // is the only way to resynchronise; its 'close' handler rejects the
        // remaining queue and schedules the reconnect.
        if (wasActive && this.socket) this.socket.destroy();
      }, timeoutMs);
      this._queue.push(entry);
      this._pump();
    });
  }

  /**
   * Shut the client down: cancel any pending reconnect, drop the socket and
   * reject every pending command. No reconnect is scheduled; call
   * `connect()` or `waitReady()` to use the client again.
   */
  close() {
    if (this._reconnectTimer) {
      clearTimeout(this._reconnectTimer);
      this._reconnectTimer = null;
    }
    // Detach before destroying so the socket's 'close' handler sees it is no
    // longer current and does not schedule a reconnect.
    const socket = this.socket;
    this.socket = null;
    this.connected = false;
    this._buffer = '';
    if (socket) {
      try {
        socket.destroy();
      } catch (err) {
        // Best-effort teardown: report but never fail close().
        console.error(`[amcp] error while closing socket: ${err.message}`);
      }
    }
    this._failPending();
  }
}