Compare commits
48 commits
fix/audit-
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ed1254fd9 | ||
|
|
b5235e0a2c | ||
|
|
5686e65df9 | ||
|
|
ccaef50c09 | ||
|
|
522faacdcc | ||
|
|
a1a0823812 | ||
|
|
cd67dfceea | ||
|
|
5eaf71b70c | ||
|
|
69eefdb512 | ||
|
|
04e6646e6e | ||
|
|
91f80c05bc | ||
|
|
aff3c0ece2 | ||
|
|
38b31d6170 | ||
|
|
aa646dbb71 | ||
|
|
6294e98dc3 | ||
|
|
4b018cb8cb | ||
|
|
36740de86b | ||
|
|
d193b84466 | ||
|
|
7a89c83ff4 | ||
|
|
d138265245 | ||
|
|
b6545e61a9 | ||
|
|
9dc86aa3b6 | ||
|
|
07d1fc9e72 | ||
|
|
01211fef7a | ||
| 97d725537b | |||
| d654f7c8a1 | |||
| eeaa1c1b58 | |||
|
|
99723da00f | ||
|
|
b700902200 | ||
|
|
b2c63de2fa | ||
|
|
0d479d043d | ||
|
|
1573bf8954 | ||
| 2f1697b77b | |||
| c269468014 | |||
| 108390e823 | |||
| 7704988978 | |||
| a00e90ecc8 | |||
| c21260c9b0 | |||
| d16d19c26d | |||
| 63f05cd652 | |||
|
|
dbef15ae0a | ||
|
|
99bd6a8c9c | ||
|
|
4e6142f455 | ||
|
|
02d502baaf | ||
|
|
00a7af7c54 | ||
| cb9ef9c14e | |||
| f48a0b73ee | |||
| 463cc3694d |
41 changed files with 4037 additions and 354 deletions
|
|
@ -69,6 +69,14 @@ GOOGLE_ALLOWED_DOMAIN=
|
|||
# the authenticator code (Google is treated as the first factor). Accounts without
|
||||
# TOTP complete sign-in in one Google step.
|
||||
|
||||
# Framecache — shared memory ring buffer for SDI + network ingest fan-out.
|
||||
# Size in GB. Tune per node based on available RAM and number of SDI inputs.
|
||||
# Each 1080p59.94 source uses ~494MB (120-frame ring at 4.1MB/frame).
|
||||
# Baratheon (251GB RAM): 60
|
||||
# zampp1 (93GB RAM): 40
|
||||
# zampp2 (18GB RAM): 8 (increase node RAM before deploying capture)
|
||||
FC_SHM_SIZE_GB=40
|
||||
|
||||
# Playout / Master Control (MCR)
|
||||
# Image tag the mam-api spawns when a channel starts. Build with:
|
||||
# docker compose --profile build-only build playout
|
||||
|
|
|
|||
|
|
@ -60,6 +60,12 @@ services:
|
|||
BMD_MODEL: ${BMD_MODEL:-}
|
||||
BMD_DEVICE_PREFIX: ${BMD_DEVICE_PREFIX:-dv}
|
||||
LIVE_DIR: ${LIVE_DIR:-/mnt/NVME/MAM/wild-dragon-live}
|
||||
# Framecache service URL (on the wild-dragon-worker network)
|
||||
FC_URL: ${FC_URL:-http://framecache:7435}
|
||||
# net_ingest binary — runs inside the framecache container via docker exec.
|
||||
# node-agent has docker.sock so it can exec into the framecache container.
|
||||
# Override with a host-installed path if preferred.
|
||||
NET_INGEST_BIN: ${NET_INGEST_BIN:-docker exec framecache net_ingest}
|
||||
# REPO_DIR: host path to the checked-out repo. The agent passes this to the
|
||||
# one-shot driver-install container so install-driver.sh can read
|
||||
# sdk/<vendor>/ and run deploy/install-driver.sh. Must match the host path
|
||||
|
|
@ -79,8 +85,7 @@ services:
|
|||
# /dev and /opt from the host (handled in the agent, not here) so DKMS /
|
||||
# modprobe / ldconfig affect the host kernel.
|
||||
- ${REPO_DIR:-/opt/wild-dragon}:${REPO_DIR:-/opt/wild-dragon}:ro
|
||||
devices:
|
||||
- /dev/blackmagic:/dev/blackmagic
|
||||
# (DeckLink devices are mounted dynamically if present)
|
||||
|
||||
worker:
|
||||
build: ./services/worker
|
||||
|
|
@ -103,7 +108,9 @@ services:
|
|||
# SDI capture service — only start on nodes with Blackmagic DeckLink cards
|
||||
# Set BMD_DEVICE_0 in .env.worker to the actual device path, e.g. /dev/blackmagic/dv0
|
||||
capture:
|
||||
build: ./services/capture
|
||||
build:
|
||||
context: .
|
||||
dockerfile: services/capture/Dockerfile
|
||||
profiles: [capture]
|
||||
restart: unless-stopped
|
||||
runtime: nvidia
|
||||
|
|
@ -117,9 +124,9 @@ services:
|
|||
CAPTURE_PORT: 3001
|
||||
NVIDIA_VISIBLE_DEVICES: all
|
||||
NVIDIA_DRIVER_CAPABILITIES: video,compute,utility
|
||||
devices:
|
||||
- ${BMD_DEVICE_0:-/dev/blackmagic/dv0}:/dev/blackmagic/dv0
|
||||
- ${BMD_DEVICE_1:-/dev/blackmagic/dv1}:/dev/blackmagic/dv1
|
||||
# (Devices are dynamically mounted by node-agent)
|
||||
volumes:
|
||||
- /dev/shm:/dev/shm
|
||||
ports:
|
||||
- "${CAPTURE_PORT:-7437}:3001"
|
||||
networks:
|
||||
|
|
@ -151,6 +158,34 @@ services:
|
|||
networks:
|
||||
- wild-dragon-worker
|
||||
|
||||
# Framecache — shared memory ring buffer for SDI + network ingest fan-out.
|
||||
# Runs on every worker node that has capture sources (Blackmagic, Deltacast).
|
||||
# IPC host mode lets all capture sidecars share /dev/shm with this container.
|
||||
# FC_SHM_SIZE can be tuned per node in .env.worker:
|
||||
# Baratheon (251GB RAM): FC_SHM_SIZE=64424509440 (60GB)
|
||||
# zampp1 (93GB RAM): FC_SHM_SIZE=42949672960 (40GB)
|
||||
# zampp2 (18GB RAM): FC_SHM_SIZE=8589934592 (8GB — increase RAM first)
|
||||
framecache:
|
||||
build: ./services/framecache
|
||||
profiles: [capture]
|
||||
restart: unless-stopped
|
||||
ipc: host
|
||||
shm_size: '${FC_SHM_SIZE_GB:-40}gb'
|
||||
environment:
|
||||
FC_PORT: 7435
|
||||
ports:
|
||||
- "127.0.0.1:7435:7435"
|
||||
volumes:
|
||||
- /dev/shm:/dev/shm
|
||||
networks:
|
||||
- wild-dragon-worker
|
||||
healthcheck:
|
||||
test: ["CMD", "wget", "-qO-", "http://localhost:7435/health"]
|
||||
interval: 10s
|
||||
timeout: 3s
|
||||
retries: 3
|
||||
start_period: 5s
|
||||
|
||||
networks:
|
||||
wild-dragon-worker:
|
||||
driver: bridge
|
||||
|
|
|
|||
221
docs/design/framecache/PLAN.md
Normal file
221
docs/design/framecache/PLAN.md
Normal file
|
|
@ -0,0 +1,221 @@
|
|||
# Unified Framecache — Implementation Plan
|
||||
|
||||
## Context
|
||||
|
||||
Replace the current named-FIFO-per-source architecture with a shared-memory
|
||||
ring buffer (framecache) that fans raw video frames from any ingest source to
|
||||
unlimited concurrent consumers with zero-copy reads.
|
||||
|
||||
**Approved design:** docs/design/framecache/DESIGN.md
|
||||
**Branch:** feat/unified-framecache
|
||||
**Roadmap (out of scope here):** RDMA cross-node, AJA, growing-file-while-recording browser playback
|
||||
|
||||
---
|
||||
|
||||
## Migration Strategy
|
||||
|
||||
Ship in 5 phases. Each phase is independently deployable and leaves the system
|
||||
in a working state. Existing recording workflows are unaffected until Phase 5
|
||||
cuts over.
|
||||
|
||||
---
|
||||
|
||||
## Phase 1 — Framecache Container (foundation)
|
||||
|
||||
**Goal:** Running framecache service with slot registry. No ingest writers yet.
|
||||
|
||||
### 1.1 — Create `services/framecache/` directory structure
|
||||
|
||||
```
|
||||
services/framecache/
|
||||
src/
|
||||
framecache.c # main — slot manager + HTTP API
|
||||
slot.c / slot.h # shm ring buffer lifecycle
|
||||
registry.c # /dev/shm/framecache/registry.json writer
|
||||
http.c # lightweight HTTP server (libmicrohttpd)
|
||||
client/
|
||||
fc_client.c / fc_client.h # consumer library
|
||||
fc_client_node/
|
||||
binding.cc # Node.js N-API addon
|
||||
binding.gyp
|
||||
Dockerfile
|
||||
CMakeLists.txt
|
||||
```
|
||||
|
||||
### 1.2 — Shared memory layout (slot.h)
|
||||
|
||||
Each slot lives at `/dev/shm/framecache/<slot_id>`:
|
||||
|
||||
```c
|
||||
#define FC_MAGIC 0x46524D43 // "FRMC"
|
||||
#define FC_RING_DEPTH 120 // ~2s at 59.94fps
|
||||
#define FC_HEADER_SIZE 4096 // 4KB header block
|
||||
|
||||
typedef struct {
|
||||
uint32_t magic;
|
||||
uint32_t version; // = 1
|
||||
uint32_t width;
|
||||
uint32_t height;
|
||||
uint32_t fps_num;
|
||||
uint32_t fps_den;
|
||||
uint32_t pixel_format; // FC_PIX_UYVY422 = 0
|
||||
uint32_t frame_size; // width * height * 2
|
||||
uint32_t ring_depth; // = FC_RING_DEPTH
|
||||
_Atomic uint64_t write_cursor; // monotonically increasing frame index
|
||||
_Atomic uint64_t dropped_frames;
|
||||
uint8_t _pad[FC_HEADER_SIZE - 48];
|
||||
} fc_header_t;
|
||||
|
||||
typedef struct {
|
||||
uint64_t pts_us;
|
||||
uint64_t wall_us;
|
||||
uint32_t size;
|
||||
uint8_t data[]; // frame_size bytes
|
||||
} fc_frame_t;
|
||||
```
|
||||
|
||||
Semaphore: `sem_open("/framecache-<slot_id>-write", ...)` — posted by writer
|
||||
on each new frame, consumers `sem_timedwait` on it.
|
||||
|
||||
### 1.3 — HTTP API (port 7435)
|
||||
|
||||
```
|
||||
POST /slots body: {slot_id, width, height, fps_num, fps_den, source_type}
|
||||
creates shm region, writes registry entry
|
||||
201 {slot_id, shm_path, sem_name}
|
||||
|
||||
GET /slots 200 [{slot_id, width, height, fps_num, fps_den,
|
||||
source_type, write_cursor, dropped_frames,
|
||||
current_fps}]
|
||||
|
||||
GET /slots/:id 200 slot detail
|
||||
DELETE /slots/:id destroys shm + semaphore, removes registry entry, 204
|
||||
GET /health 200 {status: "ok"}
|
||||
```
|
||||
|
||||
### 1.4 — Registry file
|
||||
|
||||
Written to `/dev/shm/framecache/registry.json` on every slot create/delete.
|
||||
|
||||
### 1.5 — Dockerfile
|
||||
|
||||
```dockerfile
|
||||
FROM debian:bookworm
|
||||
RUN apt-get update && apt-get install -y \
|
||||
build-essential cmake libmicrohttpd-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
COPY . /src
|
||||
RUN cmake -S /src -B /build -DCMAKE_BUILD_TYPE=Release \
|
||||
&& cmake --build /build -j$(nproc)
|
||||
EXPOSE 7435
|
||||
CMD ["/build/framecache"]
|
||||
```
|
||||
|
||||
### 1.6 — docker-compose.worker.yml addition
|
||||
|
||||
```yaml
|
||||
framecache:
|
||||
build: ./services/framecache
|
||||
ipc: host
|
||||
shm_size: '60gb'
|
||||
environment:
|
||||
FC_SHM_SIZE: ${FC_SHM_SIZE:-64424509440}
|
||||
FC_PORT: 7435
|
||||
ports:
|
||||
- "7435:7435"
|
||||
volumes:
|
||||
- /dev/shm:/dev/shm
|
||||
restart: unless-stopped
|
||||
```
|
||||
|
||||
### 1.7 — Consumer library (fc_client.c)
|
||||
|
||||
```c
|
||||
fc_slot_t *fc_open(const char *slot_id);
|
||||
int fc_read_frame(fc_slot_t *slot, fc_frame_t **out, uint64_t timeout_ms);
|
||||
void fc_close(fc_slot_t *slot);
|
||||
```
|
||||
|
||||
**Commit:** `feat(framecache): phase 1 — framecache container + consumer library`
|
||||
|
||||
---
|
||||
|
||||
## Phase 2 — Deltacast Bridge writes to framecache
|
||||
|
||||
**Goal:** deltacast-bridge writes frames to framecache shm instead of named FIFOs.
|
||||
Legacy FIFO path kept as compile-time fallback (`-DLEGACY_FIFO=ON`) until Phase 5.
|
||||
|
||||
On signal lock:
|
||||
1. POST /slots to framecache HTTP API
|
||||
2. shm_open + mmap the slot
|
||||
3. Video thread writes frame into ring, advances write_cursor atomically, sem_post
|
||||
4. Audio: keeps writing to audio FIFO (unchanged)
|
||||
5. On shutdown: DELETE /slots/:id
|
||||
|
||||
**Commit:** `feat(framecache): phase 2 — deltacast-bridge writes to shm`
|
||||
|
||||
---
|
||||
|
||||
## Phase 3 — Blackmagic DeckLink Bridge
|
||||
|
||||
**Goal:** New decklink-bridge C program mirrors deltacast-bridge, replaces
|
||||
ffmpeg -f decklink direct path.
|
||||
|
||||
- Uses IDeckLinkIterator to enumerate devices
|
||||
- VideoInputFrameArrived callback calls fc_write_frame
|
||||
- Registers slot on signal lock, deregisters on shutdown
|
||||
- Audio stays in FIFO (same as deltacast)
|
||||
|
||||
**Commit:** `feat(framecache): phase 3 — decklink-bridge writes to shm`
|
||||
|
||||
---
|
||||
|
||||
## Phase 4 — capture-manager reads from framecache
|
||||
|
||||
**Goal:** Enables simultaneous growing + proxy + HLS from one SDI input.
|
||||
|
||||
- Node.js N-API addon wrapping fc_open/fc_read_frame/fc_close
|
||||
- capture-manager opens THREE fc_client handles per slot (own cursor each):
|
||||
1. Growing/master ffmpeg feed
|
||||
2. Proxy ffmpeg feed
|
||||
3. HLS preview ffmpeg feed
|
||||
- Each gets a separate rawvideo pipe to ffmpeg
|
||||
- Growing MXF workflow (raw2bmx orchestrator) completely unchanged
|
||||
|
||||
**Commit:** `feat(framecache): phase 4 — capture-manager reads from framecache`
|
||||
|
||||
---
|
||||
|
||||
## Phase 5 — Network ingest (RTMP/SRT) into framecache
|
||||
|
||||
**Goal:** RTMP and SRT sources decoded to raw UYVY422, written into framecache slots.
|
||||
|
||||
- net_ingest process per source: ffmpeg decodes to rawvideo, writes to slot
|
||||
- capture-manager waits for slot, same fc_client consumer pattern
|
||||
- Remove legacy FIFO code once all paths go through framecache
|
||||
|
||||
**Commit:** `feat(framecache): phase 5 — network ingest via framecache`
|
||||
|
||||
---
|
||||
|
||||
## Hardware / Deployment
|
||||
|
||||
| Node | RAM | /dev/shm | FC_SHM_SIZE |
|
||||
|------|-----|----------|-------------|
|
||||
| Baratheon | 251GB | 126GB | 60GB |
|
||||
| zampp1 | 93GB | 47GB | 40GB |
|
||||
| zampp2 | 18GB (upgrade) | 9.4GB | 8GB |
|
||||
|
||||
Ring buffer per 1080p59.94 source: ~494MB (120 frames × 4.1MB)
|
||||
|
||||
All recorder sidecars require `ipc: host`.
|
||||
|
||||
---
|
||||
|
||||
## Roadmap (not in this branch)
|
||||
|
||||
- Audio in framecache shm
|
||||
- RDMA cross-node slot replication
|
||||
- AJA hardware support
|
||||
- Growing-file-while-recording browser HLS playback
|
||||
- Mastercontrol/playout consumer
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
# ── Stage 0: Extract Deltacast VideoMaster SDK ───────────────────────────
|
||||
FROM debian:bookworm AS sdk-extractor
|
||||
COPY videomaster-linux.x64-6.34.1-dev.tar.gz /tmp/
|
||||
COPY services/capture/videomaster-linux.x64-6.34.1-dev.tar.gz /tmp/
|
||||
RUN mkdir -p /sdk && tar -xzf /tmp/videomaster-linux.x64-6.34.1-dev.tar.gz -C /sdk
|
||||
|
||||
# ── Stage 1: Build deltacast-capture bridge binary ───────────────────────
|
||||
|
|
@ -9,12 +9,42 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
|||
build-essential cmake ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
COPY --from=sdk-extractor /sdk /sdk
|
||||
COPY deltacast-bridge/ /bridge/
|
||||
RUN cmake -S /bridge -B /bridge/build \
|
||||
COPY services/capture/deltacast-bridge/ /bridge/
|
||||
RUN rm -rf /bridge/build && cmake -S /bridge -B /bridge/build \
|
||||
-DCMAKE_BUILD_TYPE=Release \
|
||||
-DSDK_ROOT=/sdk \
|
||||
&& cmake --build /bridge/build -j$(nproc)
|
||||
|
||||
# ── Stage 1d: Build fc_pipe (framecache slot → stdout adapter) ──────────
|
||||
# Spawned by capture-manager.js to pipe raw frames from a framecache slot
|
||||
# into ffmpeg as a rawvideo pipe input. Statically linked against fc_client
|
||||
# (no runtime dependency on the framecache container — just shm + semaphores).
|
||||
FROM debian:bookworm AS fc-pipe-builder
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
build-essential cmake libmicrohttpd-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
COPY services/framecache /fc-src
|
||||
RUN rm -rf /fc-src/build && cmake -S /fc-src -B /fc-src/build \
|
||||
-DCMAKE_BUILD_TYPE=Release \
|
||||
&& cmake --build /fc-src/build --target fc_pipe -j$(nproc)
|
||||
|
||||
# ── Stage 1c: Build decklink-bridge binary ───────────────────────────────
|
||||
FROM debian:bookworm AS decklink-bridge-builder
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
build-essential cmake ca-certificates g++ \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
# DeckLink SDK headers (for IDeckLinkInput etc.)
|
||||
COPY services/capture/sdk/ /decklink-sdk/
|
||||
# Shared fc_writer module from deltacast-bridge
|
||||
COPY services/capture/deltacast-bridge/ /fc_writer/
|
||||
# decklink-bridge source
|
||||
COPY services/capture/decklink-bridge/ /decklink-bridge/
|
||||
RUN rm -rf /decklink-bridge/build && cmake -S /decklink-bridge -B /decklink-bridge/build \
|
||||
-DCMAKE_BUILD_TYPE=Release \
|
||||
-DDECKLINK_SDK_DIR=/decklink-sdk \
|
||||
-DDELTACAST_BRIDGE_DIR=/fc_writer \
|
||||
&& cmake --build /decklink-bridge/build -j$(nproc)
|
||||
|
||||
# ── Stage 2: Build FFmpeg with DeckLink + NVENC (HEVC/H264) support ─────────
|
||||
# All-Intra HEVC NVENC is the master codec for growing-file ingest (see
|
||||
# docs/design/2026-05-29-all-intra-hevc-ingest.md). This stage gets the
|
||||
|
|
@ -31,10 +61,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
|||
libzmq3-dev zlib1g-dev libstdc++-12-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
# Copy in BMD DeckLink SDK headers and patch script
|
||||
COPY sdk/ /decklink-sdk/
|
||||
COPY patch_decklink.py /patch_decklink.py
|
||||
COPY decklink-sdk16.patch /decklink-sdk16.patch
|
||||
COPY services/capture/sdk/ /decklink-sdk/
|
||||
COPY services/capture/patch_decklink.py /patch_decklink.py
|
||||
COPY services/capture/decklink-sdk16.patch /decklink-sdk16.patch
|
||||
|
||||
# nv-codec-headers — just the ffnvcodec public headers + a pkg-config file.
|
||||
# Pin to a tag known to work with FFmpeg 7.1 (n12.x series).
|
||||
|
|
@ -129,8 +160,8 @@ COPY --from=ffmpeg-builder /usr/local/bin/ffprobe /usr/local/bin/ffprobe
|
|||
COPY --from=ffmpeg-builder /usr/local/lib/ /usr/local/lib/
|
||||
|
||||
# DeckLink runtime .so
|
||||
COPY lib/libDeckLinkAPI.so /usr/lib/libDeckLinkAPI.so
|
||||
COPY lib/libDeckLinkPreviewAPI.so /usr/lib/libDeckLinkPreviewAPI.so
|
||||
COPY services/capture/lib/libDeckLinkAPI.so /usr/lib/libDeckLinkAPI.so
|
||||
COPY services/capture/lib/libDeckLinkPreviewAPI.so /usr/lib/libDeckLinkPreviewAPI.so
|
||||
|
||||
# bmx (raw2bmx / bmxtranswrap / mxf2raw) — the growing OP1a MXF writer used for
|
||||
# the edit-while-record master. Copy the built binaries + shared libs; runtime
|
||||
|
|
@ -151,6 +182,12 @@ RUN raw2bmx -h >/dev/null 2>&1 && echo 'raw2bmx runtime OK'
|
|||
|
||||
# Deltacast bridge binary + SDK runtime libs
|
||||
COPY --from=bridge-builder /bridge/build/deltacast-capture /usr/local/bin/deltacast-capture
|
||||
|
||||
# DeckLink bridge binary is disabled
|
||||
# COPY --from=decklink-bridge-builder /decklink-bridge/build/decklink-bridge /usr/local/bin/decklink-bridge
|
||||
|
||||
# fc_pipe — framecache slot → stdout, spawned by capture-manager.js
|
||||
COPY --from=fc-pipe-builder /fc-src/build/fc_pipe /usr/local/bin/fc_pipe
|
||||
COPY --from=sdk-extractor /sdk/lib/libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/
|
||||
COPY --from=sdk-extractor /sdk/lib/libvideomasterhd_audio.so.6.34.1 /usr/local/lib/deltacast/
|
||||
RUN ln -sf libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/libvideomasterhd.so.6 \
|
||||
|
|
@ -166,9 +203,9 @@ RUN ln -sf libvideomasterhd.so.6.34.1 /usr/local/lib/deltacast/libvideomas
|
|||
RUN mkdir -p /live /growing
|
||||
|
||||
WORKDIR /app
|
||||
COPY package*.json ./
|
||||
COPY services/capture/package*.json ./
|
||||
RUN npm install --omit=dev
|
||||
COPY . .
|
||||
COPY services/capture/. .
|
||||
|
||||
EXPOSE 3001
|
||||
CMD ["node", "src/index.js"]
|
||||
|
|
|
|||
51
services/capture/decklink-bridge/CMakeLists.txt
Normal file
51
services/capture/decklink-bridge/CMakeLists.txt
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
cmake_minimum_required(VERSION 3.16)
|
||||
project(decklink-bridge CXX C)
|
||||
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
set(CMAKE_C_STANDARD 11)
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -O2")
|
||||
|
||||
# Path to DeckLink SDK headers (services/capture/sdk/)
|
||||
set(DECKLINK_SDK_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../sdk"
|
||||
CACHE PATH "Path to Blackmagic DeckLink SDK headers")
|
||||
|
||||
# Path to Deltacast bridge (for fc_writer.h/c — shared writer module)
|
||||
set(DELTACAST_BRIDGE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../deltacast-bridge"
|
||||
CACHE PATH "Path to deltacast-bridge (contains fc_writer.h/c)")
|
||||
|
||||
# Legacy FIFO fallback option (mirrors deltacast-bridge option)
|
||||
option(LEGACY_FIFO "Use named FIFOs instead of framecache shm" OFF)
|
||||
|
||||
# ── decklink-bridge executable ────────────────────────────────────────
|
||||
add_executable(decklink-bridge
|
||||
main.cpp
|
||||
${DELTACAST_BRIDGE_DIR}/fc_writer.c # shared framecache writer
|
||||
)
|
||||
|
||||
if(LEGACY_FIFO)
|
||||
target_compile_definitions(decklink-bridge PRIVATE LEGACY_FIFO=1)
|
||||
message(STATUS "decklink-bridge: LEGACY_FIFO mode enabled")
|
||||
else()
|
||||
message(STATUS "decklink-bridge: framecache shm mode enabled")
|
||||
endif()
|
||||
|
||||
target_include_directories(decklink-bridge PRIVATE
|
||||
${DECKLINK_SDK_DIR}
|
||||
${DELTACAST_BRIDGE_DIR} # fc_writer.h
|
||||
)
|
||||
|
||||
target_link_libraries(decklink-bridge PRIVATE
|
||||
pthread
|
||||
rt # shm_open, sem_open
|
||||
dl # dlopen (used by DeckLinkAPIDispatch.cpp on Linux)
|
||||
)
|
||||
|
||||
# DeckLink driver is linked at runtime via dlopen (no link-time .so needed).
|
||||
# The SDK's DeckLinkAPIDispatch.cpp handles the dynamic loading.
|
||||
|
||||
set_target_properties(decklink-bridge PROPERTIES
|
||||
INSTALL_RPATH "/usr/local/lib"
|
||||
BUILD_WITH_INSTALL_RPATH TRUE
|
||||
)
|
||||
|
||||
install(TARGETS decklink-bridge DESTINATION bin)
|
||||
560
services/capture/decklink-bridge/main.cpp
Normal file
560
services/capture/decklink-bridge/main.cpp
Normal file
|
|
@ -0,0 +1,560 @@
|
|||
/**
|
||||
* decklink-bridge/main.cpp
|
||||
*
|
||||
* Blackmagic DeckLink SDI shared multi-device bridge daemon.
|
||||
*
|
||||
* Opens one or more DeckLink devices and for each device:
|
||||
* - Auto-detects the incoming signal format
|
||||
* - Registers a framecache slot via HTTP API
|
||||
* - Writes raw UYVY422 (bmdFormat8BitYUV) video frames into the shm ring
|
||||
* - Writes PCM s16le audio to a named FIFO (audio-in-shm is roadmap)
|
||||
*
|
||||
* Slot ID format: "decklink-<node_id>-<device_index>"
|
||||
* node_id comes from NODE_ID env var (set by node-agent), falls back to hostname.
|
||||
*
|
||||
* Usage:
|
||||
* decklink-bridge --devices <csv> # device indices, e.g. "0,1"
|
||||
* decklink-bridge --device <N> # single device compat alias
|
||||
* [--fc-url http://framecache:7435]
|
||||
* [--audio-pipe-dir /dev/shm/decklink]
|
||||
* [--signal-timeout <sec>]
|
||||
*
|
||||
* For each device that acquires signal, emits one JSON line to stderr:
|
||||
* {"device":N,"width":W,"height":H,"fps_num":N,"fps_den":D,
|
||||
* "interlaced":false,"pix_fmt":"uyvy422",
|
||||
* "audio_channels":2,"audio_rate":48000,
|
||||
* "slot_id":"decklink-<node>-<N>"}
|
||||
*
|
||||
* Compile with -DLEGACY_FIFO=1 to fall back to writing a raw video FIFO
|
||||
* instead of the framecache shm path.
|
||||
*/
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <cerrno>
|
||||
#include <cmath>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <pthread.h>
|
||||
#include <signal.h>
|
||||
#include <sys/stat.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "DeckLinkAPI.h"
|
||||
#include "DeckLinkAPIDispatch.cpp"
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
extern "C" {
|
||||
# include "fc_writer.h"
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifndef F_SETPIPE_SZ
|
||||
# define F_SETPIPE_SZ 1031
|
||||
#endif
|
||||
|
||||
#define FC_URL_DEFAULT "http://localhost:7435"
|
||||
#define AUDIO_PIPE_DIR "/dev/shm/decklink"
|
||||
#define MAX_DEVICES 8
|
||||
|
||||
/* ── Global shutdown flag ──────────────────────────────────────────── */
|
||||
static std::atomic<int> g_stop{0};
|
||||
static void on_signal(int) { g_stop.store(1); }
|
||||
|
||||
/* ── Helpers ───────────────────────────────────────────────────────── */
|
||||
static uint64_t now_us() {
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
|
||||
}
|
||||
|
||||
static int write_all(int fd, const void *buf, size_t len) {
|
||||
const uint8_t *p = static_cast<const uint8_t *>(buf);
|
||||
size_t off = 0;
|
||||
int flags = fcntl(fd, F_GETFL, 0);
|
||||
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
||||
while (off < len) {
|
||||
ssize_t n = write(fd, p + off, len - off);
|
||||
if (n > 0) { off += (size_t)n; continue; }
|
||||
if (n < 0 && errno == EINTR) continue;
|
||||
if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
||||
struct timespec ts{0, 1000000L};
|
||||
nanosleep(&ts, nullptr);
|
||||
continue;
|
||||
}
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
return -1;
|
||||
}
|
||||
fcntl(fd, F_SETFL, flags);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ── Per-device state ──────────────────────────────────────────────── */
|
||||
struct DeviceState {
|
||||
int device_idx = 0;
|
||||
IDeckLink *decklink = nullptr;
|
||||
IDeckLinkInput *input = nullptr;
|
||||
|
||||
/* Signal properties (filled on first frame or format-change) */
|
||||
int width = 0;
|
||||
int height = 0;
|
||||
int fps_num = 0;
|
||||
int fps_den = 1;
|
||||
bool interlaced = false;
|
||||
std::atomic<bool> signal_reported{false};
|
||||
|
||||
std::string slot_id;
|
||||
std::string fc_url;
|
||||
std::string audio_fifo;
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
fc_writer_t *fc_writer = nullptr;
|
||||
/* Guards fc_writer + format fields (width/height/fps/signal_reported)
|
||||
* against concurrent access from DeckLink SDK callback threads:
|
||||
* VideoInputFormatChanged and VideoInputFrameArrived can fire on
|
||||
* different threads without mutual exclusion, and reopen_slot() does
|
||||
* close-then-open on fc_writer. Without this lock a frame callback could
|
||||
* call fc_writer_write() on a freed writer (use-after-free), or two
|
||||
* reopen_slot() calls could double-free. */
|
||||
pthread_mutex_t fc_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
#else
|
||||
int video_fifo_fd = -1;
|
||||
std::string video_fifo;
|
||||
#endif
|
||||
|
||||
/* Audio FIFO fd — opened once, reopened on EPIPE */
|
||||
int audio_fd = -1;
|
||||
pthread_t audio_tid{};
|
||||
std::atomic<int> audio_stop{0};
|
||||
|
||||
uint64_t frame_seq = 0;
|
||||
};
|
||||
|
||||
/* ── Audio thread ──────────────────────────────────────────────────── */
|
||||
/* DeckLink audio arrives via VideoInputFrameArrived callback, not a
|
||||
* separate stream. We write it from the callback directly (see below).
|
||||
* This thread exists only to keep the FIFO open and provide silence
|
||||
* when no frames are arriving (e.g. signal lost). */
|
||||
static void *audio_silence_thread(void *arg) {
|
||||
DeviceState *ds = static_cast<DeviceState *>(arg);
|
||||
|
||||
const int RATE = 48000;
|
||||
const int CH = 2;
|
||||
const int FPS = ds->fps_num > 0 ? ds->fps_num : 30;
|
||||
const int FPS_DEN = ds->fps_den > 0 ? ds->fps_den : 1;
|
||||
long samples = ((long)RATE * FPS_DEN + FPS / 2) / FPS;
|
||||
size_t tick = (size_t)samples * (size_t)CH * 2; /* s16le */
|
||||
std::vector<uint8_t> silence(tick, 0);
|
||||
|
||||
while (!g_stop.load() && !ds->audio_stop.load()) {
|
||||
int fd = open(ds->audio_fifo.c_str(), O_WRONLY);
|
||||
if (fd < 0) {
|
||||
struct timespec ts{0, 200000000L};
|
||||
nanosleep(&ts, nullptr);
|
||||
continue;
|
||||
}
|
||||
fcntl(fd, F_SETPIPE_SZ, 1024 * 1024);
|
||||
ds->audio_fd = fd;
|
||||
|
||||
long frame_ns = (long)(1000000000.0 * (double)FPS_DEN / (double)FPS);
|
||||
struct timespec next;
|
||||
clock_gettime(CLOCK_MONOTONIC, &next);
|
||||
|
||||
while (!g_stop.load() && !ds->audio_stop.load()) {
|
||||
/* Only write silence if no real audio arrived recently.
|
||||
* Real audio is written by VideoInputFrameArrived directly. */
|
||||
if (write_all(ds->audio_fd, silence.data(), tick) < 0) {
|
||||
fprintf(stderr, "[audio:%d] EPIPE — reopening\n", ds->device_idx);
|
||||
break;
|
||||
}
|
||||
next.tv_nsec += frame_ns;
|
||||
while (next.tv_nsec >= 1000000000L) { next.tv_nsec -= 1000000000L; next.tv_sec++; }
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||
if (next.tv_sec > now.tv_sec ||
|
||||
(next.tv_sec == now.tv_sec && next.tv_nsec > now.tv_nsec))
|
||||
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next, nullptr);
|
||||
else
|
||||
next = now;
|
||||
}
|
||||
ds->audio_fd = -1;
|
||||
close(fd);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/* ── IDeckLinkInputCallback implementation ─────────────────────────── */
|
||||
class CaptureCallback : public IDeckLinkInputCallback {
|
||||
public:
|
||||
explicit CaptureCallback(DeviceState *ds) : m_ds(ds), m_refcount(1) {}
|
||||
|
||||
/* IUnknown */
|
||||
HRESULT QueryInterface(REFIID, void **) override { return E_NOINTERFACE; }
|
||||
ULONG AddRef() override { return ++m_refcount; }
|
||||
ULONG Release() override {
|
||||
ULONG r = --m_refcount;
|
||||
if (r == 0) delete this;
|
||||
return r;
|
||||
}
|
||||
|
||||
/* IDeckLinkInputCallback */
|
||||
HRESULT VideoInputFormatChanged(
|
||||
BMDVideoInputFormatChangedEvents events,
|
||||
IDeckLinkDisplayMode *newMode,
|
||||
BMDDetectedVideoInputFormatFlags detectedFlags) override
|
||||
{
|
||||
/* Re-enable input with new mode — required for auto-detect to work */
|
||||
m_ds->input->PauseStreams();
|
||||
|
||||
BMDDisplayMode mode = newMode->GetDisplayMode();
|
||||
|
||||
/* Detect interlaced */
|
||||
BMDFieldDominance fd = newMode->GetFieldDominance();
|
||||
m_ds->interlaced = (fd == bmdUpperFieldFirst || fd == bmdLowerFieldFirst);
|
||||
|
||||
/* Get width/height */
|
||||
m_ds->width = (int)newMode->GetWidth();
|
||||
m_ds->height = (int)newMode->GetHeight();
|
||||
|
||||
/* Get frame rate */
|
||||
BMDTimeValue frameDuration; BMDTimeScale timeScale;
|
||||
newMode->GetFrameRate(&frameDuration, &timeScale);
|
||||
m_ds->fps_num = (int)timeScale;
|
||||
m_ds->fps_den = (int)frameDuration;
|
||||
|
||||
m_ds->input->EnableVideoInput(mode, bmdFormat8BitYUV,
|
||||
bmdVideoInputEnableFormatDetection);
|
||||
m_ds->input->FlushStreams();
|
||||
m_ds->input->StartStreams();
|
||||
|
||||
fprintf(stderr, "[decklink:%d] format changed: %dx%d %.4ffps %s\n",
|
||||
m_ds->device_idx,
|
||||
m_ds->width, m_ds->height,
|
||||
m_ds->fps_den ? (double)m_ds->fps_num / m_ds->fps_den : 0.0,
|
||||
m_ds->interlaced ? "interlaced" : "progressive");
|
||||
|
||||
/* Re-open framecache slot with new format */
|
||||
this->reopen_slot();
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
HRESULT VideoInputFrameArrived(
|
||||
IDeckLinkVideoInputFrame *videoFrame,
|
||||
IDeckLinkAudioInputPacket *audioPacket) override
|
||||
{
|
||||
if (g_stop.load()) return S_OK;
|
||||
if (!videoFrame) return S_OK;
|
||||
|
||||
/* Detect format on first frame if format-change hasn't fired.
|
||||
* Use atomic exchange so only ONE thread runs the first-frame init
|
||||
* even if two frame callbacks race before signal_reported is set. */
|
||||
bool exp = false;
|
||||
if (m_ds->signal_reported.compare_exchange_strong(exp, true)) {
|
||||
m_ds->width = (int)videoFrame->GetWidth();
|
||||
m_ds->height = (int)videoFrame->GetHeight();
|
||||
if (m_ds->fps_num == 0) {
|
||||
m_ds->fps_num = 30000;
|
||||
m_ds->fps_den = 1001;
|
||||
}
|
||||
this->reopen_slot();
|
||||
}
|
||||
|
||||
/* ── Write video frame ──────────────────────────────────────── */
|
||||
void *bytes = nullptr;
|
||||
/* Some SDK versions require casting to the base IDeckLinkVideoFrame
|
||||
* to access GetBytes() from an IDeckLinkVideoInputFrame. */
|
||||
IDeckLinkVideoFrame *baseFrame = static_cast<IDeckLinkVideoFrame *>(videoFrame);
|
||||
baseFrame->GetBytes(&bytes);
|
||||
uint32_t sz = (uint32_t)(videoFrame->GetRowBytes() * videoFrame->GetHeight());
|
||||
|
||||
uint32_t frame_bytes_expected = (uint32_t)m_ds->width * (uint32_t)m_ds->height * 2;
|
||||
if (sz != frame_bytes_expected) {
|
||||
fprintf(stderr, "[decklink:%d] WARN: frame sz=%u != expected %u — skipping\n",
|
||||
m_ds->device_idx, sz, frame_bytes_expected);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
uint64_t pts_us = 0;
|
||||
if (m_ds->fps_num > 0) {
|
||||
pts_us = m_ds->frame_seq * 1000000ULL
|
||||
* (uint64_t)m_ds->fps_den
|
||||
/ (uint64_t)m_ds->fps_num;
|
||||
}
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
/* Lock so a concurrent VideoInputFormatChanged → reopen_slot() cannot
|
||||
* free fc_writer between our null-check and the write (use-after-free). */
|
||||
pthread_mutex_lock(&m_ds->fc_lock);
|
||||
if (m_ds->fc_writer) {
|
||||
fc_writer_write(m_ds->fc_writer,
|
||||
static_cast<const uint8_t *>(bytes), sz, pts_us);
|
||||
}
|
||||
pthread_mutex_unlock(&m_ds->fc_lock);
|
||||
#else
|
||||
if (m_ds->video_fifo_fd >= 0) {
|
||||
if (write_all(m_ds->video_fifo_fd,
|
||||
static_cast<const uint8_t *>(bytes), sz) < 0) {
|
||||
fprintf(stderr, "[decklink:%d] video FIFO EPIPE\n", m_ds->device_idx);
|
||||
close(m_ds->video_fifo_fd);
|
||||
m_ds->video_fifo_fd = open(m_ds->video_fifo.c_str(), O_WRONLY | O_NONBLOCK);
|
||||
if (m_ds->video_fifo_fd >= 0)
|
||||
fcntl(m_ds->video_fifo_fd, F_SETPIPE_SZ, 64 * 1024 * 1024);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
m_ds->frame_seq++;
|
||||
|
||||
/* ── Write audio ────────────────────────────────────────────── */
|
||||
if (audioPacket && m_ds->audio_fd >= 0) {
|
||||
void *abytes = nullptr;
|
||||
audioPacket->GetBytes(&abytes);
|
||||
uint32_t sample_count = (uint32_t)audioPacket->GetSampleFrameCount();
|
||||
uint32_t audio_sz = sample_count * 2 /* ch */ * 2 /* s16le bytes */;
|
||||
if (abytes && audio_sz > 0) {
|
||||
/* Non-fatal if pipe is full — silence thread provides fallback */
|
||||
write_all(m_ds->audio_fd,
|
||||
static_cast<const uint8_t *>(abytes), audio_sz);
|
||||
}
|
||||
}
|
||||
|
||||
/* Emit signal JSON once per device on first frame */
|
||||
if (m_ds->frame_seq == 1) {
|
||||
fprintf(stderr,
|
||||
"{\"device\":%d,\"width\":%d,\"height\":%d,"
|
||||
"\"fps_num\":%d,\"fps_den\":%d,"
|
||||
"\"interlaced\":%s,"
|
||||
"\"pix_fmt\":\"uyvy422\","
|
||||
"\"audio_channels\":2,\"audio_rate\":48000,"
|
||||
"\"slot_id\":\"%s\"}\n",
|
||||
m_ds->device_idx,
|
||||
m_ds->width, m_ds->height,
|
||||
m_ds->fps_num, m_ds->fps_den,
|
||||
m_ds->interlaced ? "true" : "false",
|
||||
m_ds->slot_id.c_str());
|
||||
fflush(stderr);
|
||||
}
|
||||
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
private:
|
||||
DeviceState *m_ds;
|
||||
std::atomic<ULONG> m_refcount;
|
||||
|
||||
void reopen_slot() {
|
||||
#ifndef LEGACY_FIFO
|
||||
/* Serialize with frame writes and any concurrent reopen_slot() so we
|
||||
* never double-free fc_writer or write to a half-closed one. */
|
||||
pthread_mutex_lock(&m_ds->fc_lock);
|
||||
if (m_ds->fc_writer) {
|
||||
fc_writer_close(m_ds->fc_writer);
|
||||
m_ds->fc_writer = nullptr;
|
||||
}
|
||||
if (m_ds->width > 0 && m_ds->height > 0 && m_ds->fps_num > 0) {
|
||||
m_ds->fc_writer = fc_writer_open(
|
||||
m_ds->fc_url.c_str(),
|
||||
m_ds->slot_id.c_str(),
|
||||
(uint32_t)m_ds->width, (uint32_t)m_ds->height,
|
||||
(uint32_t)m_ds->fps_num, (uint32_t)m_ds->fps_den);
|
||||
if (!m_ds->fc_writer) {
|
||||
fprintf(stderr, "[decklink:%d] framecache unavailable\n",
|
||||
m_ds->device_idx);
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&m_ds->fc_lock);
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
/* ── Parse comma-separated device list ────────────────────────────── */
|
||||
static std::vector<int> parse_devices(const char *csv) {
|
||||
std::vector<int> out;
|
||||
char buf[256];
|
||||
strncpy(buf, csv, sizeof buf - 1);
|
||||
char *tok = strtok(buf, ",");
|
||||
while (tok) { out.push_back(atoi(tok)); tok = strtok(nullptr, ","); }
|
||||
return out;
|
||||
}
|
||||
|
||||
/* ── Main ──────────────────────────────────────────────────────────── */
|
||||
int main(int argc, char *argv[]) {
|
||||
std::vector<int> device_indices;
|
||||
int sig_timeout = 30;
|
||||
const char *fc_url = getenv("FC_URL") ? getenv("FC_URL") : FC_URL_DEFAULT;
|
||||
const char *audio_dir = AUDIO_PIPE_DIR;
|
||||
|
||||
const char *node_id = getenv("NODE_ID");
|
||||
char hostname[256] = "local";
|
||||
if (!node_id) { gethostname(hostname, sizeof hostname); node_id = hostname; }
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (!strcmp(argv[i], "--devices") && i+1 < argc)
|
||||
device_indices = parse_devices(argv[++i]);
|
||||
else if (!strcmp(argv[i], "--device") && i+1 < argc)
|
||||
device_indices.push_back(atoi(argv[++i]));
|
||||
else if (!strcmp(argv[i], "--fc-url") && i+1 < argc)
|
||||
fc_url = argv[++i];
|
||||
else if (!strcmp(argv[i], "--audio-pipe-dir") && i+1 < argc)
|
||||
audio_dir = argv[++i];
|
||||
else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc)
|
||||
sig_timeout = atoi(argv[++i]);
|
||||
}
|
||||
|
||||
if (device_indices.empty()) {
|
||||
fprintf(stderr, "{\"error\":\"no devices specified — use --devices 0,1 or --device 0\"}\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
signal(SIGINT, on_signal);
|
||||
signal(SIGTERM, on_signal);
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
/* Ensure audio pipe dir exists */
|
||||
mkdir(audio_dir, 0755);
|
||||
|
||||
/* ── Enumerate DeckLink devices ─────────────────────────────────── */
|
||||
IDeckLinkIterator *iterator = CreateDeckLinkIteratorInstance();
|
||||
if (!iterator) {
|
||||
fprintf(stderr, "{\"error\":\"CreateDeckLinkIteratorInstance failed — DeckLink driver not loaded?\"}\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
std::vector<IDeckLink *> all_devices;
|
||||
IDeckLink *dl = nullptr;
|
||||
while (iterator->Next(&dl) == S_OK) {
|
||||
all_devices.push_back(dl);
|
||||
}
|
||||
iterator->Release();
|
||||
|
||||
fprintf(stderr, "[decklink] %zu device(s) detected\n", all_devices.size());
|
||||
|
||||
/* ── Set up per-device state ─────────────────────────────────────── */
|
||||
std::vector<DeviceState> states(device_indices.size());
|
||||
std::vector<CaptureCallback *> callbacks(device_indices.size(), nullptr);
|
||||
|
||||
for (size_t i = 0; i < device_indices.size(); i++) {
|
||||
int idx = device_indices[i];
|
||||
if (idx < 0 || (size_t)idx >= all_devices.size()) {
|
||||
fprintf(stderr, "{\"error\":\"device index %d out of range (%zu detected)\"}\n",
|
||||
idx, all_devices.size());
|
||||
continue;
|
||||
}
|
||||
|
||||
DeviceState &ds = states[i];
|
||||
ds.device_idx = idx;
|
||||
ds.fc_url = fc_url;
|
||||
|
||||
/* slot_id: "decklink-<node_id>-<device_idx>" */
|
||||
char sid[128];
|
||||
snprintf(sid, sizeof sid, "decklink-%s-%d", node_id, idx);
|
||||
ds.slot_id = sid;
|
||||
|
||||
/* Audio FIFO path */
|
||||
char apath[256];
|
||||
snprintf(apath, sizeof apath, "%s/audio-%d.fifo", audio_dir, idx);
|
||||
ds.audio_fifo = apath;
|
||||
mkfifo(apath, 0666); /* ignore EEXIST */
|
||||
|
||||
#ifdef LEGACY_FIFO
|
||||
/* Video FIFO (legacy path only) */
|
||||
char vpath[256];
|
||||
snprintf(vpath, sizeof vpath, "%s/video-%d.fifo", audio_dir, idx);
|
||||
ds.video_fifo = vpath;
|
||||
mkfifo(vpath, 0666);
|
||||
int vfd = open(vpath, O_WRONLY | O_NONBLOCK);
|
||||
if (vfd >= 0) fcntl(vfd, F_SETPIPE_SZ, 64 * 1024 * 1024);
|
||||
ds.video_fifo_fd = vfd;
|
||||
#endif
|
||||
|
||||
IDeckLink *decklink = all_devices[(size_t)idx];
|
||||
ds.decklink = decklink;
|
||||
|
||||
/* Get IDeckLinkInput */
|
||||
IDeckLinkInput *input = nullptr;
|
||||
if (decklink->QueryInterface(IID_IDeckLinkInput,
|
||||
reinterpret_cast<void **>(&input)) != S_OK) {
|
||||
fprintf(stderr, "[decklink:%d] QueryInterface IDeckLinkInput failed\n", idx);
|
||||
continue;
|
||||
}
|
||||
ds.input = input;
|
||||
|
||||
/* Install callback */
|
||||
CaptureCallback *cb = new CaptureCallback(&ds);
|
||||
callbacks[i] = cb;
|
||||
input->SetCallback(cb);
|
||||
|
||||
/* Enable video with format detection — actual mode set on first
|
||||
* VideoInputFormatChanged; use 1080i29.97 as a safe starting mode. */
|
||||
HRESULT hr = input->EnableVideoInput(
|
||||
bmdModeHD1080i5994,
|
||||
bmdFormat8BitYUV,
|
||||
bmdVideoInputEnableFormatDetection);
|
||||
if (hr != S_OK) {
|
||||
fprintf(stderr, "[decklink:%d] EnableVideoInput failed (0x%08x)\n", idx, (unsigned)hr);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Enable audio input — 48kHz stereo s16le */
|
||||
input->EnableAudioInput(bmdAudioSampleRate48kHz,
|
||||
bmdAudioSampleType16bitInteger, 2);
|
||||
|
||||
/* Start silence thread (keeps audio FIFO open) */
|
||||
ds.fps_num = 30000; ds.fps_den = 1001; /* default until format detected */
|
||||
pthread_create(&ds.audio_tid, nullptr, audio_silence_thread, &ds);
|
||||
|
||||
/* Start capture */
|
||||
if (input->StartStreams() != S_OK) {
|
||||
fprintf(stderr, "[decklink:%d] StartStreams failed\n", idx);
|
||||
continue;
|
||||
}
|
||||
|
||||
fprintf(stderr, "[decklink:%d] capture started, waiting for signal...\n", idx);
|
||||
}
|
||||
|
||||
/* ── Run until shutdown ─────────────────────────────────────────── */
|
||||
while (!g_stop.load()) {
|
||||
struct timespec ts{0, 100000000L}; /* 100ms */
|
||||
nanosleep(&ts, nullptr);
|
||||
}
|
||||
|
||||
fprintf(stderr, "[decklink] shutdown signal received\n");
|
||||
|
||||
/* ── Cleanup ─────────────────────────────────────────────────────── */
|
||||
for (size_t i = 0; i < states.size(); i++) {
|
||||
DeviceState &ds = states[i];
|
||||
|
||||
if (ds.input) {
|
||||
ds.input->StopStreams();
|
||||
ds.input->DisableVideoInput();
|
||||
ds.input->DisableAudioInput();
|
||||
ds.input->SetCallback(nullptr);
|
||||
}
|
||||
|
||||
ds.audio_stop.store(1);
|
||||
if (ds.audio_tid) pthread_join(ds.audio_tid, nullptr);
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
if (ds.fc_writer) {
|
||||
fc_writer_close(ds.fc_writer);
|
||||
ds.fc_writer = nullptr;
|
||||
}
|
||||
#else
|
||||
if (ds.video_fifo_fd >= 0) close(ds.video_fifo_fd);
|
||||
#endif
|
||||
|
||||
if (ds.input) { ds.input->Release(); ds.input = nullptr; }
|
||||
if (callbacks[i]) { callbacks[i]->Release(); callbacks[i] = nullptr; }
|
||||
}
|
||||
|
||||
for (auto *d : all_devices) d->Release();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -4,8 +4,19 @@ set(CMAKE_C_STANDARD 17)
|
|||
|
||||
set(SDK_ROOT "/sdk" CACHE PATH "Path to extracted VideoMaster SDK")
|
||||
|
||||
# Legacy FIFO mode — set LEGACY_FIFO=ON to disable framecache shm writes
|
||||
# and fall back to the original named-FIFO path.
|
||||
option(LEGACY_FIFO "Use named FIFOs instead of framecache shm" OFF)
|
||||
|
||||
# Primary binary: deltacast-bridge (shared multi-port daemon)
|
||||
add_executable(deltacast-bridge main.c)
|
||||
add_executable(deltacast-bridge main.c fc_writer.c)
|
||||
|
||||
if(LEGACY_FIFO)
|
||||
target_compile_definitions(deltacast-bridge PRIVATE LEGACY_FIFO=1)
|
||||
message(STATUS "deltacast-bridge: LEGACY_FIFO mode enabled (shm disabled)")
|
||||
else()
|
||||
message(STATUS "deltacast-bridge: framecache shm mode enabled")
|
||||
endif()
|
||||
|
||||
target_include_directories(deltacast-bridge PRIVATE
|
||||
${SDK_ROOT}/include/videomaster
|
||||
|
|
@ -19,6 +30,7 @@ target_link_libraries(deltacast-bridge PRIVATE
|
|||
videomasterhd
|
||||
videomasterhd_audio
|
||||
pthread
|
||||
rt # shm_open, sem_open
|
||||
)
|
||||
|
||||
# Embed the SDK RPATH so the binary finds the .so at runtime
|
||||
|
|
|
|||
300
services/capture/deltacast-bridge/fc_writer.c
Normal file
300
services/capture/deltacast-bridge/fc_writer.c
Normal file
|
|
@ -0,0 +1,300 @@
|
|||
/**
|
||||
* fc_writer.c — Framecache slot writer for deltacast-bridge.
|
||||
*
|
||||
* Uses only POSIX + libc — no external dependencies beyond what the bridge
|
||||
* already links. HTTP calls are done with raw sockets (tiny GET/POST/DELETE)
|
||||
* to avoid pulling in libcurl.
|
||||
*/
|
||||
#include "fc_writer.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <stdatomic.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/stat.h>
|
||||
#include <semaphore.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
/* Re-use the shared memory layout from the framecache service */
|
||||
#define FC_MAGIC 0x46524D43u
|
||||
#define FC_VERSION 1u
|
||||
#define FC_RING_DEPTH 120u
|
||||
#define FC_HEADER_SIZE 4096u
|
||||
#define FC_FRAME_HDR_SIZE 24u
|
||||
|
||||
typedef struct {
|
||||
uint32_t magic;
|
||||
uint32_t version;
|
||||
uint32_t width;
|
||||
uint32_t height;
|
||||
uint32_t fps_num;
|
||||
uint32_t fps_den;
|
||||
uint32_t pixel_format;
|
||||
uint32_t frame_size;
|
||||
uint32_t ring_depth;
|
||||
uint32_t _reserved;
|
||||
_Atomic uint64_t write_cursor;
|
||||
_Atomic uint64_t dropped_frames;
|
||||
char source_type[32];
|
||||
char slot_id[64];
|
||||
uint8_t _pad[FC_HEADER_SIZE - 112];
|
||||
} fc_hdr_t;
|
||||
|
||||
typedef struct {
|
||||
uint64_t pts_us;
|
||||
uint64_t wall_us;
|
||||
uint32_t size;
|
||||
uint32_t _pad;
|
||||
uint8_t data[];
|
||||
} fc_frm_t;
|
||||
|
||||
struct fc_writer {
|
||||
void *base;
|
||||
size_t shm_size;
|
||||
int shm_fd;
|
||||
sem_t *sem;
|
||||
char slot_id[64];
|
||||
char fc_url[256]; /* base URL for DELETE on close */
|
||||
char shm_path[128];
|
||||
char sem_name[128];
|
||||
};
|
||||
|
||||
/* ── tiny HTTP helper ──────────────────────────────────────────────── */
|
||||
|
||||
static int http_request(const char *method,
|
||||
const char *host, int port, const char *path,
|
||||
const char *body, /* NULL for GET/DELETE */
|
||||
char *resp_buf, size_t resp_len)
|
||||
{
|
||||
struct sockaddr_in sa;
|
||||
memset(&sa, 0, sizeof sa);
|
||||
sa.sin_family = AF_INET;
|
||||
sa.sin_port = htons((uint16_t)port);
|
||||
|
||||
struct hostent *he = gethostbyname(host);
|
||||
if (!he) return -1;
|
||||
memcpy(&sa.sin_addr, he->h_addr_list[0], (size_t)he->h_length);
|
||||
|
||||
int fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (fd < 0) return -1;
|
||||
|
||||
struct timeval tv = { .tv_sec = 5 };
|
||||
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv);
|
||||
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv);
|
||||
|
||||
if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) {
|
||||
close(fd); return -1;
|
||||
}
|
||||
|
||||
char req[4096];
|
||||
int req_len;
|
||||
if (body) {
|
||||
req_len = snprintf(req, sizeof req,
|
||||
"%s %s HTTP/1.0\r\n"
|
||||
"Host: %s:%d\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"Content-Length: %zu\r\n"
|
||||
"Connection: close\r\n\r\n"
|
||||
"%s",
|
||||
method, path, host, port, strlen(body), body);
|
||||
} else {
|
||||
req_len = snprintf(req, sizeof req,
|
||||
"%s %s HTTP/1.0\r\n"
|
||||
"Host: %s:%d\r\n"
|
||||
"Connection: close\r\n\r\n",
|
||||
method, path, host, port);
|
||||
}
|
||||
|
||||
if (send(fd, req, (size_t)req_len, 0) < 0) { close(fd); return -1; }
|
||||
|
||||
int status = -1;
|
||||
size_t got = 0;
|
||||
char buf[8192];
|
||||
ssize_t n;
|
||||
while ((n = recv(fd, buf + got, sizeof buf - got - 1, 0)) > 0)
|
||||
got += (size_t)n;
|
||||
buf[got] = '\0';
|
||||
|
||||
/* Parse status line */
|
||||
if (sscanf(buf, "HTTP/%*s %d", &status) != 1) status = -1;
|
||||
|
||||
/* Copy body (after \r\n\r\n) into resp_buf */
|
||||
if (resp_buf && resp_len > 0) {
|
||||
const char *body_start = strstr(buf, "\r\n\r\n");
|
||||
if (body_start) {
|
||||
strncpy(resp_buf, body_start + 4, resp_len - 1);
|
||||
resp_buf[resp_len - 1] = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
close(fd);
|
||||
return status;
|
||||
}
|
||||
|
||||
/* Parse "host:port" or just "host" from a URL like "http://host:port" */
|
||||
static void parse_url(const char *url, char *host, size_t hlen, int *port)
|
||||
{
|
||||
const char *p = url;
|
||||
if (strncmp(p, "http://", 7) == 0) p += 7;
|
||||
*port = 7435;
|
||||
const char *colon = strchr(p, ':');
|
||||
if (colon) {
|
||||
size_t n = (size_t)(colon - p);
|
||||
if (n >= hlen) n = hlen - 1;
|
||||
strncpy(host, p, n);
|
||||
host[n] = '\0';
|
||||
*port = atoi(colon + 1);
|
||||
} else {
|
||||
strncpy(host, p, hlen - 1);
|
||||
host[hlen - 1] = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
static int json_str(const char *json, const char *key, char *out, size_t len)
|
||||
{
|
||||
char pat[128];
|
||||
snprintf(pat, sizeof pat, "\"%s\":", key);
|
||||
const char *p = strstr(json, pat);
|
||||
if (!p) return -1;
|
||||
p += strlen(pat);
|
||||
while (*p == ' ') p++;
|
||||
if (*p != '"') return -1;
|
||||
p++;
|
||||
size_t i = 0;
|
||||
while (*p && *p != '"' && i < len - 1) out[i++] = *p++;
|
||||
out[i] = '\0';
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ── public API ────────────────────────────────────────────────────── */
|
||||
|
||||
fc_writer_t *fc_writer_open(const char *fc_url,
|
||||
const char *slot_id,
|
||||
uint32_t width, uint32_t height,
|
||||
uint32_t fps_num, uint32_t fps_den)
|
||||
{
|
||||
char host[128]; int port;
|
||||
parse_url(fc_url, host, sizeof host, &port);
|
||||
|
||||
/* POST /slots */
|
||||
char body[512];
|
||||
snprintf(body, sizeof body,
|
||||
"{\"slot_id\":\"%s\","
|
||||
"\"width\":%u,\"height\":%u,"
|
||||
"\"fps_num\":%u,\"fps_den\":%u,"
|
||||
"\"source_type\":\"deltacast\"}",
|
||||
slot_id, width, height, fps_num, fps_den);
|
||||
|
||||
char resp[1024] = {0};
|
||||
int status = http_request("POST", host, port, "/slots", body, resp, sizeof resp);
|
||||
if (status != 201) {
|
||||
fprintf(stderr, "[fc_writer:%s] POST /slots failed (HTTP %d): %s\n",
|
||||
slot_id, status, resp);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char shm_path[128] = {0}, sem_name[128] = {0};
|
||||
json_str(resp, "shm_path", shm_path, sizeof shm_path);
|
||||
json_str(resp, "sem_name", sem_name, sizeof sem_name);
|
||||
|
||||
if (!shm_path[0] || !sem_name[0]) {
|
||||
fprintf(stderr, "[fc_writer:%s] bad response (missing shm_path/sem_name)\n", slot_id);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* mmap the shm file */
|
||||
int fd = open(shm_path, O_RDWR);
|
||||
if (fd < 0) {
|
||||
fprintf(stderr, "[fc_writer:%s] open %s: %s\n", slot_id, shm_path, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
/* Read header to get frame_size */
|
||||
fc_hdr_t hdr;
|
||||
if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
|
||||
fprintf(stderr, "[fc_writer:%s] bad shm header\n", slot_id);
|
||||
close(fd); return NULL;
|
||||
}
|
||||
size_t total = (size_t)FC_HEADER_SIZE
|
||||
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + hdr.frame_size);
|
||||
|
||||
void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
if (base == MAP_FAILED) {
|
||||
fprintf(stderr, "[fc_writer:%s] mmap: %s\n", slot_id, strerror(errno));
|
||||
close(fd); return NULL;
|
||||
}
|
||||
|
||||
sem_t *sem = sem_open(sem_name, 0);
|
||||
if (sem == SEM_FAILED) {
|
||||
fprintf(stderr, "[fc_writer:%s] sem_open %s: %s\n", slot_id, sem_name, strerror(errno));
|
||||
munmap(base, total); close(fd); return NULL;
|
||||
}
|
||||
|
||||
fc_writer_t *w = calloc(1, sizeof *w);
|
||||
if (!w) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
|
||||
|
||||
w->base = base;
|
||||
w->shm_size = total;
|
||||
w->shm_fd = fd;
|
||||
w->sem = sem;
|
||||
strncpy(w->slot_id, slot_id, sizeof w->slot_id - 1);
|
||||
strncpy(w->fc_url, fc_url, sizeof w->fc_url - 1);
|
||||
strncpy(w->shm_path, shm_path, sizeof w->shm_path - 1);
|
||||
strncpy(w->sem_name, sem_name, sizeof w->sem_name - 1);
|
||||
|
||||
fprintf(stderr, "[fc_writer:%s] slot open (%ux%u %.2ffps shm=%s)\n",
|
||||
slot_id, width, height,
|
||||
fps_den ? (double)fps_num / fps_den : 0.0, shm_path);
|
||||
return w;
|
||||
}
|
||||
|
||||
void fc_writer_write(fc_writer_t *w,
|
||||
const uint8_t *data, uint32_t size,
|
||||
uint64_t pts_us)
|
||||
{
|
||||
fc_hdr_t *hdr = (fc_hdr_t *)w->base;
|
||||
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
|
||||
uint64_t idx = cur % FC_RING_DEPTH;
|
||||
|
||||
/* Locate frame in ring */
|
||||
uint8_t *frames = (uint8_t *)w->base + FC_HEADER_SIZE;
|
||||
fc_frm_t *frame = (fc_frm_t *)(frames + idx * ((size_t)FC_FRAME_HDR_SIZE + hdr->frame_size));
|
||||
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
uint64_t wall = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
|
||||
|
||||
frame->pts_us = pts_us;
|
||||
frame->wall_us = wall;
|
||||
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
||||
memcpy(frame->data, data, frame->size);
|
||||
|
||||
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
||||
sem_post(w->sem);
|
||||
}
|
||||
|
||||
void fc_writer_close(fc_writer_t *w)
|
||||
{
|
||||
if (!w) return;
|
||||
|
||||
/* DELETE /slots/:id */
|
||||
char host[128]; int port;
|
||||
parse_url(w->fc_url, host, sizeof host, &port);
|
||||
char path[192];
|
||||
snprintf(path, sizeof path, "/slots/%s", w->slot_id);
|
||||
http_request("DELETE", host, port, path, NULL, NULL, 0);
|
||||
|
||||
sem_close(w->sem);
|
||||
munmap(w->base, w->shm_size);
|
||||
close(w->shm_fd);
|
||||
fprintf(stderr, "[fc_writer:%s] slot closed\n", w->slot_id);
|
||||
free(w);
|
||||
}
|
||||
50
services/capture/deltacast-bridge/fc_writer.h
Normal file
50
services/capture/deltacast-bridge/fc_writer.h
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* fc_writer.h — Lightweight framecache slot writer for deltacast-bridge.
|
||||
*
|
||||
* Registers a slot with the framecache HTTP API on signal lock, then writes
|
||||
* raw UYVY422 frames directly into the shared memory ring buffer.
|
||||
*
|
||||
* Compile with -DLEGACY_FIFO to disable shm writes and fall back to the
|
||||
* original named-FIFO path (useful during transition / on nodes without the
|
||||
* framecache container running).
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct fc_writer fc_writer_t;
|
||||
|
||||
/**
|
||||
* Register a slot with the framecache service and open the shm region for
|
||||
* writing. fc_url is the HTTP base URL, e.g. "http://localhost:7435".
|
||||
* slot_id must be unique per port, e.g. "deltacast-0-3" (device-port).
|
||||
*
|
||||
* Returns writer handle on success, NULL on failure (falls back to FIFO).
|
||||
*/
|
||||
fc_writer_t *fc_writer_open(const char *fc_url,
|
||||
const char *slot_id,
|
||||
uint32_t width, uint32_t height,
|
||||
uint32_t fps_num, uint32_t fps_den);
|
||||
|
||||
/**
|
||||
* Write one raw UYVY422 frame into the ring buffer.
|
||||
* Non-blocking — slow consumers are skipped, not waited on.
|
||||
* pts_us: presentation timestamp in microseconds (0 if unknown).
|
||||
*/
|
||||
void fc_writer_write(fc_writer_t *w,
|
||||
const uint8_t *data, uint32_t size,
|
||||
uint64_t pts_us);
|
||||
|
||||
/**
|
||||
* Deregister slot from framecache service and unmap shm.
|
||||
*/
|
||||
void fc_writer_close(fc_writer_t *w);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
@ -3,20 +3,32 @@
|
|||
* Deltacast VideoMaster SDI shared multi-port bridge daemon.
|
||||
*
|
||||
* Opens the board ONCE, opens RX streams for all requested ports, and
|
||||
* writes each port's video/audio to named FIFOs in a shared directory.
|
||||
* One reader thread + one audio thread per port run concurrently.
|
||||
* writes each port's video frames into a shared-memory framecache slot
|
||||
* (and audio to a named FIFO — audio-in-shm is a future roadmap item).
|
||||
*
|
||||
* Signal fan-out architecture:
|
||||
* Board → video_thread → fc_writer → /dev/shm/framecache/<slot>
|
||||
* └→ N consumers (recording, proxy,
|
||||
* HLS preview) each read with
|
||||
* their own cursor — zero-copy,
|
||||
* no bandwidth splitting.
|
||||
*
|
||||
* Usage:
|
||||
* deltacast-bridge --device <N> --ports <csv>
|
||||
* [--video-pipe-dir /dev/shm/deltacast]
|
||||
* [--audio-pipe-dir /dev/shm/deltacast]
|
||||
* [--fc-url http://framecache:7435]
|
||||
* [--signal-timeout <sec>]
|
||||
*
|
||||
* Compat alias: --port <N> treated as --ports <N> (single port).
|
||||
*
|
||||
* For each port that acquires signal, emits one JSON line to stderr:
|
||||
* {"port":N,"width":W,"height":H,"fps_num":N,"fps_den":D,
|
||||
* "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2}
|
||||
* "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2,
|
||||
* "slot_id":"deltacast-<device>-<port>"}
|
||||
*
|
||||
* Compile with -DLEGACY_FIFO=1 to disable shm writes and fall back to
|
||||
* the original named-FIFO path (for nodes without framecache running).
|
||||
*
|
||||
* Runs until SIGTERM/SIGINT, then closes all streams and the board.
|
||||
*/
|
||||
|
|
@ -37,10 +49,17 @@
|
|||
#include "VideoMasterHD_Sdi.h"
|
||||
#include "VideoMasterHD_Sdi_Audio.h"
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
# include "fc_writer.h"
|
||||
#endif
|
||||
|
||||
#ifndef F_SETPIPE_SZ
|
||||
#define F_SETPIPE_SZ 1031
|
||||
#endif
|
||||
|
||||
/* Default framecache URL — overridden by FC_URL env var or --fc-url arg */
|
||||
#define FC_URL_DEFAULT "http://localhost:7435"
|
||||
|
||||
/* ── Constants ────────────────────────────────────────────────────────── */
|
||||
#define MAX_PORTS 8
|
||||
|
||||
|
|
@ -154,11 +173,16 @@ typedef struct {
|
|||
VideoInfo vi;
|
||||
char video_fifo[256];
|
||||
char audio_fifo[256];
|
||||
char slot_id[128]; /* framecache slot id: "deltacast-<dev>-<port>" */
|
||||
char fc_url[256]; /* framecache HTTP base URL */
|
||||
/* threads */
|
||||
pthread_t video_tid;
|
||||
pthread_t audio_tid;
|
||||
/* streams (owned by threads, set before thread launch) */
|
||||
HANDLE video_stream;
|
||||
#ifndef LEGACY_FIFO
|
||||
fc_writer_t *fc_writer; /* shm ring buffer writer (NULL = use FIFO fallback) */
|
||||
#endif
|
||||
} PortState;
|
||||
|
||||
/* ── Audio thread ──────────────────────────────────────────────────────
|
||||
|
|
@ -343,10 +367,67 @@ static void *audio_thread(void *arg) {
|
|||
static void *video_thread(void *arg) {
|
||||
PortState *ps = (PortState *)arg;
|
||||
|
||||
/* Outer loop: reopen the FIFO writer each time a reader connects.
|
||||
* Mirror the audio thread pattern — EPIPE means the ffmpeg sidecar for
|
||||
* this port died (session stop/restart), NOT a hardware fault. We reopen
|
||||
* and block until the next recorder start; other ports are unaffected. */
|
||||
#ifndef LEGACY_FIFO
|
||||
/* ── Framecache shm path (primary) ──────────────────────────────────
|
||||
* Write frames directly into the shared memory ring buffer.
|
||||
* Multiple consumers (growing recorder, proxy encoder, HLS preview)
|
||||
* each hold their own read cursor and read independently — no FIFO
|
||||
* splitting, no bandwidth halving.
|
||||
*
|
||||
* The fc_writer was opened by main() after signal lock. If it is
|
||||
* NULL the framecache service was unavailable and we fall through to
|
||||
* the legacy FIFO path automatically.
|
||||
*/
|
||||
if (ps->fc_writer) {
|
||||
uint64_t frame_seq = 0;
|
||||
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
|
||||
HANDLE slot = NULL;
|
||||
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
|
||||
if (r == VHDERR_NOERROR) {
|
||||
BYTE *buf = NULL;
|
||||
ULONG sz = 0;
|
||||
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
|
||||
ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2;
|
||||
if (sz != expected) {
|
||||
fprintf(stderr,
|
||||
"[video:%u] WARN: sz=%lu != expected %lu — packing mismatch, skipping\n",
|
||||
ps->port, (unsigned long)sz, (unsigned long)expected);
|
||||
VHD_UnlockSlotHandle(slot);
|
||||
continue;
|
||||
}
|
||||
/* pts: frame index × frame duration in µs */
|
||||
uint64_t pts_us = 0;
|
||||
if (ps->vi.fps_num > 0) {
|
||||
pts_us = frame_seq * 1000000ULL
|
||||
* (uint64_t)ps->vi.fps_den
|
||||
/ (uint64_t)ps->vi.fps_num;
|
||||
}
|
||||
fc_writer_write(ps->fc_writer, buf, (uint32_t)sz, pts_us);
|
||||
frame_seq++;
|
||||
}
|
||||
VHD_UnlockSlotHandle(slot);
|
||||
} else if (r != VHDERR_TIMEOUT) {
|
||||
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n",
|
||||
ps->port, (unsigned long)r);
|
||||
atomic_store(&g_port_stop[ps->port], 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
/* fc_writer == NULL → fall through to FIFO path */
|
||||
fprintf(stderr, "[video:%u] fc_writer unavailable — falling back to FIFO\n", ps->port);
|
||||
#endif /* !LEGACY_FIFO */
|
||||
|
||||
/* ── Legacy FIFO path ────────────────────────────────────────────────
|
||||
* Kept as compile-time fallback (-DLEGACY_FIFO=1) or when the
|
||||
* framecache service is not reachable at startup.
|
||||
*
|
||||
* Outer loop: reopen the FIFO writer each time a reader connects.
|
||||
* EPIPE means the ffmpeg sidecar for this port died (session
|
||||
* stop/restart), NOT a hardware fault. Reopen and block until the
|
||||
* next recorder start; other ports are unaffected.
|
||||
*/
|
||||
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
|
||||
|
||||
int fd = open(ps->video_fifo, O_WRONLY);
|
||||
|
|
@ -359,7 +440,8 @@ static void *video_thread(void *arg) {
|
|||
{
|
||||
int pipe_sz = 64 * 1024 * 1024; /* 64 MB — ~16 frames of 1080p UYVY */
|
||||
if (fcntl(fd, F_SETPIPE_SZ, pipe_sz) < 0) {
|
||||
fprintf(stderr, "[video:%u] fcntl F_SETPIPE_SZ failed: %s\n", ps->port, strerror(errno));
|
||||
fprintf(stderr, "[video:%u] fcntl F_SETPIPE_SZ failed: %s\n",
|
||||
ps->port, strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -373,14 +455,14 @@ static void *video_thread(void *arg) {
|
|||
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
|
||||
ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2;
|
||||
if (sz != expected) {
|
||||
fprintf(stderr, "[video:%u] WARN: slot sz=%lu != expected %lu (w=%d h=%d) -- packing mismatch; skipping frame\n",
|
||||
ps->port, sz, expected, ps->vi.width, ps->vi.height);
|
||||
fprintf(stderr,
|
||||
"[video:%u] WARN: slot sz=%lu != expected %lu (w=%d h=%d) -- packing mismatch; skipping frame\n",
|
||||
ps->port, (unsigned long)sz, (unsigned long)expected,
|
||||
ps->vi.width, ps->vi.height);
|
||||
VHD_UnlockSlotHandle(slot);
|
||||
continue;
|
||||
}
|
||||
if (write_all(fd, buf, sz) < 0) {
|
||||
/* EPIPE: sidecar died (session stop/restart).
|
||||
* Break to outer loop — reopen for next session. */
|
||||
fprintf(stderr, "[video:%u] EPIPE — waiting for next reader\n", ps->port);
|
||||
VHD_UnlockSlotHandle(slot);
|
||||
break;
|
||||
|
|
@ -389,7 +471,7 @@ static void *video_thread(void *arg) {
|
|||
VHD_UnlockSlotHandle(slot);
|
||||
} else if (r != VHDERR_TIMEOUT) {
|
||||
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n",
|
||||
ps->port, r);
|
||||
ps->port, (unsigned long)r);
|
||||
atomic_store(&g_port_stop[ps->port], 1);
|
||||
fatal = 1;
|
||||
break;
|
||||
|
|
@ -419,12 +501,15 @@ static int parse_ports(const char *csv, unsigned *ports, int max) {
|
|||
|
||||
/* ── Main ─────────────────────────────────────────────────────────────── */
|
||||
int main(int argc, char *argv[]) {
|
||||
unsigned device_id = 0;
|
||||
unsigned ports[MAX_PORTS] = {0};
|
||||
int port_count = 0;
|
||||
int sig_timeout = 30;
|
||||
const char *video_pipe_dir = "/dev/shm/deltacast";
|
||||
const char *audio_pipe_dir = "/dev/shm/deltacast";
|
||||
unsigned device_id = 0;
|
||||
unsigned ports[MAX_PORTS] = {0};
|
||||
int port_count = 0;
|
||||
int sig_timeout = 30;
|
||||
const char *video_pipe_dir = "/dev/shm/deltacast";
|
||||
const char *audio_pipe_dir = "/dev/shm/deltacast";
|
||||
/* Framecache URL: CLI arg > FC_URL env var > default */
|
||||
const char *fc_url_env = getenv("FC_URL");
|
||||
const char *fc_url = fc_url_env ? fc_url_env : FC_URL_DEFAULT;
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (!strcmp(argv[i], "--device") && i+1 < argc) {
|
||||
|
|
@ -441,6 +526,8 @@ int main(int argc, char *argv[]) {
|
|||
audio_pipe_dir = argv[++i];
|
||||
} else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) {
|
||||
sig_timeout = atoi(argv[++i]);
|
||||
} else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) {
|
||||
fc_url = argv[++i];
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -601,17 +688,38 @@ int main(int argc, char *argv[]) {
|
|||
"%s/video-%u.fifo", video_pipe_dir, ports[pi]);
|
||||
snprintf(p->audio_fifo, sizeof(p->audio_fifo),
|
||||
"%s/audio-%u.fifo", audio_pipe_dir, ports[pi]);
|
||||
snprintf(p->slot_id, sizeof(p->slot_id),
|
||||
"deltacast-%u-%u", device_id, ports[pi]);
|
||||
strncpy(p->fc_url, fc_url, sizeof(p->fc_url) - 1);
|
||||
|
||||
/* Create FIFOs (mkfifo; ignore EEXIST). */
|
||||
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
/* Create audio FIFO (always needed — audio stays in FIFO for now). */
|
||||
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
||||
#ifndef LEGACY_FIFO
|
||||
/* Open framecache slot for video frames.
|
||||
* Fall back to FIFO if framecache is unreachable. */
|
||||
p->fc_writer = fc_writer_open(p->fc_url, p->slot_id,
|
||||
(uint32_t)p->vi.width, (uint32_t)p->vi.height,
|
||||
(uint32_t)p->vi.fps_num, (uint32_t)p->vi.fps_den);
|
||||
if (!p->fc_writer) {
|
||||
fprintf(stderr, "[port:%u] framecache unavailable — creating video FIFO fallback\n",
|
||||
ports[pi]);
|
||||
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
#else
|
||||
/* Legacy: always use video FIFO */
|
||||
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
||||
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Open video stream. */
|
||||
HANDLE vs = NULL;
|
||||
ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]),
|
||||
|
|
@ -644,19 +752,23 @@ int main(int argc, char *argv[]) {
|
|||
continue;
|
||||
}
|
||||
|
||||
/* Emit format JSON to stderr (one line per port on signal lock). */
|
||||
/* Emit format JSON to stderr (one line per port on signal lock).
|
||||
* Includes slot_id so node-agent / capture-manager can identify
|
||||
* the framecache slot for this port. */
|
||||
fprintf(stderr,
|
||||
"{\"port\":%u,\"width\":%d,\"height\":%d,"
|
||||
"\"fps_num\":%d,\"fps_den\":%d,"
|
||||
"\"interlaced\":%s,"
|
||||
"\"pix_fmt\":\"uyvy422\","
|
||||
"\"audio_channels\":2,\"audio_rate\":48000,"
|
||||
"\"device\":%u}\n",
|
||||
"\"device\":%u,"
|
||||
"\"slot_id\":\"%s\"}\n",
|
||||
ports[pi],
|
||||
p->vi.width, p->vi.height,
|
||||
p->vi.fps_num, p->vi.fps_den,
|
||||
p->vi.interlaced ? "true" : "false",
|
||||
device_id);
|
||||
device_id,
|
||||
p->slot_id);
|
||||
fflush(stderr);
|
||||
|
||||
/* Launch audio thread (blocks until reader connects to audio FIFO). */
|
||||
|
|
@ -686,6 +798,12 @@ int main(int argc, char *argv[]) {
|
|||
VHD_StopStream(ps[i].video_stream);
|
||||
VHD_CloseStreamHandle(ps[i].video_stream);
|
||||
}
|
||||
#ifndef LEGACY_FIFO
|
||||
if (ps[i].fc_writer) {
|
||||
fc_writer_close(ps[i].fc_writer);
|
||||
ps[i].fc_writer = NULL;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
VHD_CloseBoardHandle(board);
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ import { createUploadStream } from './s3/client.js';
|
|||
|
||||
|
||||
const S3_BUCKET = process.env.S3_BUCKET || 'wild-dragon';
|
||||
const PRE_ROLL_SECONDS = parseInt(process.env.PRE_ROLL_SECONDS || '5', 10);
|
||||
|
||||
|
||||
// Growing-files mode: writes the master to a local SMB-backed share that the
|
||||
// editor can mount, instead of streaming to S3 in real time. The promotion
|
||||
|
|
@ -521,6 +523,54 @@ class CaptureManager {
|
|||
* @private
|
||||
*/
|
||||
async _buildInputArgs({ sourceType, sourceBackend = 'blackmagic', device, port, board, sourceUrl, listen, listenPort, streamKey }) {
|
||||
// ── Network sources via framecache (primary when FC_SLOT_ID is set) ──────
|
||||
// node-agent starts net_ingest before the sidecar, which decodes the stream
|
||||
// to raw UYVY422 and registers a framecache slot. We read from that slot via
|
||||
// fc_pipe — same zero-copy path as SDI sources — enabling simultaneous
|
||||
// growing + proxy + HLS from any network source.
|
||||
if ((sourceType === 'srt' || sourceType === 'rtmp') && process.env.FC_SLOT_ID) {
|
||||
const slotId = process.env.FC_SLOT_ID;
|
||||
const fcPipeBin = process.env.FC_PIPE_BIN || 'fc_pipe';
|
||||
const WAIT_MS = 60_000; /* network sources may take longer to connect */
|
||||
|
||||
const fcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
|
||||
const fcFps = process.env.DELTACAST_FRAMERATE || '30000/1001';
|
||||
|
||||
console.log(`[framecache] net slot=${slotId} size=${fcSize} fps=${fcFps}`);
|
||||
|
||||
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
// Pause stdout immediately so frames don't fill the OS pipe buffer (and
|
||||
// block fc_pipe's write()) in the window between spawn here and the
|
||||
// .pipe(ffmpeg.stdin) attach later in start(). .pipe() auto-resumes.
|
||||
fcPipeProcess.stdout.pause();
|
||||
fcPipeProcess.stderr.on('data', chunk => {
|
||||
process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`);
|
||||
});
|
||||
fcPipeProcess.on('error', err =>
|
||||
console.error(`[fc_pipe:${slotId}] spawn error: ${err.message}`));
|
||||
|
||||
return {
|
||||
inputArgs: [
|
||||
'-use_wallclock_as_timestamps', '1',
|
||||
'-thread_queue_size', '512',
|
||||
'-f', 'rawvideo',
|
||||
'-pix_fmt', 'uyvy422',
|
||||
'-video_size', fcSize,
|
||||
'-framerate', fcFps,
|
||||
'-i', 'pipe:0',
|
||||
],
|
||||
isNetwork: false, /* treat as raw source — no -map 0:v:0? needed */
|
||||
bridgeProcess: fcPipeProcess,
|
||||
audioFifo: null,
|
||||
interlaced: false,
|
||||
audioInputIndex: 0, /* network fc_pipe is video-only — no audio input */
|
||||
_fcPipeProcess: fcPipeProcess,
|
||||
};
|
||||
}
|
||||
|
||||
// ── Legacy direct network paths (no framecache / net_ingest not running) ──
|
||||
if (sourceType === 'srt') {
|
||||
let url;
|
||||
if (listen) {
|
||||
|
|
@ -547,17 +597,119 @@ class CaptureManager {
|
|||
return { inputArgs: ['-probesize','32M','-analyzeduration','10M','-fflags','+genpts','-i', sourceUrl], isNetwork: true };
|
||||
}
|
||||
|
||||
// Deltacast SDI via shared bridge daemon (deltacast-bridge).
|
||||
// ── Framecache path (primary for deltacast + blackmagic) ────────────────
|
||||
//
|
||||
// The bridge daemon is started by node-agent (host process, direct /dev access)
|
||||
// and writes each port's streams to named FIFOs in /dev/shm/deltacast/:
|
||||
// /dev/shm/deltacast/video-<port>.fifo
|
||||
// /dev/shm/deltacast/audio-<port>.fifo
|
||||
// When FC_SLOT_ID is set in the sidecar env (injected by node-agent from
|
||||
// the bridge's format JSON), we use the framecache shm ring buffer as the
|
||||
// video source instead of named FIFOs.
|
||||
//
|
||||
// This sidecar just reads from those FIFOs. The bridge may still be starting
|
||||
// up or waiting for signal lock, so we wait up to 30s for the FIFOs to appear
|
||||
// before handing them to ffmpeg. The bridge process is managed by node-agent;
|
||||
// bridgeProcess is null here (no per-sidecar bridge spawn).
|
||||
// fc_pipe is a small C helper that opens the framecache slot as a consumer
|
||||
// and writes raw UYVY422 frames to stdout. capture-manager spawns it and
|
||||
// pipes its stdout to ffmpeg as a rawvideo input — same pattern as the
|
||||
// existing FIFO path, but with zero-copy shm reads and independent per-
|
||||
// consumer cursors. Multiple fc_pipe instances on the same slot each get
|
||||
// their own cursor, enabling simultaneous growing + proxy + HLS from one
|
||||
// SDI input without any frame splitting.
|
||||
//
|
||||
// Audio stays on the named FIFO path (same as before — audio fan-out via
|
||||
// shm is a roadmap item).
|
||||
//
|
||||
// Falls back to the legacy FIFO path when FC_SLOT_ID is not set (e.g. on
|
||||
// nodes running an older node-agent or without framecache deployed).
|
||||
if ((sourceType === 'deltacast' || sourceType === 'sdi' || sourceType === 'blackmagic')
|
||||
&& process.env.FC_SLOT_ID) {
|
||||
|
||||
const slotId = process.env.FC_SLOT_ID;
|
||||
const fcPipeBin = process.env.FC_PIPE_BIN || 'fc_pipe';
|
||||
const WAIT_MS = 30_000;
|
||||
|
||||
// Determine audio FIFO path based on source type
|
||||
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
|
||||
? parseInt(device, 10) : 0;
|
||||
const portIdx = (sourceType === 'deltacast')
|
||||
? ((typeof port === 'number' || /^\d+$/.test(String(port)))
|
||||
? parseInt(port, 10) : idx)
|
||||
: idx;
|
||||
|
||||
let audioFifoPath;
|
||||
if (sourceType === 'deltacast') {
|
||||
const DC_PIPE_DIR = process.env.DELTACAST_PIPE_DIR || '/dev/shm/deltacast';
|
||||
audioFifoPath = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
|
||||
} else {
|
||||
const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
|
||||
audioFifoPath = `${DL_AUDIO_DIR}/audio-${portIdx}.fifo`;
|
||||
}
|
||||
|
||||
// Wait up to 30s for the audio FIFO to exist (bridge starts asynchronously)
|
||||
const { existsSync: _exists } = await import('node:fs');
|
||||
const deadline = Date.now() + WAIT_MS;
|
||||
while (Date.now() < deadline) {
|
||||
if (_exists(audioFifoPath)) break;
|
||||
await new Promise(r => setTimeout(r, 500));
|
||||
}
|
||||
if (!_exists(audioFifoPath)) {
|
||||
throw new Error(
|
||||
`audio FIFO not ready after ${WAIT_MS / 1000}s: ${audioFifoPath} ` +
|
||||
`— is the bridge running?`
|
||||
);
|
||||
}
|
||||
|
||||
// Video dimensions and fps come from env vars injected by node-agent
|
||||
// (populated from the bridge's format JSON on signal lock).
|
||||
const fcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
|
||||
const fcFps = process.env.DELTACAST_FRAMERATE || '60000/1001';
|
||||
const fcInterlaced = process.env.DELTACAST_INTERLACED === '1';
|
||||
|
||||
console.log(`[framecache] slot=${slotId} size=${fcSize} fps=${fcFps} audio=${audioFifoPath}`);
|
||||
|
||||
// Spawn fc_pipe: opens the framecache slot with its own read cursor and
|
||||
// streams raw UYVY422 frames to stdout. ffmpeg reads from the pipe as
|
||||
// rawvideo input 0; audio FIFO is input 1 (same as before).
|
||||
const fcPipeProcess = spawn(fcPipeBin, [slotId, String(WAIT_MS)], {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
// Pause until piped to ffmpeg (avoids OS pipe-buffer fill stall — see
|
||||
// the network path above for the full rationale).
|
||||
fcPipeProcess.stdout.pause();
|
||||
fcPipeProcess.stderr.on('data', chunk => {
|
||||
process.stderr.write(`[fc_pipe:${slotId}] ${chunk}`);
|
||||
});
|
||||
fcPipeProcess.on('error', err => {
|
||||
console.error(`[fc_pipe:${slotId}] spawn error: ${err.message}`);
|
||||
});
|
||||
|
||||
return {
|
||||
inputArgs: [
|
||||
// fc_pipe stdout → ffmpeg rawvideo input 0 (video)
|
||||
// -use_wallclock_as_timestamps aligns video+audio by arrival time,
|
||||
// same as the legacy FIFO path.
|
||||
'-use_wallclock_as_timestamps', '1',
|
||||
'-thread_queue_size', '512',
|
||||
'-f', 'rawvideo',
|
||||
'-pix_fmt', 'uyvy422',
|
||||
'-video_size', fcSize,
|
||||
'-framerate', fcFps,
|
||||
'-i', 'pipe:0',
|
||||
// Audio FIFO → ffmpeg input 1 (unchanged from legacy path)
|
||||
'-use_wallclock_as_timestamps', '1',
|
||||
'-thread_queue_size', '512',
|
||||
'-f', 's16le',
|
||||
'-ar', '48000',
|
||||
'-ac', '2',
|
||||
'-i', audioFifoPath,
|
||||
],
|
||||
isNetwork: false,
|
||||
bridgeProcess: fcPipeProcess, /* capture-manager pipes this to ffmpeg stdin */
|
||||
audioFifo: null,
|
||||
interlaced: fcInterlaced,
|
||||
audioInputIndex: 1, /* audio FIFO is ffmpeg input 1 */
|
||||
_fcPipeProcess: fcPipeProcess, /* stored for clean stop */
|
||||
};
|
||||
}
|
||||
|
||||
// ── Legacy FIFO path for deltacast ───────────────────────────────────────
|
||||
// Used when FC_SLOT_ID is not set (framecache not deployed on this node,
|
||||
// or older node-agent). Will be removed once framecache is everywhere.
|
||||
if (sourceType === 'deltacast') {
|
||||
const idx = (typeof device === 'number' || /^\d+$/.test(String(device)))
|
||||
? parseInt(device, 10) : 0;
|
||||
|
|
@ -568,7 +720,6 @@ class CaptureManager {
|
|||
const videoFifo = `${DC_PIPE_DIR}/video-${portIdx}.fifo`;
|
||||
const audioFifo = `${DC_PIPE_DIR}/audio-${portIdx}.fifo`;
|
||||
|
||||
// Wait up to 30s for both FIFOs to exist (bridge starts asynchronously).
|
||||
const { existsSync: _exists } = await import('node:fs');
|
||||
const WAIT_MS = 30_000;
|
||||
const POLL_MS = 500;
|
||||
|
|
@ -587,41 +738,14 @@ class CaptureManager {
|
|||
`(video=${videoReady} audio=${audioReady}) — is deltacast-bridge running?`
|
||||
);
|
||||
}
|
||||
console.log(`[deltacast] port ${portIdx} FIFOs ready: ${videoFifo}, ${audioFifo}`);
|
||||
console.log(`[deltacast] port ${portIdx} FIFOs ready (legacy): ${videoFifo}, ${audioFifo}`);
|
||||
|
||||
// Resolution/fps are not known until the FIFO reader connects and starts
|
||||
// receiving frames. We use sensible defaults here; ffmpeg's rawvideo demuxer
|
||||
// will accept whatever the bridge writes once the pipe opens.
|
||||
// The bridge daemon has already detected the signal and set up streams, so
|
||||
// the FIFO content is ready-to-read as soon as the reader connects.
|
||||
//
|
||||
// NOTE: The format JSON emitted by the bridge on signal lock goes to the
|
||||
// node-agent (which launched the bridge), not to this sidecar. The sidecar
|
||||
// therefore uses fixed rawvideo params here. If per-port format introspection
|
||||
// is needed in future, the node-agent should expose the fmt JSON via an API
|
||||
// and capture-manager can query it before building inputArgs.
|
||||
//
|
||||
// For now, both video dimensions and framerate come from the recorder's
|
||||
// configured values (passed to start() as `framerate` and implicit in the
|
||||
// codec args). The rawvideo input is -video_size / -framerate from env or
|
||||
// recorder config; ffmpeg tolerates a small mismatch in rawvideo (it just
|
||||
// reads N bytes per frame based on the declared size).
|
||||
//
|
||||
// DELTACAST_VIDEO_SIZE / DELTACAST_FRAMERATE: set by node-agent in the
|
||||
// sidecar env based on the bridge's per-port format JSON, if desired.
|
||||
const dcSize = process.env.DELTACAST_VIDEO_SIZE || '1920x1080';
|
||||
const dcFps = process.env.DELTACAST_FRAMERATE || '60000/1001';
|
||||
const dcInterlaced = process.env.DELTACAST_INTERLACED === '1';
|
||||
|
||||
return {
|
||||
inputArgs: [
|
||||
// Both raw FIFOs are timestampless. ffmpeg opens input 0 (video) and
|
||||
// input 1 (audio) at slightly different moments, so PTS-zeroing each
|
||||
// stream's first byte would bake in a fixed A/V offset. Stamping each
|
||||
// input by wall-clock ARRIVAL time aligns them by real time regardless
|
||||
// of FIFO open order — the robust fix for the A/V start offset.
|
||||
// Large thread_queue_size avoids "thread message queue blocking" on
|
||||
// the high-bitrate raw video FIFO.
|
||||
'-use_wallclock_as_timestamps', '1',
|
||||
'-thread_queue_size', '512',
|
||||
'-f', 'rawvideo',
|
||||
|
|
@ -637,9 +761,10 @@ class CaptureManager {
|
|||
'-i', audioFifo,
|
||||
],
|
||||
isNetwork: false,
|
||||
bridgeProcess: null, /* bridge is managed by node-agent, not this sidecar */
|
||||
audioFifo: null, /* no per-session FIFO to clean up on stop */
|
||||
bridgeProcess: null,
|
||||
audioFifo: null,
|
||||
interlaced: dcInterlaced,
|
||||
audioInputIndex: 1, /* legacy deltacast: video FIFO=0, audio FIFO=1 */
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -786,14 +911,22 @@ OUT=${sh(outPath)}
|
|||
mkfifo "$VF" "$AF"
|
||||
PATCHPID=
|
||||
cleanup() { rm -f "$VF" "$AF"; [ -n "$PATCHPID" ] && kill "$PATCHPID" 2>/dev/null; }
|
||||
trap cleanup EXIT
|
||||
trap cleanup EXIT
|
||||
# Prime both FIFOs read-write (non-blocking) to break the open-order deadlock.
|
||||
exec 7<>"$VF" 8<>"$AF"
|
||||
# raw2bmx: close priming FDs (no stray writer) before exec so it sees real EOF.
|
||||
( exec 7>&- 8>&-; exec ${bmxLine} ) &
|
||||
# CRITICAL: redirect raw2bmx stdin from /dev/null so it does NOT inherit the
|
||||
# parent bash stdin. When the video source is fc_pipe (framecache), bash stdin
|
||||
# carries the raw video stream destined for ffmpeg's pipe:0 — if raw2bmx also
|
||||
# inherited fd 0 it would steal bytes from that stream, corrupting both the
|
||||
# growing master and the ffmpeg input.
|
||||
( exec 7>&- 8>&- 0</dev/null; exec ${bmxLine} ) &
|
||||
BMXPID=$!
|
||||
# ffmpeg: also closes priming FDs; it opens its own write ends.
|
||||
( exec 7>&- 8>&-; exec ${ffLine} ) &
|
||||
# ffmpeg: closes priming FDs and EXPLICITLY inherits bash stdin (fd 0) so that
|
||||
# 'pipe:0' reads the fc_pipe video stream Node piped into this orchestrator's
|
||||
# stdin. For non-fc_pipe sources (FIFO/device input) fd 0 is unused and this is
|
||||
# harmless.
|
||||
( exec 7>&- 8>&- 0<&0; exec ${ffLine} ) &
|
||||
FFPID=$!
|
||||
# Forward a clean stop to ffmpeg; raw2bmx then gets EOF and finalizes the footer.
|
||||
stop() { kill -INT "$FFPID" 2>/dev/null; }
|
||||
|
|
@ -920,17 +1053,32 @@ exit "$BMXRC"
|
|||
// The stop handler sets needsProxy=true so the worker picks it up.
|
||||
const proxyKey = null;
|
||||
|
||||
const startedAt = new Date().toISOString();
|
||||
|
||||
this._sessionIdForBridge = sessionId;
|
||||
const { inputArgs, isNetwork, bridgeProcess = null, audioFifo = null, interlaced = false } = await this._buildInputArgs({
|
||||
const { inputArgs, isNetwork, bridgeProcess = null, audioFifo = null, interlaced = false, audioInputIndex = 0 } = await this._buildInputArgs({
|
||||
sourceType, sourceBackend, device, port, board, sourceUrl, listen, listenPort, streamKey,
|
||||
});
|
||||
|
||||
// Audio input index: the deltacast shared bridge delivers video on input 0
|
||||
// (video FIFO) and audio on input 1 (audio FIFO), so audioMap is '1:a:0?'.
|
||||
// DeckLink SDI and network sources carry audio inside input 0.
|
||||
const audioMap = (sourceType === 'deltacast') ? '1:a:0?' : '0:a:0?';
|
||||
// ── Pre-roll: discard initial unstable frames ────────────────────────────
|
||||
if (bridgeProcess && (sourceType === 'deltacast' || sourceType === 'blackmagic' || sourceType === 'sdi')) {
|
||||
console.log(`[capture] pre-rolling: discarding ${PRE_ROLL_SECONDS}s of frames`);
|
||||
// Attach temporary drain listener.
|
||||
bridgeProcess.stdout.on('data', () => {});
|
||||
await new Promise(r => setTimeout(r, PRE_ROLL_SECONDS * 1000));
|
||||
bridgeProcess.stdout.removeAllListeners('data');
|
||||
console.log(`[capture] pre-roll complete.`);
|
||||
}
|
||||
|
||||
const startedAt = new Date().toISOString();
|
||||
const recordingStartedAt = Date.now();
|
||||
|
||||
// Audio input index is returned EXPLICITLY by _buildInputArgs (audioInputIndex)
|
||||
// rather than guessed from sourceType/FC_SLOT_ID — that guess was wrong for
|
||||
// the legacy deltacast FIFO path (which has audio at input 1 but no FC_SLOT_ID),
|
||||
// silently dropping audio. Each return path now declares its own audio input:
|
||||
// - deltacast/blackmagic via framecache: audio FIFO = input 1
|
||||
// - legacy deltacast FIFO: audio FIFO = input 1
|
||||
// - network (framecache or legacy) + DeckLink-backend SDI: audio in input 0
|
||||
const audioMap = `${audioInputIndex}:a:0?`;
|
||||
|
||||
// Non-growing master: ffmpeg muxes the finalized MOV directly. Growing
|
||||
// master: raw2bmx muxes the OP1a from elementary FIFOs (handled below via
|
||||
|
|
@ -945,7 +1093,9 @@ exit "$BMXRC"
|
|||
|
||||
if (hiresCodecArgs) console.log('[capture] hires ffmpeg args:', hiresCodecArgs.join(' '));
|
||||
|
||||
const isInterlacedSource = sourceType === 'sdi' || (sourceType === 'deltacast' && interlaced);
|
||||
const isInterlacedSource = sourceType === 'sdi'
|
||||
|| (sourceType === 'deltacast' && interlaced)
|
||||
|| ((sourceType === 'blackmagic') && interlaced);
|
||||
const sdiFilterArgs = isInterlacedSource ? ['-vf', 'yadif=mode=1:deint=1'] : [];
|
||||
|
||||
// Master output destination (NON-growing path only).
|
||||
|
|
@ -971,14 +1121,17 @@ exit "$BMXRC"
|
|||
catch (err) { console.error('[capture] could not create temp master dir:', err.message); }
|
||||
}
|
||||
const hiresOutput = localMasterPath;
|
||||
// Deltacast reads from FIFOs (no stdin pipe needed). DeckLink pipes stdout.
|
||||
const hiresStdio = ['ignore', 'ignore', 'pipe'];
|
||||
// When bridgeProcess is an fc_pipe process its stdout is piped to ffmpeg
|
||||
// stdin (pipe:0 input). For all other sources stdin is ignored.
|
||||
const hiresStdio = bridgeProcess ? ['pipe', 'ignore', 'pipe'] : ['ignore', 'ignore', 'pipe'];
|
||||
|
||||
// For SDI we cannot open the DeckLink device a second time for a preview
|
||||
// tee, so the live HLS preview is produced as a SECOND OUTPUT of the hires
|
||||
// ffmpeg: one decklink read -> yadif -> split -> [ProRes/S3] + [H.264/HLS].
|
||||
// For SDI/framecache sources (including network via framecache) the live
|
||||
// HLS preview is a SECOND OUTPUT of the hires ffmpeg.
|
||||
const _viaFcPipeHls = !!process.env.FC_SLOT_ID;
|
||||
let sdiHlsDir = null;
|
||||
if ((sourceType === 'sdi' || sourceType === 'deltacast') && this._assetIdForHls) {
|
||||
if ((sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic'
|
||||
|| (_viaFcPipeHls && (sourceType === 'srt' || sourceType === 'rtmp')))
|
||||
&& this._assetIdForHls) {
|
||||
const fsMod = await import('node:fs');
|
||||
sdiHlsDir = '/live/' + this._assetIdForHls;
|
||||
try { fsMod.mkdirSync(sdiHlsDir, { recursive: true }); } catch (_) {}
|
||||
|
|
@ -1008,43 +1161,69 @@ exit "$BMXRC"
|
|||
interlaced: isInterlacedSource,
|
||||
});
|
||||
console.log('[capture] growing master via raw2bmx; orchestrator script length=' + orchArgs[1].length);
|
||||
hiresProcess = spawn('bash', orchArgs, { stdio: ['ignore', 'ignore', 'pipe'], detached: true });
|
||||
hiresProcess = spawn('bash', orchArgs, {
|
||||
stdio: bridgeProcess ? ['pipe', 'ignore', 'pipe'] : ['ignore', 'ignore', 'pipe'],
|
||||
detached: true,
|
||||
});
|
||||
|
||||
// When video comes from fc_pipe, pipe its stdout to the bash orchestrator
|
||||
// stdin (which the orchestrator forwards to the ffmpeg rawvideo input).
|
||||
if (bridgeProcess && bridgeProcess.stdout && hiresProcess.stdin) {
|
||||
bridgeProcess.stdout.pipe(hiresProcess.stdin);
|
||||
bridgeProcess.on('exit', () => {
|
||||
try { if (hiresProcess.stdin) hiresProcess.stdin.end(); } catch (_) {}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// ── Finalized (non-growing) master: ffmpeg muxes the MOV directly ──
|
||||
let hiresArgs;
|
||||
if ((sourceType === 'sdi' || sourceType === 'deltacast') && this._assetIdForHls) {
|
||||
const isSdiLike = sourceType === 'sdi' || sourceType === 'deltacast' || sourceType === 'blackmagic';
|
||||
// Network via framecache (fc_pipe) also produces its master + HLS as a
|
||||
// single split ffmpeg, exactly like SDI — it reads pipe:0, not a URL.
|
||||
const isNetFcPipe = !!process.env.FC_SLOT_ID && (sourceType === 'srt' || sourceType === 'rtmp');
|
||||
if ((isSdiLike || isNetFcPipe) && this._assetIdForHls) {
|
||||
const filterStr = isInterlacedSource
|
||||
? '[0:v]yadif=mode=1:deint=1,split=2[vhi][vlo]'
|
||||
: '[0:v]split=2[vhi][vlo]';
|
||||
// Network fc_pipe is video-only (no audio input) — omit audio maps so
|
||||
// ffmpeg doesn't fail trying to map a nonexistent audio stream.
|
||||
const hasAudio = audioInputIndex >= 0 && !isNetFcPipe;
|
||||
const masterAudioMap = hasAudio ? ['-map', audioMap] : [];
|
||||
const masterAudioFilter = hasAudio
|
||||
? ['-af', 'aresample=async=1:min_hard_comp=0.100000:first_pts=0'] : [];
|
||||
const hlsAudioMap = hasAudio ? ['-map', audioMap] : [];
|
||||
const hlsAudioCodec = hasAudio
|
||||
? ['-c:a', 'aac', '-b:a', '128k', '-ar', '44100'] : [];
|
||||
hiresArgs = [
|
||||
...inputArgs,
|
||||
'-filter_complex', filterStr,
|
||||
// Output 0 — ProRes/MOV master (local temp, uploaded to S3 on stop)
|
||||
'-map', '[vhi]', '-map', audioMap,
|
||||
// Keep raw audio aligned to the video clock. The two raw FIFOs carry
|
||||
// no timestamps; -af aresample=async lets ffmpeg stretch/squeeze audio
|
||||
// to correct any tiny rate mismatch so A/V never drifts over a long
|
||||
// take. Applies to this output's mapped audio stream.
|
||||
'-af', 'aresample=async=1:min_hard_comp=0.100000:first_pts=0',
|
||||
// Output 0 — master (local temp, uploaded to S3 on stop)
|
||||
'-map', '[vhi]', ...masterAudioMap,
|
||||
...masterAudioFilter,
|
||||
...hiresCodecArgs,
|
||||
hiresOutput,
|
||||
// Output 1 — low-latency H.264 HLS preview for the UI monitor.
|
||||
// GPU-encoded (h264_nvenc) when the GPU is attached to this sidecar,
|
||||
// otherwise libx264 (issue #164). GOP is pinned to one IDR per HLS
|
||||
// segment so segments start on keyframes (avoids black/flashing).
|
||||
'-map', '[vlo]', '-map', audioMap,
|
||||
// Output 1 — low-latency H.264 HLS preview for the UI monitor
|
||||
'-map', '[vlo]', ...hlsAudioMap,
|
||||
...buildHlsVideoArgs(videoCodec, framerate),
|
||||
'-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
|
||||
...hlsAudioCodec,
|
||||
'-f', 'hls', '-hls_time', '2', '-hls_list_size', '15',
|
||||
'-hls_flags', 'delete_segments+append_list+omit_endlist',
|
||||
'-hls_segment_filename', sdiHlsDir + '/seg-%05d.ts',
|
||||
sdiHlsDir + '/index.m3u8',
|
||||
];
|
||||
console.log('[HLS] SDI preview as 2nd output -> ' + sdiHlsDir);
|
||||
console.log('[HLS] SDI/framecache preview as 2nd output -> ' + sdiHlsDir);
|
||||
} else {
|
||||
hiresArgs = [ ...inputArgs, ...sdiFilterArgs, ...hiresCodecArgs, hiresOutput ];
|
||||
}
|
||||
hiresProcess = spawn('ffmpeg', hiresArgs, { stdio: hiresStdio });
|
||||
|
||||
// When video comes from fc_pipe, pipe its stdout to ffmpeg stdin.
|
||||
if (bridgeProcess && bridgeProcess.stdout && hiresProcess.stdin) {
|
||||
bridgeProcess.stdout.pipe(hiresProcess.stdin);
|
||||
bridgeProcess.on('exit', () => {
|
||||
try { if (hiresProcess.stdin) hiresProcess.stdin.end(); } catch (_) {}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Growing-files: nothing to upload here (promotion worker handles S3).
|
||||
|
|
@ -1055,10 +1234,13 @@ exit "$BMXRC"
|
|||
const processes = { hires: hiresProcess };
|
||||
const uploads = { hires: growingPath ? Promise.resolve({ growingPath }) : null };
|
||||
|
||||
// ── HLS tee for network sources (live preview in the UI) ──────────
|
||||
// ── HLS tee for legacy network sources (live preview in the UI) ──────────
|
||||
// When network sources come via framecache (FC_SLOT_ID set), HLS preview is
|
||||
// handled as a 2nd ffmpeg output in the hires process above (sdiHlsDir path).
|
||||
// This tee is only for the legacy direct-URL network path (no framecache).
|
||||
let hlsProcess = null;
|
||||
let hlsDir = null;
|
||||
if (isNetwork && this._assetIdForHls) {
|
||||
if (isNetwork && !process.env.FC_SLOT_ID && this._assetIdForHls) {
|
||||
try {
|
||||
const fs = await import('node:fs');
|
||||
hlsDir = '/live/' + this._assetIdForHls;
|
||||
|
|
@ -1066,7 +1248,6 @@ exit "$BMXRC"
|
|||
const hlsArgs = [
|
||||
...inputArgs,
|
||||
'-map', '0:v:0?', '-map', '0:a:0?',
|
||||
// GPU-gated preview encode, same as the SDI 2nd-output path (#164).
|
||||
...buildHlsVideoArgs(videoCodec, framerate),
|
||||
'-c:a', 'aac', '-b:a', '128k', '-ar', '44100',
|
||||
'-f', 'hls', '-hls_time', '2', '-hls_list_size', '15',
|
||||
|
|
@ -1078,7 +1259,7 @@ exit "$BMXRC"
|
|||
hlsProcess.stderr.on('data', (d) => { console.error('[HLS] ' + d); });
|
||||
hlsProcess.on('exit', (c) => console.log('[HLS] exited ' + c));
|
||||
processes.hls = hlsProcess;
|
||||
console.log('[HLS] tee started -> ' + hlsDir);
|
||||
console.log('[HLS] legacy-net tee started -> ' + hlsDir);
|
||||
} catch (err) {
|
||||
console.error('[HLS] tee failed:', err.message);
|
||||
}
|
||||
|
|
@ -1091,12 +1272,13 @@ exit "$BMXRC"
|
|||
if (m) {
|
||||
this.state.framesReceived = parseInt(m[1], 10);
|
||||
this.state.lastFrameAt = new Date().toISOString();
|
||||
if (this.state.recordingStartedAt) {
|
||||
const elapsedSec = (Date.now() - this.state.recordingStartedAt) / 1000;
|
||||
if (elapsedSec > 0) {
|
||||
this.state.currentFps = Math.round((this.state.framesReceived / elapsedSec) * 100) / 100;
|
||||
}
|
||||
}
|
||||
// Use ffmpeg's own rolling fps value — it is a short-window average
|
||||
// computed by ffmpeg itself and correctly reflects the true encode rate.
|
||||
// The previous frame/elapsed cumulative calculation dragged low during
|
||||
// startup and was permanently wrong for growing-path (bash orchestrator
|
||||
// stderr doesn't emit frame= lines until ffmpeg flushes them).
|
||||
const ffmpegFps = parseFloat(m[2]);
|
||||
if (ffmpegFps > 0) this.state.currentFps = Math.round(ffmpegFps * 100) / 100;
|
||||
}
|
||||
if (/Connection refused|No route to host|Connection failed|Input\/output error|Server returned|404 Not Found|Connection timed out/i.test(text)) {
|
||||
this.state.lastError = text.trim().slice(0, 240);
|
||||
|
|
@ -1130,6 +1312,7 @@ exit "$BMXRC"
|
|||
audioFifo,
|
||||
startedAt,
|
||||
duration: 0,
|
||||
_fcPipeProcess: bridgeProcess || null, /* fc_pipe process, if framecache path used */
|
||||
uploads,
|
||||
codecs: {
|
||||
videoCodec, videoBitrate, framerate,
|
||||
|
|
@ -1270,6 +1453,11 @@ exit "$BMXRC"
|
|||
if (processes.hires) processes.hires.kill('SIGINT');
|
||||
if (processes.proxy) processes.proxy.kill('SIGINT');
|
||||
if (processes.hls) { try { processes.hls.kill('SIGINT'); } catch (_) {} }
|
||||
// fc_pipe process (framecache consumer) — stop after ffmpeg so it sees EOF
|
||||
// naturally via EPIPE when ffmpeg stdin closes. SIGTERM as belt-and-suspenders.
|
||||
if (currentSession._fcPipeProcess) {
|
||||
try { currentSession._fcPipeProcess.kill('SIGTERM'); } catch (_) {}
|
||||
}
|
||||
/* processes.bridge: removed — bridge is managed by node-agent, not per-session */
|
||||
|
||||
// Wait for the master writer to finalize before we read/upload the file.
|
||||
|
|
|
|||
63
services/framecache/CMakeLists.txt
Normal file
63
services/framecache/CMakeLists.txt
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
cmake_minimum_required(VERSION 3.16)
|
||||
project(framecache C)
|
||||
|
||||
set(CMAKE_C_STANDARD 11)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -O2")
|
||||
|
||||
# ── libmicrohttpd ────────────────────────────────────────────────────
|
||||
find_library(MHD_LIB microhttpd REQUIRED)
|
||||
find_path(MHD_INCLUDE microhttpd.h REQUIRED)
|
||||
include_directories(${MHD_INCLUDE})
|
||||
|
||||
# ── framecache server ────────────────────────────────────────────────
|
||||
add_executable(framecache
|
||||
src/framecache.c
|
||||
src/slot.c
|
||||
src/registry.c
|
||||
)
|
||||
target_link_libraries(framecache ${MHD_LIB} rt pthread)
|
||||
|
||||
# ── fc_client static library (used by bridges + test) ───────────────
|
||||
add_library(fc_client STATIC
|
||||
client/fc_client.c
|
||||
src/slot.c # client needs fc_slot_shm_size / fc_frame_at
|
||||
)
|
||||
target_include_directories(fc_client PUBLIC src client)
|
||||
target_link_libraries(fc_client rt pthread)
|
||||
|
||||
# ── net_ingest — network source (RTMP/SRT) → framecache slot ─────────
|
||||
# Spawned by node-agent when a network recorder starts.
|
||||
# Decodes the network stream to raw UYVY422 via ffmpeg and writes frames
|
||||
# into a framecache slot, giving capture-manager the same fc_pipe consumer
|
||||
# interface as SDI sources.
|
||||
add_executable(net_ingest
|
||||
src/net_ingest.c
|
||||
src/slot.c
|
||||
)
|
||||
target_include_directories(net_ingest PRIVATE src)
|
||||
target_link_libraries(net_ingest rt pthread)
|
||||
install(TARGETS net_ingest DESTINATION bin)
|
||||
|
||||
# ── fc_pipe — slot → stdout adapter (used by capture-manager.js) ─────
|
||||
# Spawned by capture-manager as a child process; writes raw UYVY422
|
||||
# frames from a framecache slot to stdout so ffmpeg reads them as
|
||||
# rawvideo pipe input. Multiple fc_pipe instances on the same slot
|
||||
# each get an independent cursor — zero-copy fan-out.
|
||||
add_executable(fc_pipe
|
||||
client/fc_pipe.c
|
||||
)
|
||||
target_link_libraries(fc_pipe fc_client)
|
||||
target_include_directories(fc_pipe PRIVATE src client)
|
||||
|
||||
# ── test consumer (dev utility) ──────────────────────────────────────
|
||||
if(BUILD_TESTS)
|
||||
add_executable(fc_test_consumer
|
||||
client/fc_test_consumer.c
|
||||
)
|
||||
target_link_libraries(fc_test_consumer fc_client)
|
||||
target_include_directories(fc_test_consumer PRIVATE src client)
|
||||
endif()
|
||||
|
||||
install(TARGETS framecache fc_pipe DESTINATION bin)
|
||||
install(FILES client/fc_client.h src/slot.h DESTINATION include/framecache)
|
||||
install(TARGETS fc_client DESTINATION lib)
|
||||
31
services/framecache/Dockerfile
Normal file
31
services/framecache/Dockerfile
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
# ── Build stage ─────────────────────────────────────────────────────
|
||||
FROM debian:bookworm AS builder
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
build-essential cmake libmicrohttpd-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY . /src
|
||||
RUN cmake -S /src -B /build \
|
||||
-DCMAKE_BUILD_TYPE=Release \
|
||||
&& cmake --build /build -j"$(nproc)"
|
||||
|
||||
# ── Runtime stage ────────────────────────────────────────────────────
|
||||
FROM debian:bookworm-slim
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
libmicrohttpd12 wget \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=builder /build/framecache /usr/local/bin/framecache
|
||||
COPY --from=builder /build/net_ingest /usr/local/bin/net_ingest
|
||||
|
||||
# /dev/shm/framecache is created at runtime (tmpfs)
|
||||
RUN mkdir -p /dev/shm/framecache
|
||||
|
||||
EXPOSE 7435
|
||||
|
||||
HEALTHCHECK --interval=10s --timeout=3s --start-period=5s \
|
||||
CMD wget -qO- http://localhost:7435/health || exit 1
|
||||
|
||||
CMD ["/usr/local/bin/framecache"]
|
||||
210
services/framecache/client/fc_client.c
Normal file
210
services/framecache/client/fc_client.c
Normal file
|
|
@ -0,0 +1,210 @@
|
|||
/**
|
||||
* fc_client.c — Consumer-side framecache client implementation.
|
||||
*/
|
||||
#include "fc_client.h"
|
||||
#include "../src/slot.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/stat.h>
|
||||
#include <semaphore.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define SHM_DIR "/dev/shm/framecache"
|
||||
#define SEM_PREFIX "/framecache-"
|
||||
#define SEM_SUFFIX "-write"
|
||||
|
||||
struct fc_consumer {
|
||||
int shm_fd;
|
||||
void *base;
|
||||
size_t shm_size;
|
||||
sem_t *sem;
|
||||
uint64_t read_cursor; /* consumer's own position in the ring */
|
||||
uint64_t local_dropped; /* frames skipped by this consumer */
|
||||
uint8_t *copy_buf; /* consumer-owned frame copy buffer (frame_size bytes) */
|
||||
uint32_t frame_size; /* cached from header */
|
||||
char slot_id[FC_MAX_SLOT_ID];
|
||||
};
|
||||
|
||||
static uint64_t now_us(void)
|
||||
{
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
|
||||
}
|
||||
|
||||
fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
|
||||
{
|
||||
char shm_path[128], sem_name[128];
|
||||
snprintf(shm_path, sizeof shm_path, "%s/%s", SHM_DIR, slot_id);
|
||||
snprintf(sem_name, sizeof sem_name, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX);
|
||||
|
||||
uint64_t deadline = now_us() + wait_ms * 1000ULL;
|
||||
int fd = -1;
|
||||
while (1) {
|
||||
fd = open(shm_path, O_RDONLY);
|
||||
if (fd >= 0) break;
|
||||
if (now_us() >= deadline) return NULL;
|
||||
struct timespec ts = { .tv_nsec = 100000000 }; /* 100ms */
|
||||
nanosleep(&ts, NULL);
|
||||
}
|
||||
|
||||
/* Read header to get frame_size */
|
||||
fc_header_t hdr;
|
||||
if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
|
||||
close(fd); return NULL;
|
||||
}
|
||||
size_t total = fc_slot_shm_size(hdr.frame_size);
|
||||
|
||||
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
|
||||
if (base == MAP_FAILED) { close(fd); return NULL; }
|
||||
|
||||
sem_t *sem = sem_open(sem_name, 0);
|
||||
if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; }
|
||||
|
||||
fc_consumer_t *c = calloc(1, sizeof *c);
|
||||
if (!c) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
|
||||
|
||||
/* Consumer-owned copy buffer — fc_consumer_read copies the frame here and
|
||||
* re-validates the cursor afterward, so a writer lapping a slow consumer
|
||||
* cannot corrupt the frame the caller is using. */
|
||||
c->copy_buf = malloc(hdr.frame_size);
|
||||
if (!c->copy_buf) {
|
||||
free(c); sem_close(sem); munmap(base, total); close(fd); return NULL;
|
||||
}
|
||||
|
||||
c->shm_fd = fd;
|
||||
c->base = base;
|
||||
c->shm_size = total;
|
||||
c->sem = sem;
|
||||
c->frame_size = hdr.frame_size;
|
||||
/* Start reading from the current write position so we don't replay old frames */
|
||||
c->read_cursor = atomic_load_explicit(
|
||||
&((fc_header_t *)base)->write_cursor, memory_order_acquire);
|
||||
c->local_dropped = 0;
|
||||
strncpy(c->slot_id, slot_id, FC_MAX_SLOT_ID - 1);
|
||||
return c;
|
||||
}
|
||||
|
||||
int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
|
||||
{
|
||||
fc_header_t *hdr = (fc_header_t *)c->base;
|
||||
int dropped = 0; /* set when this call skipped one or more frames */
|
||||
|
||||
/* ── Wait for new data ──────────────────────────────────────────────
|
||||
* The semaphore is used ONLY as an edge-wakeup hint, never as a frame
|
||||
* counter. The writer posts once per frame, but a consumer that skips
|
||||
* frames (lap) or reads less often than the writer posts would otherwise
|
||||
* leave the count climbing unbounded — causing sem_timedwait to never
|
||||
* block (100% CPU busy-spin) and eventually EOVERFLOW. So:
|
||||
* - cursor-diff (write_cursor - read_cursor) is the SOURCE OF TRUTH for
|
||||
* whether a frame is available.
|
||||
* - we drain the semaphore to zero (sem_trywait loop) so the count never
|
||||
* accumulates.
|
||||
* - if no frame is available we block on ONE sem_timedwait for wakeup. */
|
||||
for (;;) {
|
||||
uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor,
|
||||
memory_order_acquire);
|
||||
|
||||
/* Lap detection: if the writer is more than ring_depth ahead, the
|
||||
* oldest unread frames have been overwritten — skip to the oldest
|
||||
* still-valid frame. */
|
||||
if (write_cur > c->read_cursor + hdr->ring_depth) {
|
||||
uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth;
|
||||
c->read_cursor = write_cur - hdr->ring_depth;
|
||||
c->local_dropped += skipped;
|
||||
/* NOTE: do NOT write hdr->dropped_frames here — the consumer maps
|
||||
* the shm PROT_READ (read-only), so an atomic write would SIGSEGV.
|
||||
* Per-consumer drops are tracked in c->local_dropped and exposed
|
||||
* via fc_consumer_dropped(). The writer owns hdr->dropped_frames. */
|
||||
dropped = 1;
|
||||
}
|
||||
|
||||
if (c->read_cursor < write_cur) {
|
||||
/* A frame is available — drain the semaphore so its count never
|
||||
* accumulates, then read+copy below. */
|
||||
while (sem_trywait(c->sem) == 0) { /* drain */ }
|
||||
break;
|
||||
}
|
||||
|
||||
/* No frame yet — drain stale posts, then block for a wakeup. */
|
||||
while (sem_trywait(c->sem) == 0) { /* drain */ }
|
||||
|
||||
struct timespec abs_ts;
|
||||
clock_gettime(CLOCK_REALTIME, &abs_ts);
|
||||
abs_ts.tv_sec += (time_t)(timeout_ms / 1000);
|
||||
abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L);
|
||||
if (abs_ts.tv_nsec >= 1000000000L) { abs_ts.tv_sec++; abs_ts.tv_nsec -= 1000000000L; }
|
||||
|
||||
int w = sem_timedwait(c->sem, &abs_ts);
|
||||
if (w != 0) {
|
||||
if (errno == ETIMEDOUT) {
|
||||
/* Re-check the cursor once more before giving up — the writer
|
||||
* may have advanced between our check and the wait. */
|
||||
uint64_t wc2 = atomic_load_explicit(&hdr->write_cursor,
|
||||
memory_order_acquire);
|
||||
if (c->read_cursor < wc2) continue;
|
||||
return FC_TIMEOUT;
|
||||
}
|
||||
if (errno == EINTR) continue;
|
||||
return FC_ERROR;
|
||||
}
|
||||
/* Woken — loop to re-evaluate cursor-diff. */
|
||||
}
|
||||
|
||||
/* ── Copy the frame into the consumer-owned buffer ──────────────────── */
|
||||
fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor);
|
||||
uint32_t fsz = frame->size;
|
||||
if (fsz > hdr->frame_size) fsz = hdr->frame_size;
|
||||
uint64_t pts = frame->pts_us;
|
||||
uint64_t wall = frame->wall_us;
|
||||
memcpy(c->copy_buf, frame->data, fsz);
|
||||
|
||||
/* ── Re-validate AFTER the copy ─────────────────────────────────────
|
||||
* If the writer lapped us during the copy (overwrote this slot), the copy
|
||||
* may be torn — discard it and signal DROPPED so the caller reads again. */
|
||||
uint64_t write_after = atomic_load_explicit(&hdr->write_cursor,
|
||||
memory_order_acquire);
|
||||
if (write_after > c->read_cursor + hdr->ring_depth) {
|
||||
uint64_t skipped = write_after - c->read_cursor - hdr->ring_depth;
|
||||
c->read_cursor = write_after - hdr->ring_depth;
|
||||
c->local_dropped += skipped;
|
||||
return FC_LAPPED; /* copy torn — ref not valid, caller reads again */
|
||||
}
|
||||
|
||||
/* Copy is valid. */
|
||||
ref->data = c->copy_buf;
|
||||
ref->size = fsz;
|
||||
ref->pts_us = pts;
|
||||
ref->wall_us = wall;
|
||||
ref->seq = c->read_cursor;
|
||||
|
||||
c->read_cursor++;
|
||||
return dropped ? FC_DROPPED : FC_OK;
|
||||
}
|
||||
|
||||
void fc_consumer_close(fc_consumer_t *c)
|
||||
{
|
||||
if (!c) return;
|
||||
if (c->copy_buf) free(c->copy_buf);
|
||||
sem_close(c->sem);
|
||||
munmap(c->base, c->shm_size);
|
||||
close(c->shm_fd);
|
||||
free(c);
|
||||
}
|
||||
|
||||
uint64_t fc_consumer_write_cursor(fc_consumer_t *c)
|
||||
{
|
||||
fc_header_t *hdr = (fc_header_t *)c->base;
|
||||
return atomic_load(&hdr->write_cursor);
|
||||
}
|
||||
|
||||
uint64_t fc_consumer_dropped(fc_consumer_t *c)
|
||||
{
|
||||
return c->local_dropped;
|
||||
}
|
||||
82
services/framecache/client/fc_client.h
Normal file
82
services/framecache/client/fc_client.h
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
/**
|
||||
* fc_client.h — Consumer-side framecache client library.
|
||||
*
|
||||
* Usage:
|
||||
* fc_consumer_t *c = fc_consumer_open("deltacast-zampp3-0");
|
||||
* fc_frame_ref_t ref;
|
||||
* while (fc_consumer_read(c, &ref, 2000) == FC_OK) {
|
||||
* // ref.data valid until next fc_consumer_read call
|
||||
* process_frame(ref.data, ref.size, ref.pts_us);
|
||||
* }
|
||||
* fc_consumer_close(c);
|
||||
*
|
||||
* Each consumer tracks its own read_cursor — multiple consumers on the same
|
||||
* slot are fully independent and never block each other or the writer.
|
||||
*
|
||||
* If a consumer falls more than ring_depth frames behind the writer its cursor
|
||||
* is snapped to the latest frame and FC_DROPPED is returned once.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/* Return codes */
|
||||
#define FC_OK 0 /* valid frame returned in ref */
|
||||
#define FC_TIMEOUT 1 /* no new frame within timeout_ms — ref not populated */
|
||||
#define FC_DROPPED 2 /* valid frame returned in ref, BUT one or more older
|
||||
* frames were skipped first (consumer fell behind).
|
||||
* ref IS populated — caller should USE the frame. */
|
||||
#define FC_LAPPED 3 /* the copy was overwritten mid-read (writer lapped the
|
||||
* consumer during memcpy). ref NOT populated — caller
|
||||
* should call fc_consumer_read again. */
|
||||
#define FC_ERROR -1
|
||||
|
||||
typedef struct fc_consumer fc_consumer_t;
|
||||
|
||||
typedef struct {
|
||||
const uint8_t *data; /* pointer to a CONSUMER-OWNED copy of the frame —
|
||||
* stable until the next fc_consumer_read() call.
|
||||
* (Previously a zero-copy pointer into the shm ring,
|
||||
* which the writer could overwrite mid-use when it
|
||||
* lapped a slow consumer. We now copy into the
|
||||
* consumer's own buffer and re-validate the cursor
|
||||
* AFTER the copy, so a lapped frame is discarded
|
||||
* rather than streamed corrupt.) */
|
||||
uint32_t size; /* bytes */
|
||||
uint64_t pts_us; /* presentation timestamp (microseconds) */
|
||||
uint64_t wall_us; /* wall clock at write time (microseconds) */
|
||||
uint64_t seq; /* write_cursor value for this frame */
|
||||
} fc_frame_ref_t;
|
||||
|
||||
/**
|
||||
* Open a consumer handle for the named slot.
|
||||
* Polls the slot shm file until it appears (up to wait_ms milliseconds).
|
||||
* Returns NULL if slot not found within wait_ms or on error.
|
||||
*/
|
||||
fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms);
|
||||
|
||||
/**
|
||||
* Read the next frame.
|
||||
* Blocks up to timeout_ms waiting for a new frame (via semaphore).
|
||||
* Returns FC_OK, FC_TIMEOUT, FC_DROPPED, or FC_ERROR.
|
||||
* On FC_OK or FC_DROPPED the ref fields are populated.
|
||||
*/
|
||||
int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms);
|
||||
|
||||
/** Close the consumer handle. Does NOT destroy the slot. */
|
||||
void fc_consumer_close(fc_consumer_t *c);
|
||||
|
||||
/** Current write_cursor of the slot (approximate — no lock). */
|
||||
uint64_t fc_consumer_write_cursor(fc_consumer_t *c);
|
||||
|
||||
/** Frames dropped by this consumer since open. */
|
||||
uint64_t fc_consumer_dropped(fc_consumer_t *c);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
133
services/framecache/client/fc_pipe.c
Normal file
133
services/framecache/client/fc_pipe.c
Normal file
|
|
@ -0,0 +1,133 @@
|
|||
/**
|
||||
* fc_pipe.c — Framecache slot → stdout pipe adapter.
|
||||
*
|
||||
* Opens a framecache slot as a consumer and writes raw video frames to
|
||||
* stdout in a continuous stream. capture-manager.js spawns this process
|
||||
* and feeds its stdout to ffmpeg as a rawvideo pipe input — identical to
|
||||
* the way DeckLink bridges currently pipe raw frames.
|
||||
*
|
||||
* Each consumer instance has its own independent read cursor, so multiple
|
||||
* fc_pipe processes reading from the same slot never interfere with each
|
||||
* other. This is how growing + proxy + HLS all read the same SDI signal
|
||||
* simultaneously.
|
||||
*
|
||||
* Usage:
|
||||
* fc_pipe <slot_id> [wait_ms]
|
||||
*
|
||||
* Writes raw UYVY422 frame data to stdout. Terminates on:
|
||||
* - SIGTERM / SIGINT (clean stop from capture-manager)
|
||||
* - stdout EPIPE (ffmpeg exited)
|
||||
* - Slot disappears (bridge stopped)
|
||||
*
|
||||
* Exit codes:
|
||||
* 0 clean stop (SIGTERM)
|
||||
* 1 slot not found within wait_ms
|
||||
* 2 stdout write error (EPIPE)
|
||||
*/
|
||||
|
||||
#include "../src/slot.h"
|
||||
#include "fc_client.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <stdint.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <time.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
static volatile int g_stop = 0;
|
||||
static void on_signal(int s) { (void)s; g_stop = 1; }
|
||||
|
||||
/* Write all bytes to fd. Returns 0 on success, -1 on EPIPE/error. */
|
||||
static int write_all_fd(int fd, const void *buf, size_t len) {
|
||||
const uint8_t *p = (const uint8_t *)buf;
|
||||
size_t off = 0;
|
||||
while (off < len) {
|
||||
ssize_t n = write(fd, p + off, len - off);
|
||||
if (n > 0) { off += (size_t)n; continue; }
|
||||
if (n < 0 && errno == EINTR) continue;
|
||||
return -1; /* EPIPE or other fatal error */
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
if (argc < 2) {
|
||||
fprintf(stderr, "Usage: %s <slot_id> [wait_ms]\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
const char *slot_id = argv[1];
|
||||
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoll(argv[2]) : 30000;
|
||||
|
||||
signal(SIGTERM, on_signal);
|
||||
signal(SIGINT, on_signal);
|
||||
signal(SIGPIPE, SIG_IGN); /* detect EPIPE via write() return value */
|
||||
|
||||
/* Set stdout to binary mode — no newline translation */
|
||||
fcntl(STDOUT_FILENO, F_SETFL,
|
||||
fcntl(STDOUT_FILENO, F_GETFL, 0) & ~O_NONBLOCK);
|
||||
|
||||
fprintf(stderr, "[fc_pipe] opening slot '%s' (wait %llums)\n",
|
||||
slot_id, (unsigned long long)wait_ms);
|
||||
|
||||
fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms);
|
||||
if (!c) {
|
||||
fprintf(stderr, "[fc_pipe] slot '%s' not found within %llums\n",
|
||||
slot_id, (unsigned long long)wait_ms);
|
||||
return 1;
|
||||
}
|
||||
|
||||
fprintf(stderr, "[fc_pipe] slot open, streaming to stdout\n");
|
||||
|
||||
uint64_t frames_out = 0;
|
||||
uint64_t total_dropped = 0;
|
||||
|
||||
while (!g_stop) {
|
||||
fc_frame_ref_t ref;
|
||||
int rc = fc_consumer_read(c, &ref, 2000 /* 2s timeout */);
|
||||
|
||||
if (rc == FC_TIMEOUT) continue;
|
||||
if (rc == FC_ERROR) break;
|
||||
|
||||
if (rc == FC_LAPPED) {
|
||||
/* Copy was torn (writer lapped us mid-read). No valid frame to
|
||||
* write — log and read again. */
|
||||
total_dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[fc_pipe] WARNING: frame lapped mid-read — total dropped: %llu\n",
|
||||
(unsigned long long)total_dropped);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rc == FC_DROPPED) {
|
||||
/* Skipped one or more older frames, but THIS frame is valid — log
|
||||
* and write it (do NOT continue). */
|
||||
total_dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[fc_pipe] WARNING: consumer fell behind — total dropped: %llu\n",
|
||||
(unsigned long long)total_dropped);
|
||||
}
|
||||
|
||||
/* Write frame data to stdout (ref.data is a stable consumer-owned copy) */
|
||||
if (write_all_fd(STDOUT_FILENO, ref.data, ref.size) < 0) {
|
||||
if (!g_stop)
|
||||
fprintf(stderr, "[fc_pipe] stdout EPIPE — ffmpeg exited\n");
|
||||
break;
|
||||
}
|
||||
frames_out++;
|
||||
|
||||
/* Periodic stats to stderr (every 300 frames ≈ 5s at 60fps) */
|
||||
if (frames_out % 300 == 0) {
|
||||
fprintf(stderr, "[fc_pipe] frames=%llu dropped=%llu\n",
|
||||
(unsigned long long)frames_out,
|
||||
(unsigned long long)total_dropped);
|
||||
}
|
||||
}
|
||||
|
||||
fc_consumer_close(c);
|
||||
fprintf(stderr, "[fc_pipe] done frames=%llu dropped=%llu\n",
|
||||
(unsigned long long)frames_out,
|
||||
(unsigned long long)total_dropped);
|
||||
return 0;
|
||||
}
|
||||
74
services/framecache/client/fc_test_consumer.c
Normal file
74
services/framecache/client/fc_test_consumer.c
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* fc_test_consumer.c — Dev utility: attach to a framecache slot and print stats.
|
||||
*
|
||||
* Usage: fc_test_consumer <slot_id> [wait_ms]
|
||||
*/
|
||||
#include "fc_client.h"
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <signal.h>
|
||||
#include <time.h>
|
||||
|
||||
static volatile int g_run = 1;
|
||||
static void on_sig(int s) { (void)s; g_run = 0; }
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
if (argc < 2) {
|
||||
fprintf(stderr, "Usage: %s <slot_id> [wait_ms]\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
const char *slot_id = argv[1];
|
||||
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoi(argv[2]) : 30000;
|
||||
|
||||
signal(SIGINT, on_sig);
|
||||
signal(SIGTERM, on_sig);
|
||||
|
||||
fprintf(stderr, "Opening slot '%s' (wait up to %llums)...\n",
|
||||
slot_id, (unsigned long long)wait_ms);
|
||||
|
||||
fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms);
|
||||
if (!c) {
|
||||
fprintf(stderr, "Failed to open slot '%s'\n", slot_id);
|
||||
return 1;
|
||||
}
|
||||
fprintf(stderr, "Slot opened. Reading frames (Ctrl+C to stop)...\n");
|
||||
|
||||
uint64_t total = 0, dropped = 0;
|
||||
struct timespec t0;
|
||||
clock_gettime(CLOCK_MONOTONIC, &t0);
|
||||
|
||||
while (g_run) {
|
||||
fc_frame_ref_t ref;
|
||||
int rc = fc_consumer_read(c, &ref, 2000);
|
||||
if (rc == FC_TIMEOUT) continue;
|
||||
if (rc == FC_ERROR) { fprintf(stderr, "read error\n"); break; }
|
||||
if (rc == FC_LAPPED) { /* torn copy — no valid frame, read again */ continue; }
|
||||
if (rc == FC_DROPPED) {
|
||||
dropped = fc_consumer_dropped(c);
|
||||
fprintf(stderr, "[WARN] consumer fell behind — total dropped: %llu\n",
|
||||
(unsigned long long)dropped);
|
||||
}
|
||||
total++;
|
||||
|
||||
/* Print stats every 100 frames */
|
||||
if (total % 100 == 0) {
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||
double elapsed = (now.tv_sec - t0.tv_sec)
|
||||
+ (now.tv_nsec - t0.tv_nsec) * 1e-9;
|
||||
fprintf(stdout, "frames=%llu dropped=%llu fps=%.2f pts_us=%llu\n",
|
||||
(unsigned long long)total,
|
||||
(unsigned long long)fc_consumer_dropped(c),
|
||||
total / elapsed,
|
||||
(unsigned long long)ref.pts_us);
|
||||
fflush(stdout);
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "Done. total=%llu dropped=%llu\n",
|
||||
(unsigned long long)total,
|
||||
(unsigned long long)fc_consumer_dropped(c));
|
||||
fc_consumer_close(c);
|
||||
return 0;
|
||||
}
|
||||
367
services/framecache/src/framecache.c
Normal file
367
services/framecache/src/framecache.c
Normal file
|
|
@ -0,0 +1,367 @@
|
|||
/**
|
||||
* framecache.c — Main entry point. HTTP API server + slot manager.
|
||||
*
|
||||
* Endpoints:
|
||||
* POST /slots Create slot
|
||||
* GET /slots List slots
|
||||
* GET /slots/:id Get slot detail
|
||||
* DELETE /slots/:id Destroy slot
|
||||
* GET /health Health check
|
||||
*
|
||||
* Uses libmicrohttpd for the HTTP layer (single-threaded, poll-based).
|
||||
*/
|
||||
#include "slot.h"
|
||||
#include "registry.h"
|
||||
#include <time.h>
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <signal.h>
|
||||
#include <errno.h>
|
||||
#include <sys/stat.h>
|
||||
#include <microhttpd.h>
|
||||
|
||||
#ifndef FC_PORT_DEFAULT
|
||||
#define FC_PORT_DEFAULT 7435
|
||||
#endif
|
||||
|
||||
/* ── tiny JSON helpers ─────────────────────────────────────────────── */
|
||||
|
||||
static int json_get_uint(const char *json, const char *key, uint32_t *out)
|
||||
{
|
||||
char pat[128];
|
||||
snprintf(pat, sizeof pat, "\"%s\":", key);
|
||||
const char *p = strstr(json, pat);
|
||||
if (!p) return -1;
|
||||
p += strlen(pat);
|
||||
while (*p == ' ' || *p == '\t') p++;
|
||||
*out = (uint32_t)strtoul(p, NULL, 10);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int json_get_str(const char *json, const char *key,
|
||||
char *out, size_t out_len)
|
||||
{
|
||||
char pat[128];
|
||||
snprintf(pat, sizeof pat, "\"%s\":", key);
|
||||
const char *p = strstr(json, pat);
|
||||
if (!p) return -1;
|
||||
p += strlen(pat);
|
||||
while (*p == ' ' || *p == '\t') p++;
|
||||
if (*p != '"') return -1;
|
||||
p++;
|
||||
size_t i = 0;
|
||||
while (*p && *p != '"' && i < out_len - 1)
|
||||
out[i++] = *p++;
|
||||
out[i] = '\0';
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ── HTTP request accumulator ──────────────────────────────────────── */
|
||||
|
||||
typedef struct {
|
||||
char *buf;
|
||||
size_t len;
|
||||
size_t cap;
|
||||
} req_body_t;
|
||||
|
||||
static void req_body_free(req_body_t *r)
|
||||
{
|
||||
free(r->buf);
|
||||
r->buf = NULL; r->len = 0; r->cap = 0;
|
||||
}
|
||||
|
||||
/* ── response helpers ──────────────────────────────────────────────── */
|
||||
|
||||
static enum MHD_Result respond(struct MHD_Connection *conn,
|
||||
unsigned int status,
|
||||
const char *body)
|
||||
{
|
||||
struct MHD_Response *r = MHD_create_response_from_buffer(
|
||||
strlen(body), (void *)body, MHD_RESPMEM_MUST_COPY);
|
||||
MHD_add_response_header(r, "Content-Type", "application/json");
|
||||
MHD_add_response_header(r, "Access-Control-Allow-Origin", "*");
|
||||
enum MHD_Result rc = MHD_queue_response(conn, status, r);
|
||||
MHD_destroy_response(r);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* ── slot → JSON ───────────────────────────────────────────────────── */
|
||||
|
||||
static void slot_to_json(struct fc_slot *s, char *buf, size_t len)
|
||||
{
|
||||
fc_header_t *hdr = fc_slot_header(s);
|
||||
uint64_t wc = atomic_load(&hdr->write_cursor);
|
||||
uint64_t df = atomic_load(&hdr->dropped_frames);
|
||||
/* simple fps estimate — not perfect but good enough for status */
|
||||
snprintf(buf, len,
|
||||
"{"
|
||||
"\"slot_id\":\"%s\","
|
||||
"\"shm_path\":\"%s\","
|
||||
"\"sem_name\":\"%s\","
|
||||
"\"width\":%u,"
|
||||
"\"height\":%u,"
|
||||
"\"fps_num\":%u,"
|
||||
"\"fps_den\":%u,"
|
||||
"\"pixel_format\":\"UYVY422\","
|
||||
"\"source_type\":\"%s\","
|
||||
"\"frame_size\":%u,"
|
||||
"\"ring_depth\":%u,"
|
||||
"\"write_cursor\":%llu,"
|
||||
"\"dropped_frames\":%llu"
|
||||
"}",
|
||||
fc_slot_id(s),
|
||||
fc_slot_shm_path(s),
|
||||
fc_slot_sem_name(s),
|
||||
hdr->width, hdr->height,
|
||||
hdr->fps_num, hdr->fps_den,
|
||||
hdr->source_type,
|
||||
hdr->frame_size,
|
||||
hdr->ring_depth,
|
||||
(unsigned long long)wc,
|
||||
(unsigned long long)df
|
||||
);
|
||||
}
|
||||
|
||||
/* ── request handler ───────────────────────────────────────────────── */
|
||||
|
||||
static enum MHD_Result handle_request(
|
||||
void *cls,
|
||||
struct MHD_Connection *conn,
|
||||
const char *url,
|
||||
const char *method,
|
||||
const char *version,
|
||||
const char *upload_data,
|
||||
size_t *upload_data_size,
|
||||
void **con_cls)
|
||||
{
|
||||
(void)cls; (void)version;
|
||||
|
||||
/* First call: allocate body accumulator */
|
||||
if (*con_cls == NULL) {
|
||||
req_body_t *rb = calloc(1, sizeof *rb);
|
||||
if (!rb) return MHD_NO;
|
||||
*con_cls = rb;
|
||||
return MHD_YES;
|
||||
}
|
||||
req_body_t *rb = (req_body_t *)*con_cls;
|
||||
|
||||
/* Accumulate POST body */
|
||||
if (*upload_data_size > 0) {
|
||||
size_t need = rb->len + *upload_data_size + 1;
|
||||
if (need > rb->cap) {
|
||||
rb->buf = realloc(rb->buf, need);
|
||||
rb->cap = need;
|
||||
}
|
||||
memcpy(rb->buf + rb->len, upload_data, *upload_data_size);
|
||||
rb->len += *upload_data_size;
|
||||
rb->buf[rb->len] = '\0';
|
||||
*upload_data_size = 0;
|
||||
return MHD_YES;
|
||||
}
|
||||
|
||||
enum MHD_Result rc;
|
||||
char resp[4096];
|
||||
|
||||
/* GET /health */
|
||||
if (strcmp(method, "GET") == 0 && strcmp(url, "/health") == 0) {
|
||||
rc = respond(conn, MHD_HTTP_OK, "{\"status\":\"ok\"}");
|
||||
goto done;
|
||||
}
|
||||
|
||||
/* GET /slots
|
||||
* Worst case: FC_MAX_SLOTS (256) × ~2KB/entry ≈ 512KB. A 64KB stack buffer
|
||||
* would overflow at ~32 slots (and `pos` could pass `sizeof big`, making
|
||||
* `sizeof big - pos` underflow to a huge size_t). Heap-allocate a buffer
|
||||
* sized for the worst case and bound-check every append. */
|
||||
if (strcmp(method, "GET") == 0 && strcmp(url, "/slots") == 0) {
|
||||
size_t cap = (size_t)FC_MAX_SLOTS * 2100 + 64; /* worst case + brackets */
|
||||
char *big = malloc(cap);
|
||||
if (!big) {
|
||||
rc = respond(conn, MHD_HTTP_INTERNAL_SERVER_ERROR,
|
||||
"{\"error\":\"out of memory\"}");
|
||||
goto done;
|
||||
}
|
||||
size_t pos = 0;
|
||||
if (pos < cap) big[pos++] = '[';
|
||||
int first = 1;
|
||||
for (int i = 0; i < FC_MAX_SLOTS; i++) {
|
||||
if (!g_registry[i].active) continue;
|
||||
char entry[2100];
|
||||
slot_to_json(g_registry[i].slot, entry, sizeof entry);
|
||||
size_t elen = strlen(entry);
|
||||
/* +2 for possible comma + closing bracket, +1 for NUL */
|
||||
if (pos + elen + 3 >= cap) break; /* never overflow */
|
||||
if (!first) big[pos++] = ',';
|
||||
first = 0;
|
||||
memcpy(big + pos, entry, elen);
|
||||
pos += elen;
|
||||
}
|
||||
if (pos + 2 < cap) big[pos++] = ']';
|
||||
big[pos] = '\0';
|
||||
rc = respond(conn, MHD_HTTP_OK, big);
|
||||
free(big);
|
||||
goto done;
|
||||
}
|
||||
|
||||
/* GET /slots/:id */
|
||||
if (strcmp(method, "GET") == 0 &&
|
||||
strncmp(url, "/slots/", 7) == 0 && strlen(url) > 7)
|
||||
{
|
||||
const char *id = url + 7;
|
||||
struct fc_slot *s = registry_find(id);
|
||||
if (!s) {
|
||||
rc = respond(conn, MHD_HTTP_NOT_FOUND,
|
||||
"{\"error\":\"slot not found\"}");
|
||||
goto done;
|
||||
}
|
||||
slot_to_json(s, resp, sizeof resp);
|
||||
rc = respond(conn, MHD_HTTP_OK, resp);
|
||||
goto done;
|
||||
}
|
||||
|
||||
/* POST /slots */
|
||||
if (strcmp(method, "POST") == 0 && strcmp(url, "/slots") == 0) {
|
||||
if (!rb->buf || rb->len == 0) {
|
||||
rc = respond(conn, MHD_HTTP_BAD_REQUEST,
|
||||
"{\"error\":\"empty body\"}");
|
||||
goto done;
|
||||
}
|
||||
char slot_id[FC_MAX_SLOT_ID] = {0};
|
||||
char source_type[32] = "unknown";
|
||||
uint32_t width = 0, height = 0, fps_num = 0, fps_den = 0;
|
||||
|
||||
json_get_str(rb->buf, "slot_id", slot_id, sizeof slot_id);
|
||||
json_get_str(rb->buf, "source_type", source_type, sizeof source_type);
|
||||
json_get_uint(rb->buf, "width", &width);
|
||||
json_get_uint(rb->buf, "height", &height);
|
||||
json_get_uint(rb->buf, "fps_num", &fps_num);
|
||||
json_get_uint(rb->buf, "fps_den", &fps_den);
|
||||
|
||||
if (!slot_id[0] || !width || !height || !fps_num || !fps_den) {
|
||||
rc = respond(conn, MHD_HTTP_BAD_REQUEST,
|
||||
"{\"error\":\"missing required fields: "
|
||||
"slot_id, width, height, fps_num, fps_den\"}");
|
||||
goto done;
|
||||
}
|
||||
if (registry_find(slot_id)) {
|
||||
rc = respond(conn, MHD_HTTP_CONFLICT,
|
||||
"{\"error\":\"slot already exists\"}");
|
||||
goto done;
|
||||
}
|
||||
|
||||
struct fc_slot *s = fc_slot_create(slot_id, width, height,
|
||||
fps_num, fps_den,
|
||||
FC_PIX_UYVY422, source_type);
|
||||
if (!s) {
|
||||
rc = respond(conn, MHD_HTTP_INTERNAL_SERVER_ERROR,
|
||||
"{\"error\":\"failed to create slot\"}");
|
||||
goto done;
|
||||
}
|
||||
registry_add(s);
|
||||
|
||||
snprintf(resp, sizeof resp,
|
||||
"{\"slot_id\":\"%s\","
|
||||
"\"shm_path\":\"%s\","
|
||||
"\"sem_name\":\"%s\"}",
|
||||
fc_slot_id(s),
|
||||
fc_slot_shm_path(s),
|
||||
fc_slot_sem_name(s));
|
||||
rc = respond(conn, MHD_HTTP_CREATED, resp);
|
||||
goto done;
|
||||
}
|
||||
|
||||
/* DELETE /slots/:id */
|
||||
if (strcmp(method, "DELETE") == 0 &&
|
||||
strncmp(url, "/slots/", 7) == 0 && strlen(url) > 7)
|
||||
{
|
||||
const char *id = url + 7;
|
||||
struct fc_slot *s = registry_find(id);
|
||||
if (!s) {
|
||||
rc = respond(conn, MHD_HTTP_NOT_FOUND,
|
||||
"{\"error\":\"slot not found\"}");
|
||||
goto done;
|
||||
}
|
||||
registry_remove(id);
|
||||
fc_slot_destroy(s);
|
||||
rc = respond(conn, MHD_HTTP_NO_CONTENT, "");
|
||||
goto done;
|
||||
}
|
||||
|
||||
rc = respond(conn, MHD_HTTP_NOT_FOUND, "{\"error\":\"not found\"}");
|
||||
|
||||
done:
|
||||
req_body_free(rb);
|
||||
free(rb);
|
||||
*con_cls = NULL;
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void request_completed(void *cls,
|
||||
struct MHD_Connection *conn,
|
||||
void **con_cls,
|
||||
enum MHD_RequestTerminationCode toe)
|
||||
{
|
||||
(void)cls; (void)conn; (void)toe;
|
||||
if (*con_cls) {
|
||||
req_body_free((req_body_t *)*con_cls);
|
||||
free(*con_cls);
|
||||
*con_cls = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* ── main ──────────────────────────────────────────────────────────── */
|
||||
|
||||
static volatile int g_running = 1;
|
||||
|
||||
static void on_signal(int sig) { (void)sig; g_running = 0; }
|
||||
|
||||
int main(void)
|
||||
{
|
||||
signal(SIGINT, on_signal);
|
||||
signal(SIGTERM, on_signal);
|
||||
|
||||
/* Ensure /dev/shm/framecache exists */
|
||||
mkdir("/dev/shm/framecache", 0755);
|
||||
|
||||
/* Write empty registry */
|
||||
registry_write_json();
|
||||
|
||||
const char *port_str = getenv("FC_PORT");
|
||||
uint16_t port = port_str ? (uint16_t)atoi(port_str) : FC_PORT_DEFAULT;
|
||||
|
||||
struct MHD_Daemon *daemon = MHD_start_daemon(
|
||||
MHD_USE_SELECT_INTERNALLY,
|
||||
port,
|
||||
NULL, NULL,
|
||||
handle_request, NULL,
|
||||
MHD_OPTION_NOTIFY_COMPLETED, request_completed, NULL,
|
||||
MHD_OPTION_END);
|
||||
|
||||
if (!daemon) {
|
||||
fprintf(stderr, "[framecache] failed to start HTTP server on port %u\n", port);
|
||||
return 1;
|
||||
}
|
||||
|
||||
fprintf(stderr, "[framecache] listening on port %u\n", port);
|
||||
|
||||
while (g_running) {
|
||||
struct timespec ts = { .tv_sec = 0, .tv_nsec = 100000000 }; /* 100ms */
|
||||
nanosleep(&ts, NULL);
|
||||
}
|
||||
|
||||
fprintf(stderr, "[framecache] shutting down\n");
|
||||
|
||||
/* Destroy all active slots */
|
||||
for (int i = 0; i < FC_MAX_SLOTS; i++) {
|
||||
if (g_registry[i].active) {
|
||||
registry_remove(g_registry[i].slot_id);
|
||||
fc_slot_destroy(g_registry[i].slot);
|
||||
}
|
||||
}
|
||||
|
||||
MHD_stop_daemon(daemon);
|
||||
return 0;
|
||||
}
|
||||
422
services/framecache/src/net_ingest.c
Normal file
422
services/framecache/src/net_ingest.c
Normal file
|
|
@ -0,0 +1,422 @@
|
|||
/**
|
||||
* net_ingest.c — Network source (RTMP/SRT) → framecache slot ingest.
|
||||
*
|
||||
* Spawns ffmpeg to decode a network stream to raw UYVY422 on stdout, then
|
||||
* reads those frames and writes them into a framecache slot via the shm
|
||||
* ring buffer. Registers the slot with the framecache HTTP API on startup
|
||||
* and deregisters on clean exit.
|
||||
*
|
||||
* Usage:
|
||||
* net_ingest --url <srt://...|rtmp://...>
|
||||
* --slot-id <recorder-uuid>
|
||||
* --fc-url http://framecache:7435
|
||||
* --width <W> --height <H>
|
||||
* --fps-num <N> --fps-den <D>
|
||||
* [--source-type srt|rtmp]
|
||||
* [--listen] # SRT/RTMP listener mode
|
||||
* [--listen-port <N>] # listener port (SRT default 9000, RTMP 1935)
|
||||
* [--stream-key <k>] # RTMP stream key (default "stream")
|
||||
*
|
||||
* Emits one JSON line to stderr on first frame:
|
||||
* {"slot_id":"<id>","width":W,"height":H,"fps_num":N,"fps_den":D,
|
||||
* "source_type":"srt","pix_fmt":"uyvy422"}
|
||||
*
|
||||
* Exits 0 on clean stop (SIGTERM), 1 on error.
|
||||
*
|
||||
* The framecache slot stays alive between ffmpeg reconnects (listener mode):
|
||||
* net_ingest keeps the slot open and restarts ffmpeg on disconnect.
|
||||
*/
|
||||
|
||||
#include "slot.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <stdatomic.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <signal.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/wait.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <netdb.h>
|
||||
#include <sys/socket.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <netinet/in.h>
|
||||
|
||||
/* Re-use fc_writer helpers inline (no external dep) */
|
||||
#define FC_URL_DEFAULT "http://localhost:7435"
|
||||
|
||||
static volatile int g_stop = 0;
|
||||
static void on_signal(int s) { (void)s; g_stop = 1; }
|
||||
|
||||
/* ── Tiny HTTP POST/DELETE (same approach as fc_writer.c) ─────────── */
|
||||
static int http_req(const char *method, const char *host, int port,
|
||||
const char *path, const char *body,
|
||||
char *resp, size_t resp_len)
|
||||
{
|
||||
struct sockaddr_in sa;
|
||||
memset(&sa, 0, sizeof sa);
|
||||
sa.sin_family = AF_INET;
|
||||
sa.sin_port = htons((uint16_t)port);
|
||||
struct hostent *he = gethostbyname(host);
|
||||
if (!he) return -1;
|
||||
memcpy(&sa.sin_addr, he->h_addr_list[0], (size_t)he->h_length);
|
||||
|
||||
int fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (fd < 0) return -1;
|
||||
struct timeval tv = { .tv_sec = 5 };
|
||||
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv);
|
||||
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv);
|
||||
if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) { close(fd); return -1; }
|
||||
|
||||
char req[4096];
|
||||
int rlen;
|
||||
if (body)
|
||||
rlen = snprintf(req, sizeof req,
|
||||
"%s %s HTTP/1.0\r\nHost: %s:%d\r\n"
|
||||
"Content-Type: application/json\r\nContent-Length: %zu\r\n"
|
||||
"Connection: close\r\n\r\n%s",
|
||||
method, path, host, port, strlen(body), body);
|
||||
else
|
||||
rlen = snprintf(req, sizeof req,
|
||||
"%s %s HTTP/1.0\r\nHost: %s:%d\r\nConnection: close\r\n\r\n",
|
||||
method, path, host, port);
|
||||
|
||||
send(fd, req, (size_t)rlen, 0);
|
||||
|
||||
int status = -1;
|
||||
size_t got = 0;
|
||||
char buf[8192];
|
||||
ssize_t n;
|
||||
while ((n = recv(fd, buf + got, sizeof buf - got - 1, 0)) > 0) got += (size_t)n;
|
||||
buf[got] = '\0';
|
||||
sscanf(buf, "HTTP/%*s %d", &status);
|
||||
if (resp && resp_len) {
|
||||
const char *b = strstr(buf, "\r\n\r\n");
|
||||
if (b) { strncpy(resp, b + 4, resp_len - 1); resp[resp_len-1] = '\0'; }
|
||||
}
|
||||
close(fd);
|
||||
return status;
|
||||
}
|
||||
|
||||
static void parse_url(const char *url, char *host, size_t hl, int *port) {
|
||||
const char *p = url;
|
||||
if (!strncmp(p, "http://", 7)) p += 7;
|
||||
*port = 7435;
|
||||
const char *colon = strchr(p, ':');
|
||||
if (colon) {
|
||||
size_t n = (size_t)(colon - p) < hl ? (size_t)(colon - p) : hl - 1;
|
||||
strncpy(host, p, n); host[n] = '\0';
|
||||
*port = atoi(colon + 1);
|
||||
} else { strncpy(host, p, hl - 1); host[hl-1] = '\0'; }
|
||||
}
|
||||
|
||||
static int json_str(const char *j, const char *k, char *out, size_t len) {
|
||||
char pat[128]; snprintf(pat, sizeof pat, "\"%s\":", k);
|
||||
const char *p = strstr(j, pat); if (!p) return -1;
|
||||
p += strlen(pat); while (*p == ' ') p++;
|
||||
if (*p != '"') return -1; p++;
|
||||
size_t i = 0;
|
||||
while (*p && *p != '"' && i < len - 1) out[i++] = *p++;
|
||||
out[i] = '\0'; return 0;
|
||||
}
|
||||
|
||||
/* ── Frame size helpers ────────────────────────────────────────────── */
|
||||
static inline size_t frame_bytes(uint32_t w, uint32_t h) {
|
||||
return (size_t)w * h * 2; /* UYVY422 */
|
||||
}
|
||||
|
||||
/* ── Register slot with framecache ────────────────────────────────── */
|
||||
static int register_slot(const char *fc_url, const char *slot_id,
|
||||
uint32_t w, uint32_t h,
|
||||
uint32_t fps_num, uint32_t fps_den,
|
||||
const char *source_type,
|
||||
char *shm_path, size_t sp_len,
|
||||
char *sem_name, size_t sn_len)
|
||||
{
|
||||
char host[128]; int port;
|
||||
parse_url(fc_url, host, sizeof host, &port);
|
||||
|
||||
char body[512];
|
||||
snprintf(body, sizeof body,
|
||||
"{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u,"
|
||||
"\"fps_num\":%u,\"fps_den\":%u,\"source_type\":\"%s\"}",
|
||||
slot_id, w, h, fps_num, fps_den, source_type);
|
||||
|
||||
char resp[1024] = {0};
|
||||
int st = http_req("POST", host, port, "/slots", body, resp, sizeof resp);
|
||||
if (st != 201) {
|
||||
fprintf(stderr, "[net_ingest] POST /slots failed HTTP %d: %s\n", st, resp);
|
||||
return -1;
|
||||
}
|
||||
json_str(resp, "shm_path", shm_path, sp_len);
|
||||
json_str(resp, "sem_name", sem_name, sn_len);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void deregister_slot(const char *fc_url, const char *slot_id) {
|
||||
char host[128]; int port;
|
||||
parse_url(fc_url, host, sizeof host, &port);
|
||||
char path[192]; snprintf(path, sizeof path, "/slots/%s", slot_id);
|
||||
http_req("DELETE", host, port, path, NULL, NULL, 0);
|
||||
}
|
||||
|
||||
/* ── Open shm + semaphore for writing ─────────────────────────────── */
|
||||
#include <sys/mman.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
typedef struct {
|
||||
void *base;
|
||||
size_t size;
|
||||
int fd;
|
||||
sem_t *sem;
|
||||
} ShmWriter;
|
||||
|
||||
static int shm_writer_open(const char *shm_path, const char *sem_name,
|
||||
ShmWriter *sw)
|
||||
{
|
||||
sw->fd = open(shm_path, O_RDWR);
|
||||
if (sw->fd < 0) return -1;
|
||||
fc_header_t hdr;
|
||||
if (pread(sw->fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
|
||||
close(sw->fd); return -1;
|
||||
}
|
||||
sw->size = fc_slot_shm_size(hdr.frame_size);
|
||||
sw->base = mmap(NULL, sw->size, PROT_READ | PROT_WRITE, MAP_SHARED, sw->fd, 0);
|
||||
if (sw->base == MAP_FAILED) { close(sw->fd); return -1; }
|
||||
sw->sem = sem_open(sem_name, 0);
|
||||
if (sw->sem == SEM_FAILED) { munmap(sw->base, sw->size); close(sw->fd); return -1; }
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void shm_write_frame(ShmWriter *sw, const uint8_t *data,
|
||||
uint32_t size, uint64_t pts_us)
|
||||
{
|
||||
fc_header_t *hdr = (fc_header_t *)sw->base;
|
||||
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
|
||||
fc_frame_t *frame = fc_frame_at(sw->base, hdr->frame_size, cur);
|
||||
struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts);
|
||||
frame->pts_us = pts_us;
|
||||
frame->wall_us = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
|
||||
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
||||
memcpy(frame->data, data, frame->size);
|
||||
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
||||
sem_post(sw->sem);
|
||||
}
|
||||
|
||||
static void shm_writer_close(ShmWriter *sw) {
|
||||
if (sw->sem) { sem_close(sw->sem); sw->sem = NULL; }
|
||||
if (sw->base) { munmap(sw->base, sw->size); sw->base = NULL; }
|
||||
if (sw->fd >= 0) { close(sw->fd); sw->fd = -1; }
|
||||
}
|
||||
|
||||
/* ── Build ffmpeg args for network decode → rawvideo stdout ──────────
|
||||
* All dynamic strings are written into CALLER-OWNED buffers (passed in) so
|
||||
* there is no per-call strdup leak across listener reconnects. The video
|
||||
* filter forces the EXACT target W:H (scale=W:H, not iw:ih) so a mid-stream
|
||||
* source resolution change cannot desync the fixed-size frame reassembly —
|
||||
* ffmpeg's scaler always emits width*height*2 bytes per frame.
|
||||
*
|
||||
* Caller must provide:
|
||||
* url_buf — at least 320 bytes (built listener URL, or copied caller URL)
|
||||
* vf_buf — at least 64 bytes (scale/format filter)
|
||||
*/
|
||||
static int build_ffmpeg_args(
|
||||
char **argv, int max_args,
|
||||
const char *url, const char *source_type,
|
||||
int listen, int listen_port, const char *stream_key,
|
||||
uint32_t w, uint32_t h,
|
||||
char *url_buf, size_t url_buf_len,
|
||||
char *vf_buf, size_t vf_buf_len)
|
||||
{
|
||||
(void)max_args;
|
||||
char port_str[16];
|
||||
|
||||
int i = 0;
|
||||
argv[i++] = "ffmpeg";
|
||||
argv[i++] = "-hide_banner";
|
||||
argv[i++] = "-loglevel"; argv[i++] = "warning";
|
||||
|
||||
/* Input */
|
||||
argv[i++] = "-probesize"; argv[i++] = "32M";
|
||||
argv[i++] = "-analyzeduration"; argv[i++] = "10M";
|
||||
argv[i++] = "-fflags"; argv[i++] = "+genpts";
|
||||
|
||||
if (!strcmp(source_type, "srt") && listen) {
|
||||
snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 9000);
|
||||
snprintf(url_buf, url_buf_len, "srt://0.0.0.0:%s?mode=listener", port_str);
|
||||
argv[i++] = "-i"; argv[i++] = url_buf;
|
||||
} else if (!strcmp(source_type, "rtmp") && listen) {
|
||||
snprintf(port_str, sizeof port_str, "%d", listen_port ? listen_port : 1935);
|
||||
snprintf(url_buf, url_buf_len, "rtmp://0.0.0.0:%s/live/%s",
|
||||
port_str, stream_key ? stream_key : "stream");
|
||||
argv[i++] = "-listen"; argv[i++] = "1";
|
||||
argv[i++] = "-i"; argv[i++] = url_buf;
|
||||
} else {
|
||||
argv[i++] = "-i"; argv[i++] = (char *)url;
|
||||
}
|
||||
|
||||
/* Force EXACT output dimensions so every frame is exactly w*h*2 bytes,
|
||||
* even if the source resolution changes mid-stream (SRT/RTMP reconnect to
|
||||
* a different encoder). This is the resync guarantee for the fixed-size
|
||||
* frame reassembly loop in main(). */
|
||||
snprintf(vf_buf, vf_buf_len, "scale=%u:%u,format=uyvy422", w, h);
|
||||
|
||||
/* Video output: raw UYVY422 to stdout */
|
||||
argv[i++] = "-map"; argv[i++] = "0:v:0";
|
||||
argv[i++] = "-vf"; argv[i++] = vf_buf;
|
||||
argv[i++] = "-f"; argv[i++] = "rawvideo";
|
||||
argv[i++] = "-pix_fmt"; argv[i++] = "uyvy422";
|
||||
argv[i++] = "pipe:1";
|
||||
|
||||
argv[i] = NULL;
|
||||
return i;
|
||||
}
|
||||
|
||||
/* ── Main ──────────────────────────────────────────────────────────── */
|
||||
int main(int argc, char *argv[]) {
|
||||
const char *url = NULL;
|
||||
const char *slot_id = NULL;
|
||||
const char *fc_url = getenv("FC_URL") ? getenv("FC_URL") : FC_URL_DEFAULT;
|
||||
const char *source_type = "srt";
|
||||
uint32_t width = 1920, height = 1080;
|
||||
uint32_t fps_num = 30000, fps_den = 1001;
|
||||
int listen = 0, listen_port = 0;
|
||||
const char *stream_key = "stream";
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (!strcmp(argv[i], "--url") && i+1 < argc) url = argv[++i];
|
||||
else if (!strcmp(argv[i], "--slot-id") && i+1 < argc) slot_id = argv[++i];
|
||||
else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) fc_url = argv[++i];
|
||||
else if (!strcmp(argv[i], "--source-type") && i+1 < argc) source_type = argv[++i];
|
||||
else if (!strcmp(argv[i], "--width") && i+1 < argc) width = (uint32_t)atoi(argv[++i]);
|
||||
else if (!strcmp(argv[i], "--height") && i+1 < argc) height = (uint32_t)atoi(argv[++i]);
|
||||
else if (!strcmp(argv[i], "--fps-num") && i+1 < argc) fps_num = (uint32_t)atoi(argv[++i]);
|
||||
else if (!strcmp(argv[i], "--fps-den") && i+1 < argc) fps_den = (uint32_t)atoi(argv[++i]);
|
||||
else if (!strcmp(argv[i], "--listen")) listen = 1;
|
||||
else if (!strcmp(argv[i], "--listen-port") && i+1 < argc) listen_port = atoi(argv[++i]);
|
||||
else if (!strcmp(argv[i], "--stream-key") && i+1 < argc) stream_key = argv[++i];
|
||||
}
|
||||
|
||||
if (!slot_id) {
|
||||
fprintf(stderr, "[net_ingest] --slot-id required\n");
|
||||
return 1;
|
||||
}
|
||||
if (!url && !listen) {
|
||||
fprintf(stderr, "[net_ingest] --url or --listen required\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
signal(SIGTERM, on_signal);
|
||||
signal(SIGINT, on_signal);
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
signal(SIGCHLD, SIG_DFL);
|
||||
|
||||
/* ── Register slot ──────────────────────────────────────────────── */
|
||||
char shm_path[128] = {0}, sem_name[128] = {0};
|
||||
if (register_slot(fc_url, slot_id, width, height, fps_num, fps_den,
|
||||
source_type, shm_path, sizeof shm_path,
|
||||
sem_name, sizeof sem_name) < 0) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
ShmWriter sw = { .fd = -1 };
|
||||
if (shm_writer_open(shm_path, sem_name, &sw) < 0) {
|
||||
fprintf(stderr, "[net_ingest] failed to open shm %s\n", shm_path);
|
||||
deregister_slot(fc_url, slot_id);
|
||||
return 1;
|
||||
}
|
||||
|
||||
size_t fsz = frame_bytes(width, height);
|
||||
uint8_t *frame_buf = malloc(fsz);
|
||||
if (!frame_buf) { shm_writer_close(&sw); deregister_slot(fc_url, slot_id); return 1; }
|
||||
|
||||
uint64_t frame_seq = 0;
|
||||
int reported = 0;
|
||||
|
||||
fprintf(stderr, "[net_ingest] slot=%s %ux%u %.2ffps source=%s%s\n",
|
||||
slot_id, width, height,
|
||||
fps_den ? (double)fps_num / fps_den : 0.0,
|
||||
source_type, listen ? " (listener)" : "");
|
||||
|
||||
/* Caller-owned arg buffers — reused each reconnect, no per-loop leak. */
|
||||
char ff_url_buf[320];
|
||||
char ff_vf_buf[64];
|
||||
|
||||
/* ── Outer reconnect loop (listener mode stays alive between sessions) */
|
||||
while (!g_stop) {
|
||||
/* Build ffmpeg argv (writes into ff_url_buf / ff_vf_buf, no strdup) */
|
||||
char *ff_argv[64];
|
||||
build_ffmpeg_args(ff_argv, 64, url, source_type,
|
||||
listen, listen_port, stream_key, width, height,
|
||||
ff_url_buf, sizeof ff_url_buf,
|
||||
ff_vf_buf, sizeof ff_vf_buf);
|
||||
|
||||
/* Spawn ffmpeg with stdout pipe */
|
||||
int pfd[2];
|
||||
if (pipe(pfd) < 0) break;
|
||||
|
||||
pid_t pid = fork();
|
||||
if (pid < 0) { close(pfd[0]); close(pfd[1]); break; }
|
||||
|
||||
if (pid == 0) {
|
||||
/* Child: redirect stdout to pipe write end */
|
||||
dup2(pfd[1], STDOUT_FILENO);
|
||||
close(pfd[0]); close(pfd[1]);
|
||||
execvp("ffmpeg", ff_argv);
|
||||
_exit(127);
|
||||
}
|
||||
|
||||
/* Parent: read from pipe read end */
|
||||
close(pfd[1]);
|
||||
int rfd = pfd[0];
|
||||
|
||||
size_t buf_off = 0;
|
||||
while (!g_stop) {
|
||||
ssize_t n = read(rfd, frame_buf + buf_off, fsz - buf_off);
|
||||
if (n <= 0) break; /* ffmpeg exited or pipe closed */
|
||||
buf_off += (size_t)n;
|
||||
if (buf_off < fsz) continue; /* incomplete frame — keep reading */
|
||||
|
||||
/* Full frame assembled */
|
||||
uint64_t pts_us = fps_num > 0
|
||||
? frame_seq * 1000000ULL * fps_den / fps_num
|
||||
: 0;
|
||||
shm_write_frame(&sw, frame_buf, (uint32_t)fsz, pts_us);
|
||||
frame_seq++;
|
||||
buf_off = 0;
|
||||
|
||||
if (!reported) {
|
||||
fprintf(stderr,
|
||||
"{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u,"
|
||||
"\"fps_num\":%u,\"fps_den\":%u,"
|
||||
"\"source_type\":\"%s\",\"pix_fmt\":\"uyvy422\"}\n",
|
||||
slot_id, width, height, fps_num, fps_den, source_type);
|
||||
fflush(stderr);
|
||||
reported = 1;
|
||||
}
|
||||
}
|
||||
|
||||
close(rfd);
|
||||
/* Reap ffmpeg child */
|
||||
int wstatus;
|
||||
kill(pid, SIGTERM);
|
||||
waitpid(pid, &wstatus, 0);
|
||||
|
||||
if (!listen || g_stop) break;
|
||||
|
||||
/* Listener mode: wait 1s then reconnect */
|
||||
fprintf(stderr, "[net_ingest] listener: waiting for next connection\n");
|
||||
struct timespec ts = { .tv_sec = 1 };
|
||||
nanosleep(&ts, NULL);
|
||||
}
|
||||
|
||||
free(frame_buf);
|
||||
shm_writer_close(&sw);
|
||||
deregister_slot(fc_url, slot_id);
|
||||
fprintf(stderr, "[net_ingest] done frames=%llu\n", (unsigned long long)frame_seq);
|
||||
return 0;
|
||||
}
|
||||
108
services/framecache/src/registry.c
Normal file
108
services/framecache/src/registry.c
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* registry.c — In-memory slot registry + JSON persistence.
|
||||
*/
|
||||
#include "registry.h"
|
||||
#include "slot.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
|
||||
fc_registry_entry_t g_registry[FC_MAX_SLOTS];
|
||||
int g_registry_count = 0;
|
||||
|
||||
static const char *REGISTRY_JSON = "/dev/shm/framecache/registry.json";
|
||||
|
||||
void registry_add(struct fc_slot *slot)
|
||||
{
|
||||
for (int i = 0; i < FC_MAX_SLOTS; i++) {
|
||||
if (!g_registry[i].active) {
|
||||
g_registry[i].active = 1;
|
||||
g_registry[i].slot = slot;
|
||||
strncpy(g_registry[i].slot_id, fc_slot_id(slot),
|
||||
FC_MAX_SLOT_ID - 1);
|
||||
g_registry_count++;
|
||||
registry_write_json();
|
||||
return;
|
||||
}
|
||||
}
|
||||
fprintf(stderr, "[framecache] registry full (%d slots)\n", FC_MAX_SLOTS);
|
||||
}
|
||||
|
||||
void registry_remove(const char *slot_id)
|
||||
{
|
||||
for (int i = 0; i < FC_MAX_SLOTS; i++) {
|
||||
if (g_registry[i].active &&
|
||||
strncmp(g_registry[i].slot_id, slot_id, FC_MAX_SLOT_ID) == 0)
|
||||
{
|
||||
g_registry[i].active = 0;
|
||||
g_registry[i].slot = NULL;
|
||||
g_registry[i].slot_id[0] = '\0';
|
||||
g_registry_count--;
|
||||
registry_write_json();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct fc_slot *registry_find(const char *slot_id)
|
||||
{
|
||||
for (int i = 0; i < FC_MAX_SLOTS; i++) {
|
||||
if (g_registry[i].active &&
|
||||
strncmp(g_registry[i].slot_id, slot_id, FC_MAX_SLOT_ID) == 0)
|
||||
{
|
||||
return g_registry[i].slot;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void registry_write_json(void)
|
||||
{
|
||||
FILE *f = fopen(REGISTRY_JSON, "w");
|
||||
if (!f) return;
|
||||
|
||||
fprintf(f, "{\n \"version\": 1,\n \"slots\": {\n");
|
||||
|
||||
int first = 1;
|
||||
for (int i = 0; i < FC_MAX_SLOTS; i++) {
|
||||
if (!g_registry[i].active) continue;
|
||||
fc_header_t *hdr = fc_slot_header(g_registry[i].slot);
|
||||
|
||||
char ts[32];
|
||||
time_t now = time(NULL);
|
||||
struct tm *t = gmtime(&now);
|
||||
strftime(ts, sizeof ts, "%Y-%m-%dT%H:%M:%SZ", t);
|
||||
|
||||
if (!first) fprintf(f, ",\n");
|
||||
first = 0;
|
||||
|
||||
fprintf(f,
|
||||
" \"%s\": {\n"
|
||||
" \"shm_path\": \"%s\",\n"
|
||||
" \"sem_name\": \"%s\",\n"
|
||||
" \"width\": %u,\n"
|
||||
" \"height\": %u,\n"
|
||||
" \"fps_num\": %u,\n"
|
||||
" \"fps_den\": %u,\n"
|
||||
" \"pixel_format\": \"UYVY422\",\n"
|
||||
" \"source_type\": \"%s\",\n"
|
||||
" \"frame_size\": %u,\n"
|
||||
" \"ring_depth\": %u,\n"
|
||||
" \"created_at\": \"%s\"\n"
|
||||
" }",
|
||||
g_registry[i].slot_id,
|
||||
fc_slot_shm_path(g_registry[i].slot),
|
||||
fc_slot_sem_name(g_registry[i].slot),
|
||||
hdr->width, hdr->height,
|
||||
hdr->fps_num, hdr->fps_den,
|
||||
hdr->source_type,
|
||||
hdr->frame_size,
|
||||
hdr->ring_depth,
|
||||
ts
|
||||
);
|
||||
}
|
||||
|
||||
fprintf(f, "\n }\n}\n");
|
||||
fclose(f);
|
||||
}
|
||||
21
services/framecache/src/registry.h
Normal file
21
services/framecache/src/registry.h
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
#pragma once
|
||||
#include "slot.h"
|
||||
|
||||
/* Maximum number of concurrent slots */
|
||||
#define FC_MAX_SLOTS 256
|
||||
|
||||
/* Registry entry (in-memory) */
|
||||
typedef struct {
|
||||
int active;
|
||||
struct fc_slot *slot;
|
||||
char slot_id[FC_MAX_SLOT_ID];
|
||||
} fc_registry_entry_t;
|
||||
|
||||
/* Global registry — managed by framecache.c */
|
||||
extern fc_registry_entry_t g_registry[FC_MAX_SLOTS];
|
||||
extern int g_registry_count;
|
||||
|
||||
void registry_add(struct fc_slot *slot);
|
||||
void registry_remove(const char *slot_id);
|
||||
struct fc_slot *registry_find(const char *slot_id);
|
||||
void registry_write_json(void); /* writes /dev/shm/framecache/registry.json */
|
||||
233
services/framecache/src/slot.c
Normal file
233
services/framecache/src/slot.c
Normal file
|
|
@ -0,0 +1,233 @@
|
|||
/**
|
||||
* slot.c — Framecache slot lifecycle: create, destroy, open.
|
||||
*/
|
||||
#include "slot.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <time.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define SHM_DIR "/dev/shm/framecache"
|
||||
#define SEM_PREFIX "/framecache-"
|
||||
#define SEM_SUFFIX "-write"
|
||||
|
||||
/* Internal handle used by both server (writer) and client (reader) */
|
||||
struct fc_slot {
|
||||
int shm_fd;
|
||||
void *base;
|
||||
size_t shm_size;
|
||||
sem_t *sem;
|
||||
char slot_id[FC_MAX_SLOT_ID];
|
||||
char shm_path[128];
|
||||
char sem_name[128];
|
||||
};
|
||||
|
||||
/* ── helpers ─────────────────────────────────────────────────────────── */
|
||||
|
||||
static void build_paths(const char *slot_id,
|
||||
char *shm_path, size_t sp_len,
|
||||
char *sem_name, size_t sn_len)
|
||||
{
|
||||
snprintf(shm_path, sp_len, "%s/%s", SHM_DIR, slot_id);
|
||||
snprintf(sem_name, sn_len, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX);
|
||||
}
|
||||
|
||||
/* ── server-side: create / destroy ───────────────────────────────────── */
|
||||
|
||||
/**
|
||||
* Create a new slot. Allocates and initialises the shm region.
|
||||
* Returns handle on success, NULL on error (errno set).
|
||||
*/
|
||||
struct fc_slot *fc_slot_create(const char *slot_id,
|
||||
uint32_t width, uint32_t height,
|
||||
uint32_t fps_num, uint32_t fps_den,
|
||||
uint32_t pixel_format,
|
||||
const char *source_type)
|
||||
{
|
||||
char shm_path[128], sem_name[128];
|
||||
build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name);
|
||||
|
||||
uint32_t frame_size = width * height * 2; /* UYVY422 */
|
||||
size_t total = fc_slot_shm_size(frame_size);
|
||||
|
||||
/* Ensure directory exists */
|
||||
mkdir(SHM_DIR, 0755);
|
||||
|
||||
/* Create shm file */
|
||||
int fd = open(shm_path, O_RDWR | O_CREAT | O_TRUNC, 0666);
|
||||
if (fd < 0) {
|
||||
perror("[framecache] open shm");
|
||||
return NULL;
|
||||
}
|
||||
if (ftruncate(fd, (off_t)total) < 0) {
|
||||
perror("[framecache] ftruncate");
|
||||
close(fd);
|
||||
unlink(shm_path);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
if (base == MAP_FAILED) {
|
||||
perror("[framecache] mmap");
|
||||
close(fd);
|
||||
unlink(shm_path);
|
||||
return NULL;
|
||||
}
|
||||
memset(base, 0, total);
|
||||
|
||||
/* Initialise header */
|
||||
fc_header_t *hdr = (fc_header_t *)base;
|
||||
hdr->magic = FC_MAGIC;
|
||||
hdr->version = FC_VERSION;
|
||||
hdr->width = width;
|
||||
hdr->height = height;
|
||||
hdr->fps_num = fps_num;
|
||||
hdr->fps_den = fps_den;
|
||||
hdr->pixel_format = pixel_format;
|
||||
hdr->frame_size = frame_size;
|
||||
hdr->ring_depth = FC_RING_DEPTH;
|
||||
atomic_store(&hdr->write_cursor, 0);
|
||||
atomic_store(&hdr->dropped_frames, 0);
|
||||
strncpy(hdr->source_type, source_type ? source_type : "unknown",
|
||||
sizeof hdr->source_type - 1);
|
||||
strncpy(hdr->slot_id, slot_id, sizeof hdr->slot_id - 1);
|
||||
|
||||
/* Create semaphore */
|
||||
sem_unlink(sem_name); /* remove stale */
|
||||
sem_t *sem = sem_open(sem_name, O_CREAT | O_EXCL, 0666, 0);
|
||||
if (sem == SEM_FAILED) {
|
||||
perror("[framecache] sem_open");
|
||||
munmap(base, total);
|
||||
close(fd);
|
||||
unlink(shm_path);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct fc_slot *s = calloc(1, sizeof *s);
|
||||
if (!s) {
|
||||
sem_close(sem); sem_unlink(sem_name);
|
||||
munmap(base, total);
|
||||
close(fd);
|
||||
unlink(shm_path);
|
||||
return NULL;
|
||||
}
|
||||
s->shm_fd = fd;
|
||||
s->base = base;
|
||||
s->shm_size = total;
|
||||
s->sem = sem;
|
||||
strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1);
|
||||
strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1);
|
||||
strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1);
|
||||
|
||||
fprintf(stderr, "[framecache] slot created: %s (%ux%u %.2ffps %zuMB)\n",
|
||||
slot_id, width, height,
|
||||
fps_den ? (double)fps_num / fps_den : 0.0,
|
||||
total / 1024 / 1024);
|
||||
return s;
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroy a slot: unmap, close fd, delete files, free handle.
|
||||
*/
|
||||
void fc_slot_destroy(struct fc_slot *s)
|
||||
{
|
||||
if (!s) return;
|
||||
sem_close(s->sem);
|
||||
sem_unlink(s->sem_name);
|
||||
munmap(s->base, s->shm_size);
|
||||
close(s->shm_fd);
|
||||
unlink(s->shm_path);
|
||||
fprintf(stderr, "[framecache] slot destroyed: %s\n", s->slot_id);
|
||||
free(s);
|
||||
}
|
||||
|
||||
/* ── writer: called by ingest bridges ───────────────────────────────── */
|
||||
|
||||
/**
|
||||
* Write one frame into the ring. Never blocks — advances write_cursor
|
||||
* atomically and posts the semaphore. Slow consumers will be skipped.
|
||||
*/
|
||||
void fc_slot_write_frame(struct fc_slot *s,
|
||||
const uint8_t *data, uint32_t size,
|
||||
uint64_t pts_us)
|
||||
{
|
||||
fc_header_t *hdr = (fc_header_t *)s->base;
|
||||
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
|
||||
fc_frame_t *frame = fc_frame_at(s->base, hdr->frame_size, cur);
|
||||
|
||||
frame->pts_us = pts_us;
|
||||
frame->wall_us = (uint64_t)({ struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; });
|
||||
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
||||
memcpy(frame->data, data, frame->size);
|
||||
|
||||
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
||||
sem_post(s->sem);
|
||||
}
|
||||
|
||||
/* Accessors used by HTTP API */
|
||||
fc_header_t *fc_slot_header(struct fc_slot *s) { return (fc_header_t *)s->base; }
|
||||
const char *fc_slot_id(struct fc_slot *s) { return s->slot_id; }
|
||||
const char *fc_slot_shm_path(struct fc_slot *s) { return s->shm_path; }
|
||||
const char *fc_slot_sem_name(struct fc_slot *s) { return s->sem_name; }
|
||||
|
||||
/* ── client-side open / read / close (also used by capture-manager) ── */
|
||||
|
||||
/**
|
||||
* Open an existing slot for reading.
|
||||
* Returns NULL if slot not found or header magic mismatch.
|
||||
*/
|
||||
struct fc_slot *fc_slot_open(const char *slot_id)
|
||||
{
|
||||
char shm_path[128], sem_name[128];
|
||||
build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name);
|
||||
|
||||
int fd = open(shm_path, O_RDONLY);
|
||||
if (fd < 0) return NULL;
|
||||
|
||||
/* Read header first to get frame_size */
|
||||
fc_header_t tmp_hdr;
|
||||
if (pread(fd, &tmp_hdr, sizeof tmp_hdr, 0) != sizeof tmp_hdr) {
|
||||
close(fd); return NULL;
|
||||
}
|
||||
if (tmp_hdr.magic != FC_MAGIC) {
|
||||
close(fd); return NULL;
|
||||
}
|
||||
size_t total = fc_slot_shm_size(tmp_hdr.frame_size);
|
||||
|
||||
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
|
||||
if (base == MAP_FAILED) { close(fd); return NULL; }
|
||||
|
||||
sem_t *sem = sem_open(sem_name, 0);
|
||||
if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; }
|
||||
|
||||
struct fc_slot *s = calloc(1, sizeof *s);
|
||||
if (!s) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
|
||||
s->shm_fd = fd;
|
||||
s->base = base;
|
||||
s->shm_size = total;
|
||||
s->sem = sem;
|
||||
strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1);
|
||||
strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1);
|
||||
strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1);
|
||||
return s;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a client-side slot handle. Does not destroy the slot.
|
||||
*/
|
||||
void fc_slot_close(struct fc_slot *s)
|
||||
{
|
||||
if (!s) return;
|
||||
sem_close(s->sem);
|
||||
munmap(s->base, s->shm_size);
|
||||
close(s->shm_fd);
|
||||
free(s);
|
||||
}
|
||||
76
services/framecache/src/slot.h
Normal file
76
services/framecache/src/slot.h
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
* slot.h — Framecache shared memory slot definitions.
|
||||
*
|
||||
* Layout per slot (/dev/shm/framecache/<slot_id>):
|
||||
* [fc_header_t — 4KB aligned]
|
||||
* [fc_frame_t × ring_depth — each FC_FRAME_HDR_SIZE + frame_size bytes]
|
||||
*
|
||||
* Writer advances write_cursor atomically and posts the named semaphore.
|
||||
* Each consumer tracks its own read_cursor independently — writer never blocks.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdatomic.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
#define FC_MAGIC 0x46524D43u /* "FRMC" */
|
||||
#define FC_VERSION 1u
|
||||
#define FC_RING_DEPTH 120u /* ~2s at 59.94fps */
|
||||
#define FC_HEADER_SIZE 4096u /* 4KB header block */
|
||||
#define FC_FRAME_HDR_SIZE 24u /* pts_us(8) + wall_us(8) + size(4) + pad(4) */
|
||||
#define FC_MAX_SLOT_ID 64u
|
||||
|
||||
/* Pixel format codes */
|
||||
#define FC_PIX_UYVY422 0u
|
||||
|
||||
typedef struct {
|
||||
uint32_t magic; /* FC_MAGIC */
|
||||
uint32_t version; /* FC_VERSION */
|
||||
uint32_t width;
|
||||
uint32_t height;
|
||||
uint32_t fps_num;
|
||||
uint32_t fps_den;
|
||||
uint32_t pixel_format; /* FC_PIX_UYVY422 */
|
||||
uint32_t frame_size; /* width * height * 2 */
|
||||
uint32_t ring_depth; /* FC_RING_DEPTH */
|
||||
uint32_t _reserved;
|
||||
_Atomic uint64_t write_cursor; /* monotonically increasing frame index */
|
||||
_Atomic uint64_t dropped_frames;
|
||||
char source_type[32]; /* "deltacast" | "blackmagic" | "srt" | "rtmp" */
|
||||
char slot_id[FC_MAX_SLOT_ID];
|
||||
uint8_t _pad[FC_HEADER_SIZE - 144];
|
||||
} fc_header_t;
|
||||
|
||||
/* Per-frame metadata + data (variable length — use fc_frame_at() accessor) */
|
||||
typedef struct {
|
||||
uint64_t pts_us;
|
||||
uint64_t wall_us;
|
||||
uint32_t size;
|
||||
uint32_t _pad;
|
||||
uint8_t data[]; /* frame_size bytes */
|
||||
} fc_frame_t;
|
||||
|
||||
/* Compile-time size check */
|
||||
// _Static_assert(sizeof(fc_header_t) == FC_HEADER_SIZE,
|
||||
// "fc_header_t must be exactly FC_HEADER_SIZE bytes");
|
||||
_Static_assert(sizeof(fc_frame_t) == FC_FRAME_HDR_SIZE,
|
||||
"fc_frame_t header must be exactly FC_FRAME_HDR_SIZE bytes");
|
||||
|
||||
/**
|
||||
* Compute total shm size for a slot given frame_size.
|
||||
* = FC_HEADER_SIZE + ring_depth * (FC_FRAME_HDR_SIZE + frame_size)
|
||||
*/
|
||||
static inline size_t fc_slot_shm_size(uint32_t frame_size) {
|
||||
return (size_t)FC_HEADER_SIZE
|
||||
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + frame_size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return pointer to frame at ring index idx within a mapped shm base.
|
||||
*/
|
||||
static inline fc_frame_t *fc_frame_at(void *base, uint32_t frame_size, uint64_t idx) {
|
||||
uint8_t *frames = (uint8_t *)base + FC_HEADER_SIZE;
|
||||
return (fc_frame_t *)(frames + (idx % FC_RING_DEPTH)
|
||||
* ((size_t)FC_FRAME_HDR_SIZE + frame_size));
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
import { randomBytes, createHash } from 'node:crypto';
|
||||
import { randomBytes, createHash, timingSafeEqual } from 'node:crypto';
|
||||
|
||||
const PREFIX = 'dfl_';
|
||||
|
||||
|
|
@ -10,6 +10,14 @@ export function hashToken(token) {
|
|||
return createHash('sha256').update(token).digest('hex');
|
||||
}
|
||||
|
||||
export function compareTokens(tokenA, tokenB) {
|
||||
if (!tokenA || !tokenB) return false;
|
||||
const a = Buffer.from(tokenA);
|
||||
const b = Buffer.from(tokenB);
|
||||
if (a.length !== b.length) return false;
|
||||
return timingSafeEqual(a, b);
|
||||
}
|
||||
|
||||
export function parseBearer(authorizationHeader) {
|
||||
if (!authorizationHeader || typeof authorizationHeader !== 'string') return null;
|
||||
const m = authorizationHeader.match(/^Bearer\s+(\S+)$/i);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
-- Add 'starting' and 'stopping' to recorder_schedules status check constraint
|
||||
|
||||
ALTER TABLE recorder_schedules DROP CONSTRAINT recorder_schedules_status_check;
|
||||
|
||||
ALTER TABLE recorder_schedules
|
||||
ADD CONSTRAINT recorder_schedules_status_check
|
||||
CHECK (status IN ('pending','running','completed','failed','cancelled','starting','stopping'));
|
||||
|
|
@ -1,8 +1,9 @@
|
|||
import express from 'express';
|
||||
import pool from '../db/pool.js';
|
||||
import { requireAuth } from '../middleware/auth.js';
|
||||
|
||||
const router = express.Router();
|
||||
// No session auth — called from AMPP Script Task inside broadcast network
|
||||
// Protected by requireAuth — AMPP Script Task must use an API token (Bearer Auth).
|
||||
|
||||
/**
|
||||
* GET /api/v1/ampp/folder-for/:filename
|
||||
|
|
@ -14,7 +15,7 @@ const router = express.Router();
|
|||
* 200: { folder_id: "abc123" }
|
||||
* 404: { error: "..." } (file not uploaded through Dragon-Wind — handle gracefully)
|
||||
*/
|
||||
router.get('/folder-for/:filename', async (req, res, next) => {
|
||||
router.get('/folder-for/:filename', requireAuth, async (req, res, next) => {
|
||||
try {
|
||||
const { filename } = req.params;
|
||||
const result = await pool.query(
|
||||
|
|
|
|||
|
|
@ -858,65 +858,9 @@ router.get('/:id/live-path', async (req, res, next) => {
|
|||
// - ETag + Last-Modified for conditional requests (304 on repeat visits)
|
||||
// - Cache-Control: private, max-age=3600 so the browser caches segments
|
||||
// and doesn't re-fetch them on every seek within a session
|
||||
// Issue #143 — RustFS returns empty bodies for ranged GETs whose start offset
|
||||
// is past ~5.9 MB on single-file proxy MP4s. Confirmed via direct S3 probe:
|
||||
// HEAD reports correct size, full GET (`bytes=0-`) works perfectly, but
|
||||
// `bytes=8179166-` returns 206 + the right Content-Range header and a zero-
|
||||
// byte body. A streaming GET from 0 reads cleanly *through* the broken zone.
|
||||
//
|
||||
// Workaround until the proxy worker emits HLS (planned v1.2.1): stream the
|
||||
// proxy from offset 0, skip bytes the client didn't ask for, stop after the
|
||||
// requested end. Browser sees a normal 206 + Content-Range. Mem stays flat;
|
||||
// extra RustFS-to-mam-api bandwidth = (end+1 - actual-range) per seek.
|
||||
//
|
||||
// Small head-of-file ranges below RUSTFS_RANGE_SAFE_START are handled by a
|
||||
// direct ranged GET — saves the streaming-from-0 cost on the common case of
|
||||
// initial moov + first-segment fetch.
|
||||
|
||||
async function* stitchedS3Stream(key, startByte, endByte) {
|
||||
// Yields buffers covering exactly [startByte, endByte] inclusive.
|
||||
//
|
||||
// RustFS only mis-serves a ranged GET when the *start* offset of the
|
||||
// request is past ~5.8 MB. So we pull the object in 4 MB windows whose
|
||||
// START offsets always stay below the broken threshold:
|
||||
// - We anchor every chunk's start at a multiple of RUSTFS_SAFE_CHUNK
|
||||
// (0, 4 MB, 8 MB, …).
|
||||
// - Wait — that puts later starts past the threshold.
|
||||
// Instead: skip directly to the chunk containing `startByte`, but request
|
||||
// it as `bytes=anchorStart-end` where anchorStart < threshold. Since the
|
||||
// bug only bites when the *request start* offset is large, we never issue
|
||||
// a single GET whose Range start is past the broken zone — we instead
|
||||
// exploit that a low-offset GET that *continues past* the threshold reads
|
||||
// cleanly (confirmed by the bytes=0- full-GET probe).
|
||||
//
|
||||
// Practically: one GET from 0 that streams up through endByte, dropping
|
||||
// the bytes below startByte as they arrive. Memory stays flat; we pay
|
||||
// (endByte+1) bytes of RustFS-to-mam-api bandwidth per request.
|
||||
const res = await s3Client.send(new GetObjectCommand({
|
||||
Bucket: getS3Bucket(),
|
||||
Key: key,
|
||||
Range: `bytes=0-${endByte}`,
|
||||
}));
|
||||
|
||||
let consumed = 0; // bytes seen so far from S3
|
||||
let totalEmitted = 0;
|
||||
for await (const buf of res.Body) {
|
||||
const bufStart = consumed; // file offset of buf[0]
|
||||
const bufEnd = consumed + buf.length - 1;
|
||||
consumed += buf.length;
|
||||
if (bufEnd < startByte) continue; // entirely before window
|
||||
const sliceFrom = Math.max(0, startByte - bufStart);
|
||||
const sliceTo = Math.min(buf.length, endByte - bufStart + 1);
|
||||
if (sliceTo > sliceFrom) {
|
||||
yield buf.subarray(sliceFrom, sliceTo);
|
||||
totalEmitted += sliceTo - sliceFrom;
|
||||
}
|
||||
if (bufEnd >= endByte) break;
|
||||
}
|
||||
if (totalEmitted === 0) {
|
||||
throw new Error(`RustFS returned empty body for ${key} bytes=0-${endByte}`);
|
||||
}
|
||||
}
|
||||
// RustFS issue #143 (empty body on ranged GETs past ~5.9 MB) was fixed in
|
||||
// RustFS 1.0.0-alpha.94 (PR #2493). Standard ranged GETs used throughout.
|
||||
|
||||
router.get('/:id/video', async (req, res, next) => {
|
||||
try {
|
||||
|
|
@ -997,39 +941,11 @@ router.get('/:id/video', async (req, res, next) => {
|
|||
if (etag) headers['ETag'] = etag;
|
||||
if (lastModified) headers['Last-Modified'] = lastModified.toUTCString();
|
||||
|
||||
// For small head-of-file ranges (entirely below the broken threshold)
|
||||
// a direct ranged GET works and saves the streaming-from-0 cost.
|
||||
const RUSTFS_RANGE_SAFE_START = parseInt(process.env.RUSTFS_RANGE_SAFE_START || String(5_500_000), 10);
|
||||
if (start < RUSTFS_RANGE_SAFE_START && end < RUSTFS_RANGE_SAFE_START) {
|
||||
const s3Res = await s3Client.send(new GetObjectCommand({
|
||||
Bucket: getS3Bucket(), Key: key, Range: `bytes=${start}-${end}`,
|
||||
}));
|
||||
res.writeHead(206, headers);
|
||||
s3Res.Body.pipe(res);
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise: stream from offset 0, drop bytes below `start`, stop at
|
||||
// `end`. Browser sees a normal 206; mam-api stays memory-flat.
|
||||
const s3Res = await s3Client.send(new GetObjectCommand({
|
||||
Bucket: getS3Bucket(), Key: key, Range: `bytes=${start}-${end}`,
|
||||
}));
|
||||
res.writeHead(206, headers);
|
||||
try {
|
||||
for await (const buf of stitchedS3Stream(key, start, end)) {
|
||||
// res.write returns false when backpressure builds — pause and wait.
|
||||
if (!res.write(buf)) {
|
||||
await new Promise(r => res.once('drain', r));
|
||||
}
|
||||
if (res.destroyed) return;
|
||||
}
|
||||
res.end();
|
||||
} catch (err) {
|
||||
console.error(`[video] stitch failed for ${key}:`, err.message);
|
||||
if (!res.headersSent) {
|
||||
res.writeHead(500, { 'Content-Type': 'text/plain', 'Cache-Control': 'no-store' });
|
||||
res.end('Upstream storage error');
|
||||
} else {
|
||||
res.destroy(err);
|
||||
}
|
||||
}
|
||||
s3Res.Body.pipe(res);
|
||||
} catch (err) { next(err); }
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ async function requireRecorderEdit(req, res, next) {
|
|||
const SIDECAR_PORT_BASE = 7438;
|
||||
|
||||
// Docker API helper function
|
||||
function dockerApi(method, path, body = null) {
|
||||
function dockerApi(method, path, body = null, timeoutMs = 10000) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const options = {
|
||||
socketPath: '/var/run/docker.sock',
|
||||
|
|
@ -60,9 +60,9 @@ function dockerApi(method, path, body = null) {
|
|||
});
|
||||
});
|
||||
req.on('error', reject);
|
||||
// Add 10-second timeout to prevent indefinite hangs if Docker daemon is unresponsive
|
||||
req.setTimeout(10000, () => {
|
||||
req.destroy(new Error('Docker API timeout after 10s'));
|
||||
// Use parameterizable timeout to prevent indefinite hangs if Docker daemon is unresponsive
|
||||
req.setTimeout(timeoutMs, () => {
|
||||
req.destroy(new Error(`Docker API timeout after ${timeoutMs/1000}s`));
|
||||
});
|
||||
if (body) req.write(JSON.stringify(body));
|
||||
req.end();
|
||||
|
|
@ -796,13 +796,16 @@ router.post('/:id/stop', requireRecorderEdit, async (req, res, next) => {
|
|||
const containerId = recorder.container_id;
|
||||
(async () => {
|
||||
try {
|
||||
const stopRes = await dockerApi('POST', `/containers/${containerId}/stop?t=180`);
|
||||
const stopRes = await dockerApi('POST', `/containers/${containerId}/stop?t=180`, null, 185000);
|
||||
if (stopRes.status !== 404) {
|
||||
await waitForFinalize(recorder);
|
||||
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[recorders] failed local background stop:', e.message);
|
||||
// Attempt finalize and cleanup even if stop call timed out
|
||||
await waitForFinalize(recorder).catch(() => {});
|
||||
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
|
@ -990,17 +993,23 @@ router.post('/probe', async (req, res) => {
|
|||
|
||||
// Validate URL up-front so we don't even let the capture service see junk.
|
||||
let parsed = null;
|
||||
let proto = '';
|
||||
if (url) {
|
||||
try { parsed = new URL(url); }
|
||||
catch { return res.status(400).json({ error: 'Invalid URL' }); }
|
||||
const proto = (parsed.protocol || '').replace(':', '').toLowerCase();
|
||||
proto = (parsed.protocol || '').replace(':', '').toLowerCase();
|
||||
if (!ALLOWED_PROBE_SCHEMES.has(proto)) {
|
||||
return res.status(400).json({ error: `Scheme "${proto}" is not permitted for probe (#104)` });
|
||||
}
|
||||
// Non-admin users can only probe public hostnames. Admins may probe LAN.
|
||||
if (!isAdmin(req) && isPrivateOrLoopback(parsed.hostname)) {
|
||||
return res.status(403).json({ error: 'Probe target must be a public host (#104)' });
|
||||
}
|
||||
// Non-admin users can only probe public hostnames. Admins may probe LAN.
|
||||
if (!isAdmin(req) && isPrivateOrLoopback(parsed.hostname)) {
|
||||
return res.status(403).json({ error: 'Probe target must be a public host (#104)' });
|
||||
}
|
||||
|
||||
// Probe target should not be mam-api itself.
|
||||
if (parsed.hostname === 'mam-api' || parsed.hostname === 'localhost' || parsed.hostname === '127.0.0.1') {
|
||||
return res.status(403).json({ error: 'Internal probe target is not permitted' });
|
||||
}
|
||||
}
|
||||
|
||||
// Try the capture service first (5s timeout)
|
||||
|
|
@ -1026,7 +1035,6 @@ router.post('/probe', async (req, res) => {
|
|||
}
|
||||
|
||||
const host = parsed.hostname;
|
||||
const proto = (parsed.protocol || '').replace(':', '').toLowerCase();
|
||||
const isUdp = proto === 'srt' || source_type === 'srt';
|
||||
const port = parseInt(parsed.port, 10) || (isUdp ? 9000 : 1935);
|
||||
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ async function probeGrowingPath(path) {
|
|||
|
||||
// df -P returns POSIX-portable output: "Filesystem 1024-blocks Used Available Capacity Mounted-on"
|
||||
try {
|
||||
const { stdout } = await exec(`df -PB1 ${JSON.stringify(path)}`, { timeout: 3000 });
|
||||
const { stdout } = await exec(`df -PB1 -- ${JSON.stringify(path)}`, { timeout: 3000 });
|
||||
const lines = stdout.trim().split('\n');
|
||||
if (lines.length >= 2) {
|
||||
const cols = lines[1].split(/\s+/);
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ router.post('/', async (req, res, next) => {
|
|||
`INSERT INTO users (username, password_hash, display_name, role)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id, username, display_name, role, created_at`,
|
||||
[username.trim(), hash, display_name || username.trim(), role || 'admin']
|
||||
[username.trim(), hash, display_name || username.trim(), role || 'viewer']
|
||||
);
|
||||
res.status(201).json(rows[0]);
|
||||
} catch (err) {
|
||||
|
|
|
|||
|
|
@ -137,7 +137,11 @@ async function tick() {
|
|||
|
||||
// Orphaned live assets: recorder stopped but asset still 'live'.
|
||||
// Happens when the capture sidecar crashes before finalize() runs.
|
||||
// Mark error immediately so the library doesn't show "Recording" forever.
|
||||
// Grace window is measured from when the RECORDER was last updated
|
||||
// (i.e. when it transitioned to stopped), not from asset creation.
|
||||
// This prevents a race where the scheduler fires before the capture
|
||||
// container's finalize POST lands (can take 30-60s on large files).
|
||||
const ORPHAN_GRACE_SECONDS = parseInt(process.env.ORPHAN_GRACE_SECONDS || '120', 10);
|
||||
const orphanResult = await client.query(
|
||||
`UPDATE assets a
|
||||
SET status = 'error', updated_at = NOW()
|
||||
|
|
@ -145,7 +149,9 @@ async function tick() {
|
|||
WHERE a.status = 'live'
|
||||
AND a.display_name = r.current_session_id
|
||||
AND r.status = 'stopped'
|
||||
RETURNING a.id, a.display_name`
|
||||
AND r.updated_at < NOW() - ($1 || ' seconds')::INTERVAL
|
||||
RETURNING a.id, a.display_name`,
|
||||
[ORPHAN_GRACE_SECONDS]
|
||||
);
|
||||
if (orphanResult.rows.length > 0) {
|
||||
for (const row of orphanResult.rows) {
|
||||
|
|
|
|||
|
|
@ -71,13 +71,159 @@ const DC_BRIDGE_BIN = process.env.DELTACAST_BRIDGE_BIN || 'deltacast-bridge';
|
|||
const DC_PORTS_CSV = process.env.DELTACAST_PORTS || '0,1,2,3,4,5,6,7';
|
||||
const DC_BOARD = process.env.DELTACAST_BOARD || '0';
|
||||
|
||||
// Framecache URL — passed to all bridge processes so they can register slots.
|
||||
// Set FC_URL in .env.worker (default: http://framecache:7435 within the
|
||||
// wild-dragon-worker Docker network).
|
||||
const FC_URL = process.env.FC_URL || 'http://framecache:7435';
|
||||
// Node identity for framecache slot IDs (e.g. "decklink-zampp3-0").
|
||||
// Set NODE_NAME in .env.worker so slot IDs are stable across restarts.
|
||||
const FC_NODE_ID = process.env.NODE_NAME || process.env.HOSTNAME || 'local';
|
||||
|
||||
let _dcBridge = null; // ChildProcess | null
|
||||
let _dcSidecarCount = 0; // active deltacast sidecars on this node
|
||||
// Map containerId -> sourceType so stop() can decrement the deltacast counter.
|
||||
const _containerSourceType = new Map();
|
||||
// port -> fmt JSON from bridge stderr (inject into sidecar env)
|
||||
// port -> fmt JSON from bridge stderr (inject into sidecar env + slot_id)
|
||||
const _dcPortFmt = new Map();
|
||||
|
||||
// ── Network ingest ────────────────────────────────────────────────────────
|
||||
// One net_ingest process per active network recorder (SRT/RTMP).
|
||||
// Decodes the stream to raw UYVY422 and writes into a framecache slot so
|
||||
// capture-manager can use fc_pipe — the same consumer path as SDI sources.
|
||||
const NET_INGEST_BIN = process.env.NET_INGEST_BIN || 'net_ingest';
|
||||
// containerId → ChildProcess for cleanup on sidecar stop
|
||||
const _netIngestProcs = new Map();
|
||||
|
||||
function startNetIngest(containerId, { sourceType, sourceUrl, listen, listenPort, streamKey,
|
||||
width = 1920, height = 1080,
|
||||
fpsNum = 30000, fpsDen = 1001 }) {
|
||||
const slotId = `net-${containerId}`;
|
||||
const args = [
|
||||
'--slot-id', slotId,
|
||||
'--fc-url', FC_URL,
|
||||
'--source-type', sourceType,
|
||||
'--width', String(width),
|
||||
'--height', String(height),
|
||||
'--fps-num', String(fpsNum),
|
||||
'--fps-den', String(fpsDen),
|
||||
];
|
||||
if (listen) {
|
||||
args.push('--listen');
|
||||
if (listenPort) args.push('--listen-port', String(listenPort));
|
||||
if (streamKey) args.push('--stream-key', streamKey);
|
||||
} else if (sourceUrl) {
|
||||
args.push('--url', sourceUrl);
|
||||
}
|
||||
|
||||
console.log(`[net-ingest:${slotId}] launching: ${NET_INGEST_BIN} ${args.join(' ')}`);
|
||||
const proc = spawn(NET_INGEST_BIN, args, {
|
||||
stdio: ['ignore', 'ignore', 'pipe'],
|
||||
env: { ...process.env, FC_URL },
|
||||
});
|
||||
proc.stderr.setEncoding('utf8');
|
||||
proc.stderr.on('data', chunk => {
|
||||
for (const line of chunk.split('\n')) {
|
||||
const t = line.trim();
|
||||
if (t) console.log(`[net-ingest:${slotId}] ${t}`);
|
||||
}
|
||||
});
|
||||
proc.on('error', err => console.error(`[net-ingest:${slotId}] spawn error: ${err.message}`));
|
||||
proc.on('exit', (c, s) => {
|
||||
console.log(`[net-ingest:${slotId}] exited code=${c} signal=${s}`);
|
||||
// The map key may have been remapped from the temp id to the real
|
||||
// containerId after spawn. Delete by PROCESS IDENTITY, not the captured
|
||||
// key, so the entry can't leak after an unexpected crash.
|
||||
for (const [key, entry] of _netIngestProcs) {
|
||||
if (entry.proc === proc) { _netIngestProcs.delete(key); break; }
|
||||
}
|
||||
});
|
||||
_netIngestProcs.set(containerId, { proc, slotId });
|
||||
return slotId;
|
||||
}
|
||||
|
||||
function stopNetIngest(containerId) {
|
||||
const entry = _netIngestProcs.get(containerId);
|
||||
if (!entry) return;
|
||||
console.log(`[net-ingest:${entry.slotId}] stopping`);
|
||||
try { entry.proc.kill('SIGTERM'); } catch (_) {}
|
||||
_netIngestProcs.delete(containerId);
|
||||
}
|
||||
|
||||
// ── DeckLink bridge ───────────────────────────────────────────────────────
|
||||
// One decklink-bridge process per node, managing all DeckLink devices.
|
||||
// Mirrors the deltacast-bridge singleton pattern.
|
||||
const DL_BRIDGE_BIN = process.env.DECKLINK_BRIDGE_BIN || 'decklink-bridge';
|
||||
const DL_AUDIO_DIR = process.env.DECKLINK_AUDIO_DIR || '/dev/shm/decklink';
|
||||
|
||||
let _dlBridge = null; // ChildProcess | null
|
||||
let _dlSidecarCount = 0;
|
||||
// device_idx -> fmt JSON from bridge stderr
|
||||
const _dlDevFmt = new Map();
|
||||
|
||||
function _dlBridgeRunning() {
|
||||
return _dlBridge !== null && _dlBridge.exitCode === null && _dlBridge.signalCode === null;
|
||||
}
|
||||
|
||||
function startDecklinkBridge(deviceIndices) {
|
||||
if (_dlBridgeRunning()) return;
|
||||
|
||||
try { require('fs').mkdirSync(DL_AUDIO_DIR, { recursive: true }); } catch (_) {}
|
||||
|
||||
const devCsv = Array.isArray(deviceIndices) ? deviceIndices.join(',') : String(deviceIndices || '0');
|
||||
const args = [
|
||||
'--devices', devCsv,
|
||||
'--fc-url', FC_URL,
|
||||
'--audio-pipe-dir', DL_AUDIO_DIR,
|
||||
];
|
||||
console.log(`[dl-bridge] launching: ${DL_BRIDGE_BIN} ${args.join(' ')}`);
|
||||
|
||||
const proc = spawn(DL_BRIDGE_BIN, args, {
|
||||
stdio: ['ignore', 'ignore', 'pipe'],
|
||||
detached: false,
|
||||
env: { ...process.env, NODE_ID: FC_NODE_ID, FC_URL },
|
||||
});
|
||||
|
||||
proc.stderr.setEncoding('utf8');
|
||||
proc.stderr.on('data', (chunk) => {
|
||||
for (const line of chunk.split('\n')) {
|
||||
const t = line.trim();
|
||||
if (!t) continue;
|
||||
if (t.startsWith('{')) {
|
||||
console.log('[dl-bridge] ' + t);
|
||||
try {
|
||||
const f = JSON.parse(t);
|
||||
if (typeof f.device === 'number') _dlDevFmt.set(f.device, f);
|
||||
} catch (_) {}
|
||||
} else {
|
||||
console.error('[dl-bridge] ' + t);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
proc.on('error', (err) => {
|
||||
console.error(`[dl-bridge] spawn error: ${err.message}`);
|
||||
_dlBridge = null;
|
||||
});
|
||||
proc.on('exit', (code, sig) => {
|
||||
console.error(`[dl-bridge] exited code=${code} signal=${sig}`);
|
||||
_dlBridge = null;
|
||||
});
|
||||
|
||||
_dlBridge = proc;
|
||||
console.log(`[dl-bridge] pid=${proc.pid} devices=${devCsv}`);
|
||||
}
|
||||
|
||||
function stopDecklinkBridge() {
|
||||
if (!_dlBridgeRunning()) return;
|
||||
console.log('[dl-bridge] stopping');
|
||||
try { _dlBridge.kill('SIGTERM'); } catch (_) {}
|
||||
const proc = _dlBridge;
|
||||
setTimeout(() => {
|
||||
try { if (proc.exitCode === null) proc.kill('SIGKILL'); } catch (_) {}
|
||||
}, 5000);
|
||||
_dlBridge = null;
|
||||
}
|
||||
|
||||
function _dcBridgeRunning() {
|
||||
return _dcBridge !== null && _dcBridge.exitCode === null && _dcBridge.signalCode === null;
|
||||
}
|
||||
|
|
@ -122,12 +268,14 @@ function startDeltacastBridge() {
|
|||
'--ports', DC_PORTS_CSV,
|
||||
'--video-pipe-dir', DC_PIPE_DIR,
|
||||
'--audio-pipe-dir', DC_PIPE_DIR,
|
||||
'--fc-url', FC_URL,
|
||||
];
|
||||
console.log(`[dc-bridge] launching: ${DC_BRIDGE_BIN} ${args.join(' ')}`);
|
||||
|
||||
const proc = spawn(DC_BRIDGE_BIN, args, {
|
||||
stdio: ['ignore', 'ignore', 'pipe'],
|
||||
detached: false,
|
||||
env: { ...process.env, FC_URL, NODE_ID: FC_NODE_ID },
|
||||
});
|
||||
|
||||
proc.stderr.setEncoding('utf8');
|
||||
|
|
@ -262,6 +410,9 @@ async function handleSidecarStart(body, res) {
|
|||
} = body;
|
||||
|
||||
const binds = [`${LIVE_DIR}:/live`];
|
||||
// Always mount /dev/shm so the sidecar can access framecache slots.
|
||||
if (fs.existsSync('/dev/shm')) binds.push('/dev/shm:/dev/shm');
|
||||
|
||||
if (sourceType === 'sdi') binds.unshift('/dev/blackmagic:/dev/blackmagic');
|
||||
if (sourceType === 'deltacast') {
|
||||
// Bind each /dev/deltacast* node that exists on the host into the container.
|
||||
|
|
@ -269,8 +420,6 @@ async function handleSidecarStart(body, res) {
|
|||
try {
|
||||
const dcEntries = fs.readdirSync('/dev').filter(n => /^deltacast\d+$/.test(n));
|
||||
for (const d of dcEntries) binds.push(`/dev/${d}:/dev/${d}`);
|
||||
// VideoMaster SDK needs the board IPC shared-memory segment mounted too.
|
||||
if (fs.existsSync('/dev/shm/deltacast')) binds.push('/dev/shm/deltacast:/dev/shm/deltacast');
|
||||
} catch (_) { /* /dev always exists */ }
|
||||
}
|
||||
|
||||
|
|
@ -308,34 +457,143 @@ async function handleSidecarStart(body, res) {
|
|||
HostConfig: hostConfig,
|
||||
};
|
||||
|
||||
// Always inject FC_URL so capture-manager can find the framecache service.
|
||||
sidecarEnv.push(`FC_URL=${FC_URL}`);
|
||||
|
||||
// Network sources (SRT/RTMP): launch net_ingest to decode stream into
|
||||
// a framecache slot, then inject FC_SLOT_ID so capture-manager reads
|
||||
// from the slot via fc_pipe (same path as SDI sources).
|
||||
if (sourceType === 'srt' || sourceType === 'rtmp') {
|
||||
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
|
||||
let _netCfg = {};
|
||||
try { _netCfg = JSON.parse(_srcCfg); } catch (_) {}
|
||||
const _listen = !!(body.listen || _netCfg.listen);
|
||||
const _listenPort = body.listenPort || _netCfg.listenPort || 0;
|
||||
const _streamKey = body.streamKey || _netCfg.streamKey || 'stream';
|
||||
const _srcUrl = body.sourceUrl || _netCfg.url || '';
|
||||
// Width/height/fps from recorder config if available; defaults used otherwise.
|
||||
// net_ingest will auto-scale via ffmpeg -vf scale=iw:ih.
|
||||
const _w = _netCfg.width || 1920;
|
||||
const _h = _netCfg.height || 1080;
|
||||
const _fpsNum = _netCfg.fps_num || 30000;
|
||||
const _fpsDen = _netCfg.fps_den || 1001;
|
||||
|
||||
// containerId not known yet — we start net_ingest just before container
|
||||
// start and use a temporary slot ID based on a timestamp.
|
||||
const _tempId = `${sourceType}-${Date.now()}`;
|
||||
const _slotId = startNetIngest(_tempId, {
|
||||
sourceType: sourceType,
|
||||
sourceUrl: _srcUrl,
|
||||
listen: _listen,
|
||||
listenPort: _listenPort,
|
||||
streamKey: _streamKey,
|
||||
width: _w,
|
||||
height: _h,
|
||||
fpsNum: _fpsNum,
|
||||
fpsDen: _fpsDen,
|
||||
});
|
||||
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
|
||||
hostConfig.IpcMode = 'host';
|
||||
// Store temp id so we can remap to real containerId on create success
|
||||
body._netIngestTempId = _tempId;
|
||||
}
|
||||
|
||||
// Deltacast: ensure the shared bridge daemon is running on the HOST before
|
||||
// starting the sidecar. The sidecar reads FIFOs produced by the bridge;
|
||||
// it does NOT open the board handle itself (no BufMngr.c:781 race).
|
||||
// starting the sidecar. The bridge writes frames to the framecache shm ring;
|
||||
// the sidecar reads via the consumer library (fc_client).
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount++;
|
||||
startDeltacastBridge();
|
||||
// Inject per-port signal format so capture-manager uses real dimensions/fps
|
||||
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
|
||||
let _portNum = NaN;
|
||||
try { _portNum = JSON.parse(_srcCfg).port; } catch (_) {}
|
||||
if (Number.isFinite(_portNum) && _dcPortFmt.has(_portNum)) {
|
||||
if (!Number.isFinite(_portNum)) _portNum = 0;
|
||||
|
||||
// FC_SLOT_ID is DETERMINISTIC — the deltacast-bridge builds it as
|
||||
// "deltacast-<board>-<port>" (both known here), so we construct it
|
||||
// directly and DO NOT wait for the bridge's async format JSON. This is
|
||||
// the fix for the cold-start race where _dcPortFmt was still empty on
|
||||
// first recorder start, silently falling back to the legacy FIFO path.
|
||||
const _slotId = `deltacast-${DC_BOARD}-${_portNum}`;
|
||||
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
|
||||
|
||||
// Format (width/height/fps) is best-effort enrichment from the bridge's
|
||||
// stderr JSON if it has already arrived; capture-manager has sane
|
||||
// defaults and waits for the slot to appear regardless.
|
||||
if (_dcPortFmt.has(_portNum)) {
|
||||
const _fmt = _dcPortFmt.get(_portNum);
|
||||
const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num);
|
||||
sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`);
|
||||
sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`);
|
||||
sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`);
|
||||
console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} interlaced=${_fmt.interlaced}`);
|
||||
console.log(`[dc-bridge] port ${_portNum} fmt: ${_fmt.width}x${_fmt.height} ${_fps} slot=${_slotId}`);
|
||||
} else {
|
||||
console.log(`[dc-bridge] port ${_portNum} slot=${_slotId} (fmt not yet available — using defaults)`);
|
||||
}
|
||||
hostConfig.IpcMode = 'host';
|
||||
}
|
||||
|
||||
// DeckLink: ensure decklink-bridge is running on the HOST.
|
||||
if (sourceType === 'sdi' || sourceType === 'blackmagic') {
|
||||
_dlSidecarCount++;
|
||||
const _bmdDevices = [];
|
||||
try {
|
||||
const _bmdDir = '/dev/blackmagic';
|
||||
const _bmdEntries = fs.readdirSync(_bmdDir).filter(n => /^(dv|io)\d+$/.test(n));
|
||||
_bmdEntries.forEach((_, i) => _bmdDevices.push(i));
|
||||
} catch (_) { _bmdDevices.push(0); }
|
||||
startDecklinkBridge(_bmdDevices);
|
||||
|
||||
const _srcCfg = (env.find(e => e.startsWith('SOURCE_CONFIG=')) || '').slice(14);
|
||||
let _devIdx = NaN;
|
||||
try { _devIdx = JSON.parse(_srcCfg).device ?? JSON.parse(_srcCfg).index; } catch (_) {}
|
||||
if (!Number.isFinite(_devIdx)) _devIdx = parseInt((env.find(e => e.startsWith('BMD_DEVICE_INDEX=')) || '=0').split('=')[1]) || 0;
|
||||
|
||||
// FC_SLOT_ID is DETERMINISTIC — decklink-bridge builds it as
|
||||
// "decklink-<NODE_ID>-<device_idx>". Construct it directly (no wait on
|
||||
// async fmt JSON). FC_NODE_ID matches what node-agent passes to the
|
||||
// bridge via the NODE_ID env var.
|
||||
const _slotId = `decklink-${FC_NODE_ID}-${_devIdx}`;
|
||||
sidecarEnv.push(`FC_SLOT_ID=${_slotId}`);
|
||||
|
||||
if (_dlDevFmt.has(_devIdx)) {
|
||||
const _fmt = _dlDevFmt.get(_devIdx);
|
||||
const _fps = (_fmt.fps_den && _fmt.fps_den !== 1) ? `${_fmt.fps_num}/${_fmt.fps_den}` : String(_fmt.fps_num);
|
||||
sidecarEnv.push(`DELTACAST_VIDEO_SIZE=${_fmt.width}x${_fmt.height}`);
|
||||
sidecarEnv.push(`DELTACAST_FRAMERATE=${_fps}`);
|
||||
sidecarEnv.push(`DELTACAST_INTERLACED=${_fmt.interlaced ? '1' : '0'}`);
|
||||
console.log(`[dl-bridge] device ${_devIdx} fmt: ${_fmt.width}x${_fmt.height} ${_fps} slot=${_slotId}`);
|
||||
} else {
|
||||
console.log(`[dl-bridge] device ${_devIdx} slot=${_slotId} (fmt not yet available — using defaults)`);
|
||||
}
|
||||
hostConfig.IpcMode = 'host';
|
||||
}
|
||||
|
||||
// Single cleanup for ALL failure paths (create fail, start fail, throw):
|
||||
// decrements the right bridge counter (stopping the bridge when it hits 0)
|
||||
// AND stops any net_ingest started for this request. Previously only the
|
||||
// deltacast counter was decremented — blackmagic count and net_ingest leaked
|
||||
// on every failed start, eventually stranding the bridge / ingest forever.
|
||||
const _cleanupOnFailure = () => {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
} else if (sourceType === 'sdi' || sourceType === 'blackmagic') {
|
||||
_dlSidecarCount--;
|
||||
if (_dlSidecarCount <= 0) { _dlSidecarCount = 0; stopDecklinkBridge(); }
|
||||
} else if (sourceType === 'srt' || sourceType === 'rtmp') {
|
||||
// net_ingest may be keyed by the temp id (create not yet succeeded) or
|
||||
// the real containerId (remapped). Stop whichever exists.
|
||||
if (body._netIngestTempId) stopNetIngest(body._netIngestTempId);
|
||||
if (containerId) stopNetIngest(containerId);
|
||||
}
|
||||
};
|
||||
|
||||
let containerId;
|
||||
try {
|
||||
const createRes = await dockerApi('POST', '/containers/create', spec);
|
||||
if (createRes.status !== 201) {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
}
|
||||
_cleanupOnFailure();
|
||||
return jsonResponse(res, 502, { error: 'Failed to create container', details: createRes.data });
|
||||
}
|
||||
|
||||
|
|
@ -345,21 +603,25 @@ async function handleSidecarStart(body, res) {
|
|||
console.log(`[sidecar-start] ${containerId} image=${image} src=${sourceType} MAM_API_URL=${_u} token=${_tok}`);
|
||||
const startRes = await dockerApi('POST', `/containers/${containerId}/start`);
|
||||
if (startRes.status !== 204) {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
}
|
||||
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||
_cleanupOnFailure();
|
||||
return jsonResponse(res, 502, { error: 'Failed to start container', details: startRes.data });
|
||||
}
|
||||
|
||||
if (sourceType === 'deltacast') _containerSourceType.set(containerId, 'deltacast');
|
||||
if (sourceType === 'sdi' || sourceType === 'blackmagic') _containerSourceType.set(containerId, 'blackmagic');
|
||||
if (sourceType === 'srt' || sourceType === 'rtmp') {
|
||||
_containerSourceType.set(containerId, sourceType);
|
||||
// Remap net_ingest from temp id to real containerId
|
||||
if (body._netIngestTempId && _netIngestProcs.has(body._netIngestTempId)) {
|
||||
const entry = _netIngestProcs.get(body._netIngestTempId);
|
||||
_netIngestProcs.delete(body._netIngestTempId);
|
||||
_netIngestProcs.set(containerId, entry);
|
||||
}
|
||||
}
|
||||
jsonResponse(res, 201, { containerId, capturePort });
|
||||
} catch (err) {
|
||||
if (sourceType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) { _dcSidecarCount = 0; stopDeltacastBridge(); }
|
||||
}
|
||||
_cleanupOnFailure();
|
||||
throw err;
|
||||
}
|
||||
} catch (err) {
|
||||
|
|
@ -399,16 +661,23 @@ async function handleSidecarStop(containerId, res) {
|
|||
console.log(`[sidecar-stop] ==== capture logs for ${containerId} ====\n${logs}\n[sidecar-stop] ==== end logs ====`);
|
||||
await dockerApi('DELETE', `/containers/${containerId}?force=true`).catch(() => {});
|
||||
|
||||
// Deltacast bridge lifecycle: decrement sidecar count; stop bridge when last.
|
||||
if (_containerSourceType.get(containerId) === 'deltacast') {
|
||||
_containerSourceType.delete(containerId);
|
||||
// Bridge lifecycle: decrement sidecar count; stop bridge when last sidecar stops.
|
||||
const _srcType = _containerSourceType.get(containerId);
|
||||
_containerSourceType.delete(containerId);
|
||||
if (_srcType === 'deltacast') {
|
||||
_dcSidecarCount--;
|
||||
if (_dcSidecarCount <= 0) {
|
||||
_dcSidecarCount = 0;
|
||||
stopDeltacastBridge();
|
||||
}
|
||||
} else {
|
||||
_containerSourceType.delete(containerId);
|
||||
} else if (_srcType === 'blackmagic') {
|
||||
_dlSidecarCount--;
|
||||
if (_dlSidecarCount <= 0) {
|
||||
_dlSidecarCount = 0;
|
||||
stopDecklinkBridge();
|
||||
}
|
||||
} else if (_srcType === 'srt' || _srcType === 'rtmp') {
|
||||
stopNetIngest(containerId);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[sidecar-stop] background cleanup failed for ${containerId}:`, err.message);
|
||||
|
|
@ -461,7 +730,15 @@ function checkAgentAuth(req) {
|
|||
if (!NODE_TOKEN) return true;
|
||||
const hdr = req.headers['authorization'] || '';
|
||||
const m = /^Bearer\s+(.+)$/i.exec(hdr);
|
||||
return !!m && m[1] === NODE_TOKEN;
|
||||
if (!m) return false;
|
||||
|
||||
const token = m[1];
|
||||
if (token.length !== NODE_TOKEN.length) return false;
|
||||
try {
|
||||
return crypto.timingSafeEqual(Buffer.from(token), Buffer.from(NODE_TOKEN));
|
||||
} catch (_) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Driver/SDK install ────────────────────────────────────────────────────
|
||||
|
|
|
|||
Binary file not shown.
|
|
@ -73,7 +73,19 @@ server {
|
|||
types { application/vnd.apple.mpegurl m3u8; video/mp2t ts; }
|
||||
add_header Cache-Control "no-store, no-cache, must-revalidate" always;
|
||||
add_header Pragma "no-cache" always;
|
||||
add_header Access-Control-Allow-Origin * always;
|
||||
# Tighten CORS: no wildcard.
|
||||
add_header Access-Control-Allow-Origin $http_origin always;
|
||||
add_header Access-Control-Allow-Credentials "true" always;
|
||||
if ($request_method = 'OPTIONS') {
|
||||
add_header Access-Control-Allow-Origin $http_origin;
|
||||
add_header Access-Control-Allow-Credentials "true";
|
||||
add_header Access-Control-Allow-Methods 'GET, OPTIONS';
|
||||
add_header Access-Control-Allow-Headers 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range';
|
||||
add_header Access-Control-Max-Age 1728000;
|
||||
add_header Content-Type 'text/plain; charset=utf-8';
|
||||
add_header Content-Length 0;
|
||||
return 204;
|
||||
}
|
||||
}
|
||||
|
||||
# Playout HLS preview — CasparCG sidecar writes to the media volume under
|
||||
|
|
@ -83,7 +95,19 @@ server {
|
|||
types { application/vnd.apple.mpegurl m3u8; video/mp2t ts; }
|
||||
add_header Cache-Control "no-store, no-cache, must-revalidate" always;
|
||||
add_header Pragma "no-cache" always;
|
||||
add_header Access-Control-Allow-Origin * always;
|
||||
# Tighten CORS: no wildcard.
|
||||
add_header Access-Control-Allow-Origin $http_origin always;
|
||||
add_header Access-Control-Allow-Credentials "true" always;
|
||||
if ($request_method = 'OPTIONS') {
|
||||
add_header Access-Control-Allow-Origin $http_origin;
|
||||
add_header Access-Control-Allow-Credentials "true";
|
||||
add_header Access-Control-Allow-Methods 'GET, OPTIONS';
|
||||
add_header Access-Control-Allow-Headers 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range';
|
||||
add_header Access-Control-Max-Age 1728000;
|
||||
add_header Content-Type 'text/plain; charset=utf-8';
|
||||
add_header Content-Length 0;
|
||||
return 204;
|
||||
}
|
||||
}
|
||||
|
||||
# API proxy - forward to mam-api service
|
||||
|
|
@ -133,6 +157,11 @@ server {
|
|||
try_files $uri $uri/ /index.html;
|
||||
expires -1;
|
||||
add_header Cache-Control "no-cache, no-store, must-revalidate";
|
||||
add_header X-Content-Type-Options "nosniff" always;
|
||||
add_header X-Frame-Options "SAMEORIGIN" always;
|
||||
add_header X-XSS-Protection "1; mode=block" always;
|
||||
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
|
||||
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; connect-src 'self' ws: wss:;" always;
|
||||
}
|
||||
|
||||
# Deny access to dotfiles
|
||||
|
|
|
|||
|
|
@ -123,7 +123,7 @@ function App() {
|
|||
switch (effectiveRoute) {
|
||||
case 'home': content = <Home navigate={navigate} />; break;
|
||||
case 'dashboard': content = <Dashboard navigate={navigate} />; break;
|
||||
case 'library': content = <Library navigate={navigate} onOpenAsset={setOpenAsset} openProject={openProject} onClearProject={() => setOpenProject(null)} />; break;
|
||||
case 'library': content = <Library navigate={navigate} onOpenAsset={setOpenAsset} openProject={openProject} onClearProject={() => setOpenProject(null)} onOpenProject={openProjectFromAnywhere} />; break;
|
||||
case 'projects': content = <Projects navigate={navigate} onOpenProject={openProjectFromAnywhere} />; break;
|
||||
case 'upload': content = <Upload navigate={navigate} />; break;
|
||||
case 'recorders': content = <Recorders navigate={navigate} onNew={() => setShowNewRecorder(true)} />; break;
|
||||
|
|
|
|||
|
|
@ -243,7 +243,11 @@ function AssetDetail({ asset, onClose }) {
|
|||
setDownloading(true);
|
||||
window.ZAMPP_API.fetch('/assets/' + assetId + '/hires')
|
||||
.then(function(r) {
|
||||
if (!r || !r.url) { window.alert('No hi-res source available for this asset.'); return; }
|
||||
if (!r || !r.url) {
|
||||
if (window.toast) window.toast.error('No hi-res source available for this asset.');
|
||||
else window.alert('No hi-res source available for this asset.');
|
||||
return;
|
||||
}
|
||||
const a = document.createElement('a');
|
||||
a.href = r.url;
|
||||
a.download = r.filename || (asset.name + '.' + (r.ext || 'mov'));
|
||||
|
|
@ -253,7 +257,10 @@ function AssetDetail({ asset, onClose }) {
|
|||
a.click();
|
||||
document.body.removeChild(a);
|
||||
})
|
||||
.catch(function(e) { window.alert('Download failed: ' + (e.message || 'unknown error')); })
|
||||
.catch(function(e) {
|
||||
if (window.toast) window.toast.error('Download failed: ' + (e.message || 'unknown error'));
|
||||
else window.alert('Download failed: ' + (e.message || 'unknown error'));
|
||||
})
|
||||
.finally(function() { setDownloading(false); });
|
||||
};
|
||||
|
||||
|
|
@ -279,7 +286,10 @@ function AssetDetail({ asset, onClose }) {
|
|||
}))) return;
|
||||
window.ZAMPP_API.fetch('/assets/' + assetId + '?hard=true', { method: 'DELETE' })
|
||||
.then(function() { onClose && onClose(); })
|
||||
.catch(function(e) { window.alert('Delete failed: ' + e.message); });
|
||||
.catch(function(e) {
|
||||
if (window.toast) window.toast.error('Delete failed: ' + e.message);
|
||||
else window.alert('Delete failed: ' + e.message);
|
||||
});
|
||||
};
|
||||
|
||||
const retryProcessing = function() {
|
||||
|
|
@ -287,9 +297,13 @@ function AssetDetail({ asset, onClose }) {
|
|||
setRetrying(true);
|
||||
window.ZAMPP_API.fetch('/assets/' + assetId + '/retry', { method: 'POST' })
|
||||
.then(function() {
|
||||
window.alert('Re-queued for processing. The proxy worker will pick it up shortly; refresh in a minute to see the player.');
|
||||
if (window.toast) window.toast.success('Re-queued for processing. The proxy worker will pick it up shortly; refresh in a minute to see the player.');
|
||||
else window.alert('Re-queued for processing. The proxy worker will pick it up shortly; refresh in a minute to see the player.');
|
||||
})
|
||||
.catch(function(e) {
|
||||
if (window.toast) window.toast.error('Retry failed: ' + (e.message || 'unknown error'));
|
||||
else window.alert('Retry failed: ' + (e.message || 'unknown error'));
|
||||
})
|
||||
.catch(function(e) { window.alert('Retry failed: ' + (e.message || 'unknown error')); })
|
||||
.finally(function() { setRetrying(false); });
|
||||
};
|
||||
|
||||
|
|
@ -298,16 +312,26 @@ function AssetDetail({ asset, onClose }) {
|
|||
setReprocessing(type);
|
||||
window.ZAMPP_API.fetch('/assets/' + assetId + '/reprocess?type=' + type, { method: 'POST' })
|
||||
.then(function() {
|
||||
window.alert((type === 'proxy' ? 'Proxy' : 'Thumbnail') + ' job queued. Refresh in a moment to see the result.');
|
||||
if (window.toast) window.toast.success((type === 'proxy' ? 'Proxy' : 'Thumbnail') + ' job queued. Refresh in a moment to see the result.');
|
||||
else window.alert((type === 'proxy' ? 'Proxy' : 'Thumbnail') + ' job queued. Refresh in a moment to see the result.');
|
||||
})
|
||||
.catch(function(e) {
|
||||
if (window.toast) window.toast.error('Reprocess failed: ' + (e.message || 'unknown error'));
|
||||
else window.alert('Reprocess failed: ' + (e.message || 'unknown error'));
|
||||
})
|
||||
.catch(function(e) { window.alert('Reprocess failed: ' + (e.message || 'unknown error')); })
|
||||
.finally(function() { setReprocessing(null); });
|
||||
};
|
||||
|
||||
const regenFilmstrip = function() {
|
||||
window.ZAMPP_API.fetch('/assets/' + assetId + '/reprocess?type=filmstrip', { method: 'POST' })
|
||||
.then(function() { window.alert('Filmstrip job queued: it will appear automatically when ready.'); })
|
||||
.catch(function(e) { window.alert('Failed to queue filmstrip: ' + (e.message || 'unknown error')); });
|
||||
.then(function() {
|
||||
if (window.toast) window.toast.success('Filmstrip job queued: it will appear automatically when ready.');
|
||||
else window.alert('Filmstrip job queued: it will appear automatically when ready.');
|
||||
})
|
||||
.catch(function(e) {
|
||||
if (window.toast) window.toast.error('Failed to queue filmstrip: ' + (e.message || 'unknown error'));
|
||||
else window.alert('Failed to queue filmstrip: ' + (e.message || 'unknown error'));
|
||||
});
|
||||
};
|
||||
|
||||
// Map a /assets/:id/comments row into the legacy shape the consumer
|
||||
|
|
@ -352,7 +376,8 @@ function AssetDetail({ asset, onClose }) {
|
|||
setComments(function(c) { return [...c, _normalizeComment(row)]; });
|
||||
})
|
||||
.catch(function(e) {
|
||||
window.alert('Could not post comment: ' + (e.message || 'unknown error'));
|
||||
if (window.toast) window.toast.error('Could not post comment: ' + (e.message || 'unknown error'));
|
||||
else window.alert('Could not post comment: ' + (e.message || 'unknown error'));
|
||||
setNewComment(text);
|
||||
});
|
||||
};
|
||||
|
|
@ -374,7 +399,10 @@ function AssetDetail({ asset, onClose }) {
|
|||
.then(function() {
|
||||
setComments(function(prev) { return prev.filter(x => x.id !== c.id); });
|
||||
})
|
||||
.catch(function(e) { window.alert('Delete failed: ' + e.message); });
|
||||
.catch(function(e) {
|
||||
if (window.toast) window.toast.error('Delete failed: ' + e.message);
|
||||
else window.alert('Delete failed: ' + e.message);
|
||||
});
|
||||
};
|
||||
|
||||
const visibleComments = comments.filter(function(c) { return showResolved || !c.resolved; });
|
||||
|
|
|
|||
|
|
@ -759,14 +759,6 @@ function RecorderRow({ recorder: initialRecorder, onRefresh }) {
|
|||
String(d % 60).padStart(2, '0');
|
||||
}, [isRec, elapsedSecs]);
|
||||
|
||||
// Show live fps when recording and signal is healthy; fall back to configured value.
|
||||
const displayFramerate = React.useMemo(() => {
|
||||
if (isRec && liveStatus && liveStatus.currentFps != null && liveStatus.currentFps > 0) {
|
||||
return Number(liveStatus.currentFps).toFixed(2) + ' fps';
|
||||
}
|
||||
return recorder.framerate || 'native';
|
||||
}, [isRec, liveStatus, recorder.framerate]);
|
||||
|
||||
const displaySignal = liveStatus
|
||||
? (liveStatus.signal || '·')
|
||||
: (isRec ? 'connecting…' : '·');
|
||||
|
|
@ -861,10 +853,6 @@ function RecorderRow({ recorder: initialRecorder, onRefresh }) {
|
|||
{displaySignal}
|
||||
</div>
|
||||
</div>
|
||||
<div className="recorder-stat">
|
||||
<div className="stat-label">Framerate</div>
|
||||
<div className="stat-val mono">{displayFramerate}</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="recorder-actions">
|
||||
{!isRec && (
|
||||
|
|
@ -958,11 +946,6 @@ function CapturePortChip({ port, sigEntry }) {
|
|||
<span style={{ fontSize: 9.5, fontWeight: 700, letterSpacing: '0.05em', color }}>
|
||||
{label}
|
||||
</span>
|
||||
{sigEntry && sigEntry.currentFps != null && (
|
||||
<span style={{ fontSize: 9.5, color: 'var(--text-4)', fontFamily: 'var(--font-mono)' }}>
|
||||
{Number(sigEntry.currentFps).toFixed(1)} fps
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
// screens-library.jsx
|
||||
|
||||
function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
|
||||
|
||||
function buildBinTree(f){const m={};f.forEach(b=>{m[b.id]={...b,children:[]};});const r=[];f.forEach(b=>{if(b.parent_id&&m[b.parent_id])m[b.parent_id].children.push(m[b.id]);else r.push(m[b.id]);});return r;}
|
||||
function collectDescendantIds(id,f){const s=new Set([id]);let c=true;while(c){c=false;f.forEach(b=>{if(b.parent_id&&s.has(b.parent_id)&&!s.has(b.id)){s.add(b.id);c=true;}});}return s;}
|
||||
function Library({ navigate, onOpenAsset, openProject, onClearProject, onOpenProject }) {
|
||||
const PROJECTS = window.ZAMPP_DATA?.PROJECTS || [];
|
||||
const [bins, setBins] = React.useState(window.ZAMPP_DATA?.BINS || []);
|
||||
const BINS = bins; // legacy local name; keep so the rest of the function reads unchanged
|
||||
|
|
@ -14,6 +17,8 @@ function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
|
|||
var normalized = (list || []).map(function(b) { return { ...b, count: b.asset_count != null ? b.asset_count : (b.count || 0), icon: b.type || 'grid' }; });
|
||||
if (!openProject) window.ZAMPP_DATA.BINS = normalized;
|
||||
setBins(normalized);
|
||||
// Auto-expand all bins so nested children are always visible
|
||||
setExpandedBins(function(prev) { var s = new Set(prev); normalized.forEach(function(b){ s.add(b.id); }); return s; });
|
||||
})
|
||||
.catch(function() {});
|
||||
}, [openProject]);
|
||||
|
|
@ -25,21 +30,44 @@ function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
|
|||
return function() { window.removeEventListener('df:bins-changed', onBinsChanged); };
|
||||
}, [refreshBins]);
|
||||
|
||||
const createBin = () => {
|
||||
if (!openProject) { window.alert('Open a project first (Projects → click a project), then create a bin inside it.'); return; }
|
||||
setNewBinName(''); setCreatingBin(true);
|
||||
};
|
||||
const [creatingChildOf, setCreatingChildOf] = React.useState(null);
|
||||
// Start with all bins expanded so nested children are visible immediately
|
||||
const [expandedBins, setExpandedBins] = React.useState(() => new Set((window.ZAMPP_DATA?.BINS||[]).map(b=>b.id)));
|
||||
|
||||
const createBin = () => {
|
||||
if (!openProject) {
|
||||
if (window.toast) window.toast.error('Open a project first (Projects → click a project), then create a bin inside it.');
|
||||
else window.alert('Open a project first (Projects → click a project), then create a bin inside it.');
|
||||
return;
|
||||
}
|
||||
setCreatingChildOf(null); setNewBinName(''); setCreatingBin(true);
|
||||
};
|
||||
const createSubBin = (parentId) => {
|
||||
if (!openProject) return;
|
||||
setCreatingChildOf(parentId); setNewBinName(''); setCreatingBin(true);
|
||||
};
|
||||
const toggleBinExpanded = (binId) => {
|
||||
setExpandedBins(prev => { const s = new Set(prev); s.has(binId) ? s.delete(binId) : s.add(binId); return s; });
|
||||
};
|
||||
const submitBin = (name) => {
|
||||
if (!name || !name.trim()) { setCreatingBin(false); return; }
|
||||
if (!name || !name.trim()) { setCreatingBin(false); setCreatingChildOf(null); return; }
|
||||
setCreatingBin(false);
|
||||
const parentId = creatingChildOf;
|
||||
setCreatingChildOf(null);
|
||||
window.ZAMPP_API.fetch('/bins', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ project_id: openProject.id, name: name.trim() }),
|
||||
body: JSON.stringify({ project_id: openProject.id, name: name.trim(), parent_id: parentId || null }),
|
||||
})
|
||||
.then(() => window.ZAMPP_API.fetch('/bins?project_id=' + openProject.id))
|
||||
.then(list => setBins((list || []).map(b => ({ ...b, count: b.asset_count || 0, icon: b.type || 'grid' }))))
|
||||
.catch(e => window.alert('Could not create bin: ' + e.message));
|
||||
.then(list => {
|
||||
const n = (list||[]).map(b=>({...b,count:b.asset_count||0,icon:b.type||'grid'}));
|
||||
setBins(n);
|
||||
if (parentId) setExpandedBins(prev => { const s=new Set(prev); s.add(parentId); return s; });
|
||||
})
|
||||
.catch(e => {
|
||||
if (window.toast) window.toast.error('Could not create bin: ' + e.message);
|
||||
else window.alert('Could not create bin: ' + e.message);
|
||||
});
|
||||
};
|
||||
const [view, setView] = React.useState('grid');
|
||||
const [filter, setFilter] = React.useState('all'); // 'all', 'ready', 'processing', 'live', 'error', 'recent'
|
||||
|
|
@ -285,12 +313,13 @@ function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
|
|||
assets = assets.filter(function(a) { return a.status === filter; });
|
||||
}
|
||||
if (search) assets = assets.filter(function(a) { return a.name.toLowerCase().includes(search.toLowerCase()); });
|
||||
if (selectedBinId) assets = assets.filter(function(a) { return a.bin_id === selectedBinId; });
|
||||
if (selectedBinId) { var sids=collectDescendantIds(selectedBinId,BINS); assets=assets.filter(function(a){return sids.has(a.bin_id);}); }
|
||||
|
||||
const activeBin = selectedBinId ? BINS.find(b => b.id === selectedBinId) : null;
|
||||
const displayTitle = activeBin
|
||||
? (openProject ? openProject.name + ' · ' : '') + activeBin.name
|
||||
: (openProject ? openProject.name : 'All Assets');
|
||||
const binTree=React.useMemo(function(){return buildBinTree(BINS);},[BINS]);
|
||||
const errorCount = ALL_ASSETS.filter(function(a) { return a.status === 'error'; }).length;
|
||||
const recentCount = ALL_ASSETS.filter(function(a) { return (Date.now() - new Date(a.created_at)) < 86400000; }).length;
|
||||
|
||||
|
|
@ -309,7 +338,7 @@ function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
|
|||
{PROJECTS.slice(0, 8).map(function(p) {
|
||||
return (
|
||||
<div key={p.id} className={`rail-item ${openProject && openProject.id === p.id ? 'active' : ''}`} style={{ cursor: 'pointer' }}
|
||||
onClick={function() { navigate('projects'); }}
|
||||
onClick={function() { if (onOpenProject) onOpenProject(p); }}
|
||||
onContextMenu={function(e) { openProjectCtx(p, e); }}>
|
||||
<span className="rail-color-dot" style={{ background: p.color }} />
|
||||
<span style={{ overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap' }}>{p.name}</span>
|
||||
|
|
@ -329,45 +358,30 @@ function Library({ navigate, onOpenAsset, openProject, onClearProject }) {
|
|||
</button>
|
||||
</div>
|
||||
<div className="rail-list">
|
||||
{creatingBin && (
|
||||
<div style={{ padding: '4px 6px', display: 'flex', gap: 4, alignItems: 'center' }}>
|
||||
<input
|
||||
className="field-input"
|
||||
autoFocus
|
||||
value={newBinName}
|
||||
onChange={function(e) { setNewBinName(e.target.value); }}
|
||||
onKeyDown={function(e) {
|
||||
if (e.key === 'Enter') submitBin(newBinName);
|
||||
if (e.key === 'Escape') { setCreatingBin(false); }
|
||||
}}
|
||||
onBlur={function() { submitBin(newBinName); }}
|
||||
placeholder="Bin name"
|
||||
style={{ fontSize: 12, height: 26, padding: '0 6px', flex: 1 }}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
{!creatingBin && BINS.length === 0 ? (
|
||||
<div style={{ fontSize: 11, color: 'var(--text-3)', padding: '6px 8px', fontStyle: 'italic' }}>
|
||||
{openProject ? 'No bins yet: click + to create one.' : 'Open a project to manage bins.'}
|
||||
</div>
|
||||
) : BINS.map(function(b) {
|
||||
const isActive = selectedBinId === b.id;
|
||||
const isDragTarget = draggingAssetId !== null && dragOverBinId === b.id;
|
||||
return (
|
||||
<div key={b.id}
|
||||
className={'rail-item' + (isActive ? ' active' : '') + (isDragTarget ? ' droppable' : '')}
|
||||
onClick={function() { setSelectedBinId(isActive ? null : b.id); }}
|
||||
onDragOver={function(e) { onBinDragOver(b.id, e); }}
|
||||
onDrop={function(e) { onBinDrop(b.id, e); }}
|
||||
onDragLeave={onBinDragLeave}
|
||||
style={{ cursor: 'pointer' }}
|
||||
title={isActive ? 'Click to clear bin filter' : 'Filter to this bin'}>
|
||||
<Icon name={binIcon(b.icon)} size={13} className="rail-icon" />
|
||||
<span>{b.name}</span>
|
||||
<span className="rail-count">{b.count}</span>
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
) : (
|
||||
<BinTreeNodes nodes={binTree} depth={0}
|
||||
selectedBinId={selectedBinId} setSelectedBinId={setSelectedBinId}
|
||||
draggingAssetId={draggingAssetId} dragOverBinId={dragOverBinId}
|
||||
onBinDragOver={onBinDragOver} onBinDrop={onBinDrop} onBinDragLeave={onBinDragLeave}
|
||||
expandedBins={expandedBins} toggleBinExpanded={toggleBinExpanded}
|
||||
creatingBin={creatingBin} creatingChildOf={creatingChildOf}
|
||||
newBinName={newBinName} setNewBinName={setNewBinName}
|
||||
submitBin={submitBin} setCreatingBin={setCreatingBin} setCreatingChildOf={setCreatingChildOf}
|
||||
createSubBin={createSubBin} openProject={openProject} />
|
||||
)}
|
||||
{creatingBin && creatingChildOf === null && (
|
||||
<div style={{ padding: '4px 6px', display: 'flex', gap: 4, alignItems: 'center' }}>
|
||||
<input className="field-input" autoFocus value={newBinName}
|
||||
onChange={function(e) { setNewBinName(e.target.value); }}
|
||||
onKeyDown={function(e) { if (e.key==='Enter') submitBin(newBinName); if (e.key==='Escape') { setCreatingBin(false); } }}
|
||||
onBlur={function() { submitBin(newBinName); }}
|
||||
placeholder="Bin name" style={{ fontSize: 12, height: 26, padding: '0 6px', flex: 1 }} />
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
|
|
@ -596,7 +610,8 @@ function AssetContextMenu({ asset, x, y, bins, onClose, onChanged, onOpen, onRen
|
|||
window.ZAMPP_API.fetch('/assets/' + asset.id + '/promote', { method: 'POST' })
|
||||
.then(function() {
|
||||
if (onChanged) onChanged();
|
||||
window.alert('Promotion job queued. The file is being uploaded to S3 in the background.');
|
||||
if (window.toast) window.toast.success('Promotion job queued. The file is being uploaded to S3 in the background.');
|
||||
else window.alert('Promotion job queued. The file is being uploaded to S3 in the background.');
|
||||
})
|
||||
.catch(function(e) { alert('Promotion failed: ' + e.message); });
|
||||
};
|
||||
|
|
@ -873,5 +888,6 @@ function DownloadWarningModal({ asset, onClose, onConfirm }) {
|
|||
);
|
||||
}
|
||||
|
||||
function BinTreeNodes(p){var nodes=p.nodes,depth=p.depth,selId=p.selectedBinId,setSel=p.setSelectedBinId;var drag=p.draggingAssetId,over=p.dragOverBinId;var onOver=p.onBinDragOver,onDrop=p.onBinDrop,onLeave=p.onBinDragLeave;var expanded=p.expandedBins,toggle=p.toggleBinExpanded;var creat=p.creatingBin,parentOf=p.creatingChildOf;var newName=p.newBinName,setName=p.setNewBinName;var submit=p.submitBin,setCreat=p.setCreatingBin,setParent=p.setCreatingChildOf;var createSub=p.createSubBin,proj=p.openProject;if(!nodes||!nodes.length)return null;return nodes.map(function(b){var act=selId===b.id,idt=drag!==null&&over===b.id,hasC=b.children&&b.children.length>0,exp=expanded.has(b.id),isCk=creat&&parentOf===b.id,ind=depth*14;return React.createElement(React.Fragment,{key:b.id},React.createElement("div",{className:"rail-item"+(act?" active":"")+(idt?" droppable":""),onClick:function(){setSel(act?null:b.id);},onDragOver:function(e){onOver(b.id,e);},onDrop:function(e){onDrop(b.id,e);},onDragLeave:onLeave,style:{cursor:"pointer",paddingLeft:8+ind}},React.createElement("span",{style:{display:"inline-flex",alignItems:"center",width:16,height:16,flexShrink:0,marginRight:2,color:hasC?"var(--text-3)":"transparent",transition:"transform 120ms",transform:hasC&&exp?"rotate(90deg)":"none"},onClick:function(e){if(!hasC)return;e.stopPropagation();toggle(b.id);}},hasC&&React.createElement("svg",{width:8,height:8,viewBox:"0 0 8 8",fill:"currentColor"},React.createElement("path",{d:"M2 1l4 3-4 3V1z"}))),React.createElement(Icon,{name:binIcon(b.icon),size:13,className:"rail-icon",style:{marginRight:4}}),React.createElement("span",{style:{flex:1,overflow:"hidden",textOverflow:"ellipsis",whiteSpace:"nowrap"}},b.name),React.createElement("span",{className:"rail-count"},b.count),proj&&React.createElement("button",{className:"icon-btn bin-add-child-btn","aria-label":"Create sub-bin",onClick:function(e){e.stopPropagation();createSub(b.id);},style:{opacity:0,transition:"opacity 100ms",marginLeft:2,flexShrink:0},onFocus:function(e){e.currentTarget.style.opacity="1";},onBlur:function(e){e.currentTarget.style.opacity="";}},React.createElement(Icon,{name:"plus",size:10}))),isCk&&React.createElement("div",{style:{paddingLeft:8+ind+14,paddingRight:6,paddingTop:4,paddingBottom:4,display:"flex",gap:4,alignItems:"center"}},React.createElement("input",{className:"field-input",autoFocus:true,value:newName,onChange:function(e){setName(e.target.value);},onKeyDown:function(e){if(e.key==="Enter")submit(newName);if(e.key==="Escape"){setCreat(false);setParent(null);}},onBlur:function(){submit(newName);},placeholder:"Sub-bin name",style:{fontSize:12,height:26,padding:"0 6px",flex:1}})),hasC&&exp&&React.createElement(BinTreeNodes,Object.assign({},p,{nodes:b.children,depth:depth+1})));});}
|
||||
window.Library = Library;
|
||||
window.AssetCard = AssetCard;
|
||||
|
|
|
|||
|
|
@ -292,37 +292,38 @@
|
|||
text-align: center;
|
||||
margin-top: 8px;
|
||||
}
|
||||
/* Logo wrapper holds the animated pulse halo behind the image. */
|
||||
/* Logo wrapper — large hero with orange pulse halo. */
|
||||
.launcher-logo-wrap {
|
||||
position: relative;
|
||||
display: inline-grid;
|
||||
place-items: center;
|
||||
width: 52px;
|
||||
height: 52px;
|
||||
width: 120px;
|
||||
height: 120px;
|
||||
flex-shrink: 0;
|
||||
}
|
||||
.launcher-logo-pulse {
|
||||
position: absolute;
|
||||
width: 80px;
|
||||
height: 80px;
|
||||
width: 180px;
|
||||
height: 180px;
|
||||
border-radius: 50%;
|
||||
background: radial-gradient(circle, var(--accent-soft) 0%, transparent 70%);
|
||||
animation: logoPulse 3s ease-in-out infinite;
|
||||
background: radial-gradient(circle, rgba(232, 130, 28, 0.35) 0%, rgba(232, 130, 28, 0.08) 55%, transparent 70%);
|
||||
animation: logoPulse 2.8s ease-in-out infinite;
|
||||
z-index: 0;
|
||||
}
|
||||
@keyframes logoPulse {
|
||||
0%, 100% { transform: scale(1); opacity: 0.6; }
|
||||
50% { transform: scale(1.15); opacity: 1; }
|
||||
0%, 100% { transform: scale(1); opacity: 0.7; }
|
||||
50% { transform: scale(1.18); opacity: 1; }
|
||||
}
|
||||
.launcher-logo {
|
||||
position: relative;
|
||||
z-index: 1;
|
||||
width: 52px;
|
||||
height: 52px;
|
||||
width: 110px;
|
||||
height: 110px;
|
||||
object-fit: contain;
|
||||
filter:
|
||||
brightness(0) invert(1)
|
||||
drop-shadow(0 0 8px rgba(232, 130, 28, 0.35));
|
||||
drop-shadow(0 0 14px rgba(232, 130, 28, 0.6))
|
||||
drop-shadow(0 0 4px rgba(255, 180, 60, 0.4));
|
||||
animation: launcherLogoIn 400ms cubic-bezier(0.2, 0.7, 0.2, 1) both;
|
||||
}
|
||||
@keyframes launcherLogoIn {
|
||||
|
|
@ -330,7 +331,7 @@
|
|||
to { opacity: 1; transform: scale(1); }
|
||||
}
|
||||
@media (prefers-reduced-motion: reduce) {
|
||||
.launcher-logo-pulse { animation: none; opacity: 0.5; }
|
||||
.launcher-logo-pulse { animation: none; opacity: 0.6; }
|
||||
.launcher-logo { animation: none; }
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@
|
|||
}
|
||||
.source-type-grid {
|
||||
display: grid;
|
||||
grid-template-columns: repeat(3, 1fr);
|
||||
grid-template-columns: repeat(2, 1fr);
|
||||
gap: 8px;
|
||||
}
|
||||
.source-type-card {
|
||||
|
|
|
|||
|
|
@ -1066,6 +1066,9 @@
|
|||
.rail-item .rail-icon { color: var(--text-3); }
|
||||
.rail-item .rail-count { margin-left: auto; font-family: var(--font-mono); font-size: 10.5px; color: var(--text-3); }
|
||||
.rail-color-dot { width: 8px; height: 8px; border-radius: 50%; }
|
||||
/* Show sub-bin create button only on hover of the parent rail-item */
|
||||
.rail-item:hover .bin-add-child-btn { opacity: 1 !important; }
|
||||
.bin-add-child-btn { padding: 0 2px; height: 18px; min-width: 18px; }
|
||||
|
||||
.library-main {
|
||||
display: flex; flex-direction: column;
|
||||
|
|
|
|||
Loading…
Reference in a new issue