merge: capture cleanup + standby reconcile helper (base for recorder redesign)
This commit is contained in:
commit
9f2eac7b61
5 changed files with 120 additions and 439 deletions
|
|
@ -95,12 +95,21 @@ detect_gpu() {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
# SDI capture card present? Blackmagic DeckLink or Deltacast, via lspci.
|
# SDI capture card present? Blackmagic DeckLink or Deltacast.
|
||||||
|
# Checks (any hit ⇒ present), so a driver/PCI-enumeration race at onboard time
|
||||||
|
# can't silently drop the capture profile and break recorders:
|
||||||
|
# 1) lspci vendor match
|
||||||
|
# 2) Deltacast device nodes (/dev/deltacast*, /dev/delta-*)
|
||||||
|
# 3) Blackmagic device nodes (/dev/blackmagic*, /dev/decklink*)
|
||||||
detect_sdi() {
|
detect_sdi() {
|
||||||
if command -v lspci &>/dev/null; then
|
if command -v lspci &>/dev/null && lspci 2>/dev/null | grep -iE 'blackmagic|deltacast' &>/dev/null; then
|
||||||
if lspci 2>/dev/null | grep -iE 'blackmagic|deltacast' &>/dev/null; then
|
return 0
|
||||||
return 0
|
fi
|
||||||
fi
|
if ls /dev/deltacast* /dev/delta-* &>/dev/null; then
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
if ls /dev/blackmagic* /dev/decklink* &>/dev/null; then
|
||||||
|
return 0
|
||||||
fi
|
fi
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
@ -209,6 +218,10 @@ info "Writing $ENV_FILE"
|
||||||
echo "NODE_IP=$NODE_IP"
|
echo "NODE_IP=$NODE_IP"
|
||||||
echo "AGENT_PORT=$AGENT_PORT"
|
echo "AGENT_PORT=$AGENT_PORT"
|
||||||
echo "HEARTBEAT_MS=30000"
|
echo "HEARTBEAT_MS=30000"
|
||||||
|
# Persist detected compose profiles so every subsequent `docker compose up`
|
||||||
|
# (manual or scripted) brings up the right services — capture/framecache must
|
||||||
|
# always run on SDI nodes or recorders silently fail. Comma-sep for COMPOSE_PROFILES.
|
||||||
|
echo "COMPOSE_PROFILES=$(echo $PROFILES | tr ' ' ',')"
|
||||||
[[ -n "$BMD_MODEL" ]] && echo "BMD_MODEL=$BMD_MODEL"
|
[[ -n "$BMD_MODEL" ]] && echo "BMD_MODEL=$BMD_MODEL"
|
||||||
for v in REDIS_URL DATABASE_URL S3_ENDPOINT S3_BUCKET S3_ACCESS_KEY S3_SECRET_KEY S3_REGION; do
|
for v in REDIS_URL DATABASE_URL S3_ENDPOINT S3_BUCKET S3_ACCESS_KEY S3_SECRET_KEY S3_REGION; do
|
||||||
val="${!v:-}"
|
val="${!v:-}"
|
||||||
|
|
|
||||||
|
|
@ -611,11 +611,13 @@ class CaptureManager {
|
||||||
// their own cursor, enabling simultaneous growing + proxy + HLS from one
|
// their own cursor, enabling simultaneous growing + proxy + HLS from one
|
||||||
// SDI input without any frame splitting.
|
// SDI input without any frame splitting.
|
||||||
//
|
//
|
||||||
// Audio stays on the named FIFO path (same as before — audio fan-out via
|
// Audio stays on the named FIFO path (audio fan-out via shm is a roadmap
|
||||||
// shm is a roadmap item).
|
// item).
|
||||||
//
|
//
|
||||||
// Falls back to the legacy FIFO path when FC_SLOT_ID is not set (e.g. on
|
// node-agent ALWAYS injects FC_SLOT_ID for SDI sidecars (deterministic
|
||||||
// nodes running an older node-agent or without framecache deployed).
|
// `deltacast-<board>-<port>` / `decklink-<node>-<dev>`), so this is the sole
|
||||||
|
// SDI path. The old FC_SLOT_ID-absent legacy FIFO fallback was removed once
|
||||||
|
// framecache became mandatory on every capture node.
|
||||||
if ((sourceType === 'deltacast' || sourceType === 'sdi' || sourceType === 'blackmagic')
|
if ((sourceType === 'deltacast' || sourceType === 'sdi' || sourceType === 'blackmagic')
|
||||||
&& process.env.FC_SLOT_ID) {
|
&& process.env.FC_SLOT_ID) {
|
||||||
|
|
||||||
|
|
@ -712,67 +714,6 @@ class CaptureManager {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Legacy FIFO path for deltacast ───────────────────────────────────────
|
|
||||||
// Used when FC_SLOT_ID is not set (framecache not deployed on this node,
|
|
||||||
// or older node-agent). Will be removed once framecache is everywhere.
|
|
||||||
if (sourceType === 'deltacast') {
|
|
||||||
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
|
|
||||||
? parseInt(device, 10) : 0;
|
|
||||||
const portIdx = (typeof port === 'number' || /^\d+$/.test(String(port)))
|
|
||||||
? parseInt(port, 10) : idx;
|
|
||||||
|
|
||||||
const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
|
|
||||||
const videoFifo = `${DC_PIPE_DIR}/video-${portIdx}.fifo`;
|
|
||||||
const audioFifo = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
|
|
||||||
|
|
||||||
const { existsSync: _exists } = await import('node:fs');
|
|
||||||
const WAIT_MS = 30_000;
|
|
||||||
const POLL_MS = 500;
|
|
||||||
const deadline = Date.now() + WAIT_MS;
|
|
||||||
let videoReady = false;
|
|
||||||
let audioReady = false;
|
|
||||||
while (Date.now() < deadline) {
|
|
||||||
videoReady = _exists(videoFifo);
|
|
||||||
audioReady = _exists(audioFifo);
|
|
||||||
if (videoReady && audioReady) break;
|
|
||||||
await new Promise(r => setTimeout(r, POLL_MS));
|
|
||||||
}
|
|
||||||
if (!videoReady || !audioReady) {
|
|
||||||
throw new Error(
|
|
||||||
`deltacast bridge FIFOs not ready after ${WAIT_MS / 1000}s ` +
|
|
||||||
`(video=${videoReady} audio=${audioReady}) — is deltacast-bridge running?`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
console.log(`[deltacast] port ${portIdx} FIFOs ready (legacy): ${videoFifo}, ${audioFifo}`);
|
|
||||||
|
|
||||||
const dcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
|
|
||||||
const dcFps = process.env.DELTACAST_FRAMERATE || '60000/1001';
|
|
||||||
const dcInterlaced = process.env.DELTACAST_INTERLACED === '1';
|
|
||||||
|
|
||||||
return {
|
|
||||||
inputArgs: [
|
|
||||||
'-use_wallclock_as_timestamps', '1',
|
|
||||||
'-thread_queue_size', '512',
|
|
||||||
'-f', 'rawvideo',
|
|
||||||
'-pix_fmt', 'uyvy422',
|
|
||||||
'-video_size', dcSize,
|
|
||||||
'-framerate', dcFps,
|
|
||||||
'-i', videoFifo,
|
|
||||||
'-use_wallclock_as_timestamps', '1',
|
|
||||||
'-thread_queue_size', '512',
|
|
||||||
'-f', 's16le',
|
|
||||||
'-ar', '48000',
|
|
||||||
'-ac', '2',
|
|
||||||
'-i', audioFifo,
|
|
||||||
],
|
|
||||||
isNetwork: false,
|
|
||||||
bridgeProcess: null,
|
|
||||||
audioFifo: null,
|
|
||||||
interlaced: dcInterlaced,
|
|
||||||
audioInputIndex: 1, /* legacy deltacast: video FIFO=0, audio FIFO=1 */
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// Default: SDI via a pluggable source backend (issue #168). The backend
|
// Default: SDI via a pluggable source backend (issue #168). The backend
|
||||||
// selection defaults to `blackmagic` (DeckLink) so existing SDI recorders
|
// selection defaults to `blackmagic` (DeckLink) so existing SDI recorders
|
||||||
// behave exactly as before. Deltacast/AJA backends throw until their
|
// behave exactly as before. Deltacast/AJA backends throw until their
|
||||||
|
|
|
||||||
|
|
@ -1,330 +0,0 @@
|
||||||
import express from 'express';
|
|
||||||
import { execSync, spawn } from 'child_process';
|
|
||||||
import captureManager from '../capture-manager.js';
|
|
||||||
|
|
||||||
import dgram from 'dgram';
|
|
||||||
import net from 'net';
|
|
||||||
|
|
||||||
function parseUrl(u) {
|
|
||||||
try {
|
|
||||||
const m = String(u).match(/^[a-z]+:\/\/([^:\/?#]+)(?::(\d+))?/i);
|
|
||||||
if (!m) return null;
|
|
||||||
return { host: m[1], port: parseInt(m[2] || '0', 10) };
|
|
||||||
} catch (_) { return null; }
|
|
||||||
}
|
|
||||||
|
|
||||||
async function checkReachable(host, port, sourceType) {
|
|
||||||
if (!port) return { ok: true };
|
|
||||||
if (sourceType === 'srt') return await udpSendProbe(host, port);
|
|
||||||
if (sourceType === 'rtmp') return await tcpConnectProbe(host, port);
|
|
||||||
return { ok: true };
|
|
||||||
}
|
|
||||||
|
|
||||||
function udpSendProbe(host, port) {
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
const sock = dgram.createSocket('udp4');
|
|
||||||
let done = false;
|
|
||||||
const finish = (result) => { if (done) return; done = true; try { sock.close(); } catch (_) {} resolve(result); };
|
|
||||||
sock.on('error', (err) => {
|
|
||||||
const msg = String(err && err.message || err);
|
|
||||||
if (/EHOSTUNREACH|ENETUNREACH/i.test(msg)) {
|
|
||||||
finish({ ok: false, error: 'Host ' + host + ' is unreachable from the capture container (no route). Confirm the IP is correct and the machine is online.', diagnostic: msg });
|
|
||||||
} else if (/ECONNREFUSED|EPORTUNREACH/i.test(msg)) {
|
|
||||||
finish({ ok: false, error: 'Nothing is listening on UDP ' + host + ':' + port + '. In vMix, confirm the SRT output is Started and the port matches.', diagnostic: msg });
|
|
||||||
} else {
|
|
||||||
finish({ ok: false, error: 'UDP probe failed: ' + msg, diagnostic: msg });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
sock.send(Buffer.from('Z-AMPP-PROBE'), port, host, () => {});
|
|
||||||
setTimeout(() => finish({ ok: true }), 1500);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function tcpConnectProbe(host, port) {
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
const sock = new net.Socket();
|
|
||||||
let done = false;
|
|
||||||
const finish = (r) => { if (done) return; done = true; try { sock.destroy(); } catch (_) {} resolve(r); };
|
|
||||||
sock.setTimeout(2500);
|
|
||||||
sock.once('connect', () => finish({ ok: true }));
|
|
||||||
sock.once('timeout', () => finish({ ok: false, error: 'TCP connect to ' + host + ':' + port + ' timed out. Confirm the host is reachable and a TCP listener is running.' }));
|
|
||||||
sock.once('error', (err) => {
|
|
||||||
const msg = String(err && err.message || err);
|
|
||||||
if (/EHOSTUNREACH|ENETUNREACH/i.test(msg)) finish({ ok: false, error: 'Host ' + host + ' unreachable (no route).', diagnostic: msg });
|
|
||||||
else if (/ECONNREFUSED/i.test(msg)) finish({ ok: false, error: 'Nothing is listening on TCP ' + host + ':' + port + '.', diagnostic: msg });
|
|
||||||
else finish({ ok: false, error: 'TCP probe failed: ' + msg, diagnostic: msg });
|
|
||||||
});
|
|
||||||
sock.connect(port, host);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function classifyProbeError(raw, sourceType) {
|
|
||||||
const r = (raw || '').toLowerCase();
|
|
||||||
if (sourceType === 'srt') {
|
|
||||||
if (/connection .* failed: (input\/output|timer expired|connection setup failure)/i.test(raw)) {
|
|
||||||
return 'SRT handshake failed. In vMix: confirm the External Output is Started, Type=SRT, Mode=Listener, port matches, and any passphrase / stream ID is empty (or copied exactly).';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (sourceType === 'rtmp') {
|
|
||||||
if (/connection refused/i.test(r)) return 'Nothing is listening on RTMP at this address. Start your RTMP source.';
|
|
||||||
if (/end-of-file|invalid data found/i.test(r)) return 'Got a TCP connection but no RTMP stream. Confirm the source is publishing and the path / stream-key match.';
|
|
||||||
}
|
|
||||||
return raw;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
const router = express.Router();
|
|
||||||
|
|
||||||
const MAM_API_URL = process.env.MAM_API_URL || 'http://mam-api:3000';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* GET /devices
|
|
||||||
* List available DeckLink devices
|
|
||||||
*/
|
|
||||||
router.get('/devices', (req, res) => {
|
|
||||||
try {
|
|
||||||
const devices = [];
|
|
||||||
let output = '';
|
|
||||||
|
|
||||||
try {
|
|
||||||
output = execSync('ffmpeg -f decklink -list_devices 1 -i dummy 2>&1', {
|
|
||||||
encoding: 'utf-8',
|
|
||||||
});
|
|
||||||
} catch (error) {
|
|
||||||
// ffmpeg returns non-zero, but stderr is still captured
|
|
||||||
output = error.stderr ? error.stderr.toString() : error.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse ffmpeg output for DeckLink device names
|
|
||||||
// Format: [decklink @ ...] "DeckLink Quad 2" (input #0)
|
|
||||||
const lines = output.split('\n');
|
|
||||||
let deviceIndex = 0;
|
|
||||||
|
|
||||||
for (const line of lines) {
|
|
||||||
const match = line.match(/^\s*\[decklink[^\]]*\]\s+"([^"]+)"/);
|
|
||||||
if (match) {
|
|
||||||
devices.push({
|
|
||||||
index: deviceIndex,
|
|
||||||
name: match[1],
|
|
||||||
});
|
|
||||||
deviceIndex++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res.json({ devices });
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Error listing devices:', error);
|
|
||||||
res.status(500).json({ error: 'Failed to list devices' });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
|
||||||
* GET /status
|
|
||||||
* Get current capture status
|
|
||||||
*/
|
|
||||||
router.get('/status', (req, res) => {
|
|
||||||
try {
|
|
||||||
const status = captureManager.getStatus();
|
|
||||||
res.json(status);
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Error getting status:', error);
|
|
||||||
res.status(500).json({ error: 'Failed to get status' });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
router.post('/probe', async (req, res) => {
|
|
||||||
try {
|
|
||||||
const { source_type = 'sdi', source_url, listen = false } = req.body || {};
|
|
||||||
|
|
||||||
if (source_type === 'sdi') {
|
|
||||||
try {
|
|
||||||
const raw = execSync('ffmpeg -hide_banner -f decklink -list_devices 1 -i dummy 2>&1', { encoding: 'utf-8', timeout: 5000 });
|
|
||||||
const devices = [];
|
|
||||||
for (const line of raw.split('\n')) {
|
|
||||||
const m = line.match(/\[decklink[^\]]*\]\s+"([^"]+)"/);
|
|
||||||
if (m) devices.push(m[1]);
|
|
||||||
}
|
|
||||||
return res.json({ ok: true, source_type, devices });
|
|
||||||
} catch (err) {
|
|
||||||
const out = (err.stderr || err.stdout || err.toString()).toString();
|
|
||||||
return res.json({ ok: false, source_type, error: out.slice(0, 800) });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (listen) {
|
|
||||||
return res.json({ ok: false, source_type, error: 'Listener-mode sources cannot be probed standalone. Start the recorder and watch the signal indicator.' });
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!source_url) return res.status(400).json({ error: 'source_url is required' });
|
|
||||||
|
|
||||||
// Pre-flight: parse host:port and check L3/L4 reachability so we can give
|
|
||||||
// an actionable error instead of the opaque libsrt "Input/output error".
|
|
||||||
const parsed = parseUrl(source_url);
|
|
||||||
if (!parsed) {
|
|
||||||
return res.json({ ok: false, source_type, source_url, error: 'Could not parse host:port from URL.' });
|
|
||||||
}
|
|
||||||
const reach = await checkReachable(parsed.host, parsed.port, source_type);
|
|
||||||
if (!reach.ok) {
|
|
||||||
return res.json({ ok: false, source_type, source_url, error: reach.error, diagnostic: reach.diagnostic });
|
|
||||||
}
|
|
||||||
|
|
||||||
let url = source_url;
|
|
||||||
if (source_type === 'srt' && !/mode=/.test(url)) {
|
|
||||||
url += (url.includes('?') ? '&' : '?') + 'mode=caller';
|
|
||||||
}
|
|
||||||
|
|
||||||
const args = ['-hide_banner','-v','error','-probesize','32M','-analyzeduration','8M','-rw_timeout','7000000','-i', url, '-show_streams','-show_format','-of','json'];
|
|
||||||
const ff = spawn('ffprobe', args);
|
|
||||||
let stdout = '', stderr = '';
|
|
||||||
ff.stdout.on('data', (c) => { stdout += c; });
|
|
||||||
ff.stderr.on('data', (c) => { stderr += c; });
|
|
||||||
const killer = setTimeout(() => { try { ff.kill('SIGKILL'); } catch (_) {} }, 10000);
|
|
||||||
ff.on('close', (code) => {
|
|
||||||
clearTimeout(killer);
|
|
||||||
if (code !== 0) {
|
|
||||||
const rawErr = (stderr || 'ffprobe failed').slice(0, 800);
|
|
||||||
const friendly = classifyProbeError(rawErr, source_type);
|
|
||||||
return res.json({ ok: false, source_type, source_url, error: friendly, diagnostic: rawErr });
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
const parsed = JSON.parse(stdout);
|
|
||||||
const streams = (parsed.streams || []).map(s => ({
|
|
||||||
index: s.index, codec_type: s.codec_type, codec_name: s.codec_name,
|
|
||||||
width: s.width, height: s.height, pix_fmt: s.pix_fmt,
|
|
||||||
r_frame_rate: s.r_frame_rate, avg_frame_rate: s.avg_frame_rate,
|
|
||||||
sample_rate: s.sample_rate, channels: s.channels,
|
|
||||||
channel_layout: s.channel_layout, bit_rate: s.bit_rate,
|
|
||||||
}));
|
|
||||||
return res.json({ ok: true, source_type, source_url,
|
|
||||||
format: { format_name: parsed.format && parsed.format.format_name, duration: parsed.format && parsed.format.duration, bit_rate: parsed.format && parsed.format.bit_rate },
|
|
||||||
streams });
|
|
||||||
} catch (err) {
|
|
||||||
return res.json({ ok: false, source_type, source_url, error: 'Could not parse ffprobe output: ' + err.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Probe error:', error);
|
|
||||||
res.status(500).json({ error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
|
||||||
* POST /start
|
|
||||||
* Start a new capture session
|
|
||||||
*
|
|
||||||
* Body (SDI):
|
|
||||||
* { project_id, clip_name, device, bin_id?, source_type? }
|
|
||||||
*
|
|
||||||
* Body (SRT/RTMP caller):
|
|
||||||
* { project_id, clip_name, source_type, source_url, bin_id? }
|
|
||||||
*
|
|
||||||
* Body (SRT/RTMP listener):
|
|
||||||
* { project_id, clip_name, source_type, listen: true, listen_port?, stream_key?, bin_id? }
|
|
||||||
*/
|
|
||||||
router.post('/start', async (req, res) => {
|
|
||||||
try {
|
|
||||||
const {
|
|
||||||
project_id,
|
|
||||||
bin_id,
|
|
||||||
clip_name,
|
|
||||||
device,
|
|
||||||
source_type = 'sdi',
|
|
||||||
source_url,
|
|
||||||
listen = false,
|
|
||||||
listen_port,
|
|
||||||
stream_key,
|
|
||||||
} = req.body;
|
|
||||||
|
|
||||||
if (!project_id || !clip_name) {
|
|
||||||
return res.status(400).json({
|
|
||||||
error: 'Missing required fields: project_id, clip_name',
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Source-specific validation
|
|
||||||
if (source_type === 'sdi') {
|
|
||||||
if (device === undefined || device === null) {
|
|
||||||
return res.status(400).json({ error: 'SDI source requires: device' });
|
|
||||||
}
|
|
||||||
} else if (source_type === 'srt' || source_type === 'rtmp') {
|
|
||||||
if (!listen && !source_url) {
|
|
||||||
return res.status(400).json({
|
|
||||||
error: `${source_type.toUpperCase()} caller mode requires: source_url`,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return res.status(400).json({
|
|
||||||
error: `Unknown source_type: ${source_type}. Must be sdi, srt, or rtmp`,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
const session = await captureManager.start({
|
|
||||||
projectId: project_id,
|
|
||||||
binId: bin_id || null,
|
|
||||||
clipName: clip_name,
|
|
||||||
device,
|
|
||||||
sourceType: source_type,
|
|
||||||
sourceUrl: source_url,
|
|
||||||
listen,
|
|
||||||
listenPort: listen_port,
|
|
||||||
streamKey: stream_key,
|
|
||||||
});
|
|
||||||
|
|
||||||
res.json(session);
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Error starting capture:', error);
|
|
||||||
res.status(500).json({ error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
|
||||||
* POST /stop
|
|
||||||
* Stop the current capture session
|
|
||||||
* Body: { session_id }
|
|
||||||
*/
|
|
||||||
router.post('/stop', async (req, res) => {
|
|
||||||
try {
|
|
||||||
const { session_id } = req.body;
|
|
||||||
|
|
||||||
if (!session_id) {
|
|
||||||
return res.status(400).json({ error: 'Missing required field: session_id' });
|
|
||||||
}
|
|
||||||
|
|
||||||
const completedSession = await captureManager.stop(session_id);
|
|
||||||
|
|
||||||
// Register asset with mam-api.
|
|
||||||
// If proxyKey is null (SRT/RTMP source), set needsProxy=true so the
|
|
||||||
// worker generates a proxy from the hires file asynchronously.
|
|
||||||
try {
|
|
||||||
const mamResponse = await fetch(`${MAM_API_URL}/api/v1/assets`, {
|
|
||||||
method: 'POST',
|
|
||||||
headers: { 'Content-Type': 'application/json' },
|
|
||||||
body: JSON.stringify({
|
|
||||||
projectId: completedSession.projectId,
|
|
||||||
binId: completedSession.binId,
|
|
||||||
clipName: completedSession.clipName,
|
|
||||||
sourceType: completedSession.sourceType,
|
|
||||||
hiresKey: completedSession.hiresKey,
|
|
||||||
proxyKey: completedSession.proxyKey,
|
|
||||||
needsProxy: completedSession.proxyKey === null,
|
|
||||||
duration: completedSession.duration,
|
|
||||||
capturedAt: completedSession.startedAt,
|
|
||||||
}),
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!mamResponse.ok) {
|
|
||||||
console.warn(
|
|
||||||
`MAM API registration returned ${mamResponse.status}: ${await mamResponse.text()}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} catch (mamError) {
|
|
||||||
console.warn('Failed to register asset with MAM API:', mamError.message);
|
|
||||||
}
|
|
||||||
|
|
||||||
res.json(completedSession);
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Error stopping capture:', error);
|
|
||||||
res.status(500).json({ error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
export default router;
|
|
||||||
|
|
@ -280,6 +280,55 @@ function buildStandbyEnv(recorder) {
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Source types that run a long-lived standby sidecar (idle-preview container
|
||||||
|
// kept up 24/7 so `record` is a sub-second HTTP call, not a Docker cold start).
|
||||||
|
const STANDBY_SOURCE_TYPES = ['deltacast', 'sdi', 'blackmagic'];
|
||||||
|
|
||||||
|
// Provision (or re-provision) the single persistent standby sidecar for one
|
||||||
|
// recorder by asking its node's agent to create the idle container. Idempotent
|
||||||
|
// at the node-agent layer (one container per capture port). Updates the
|
||||||
|
// recorder row with the new container_id + status='standby'. Returns:
|
||||||
|
// { ok, containerId?, reason? }
|
||||||
|
// Non-fatal by contract — the caller logs/aggregates; a recorder is still
|
||||||
|
// usable via the on-demand spawn fallback in /start if this fails.
|
||||||
|
async function ensureStandbySidecar(recorder) {
|
||||||
|
if (!recorder.node_id || !STANDBY_SOURCE_TYPES.includes(recorder.source_type)) {
|
||||||
|
return { ok: false, reason: 'not a standby source / no node' };
|
||||||
|
}
|
||||||
|
const { remote: isRemote, apiUrl: targetNodeApiUrl } =
|
||||||
|
await resolveNodeTarget(recorder.node_id).catch(() => ({ remote: false }));
|
||||||
|
if (!isRemote || !targetNodeApiUrl) {
|
||||||
|
return { ok: false, reason: 'node not remote/reachable' };
|
||||||
|
}
|
||||||
|
const capturePort = SIDECAR_PORT_BASE + (recorder.device_index || 0);
|
||||||
|
const useGpu = GPU_CODECS.includes(recorder.recording_codec);
|
||||||
|
const standbyRes = await fetch(`${targetNodeApiUrl}/sidecar/standby`, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
image: 'wild-dragon-capture:latest',
|
||||||
|
env: buildStandbyEnv(recorder),
|
||||||
|
capturePort,
|
||||||
|
sourceType: recorder.source_type,
|
||||||
|
useGpu,
|
||||||
|
gpuUuid: recorder.gpu_uuid || null,
|
||||||
|
}),
|
||||||
|
signal: AbortSignal.timeout(15000),
|
||||||
|
});
|
||||||
|
if (!standbyRes.ok) {
|
||||||
|
return { ok: false, reason: `node-agent returned ${standbyRes.status}` };
|
||||||
|
}
|
||||||
|
const { containerId } = await standbyRes.json();
|
||||||
|
await pool.query(
|
||||||
|
`UPDATE recorders SET container_id = $1, status = 'standby', updated_at = NOW() WHERE id = $2`,
|
||||||
|
[containerId, recorder.id]
|
||||||
|
);
|
||||||
|
recorder.container_id = containerId;
|
||||||
|
recorder.status = 'standby';
|
||||||
|
console.log(`[recorders] standby sidecar spawned for ${recorder.id}: ${containerId}`);
|
||||||
|
return { ok: true, containerId };
|
||||||
|
}
|
||||||
|
|
||||||
// Issue #162 — after a local-spawn stop, wait for the capture container to
|
// Issue #162 — after a local-spawn stop, wait for the capture container to
|
||||||
// finalize its master. The asset row was pre-created at start with
|
// finalize its master. The asset row was pre-created at start with
|
||||||
// status='live' (display_name = current_session_id); the ingest/finalize step
|
// status='live' (display_name = current_session_id); the ingest/finalize step
|
||||||
|
|
@ -432,43 +481,8 @@ router.post('/', async (req, res, next) => {
|
||||||
// Spawn a standby sidecar immediately for SDI/deltacast/blackmagic recorders
|
// Spawn a standby sidecar immediately for SDI/deltacast/blackmagic recorders
|
||||||
// that have an assigned node, so the container + bridge are ready before the
|
// that have an assigned node, so the container + bridge are ready before the
|
||||||
// user hits record. Non-fatal — recorder is still usable if this fails.
|
// user hits record. Non-fatal — recorder is still usable if this fails.
|
||||||
const STANDBY_SOURCE_TYPES = ['deltacast', 'sdi', 'blackmagic'];
|
await ensureStandbySidecar(recorder).catch(e =>
|
||||||
if (recorder.node_id && STANDBY_SOURCE_TYPES.includes(recorder.source_type)) {
|
console.warn(`[recorders] standby spawn failed for ${recorder.id} (non-fatal): ${e.message}`));
|
||||||
const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(recorder.node_id).catch(() => ({ remote: false }));
|
|
||||||
if (isRemote && targetNodeApiUrl) {
|
|
||||||
const capturePort = SIDECAR_PORT_BASE + (recorder.device_index || 0);
|
|
||||||
const useGpu = GPU_CODECS.includes(recorder.recording_codec);
|
|
||||||
try {
|
|
||||||
const standbyRes = await fetch(`${targetNodeApiUrl}/sidecar/standby`, {
|
|
||||||
method: 'POST',
|
|
||||||
headers: { 'Content-Type': 'application/json' },
|
|
||||||
body: JSON.stringify({
|
|
||||||
image: 'wild-dragon-capture:latest',
|
|
||||||
env: buildStandbyEnv(recorder),
|
|
||||||
capturePort,
|
|
||||||
sourceType: recorder.source_type,
|
|
||||||
useGpu,
|
|
||||||
gpuUuid: recorder.gpu_uuid || null,
|
|
||||||
}),
|
|
||||||
signal: AbortSignal.timeout(15000),
|
|
||||||
});
|
|
||||||
if (standbyRes.ok) {
|
|
||||||
const { containerId } = await standbyRes.json();
|
|
||||||
await pool.query(
|
|
||||||
`UPDATE recorders SET container_id = $1, status = 'standby', updated_at = NOW() WHERE id = $2`,
|
|
||||||
[containerId, recorder.id]
|
|
||||||
);
|
|
||||||
recorder.container_id = containerId;
|
|
||||||
recorder.status = 'standby';
|
|
||||||
console.log(`[recorders] standby sidecar spawned for ${recorder.id}: ${containerId}`);
|
|
||||||
} else {
|
|
||||||
console.warn(`[recorders] standby spawn returned ${standbyRes.status} for ${recorder.id} — will spawn on start`);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
console.warn(`[recorders] standby spawn failed for ${recorder.id} (non-fatal): ${e.message}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res.status(201).json(recorder);
|
res.status(201).json(recorder);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|
@ -476,6 +490,48 @@ router.post('/', async (req, res, next) => {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// POST /reconcile-standby - (re)provision the persistent standby sidecar for
|
||||||
|
// every SDI/deltacast recorder that should have one. Standby sidecars are
|
||||||
|
// created on recorder-create and kept up 24/7 (RestartPolicy=unless-stopped),
|
||||||
|
// but if they're externally removed (manual cleanup, node redeploy, a wiped
|
||||||
|
// /dev/shm) nothing recreates them — the recorder then falls back to the slow
|
||||||
|
// on-demand spawn on /start, which can collide on the capture port. This
|
||||||
|
// endpoint re-warms them so all recorders return to the fast standby path.
|
||||||
|
//
|
||||||
|
// Optional body: { force: true } recreates even recorders that currently claim
|
||||||
|
// a container_id (the node-agent is idempotent per capture port, so a stale id
|
||||||
|
// is replaced cleanly). Without force, only recorders with no container_id are
|
||||||
|
// (re)provisioned.
|
||||||
|
router.post('/reconcile-standby', requireRecorderEdit, async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const force = !!(req.body && req.body.force);
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`SELECT * FROM recorders
|
||||||
|
WHERE source_type = ANY($1)
|
||||||
|
AND node_id IS NOT NULL
|
||||||
|
ORDER BY name`,
|
||||||
|
[STANDBY_SOURCE_TYPES]
|
||||||
|
);
|
||||||
|
const results = [];
|
||||||
|
for (const recorder of rows) {
|
||||||
|
if (!force && recorder.container_id) {
|
||||||
|
results.push({ id: recorder.id, name: recorder.name, ok: true, skipped: 'already has container_id' });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const r = await ensureStandbySidecar(recorder);
|
||||||
|
results.push({ id: recorder.id, name: recorder.name, ...r });
|
||||||
|
} catch (e) {
|
||||||
|
results.push({ id: recorder.id, name: recorder.name, ok: false, reason: e.message });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const provisioned = results.filter(r => r.ok && r.containerId).length;
|
||||||
|
res.json({ provisioned, total: rows.length, results });
|
||||||
|
} catch (err) {
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// GET /:id - Get single recorder
|
// GET /:id - Get single recorder
|
||||||
router.get('/:id', async (req, res, next) => {
|
router.get('/:id', async (req, res, next) => {
|
||||||
try {
|
try {
|
||||||
|
|
@ -970,7 +1026,6 @@ router.post('/:id/stop', requireRecorderEdit, async (req, res, next) => {
|
||||||
// /start call immediately.
|
// /start call immediately.
|
||||||
//
|
//
|
||||||
// If NOT in standby (legacy on-demand spawn), use the old docker-stop path.
|
// If NOT in standby (legacy on-demand spawn), use the old docker-stop path.
|
||||||
const STANDBY_SOURCE_TYPES = ['deltacast', 'sdi', 'blackmagic'];
|
|
||||||
const isStandbySource = STANDBY_SOURCE_TYPES.includes(recorder.source_type);
|
const isStandbySource = STANDBY_SOURCE_TYPES.includes(recorder.source_type);
|
||||||
|
|
||||||
if (isStandbySource && recorder.container_id) {
|
if (isStandbySource && recorder.container_id) {
|
||||||
|
|
|
||||||
|
|
@ -555,7 +555,9 @@ async function handleSidecarStart(body, res) {
|
||||||
// "deltacast-<board>-<port>" (both known here), so we construct it
|
// "deltacast-<board>-<port>" (both known here), so we construct it
|
||||||
// directly and DO NOT wait for the bridge's async format JSON. This is
|
// directly and DO NOT wait for the bridge's async format JSON. This is
|
||||||
// the fix for the cold-start race where _dcPortFmt was still empty on
|
// the fix for the cold-start race where _dcPortFmt was still empty on
|
||||||
// first recorder start, silently falling back to the legacy FIFO path.
|
// first recorder start. FC_SLOT_ID is now MANDATORY — the legacy
|
||||||
|
// FIFO-video fallback in capture-manager was removed, so a missing slot
|
||||||
|
// id would hard-fail rather than silently degrade.
|
||||||
const _slotId = `deltacast-${DC_BOARD}-${_portNum}`;
|
const _slotId = `deltacast-${DC_BOARD}-${_portNum}`;
|
||||||
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
|
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue