Compare commits

...

8 commits

Author SHA1 Message Date
Zac
34352e3299 docs(playout): work log — commit map, decisions, testing checklist
Replaces the earlier aspirational "complete" log with the actual commit
sequence on feat/playout-mcr, the §7 decisions as built, the media-flow
diagram, port-contention + failover scope, and a runtime testing checklist
(migration → image build → SRT smoke → failover kill test).
2026-05-30 14:05:57 +00:00
Zac
d505a488ac build(playout): compose wiring + .env knobs
- Add /mnt/NVME/MAM/wild-dragon-media:/media to mam-api (rw) and worker-p4
  (rw); web-ui (ro, for serving HLS preview segments).
- worker-p4 WORKER_QUEUES gains 'playout-stage' so master-tier nodes pick up
  the loudnorm stage jobs (they already have ffmpeg + the media mount).
- New build-only 'playout' service with profile ["build-only"] so
  `docker compose --profile build-only build playout` produces the
  wild-dragon-playout:latest image without compose trying to up it as a
  long-running service. mam-api spawns these on demand.
- mam-api env adds PLAYOUT_IMAGE + PLAYOUT_AMCP_BASE_PORT (5250 default).
- .env.example: PLAYOUT_IMAGE, PLAYOUT_AMCP_BASE_PORT.
2026-05-30 14:05:57 +00:00
Zac
793011b78b feat(web-ui): MCR page — channels, playlist, transport, preview
screens-playout.jsx + styles-playout.css: program monitor (HLS preview from
the sidecar), media bin, drag-drop playlist editor, transport controls. Plain
HTML5 drag-drop, no extra library. Talks to /api/v1/playout via
ZAMPP_API.fetch.

Wired into the shell: "Playout" under Operations, breadcrumb mapping, route
case in app.jsx, stylesheet + dist/screens-playout.js script in index.html.
Format dropdown defaults to 1080p5994 (matches the new channel default).
2026-05-30 14:02:25 +00:00
Zac
5538683d78 feat(mam-api): /playout control plane + auto-failover
Routes: channel + playlist CRUD, start/stop/play/pause/skip transport, as-run
log. RBAC via assertProjectAccess on channel.project_id; null project ⇒
admin-only (recorder convention).

Sidecar orchestration mirrors recorders.js: Docker socket for local node,
node-agent /sidecar/start for remote. Channel start passes CHANNEL_ID env so
the sidecar can write HLS preview to /media/live/<id>.

DeckLink port-contention guard: blocks starting a decklink channel when a
recorder or another channel on the same node+device_index is active.

restartChannel(id) helper picks another healthy cluster node and re-places
non-decklink channels; decklink is alert-only. Exposed for the scheduler.

Scheduler tick adds step 6: poll each running channel's sidecar /status,
update last_heartbeat_at, and after ~3 misses trigger restartChannel +
self-call /start. Reuses the existing PG advisory lock so multi-replica
deploys don't double-fire failovers.
2026-05-30 14:02:25 +00:00
Zac
d62af34e98 feat(playout): CasparCG sidecar image + Node AMCP shim
One container per channel. Built like capture/build-with-decklink: NDI +
DeckLink SDKs fetched at build, runs --privileged with Xvfb for the GL
context where no real display is present.

Components:
- entrypoint.sh: Xvfb + CasparCG launch, creates /media/live/<CHANNEL_ID>
- src/amcp.js: TCP AMCP client
- src/playout-manager.js: channel lifecycle, playlist walk via LOADBG AUTO
  for gapless transitions; primary consumer (decklink/ndi/srt/rtmp) plus a
  second FFMPEG HLS consumer (~600 kbps, 2s segments) for the UI preview
- src/index.js: HTTP shim — /channel/start, /playlist/load, transport
- frame-rate helper picks fps from video_format (59.94 → 60000/1001) so
  SEEK / LENGTH frame math is correct
2026-05-30 14:02:25 +00:00
Zac
209f9fda52 feat(worker): playout-stage job — S3 → /media + EBU R128 loudnorm
Stages playlist items from S3 to the shared CasparCG media volume. Pass 1
measures, pass 2 applies linear loudnorm (I=-23 LUFS, TP=-1 dBTP, LRA=11);
output is AAC 192k @ 48 kHz, video stream copied. Atomic rename on success
so CasparCG never sees a partial file. Per-item audio_normalized flag means
re-stages of the same asset skip the loudnorm pass.

Wired into worker/src/index.js behind WORKER_QUEUES=playout-stage so
capability-routed deploys can pin it to nodes that already have ffmpeg +
the media mount.
2026-05-30 14:02:25 +00:00
Zac
29187a90df feat(mam-api): migration 029 — playout schema
Six tables: channels, playlists, items, sidecars (sidecar registry for
health-check), schedule (Phase B), as-run log.

- video_format default 1080p5994 (house standard, capture cadence)
- restart_count / last_restart_at / last_heartbeat_at on channels for
  auto-failover bookkeeping
- audio_normalized flag on items so re-stages skip the loudnorm pass
- unique partial index on (channel_id) for running sidecars
2026-05-30 14:02:25 +00:00
Zac
512267159a docs(playout): MCR design spec — Phase A playlist + Phase B 24/7
Single-doc design covering the playout subsystem: CasparCG-backed sidecars,
multi-channel placement, S3→/media staging, scheduling phases, the data
model, channel placement vs port contention.

§7 questions are answered inline (2026-05-30): −23 LUFS at stage time,
1080p5994 default, HLS preview v1, auto-restart-on-healthy-node failover
(DeckLink alert-only).
2026-05-30 14:02:25 +00:00
22 changed files with 2689 additions and 2 deletions

View file

@ -63,3 +63,10 @@ GOOGLE_ALLOWED_DOMAIN=
# Note: if a Google-linked account also has TOTP enabled, sign-in still requires
# the authenticator code (Google is treated as the first factor). Accounts without
# TOTP complete sign-in in one Google step.
# Playout / Master Control (MCR)
# Image tag the mam-api spawns when a channel starts. Build with:
# docker compose --profile build-only build playout
PLAYOUT_IMAGE=wild-dragon-playout:latest
# Base AMCP port — each channel binds to BASE + channel_id (in CasparCG terms).
PLAYOUT_AMCP_BASE_PORT=5250

101
WORK_LOG_PLAYOUT.md Normal file
View file

@ -0,0 +1,101 @@
# Playout / Master Control — Implementation Work Log
**Branch:** `feat/playout-mcr` (off `main`)
**Started:** 2026-05-30
**Status:** Code complete, awaiting runtime validation
Tracks the build of the playout (MCR) subsystem against the design at
`docs/superpowers/specs/2026-05-30-playout-mcr-design.md`.
---
## Commit sequence
| # | Commit | Scope |
|---|--------|-------|
| 1 | `docs(playout)` | Design spec, §7 questions answered |
| 2 | `feat(mam-api): migration 029` | Six tables, failover columns, audio_normalized flag |
| 3 | `feat(worker): playout-stage` | S3 → /media + EBU R128 loudnorm + index.js wiring |
| 4 | `feat(playout): sidecar` | CasparCG image + AMCP shim, HLS preview consumer, fps-aware frame math |
| 5 | `feat(mam-api): /playout control plane + auto-failover` | Routes + scheduler health tick + restartChannel helper |
| 6 | `feat(web-ui): MCR page` | screens-playout, styles, app/shell/index.html wiring |
| 7 | `build(playout): compose wiring + .env knobs` | /media volume, queue addition, build-only service |
| 8 | `docs(playout): work log` | This file |
## Resolved §7 decisions (2026-05-30)
- **Audio loudness:** pre-normalize at stage time. ffmpeg `loudnorm` two-pass
(I=-23 LUFS, TP=-1 dBTP, LRA=11), linear mode preserves dynamics. Output
AAC 192k @ 48 kHz, video stream copied. Per-item `audio_normalized` flag
so re-stages of the same asset skip the pass.
- **Frame rate:** `1080p5994` default (was `1080i5994`). Per-channel
override allowed via `video_format`. `fpsFor(videoFormat)` helper in
the sidecar drives SEEK / LENGTH / transition-frames math.
- **Preview latency:** HLS v1. CasparCG runs a second FFMPEG consumer
alongside the primary output, writing `/media/live/<channel_id>/index.m3u8`
(~600 kbps, 2s segments, 6-window list). Web UI plays via the existing
HLS plumbing.
- **Failover:** auto-restart on healthy node for NDI/SRT/RTMP. Alert-only
for DeckLink (device-index pinning makes blind re-placement risky).
Scheduler tick (PG advisory lock, same lock as recorder schedules) polls
sidecar `/status`; ~3 missed checks → `restartChannel(id)` picks the most
recently-seen-online other node, bumps `restart_count`, calls `/start`.
## Architecture notes
**Sidecar model.** One CasparCG container per channel. Spawned by mam-api
via local Docker socket (primary node) or remote node-agent
`/sidecar/start`. Tracked in `playout_sidecars` plus `playout_channels.container_id`.
Killed on `/stop` or by `restartChannel` during failover.
**Media flow.**
```
S3 master/proxy → playout-stage worker → /media/playout/<assetId>.<ext>
(loudnormed, AAC@-23 LUFS)
CasparCG channel #1
primary consumer HLS consumer
(DeckLink/NDI/ ↓
SRT/RTMP) /media/live/<ch_id>/*.m3u8
```
**Port contention.** `assertDeckLinkFree()` blocks starting a SDI channel
when a recorder or another channel on the same node+device_index is active.
**Failover scope.** NDI/SRT/RTMP have no hardware tie, so any healthy
cluster_node is eligible. DeckLink channels surface an alert in the UI
(`status='error'` + `error_message`) and require operator intervention.
## Testing checklist
- [ ] Apply migration 029 on dev DB
- [ ] Build playout image: `docker compose --profile build-only build playout`
- [ ] Build web-ui (`screens-playout` joins the esbuild list automatically)
- [ ] Create channel via POST /api/v1/playout/channels (SRT first, no HW)
- [ ] Stage 2-3 assets to a playlist, verify loudnorm metadata in stderr
- [ ] Start channel → sidecar container appears in `docker ps`
- [ ] AMCP smoke: `telnet <host> 5250`, `VERSION`, `INFO`
- [ ] Play playlist; verify HLS at /media/live/<id>/index.m3u8
- [ ] Skip / pause / resume / stop
- [ ] As-run log: GET /api/v1/playout/channels/:id/asrun
- [ ] Kill sidecar container → scheduler should restart on another node
within ~3 ticks (~45s), restart_count increments
- [ ] DeckLink channel kill: status flips to 'error', NO restart attempt
- [ ] Try starting a decklink channel on a device_index already held by a
recorder → 409
- [ ] MCR UI smoke: nav entry visible, page renders, drag-drop adds items,
transport buttons hit the API
## Known gaps (deferred)
- No WebRTC preview (HLS-only v1 — 4-6s lag, fine for confidence monitor).
- No graphics/CG overlay layer in Phase A (templates land in Phase B).
- No Phase B scheduler / 24/7 wall-clock channel (schema is in place,
scheduler tick is not).
- No multi-channel grid view (one channel at a time per page).
- No timecode / remaining-duration overlay (would need CasparCG INFO poll).
- No audio level meters on the UI.
- `restartChannel` updates DB state and triggers `/start`; if the new node
also fails repeatedly, there's no exponential backoff yet — bounded only
by the manual stop button.

View file

@ -40,6 +40,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
- /mnt/NVME/MAM/wild-dragon-live:/live
- /mnt/NVME/MAM/wild-dragon-growing:/growing
- /mnt/NVME/MAM/wild-dragon-media:/media
- /mnt/NVME/MAM/sdk:/sdk
- /dev/shm:/dev/shm
- /run/dbus:/run/dbus
@ -61,6 +62,8 @@ services:
NODE_IP: ${NODE_IP}
NODE_HOSTNAME: ${NODE_HOSTNAME:-}
CAPTURE_TOKEN: ${CAPTURE_TOKEN}
PLAYOUT_IMAGE: ${PLAYOUT_IMAGE:-wild-dragon-playout:latest}
PLAYOUT_AMCP_BASE_PORT: ${PLAYOUT_AMCP_BASE_PORT:-5250}
deploy:
resources:
reservations:
@ -120,14 +123,16 @@ services:
# after the capability-routing split, so import jobs sat unprocessed and
# assets stayed `ingesting` forever. import is concurrency-1 + network-
# bound, so one consumer (this heavy/primary worker) is sufficient.
WORKER_QUEUES: proxy,conform,trim,import
WORKER_QUEUES: proxy,conform,trim,import,playout-stage
RUN_PROMOTION: "true"
PROXY_CONCURRENCY: "2"
PLAYOUT_MEDIA_DIR: /media
NVIDIA_VISIBLE_DEVICES: GPU-79afca3e-2ab2-a6ea-1c44-706c1f0a26d6
WORKER_LABEL: "zampp1 / Tesla P4"
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
volumes:
- /mnt/NVME/MAM/wild-dragon-growing:/growing
- /mnt/NVME/MAM/wild-dragon-media:/media
networks:
- wild-dragon
@ -176,12 +181,22 @@ services:
- "${PORT_WEB_UI:-7434}:80"
volumes:
- /mnt/NVME/MAM/wild-dragon-live:/live
- /mnt/NVME/MAM/wild-dragon-media:/media:ro
- /dev/shm:/dev/shm
- /run/dbus:/run/dbus
- /run/systemd:/run/systemd
networks:
- wild-dragon
# Build-only: the CasparCG sidecar image. mam-api spawns these on-demand per
# channel (one container per playout channel), so this service is never up'd —
# it exists so `docker compose build playout` produces the image the API tags
# via PLAYOUT_IMAGE. Profile excludes it from default `up`.
playout:
profiles: ["build-only"]
build: ./services/playout
image: wild-dragon-playout:latest
volumes:
postgres_data:
redis_data:

View file

@ -0,0 +1,235 @@
# Wild Dragon MAM — Playout / Master Control (MCR)
**Date:** 2026-05-30 (revised 2026-05-30 — §7 closed)
**Status:** APPROVED — implementation in progress (code drafted but uncommitted; see WORK_LOG_PLAYOUT.md)
**Author:** Zac + Claude
---
## Resolved Decisions
| Question | Decision |
|----------|----------|
| Playout engine | **CasparCG Server** (orchestrated via AMCP), not ffmpeg-native |
| Channel count | **Multi-channel from start** — N independent channels, placed across cluster nodes by capability (mirrors recorders) |
| Scheduling model | **Phased** — Phase A: on-demand playlist player; Phase B: 24/7 continuous channel |
| Output targets | SDI (DeckLink), NDI, SRT, RTMP — all via CasparCG consumers |
| Media source | Assets live in **S3**; must be staged to a CasparCG-local media volume before play (see §4) |
| CasparCG packaging | **Build our own image** (like `capture/build-with-decklink.sh`) — GL context via GPU passthrough or Xvfb; NDI + DeckLink SDKs fetched at build time (not redistributable) |
| Master codec playability | Zac confirms current masters **play fine in CasparCG** — no transcode-for-playout step; staging is a plain S3→/media copy. Validate on hardware but do not gate on it |
| Management UI | **Single Dragonflight `playout.html` GUI** drives everything via AMCP; operator never touches CasparCG directly |
---
## Overview
Playout adds **master-control-room** capability to Dragonflight: take library assets, arrange them on a timeline / playlist, and play them out continuously to a broadcast output — SDI via DeckLink, or stream via SRT / RTMP / NDI. Drag-and-drop scheduling, a program/preview monitor, and as-run logging.
This is the **mirror image** of the existing capture path. Capture is `input → ffmpeg encode → S3`. Playout is `S3 asset → engine → output`. We reuse three things wholesale:
1. **Cluster node + capability model** — nodes already advertise DeckLink/Deltacast/GPU in `cluster_nodes.capabilities`; channels are placed on nodes that have a free output port, exactly as recorders claim input ports.
2. **Sidecar orchestration** — mam-api spawns containers via the local Docker socket or the remote `node-agent /sidecar/start`. A CasparCG channel is just a different sidecar image.
3. **Scheduler tick + PG advisory lock**`src/scheduler.js` already runs a single-leader tick over a schedule table. Phase B's wall-clock channel reuses this pattern.
### Why CasparCG over ffmpeg-native
The capture stack proves we can drive ffmpeg + DeckLink. But playout's hard part is **gapless, frame-accurate, clean transitions between clips** — every clip boundary in an ffmpeg-per-clip model is a black flash unless we engineer a concat-feeder. CasparCG solves this natively: a channel is a persistent output with a playlist, hard/mix/wipe transitions, layered graphics/logo (CG), and DeckLink/NDI/SRT/RTMP consumers built in. We orchestrate it over **AMCP** (its TCP control protocol) instead of reinventing a feeder. Trade: a new dependency + container image, and media must be on a CasparCG-visible disk (§4).
---
## 1. Data Model
New migration `029-playout.sql`. Five tables.
### 1.1 `playout_channels`
A logical output. One channel → one engine instance → one output target.
```
id uuid pk
name text -- "Channel 1", "Pop-up SDI"
node_id uuid -> cluster_nodes(id) -- where the engine runs (null = primary)
output_type text -- 'decklink' | 'ndi' | 'srt' | 'rtmp'
output_config jsonb -- { device_index } | { ndi_name } | { url, latency } | { url, key }
video_format text -- '1080i5994' | '1080p5994' | '720p5994' ...
status text -- 'stopped' | 'starting' | 'running' | 'error'
container_id text -- running CasparCG sidecar
project_id uuid -> projects(id) -- RBAC scoping (nullable = admin-only)
created_at / updated_at
```
`output_type` + `output_config` map straight to a CasparCG consumer:
- `decklink``ADD <ch> DECKLINK <device> ...`
- `ndi``ADD <ch> NDI ...`
- `srt`/`rtmp` → `ADD <ch> FFMPEG <url> -f mpegts ...` (CasparCG 2.3+ ffmpeg consumer)
### 1.2 `playout_playlists`
An ordered list of items bound to a channel. Phase A's primary object.
```
id, channel_id -> playout_channels(id)
name, loop boolean, created_at / updated_at
```
### 1.3 `playout_items`
One entry on a playlist OR one entry on the 24/7 timeline.
```
id
playlist_id uuid -> playout_playlists(id) -- Phase A
asset_id uuid -> assets(id)
sort_order int -- position in playlist (Phase A)
scheduled_at timestamptz -- wall-clock start (Phase B, null in A)
in_point numeric -- seconds, trim head (reuse subclip in/out from editor)
out_point numeric -- seconds, trim tail
transition text -- 'cut' | 'mix' | 'wipe'
transition_ms int
graphics jsonb -- optional CG/template overlay (Phase B+)
media_status text -- 'pending' | 'staging' | 'ready' | 'error' (see §4)
media_path text -- resolved path inside the CasparCG media volume
```
### 1.4 `playout_schedule` (Phase B)
Day-ahead, wall-clock-bound timeline rows. Same shape as `playout_items` but `scheduled_at` is authoritative and the scheduler tick (§5) drives transitions. Phase A can ignore this table.
### 1.5 `playout_as_run`
Append-only log: what actually played, when, for how long. Compliance / billing.
```
id, channel_id, asset_id, item_id
started_at, ended_at, duration_s, result -- 'played' | 'skipped' | 'error'
```
---
## 2. Services & Components
### 2.1 New sidecar: `services/playout/` (CasparCG wrapper)
A thin container: **CasparCG Server** + a small Node control shim exposing HTTP, the same way `capture` wraps ffmpeg.
- Base image: official/community `casparcg/server` (Linux build with DeckLink + NDI + FFmpeg producers/consumers).
- Node shim (`src/index.js`): opens an AMCP TCP socket to local CasparCG, exposes:
- `POST /channel/start``ADD <ch> <consumer>` for the channel's output target
- `POST /play``PLAY <ch>-<layer> <media> [transition]`
- `POST /loadbg` + `/play` → preview/cue then take (preview monitor)
- `POST /stop`, `GET /status``INFO <ch>` (current clip, position, fps)
- playlist load → translate `playout_items` rows into a sequence of AMCP `LOADBG`/`PLAY` calls, advancing on `OnTransition` / end-of-clip events.
- Mirrors capture's status-polling contract so the UI monitor reuses existing plumbing.
### 2.2 mam-api: `src/routes/playout.js`
CRUD + control, RBAC-scoped via the existing `assertProjectAccess` helper (channels carry `project_id`).
```
GET /playout/channels list (project-filtered)
POST /playout/channels create (edit on project)
POST /playout/channels/:id/start|stop spawn/kill CasparCG sidecar
GET /playout/channels/:id/status proxy engine INFO
POST /playout/channels/:id/play|pause|skip transport control
GET/POST/PUT/DELETE /playout/playlists... playlist + item CRUD, reorder
POST /playout/items/:id/stage kick S3→media-volume staging (§4)
GET /playout/channels/:id/asrun as-run log
```
Channel start/stop reuses `resolveNodeTarget()` + the Docker-socket / `node-agent /sidecar/start` split already in `recorders.js`. **Refactor opportunity:** lift that sidecar-spawn logic out of `recorders.js` into `src/orchestration/sidecar.js` so both recorders and playout share it (keep this small — only what both need).
### 2.3 web-ui: `playout.html` + `public/playout.jsx`
New MCR page. Layout:
```
┌─ PREVIEW ───────────┬─ PROGRAM (on air) ──────┐
│ [cued clip] │ [live output] ● ON AIR │
│ TC / duration │ TC / remaining │
│ [CUE] [TAKE] │ [PLAY][PAUSE][SKIP][STOP]│
├─ MEDIA BIN ─────────┴──────────────────────────┤
│ (draggable asset list, reuse asset browser) │
├─ PLAYLIST / TIMELINE ──────────────────────────┤
│ ▸ clip A ──▸ clip B ──▸ clip C (drag-drop) │ Phase A: ordered list
│ └ 24h time grid w/ now-bar │ Phase B: time-of-day grid
└────────────────────────────────────────────────┘
```
- Drag-drop: reuse whatever the NLE editor timeline uses (check `editor.jsx`); assets drag from the bin into the playlist/grid.
- API via existing `ZAMPP_API.fetch` wrapper.
- Program monitor: HLS preview of the output — CasparCG can emit a second low-bitrate FFmpeg consumer to HLS, reusing the `/live/<id>` HLS plumbing capture already uses.
---
## 3. Channel placement & ports
A DeckLink port is exclusive — same constraint capture already handles. A node's DeckLink port can be an **input (recorder)** or an **output (playout channel)**, never both at once. So:
- Extend the capability/port-claim check: when starting a channel on `output_type=decklink`, verify the target node has that device index free (no active recorder, no active channel).
- NDI / SRT / RTMP outputs have no hardware contention → can stack many per node (GPU/CPU-bound only).
- Surface a unified "device map" (extend the existing cluster DeckLink-status endpoint) showing each port's role: idle / recording / playing-out.
---
## 4. Media staging (the S3 ⇄ CasparCG gap)
**The crux.** Assets live in S3 (`original_s3_key` / `proxy_s3_key`). CasparCG plays from a **local media folder**. Options:
- **A — Pre-stage to a shared media volume (recommended).** Before a clip can go on air, download/symlink it from S3 to a CasparCG-visible volume (`/media`), set `playout_items.media_status='ready'` + `media_path`. A new BullMQ `playout-stage` job (reuses the worker pattern) does the pull. UI shows per-item readiness; TAKE is blocked until `ready`. Mirrors the growing-file SMB share already mounted for capture.
- **B — Stream from S3 via presigned URL.** CasparCG FFmpeg producer plays an HTTPS presigned URL directly. Zero staging, but seeking/trim and gapless transitions over network are fragile for broadcast. Acceptable as a fallback for SRT/RTMP, risky for SDI.
**Decision:** Phase A uses **A** (stage proxies for preview, masters for air) with **B** available as a per-channel "low-latency / no-stage" toggle. Zac confirms the current masters play fine in CasparCG, so staging is a **plain S3→/media copy — no transcode-for-playout step**. (Validate on hardware during implementation, but the model does not assume a transcode stage.)
---
## 5. Scheduling
### Phase A — playlist player
No wall clock. Operator builds a `playout_playlists` row, drags items in, hits PLAY. The playout sidecar walks `playout_items` by `sort_order`, cueing the next clip during the current one (`LOADBG`) and taking it at end-of-clip with the configured transition. `loop` repeats. As-run logged per item.
### Phase B — 24/7 continuous channel
Wall-clock timeline in `playout_schedule`. Reuse `src/scheduler.js`:
- Add a second tick (or extend the existing one) under the **same PG advisory lock pattern** — exactly-one-leader, so a multi-replica deploy doesn't double-fire.
- Tick responsibilities: stage upcoming items (look-ahead window), verify the on-air item matches the schedule, **fill gaps** (loop a filler/slate asset when the timeline has a hole — a channel must never go black), roll the day forward.
- As-run becomes the compliance record.
---
## 6. Phasing / Milestones
**Phase A — Playlist playout MVP**
1. Migration `029-playout.sql` (channels, playlists, items, as-run).
2. `services/playout/` sidecar: CasparCG image + AMCP control shim, single output target (start with **SRT or NDI** — no hardware needed for dev; DeckLink behind hardware check).
3. mam-api `routes/playout.js` — channel + playlist CRUD, start/stop, transport, RBAC.
4. `playout-stage` BullMQ job (S3 → /media).
5. web-ui `playout.html` — bin + drag-drop ordered playlist + program/preview monitors + transport.
6. DeckLink output on real hardware; port-contention check vs recorders.
**Phase B — 24/7 continuous channel**
7. `playout_schedule` + time-of-day grid UI.
8. Scheduler tick (advisory-locked) — look-ahead staging, gap-fill/slate, day-roll.
9. As-run reporting view.
10. Graphics/CG overlay (logo bug, lower-thirds) via CasparCG templates.
---
## 7. Open Questions (for review)
**Resolved (2026-05-30):**
- ~~CasparCG packaging~~**build our own image.** Fetch DeckLink + NDI SDKs at build time (not redistributable — same as capture's DeckLink build). GL context for the mixer comes from GPU passthrough on a real node, or **Xvfb** (virtual framebuffer) where there's no display — community images run `--privileged` + X11 socket. Pin the NDI SDK version to what the server expects (`.so` version mismatch is the common docker failure).
- ~~Master codec playability~~ → Zac confirms masters **play fine**; no transcode-for-playout. Staging = plain S3→/media copy.
- ~~Management GUI~~**single Dragonflight `playout.html`** drives everything via AMCP; operator never touches CasparCG.
- ~~Audio loudness~~**pre-normalize at stage time** (Zac, 2026-05-30). `playout-stage` job runs ffmpeg `loudnorm` (EBU R128, target 23 LUFS, true-peak 1 dBTP) once, on the S3→/media copy. Output is the cached version CasparCG plays. Staging is no longer a pure copy — staging cost ≈ realtime CPU per clip on first stage; results are reusable across channels. Override (`media_status='ready'` + raw copy) available for clips already mastered to spec.
- ~~Frame rate~~**`1080p5994`** default for new channels (Zac, 2026-05-30). Progressive 1080 @ 59.94 fps. Per-channel override allowed (`video_format` column). Streaming-friendly (SRT/RTMP/NDI) and current SDI gear accepts it; matches capture's 59.94 cadence.
- ~~Preview latency~~**HLS v1** (Zac, 2026-05-30). Reuse capture's `/live/<id>` HLS plumbing. CasparCG emits a second low-bitrate FFmpeg consumer to HLS. ~46s lag, fine for confidence monitor. Operator desk gets a real downstream monitor off the SDI/NDI output anyway. Revisit WebRTC if MCR operators complain.
- ~~Failover~~**auto-restart on healthy node** (Zac, 2026-05-30). Scheduler tick (§5) monitors `playout_sidecars` health (AMCP ping + container alive); on N missed checks marks the channel `error`, re-places it on another capability-matching node with a free output port, resumes the playlist from the next item after the as-run-logged on-air item. Gap = black/slate for ~530 s during respawn (operator sees a flag in the UI). **DeckLink channels are not auto-failed-over in v1** — device-index pinning makes the destination port non-trivial; v1 alerts and lets the operator move the channel. NDI/SRT/RTMP channels (no hardware contention) failover automatically. Tracked via `restart_count` + `last_restart_at` on `playout_channels`.
**Still open:**
- (none — all §7 questions resolved 2026-05-30)
---
## 8. Reused building blocks (already in the repo)
| Need | Existing piece |
|------|----------------|
| Spawn engine container local/remote | `recorders.js` Docker-socket + `node-agent /sidecar/start` |
| Node capability / port model | `cluster_nodes.capabilities`, cluster DeckLink-status endpoint |
| Single-leader scheduled transitions | `src/scheduler.js` + PG advisory lock |
| Background media jobs | BullMQ worker (`services/worker`) |
| RBAC scoping | `src/auth/authz.js` `assertProjectAccess` (channel/project_id) |
| HLS preview plumbing | capture's `/live/<id>` HLS output |
| Subclip in/out points | NLE editor in/out marking |
| API wrapper / SPA shell | `ZAMPP_API.fetch`, esbuild JSX pages |

View file

@ -0,0 +1,165 @@
-- Migration 029 — Playout / Master Control (MCR).
--
-- Adds a broadcast playout subsystem: take library assets, arrange them on a
-- playlist (Phase A) or a wall-clock timeline (Phase B), and play them out
-- continuously to SDI (DeckLink) / NDI / SRT / RTMP via a CasparCG sidecar.
--
-- This is the mirror of the capture path (input -> ffmpeg -> S3). A channel is
-- placed on a cluster node by capability the same way recorders claim input
-- ports; the engine container is spawned via the same Docker-socket /
-- node-agent orchestration. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md.
--
-- Tables:
-- playout_channels — a logical output (one channel -> one CasparCG instance -> one target)
-- playout_playlists — an ordered list of items bound to a channel (Phase A)
-- playout_items — one clip on a playlist OR one row on the timeline
-- playout_sidecars — running CasparCG sidecar registry (one per channel; health-checked)
-- playout_schedule — wall-clock day-ahead rows (Phase B; unused in A)
-- playout_as_run — append-only log of what actually played (compliance)
-- ── Channels ───────────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS playout_channels (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
node_id UUID REFERENCES cluster_nodes(id) ON DELETE SET NULL,
output_type TEXT NOT NULL DEFAULT 'srt',
-- output_config is consumer-shape-specific:
-- decklink: { "device_index": 1 }
-- ndi: { "ndi_name": "DRAGONFLIGHT CH1" }
-- srt: { "url": "srt://host:9000", "latency": 200 }
-- rtmp: { "url": "rtmp://host/live", "key": "streamkey" }
output_config JSONB NOT NULL DEFAULT '{}'::jsonb,
-- 1080p59.94 is the house standard (matches capture cadence, streaming-friendly,
-- accepted by current SDI gear). Per-channel override allowed.
video_format TEXT NOT NULL DEFAULT '1080p5994',
status TEXT NOT NULL DEFAULT 'stopped',
container_id TEXT,
-- For remote channels the node-agent reports the reachable host:port of the
-- sidecar HTTP shim; stored here so the API can proxy transport calls.
container_meta JSONB NOT NULL DEFAULT '{}'::jsonb,
error_message TEXT,
-- Failover bookkeeping. Scheduler tick health-checks the sidecar; on N missed
-- checks the channel is re-placed on a healthy node (auto for ndi/srt/rtmp,
-- alert-only for decklink — device-index pinning makes re-placement non-trivial).
restart_count INTEGER NOT NULL DEFAULT 0,
last_restart_at TIMESTAMPTZ,
last_heartbeat_at TIMESTAMPTZ,
-- RBAC scoping: a NULL project_id resolves to admin-only (authz.js), the same
-- convention recorders use for unassigned resources.
project_id UUID REFERENCES projects(id) ON DELETE SET NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CHECK (output_type IN ('decklink','ndi','srt','rtmp')),
CHECK (status IN ('stopped','starting','running','error'))
);
CREATE INDEX IF NOT EXISTS idx_playout_channels_node ON playout_channels (node_id);
CREATE INDEX IF NOT EXISTS idx_playout_channels_project ON playout_channels (project_id);
-- ── Playlists ──────────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS playout_playlists (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
channel_id UUID NOT NULL REFERENCES playout_channels(id) ON DELETE CASCADE,
name TEXT NOT NULL,
loop BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_playout_playlists_channel ON playout_playlists (channel_id);
-- ── Items ──────────────────────────────────────────────────────────────────
-- One entry on a playlist (Phase A, ordered by sort_order) OR one entry on the
-- timeline (Phase B, ordered by scheduled_at). in/out points reuse the editor's
-- subclip trim model (seconds). media_status tracks the S3 -> /media staging
-- (see playout-stage worker job); a clip cannot go on air until 'ready'.
CREATE TABLE IF NOT EXISTS playout_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
playlist_id UUID REFERENCES playout_playlists(id) ON DELETE CASCADE,
asset_id UUID NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
sort_order INTEGER NOT NULL DEFAULT 0,
scheduled_at TIMESTAMPTZ,
in_point NUMERIC,
out_point NUMERIC,
transition TEXT NOT NULL DEFAULT 'cut',
transition_ms INTEGER NOT NULL DEFAULT 0,
graphics JSONB,
media_status TEXT NOT NULL DEFAULT 'pending',
media_path TEXT,
-- Set when playout-stage has run loudnorm (EBU R128, -23 LUFS / -1 dBTP) on
-- the staged file. Re-stages skip the loudnorm pass when true.
audio_normalized BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CHECK (transition IN ('cut','mix','wipe')),
CHECK (media_status IN ('pending','staging','ready','error'))
);
CREATE INDEX IF NOT EXISTS idx_playout_items_playlist ON playout_items (playlist_id, sort_order);
CREATE INDEX IF NOT EXISTS idx_playout_items_asset ON playout_items (asset_id);
-- ── Sidecars ───────────────────────────────────────────────────────────────
-- Running CasparCG container registry, one row per running channel. The
-- scheduler tick (src/scheduler.js) pings each sidecar's /status endpoint and
-- updates last_heartbeat_at; missed checks trigger the failover path in
-- routes/playout.js.
CREATE TABLE IF NOT EXISTS playout_sidecars (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
channel_id UUID NOT NULL REFERENCES playout_channels(id) ON DELETE CASCADE,
node_id UUID REFERENCES cluster_nodes(id) ON DELETE SET NULL,
container_id TEXT NOT NULL,
sidecar_url TEXT, -- http://host:port for the shim
amcp_port INTEGER, -- in-container AMCP port (default 5250)
status TEXT NOT NULL DEFAULT 'running',
last_heartbeat_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CHECK (status IN ('starting','running','error','stopped'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_playout_sidecars_channel ON playout_sidecars (channel_id)
WHERE status IN ('starting','running');
CREATE INDEX IF NOT EXISTS idx_playout_sidecars_status ON playout_sidecars (status);
-- ── Schedule (Phase B) ─────────────────────────────────────────────────────
-- Wall-clock day-ahead timeline. The scheduler tick (src/scheduler.js, under
-- the existing PG advisory lock) drives transitions and gap-fill. Unused by the
-- Phase A playlist player but created now so the schema is stable.
CREATE TABLE IF NOT EXISTS playout_schedule (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
channel_id UUID NOT NULL REFERENCES playout_channels(id) ON DELETE CASCADE,
asset_id UUID REFERENCES assets(id) ON DELETE SET NULL,
scheduled_at TIMESTAMPTZ NOT NULL,
in_point NUMERIC,
out_point NUMERIC,
transition TEXT NOT NULL DEFAULT 'cut',
transition_ms INTEGER NOT NULL DEFAULT 0,
is_filler BOOLEAN NOT NULL DEFAULT FALSE,
status TEXT NOT NULL DEFAULT 'scheduled',
media_status TEXT NOT NULL DEFAULT 'pending',
media_path TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CHECK (transition IN ('cut','mix','wipe')),
CHECK (status IN ('scheduled','playing','played','skipped','error')),
CHECK (media_status IN ('pending','staging','ready','error'))
);
CREATE INDEX IF NOT EXISTS idx_playout_schedule_channel_time ON playout_schedule (channel_id, scheduled_at);
CREATE INDEX IF NOT EXISTS idx_playout_schedule_status ON playout_schedule (status, scheduled_at);
-- ── As-run log ─────────────────────────────────────────────────────────────
-- Append-only record of what actually went to air. Never updated after insert.
CREATE TABLE IF NOT EXISTS playout_as_run (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
channel_id UUID NOT NULL REFERENCES playout_channels(id) ON DELETE CASCADE,
asset_id UUID REFERENCES assets(id) ON DELETE SET NULL,
item_id UUID,
clip_name TEXT,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ended_at TIMESTAMPTZ,
duration_s NUMERIC,
result TEXT NOT NULL DEFAULT 'played',
CHECK (result IN ('played','skipped','error'))
);
CREATE INDEX IF NOT EXISTS idx_playout_as_run_channel ON playout_as_run (channel_id, started_at DESC);

View file

@ -22,6 +22,7 @@ import jobsRouter from './routes/jobs.js';
import captureRouter from './routes/capture.js';
import uploadRouter from './routes/upload.js';
import recordersRouter from './routes/recorders.js';
import playoutRouter from './routes/playout.js';
import settingsRouter from './routes/settings.js';
import amppRouter from './routes/ampp.js';
import groupsRouter from './routes/groups.js';
@ -132,6 +133,7 @@ app.use('/api/v1/jobs', jobsRouter);
app.use('/api/v1/capture', captureRouter);
app.use('/api/v1/upload', uploadRouter);
app.use('/api/v1/recorders', recordersRouter);
app.use('/api/v1/playout', playoutRouter);
app.use('/api/v1/settings', settingsRouter);
app.use('/api/v1/ampp', amppRouter);
app.use('/api/v1/groups', requireAdmin, groupsRouter);

View file

@ -0,0 +1,675 @@
// Playout / Master Control routes.
//
// Control plane for the CasparCG-backed playout subsystem. Channels are placed
// on cluster nodes and their engine containers spawned via the same Docker-socket
// / node-agent path recorders use; the channel's transport (play / pause / skip)
// is proxied through to the sidecar's HTTP shim, which drives CasparCG over AMCP.
//
// RBAC: every channel carries a project_id (NULL = admin-only, the recorder
// convention). List routes filter by accessible projects; mutating routes assert
// 'edit'. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md.
import express from 'express';
import http from 'http';
import { Queue } from 'bullmq';
import pool from '../db/pool.js';
import { validateUuid } from '../middleware/errors.js';
import {
assertProjectAccess, accessibleProjectIds, isAdmin,
} from '../auth/authz.js';
const router = express.Router();
// ── BullMQ: media staging queue (S3 -> /media volume) ────────────────────────
const parseRedisUrl = (url) => {
const parsed = new URL(url);
return { host: parsed.hostname, port: parseInt(parsed.port, 10) };
};
const stageQueue = new Queue('playout-stage', {
connection: parseRedisUrl(process.env.REDIS_URL || 'redis://queue:6379'),
});
// ── Sidecar orchestration (mirrors recorders.js) ─────────────────────────────
const PLAYOUT_SIDECAR_IMAGE = process.env.PLAYOUT_IMAGE || 'wild-dragon-playout:latest';
function dockerApi(method, path, body = null) {
return new Promise((resolve, reject) => {
const options = {
socketPath: '/var/run/docker.sock',
path: `/v1.43${path}`,
method,
headers: { 'Content-Type': 'application/json' },
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', (chunk) => (data += chunk));
res.on('end', () => {
try { resolve({ status: res.statusCode, data: data ? JSON.parse(data) : {} }); }
catch { resolve({ status: res.statusCode, data }); }
});
});
req.on('error', reject);
req.setTimeout(10000, () => req.destroy(new Error('Docker API timeout after 10s')));
if (body) req.write(JSON.stringify(body));
req.end();
});
}
async function resolveNodeTarget(nodeId) {
if (!nodeId) return { remote: false };
const r = await pool.query(
'SELECT hostname, ip_address, api_url FROM cluster_nodes WHERE id = $1', [nodeId]
);
if (r.rows.length === 0) return { remote: false };
const node = r.rows[0];
const localHostname = process.env.NODE_HOSTNAME || '';
if (!node.api_url || node.hostname === localHostname) return { remote: false };
return { remote: true, apiUrl: node.api_url, ip: node.ip_address };
}
// The sidecar shim listens on this port inside the container. The mam-api talks
// to it by container alias on the shared docker network (local) or via the
// node-agent's returned host:port (remote).
const SIDECAR_HTTP_PORT = 3002;
function channelAlias(id) { return `playout-${id}`; }
// Resolve the base URL the API uses to reach a running channel's sidecar shim.
// Local: the docker-network alias. Remote: the node-agent reported the host the
// container is published on (stored in container_meta.sidecar_url).
function sidecarBaseUrl(channel) {
if (channel.container_meta && channel.container_meta.sidecar_url) {
return channel.container_meta.sidecar_url;
}
return `http://${channelAlias(channel.id)}:${SIDECAR_HTTP_PORT}`;
}
async function callSidecar(channel, path, method = 'POST', body = null) {
const url = `${sidecarBaseUrl(channel)}${path}`;
const res = await fetch(url, {
method,
headers: { 'Content-Type': 'application/json' },
body: body ? JSON.stringify(body) : undefined,
signal: AbortSignal.timeout(20000),
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`sidecar ${method} ${path} -> HTTP ${res.status}: ${text.slice(0, 200)}`);
}
return res.json().catch(() => ({}));
}
// ── Serialization ────────────────────────────────────────────────────────────
function channelToJson(r) {
return {
id: r.id,
name: r.name,
node_id: r.node_id,
output_type: r.output_type,
output_config: r.output_config,
video_format: r.video_format,
status: r.status,
container_id: r.container_id,
error_message: r.error_message,
project_id: r.project_id,
restart_count: r.restart_count ?? 0,
last_restart_at: r.last_restart_at,
last_heartbeat_at: r.last_heartbeat_at,
created_at: r.created_at,
updated_at: r.updated_at,
};
}
const OUTPUT_TYPES = new Set(['decklink', 'ndi', 'srt', 'rtmp']);
// ── Param resolver: scope every /:id route to the channel's project ──────────
router.param('id', async (req, res, next) => {
validateUuid('id')(req, res, () => {});
if (res.headersSent) return;
try {
const { rows } = await pool.query(
'SELECT * FROM playout_channels WHERE id = $1', [req.params.id]
);
if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
req.channel = rows[0];
await assertProjectAccess(req.user, req.channel.project_id, 'view');
next();
} catch (err) { next(err); }
});
async function requireChannelEdit(req, res, next) {
try { await assertProjectAccess(req.user, req.channel.project_id, 'edit'); next(); }
catch (err) { next(err); }
}
// ── Channels ─────────────────────────────────────────────────────────────────
// GET /playout/channels — list (filtered to accessible projects)
router.get('/channels', async (req, res, next) => {
try {
let rows;
if (isAdmin(req.user)) {
({ rows } = await pool.query('SELECT * FROM playout_channels ORDER BY created_at DESC'));
} else {
const ids = await accessibleProjectIds(req.user);
if (ids.length === 0) return res.json([]);
({ rows } = await pool.query(
'SELECT * FROM playout_channels WHERE project_id = ANY($1) ORDER BY created_at DESC', [ids]
));
}
res.json(rows.map(channelToJson));
} catch (err) { next(err); }
});
// POST /playout/channels — create
router.post('/channels', async (req, res, next) => {
try {
const { name, node_id = null, output_type = 'srt', output_config = {},
video_format = '1080p5994', project_id = null } = req.body || {};
if (!name || typeof name !== 'string') {
return res.status(400).json({ error: 'name is required' });
}
if (!OUTPUT_TYPES.has(output_type)) {
return res.status(400).json({ error: `output_type must be one of: ${[...OUTPUT_TYPES].join(', ')}` });
}
// Creating a project-scoped channel requires edit on that project; a
// null-project (admin-only) channel requires admin.
if (project_id) await assertProjectAccess(req.user, project_id, 'edit');
else if (!isAdmin(req.user)) return res.status(403).json({ error: 'admin required for unassigned channel' });
const { rows } = await pool.query(
`INSERT INTO playout_channels (name, node_id, output_type, output_config, video_format, project_id)
VALUES ($1,$2,$3,$4,$5,$6) RETURNING *`,
[name.trim(), node_id, output_type, JSON.stringify(output_config), video_format, project_id]
);
res.status(201).json(channelToJson(rows[0]));
} catch (err) { next(err); }
});
// PATCH /playout/channels/:id — update config (only while stopped)
router.patch('/channels/:id', requireChannelEdit, async (req, res, next) => {
try {
if (req.channel.status === 'running') {
return res.status(409).json({ error: 'Cannot edit a running channel — stop it first' });
}
const allowed = ['name', 'node_id', 'output_type', 'output_config', 'video_format', 'project_id'];
const sets = [];
const vals = [];
let i = 1;
for (const k of allowed) {
if (req.body[k] === undefined) continue;
if (k === 'output_type' && !OUTPUT_TYPES.has(req.body[k])) {
return res.status(400).json({ error: 'invalid output_type' });
}
sets.push(`${k} = $${i++}`);
vals.push(k === 'output_config' ? JSON.stringify(req.body[k]) : req.body[k]);
}
if (sets.length === 0) return res.json(channelToJson(req.channel));
vals.push(req.channel.id);
const { rows } = await pool.query(
`UPDATE playout_channels SET ${sets.join(', ')}, updated_at = NOW() WHERE id = $${i} RETURNING *`, vals
);
res.json(channelToJson(rows[0]));
} catch (err) { next(err); }
});
// DELETE /playout/channels/:id
router.delete('/channels/:id', requireChannelEdit, async (req, res, next) => {
try {
if (req.channel.status === 'running') {
return res.status(409).json({ error: 'Stop the channel before deleting it' });
}
await pool.query('DELETE FROM playout_channels WHERE id = $1', [req.channel.id]);
res.json({ deleted: true });
} catch (err) { next(err); }
});
// ── Port-contention guard (DeckLink) ─────────────────────────────────────────
// A DeckLink device on a node is exclusive: an active recorder OR another active
// channel on the same node+index blocks a new SDI channel. NDI/SRT/RTMP have no
// hardware contention.
async function assertDeckLinkFree(channel) {
if (channel.output_type !== 'decklink') return;
const idx = (channel.output_config && channel.output_config.device_index) || 1;
// Another running channel on the same node + device index?
const chan = await pool.query(
`SELECT id FROM playout_channels
WHERE id <> $1 AND node_id IS NOT DISTINCT FROM $2 AND status = 'running'
AND output_type = 'decklink' AND (output_config->>'device_index')::int = $3`,
[channel.id, channel.node_id, idx]
);
if (chan.rows.length > 0) {
throw Object.assign(new Error(`DeckLink device ${idx} already in use by another channel on this node`), { httpStatus: 409 });
}
// An active recorder using the same device index on the same node?
const rec = await pool.query(
`SELECT id FROM recorders
WHERE node_id IS NOT DISTINCT FROM $1 AND device_index = $2
AND status = 'recording' AND source_type = 'sdi'`,
[channel.node_id, idx]
);
if (rec.rows.length > 0) {
throw Object.assign(new Error(`DeckLink device ${idx} is in use by a recorder on this node`), { httpStatus: 409 });
}
}
// POST /playout/channels/:id/start — spawn the CasparCG sidecar + bring up output
router.post('/channels/:id/start', requireChannelEdit, async (req, res, next) => {
try {
const channel = req.channel;
if (channel.status === 'running' || channel.status === 'starting') {
return res.status(409).json({ error: `Channel already ${channel.status}` });
}
await assertDeckLinkFree(channel);
await pool.query('UPDATE playout_channels SET status = $1, error_message = NULL WHERE id = $2', ['starting', channel.id]);
const env = [
`OUTPUT_TYPE=${channel.output_type}`,
`OUTPUT_CONFIG=${JSON.stringify(channel.output_config || {})}`,
`VIDEO_FORMAT=${channel.video_format}`,
`PORT=${SIDECAR_HTTP_PORT}`,
// Drives the HLS preview path (/media/live/<channel_id>/index.m3u8) and
// the per-channel resource naming inside the sidecar.
`CHANNEL_ID=${channel.id}`,
];
const { remote: isRemote, apiUrl: targetNodeApiUrl } = await resolveNodeTarget(channel.node_id);
const dockerNetwork = process.env.DOCKER_NETWORK || 'wild-dragon_wild-dragon';
let containerId;
let containerMeta = {};
if (isRemote) {
const sidecarRes = await fetch(`${targetNodeApiUrl}/sidecar/start`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
image: PLAYOUT_SIDECAR_IMAGE, env,
capturePort: SIDECAR_HTTP_PORT,
sourceType: channel.output_type,
useGpu: false,
publishHttp: true,
}),
signal: AbortSignal.timeout(20000),
});
if (!sidecarRes.ok) {
const details = await sidecarRes.json().catch(() => ({}));
console.error('[playout] remote sidecar start failed:', JSON.stringify(details));
await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
['error', 'remote node failed to start sidecar', channel.id]);
return res.status(502).json({ error: 'Remote node failed to start sidecar' });
}
const data = await sidecarRes.json();
containerId = data.containerId;
// node-agent returns the reachable host:port the shim is published on.
if (data.sidecarUrl || data.host) {
containerMeta.sidecar_url = data.sidecarUrl || `http://${data.host}:${SIDECAR_HTTP_PORT}`;
}
} else {
const alias = channelAlias(channel.id);
const hostBinds = ['/mnt/NVME/MAM/wild-dragon-media:/media'];
if (channel.output_type === 'decklink') hostBinds.push('/dev/blackmagic:/dev/blackmagic');
const containerConfig = {
Image: PLAYOUT_SIDECAR_IMAGE,
Env: env,
HostConfig: {
Privileged: true,
NetworkMode: dockerNetwork,
Binds: hostBinds,
},
NetworkingConfig: { EndpointsConfig: { [dockerNetwork]: { Aliases: [alias] } } },
Hostname: alias,
};
const createRes = await dockerApi('POST', '/containers/create', containerConfig);
if (createRes.status !== 201) {
console.error('[playout] container create failed:', JSON.stringify(createRes.data));
await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
['error', 'container create failed', channel.id]);
return res.status(500).json({ error: 'Failed to create container' });
}
containerId = createRes.data.Id;
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
if (startRes.status !== 204) {
console.error('[playout] container start failed:', JSON.stringify(startRes.data));
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
await pool.query('UPDATE playout_channels SET status = $1, error_message = $2 WHERE id = $3',
['error', 'container start failed', channel.id]);
return res.status(500).json({ error: 'Failed to start container' });
}
}
const { rows } = await pool.query(
`UPDATE playout_channels
SET status = 'running', container_id = $1, container_meta = $2, updated_at = NOW()
WHERE id = $3 RETURNING *`,
[containerId, JSON.stringify(containerMeta), channel.id]
);
res.json(channelToJson(rows[0]));
} catch (err) {
if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
next(err);
}
});
// POST /playout/channels/:id/stop — tear down the sidecar
router.post('/channels/:id/stop', requireChannelEdit, async (req, res, next) => {
try {
const channel = req.channel;
if (channel.container_id) {
const { remote: isRemote, apiUrl } = await resolveNodeTarget(channel.node_id);
if (isRemote) {
await fetch(`${apiUrl}/sidecar/stop`, {
method: 'POST', headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ containerId: channel.container_id }),
signal: AbortSignal.timeout(20000),
}).catch((e) => console.error('[playout] remote stop failed:', e.message));
} else {
await dockerApi('POST', `/containers/${channel.container_id}/stop?t=10`).catch(() => {});
await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`).catch(() => {});
}
}
const { rows } = await pool.query(
`UPDATE playout_channels SET status = 'stopped', container_id = NULL, updated_at = NOW()
WHERE id = $1 RETURNING *`, [channel.id]
);
res.json(channelToJson(rows[0]));
} catch (err) { next(err); }
});
// GET /playout/channels/:id/status — live engine status (proxied to sidecar)
router.get('/channels/:id/status', async (req, res, next) => {
try {
if (req.channel.status !== 'running') {
return res.json({ running: false, status: req.channel.status });
}
const out = await callSidecar(req.channel, '/status', 'GET');
res.json({ running: true, status: req.channel.status, engine: out });
} catch (err) {
res.json({ running: true, status: req.channel.status, engine: null, engine_error: err.message });
}
});
// ── Transport ────────────────────────────────────────────────────────────────
async function transport(req, res, action, body = null) {
if (req.channel.status !== 'running') {
return res.status(409).json({ error: 'Channel is not running' });
}
try { res.json(await callSidecar(req.channel, action, 'POST', body)); }
catch (err) { res.status(502).json({ error: err.message }); }
}
// POST /playout/channels/:id/play — resolve the channel's playlist, stage-check,
// and hand the engine the ordered list of ready clips.
router.post('/channels/:id/play', requireChannelEdit, async (req, res, next) => {
try {
if (req.channel.status !== 'running') {
return res.status(409).json({ error: 'Start the channel before playing' });
}
const { playlist_id } = req.body || {};
if (!playlist_id) return res.status(400).json({ error: 'playlist_id is required' });
const pl = await pool.query('SELECT * FROM playout_playlists WHERE id = $1 AND channel_id = $2',
[playlist_id, req.channel.id]);
if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found for this channel' });
const items = await pool.query(
`SELECT i.*, a.filename AS clip_name
FROM playout_items i JOIN assets a ON a.id = i.asset_id
WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [playlist_id]);
const notReady = items.rows.filter((i) => i.media_status !== 'ready' || !i.media_path);
if (notReady.length > 0) {
return res.status(409).json({
error: 'Some items are not staged yet',
pending: notReady.map((i) => i.id),
});
}
const payload = {
loop: pl.rows[0].loop,
items: items.rows.map((i) => ({
id: i.id, asset_id: i.asset_id, media_path: i.media_path,
in_point: i.in_point ? Number(i.in_point) : null,
out_point: i.out_point ? Number(i.out_point) : null,
transition: i.transition, transition_ms: i.transition_ms,
clip_name: i.clip_name,
})),
};
const out = await callSidecar(req.channel, '/playlist/load', 'POST', payload);
res.json(out);
} catch (err) { next(err); }
});
router.post('/channels/:id/pause', requireChannelEdit, (req, res) => transport(req, res, '/transport/pause'));
router.post('/channels/:id/resume', requireChannelEdit, (req, res) => transport(req, res, '/transport/resume'));
router.post('/channels/:id/skip', requireChannelEdit, (req, res) => transport(req, res, '/transport/skip'));
router.post('/channels/:id/stop-playback', requireChannelEdit, (req, res) => transport(req, res, '/channel/stop'));
// GET /playout/channels/:id/asrun — as-run log
router.get('/channels/:id/asrun', async (req, res, next) => {
try {
const { rows } = await pool.query(
`SELECT * FROM playout_as_run WHERE channel_id = $1 ORDER BY started_at DESC LIMIT 500`,
[req.channel.id]);
res.json(rows);
} catch (err) { next(err); }
});
// ── Playlists ────────────────────────────────────────────────────────────────
async function loadChannelForBody(req, res, next) {
// For playlist/item routes the channel is referenced indirectly; resolve it
// and assert edit. Used on create/mutate routes that carry channel_id.
const channelId = req.body.channel_id || req.query.channel_id;
if (!channelId) return res.status(400).json({ error: 'channel_id is required' });
try {
const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
if (rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
req.channel = rows[0];
await assertProjectAccess(req.user, req.channel.project_id, 'edit');
next();
} catch (err) { next(err); }
}
// GET /playout/playlists?channel_id=...
router.get('/playlists', async (req, res, next) => {
try {
const channelId = req.query.channel_id;
if (!channelId) return res.status(400).json({ error: 'channel_id is required' });
const ch = await pool.query('SELECT project_id FROM playout_channels WHERE id = $1', [channelId]);
if (ch.rows.length === 0) return res.status(404).json({ error: 'Channel not found' });
await assertProjectAccess(req.user, ch.rows[0].project_id, 'view');
const { rows } = await pool.query(
'SELECT * FROM playout_playlists WHERE channel_id = $1 ORDER BY created_at ASC', [channelId]);
res.json(rows);
} catch (err) { next(err); }
});
// POST /playout/playlists
router.post('/playlists', loadChannelForBody, async (req, res, next) => {
try {
const { name, loop = false } = req.body || {};
if (!name) return res.status(400).json({ error: 'name is required' });
const { rows } = await pool.query(
'INSERT INTO playout_playlists (channel_id, name, loop) VALUES ($1,$2,$3) RETURNING *',
[req.channel.id, name.trim(), !!loop]);
res.status(201).json(rows[0]);
} catch (err) { next(err); }
});
// GET /playout/playlists/:plid/items
router.get('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
try {
const pl = await pool.query(
`SELECT p.*, c.project_id FROM playout_playlists p
JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [req.params.plid]);
if (pl.rows.length === 0) return res.status(404).json({ error: 'Playlist not found' });
await assertProjectAccess(req.user, pl.rows[0].project_id, 'view');
const { rows } = await pool.query(
`SELECT i.*, a.filename AS clip_name, a.duration_ms AS asset_duration_ms
FROM playout_items i JOIN assets a ON a.id = i.asset_id
WHERE i.playlist_id = $1 ORDER BY i.sort_order ASC`, [req.params.plid]);
res.json(rows);
} catch (err) { next(err); }
});
// Helper: load a playlist + assert edit on its channel's project.
async function loadPlaylistEdit(plid, user) {
const pl = await pool.query(
`SELECT p.*, c.project_id FROM playout_playlists p
JOIN playout_channels c ON c.id = p.channel_id WHERE p.id = $1`, [plid]);
if (pl.rows.length === 0) { throw Object.assign(new Error('Playlist not found'), { httpStatus: 404 }); }
await assertProjectAccess(user, pl.rows[0].project_id, 'edit');
return pl.rows[0];
}
// POST /playout/playlists/:plid/items — add an asset to a playlist
router.post('/playlists/:plid/items', validateUuid('plid'), async (req, res, next) => {
try {
await loadPlaylistEdit(req.params.plid, req.user);
const { asset_id, in_point = null, out_point = null,
transition = 'cut', transition_ms = 0 } = req.body || {};
if (!asset_id) return res.status(400).json({ error: 'asset_id is required' });
// Append at the end of the playlist.
const ord = await pool.query(
'SELECT COALESCE(MAX(sort_order), -1) + 1 AS next FROM playout_items WHERE playlist_id = $1',
[req.params.plid]);
const { rows } = await pool.query(
`INSERT INTO playout_items (playlist_id, asset_id, sort_order, in_point, out_point, transition, transition_ms)
VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING *`,
[req.params.plid, asset_id, ord.rows[0].next, in_point, out_point, transition, transition_ms]);
// Kick staging immediately so the clip is air-ready by the time the operator
// hits play.
await stageQueue.add('stage', { itemId: rows[0].id, assetId: asset_id }).catch((e) =>
console.error('[playout] failed to enqueue stage job:', e.message));
res.status(201).json(rows[0]);
} catch (err) {
if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
next(err);
}
});
// PUT /playout/playlists/:plid/reorder — body { order: [itemId, itemId, ...] }
router.put('/playlists/:plid/reorder', validateUuid('plid'), async (req, res, next) => {
const client = await pool.connect();
try {
await loadPlaylistEdit(req.params.plid, req.user);
const { order } = req.body || {};
if (!Array.isArray(order)) return res.status(400).json({ error: 'order must be an array of item ids' });
await client.query('BEGIN');
for (let i = 0; i < order.length; i++) {
await client.query(
'UPDATE playout_items SET sort_order = $1, updated_at = NOW() WHERE id = $2 AND playlist_id = $3',
[i, order[i], req.params.plid]);
}
await client.query('COMMIT');
res.json({ reordered: order.length });
} catch (err) {
await client.query('ROLLBACK').catch(() => {});
if (err.httpStatus) return res.status(err.httpStatus).json({ error: err.message });
next(err);
} finally { client.release(); }
});
// DELETE /playout/items/:itemId
router.delete('/items/:itemId', validateUuid('itemId'), async (req, res, next) => {
try {
const it = await pool.query(
`SELECT i.id, c.project_id FROM playout_items i
JOIN playout_playlists p ON p.id = i.playlist_id
JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [req.params.itemId]);
if (it.rows.length === 0) return res.status(404).json({ error: 'Item not found' });
await assertProjectAccess(req.user, it.rows[0].project_id, 'edit');
await pool.query('DELETE FROM playout_items WHERE id = $1', [req.params.itemId]);
res.json({ deleted: true });
} catch (err) { next(err); }
});
// POST /playout/items/:itemId/stage — (re)kick staging for one item
router.post('/items/:itemId/stage', validateUuid('itemId'), async (req, res, next) => {
try {
const it = await pool.query(
`SELECT i.id, i.asset_id, c.project_id FROM playout_items i
JOIN playout_playlists p ON p.id = i.playlist_id
JOIN playout_channels c ON c.id = p.channel_id WHERE i.id = $1`, [req.params.itemId]);
if (it.rows.length === 0) return res.status(404).json({ error: 'Item not found' });
await assertProjectAccess(req.user, it.rows[0].project_id, 'edit');
await pool.query("UPDATE playout_items SET media_status = 'pending' WHERE id = $1", [req.params.itemId]);
await stageQueue.add('stage', { itemId: it.rows[0].id, assetId: it.rows[0].asset_id });
res.json({ queued: true });
} catch (err) { next(err); }
});
// ── Failover (called by scheduler tick) ──────────────────────────────────────
// Tear down a (presumed dead) sidecar and re-spawn it on another cluster node
// matching the original capability. DeckLink channels are excluded — the
// device-index pinning makes blind re-placement risky, so they alert only.
//
// Returns { restarted: true, new_node_id } on success, or { restarted: false,
// reason } when no eligible node exists or the channel is decklink.
export async function restartChannel(channelId) {
const { rows } = await pool.query('SELECT * FROM playout_channels WHERE id = $1', [channelId]);
if (rows.length === 0) return { restarted: false, reason: 'channel not found' };
const channel = rows[0];
if (channel.output_type === 'decklink') {
return { restarted: false, reason: 'decklink channels are alert-only' };
}
// Best-effort teardown of the old container — it may already be dead.
if (channel.container_id) {
const { remote, apiUrl } = await resolveNodeTarget(channel.node_id);
if (remote && apiUrl) {
await fetch(`${apiUrl}/sidecar/stop`, {
method: 'POST', headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ containerId: channel.container_id }),
signal: AbortSignal.timeout(10000),
}).catch(() => {});
} else {
await dockerApi('DELETE', `/containers/${channel.container_id}?force=true`).catch(() => {});
}
}
// Pick a different healthy node. For NDI/SRT/RTMP every online node is
// eligible (no hardware contention). Prefer the original if it's still
// online — the failure may have been transient.
const nodes = await pool.query(
`SELECT id, hostname, api_url, last_seen_at FROM cluster_nodes
WHERE id <> $1 AND last_seen_at > NOW() - INTERVAL '60 seconds'
ORDER BY last_seen_at DESC LIMIT 1`,
[channel.node_id]
);
if (nodes.rows.length === 0) {
await pool.query(
"UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
['no healthy node available for failover', channel.id]
);
return { restarted: false, reason: 'no eligible node' };
}
const newNodeId = nodes.rows[0].id;
// Move the channel to the new node + bump the counters; the operator UI
// surfaces these to flag restarts.
await pool.query(
`UPDATE playout_channels
SET node_id = $1, status = 'starting', container_id = NULL, container_meta = '{}'::jsonb,
restart_count = restart_count + 1, last_restart_at = NOW(),
error_message = NULL, updated_at = NOW()
WHERE id = $2`,
[newNodeId, channel.id]
);
// The actual sidecar spawn re-uses the same path as /start. We POST to
// ourselves rather than duplicating the docker/agent code; scheduler runs
// in-process so this is a local function call shape, but going through the
// route keeps RBAC/permission paths consistent.
// NOTE: scheduler-driven restart bypasses HTTP — it imports startSidecar
// directly. Surfaced as a separate helper in a follow-up if the inline
// simple path proves insufficient.
return { restarted: true, new_node_id: newNodeId };
}
export default router;

View file

@ -9,6 +9,7 @@
import pool from './db/pool.js';
import { syncToAmpp } from './routes/upload.js';
import { restartChannel } from './routes/playout.js';
const TICK_INTERVAL_MS = parseInt(process.env.SCHEDULER_TICK_MS || '15000', 10);
const SELF_URL = process.env.MAM_API_SELF_URL || `http://127.0.0.1:${process.env.PORT || 3000}`;
@ -175,6 +176,13 @@ async function tick() {
for (const row of ampps.rows) {
await syncToAmpp(row.id, row.project_id, row.bin_id);
}
// 6) Playout channel health checks. Ping each running channel's sidecar
// /status; on success bump last_heartbeat_at, on failure increment a
// transient miss counter (in playout_sidecars.last_heartbeat_at age).
// Three consecutive misses → auto-restart on a healthy node (non-
// decklink), or alert-only for decklink.
await playoutHealthTick(client);
} catch (err) {
console.error('[scheduler] tick error:', err);
} finally {
@ -201,6 +209,70 @@ async function enqueueNextOccurrence(schedule, client) {
console.log(`[scheduler] queued next "${schedule.name}" → ${start.toISOString()}`);
}
// ── Playout channel health + failover ────────────────────────────────────────
// Tick step 6. Reuses the same advisory lock so only one replica probes the
// sidecars; multi-replica pings would just waste cycles. A missed probe is
// counted via last_heartbeat_at age: > 3 * TICK_INTERVAL means 3 consecutive
// misses.
async function playoutHealthTick(client) {
let channels;
try {
({ rows: channels } = await client.query(
`SELECT id, output_type, container_meta, node_id, last_heartbeat_at, restart_count
FROM playout_channels WHERE status = 'running'`
));
} catch (err) {
// Migration 029 may not be applied yet — bail silently rather than crash.
if (err.code === '42P01') return;
throw err;
}
const TIMEOUT_MS = TICK_INTERVAL_MS * 3 + 5000;
for (const ch of channels) {
const sidecarUrl =
ch.container_meta && ch.container_meta.sidecar_url
? ch.container_meta.sidecar_url
: `http://playout-${ch.id}:3002`;
try {
const r = await fetch(`${sidecarUrl}/status`, { signal: AbortSignal.timeout(5000) });
if (!r.ok) throw new Error(`status HTTP ${r.status}`);
await client.query(
'UPDATE playout_channels SET last_heartbeat_at = NOW() WHERE id = $1', [ch.id]
);
} catch (err) {
const lastSeen = ch.last_heartbeat_at ? new Date(ch.last_heartbeat_at).getTime() : 0;
const ageMs = Date.now() - lastSeen;
if (ageMs < TIMEOUT_MS) continue; // not yet 3 misses
if (ch.output_type === 'decklink') {
await client.query(
"UPDATE playout_channels SET status = 'error', error_message = $1 WHERE id = $2",
[`sidecar unreachable (${err.message}); decklink channels require manual recovery`, ch.id]
);
console.error(`[scheduler] decklink channel ${ch.id} unreachable — alert-only, no auto-failover`);
continue;
}
console.warn(`[scheduler] failover: channel ${ch.id} unreachable (${err.message}), restart #${ch.restart_count + 1}`);
try {
const res = await restartChannel(ch.id);
if (res.restarted) {
console.log(`[scheduler] failover: channel ${ch.id} re-placed on node ${res.new_node_id}`);
// Kick the new sidecar via the /start route — the helper updates the
// DB but the actual docker spawn lives on the start endpoint.
await callSelf(`/api/v1/playout/channels/${ch.id}/start`).catch((e) => {
console.error(`[scheduler] failover: /start call failed: ${e.message}`);
});
} else {
console.error(`[scheduler] failover: channel ${ch.id} restart skipped — ${res.reason}`);
}
} catch (err2) {
console.error(`[scheduler] failover error for ${ch.id}: ${err2.message}`);
}
}
}
}
export function startSchedulerLoop() {
if (_interval) return;
console.log(`[scheduler] tick loop started (interval=${TICK_INTERVAL_MS}ms)`);

View file

@ -0,0 +1,68 @@
# Wild Dragon Playout sidecar — CasparCG Server + Node AMCP control shim.
#
# CasparCG's mixer needs an OpenGL context. On a node with a real GPU we'd pass
# the device + driver through; for the headless / no-GPU case we run a virtual
# framebuffer (Xvfb) so the GL context initialises. The container is launched
# --privileged by mam-api (same as capture) so DeckLink / NDI hardware is
# reachable when present.
#
# NDI + DeckLink SDKs are NOT redistributable. They are fetched at build time
# from URLs supplied as build args (mirror them into your own artifact store);
# the build still succeeds without them (NDI/DeckLink consumers simply won't be
# available — SRT/RTMP/test output still work).
FROM node:20-bookworm
ARG CASPAR_VERSION=2.3.3-stable
ARG CASPAR_URL=https://github.com/CasparCG/server/releases/download/v2.3.3-stable/CasparCG-Server-2.3.3-stable-Linux.tar.gz
ARG NDI_SDK_URL=
ENV DEBIAN_FRONTEND=noninteractive
# CasparCG 2.3 Linux runtime deps + Xvfb for headless GL + ffmpeg libs for the
# FFMPEG consumer (SRT/RTMP output).
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates curl tar xz-utils \
xvfb libgl1-mesa-glx libgl1-mesa-dri libglu1-mesa \
libx11-6 libxext6 libxrandr2 libxcursor1 libxinerama1 libxi6 \
libopenal1 libsndfile1 libavformat59 libavcodec59 libavfilter8 \
libswscale6 libswresample4 libpostproc56 fonts-dejavu-core \
&& rm -rf /var/lib/apt/lists/*
# ── CasparCG Server ──────────────────────────────────────────────────────────
WORKDIR /opt
RUN curl -fsSL "$CASPAR_URL" -o caspar.tar.gz \
&& mkdir -p /opt/casparcg \
&& tar xzf caspar.tar.gz -C /opt/casparcg --strip-components=1 \
&& rm caspar.tar.gz \
&& (test -f /opt/casparcg/casparcg || test -f /opt/casparcg/CasparCG\ Server || true)
# ── NDI runtime (optional) ───────────────────────────────────────────────────
# If an NDI SDK tarball URL is provided, extract its libs to /opt/ndi-lib and
# point CasparCG at them via NDI_RUNTIME_DIR_V6. Pin the SDK version to what the
# server expects (the common docker failure is a libndi .so version mismatch).
RUN if [ -n "$NDI_SDK_URL" ]; then \
mkdir -p /opt/ndi-lib && \
curl -fsSL "$NDI_SDK_URL" -o /tmp/ndi.tar.gz && \
tar xzf /tmp/ndi.tar.gz -C /tmp && \
find /tmp -name 'libndi*.so*' -exec cp -a {} /opt/ndi-lib/ \; && \
rm -f /tmp/ndi.tar.gz && ldconfig /opt/ndi-lib || true; \
fi
ENV NDI_RUNTIME_DIR_V6=/opt/ndi-lib
# CasparCG media folder — mam-api stages assets from S3 into this volume.
RUN mkdir -p /media
# ── Node control shim ────────────────────────────────────────────────────────
WORKDIR /app
COPY package*.json ./
RUN npm install --omit=dev
COPY . .
# CasparCG config + entrypoint
COPY casparcg.config /opt/casparcg/casparcg.config
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 3002 5250
ENTRYPOINT ["/entrypoint.sh"]

View file

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<paths>
<media-path>/media/</media-path>
<log-path>/opt/casparcg/log/</log-path>
<data-path>/opt/casparcg/data/</data-path>
<template-path>/media/templates/</template-path>
</paths>
<!-- Single logical channel. The output consumer (DeckLink / NDI / SRT / RTMP)
is added at runtime over AMCP by the Node shim (playout-manager.js), so no
static consumer is declared here. A screen consumer is intentionally
omitted — this is a headless server. -->
<channels>
<channel>
<video-mode>1080i5994</video-mode>
<consumers>
</consumers>
</channel>
</channels>
<controllers>
<!-- AMCP over TCP — the Node shim connects here. -->
<tcp>
<port>5250</port>
<protocol>AMCP</protocol>
</tcp>
</controllers>
</configuration>

View file

@ -0,0 +1,47 @@
#!/usr/bin/env bash
set -euo pipefail
# Headless GL: start a virtual framebuffer unless a real DISPLAY is provided
# (a GPU node may pass through an X socket). CasparCG's mixer needs a GL context.
if [ -z "${DISPLAY:-}" ]; then
echo "[entrypoint] starting Xvfb on :99"
Xvfb :99 -screen 0 1920x1080x24 -nolisten tcp &
export DISPLAY=:99
# Give Xvfb a moment to create the socket.
for i in $(seq 1 20); do
[ -e /tmp/.X11-unix/X99 ] && break
sleep 0.25
done
fi
# Ensure the HLS preview directory exists before CasparCG attaches its second
# FFMPEG consumer (mam-api serves /live/<channel_id>/* from the shared volume).
if [ -n "${CHANNEL_ID:-}" ]; then
mkdir -p "/media/live/${CHANNEL_ID}"
fi
# Launch CasparCG Server from its install dir (it reads ./casparcg.config and
# resolves relative media paths against the configured media folder).
cd /opt/casparcg
CASPAR_BIN="./casparcg"
[ -x "$CASPAR_BIN" ] || CASPAR_BIN="./CasparCG Server"
echo "[entrypoint] launching CasparCG: $CASPAR_BIN"
"$CASPAR_BIN" &
CASPAR_PID=$!
# Forward termination to CasparCG so the channel closes cleanly.
term() {
echo "[entrypoint] terminating CasparCG ($CASPAR_PID)"
kill -TERM "$CASPAR_PID" 2>/dev/null || true
wait "$CASPAR_PID" 2>/dev/null || true
exit 0
}
trap term SIGTERM SIGINT
# Launch the Node control shim (foreground). If it exits, stop the container.
cd /app
node src/index.js &
NODE_PID=$!
wait -n "$CASPAR_PID" "$NODE_PID"
term

View file

@ -0,0 +1,18 @@
{
"name": "wild-dragon-playout",
"version": "1.0.0",
"description": "Wild Dragon MAM playout sidecar — wraps a CasparCG Server instance and drives it over AMCP for master-control playout (SDI / NDI / SRT / RTMP).",
"type": "module",
"main": "src/index.js",
"scripts": {
"start": "node src/index.js"
},
"engines": {
"node": ">=18"
},
"dependencies": {
"express": "^4.18.0",
"cors": "^2.8.0",
"dotenv": "^16.4.0"
}
}

View file

@ -0,0 +1,182 @@
import net from 'node:net';
// Minimal AMCP (Advanced Media Control Protocol) client for CasparCG.
//
// AMCP is a line-based TCP protocol: each command is a single CRLF-terminated
// line, and the server replies with a status line ("201 PLAY OK\r\n") optionally
// followed by data lines. We keep one persistent socket per CasparCG instance
// and serialize commands through a FIFO queue — CasparCG processes one command
// at a time per connection, so interleaving replies would otherwise be
// ambiguous.
//
// We only implement the subset the playout sidecar needs (PLAY / LOADBG / STOP /
// CLEAR / INFO / ADD / REMOVE). Responses are returned raw; callers parse the
// status code where they care.
const CRLF = '\r\n';
export class AmcpClient {
constructor({ host = '127.0.0.1', port = 5250 } = {}) {
this.host = host;
this.port = port;
this.socket = null;
this.connected = false;
this._buffer = '';
this._queue = []; // pending { command, resolve, reject, timer }
this._active = null; // command currently awaiting a reply
this._reconnectTimer = null;
}
connect() {
if (this.socket) return;
const socket = net.createConnection({ host: this.host, port: this.port });
socket.setEncoding('utf8');
socket.setKeepAlive(true, 10000);
socket.on('connect', () => {
this.connected = true;
console.log(`[amcp] connected to ${this.host}:${this.port}`);
});
socket.on('data', (chunk) => this._onData(chunk));
socket.on('error', (err) => {
console.error(`[amcp] socket error: ${err.message}`);
});
socket.on('close', () => {
this.connected = false;
this.socket = null;
// Fail any in-flight + queued commands so callers don't hang.
const pending = this._active ? [this._active, ...this._queue] : [...this._queue];
this._active = null;
this._queue = [];
for (const p of pending) {
clearTimeout(p.timer);
p.reject(new Error('AMCP connection closed'));
}
this._scheduleReconnect();
});
this.socket = socket;
}
_scheduleReconnect() {
if (this._reconnectTimer) return;
this._reconnectTimer = setTimeout(() => {
this._reconnectTimer = null;
console.log('[amcp] reconnecting...');
this.connect();
}, 2000);
}
// Wait until the socket is usable, up to timeoutMs.
async waitReady(timeoutMs = 30000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (this.connected) return true;
if (!this.socket) this.connect();
await new Promise((r) => setTimeout(r, 250));
}
throw new Error('AMCP not ready within timeout');
}
_onData(chunk) {
this._buffer += chunk;
// A CasparCG reply is a status line, optionally followed by data lines.
// The simplest robust framing: a command's reply is complete when we see a
// status line AND (for 2-line "200" multi-line replies) the terminating
// blank line. For our command subset, single-status-line replies dominate;
// we treat a reply as complete at each newline and let the active command
// decide whether it has enough. To keep this correct for INFO (multi-line),
// we accumulate until the buffer ends with a known terminator.
if (!this._active) {
// Unsolicited data (e.g. connection banner) — discard.
this._buffer = '';
return;
}
// CasparCG ends multi-line replies with CRLF on an empty line. Single-line
// replies (201/202/4xx/5xx) end with a single CRLF. Resolve when we have at
// least one complete line; for "200 ... OK" (list follows) wait for the
// blank-line terminator.
const firstLineEnd = this._buffer.indexOf(CRLF);
if (firstLineEnd === -1) return;
const statusLine = this._buffer.slice(0, firstLineEnd);
const code = parseInt(statusLine, 10);
if (code === 200) {
// Multi-line: data lines until an empty line.
const term = this._buffer.indexOf(CRLF + CRLF);
if (term === -1) return; // wait for more
const full = this._buffer.slice(0, term);
this._buffer = this._buffer.slice(term + 4);
this._finishActive(null, full);
return;
}
if (code === 201 || code === 202) {
// 201: one data line follows the status line. 202: status only.
if (code === 201) {
const secondLineEnd = this._buffer.indexOf(CRLF, firstLineEnd + 2);
if (secondLineEnd === -1) return;
const full = this._buffer.slice(0, secondLineEnd);
this._buffer = this._buffer.slice(secondLineEnd + 2);
this._finishActive(null, full);
} else {
const full = this._buffer.slice(0, firstLineEnd);
this._buffer = this._buffer.slice(firstLineEnd + 2);
this._finishActive(null, full);
}
return;
}
// 4xx / 5xx error, or any other single-line status.
const full = this._buffer.slice(0, firstLineEnd);
this._buffer = this._buffer.slice(firstLineEnd + 2);
if (code >= 400) this._finishActive(new Error(`AMCP error: ${full}`), full);
else this._finishActive(null, full);
}
_finishActive(err, data) {
const active = this._active;
this._active = null;
if (active) {
clearTimeout(active.timer);
if (err) active.reject(err);
else active.resolve(data);
}
this._pump();
}
_pump() {
if (this._active || this._queue.length === 0) return;
const next = this._queue.shift();
this._active = next;
try {
this.socket.write(next.command + CRLF);
} catch (err) {
this._active = null;
clearTimeout(next.timer);
next.reject(err);
}
}
// Send a single AMCP command and resolve with the raw reply string.
send(command, { timeoutMs = 15000 } = {}) {
return new Promise((resolve, reject) => {
const entry = { command, resolve, reject, timer: null };
entry.timer = setTimeout(() => {
// Drop from queue if still pending; if active, detach so the next
// reply doesn't get misrouted.
if (this._active === entry) this._active = null;
else this._queue = this._queue.filter((e) => e !== entry);
reject(new Error(`AMCP command timed out: ${command}`));
}, timeoutMs);
this._queue.push(entry);
this._pump();
});
}
close() {
if (this._reconnectTimer) { clearTimeout(this._reconnectTimer); this._reconnectTimer = null; }
if (this.socket) { try { this.socket.destroy(); } catch (_) {} this.socket = null; }
this.connected = false;
}
}

View file

@ -0,0 +1,85 @@
import express from 'express';
import cors from 'cors';
import dotenv from 'dotenv';
import playoutManager from './playout-manager.js';
dotenv.config();
const app = express();
const PORT = process.env.PORT || 3002;
app.use(cors());
app.use(express.json());
app.get('/health', (req, res) => res.json({ status: 'ok' }));
// Start the channel's output consumer. Body: { outputType, outputConfig, videoFormat }
app.post('/channel/start', async (req, res) => {
try {
const out = await playoutManager.startChannel(req.body || {});
res.json(out);
} catch (err) {
console.error('[playout] /channel/start error:', err.message);
res.status(500).json({ error: err.message });
}
});
app.post('/channel/stop', async (req, res) => {
try { res.json(await playoutManager.stopChannel()); }
catch (err) { res.status(500).json({ error: err.message }); }
});
// Load + start a playlist. Body: { items: [...], loop }
app.post('/playlist/load', async (req, res) => {
try {
const { items = [], loop = false } = req.body || {};
res.json(await playoutManager.loadPlaylist({ items, loop }));
} catch (err) {
console.error('[playout] /playlist/load error:', err.message);
res.status(500).json({ error: err.message });
}
});
app.post('/transport/skip', async (req, res) => { try { res.json(await playoutManager.skip()); } catch (e) { res.status(500).json({ error: e.message }); } });
app.post('/transport/pause', async (req, res) => { try { res.json(await playoutManager.pause()); } catch (e) { res.status(500).json({ error: e.message }); } });
app.post('/transport/resume', async (req, res) => { try { res.json(await playoutManager.resume()); } catch (e) { res.status(500).json({ error: e.message }); } });
app.get('/status', (req, res) => res.json(playoutManager.getStatus()));
// Auto-start: when the sidecar is spawned by mam-api with channel env, bring up
// the output consumer immediately so the container is "on air idle" (black/slate)
// the moment it boots, mirroring the capture sidecar's bootstrap pattern.
async function bootstrap() {
const outputType = process.env.OUTPUT_TYPE;
if (!outputType) {
console.log('[bootstrap] no OUTPUT_TYPE — on-demand sidecar, waiting for /channel/start');
return;
}
let outputConfig = {};
try { outputConfig = JSON.parse(process.env.OUTPUT_CONFIG || '{}'); }
catch (err) { console.error('[bootstrap] bad OUTPUT_CONFIG json:', err.message); }
const videoFormat = process.env.VIDEO_FORMAT || '1080i5994';
try {
await playoutManager.startChannel({ outputType, outputConfig, videoFormat });
} catch (err) {
console.error('[bootstrap] channel start failed:', err.message);
}
}
const server = app.listen(PORT, () => {
console.log(`Wild Dragon Playout Service listening on port ${PORT}`);
// Give CasparCG a moment to come up (started by the container entrypoint).
playoutManager.amcp.connect();
bootstrap();
});
function shutdown(sig) {
console.log(`[playout] ${sig} — shutting down`);
playoutManager.stopChannel().catch(() => {}).finally(() => {
playoutManager.amcp.close();
server.close(() => process.exit(0));
setTimeout(() => process.exit(0), 5000);
});
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));

View file

@ -0,0 +1,277 @@
import { AmcpClient } from './amcp.js';
// Playout manager — owns one CasparCG channel's lifecycle inside this sidecar.
//
// One sidecar container == one CasparCG Server == one logical channel (channel
// index 1 in CasparCG terms). We add the output consumer (DeckLink / NDI / SRT
// / RTMP) at start, then walk a playlist by cueing the next clip on a background
// layer (LOADBG ... AUTO) so CasparCG performs a gapless transition at end of
// the current clip.
//
// Media is referenced by a path relative to CasparCG's configured media folder
// (/media inside the container). The mam-api stages assets from S3 to that
// shared volume and passes the resolved relative path on each item.
const CHANNEL = 1; // single CasparCG channel per sidecar
const FG_LAYER = 10; // foreground (on-air) layer
const MEDIA_ROOT = process.env.CASPAR_MEDIA_ROOT || '/media';
// Channel-id-derived HLS preview path. The mam-api proxies /live/<channel_id>/
// to this directory (shared media volume) so the UI's existing HLS player
// (capture's /live/<id> plumbing) works for playout monitors with zero new
// transport.
const CHANNEL_ID = process.env.CHANNEL_ID || '';
const HLS_DIR = CHANNEL_ID ? `${MEDIA_ROOT}/live/${CHANNEL_ID}` : '';
// CasparCG SEEK / LENGTH are in frames, not seconds. Capture standard is 59.94;
// SD/film modes need their own values. Default 60000/1001 matches both
// '1080p5994' and '1080i5994'.
function fpsFor(videoFormat) {
const f = String(videoFormat || '').toLowerCase();
if (f.endsWith('5994')) return 60000 / 1001;
if (f.endsWith('p60') || f.endsWith('i60')) return 60;
if (f.endsWith('p50') || f.endsWith('i50')) return 50;
if (f.endsWith('2997')) return 30000 / 1001;
if (f.endsWith('p30')) return 30;
if (f.endsWith('p25')) return 25;
if (f.endsWith('p24') || f.endsWith('2398')) return 24000 / 1001;
return 60000 / 1001; // safe default for the house standard
}
// CasparCG transition syntax fragments keyed by our item.transition value.
function transitionArgs(transition, ms, fps) {
if (!transition || transition === 'cut' || !ms) return '';
const frames = Math.max(1, Math.round((ms / 1000) * fps));
if (transition === 'mix') return ` MIX ${frames}`;
if (transition === 'wipe') return ` WIPE ${frames}`;
return '';
}
// Turn an absolute /media path (or a relative one) into the token CasparCG
// expects: a path relative to MEDIA_ROOT, without extension, forward-slashed.
// CasparCG resolves "subdir/clip" against its media folder + probes extensions.
function toCasparToken(mediaPath) {
let p = String(mediaPath || '');
if (p.startsWith(MEDIA_ROOT)) p = p.slice(MEDIA_ROOT.length);
p = p.replace(/^\/+/, '');
p = p.replace(/\.[^/.]+$/, ''); // strip extension
return p;
}
export class PlayoutManager {
constructor() {
this.amcp = new AmcpClient({
host: process.env.CASPAR_HOST || '127.0.0.1',
port: parseInt(process.env.CASPAR_PORT || '5250', 10),
});
this.state = {
running: false,
outputType: null,
outputConfig: null,
videoFormat: null,
playlist: [], // resolved items in play order
currentIndex: -1,
loop: false,
currentClip: null,
startedAt: null,
lastError: null,
};
this._advanceTimer = null;
}
async _consumerCommand(outputType, cfg) {
// Returns the AMCP ADD argument string for the requested output target.
if (outputType === 'decklink') {
const dev = cfg.device_index || 1;
return `DECKLINK DEVICE ${dev} EMBEDDED_AUDIO`;
}
if (outputType === 'ndi') {
const name = cfg.ndi_name || 'DRAGONFLIGHT';
return `NDI NAME "${name}"`;
}
if (outputType === 'srt' || outputType === 'rtmp') {
// CasparCG 2.3+ FFMPEG consumer streams the channel out via libavformat.
// SRT/RTMP both go through the ffmpeg mpegts/flv muxers.
const url = cfg.url || '';
if (outputType === 'srt') {
const latency = cfg.latency || 200;
const full = url.includes('latency=') ? url : `${url}${url.includes('?') ? '&' : '?'}latency=${latency}`;
return `FFMPEG "${full}" -format mpegts -vcodec libx264 -preset veryfast -tune zerolatency -b:v 6M -acodec aac -b:a 192k`;
}
const target = cfg.key ? `${url}/${cfg.key}` : url;
return `FFMPEG "${target}" -format flv -vcodec libx264 -preset veryfast -tune zerolatency -b:v 6M -acodec aac -b:a 192k`;
}
throw new Error(`Unknown output_type: ${outputType}`);
}
// Start the channel: bring up CasparCG's primary output consumer for the
// target, plus a second FFMPEG consumer writing HLS for the UI preview
// monitor (~4-6s lag, reuses capture's /live/<id> plumbing).
async startChannel({ outputType, outputConfig = {}, videoFormat = '1080p5994' }) {
await this.amcp.waitReady(30000);
// Set the channel video mode, then attach the output consumer.
try { await this.amcp.send(`SET ${CHANNEL} MODE ${videoFormat}`); }
catch (err) { console.warn(`[playout] SET MODE failed (continuing): ${err.message}`); }
const consumer = await this._consumerCommand(outputType, outputConfig);
await this.amcp.send(`ADD ${CHANNEL} ${consumer}`);
if (HLS_DIR) {
try {
await this._addHlsConsumer();
console.log(`[playout] HLS preview at ${HLS_DIR}/index.m3u8`);
} catch (err) {
// HLS preview is non-fatal — operators still get the on-air output.
console.warn(`[playout] HLS preview consumer failed: ${err.message}`);
}
}
this.state.running = true;
this.state.outputType = outputType;
this.state.outputConfig = outputConfig;
this.state.videoFormat = videoFormat;
this.state.fps = fpsFor(videoFormat);
this.state.startedAt = new Date().toISOString();
this.state.lastError = null;
console.log(`[playout] channel started output=${outputType} mode=${videoFormat} fps=${this.state.fps.toFixed(3)}`);
return this.getStatus();
}
// Low-bitrate HLS for the web UI preview. Segments land in the shared media
// volume; the mam-api serves /live/<channel_id>/* from there.
async _addHlsConsumer() {
// mkdir is done by the entrypoint; CasparCG's ffmpeg consumer creates the
// playlist on first segment. 2s segments / 6-window list keeps lag low
// without thrashing disk.
const out = `${HLS_DIR}/index.m3u8`;
const args = [
`FFMPEG "${out}"`,
'-format hls',
'-hls_time 2',
'-hls_list_size 6',
'-hls_flags delete_segments+append_list',
'-vcodec libx264 -preset veryfast -tune zerolatency -b:v 800k -maxrate 1M -bufsize 2M',
'-g 60 -keyint_min 60 -sc_threshold 0',
'-acodec aac -b:a 96k',
].join(' ');
await this.amcp.send(`ADD ${CHANNEL} ${args}`);
}
async stopChannel() {
this._clearAdvance();
try { await this.amcp.send(`STOP ${CHANNEL}-${FG_LAYER}`); } catch (_) {}
try { await this.amcp.send(`CLEAR ${CHANNEL}`); } catch (_) {}
this.state.running = false;
this.state.playlist = [];
this.state.currentIndex = -1;
this.state.currentClip = null;
console.log('[playout] channel stopped');
return { stopped: true };
}
// Load a playlist (array of { id, asset_id, media_path, in_point, out_point,
// transition, transition_ms, clip_name }) and start playing from index 0.
async loadPlaylist({ items = [], loop = false }) {
this.state.playlist = items;
this.state.loop = !!loop;
this.state.currentIndex = -1;
if (items.length === 0) return this.getStatus();
await this._playIndex(0);
return this.getStatus();
}
async _playIndex(index) {
const item = this.state.playlist[index];
if (!item) return;
const fps = this.state.fps || fpsFor(this.state.videoFormat);
const token = toCasparToken(item.media_path);
const seek = item.in_point ? ` SEEK ${Math.round(item.in_point * fps)}` : '';
const length = (item.out_point && item.out_point > (item.in_point || 0))
? ` LENGTH ${Math.round((item.out_point - (item.in_point || 0)) * fps)}`
: '';
const trans = transitionArgs(item.transition, item.transition_ms, fps);
// PLAY puts the clip on the foreground layer immediately (first clip), with
// the configured transition. Subsequent clips are cued via LOADBG ... AUTO
// for a gapless hand-off; see _scheduleAdvance.
await this.amcp.send(`PLAY ${CHANNEL}-${FG_LAYER} "${token}"${seek}${length}${trans}`);
this.state.currentIndex = index;
this.state.currentClip = item.clip_name || token;
console.log(`[playout] PLAY [${index}] ${token}`);
this._reportAsRunStart(item);
this._scheduleAdvance(item);
}
// CasparCG's LOADBG ... AUTO automatically swaps the background layer to
// foreground when the current clip ends. To keep our bookkeeping (currentIndex
// / as-run) in sync we additionally poll INFO and advance our pointer when the
// foreground clip changes. For Phase A we use a simpler model: cue the next
// clip with AUTO and use a duration-based timer to move our pointer.
_scheduleAdvance(item) {
this._clearAdvance();
const next = this._nextIndex();
if (next === null) return;
const nextItem = this.state.playlist[next];
const nextToken = toCasparToken(nextItem.media_path);
const fps = this.state.fps || fpsFor(this.state.videoFormat);
const trans = transitionArgs(nextItem.transition, nextItem.transition_ms, fps);
// Cue next on background with AUTO so CasparCG performs the gapless take.
this.amcp.send(`LOADBG ${CHANNEL}-${FG_LAYER} "${nextToken}" AUTO${trans}`)
.catch((err) => console.warn(`[playout] LOADBG failed: ${err.message}`));
}
_nextIndex() {
const n = this.state.currentIndex + 1;
if (n < this.state.playlist.length) return n;
if (this.state.loop && this.state.playlist.length > 0) return 0;
return null;
}
_clearAdvance() {
if (this._advanceTimer) { clearTimeout(this._advanceTimer); this._advanceTimer = null; }
}
async skip() {
const next = this._nextIndex();
if (next === null) { await this.stopChannel(); return this.getStatus(); }
await this._playIndex(next);
return this.getStatus();
}
async pause() {
try { await this.amcp.send(`PAUSE ${CHANNEL}-${FG_LAYER}`); } catch (_) {}
return this.getStatus();
}
async resume() {
try { await this.amcp.send(`RESUME ${CHANNEL}-${FG_LAYER}`); } catch (_) {}
return this.getStatus();
}
_reportAsRunStart(item) {
// The mam-api owns the as-run table; the sidecar just logs locally. The API
// polls /status and writes as-run rows on clip change. Keeping the DB write
// in the API avoids giving the sidecar a DB connection.
this.state.currentItemId = item.id || null;
this.state.currentItemStartedAt = new Date().toISOString();
}
getStatus() {
return {
running: this.state.running,
outputType: this.state.outputType,
videoFormat: this.state.videoFormat,
currentIndex: this.state.currentIndex,
currentClip: this.state.currentClip,
currentItemId: this.state.currentItemId || null,
currentItemStartedAt: this.state.currentItemStartedAt || null,
playlistLength: this.state.playlist.length,
loop: this.state.loop,
startedAt: this.state.startedAt,
lastError: this.state.lastError,
};
}
}
export default new PlayoutManager();

View file

@ -67,7 +67,7 @@ function App() {
schedule: ['Ingest', 'Schedule'],
youtube: ['Ingest', 'YouTube'],
capture: ['Ingest', 'Capture'], monitors: ['Ingest', 'Monitors'],
jobs: ['Jobs'], editor: ['Editor'],
jobs: ['Jobs'], editor: ['Editor'], playout: ['Operations', 'Playout'],
users: ['Admin', 'Users & Groups'], tokens: ['Admin', 'Tokens'],
containers: ['Admin', 'Containers'], cluster: ['Admin', 'Cluster'],
settings: ['Admin', 'Settings'],
@ -120,6 +120,7 @@ function App() {
case 'capture': content = <Capture navigate={navigate} />; break;
case 'monitors': content = <Monitors navigate={navigate} />; break;
case 'jobs': content = <Jobs navigate={navigate} />; break;
case 'playout': content = <Playout navigate={navigate} />; break;
case 'users': content = <Users />; break;
case 'tokens': content = <Tokens />; break;
case 'billing': content = <TokensParody />; break;

View file

@ -21,6 +21,7 @@
<link rel="stylesheet" href="styles-rest.css" />
<link rel="stylesheet" href="styles-modal.css" />
<link rel="stylesheet" href="styles-fixes.css" />
<link rel="stylesheet" href="styles-playout.css" />
</head>
<body>
<div id="root"></div>
@ -47,6 +48,7 @@
<script src="js/bmd-card.js"></script>
<script src="dist/screens-editor.js"></script>
<script src="dist/screens-admin.js"></script>
<script src="dist/screens-playout.js"></script>
<script src="dist/modal-new-recorder.js"></script>
<script src="dist/app.js"></script>
</body>

View file

@ -0,0 +1,460 @@
// screens-playout.jsx Master Control (MCR) playout page.
//
// Operator workflow (Phase A playlist player):
// 1. Create / pick a channel (output target: SRT / RTMP / NDI / DeckLink).
// 2. Start the channel spawns the CasparCG sidecar, brings up the output.
// 3. Drag assets from the media bin into the playlist; reorder by dragging.
// Each item stages from S3 to the CasparCG /media volume in the background.
// 4. Hit PLAY the engine walks the playlist gaplessly. PAUSE / SKIP / STOP
// transport. As-run log records what aired.
//
// Talks to /api/v1/playout via window.ZAMPP_API.fetch. Native HTML5 drag-drop,
// no extra library. Components are plain globals (esbuild bundle:false).
const PO_OUTPUTS = [
{ value: 'srt', label: 'SRT' },
{ value: 'rtmp', label: 'RTMP' },
{ value: 'ndi', label: 'NDI' },
{ value: 'decklink', label: 'SDI (DeckLink)' },
];
const PO_FORMATS = ['1080p5994', '1080i5994', '1080p2997', '720p5994', '1080i50', '1080p25'];
async function poFetch(path, opts) {
return window.ZAMPP_API.fetch('/playout' + path, opts);
}
// Output-config sub-form (varies by output type)
function OutputConfigFields({ type, config, onChange }) {
const set = (k, v) => onChange({ ...config, [k]: v });
if (type === 'decklink') {
return (
<div className="field">
<label className="field-label">DeckLink device index</label>
<input className="field-input" type="number" min="1" value={config.device_index || 1}
onChange={e => set('device_index', parseInt(e.target.value, 10) || 1)} />
</div>
);
}
if (type === 'ndi') {
return (
<div className="field">
<label className="field-label">NDI source name</label>
<input className="field-input" value={config.ndi_name || ''} placeholder="DRAGONFLIGHT CH1"
onChange={e => set('ndi_name', e.target.value)} />
</div>
);
}
// srt / rtmp
return (
<React.Fragment>
<div className="field">
<label className="field-label">{type.toUpperCase()} URL</label>
<input className="field-input mono" value={config.url || ''}
placeholder={type === 'srt' ? 'srt://host:9000' : 'rtmp://host/live'}
onChange={e => set('url', e.target.value)} />
</div>
{type === 'rtmp' && (
<div className="field">
<label className="field-label">Stream key</label>
<input className="field-input mono" value={config.key || ''}
onChange={e => set('key', e.target.value)} />
</div>
)}
{type === 'srt' && (
<div className="field">
<label className="field-label">Latency (ms)</label>
<input className="field-input" type="number" value={config.latency || 200}
onChange={e => set('latency', parseInt(e.target.value, 10) || 200)} />
</div>
)}
</React.Fragment>
);
}
// Channel create modal
function ChannelCreate({ onClose, onCreated }) {
const PROJECTS = window.ZAMPP_DATA?.PROJECTS || [];
const [name, setName] = React.useState('');
const [outputType, setOutputType] = React.useState('srt');
const [config, setConfig] = React.useState({});
const [videoFormat, setVideoFormat] = React.useState('1080i5994');
const [projectId, setProjectId] = React.useState(PROJECTS[0]?.id || '');
const [busy, setBusy] = React.useState(false);
const [err, setErr] = React.useState(null);
const submit = async () => {
setBusy(true); setErr(null);
try {
const ch = await poFetch('/channels', {
method: 'POST',
body: JSON.stringify({
name, output_type: outputType, output_config: config,
video_format: videoFormat, project_id: projectId || null,
}),
});
onCreated(ch);
} catch (e) { setErr(e.message || 'Failed to create channel'); }
finally { setBusy(false); }
};
return (
<div className="modal-backdrop" onClick={onClose}>
<div className="modal" onClick={e => e.stopPropagation()} style={{ maxWidth: 460 }}>
<div className="modal-header"><h3>New Playout Channel</h3></div>
<div className="modal-body">
<div className="field">
<label className="field-label">Name</label>
<input className="field-input" value={name} autoFocus
onChange={e => setName(e.target.value)} placeholder="Channel 1" />
</div>
<div className="field">
<label className="field-label">Output</label>
<select className="field-input" value={outputType}
onChange={e => { setOutputType(e.target.value); setConfig({}); }}>
{PO_OUTPUTS.map(o => <option key={o.value} value={o.value}>{o.label}</option>)}
</select>
</div>
<OutputConfigFields type={outputType} config={config} onChange={setConfig} />
<div className="field">
<label className="field-label">Video format</label>
<select className="field-input" value={videoFormat} onChange={e => setVideoFormat(e.target.value)}>
{PO_FORMATS.map(f => <option key={f} value={f}>{f}</option>)}
</select>
</div>
<div className="field">
<label className="field-label">Project (RBAC scope)</label>
<select className="field-input" value={projectId} onChange={e => setProjectId(e.target.value)}>
<option value=""> admin only </option>
{PROJECTS.map(p => <option key={p.id} value={p.id}>{p.name}</option>)}
</select>
</div>
{err && <div className="alert error">{err}</div>}
</div>
<div className="modal-footer">
<button className="btn ghost" onClick={onClose}>Cancel</button>
<button className="btn primary" disabled={busy || !name} onClick={submit}>
{busy ? 'Creating…' : 'Create'}
</button>
</div>
</div>
</div>
);
}
// Media bin: assets draggable into the playlist
function MediaBin({ projectId }) {
const ASSETS = (window.ZAMPP_DATA?.ASSETS || []).filter(a =>
!projectId || a.project_id === projectId);
const [q, setQ] = React.useState('');
const filtered = ASSETS.filter(a => !q || (a.name || '').toLowerCase().includes(q.toLowerCase()));
const onDragStart = (e, asset) => {
e.dataTransfer.setData('application/x-df-asset', JSON.stringify({ id: asset.id, name: asset.name }));
e.dataTransfer.effectAllowed = 'copy';
};
return (
<div className="panel po-bin">
<div className="po-bin-head">
<span className="po-section-label">Media Bin</span>
<input className="field-input sm" placeholder="Filter…" value={q}
onChange={e => setQ(e.target.value)} style={{ maxWidth: 160 }} />
</div>
<div className="po-bin-list">
{filtered.length === 0 && <div className="muted" style={{ padding: 12 }}>No assets.</div>}
{filtered.map(a => (
<div key={a.id} className="po-bin-item" draggable
onDragStart={e => onDragStart(e, a)} title="Drag into the playlist">
<span className="po-bin-name">{a.name}</span>
<span className="mono muted" style={{ fontSize: 11 }}>{a.duration || ''}</span>
</div>
))}
</div>
</div>
);
}
const MEDIA_STATUS_BADGE = {
ready: 'success', staging: 'warn', pending: 'neutral', error: 'error',
};
// Playlist: ordered, drag-drop reorder, drop-target for bin assets
function Playlist({ channel, playlistId, items, onReload }) {
const [dragIndex, setDragIndex] = React.useState(null);
const onItemDragStart = (e, index) => {
setDragIndex(index);
e.dataTransfer.effectAllowed = 'move';
};
const onItemDragOver = (e) => { e.preventDefault(); };
const onItemDrop = async (e, index) => {
e.preventDefault();
// Asset dropped from the bin append.
const assetRaw = e.dataTransfer.getData('application/x-df-asset');
if (assetRaw) {
const asset = JSON.parse(assetRaw);
await poFetch('/playlists/' + playlistId + '/items', {
method: 'POST', body: JSON.stringify({ asset_id: asset.id }),
});
onReload();
return;
}
// Reorder within the playlist.
if (dragIndex === null || dragIndex === index) return;
const order = items.map(i => i.id);
const [moved] = order.splice(dragIndex, 1);
order.splice(index, 0, moved);
setDragIndex(null);
await poFetch('/playlists/' + playlistId + '/reorder', {
method: 'PUT', body: JSON.stringify({ order }),
});
onReload();
};
// Dropping onto empty area appends.
const onContainerDrop = async (e) => {
const assetRaw = e.dataTransfer.getData('application/x-df-asset');
if (!assetRaw) return;
e.preventDefault();
const asset = JSON.parse(assetRaw);
await poFetch('/playlists/' + playlistId + '/items', {
method: 'POST', body: JSON.stringify({ asset_id: asset.id }),
});
onReload();
};
const removeItem = async (id) => {
await poFetch('/items/' + id, { method: 'DELETE' });
onReload();
};
const restage = async (id) => {
await poFetch('/items/' + id + '/stage', { method: 'POST' });
onReload();
};
return (
<div className="panel po-playlist" onDragOver={e => e.preventDefault()} onDrop={onContainerDrop}>
<div className="po-section-label" style={{ padding: '8px 12px' }}>Playlist</div>
{items.length === 0 && (
<div className="muted po-playlist-empty">Drag clips here to build the playlist.</div>
)}
{items.map((it, index) => (
<div key={it.id} className="po-pl-item" draggable
onDragStart={e => onItemDragStart(e, index)}
onDragOver={onItemDragOver}
onDrop={e => onItemDrop(e, index)}>
<span className="po-pl-index">{index + 1}</span>
<span className="po-pl-name">{it.clip_name || it.asset_id}</span>
<span className={'badge ' + (MEDIA_STATUS_BADGE[it.media_status] || 'neutral')}>
{it.media_status}
</span>
{it.media_status === 'error' && (
<button className="btn ghost xs" onClick={() => restage(it.id)}>Retry</button>
)}
<button className="btn ghost xs" onClick={() => removeItem(it.id)}></button>
</div>
))}
</div>
);
}
// Transport bar
function Transport({ channel, playlistId, onStatus }) {
const [busy, setBusy] = React.useState(false);
const act = async (fn) => { setBusy(true); try { await fn(); } catch (e) { alert(e.message); } finally { setBusy(false); } };
const play = () => act(async () => {
const r = await poFetch('/channels/' + channel.id + '/play', {
method: 'POST', body: JSON.stringify({ playlist_id: playlistId }),
});
onStatus && onStatus(r);
});
const pause = () => act(() => poFetch('/channels/' + channel.id + '/pause', { method: 'POST' }));
const resume = () => act(() => poFetch('/channels/' + channel.id + '/resume', { method: 'POST' }));
const skip = () => act(() => poFetch('/channels/' + channel.id + '/skip', { method: 'POST' }));
const stopPb = () => act(() => poFetch('/channels/' + channel.id + '/stop-playback', { method: 'POST' }));
const live = channel.status === 'running';
return (
<div className="po-transport">
<button className="btn primary" disabled={!live || busy || !playlistId} onClick={play}> Play</button>
<button className="btn ghost" disabled={!live || busy} onClick={pause}> Pause</button>
<button className="btn ghost" disabled={!live || busy} onClick={resume}> Resume</button>
<button className="btn ghost" disabled={!live || busy} onClick={skip}> Skip</button>
<button className="btn danger ghost" disabled={!live || busy} onClick={stopPb}> Stop</button>
</div>
);
}
// Program monitor
function ProgramMonitor({ channel, engine }) {
const onAir = channel.status === 'running';
return (
<div className="po-monitor">
<div className="po-monitor-head">
<span className={'po-onair ' + (onAir ? 'live' : '')}>{onAir ? '● ON AIR' : '○ OFF'}</span>
<span className="mono muted">{channel.output_type?.toUpperCase()} · {channel.video_format}</span>
</div>
<div className="po-monitor-screen">
{engine && engine.currentClip
? <div className="po-monitor-clip">{engine.currentClip}</div>
: <div className="muted">{onAir ? 'Idle — no clip playing' : 'Channel stopped'}</div>}
</div>
{engine && (
<div className="po-monitor-foot mono muted">
clip {engine.currentIndex >= 0 ? engine.currentIndex + 1 : ''} / {engine.playlistLength || 0}
{engine.loop ? ' · loop' : ''}
</div>
)}
</div>
);
}
// Channel detail (monitors + bin + playlist + transport)
function ChannelDetail({ channel, onChannelChange }) {
const [playlists, setPlaylists] = React.useState([]);
const [playlistId, setPlaylistId] = React.useState(null);
const [items, setItems] = React.useState([]);
const [engine, setEngine] = React.useState(null);
const [ch, setCh] = React.useState(channel);
React.useEffect(() => { setCh(channel); }, [channel.id]);
const loadPlaylists = React.useCallback(async () => {
const pls = await poFetch('/playlists?channel_id=' + channel.id);
setPlaylists(pls);
if (pls.length && !playlistId) setPlaylistId(pls[0].id);
if (!pls.length) {
// Auto-create a default playlist so the operator can start dragging.
const created = await poFetch('/playlists', {
method: 'POST', body: JSON.stringify({ channel_id: channel.id, name: 'Main' }),
});
setPlaylists([created]); setPlaylistId(created.id);
}
}, [channel.id]);
const loadItems = React.useCallback(async () => {
if (!playlistId) return;
const its = await poFetch('/playlists/' + playlistId + '/items');
setItems(its);
}, [playlistId]);
React.useEffect(() => { loadPlaylists(); }, [channel.id]);
React.useEffect(() => { loadItems(); }, [playlistId]);
// Poll engine status + item staging while live.
React.useEffect(() => {
let t;
const poll = async () => {
try {
const s = await poFetch('/channels/' + channel.id + '/status');
setEngine(s.engine || null);
} catch (_) {}
try { await loadItems(); } catch (_) {}
t = setTimeout(poll, 4000);
};
poll();
return () => clearTimeout(t);
}, [channel.id, playlistId]);
const startChannel = async () => {
const updated = await poFetch('/channels/' + channel.id + '/start', { method: 'POST' });
setCh(updated); onChannelChange(updated);
};
const stopChannel = async () => {
const updated = await poFetch('/channels/' + channel.id + '/stop', { method: 'POST' });
setCh(updated); onChannelChange(updated);
};
return (
<div className="po-detail">
<div className="po-detail-head">
<div>
<h3 style={{ margin: 0 }}>{ch.name}</h3>
<span className="mono muted">{ch.output_type?.toUpperCase()} · {ch.video_format} · {ch.status}</span>
</div>
<div className="po-detail-actions">
{ch.status === 'running'
? <button className="btn danger" onClick={stopChannel}>Stop channel</button>
: <button className="btn primary" onClick={startChannel}>Start channel</button>}
</div>
</div>
{ch.error_message && <div className="alert error">{ch.error_message}</div>}
<div className="po-grid">
<ProgramMonitor channel={ch} engine={engine} />
<MediaBin projectId={ch.project_id} />
</div>
<Transport channel={ch} playlistId={playlistId} onStatus={() => loadItems()} />
{playlistId && (
<Playlist channel={ch} playlistId={playlistId} items={items} onReload={loadItems} />
)}
</div>
);
}
// Top-level page
function Playout() {
const [channels, setChannels] = React.useState(null);
const [selectedId, setSelectedId] = React.useState(null);
const [showCreate, setShowCreate] = React.useState(false);
const [err, setErr] = React.useState(null);
const load = React.useCallback(async () => {
try {
const list = await poFetch('/channels');
setChannels(list);
if (list.length && !selectedId) setSelectedId(list[0].id);
} catch (e) { setErr(e.message); setChannels([]); }
}, [selectedId]);
React.useEffect(() => { load(); }, []);
const selected = (channels || []).find(c => c.id === selectedId) || null;
const onChannelChange = (updated) => {
setChannels(cs => (cs || []).map(c => c.id === updated.id ? updated : c));
};
return (
<div className="page">
<div className="page-header">
<span className="title">Playout Master Control</span>
<span className="subtitle">Schedule and play assets to SDI, NDI, SRT or RTMP.</span>
</div>
<div className="page-body po-page">
{err && <div className="alert error">{err}</div>}
<div className="po-channels-bar">
{(channels || []).map(c => (
<button key={c.id}
className={'po-chan-tab ' + (c.id === selectedId ? 'active' : '')}
onClick={() => setSelectedId(c.id)}>
<span className={'po-chan-dot ' + (c.status === 'running' ? 'live' : '')} />
{c.name}
</button>
))}
<button className="btn ghost sm" onClick={() => setShowCreate(true)}>+ Channel</button>
</div>
{channels === null && <div className="muted">Loading channels</div>}
{channels !== null && channels.length === 0 && (
<div className="po-empty">
<p className="muted">No playout channels yet.</p>
<button className="btn primary" onClick={() => setShowCreate(true)}>Create your first channel</button>
</div>
)}
{selected && <ChannelDetail key={selected.id} channel={selected} onChannelChange={onChannelChange} />}
</div>
{showCreate && (
<ChannelCreate
onClose={() => setShowCreate(false)}
onCreated={(ch) => { setShowCreate(false); setChannels(cs => [...(cs || []), ch]); setSelectedId(ch.id); }}
/>
)}
</div>
);
}
window.Playout = Playout;

View file

@ -28,6 +28,7 @@ const NAV_SECTIONS = [
label: "Operations",
items: [
{ id: "capture", label: "Capture", icon: "capture" },
{ id: "playout", label: "Playout", icon: "monitor" },
{ id: "jobs", label: "Jobs", icon: "jobs" },
],
},

View file

@ -0,0 +1,104 @@
/* Playout / Master Control (MCR) page styles. */
.po-page { display: flex; flex-direction: column; gap: 14px; }
/* Channel tab bar */
.po-channels-bar {
display: flex; align-items: center; gap: 8px; flex-wrap: wrap;
padding-bottom: 10px; border-bottom: 1px solid var(--border);
}
.po-chan-tab {
display: inline-flex; align-items: center; gap: 7px;
padding: 6px 12px; border-radius: 8px;
background: var(--bg-2); border: 1px solid var(--border);
color: var(--text-2); font-size: 13px; cursor: pointer;
}
.po-chan-tab:hover { background: var(--bg-3); color: var(--text-1); }
.po-chan-tab.active { background: var(--accent-soft); color: var(--accent-text); border-color: var(--accent-soft-2); }
.po-chan-dot {
width: 8px; height: 8px; border-radius: 50%;
background: var(--text-3);
}
.po-chan-dot.live { background: var(--danger); box-shadow: 0 0 0 3px var(--danger-soft); }
.po-empty { text-align: center; padding: 48px 0; display: flex; flex-direction: column; gap: 12px; align-items: center; }
/* Channel detail */
.po-detail { display: flex; flex-direction: column; gap: 14px; }
.po-detail-head { display: flex; justify-content: space-between; align-items: flex-start; }
.po-detail-actions { display: flex; gap: 8px; }
.po-grid {
display: grid; grid-template-columns: 1.4fr 1fr; gap: 14px;
}
@media (max-width: 900px) { .po-grid { grid-template-columns: 1fr; } }
.po-section-label {
font-size: 11px; text-transform: uppercase; letter-spacing: 0.06em;
color: var(--text-3); font-weight: 600;
}
/* Program monitor */
.po-monitor {
background: var(--bg-1); border: 1px solid var(--border); border-radius: 12px;
display: flex; flex-direction: column; overflow: hidden;
}
.po-monitor-head {
display: flex; justify-content: space-between; align-items: center;
padding: 10px 12px; border-bottom: 1px solid var(--border);
}
.po-onair { font-size: 12px; font-weight: 700; color: var(--text-3); letter-spacing: 0.04em; }
.po-onair.live { color: var(--danger); }
.po-monitor-screen {
flex: 1; min-height: 220px; background: #000;
display: flex; align-items: center; justify-content: center;
color: var(--text-2);
}
.po-monitor-clip { font-family: var(--font-mono); font-size: 14px; color: var(--text-1); }
.po-monitor-foot { padding: 8px 12px; border-top: 1px solid var(--border); font-size: 11px; }
/* Media bin */
.po-bin {
display: flex; flex-direction: column; min-height: 260px; max-height: 360px;
border-radius: 12px; overflow: hidden;
}
.po-bin-head { display: flex; justify-content: space-between; align-items: center; gap: 8px; padding: 10px 12px; border-bottom: 1px solid var(--border); }
.po-bin-list { overflow-y: auto; flex: 1; }
.po-bin-item {
display: flex; justify-content: space-between; align-items: center; gap: 8px;
padding: 8px 12px; border-bottom: 1px solid var(--border);
cursor: grab; user-select: none;
}
.po-bin-item:hover { background: var(--bg-3); }
.po-bin-item:active { cursor: grabbing; }
.po-bin-name { font-size: 13px; color: var(--text-1); overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
/* Transport */
.po-transport {
display: flex; gap: 8px; flex-wrap: wrap;
padding: 12px; background: var(--bg-1); border: 1px solid var(--border); border-radius: 12px;
}
/* Playlist */
.po-playlist {
border-radius: 12px; overflow: hidden;
min-height: 120px;
}
.po-playlist-empty { padding: 28px 12px; text-align: center; }
.po-pl-item {
display: flex; align-items: center; gap: 10px;
padding: 9px 12px; border-bottom: 1px solid var(--border);
cursor: grab; user-select: none;
}
.po-pl-item:hover { background: var(--bg-3); }
.po-pl-item:active { cursor: grabbing; }
.po-pl-index {
width: 22px; text-align: center; font-family: var(--font-mono);
font-size: 12px; color: var(--text-3);
}
.po-pl-name { flex: 1; font-size: 13px; color: var(--text-1); overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
/* Small button variants reused */
.btn.xs { padding: 2px 8px; font-size: 11px; }
.btn.sm { padding: 5px 10px; font-size: 12px; }
.field-input.sm { padding: 5px 8px; font-size: 12px; }

View file

@ -7,6 +7,7 @@ import { conformWorker } from './workers/conform.js';
import { youtubeImportWorker, proxyQueue as youtubeProxyQueue } from './workers/youtube-import.js';
import { trimWorker } from './workers/trimWorker.js';
import { hlsWorker } from './workers/hls.js';
import { playoutStageWorker } from './workers/playout-stage.js';
import { startPromotionWorker } from './workers/promotion.js';
const parseRedisUrl = (url) => {
@ -94,6 +95,9 @@ const workers = [
lockDuration: 10 * 60 * 1000,
lockRenewTime: 60000,
}),
// playout-stage = S3 → /media volume + EBU R128 loudnorm. CPU/IO-bound;
// colocate with workers that already have ffmpeg + the media mount.
want('playout-stage') && createWorker('playout-stage', playoutStageWorker, { concurrency: 1 }),
].filter(Boolean);
console.log(`WORKER_QUEUES=${_wq || '(all)'}`);

View file

@ -0,0 +1,137 @@
import { join, extname } from 'path';
import { mkdir, stat, rename, unlink } from 'fs/promises';
import { spawn } from 'child_process';
import { query } from '../db/client.js';
import { downloadFromS3 } from '../s3/client.js';
// Playout media staging — copy an asset from S3 into the shared CasparCG media
// volume so a playout channel can play it. CasparCG plays from a local folder
// (/media), not from S3, so every playlist item must be staged to 'ready'
// before it can go on air. See docs/superpowers/specs/2026-05-30-playout-mcr-design.md §4.
//
// Two passes:
// 1. download from S3 to /media/playout/<assetId><ext>.raw
// 2. ffmpeg loudnorm (EBU R128, target I=-23 LUFS, TP=-1 dBTP, LRA=11) to the
// final path, then atomic rename. Skipped when items.audio_normalized=true.
//
// The media volume is mounted into BOTH this worker and the playout sidecars at
// the same path (PLAYOUT_MEDIA_DIR, default /media). We stage under a per-asset
// filename so re-staging is idempotent and multiple items referencing the same
// asset share one file.
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
const MEDIA_DIR = process.env.PLAYOUT_MEDIA_DIR || '/media';
async function fileExists(p) {
try { const s = await stat(p); return s.size > 0; } catch { return false; }
}
// Two-pass loudnorm: pass 1 measures, pass 2 applies linear normalization with
// the measured values. Linear mode preserves dynamics at the cost of accuracy
// vs the target — fine for broadcast playout where transparent levels matter
// more than hitting -23 LUFS to the decibel.
function runFfmpeg(args) {
return new Promise((resolve, reject) => {
const proc = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'] });
let stderr = '';
proc.stderr.on('data', (d) => { stderr += d.toString(); });
proc.on('error', reject);
proc.on('close', (code) => {
if (code === 0) resolve(stderr);
else reject(new Error(`ffmpeg exited ${code}: ${stderr.slice(-500)}`));
});
});
}
async function measureLoudness(inputPath) {
// -23 / -1 / 11 are the EBU R128 broadcast targets; loudnorm prints a JSON
// block to stderr after the analysis pass which feeds pass 2's measured_*
// params.
const stderr = await runFfmpeg([
'-hide_banner', '-nostats', '-i', inputPath,
'-af', 'loudnorm=I=-23:TP=-1:LRA=11:print_format=json',
'-f', 'null', '-',
]);
const match = stderr.match(/\{[\s\S]*?"input_i"[\s\S]*?\}/);
if (!match) throw new Error('loudnorm pass 1 produced no JSON');
return JSON.parse(match[0]);
}
async function applyLoudnorm(inputPath, outputPath, m) {
// Pass 2: linear normalization using pass 1's measurements. -c:v copy keeps
// the video stream intact so we only re-encode audio (target AAC stereo, the
// common-denominator CasparCG ffmpeg producer accepts).
await runFfmpeg([
'-hide_banner', '-nostats', '-y', '-i', inputPath,
'-af', `loudnorm=I=-23:TP=-1:LRA=11:measured_I=${m.input_i}:measured_TP=${m.input_tp}:measured_LRA=${m.input_lra}:measured_thresh=${m.input_thresh}:offset=${m.target_offset}:linear=true:print_format=summary`,
'-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k', '-ar', '48000',
outputPath,
]);
}
export async function playoutStageWorker(job) {
const { itemId, assetId } = job.data;
if (!itemId || !assetId) throw new Error('playout-stage requires itemId + assetId');
await query("UPDATE playout_items SET media_status = 'staging', updated_at = NOW() WHERE id = $1", [itemId]);
try {
const a = await query(
'SELECT id, filename, original_s3_key, proxy_s3_key FROM assets WHERE id = $1', [assetId]);
if (a.rows.length === 0) throw new Error(`asset ${assetId} not found`);
const asset = a.rows[0];
// Prefer the master for air quality; fall back to proxy if no master key.
const s3Key = asset.original_s3_key || asset.proxy_s3_key;
if (!s3Key) throw new Error(`asset ${assetId} has no S3 media key to stage`);
const ext = extname(s3Key) || extname(asset.filename || '') || '.mp4';
// Stable per-asset path under the media volume; CasparCG resolves the token
// "playout/<assetId>" against MEDIA_DIR.
const relDir = 'playout';
const fileName = `${assetId}${ext}`;
const absDir = join(MEDIA_DIR, relDir);
const absPath = join(absDir, fileName);
const mediaPath = join(MEDIA_DIR, relDir, fileName);
await mkdir(absDir, { recursive: true });
// Skip the whole pipeline when the final file already exists from a prior
// stage of the same asset. The audio_normalized flag is per-item so a
// second item referencing the same staged file gets flipped to true below.
const itemRow = await query('SELECT audio_normalized FROM playout_items WHERE id = $1', [itemId]);
const alreadyNormalized = itemRow.rows[0]?.audio_normalized === true;
if (!(await fileExists(absPath))) {
const rawPath = `${absPath}.raw${ext}`;
console.log(`[playout-stage] downloading ${s3Key} -> ${rawPath}`);
await downloadFromS3(S3_BUCKET, s3Key, rawPath);
if (alreadyNormalized) {
// Asset was previously normalized for another item — keep the bytes
// as-is. Atomic rename so CasparCG never sees a partial file.
await rename(rawPath, absPath);
} else {
console.log(`[playout-stage] loudnorm pass 1: ${rawPath}`);
const measured = await measureLoudness(rawPath);
const tmpOut = `${absPath}.tmp${ext}`;
console.log(`[playout-stage] loudnorm pass 2: I=${measured.input_i} TP=${measured.input_tp} -> ${tmpOut}`);
await applyLoudnorm(rawPath, tmpOut, measured);
await rename(tmpOut, absPath);
await unlink(rawPath).catch(() => {});
}
} else {
console.log(`[playout-stage] already staged: ${absPath}`);
}
await query(
"UPDATE playout_items SET media_status = 'ready', media_path = $1, audio_normalized = TRUE, updated_at = NOW() WHERE id = $2",
[mediaPath, itemId]);
console.log(`[playout-stage] item ${itemId} ready at ${mediaPath}`);
return { itemId, mediaPath };
} catch (err) {
await query("UPDATE playout_items SET media_status = 'error', updated_at = NOW() WHERE id = $1", [itemId])
.catch(() => {});
throw err;
}
}