feat(mam-api): /playout control plane + auto-failover
Routes: channel + playlist CRUD, start/stop/play/pause/skip transport, as-run log. RBAC via assertProjectAccess on channel.project_id; null project ⇒ admin-only (recorder convention). Sidecar orchestration mirrors recorders.js: Docker socket for local node, node-agent /sidecar/start for remote. Channel start passes CHANNEL_ID env so the sidecar can write HLS preview to /media/live/<id>. DeckLink port-contention guard: blocks starting a decklink channel when a recorder or another channel on the same node+device_index is active. restartChannel(id) helper picks another healthy cluster node and re-places non-decklink channels; decklink is alert-only. Exposed for the scheduler. Scheduler tick adds step 6: poll each running channel's sidecar /status, update last_heartbeat_at, and after ~3 misses trigger restartChannel + self-call /start. Reuses the existing PG advisory lock so multi-replica deploys don't double-fire failovers.
This commit is contained in:
parent
d62af34e98
commit
5538683d78
3 changed files with 749 additions and 0 deletions
|
|
@ -22,6 +22,7 @@ import jobsRouter from './routes/jobs.js';
|
||||||
import captureRouter from './routes/capture.js';
|
import captureRouter from './routes/capture.js';
|
||||||
import uploadRouter from './routes/upload.js';
|
import uploadRouter from './routes/upload.js';
|
||||||
import recordersRouter from './routes/recorders.js';
|
import recordersRouter from './routes/recorders.js';
|
||||||
|
import playoutRouter from './routes/playout.js';
|
||||||
import settingsRouter from './routes/settings.js';
|
import settingsRouter from './routes/settings.js';
|
||||||
import amppRouter from './routes/ampp.js';
|
import amppRouter from './routes/ampp.js';
|
||||||
import groupsRouter from './routes/groups.js';
|
import groupsRouter from './routes/groups.js';
|
||||||
|
|
@ -132,6 +133,7 @@ app.use('/api/v1/jobs', jobsRouter);
|
||||||
app.use('/api/v1/capture', captureRouter);
|
app.use('/api/v1/capture', captureRouter);
|
||||||
app.use('/api/v1/upload', uploadRouter);
|
app.use('/api/v1/upload', uploadRouter);
|
||||||
app.use('/api/v1/recorders', recordersRouter);
|
app.use('/api/v1/recorders', recordersRouter);
|
||||||
|
app.use('/api/v1/playout', playoutRouter);
|
||||||
app.use('/api/v1/settings', settingsRouter);
|
app.use('/api/v1/settings', settingsRouter);
|
||||||
app.use('/api/v1/ampp', amppRouter);
|
app.use('/api/v1/ampp', amppRouter);
|
||||||
app.use('/api/v1/groups', requireAdmin, groupsRouter);
|
app.use('/api/v1/groups', requireAdmin, groupsRouter);
|
||||||
|
|
|
||||||
675
services/mam-api/src/routes/playout.js
Normal file
675
services/mam-api/src/routes/playout.js
Normal file
|
|
@ -0,0 +1,675 @@
|
||||||
|
// Playout / Master Control routes.
|
||||||
|
//
|
||||||
|
// Control plane for the CasparCG-backed playout subsystem. Channels are placed
|
||||||
|
// on cluster nodes and their engine containers spawned via the same Docker-socket
|
||||||
|
// / node-agent path recorders use; the channel's transport (play / pause / skip)
|
||||||
|
// is proxied through to the sidecar's HTTP shim, which drives CasparCG over AMCP.
|
||||||
|
//
|
||||||
|
// RBAC: every channel carries a project_id (NULL = admin-only, the recorder
|
||||||
|
// convention). List routes filter by accessible projects; mutating routes assert
|
||||||
|
// 'edit'. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md.
|
||||||
|
|
||||||
|
import express from 'express';
|
||||||
|
import http from 'http';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import pool from '../db/pool.js';
|
||||||
|
import { validateUuid } from '../middleware/errors.js';
|
||||||
|
import {
|
||||||
|
assertProjectAccess, accessibleProjectIds, isAdmin,
|
||||||
|
} from '../auth/authz.js';
|
||||||
|
|
||||||
|
const router = express.Router();
|
||||||
|
|
||||||
|
// ── BullMQ: media staging queue (S3 -> /media volume) ────────────────────────
|
||||||
|
const parseRedisUrl = (url) => {
|
||||||
|
const parsed = new URL(url);
|
||||||
|
return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
|
||||||
|
};
|
||||||
|
const stageQueue = new Queue('playout-stage', {
|
||||||
|
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Sidecar orchestration (mirrors recorders.js) ─────────────────────────────
|
||||||
|
const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest';
|
||||||
|
|
||||||
|
function dockerApi(method, path, body = null) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const options = {
|
||||||
|
socketPath: '/var/run/docker.sock',
|
||||||
|
path: `/v1.43${path}`,
|
||||||
|
method,
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
};
|
||||||
|
const req = http.request(options, (res) => {
|
||||||
|
let data = '';
|
||||||
|
res.on('data', (chunk) => (data += chunk));
|
||||||
|
res.on('end', () => {
|
||||||
|
try { resolve({ status: res.statusCode, data: data ? JSON.parse(data) : {} }); }
|
||||||
|
catch { resolve({ status: res.statusCode, data }); }
|
||||||
|
});
|
||||||
|
});
|
||||||
|
req.on('error', reject);
|
||||||
|
req.setTimeout(10000, () => req.destroy(new Error('Docker API timeout after 10s')));
|
||||||
|
if (body) req.write(JSON.stringify(body));
|
||||||
|
req.end();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function resolveNodeTarget(nodeId) {
|
||||||
|
if (!nodeId) return { remote: false };
|
||||||
|
const r = await pool.query(
|
||||||
|
'SELECT hostname, ip_address, api_url FROM cluster_nodes WHERE id = $1', [nodeId]
|
||||||
|
);
|
||||||
|
if (r.rows.length === 0) return { remote: false };
|
||||||
|
const node = r.rows[0];
|
||||||
|
const localHostname = process.env.NODE_HOSTNAME || '';
|
||||||
|
if (!node.api_url || node.hostname === localHostname) return { remote: false };
|
||||||
|
return { remote: true, apiUrl: node.api_url, ip: node.ip_address };
|
||||||
|
}
|
||||||
|
|
||||||
|
// The sidecar shim listens on this port inside the container. The mam-api talks
|
||||||
|
// to it by container alias on the shared docker network (local) or via the
|
||||||
|
// node-agent's returned host:port (remote).
|
||||||
|
const SIDECAR_HTTP_PORT = 3002;
|
||||||
|
|
||||||
|
function channelAlias(id) { return `playout-${id}`; }
|
||||||
|
|
||||||
|
// Resolve the base URL the API uses to reach a running channel's sidecar shim.
|
||||||
|
// Local: the docker-network alias. Remote: the node-agent reported the host the
|
||||||
|
// container is published on (stored in container_meta.sidecar_url).
|
||||||
|
function sidecarBaseUrl(channel) {
|
||||||
|
if (channel.container_meta && channel.container_meta.sidecar_url) {
|
||||||
|
return channel.container_meta.sidecar_url;
|
||||||
|
}
|
||||||
|
return `http://${channelAlias(channel.id)}:${SIDECAR_HTTP_PORT}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function callSidecar(channel, path, method = 'POST', body = null) {
|
||||||
|
const url = `${sidecarBaseUrl(channel)}${path}`;
|
||||||
|
const res = await fetch(url, {
|
||||||
|
method,
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: body ? JSON.stringify(body) : undefined,
|
||||||
|
signal: AbortSignal.timeout(20000),
|
||||||
|
});
|
||||||
|
if (!res.ok) {
|
||||||
|
const text = await res.text().catch(() => '');
|
||||||
|
throw new Error(`sidecar ${method} ${path} -> HTTP ${res.status}: ${text.slice(0, 200)}`);
|
||||||
|
}
|
||||||
|
return res.json().catch(() => ({}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Serialization ────────────────────────────────────────────────────────────
|
||||||
|
function channelToJson(r) {
|
||||||
|
return {
|
||||||
|
id: r.id,
|
||||||
|
name: r.name,
|
||||||
|
node_id: r.node_id,
|
||||||
|
output_type: r.output_type,
|
||||||
|
output_config: r.output_config,
|
||||||
|
video_format: r.video_format,
|
||||||
|
status: r.status,
|
||||||
|
container_id: r.container_id,
|
||||||
|
error_message: r.error_message,
|
||||||
|
project_id: r.project_id,
|
||||||
|
restart_count: r.restart_count ?? 0,
|
||||||
|
last_restart_at: r.last_restart_at,
|
||||||
|
last_heartbeat_at: r.last_heartbeat_at,
|
||||||
|
created_at: r.created_at,
|
||||||
|
updated_at: r.updated_at,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const OUTPUT_TYPES = new Set(['decklink', 'ndi', 'srt', 'rtmp']);
|
||||||
|
|
||||||
|
// ── Param resolver: scope every /:id route to the channel's project ──────────
|
||||||
|
router.param('id', async (req, res, next) => {
|
||||||
|
validateUuid('id')(req, res, () => {});
|
||||||
|
if (res.headersSent) return;
|
||||||
|
try {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
'SELECT * FROM playout_channels WHERE id = $1', [req.params.id]
|
||||||
|
);
|
||||||
|
if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
|
||||||
|
req.channel = rows[0];
|
||||||
|
await assertProjectAccess(req.user, req.channel.project_id, 'view');
|
||||||
|
next();
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
async function requireChannelEdit(req, res, next) {
|
||||||
|
try { await assertProjectAccess(req.user, req.channel.project_id, 'edit'); next(); }
|
||||||
|
catch (err) { next(err); }
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Channels ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// GET /playout/channels — list (filtered to accessible projects)
|
||||||
|
router.get('/channels', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
let rows;
|
||||||
|
if (isAdmin(req.user)) {
|
||||||
|
({ rows } = await pool.query('SELECT * FROM playout_channels ORDER BY created_at DESC'));
|
||||||
|
} else {
|
||||||
|
const ids = await accessibleProjectIds(req.user);
|
||||||
|
if (ids.length === 0) return res.json([]);
|
||||||
|
({ rows } = await pool.query(
|
||||||
|
'SELECT * FROM playout_channels WHERE project_id = ANY($1) ORDER BY created_at DESC', [ids]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
res.json(rows.map(channelToJson));
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /playout/channels — create
|
||||||
|
router.post('/channels', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { name, node_id = null, output_type = 'srt', output_config = {},
|
||||||
|
video_format = '1080p5994', project_id = null } = req.body || {};
|
||||||
|
if (!name || typeof name !== 'string') {
|
||||||
|
return res.status(400).json({ error: 'name is required' });
|
||||||
|
}
|
||||||
|
if (!OUTPUT_TYPES.has(output_type)) {
|
||||||
|
return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` });
|
||||||
|
}
|
||||||
|
// Creating a project-scoped channel requires edit on that project; a
|
||||||
|
// null-project (admin-only) channel requires admin.
|
||||||
|
if (project_id) await assertProjectAccess(req.user, project_id, 'edit');
|
||||||
|
else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });
|
||||||
|
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`INSERT INTO playout_channels (name, node_id, output_type, output_config, video_format, project_id)
|
||||||
|
VALUES ($1,$2,$3,$4,$5,$6) RETURNING *`,
|
||||||
|
[name.trim(), node_id, output_type, JSON.stringify(output_config), video_format, project_id]
|
||||||
|
);
|
||||||
|
res.status(201).json(channelToJson(rows[0]));
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// PATCH /playout/channels/:id — update config (only while stopped)
|
||||||
|
router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
if (req.channel.status === 'running') {
|
||||||
|
return res.status(409).json({ error: 'Cannot edit a running channel — stop it first' });
|
||||||
|
}
|
||||||
|
const allowed = ['name', 'node_id', 'output_type', 'output_config', 'video_format', 'project_id'];
|
||||||
|
const sets = [];
|
||||||
|
const vals = [];
|
||||||
|
let i = 1;
|
||||||
|
for (const k of allowed) {
|
||||||
|
if (req.body[k] === undefined) continue;
|
||||||
|
if (k === 'output_type' && !OUTPUT_TYPES.has(req.body[k])) {
|
||||||
|
return res.status(400).json({ error: 'invalid output_type' });
|
||||||
|
}
|
||||||
|
sets.push(`${k} = $${i++}`);
|
||||||
|
vals.push(k === 'output_config' ? JSON.stringify(req.body[k]) : req.body[k]);
|
||||||
|
}
|
||||||
|
if (sets.length === 0) return res.json(channelToJson(req.channel));
|
||||||
|
vals.push(req.channel.id);
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`UPDATE playout_channels SET ${sets.join(', ')}, updated_at = NOW() WHERE id = $${i} RETURNING *`, vals
|
||||||
|
);
|
||||||
|
res.json(channelToJson(rows[0]));
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// DELETE /playout/channels/:id
|
||||||
|
router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
if (req.channel.status === 'running') {
|
||||||
|
return res.status(409).json({ error: 'Stop the channel before deleting it' });
|
||||||
|
}
|
||||||
|
await pool.query('DELETE FROM playout_channels WHERE id = $1', [req.channel.id]);
|
||||||
|
res.json({ deleted: true });
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Port-contention guard (DeckLink) ─────────────────────────────────────────
|
||||||
|
// A DeckLink device on a node is exclusive: an active recorder OR another active
|
||||||
|
// channel on the same node+index blocks a new SDI channel. NDI/SRT/RTMP have no
|
||||||
|
// hardware contention.
|
||||||
|
async function assertDeckLinkFree(channel) {
|
||||||
|
if (channel.output_type !== 'decklink') return;
|
||||||
|
const idx = (channel.output_config && channel.output_config.device_index) || 1;
|
||||||
|
// Another running channel on the same node + device index?
|
||||||
|
const chan = await pool.query(
|
||||||
|
`SELECT id FROM playout_channels
|
||||||
|
WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2 AND status = 'running'
|
||||||
|
AND output_type = 'decklink' AND (output_config->>'device_index')::int = $3`,
|
||||||
|
[channel.id, channel.node_id, idx]
|
||||||
|
);
|
||||||
|
if (chan.rows.length > 0) {
|
||||||
|
throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 });
|
||||||
|
}
|
||||||
|
// An active recorder using the same device index on the same node?
|
||||||
|
const rec = await pool.query(
|
||||||
|
`SELECT id FROM recorders
|
||||||
|
WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2
|
||||||
|
AND status = 'recording' AND source_type = 'sdi'`,
|
||||||
|
[channel.node_id, idx]
|
||||||
|
);
|
||||||
|
if (rec.rows.length > 0) {
|
||||||
|
throw Object.assign(new Error(`DeckLink device ${idx} is in use by a recorder on this node`), { httpStatus: 409 });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// POST /playout/channels/:id/start — spawn the CasparCG sidecar + bring up output
|
||||||
|
router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const channel = req.channel;
|
||||||
|
if (channel.status === 'running' || channel.status === 'starting') {
|
||||||
|
return res.status(409).json({ error: `Channel already ${channel.status}` });
|
||||||
|
}
|
||||||
|
await assertDeckLinkFree(channel);
|
||||||
|
|
||||||
|
await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]);
|
||||||
|
|
||||||
|
const env = [
|
||||||
|
`OUTPUT_TYPE=${channel.output_type}`,
|
||||||
|
`OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`,
|
||||||
|
`VIDEO_FORMAT=${channel.video_format}`,
|
||||||
|
`PORT=${SIDECAR_HTTP_PORT}`,
|
||||||
|
// Drives the HLS preview path (/media/live/<channel_id>/index.m3u8) and
|
||||||
|
// the per-channel resource naming inside the sidecar.
|
||||||
|
`CHANNEL_ID=${channel.id}`,
|
||||||
|
];
|
||||||
|
|
||||||
|
const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(channel.node_id);
|
||||||
|
const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon';
|
||||||
|
let containerId;
|
||||||
|
let containerMeta = {};
|
||||||
|
|
||||||
|
if (isRemote) {
|
||||||
|
const sidecarRes = await fetch(`${targetNodeApiUrl}/sidecar/start`, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
image: PLAYOUT_SIDECAR_IMAGE, env,
|
||||||
|
capturePort: SIDECAR_HTTP_PORT,
|
||||||
|
sourceType: channel.output_type,
|
||||||
|
useGpu: false,
|
||||||
|
publishHttp: true,
|
||||||
|
}),
|
||||||
|
signal: AbortSignal.timeout(20000),
|
||||||
|
});
|
||||||
|
if (!sidecarRes.ok) {
|
||||||
|
const details = await sidecarRes.json().catch(() => ({}));
|
||||||
|
console.error('[playout] remote sidecar start failed:', JSON.stringify(details));
|
||||||
|
await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
|
||||||
|
['error', 'remote node failed to start sidecar', channel.id]);
|
||||||
|
return res.status(502).json({ error: 'Remote node failed to start sidecar' });
|
||||||
|
}
|
||||||
|
const data = await sidecarRes.json();
|
||||||
|
containerId = data.containerId;
|
||||||
|
// node-agent returns the reachable host:port the shim is published on.
|
||||||
|
if (data.sidecarUrl || data.host) {
|
||||||
|
containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
const alias = channelAlias(channel.id);
|
||||||
|
const hostBinds = ['/mnt/NVME/MAM/wild-dragon-media:/media'];
|
||||||
|
if (channel.output_type === 'decklink') hostBinds.push('/dev/blackmagic:/dev/blackmagic');
|
||||||
|
|
||||||
|
const containerConfig = {
|
||||||
|
Image: PLAYOUT_SIDECAR_IMAGE,
|
||||||
|
Env: env,
|
||||||
|
HostConfig: {
|
||||||
|
Privileged: true,
|
||||||
|
NetworkMode: dockerNetwork,
|
||||||
|
Binds: hostBinds,
|
||||||
|
},
|
||||||
|
NetworkingConfig: { EndpointsConfig: { [dockerNetwork]: { Aliases: [alias] } } },
|
||||||
|
Hostname: alias,
|
||||||
|
};
|
||||||
|
const createRes = await dockerApi('POST', '/containers/create', containerConfig);
|
||||||
|
if (createRes.status !== 201) {
|
||||||
|
console.error('[playout] container create failed:', JSON.stringify(createRes.data));
|
||||||
|
await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
|
||||||
|
['error', 'container create failed', channel.id]);
|
||||||
|
return res.status(500).json({ error: 'Failed to create container' });
|
||||||
|
}
|
||||||
|
containerId = createRes.data.Id;
|
||||||
|
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
|
||||||
|
if (startRes.status !== 204) {
|
||||||
|
console.error('[playout] container start failed:', JSON.stringify(startRes.data));
|
||||||
|
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||||
|
await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
|
||||||
|
['error', 'container start failed', channel.id]);
|
||||||
|
return res.status(500).json({ error: 'Failed to start container' });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`UPDATE playout_channels
|
||||||
|
SET status = 'running', container_id = $1, container_meta = $2, updated_at = NOW()
|
||||||
|
WHERE id = $3 RETURNING *`,
|
||||||
|
[containerId, JSON.stringify(containerMeta), channel.id]
|
||||||
|
);
|
||||||
|
res.json(channelToJson(rows[0]));
|
||||||
|
} catch (err) {
|
||||||
|
if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /playout/channels/:id/stop — tear down the sidecar
|
||||||
|
router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const channel = req.channel;
|
||||||
|
if (channel.container_id) {
|
||||||
|
const { remote: isRemote, apiUrl } = await resolveNodeTarget(channel.node_id);
|
||||||
|
if (isRemote) {
|
||||||
|
await fetch(`${apiUrl}/sidecar/stop`, {
|
||||||
|
method: 'POST', headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({ containerId: channel.container_id }),
|
||||||
|
signal: AbortSignal.timeout(20000),
|
||||||
|
}).catch((e) => console.error('[playout] remote stop failed:', e.message));
|
||||||
|
} else {
|
||||||
|
await dockerApi('POST', `/containers/${channel.container_id}/stop?t=10`).catch(() => {});
|
||||||
|
await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`).catch(() => {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`UPDATE playout_channels SET status = 'stopped', container_id = NULL, updated_at = NOW()
|
||||||
|
WHERE id = $1 RETURNING *`, [channel.id]
|
||||||
|
);
|
||||||
|
res.json(channelToJson(rows[0]));
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// GET /playout/channels/:id/status — live engine status (proxied to sidecar)
|
||||||
|
router.get('/channels/:id/status', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
if (req.channel.status !== 'running') {
|
||||||
|
return res.json({ running: false, status: req.channel.status });
|
||||||
|
}
|
||||||
|
const out = await callSidecar(req.channel, '/status', 'GET');
|
||||||
|
res.json({ running: true, status: req.channel.status, engine: out });
|
||||||
|
} catch (err) {
|
||||||
|
res.json({ running: true, status: req.channel.status, engine: null, engine_error: err.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Transport ────────────────────────────────────────────────────────────────
|
||||||
|
async function transport(req, res, action, body = null) {
|
||||||
|
if (req.channel.status !== 'running') {
|
||||||
|
return res.status(409).json({ error: 'Channel is not running' });
|
||||||
|
}
|
||||||
|
try { res.json(await callSidecar(req.channel, action, 'POST', body)); }
|
||||||
|
catch (err) { res.status(502).json({ error: err.message }); }
|
||||||
|
}
|
||||||
|
|
||||||
|
// POST /playout/channels/:id/play — resolve the channel's playlist, stage-check,
|
||||||
|
// and hand the engine the ordered list of ready clips.
|
||||||
|
router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
if (req.channel.status !== 'running') {
|
||||||
|
return res.status(409).json({ error: 'Start the channel before playing' });
|
||||||
|
}
|
||||||
|
const { playlist_id } = req.body || {};
|
||||||
|
if (!playlist_id) return res.status(400).json({ error: 'playlist_id is required' });
|
||||||
|
|
||||||
|
const pl = await pool.query('SELECT * FROM playout_playlists WHERE id = $1 AND channel_id = $2',
|
||||||
|
[playlist_id, req.channel.id]);
|
||||||
|
if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found for this channel' });
|
||||||
|
|
||||||
|
const items = await pool.query(
|
||||||
|
`SELECT i.*, a.filename AS clip_name
|
||||||
|
FROM playout_items i JOIN assets a ON a.id = i.asset_id
|
||||||
|
WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [playlist_id]);
|
||||||
|
|
||||||
|
const notReady = items.rows.filter((i) => i.media_status !== 'ready' || !i.media_path);
|
||||||
|
if (notReady.length > 0) {
|
||||||
|
return res.status(409).json({
|
||||||
|
error: 'Some items are not staged yet',
|
||||||
|
pending: notReady.map((i) => i.id),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const payload = {
|
||||||
|
loop: pl.rows[0].loop,
|
||||||
|
items: items.rows.map((i) => ({
|
||||||
|
id: i.id, asset_id: i.asset_id, media_path: i.media_path,
|
||||||
|
in_point: i.in_point ? Number(i.in_point) : null,
|
||||||
|
out_point: i.out_point ? Number(i.out_point) : null,
|
||||||
|
transition: i.transition, transition_ms: i.transition_ms,
|
||||||
|
clip_name: i.clip_name,
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
const out = await callSidecar(req.channel, '/playlist/load', 'POST', payload);
|
||||||
|
res.json(out);
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post('/channels/:id/pause', requireChannelEdit, (req, res) => transport(req, res, '/transport/pause'));
|
||||||
|
router.post('/channels/:id/resume', requireChannelEdit, (req, res) => transport(req, res, '/transport/resume'));
|
||||||
|
router.post('/channels/:id/skip', requireChannelEdit, (req, res) => transport(req, res, '/transport/skip'));
|
||||||
|
router.post('/channels/:id/stop-playback', requireChannelEdit, (req, res) => transport(req, res, '/channel/stop'));
|
||||||
|
|
||||||
|
// GET /playout/channels/:id/asrun — as-run log
|
||||||
|
router.get('/channels/:id/asrun', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`SELECT * FROM playout_as_run WHERE channel_id = $1 ORDER BY started_at DESC LIMIT 500`,
|
||||||
|
[req.channel.id]);
|
||||||
|
res.json(rows);
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Playlists ────────────────────────────────────────────────────────────────
|
||||||
|
async function loadChannelForBody(req, res, next) {
|
||||||
|
// For playlist/item routes the channel is referenced indirectly; resolve it
|
||||||
|
// and assert edit. Used on create/mutate routes that carry channel_id.
|
||||||
|
const channelId = req.body.channel_id || req.query.channel_id;
|
||||||
|
if (!channelId) return res.status(400).json({ error: 'channel_id is required' });
|
||||||
|
try {
|
||||||
|
const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
|
||||||
|
if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
|
||||||
|
req.channel = rows[0];
|
||||||
|
await assertProjectAccess(req.user, req.channel.project_id, 'edit');
|
||||||
|
next();
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
}
|
||||||
|
|
||||||
|
// GET /playout/playlists?channel_id=...
|
||||||
|
router.get('/playlists', async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const channelId = req.query.channel_id;
|
||||||
|
if (!channelId) return res.status(400).json({ error: 'channel_id is required' });
|
||||||
|
const ch = await pool.query('SELECT project_id FROM playout_channels WHERE id = $1', [channelId]);
|
||||||
|
if (ch.rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
|
||||||
|
await assertProjectAccess(req.user, ch.rows[0].project_id, 'view');
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
'SELECT * FROM playout_playlists WHERE channel_id = $1 ORDER BY created_at ASC', [channelId]);
|
||||||
|
res.json(rows);
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /playout/playlists
|
||||||
|
router.post('/playlists', loadChannelForBody, async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const { name, loop = false } = req.body || {};
|
||||||
|
if (!name) return res.status(400).json({ error: 'name is required' });
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
'INSERT INTO playout_playlists (channel_id, name, loop) VALUES ($1,$2,$3) RETURNING *',
|
||||||
|
[req.channel.id, name.trim(), !!loop]);
|
||||||
|
res.status(201).json(rows[0]);
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// GET /playout/playlists/:plid/items
|
||||||
|
router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const pl = await pool.query(
|
||||||
|
`SELECT p.*, c.project_id FROM playout_playlists p
|
||||||
|
JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [req.params.plid]);
|
||||||
|
if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found' });
|
||||||
|
await assertProjectAccess(req.user, pl.rows[0].project_id, 'view');
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms
|
||||||
|
FROM playout_items i JOIN assets a ON a.id = i.asset_id
|
||||||
|
WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [req.params.plid]);
|
||||||
|
res.json(rows);
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// Helper: load a playlist + assert edit on its channel's project.
|
||||||
|
async function loadPlaylistEdit(plid, user) {
|
||||||
|
const pl = await pool.query(
|
||||||
|
`SELECT p.*, c.project_id FROM playout_playlists p
|
||||||
|
JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]);
|
||||||
|
if (pl.rows.length === 0) { throw Object.assign(new Error('Playlist not found'), { httpStatus: 404 }); }
|
||||||
|
await assertProjectAccess(user, pl.rows[0].project_id, 'edit');
|
||||||
|
return pl.rows[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
// POST /playout/playlists/:plid/items — add an asset to a playlist
|
||||||
|
router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
await loadPlaylistEdit(req.params.plid, req.user);
|
||||||
|
const { asset_id, in_point = null, out_point = null,
|
||||||
|
transition = 'cut', transition_ms = 0 } = req.body || {};
|
||||||
|
if (!asset_id) return res.status(400).json({ error: 'asset_id is required' });
|
||||||
|
|
||||||
|
// Append at the end of the playlist.
|
||||||
|
const ord = await pool.query(
|
||||||
|
'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1',
|
||||||
|
[req.params.plid]);
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`INSERT INTO playout_items (playlist_id, asset_id, sort_order, in_point, out_point, transition, transition_ms)
|
||||||
|
VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`,
|
||||||
|
[req.params.plid, asset_id, ord.rows[0].next, in_point, out_point, transition, transition_ms]);
|
||||||
|
|
||||||
|
// Kick staging immediately so the clip is air-ready by the time the operator
|
||||||
|
// hits play.
|
||||||
|
await stageQueue.add('stage', { itemId: rows[0].id, assetId: asset_id }).catch((e) =>
|
||||||
|
console.error('[playout] failed to enqueue stage job:', e.message));
|
||||||
|
|
||||||
|
res.status(201).json(rows[0]);
|
||||||
|
} catch (err) {
|
||||||
|
if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// PUT /playout/playlists/:plid/reorder — body { order: [itemId, itemId, ...] }
|
||||||
|
router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => {
|
||||||
|
const client = await pool.connect();
|
||||||
|
try {
|
||||||
|
await loadPlaylistEdit(req.params.plid, req.user);
|
||||||
|
const { order } = req.body || {};
|
||||||
|
if (!Array.isArray(order)) return res.status(400).json({ error: 'order must be an array of item ids' });
|
||||||
|
await client.query('BEGIN');
|
||||||
|
for (let i = 0; i < order.length; i++) {
|
||||||
|
await client.query(
|
||||||
|
'UPDATE playout_items SET sort_order = $1, updated_at = NOW() WHERE id = $2 AND playlist_id = $3',
|
||||||
|
[i, order[i], req.params.plid]);
|
||||||
|
}
|
||||||
|
await client.query('COMMIT');
|
||||||
|
res.json({ reordered: order.length });
|
||||||
|
} catch (err) {
|
||||||
|
await client.query('ROLLBACK').catch(() => {});
|
||||||
|
if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
|
||||||
|
next(err);
|
||||||
|
} finally { client.release(); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// DELETE /playout/items/:itemId
|
||||||
|
router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const it = await pool.query(
|
||||||
|
`SELECT i.id, c.project_id FROM playout_items i
|
||||||
|
JOIN playout_playlists p ON p.id = i.playlist_id
|
||||||
|
JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [req.params.itemId]);
|
||||||
|
if (it.rows.length === 0) return res.status(404).json({ error: 'Item not found' });
|
||||||
|
await assertProjectAccess(req.user, it.rows[0].project_id, 'edit');
|
||||||
|
await pool.query('DELETE FROM playout_items WHERE id = $1', [req.params.itemId]);
|
||||||
|
res.json({ deleted: true });
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /playout/items/:itemId/stage — (re)kick staging for one item
|
||||||
|
router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => {
|
||||||
|
try {
|
||||||
|
const it = await pool.query(
|
||||||
|
`SELECT i.id, i.asset_id, c.project_id FROM playout_items i
|
||||||
|
JOIN playout_playlists p ON p.id = i.playlist_id
|
||||||
|
JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [req.params.itemId]);
|
||||||
|
if (it.rows.length === 0) return res.status(404).json({ error: 'Item not found' });
|
||||||
|
await assertProjectAccess(req.user, it.rows[0].project_id, 'edit');
|
||||||
|
await pool.query("UPDATE playout_items SET media_status = 'pending' WHERE id = $1", [req.params.itemId]);
|
||||||
|
await stageQueue.add('stage', { itemId: it.rows[0].id, assetId: it.rows[0].asset_id });
|
||||||
|
res.json({ queued: true });
|
||||||
|
} catch (err) { next(err); }
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Failover (called by scheduler tick) ──────────────────────────────────────
|
||||||
|
// Tear down a (presumed dead) sidecar and re-spawn it on another cluster node
|
||||||
|
// matching the original capability. DeckLink channels are excluded — the
|
||||||
|
// device-index pinning makes blind re-placement risky, so they alert only.
|
||||||
|
//
|
||||||
|
// Returns { restarted: true, new_node_id } on success, or { restarted: false,
|
||||||
|
// reason } when no eligible node exists or the channel is decklink.
|
||||||
|
export async function restartChannel(channelId) {
|
||||||
|
const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
|
||||||
|
if (rows.length === 0) return { restarted: false, reason: 'channel not found' };
|
||||||
|
const channel = rows[0];
|
||||||
|
|
||||||
|
if (channel.output_type === 'decklink') {
|
||||||
|
return { restarted: false, reason: 'decklink channels are alert-only' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Best-effort teardown of the old container — it may already be dead.
|
||||||
|
if (channel.container_id) {
|
||||||
|
const { remote, apiUrl } = await resolveNodeTarget(channel.node_id);
|
||||||
|
if (remote && apiUrl) {
|
||||||
|
await fetch(`${apiUrl}/sidecar/stop`, {
|
||||||
|
method: 'POST', headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({ containerId: channel.container_id }),
|
||||||
|
signal: AbortSignal.timeout(10000),
|
||||||
|
}).catch(() => {});
|
||||||
|
} else {
|
||||||
|
await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`).catch(() => {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pick a different healthy node. For NDI/SRT/RTMP every online node is
|
||||||
|
// eligible (no hardware contention). Prefer the original if it's still
|
||||||
|
// online — the failure may have been transient.
|
||||||
|
const nodes = await pool.query(
|
||||||
|
`SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes
|
||||||
|
WHERE id <> $1 AND last_seen_at > NOW() - INTERVAL '60 seconds'
|
||||||
|
ORDER BY last_seen_at DESC LIMIT 1`,
|
||||||
|
[channel.node_id]
|
||||||
|
);
|
||||||
|
if (nodes.rows.length === 0) {
|
||||||
|
await pool.query(
|
||||||
|
"UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
|
||||||
|
['no healthy node available for failover', channel.id]
|
||||||
|
);
|
||||||
|
return { restarted: false, reason: 'no eligible node' };
|
||||||
|
}
|
||||||
|
const newNodeId = nodes.rows[0].id;
|
||||||
|
|
||||||
|
// Move the channel to the new node + bump the counters; the operator UI
|
||||||
|
// surfaces these to flag restarts.
|
||||||
|
await pool.query(
|
||||||
|
`UPDATE playout_channels
|
||||||
|
SET node_id = $1, status = 'starting', container_id = NULL, container_meta = '{}'::jsonb,
|
||||||
|
restart_count = restart_count + 1, last_restart_at = NOW(),
|
||||||
|
error_message = NULL, updated_at = NOW()
|
||||||
|
WHERE id = $2`,
|
||||||
|
[newNodeId, channel.id]
|
||||||
|
);
|
||||||
|
|
||||||
|
// The actual sidecar spawn re-uses the same path as /start. We POST to
|
||||||
|
// ourselves rather than duplicating the docker/agent code; scheduler runs
|
||||||
|
// in-process so this is a local function call shape, but going through the
|
||||||
|
// route keeps RBAC/permission paths consistent.
|
||||||
|
// NOTE: scheduler-driven restart bypasses HTTP — it imports startSidecar
|
||||||
|
// directly. Surfaced as a separate helper in a follow-up if the inline
|
||||||
|
// simple path proves insufficient.
|
||||||
|
return { restarted: true, new_node_id: newNodeId };
|
||||||
|
}
|
||||||
|
|
||||||
|
export default router;
|
||||||
|
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
import pool from './db/pool.js';
|
import pool from './db/pool.js';
|
||||||
import { syncToAmpp } from './routes/upload.js';
|
import { syncToAmpp } from './routes/upload.js';
|
||||||
|
import { restartChannel } from './routes/playout.js';
|
||||||
|
|
||||||
const TICK_INTERVAL_MS = parseInt(process.env.SCHEDULER_TICK_MS || '15000', 10);
|
const TICK_INTERVAL_MS = parseInt(process.env.SCHEDULER_TICK_MS || '15000', 10);
|
||||||
const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;
|
const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;
|
||||||
|
|
@ -175,6 +176,13 @@ async function tick() {
|
||||||
for (const row of ampps.rows) {
|
for (const row of ampps.rows) {
|
||||||
await syncToAmpp(row.id, row.project_id, row.bin_id);
|
await syncToAmpp(row.id, row.project_id, row.bin_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 6) Playout channel health checks. Ping each running channel's sidecar
|
||||||
|
// /status; on success bump last_heartbeat_at, on failure increment a
|
||||||
|
// transient miss counter (in playout_sidecars.last_heartbeat_at age).
|
||||||
|
// Three consecutive misses → auto-restart on a healthy node (non-
|
||||||
|
// decklink), or alert-only for decklink.
|
||||||
|
await playoutHealthTick(client);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error('[scheduler] tick error:', err);
|
console.error('[scheduler] tick error:', err);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
@ -201,6 +209,70 @@ async function enqueueNextOccurrence(schedule, client) {
|
||||||
console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
|
console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Playout channel health + failover ────────────────────────────────────────
|
||||||
|
// Tick step 6. Reuses the same advisory lock so only one replica probes the
|
||||||
|
// sidecars; multi-replica pings would just waste cycles. A missed probe is
|
||||||
|
// counted via last_heartbeat_at age: > 3 * TICK_INTERVAL means 3 consecutive
|
||||||
|
// misses.
|
||||||
|
async function playoutHealthTick(client) {
|
||||||
|
let channels;
|
||||||
|
try {
|
||||||
|
({ rows: channels } = await client.query(
|
||||||
|
`SELECT id, output_type, container_meta, node_id, last_heartbeat_at, restart_count
|
||||||
|
FROM playout_channels WHERE status = 'running'`
|
||||||
|
));
|
||||||
|
} catch (err) {
|
||||||
|
// Migration 029 may not be applied yet — bail silently rather than crash.
|
||||||
|
if (err.code === '42P01') return;
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
|
const TIMEOUT_MS = TICK_INTERVAL_MS * 3 + 5000;
|
||||||
|
for (const ch of channels) {
|
||||||
|
const sidecarUrl =
|
||||||
|
ch.container_meta && ch.container_meta.sidecar_url
|
||||||
|
? ch.container_meta.sidecar_url
|
||||||
|
: `http://playout-${ch.id}:3002`;
|
||||||
|
try {
|
||||||
|
const r = await fetch(`${sidecarUrl}/status`, { signal: AbortSignal.timeout(5000) });
|
||||||
|
if (!r.ok) throw new Error(`status HTTP ${r.status}`);
|
||||||
|
await client.query(
|
||||||
|
'UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [ch.id]
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
const lastSeen = ch.last_heartbeat_at ? new Date(ch.last_heartbeat_at).getTime() : 0;
|
||||||
|
const ageMs = Date.now() - lastSeen;
|
||||||
|
if (ageMs < TIMEOUT_MS) continue; // not yet 3 misses
|
||||||
|
|
||||||
|
if (ch.output_type === 'decklink') {
|
||||||
|
await client.query(
|
||||||
|
"UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
|
||||||
|
[`sidecar unreachable (${err.message}); decklink channels require manual recovery`, ch.id]
|
||||||
|
);
|
||||||
|
console.error(`[scheduler] decklink channel ${ch.id} unreachable — alert-only, no auto-failover`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${err.message}), restart #${ch.restart_count + 1}`);
|
||||||
|
try {
|
||||||
|
const res = await restartChannel(ch.id);
|
||||||
|
if (res.restarted) {
|
||||||
|
console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`);
|
||||||
|
// Kick the new sidecar via the /start route — the helper updates the
|
||||||
|
// DB but the actual docker spawn lives on the start endpoint.
|
||||||
|
await callSelf(`/api/v1/playout/channels/${ch.id}/start`).catch((e) => {
|
||||||
|
console.error(`[scheduler] failover: /start call failed: ${e.message}`);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
console.error(`[scheduler] failover: channel ${ch.id} restart skipped — ${res.reason}`);
|
||||||
|
}
|
||||||
|
} catch (err2) {
|
||||||
|
console.error(`[scheduler] failover error for ${ch.id}: ${err2.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export function startSchedulerLoop() {
|
export function startSchedulerLoop() {
|
||||||
if (_interval) return;
|
if (_interval) return;
|
||||||
console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
|
console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue