feat(framecache): phase 2 — deltacast-bridge writes to shm ring
- fc_writer.h/fc_writer.c: new framecache slot writer module - Registers slot via POST /slots to framecache HTTP API on signal lock - Opens shm file returned by API (O_RDWR + mmap MAP_SHARED) - fc_writer_write(): atomic write_cursor advance + sem_post per frame - fc_writer_close(): DELETE /slots/:id + munmap + sem_close - HTTP calls via raw POSIX sockets (no libcurl dependency) - Parses host:port from FC_URL env var or --fc-url arg - main.c changes: - PortState gains slot_id, fc_url, fc_writer fields - --fc-url CLI arg + FC_URL env var (default http://localhost:7435) - On signal lock: fc_writer_open() before thread launch; falls back to FIFO if framecache unreachable (fc_writer == NULL) - video_thread: shm path primary (fc_writer != NULL), FIFO path fallback (fc_writer == NULL or LEGACY_FIFO=1) - Format JSON now includes slot_id field for node-agent consumption - Cleanup: fc_writer_close() before VHD_CloseBoardHandle - CMakeLists.txt: - Add fc_writer.c to build - Link rt (shm_open, sem_open) - LEGACY_FIFO option (OFF by default) for nodes without framecache Audio thread unchanged — audio stays in FIFO (shm audio is roadmap).
This commit is contained in:
parent
1573bf8954
commit
0d479d043d
4 changed files with 508 additions and 28 deletions
|
|
@ -4,8 +4,19 @@ set(CMAKE_C_STANDARD 17)
|
||||||
|
|
||||||
set(SDK_ROOT "/sdk" CACHE PATH "Path to extracted VideoMaster SDK")
|
set(SDK_ROOT "/sdk" CACHE PATH "Path to extracted VideoMaster SDK")
|
||||||
|
|
||||||
|
# Legacy FIFO mode — set LEGACY_FIFO=ON to disable framecache shm writes
|
||||||
|
# and fall back to the original named-FIFO path.
|
||||||
|
option(LEGACY_FIFO "Use named FIFOs instead of framecache shm" OFF)
|
||||||
|
|
||||||
# Primary binary: deltacast-bridge (shared multi-port daemon)
|
# Primary binary: deltacast-bridge (shared multi-port daemon)
|
||||||
add_executable(deltacast-bridge main.c)
|
add_executable(deltacast-bridge main.c fc_writer.c)
|
||||||
|
|
||||||
|
if(LEGACY_FIFO)
|
||||||
|
target_compile_definitions(deltacast-bridge PRIVATE LEGACY_FIFO=1)
|
||||||
|
message(STATUS "deltacast-bridge: LEGACY_FIFO mode enabled (shm disabled)")
|
||||||
|
else()
|
||||||
|
message(STATUS "deltacast-bridge: framecache shm mode enabled")
|
||||||
|
endif()
|
||||||
|
|
||||||
target_include_directories(deltacast-bridge PRIVATE
|
target_include_directories(deltacast-bridge PRIVATE
|
||||||
${SDK_ROOT}/include/videomaster
|
${SDK_ROOT}/include/videomaster
|
||||||
|
|
@ -19,6 +30,7 @@ target_link_libraries(deltacast-bridge PRIVATE
|
||||||
videomasterhd
|
videomasterhd
|
||||||
videomasterhd_audio
|
videomasterhd_audio
|
||||||
pthread
|
pthread
|
||||||
|
rt # shm_open, sem_open
|
||||||
)
|
)
|
||||||
|
|
||||||
# Embed the SDK RPATH so the binary finds the .so at runtime
|
# Embed the SDK RPATH so the binary finds the .so at runtime
|
||||||
|
|
|
||||||
300
services/capture/deltacast-bridge/fc_writer.c
Normal file
300
services/capture/deltacast-bridge/fc_writer.c
Normal file
|
|
@ -0,0 +1,300 @@
|
||||||
|
/**
|
||||||
|
* fc_writer.c — Framecache slot writer for deltacast-bridge.
|
||||||
|
*
|
||||||
|
* Uses only POSIX + libc — no external dependencies beyond what the bridge
|
||||||
|
* already links. HTTP calls are done with raw sockets (tiny GET/POST/DELETE)
|
||||||
|
* to avoid pulling in libcurl.
|
||||||
|
*/
|
||||||
|
#include "fc_writer.h"
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stdatomic.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <netdb.h>
|
||||||
|
#include <sys/mman.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <semaphore.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
|
||||||
|
/* Re-use the shared memory layout from the framecache service */
|
||||||
|
#define FC_MAGIC 0x46524D43u
|
||||||
|
#define FC_VERSION 1u
|
||||||
|
#define FC_RING_DEPTH 120u
|
||||||
|
#define FC_HEADER_SIZE 4096u
|
||||||
|
#define FC_FRAME_HDR_SIZE 24u
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint32_t magic;
|
||||||
|
uint32_t version;
|
||||||
|
uint32_t width;
|
||||||
|
uint32_t height;
|
||||||
|
uint32_t fps_num;
|
||||||
|
uint32_t fps_den;
|
||||||
|
uint32_t pixel_format;
|
||||||
|
uint32_t frame_size;
|
||||||
|
uint32_t ring_depth;
|
||||||
|
uint32_t _reserved;
|
||||||
|
_Atomic uint64_t write_cursor;
|
||||||
|
_Atomic uint64_t dropped_frames;
|
||||||
|
char source_type[32];
|
||||||
|
char slot_id[64];
|
||||||
|
uint8_t _pad[FC_HEADER_SIZE - 112];
|
||||||
|
} fc_hdr_t;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t pts_us;
|
||||||
|
uint64_t wall_us;
|
||||||
|
uint32_t size;
|
||||||
|
uint32_t _pad;
|
||||||
|
uint8_t data[];
|
||||||
|
} fc_frm_t;
|
||||||
|
|
||||||
|
struct fc_writer {
|
||||||
|
void *base;
|
||||||
|
size_t shm_size;
|
||||||
|
int shm_fd;
|
||||||
|
sem_t *sem;
|
||||||
|
char slot_id[64];
|
||||||
|
char fc_url[256]; /* base URL for DELETE on close */
|
||||||
|
char shm_path[128];
|
||||||
|
char sem_name[128];
|
||||||
|
};
|
||||||
|
|
||||||
|
/* ── tiny HTTP helper ──────────────────────────────────────────────── */
|
||||||
|
|
||||||
|
static int http_request(const char *method,
|
||||||
|
const char *host, int port, const char *path,
|
||||||
|
const char *body, /* NULL for GET/DELETE */
|
||||||
|
char *resp_buf, size_t resp_len)
|
||||||
|
{
|
||||||
|
struct sockaddr_in sa;
|
||||||
|
memset(&sa, 0, sizeof sa);
|
||||||
|
sa.sin_family = AF_INET;
|
||||||
|
sa.sin_port = htons((uint16_t)port);
|
||||||
|
|
||||||
|
struct hostent *he = gethostbyname(host);
|
||||||
|
if (!he) return -1;
|
||||||
|
memcpy(&sa.sin_addr, he->h_addr_list[0], (size_t)he->h_length);
|
||||||
|
|
||||||
|
int fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
if (fd < 0) return -1;
|
||||||
|
|
||||||
|
struct timeval tv = { .tv_sec = 5 };
|
||||||
|
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv);
|
||||||
|
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv);
|
||||||
|
|
||||||
|
if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) {
|
||||||
|
close(fd); return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char req[4096];
|
||||||
|
int req_len;
|
||||||
|
if (body) {
|
||||||
|
req_len = snprintf(req, sizeof req,
|
||||||
|
"%s %s HTTP/1.0\r\n"
|
||||||
|
"Host: %s:%d\r\n"
|
||||||
|
"Content-Type: application/json\r\n"
|
||||||
|
"Content-Length: %zu\r\n"
|
||||||
|
"Connection: close\r\n\r\n"
|
||||||
|
"%s",
|
||||||
|
method, path, host, port, strlen(body), body);
|
||||||
|
} else {
|
||||||
|
req_len = snprintf(req, sizeof req,
|
||||||
|
"%s %s HTTP/1.0\r\n"
|
||||||
|
"Host: %s:%d\r\n"
|
||||||
|
"Connection: close\r\n\r\n",
|
||||||
|
method, path, host, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (send(fd, req, (size_t)req_len, 0) < 0) { close(fd); return -1; }
|
||||||
|
|
||||||
|
int status = -1;
|
||||||
|
size_t got = 0;
|
||||||
|
char buf[8192];
|
||||||
|
ssize_t n;
|
||||||
|
while ((n = recv(fd, buf + got, sizeof buf - got - 1, 0)) > 0)
|
||||||
|
got += (size_t)n;
|
||||||
|
buf[got] = '\0';
|
||||||
|
|
||||||
|
/* Parse status line */
|
||||||
|
if (sscanf(buf, "HTTP/%*s %d", &status) != 1) status = -1;
|
||||||
|
|
||||||
|
/* Copy body (after \r\n\r\n) into resp_buf */
|
||||||
|
if (resp_buf && resp_len > 0) {
|
||||||
|
const char *body_start = strstr(buf, "\r\n\r\n");
|
||||||
|
if (body_start) {
|
||||||
|
strncpy(resp_buf, body_start + 4, resp_len - 1);
|
||||||
|
resp_buf[resp_len - 1] = '\0';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
close(fd);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Parse "host:port" or just "host" from a URL like "http://host:port" */
|
||||||
|
static void parse_url(const char *url, char *host, size_t hlen, int *port)
|
||||||
|
{
|
||||||
|
const char *p = url;
|
||||||
|
if (strncmp(p, "http://", 7) == 0) p += 7;
|
||||||
|
*port = 7435;
|
||||||
|
const char *colon = strchr(p, ':');
|
||||||
|
if (colon) {
|
||||||
|
size_t n = (size_t)(colon - p);
|
||||||
|
if (n >= hlen) n = hlen - 1;
|
||||||
|
strncpy(host, p, n);
|
||||||
|
host[n] = '\0';
|
||||||
|
*port = atoi(colon + 1);
|
||||||
|
} else {
|
||||||
|
strncpy(host, p, hlen - 1);
|
||||||
|
host[hlen - 1] = '\0';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int json_str(const char *json, const char *key, char *out, size_t len)
|
||||||
|
{
|
||||||
|
char pat[128];
|
||||||
|
snprintf(pat, sizeof pat, "\"%s\":", key);
|
||||||
|
const char *p = strstr(json, pat);
|
||||||
|
if (!p) return -1;
|
||||||
|
p += strlen(pat);
|
||||||
|
while (*p == ' ') p++;
|
||||||
|
if (*p != '"') return -1;
|
||||||
|
p++;
|
||||||
|
size_t i = 0;
|
||||||
|
while (*p && *p != '"' && i < len - 1) out[i++] = *p++;
|
||||||
|
out[i] = '\0';
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ── public API ────────────────────────────────────────────────────── */
|
||||||
|
|
||||||
|
fc_writer_t *fc_writer_open(const char *fc_url,
|
||||||
|
const char *slot_id,
|
||||||
|
uint32_t width, uint32_t height,
|
||||||
|
uint32_t fps_num, uint32_t fps_den)
|
||||||
|
{
|
||||||
|
char host[128]; int port;
|
||||||
|
parse_url(fc_url, host, sizeof host, &port);
|
||||||
|
|
||||||
|
/* POST /slots */
|
||||||
|
char body[512];
|
||||||
|
snprintf(body, sizeof body,
|
||||||
|
"{\"slot_id\":\"%s\","
|
||||||
|
"\"width\":%u,\"height\":%u,"
|
||||||
|
"\"fps_num\":%u,\"fps_den\":%u,"
|
||||||
|
"\"source_type\":\"deltacast\"}",
|
||||||
|
slot_id, width, height, fps_num, fps_den);
|
||||||
|
|
||||||
|
char resp[1024] = {0};
|
||||||
|
int status = http_request("POST", host, port, "/slots", body, resp, sizeof resp);
|
||||||
|
if (status != 201) {
|
||||||
|
fprintf(stderr, "[fc_writer:%s] POST /slots failed (HTTP %d): %s\n",
|
||||||
|
slot_id, status, resp);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char shm_path[128] = {0}, sem_name[128] = {0};
|
||||||
|
json_str(resp, "shm_path", shm_path, sizeof shm_path);
|
||||||
|
json_str(resp, "sem_name", sem_name, sizeof sem_name);
|
||||||
|
|
||||||
|
if (!shm_path[0] || !sem_name[0]) {
|
||||||
|
fprintf(stderr, "[fc_writer:%s] bad response (missing shm_path/sem_name)\n", slot_id);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* mmap the shm file */
|
||||||
|
int fd = open(shm_path, O_RDWR);
|
||||||
|
if (fd < 0) {
|
||||||
|
fprintf(stderr, "[fc_writer:%s] open %s: %s\n", slot_id, shm_path, strerror(errno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
/* Read header to get frame_size */
|
||||||
|
fc_hdr_t hdr;
|
||||||
|
if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
|
||||||
|
fprintf(stderr, "[fc_writer:%s] bad shm header\n", slot_id);
|
||||||
|
close(fd); return NULL;
|
||||||
|
}
|
||||||
|
size_t total = (size_t)FC_HEADER_SIZE
|
||||||
|
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + hdr.frame_size);
|
||||||
|
|
||||||
|
void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||||
|
if (base == MAP_FAILED) {
|
||||||
|
fprintf(stderr, "[fc_writer:%s] mmap: %s\n", slot_id, strerror(errno));
|
||||||
|
close(fd); return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
sem_t *sem = sem_open(sem_name, 0);
|
||||||
|
if (sem == SEM_FAILED) {
|
||||||
|
fprintf(stderr, "[fc_writer:%s] sem_open %s: %s\n", slot_id, sem_name, strerror(errno));
|
||||||
|
munmap(base, total); close(fd); return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
fc_writer_t *w = calloc(1, sizeof *w);
|
||||||
|
if (!w) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
|
||||||
|
|
||||||
|
w->base = base;
|
||||||
|
w->shm_size = total;
|
||||||
|
w->shm_fd = fd;
|
||||||
|
w->sem = sem;
|
||||||
|
strncpy(w->slot_id, slot_id, sizeof w->slot_id - 1);
|
||||||
|
strncpy(w->fc_url, fc_url, sizeof w->fc_url - 1);
|
||||||
|
strncpy(w->shm_path, shm_path, sizeof w->shm_path - 1);
|
||||||
|
strncpy(w->sem_name, sem_name, sizeof w->sem_name - 1);
|
||||||
|
|
||||||
|
fprintf(stderr, "[fc_writer:%s] slot open (%ux%u %.2ffps shm=%s)\n",
|
||||||
|
slot_id, width, height,
|
||||||
|
fps_den ? (double)fps_num / fps_den : 0.0, shm_path);
|
||||||
|
return w;
|
||||||
|
}
|
||||||
|
|
||||||
|
void fc_writer_write(fc_writer_t *w,
|
||||||
|
const uint8_t *data, uint32_t size,
|
||||||
|
uint64_t pts_us)
|
||||||
|
{
|
||||||
|
fc_hdr_t *hdr = (fc_hdr_t *)w->base;
|
||||||
|
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
|
||||||
|
uint64_t idx = cur % FC_RING_DEPTH;
|
||||||
|
|
||||||
|
/* Locate frame in ring */
|
||||||
|
uint8_t *frames = (uint8_t *)w->base + FC_HEADER_SIZE;
|
||||||
|
fc_frm_t *frame = (fc_frm_t *)(frames + idx * ((size_t)FC_FRAME_HDR_SIZE + hdr->frame_size));
|
||||||
|
|
||||||
|
struct timespec ts;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &ts);
|
||||||
|
uint64_t wall = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
|
||||||
|
|
||||||
|
frame->pts_us = pts_us;
|
||||||
|
frame->wall_us = wall;
|
||||||
|
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
||||||
|
memcpy(frame->data, data, frame->size);
|
||||||
|
|
||||||
|
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
||||||
|
sem_post(w->sem);
|
||||||
|
}
|
||||||
|
|
||||||
|
void fc_writer_close(fc_writer_t *w)
|
||||||
|
{
|
||||||
|
if (!w) return;
|
||||||
|
|
||||||
|
/* DELETE /slots/:id */
|
||||||
|
char host[128]; int port;
|
||||||
|
parse_url(w->fc_url, host, sizeof host, &port);
|
||||||
|
char path[192];
|
||||||
|
snprintf(path, sizeof path, "/slots/%s", w->slot_id);
|
||||||
|
http_request("DELETE", host, port, path, NULL, NULL, 0);
|
||||||
|
|
||||||
|
sem_close(w->sem);
|
||||||
|
munmap(w->base, w->shm_size);
|
||||||
|
close(w->shm_fd);
|
||||||
|
fprintf(stderr, "[fc_writer:%s] slot closed\n", w->slot_id);
|
||||||
|
free(w);
|
||||||
|
}
|
||||||
50
services/capture/deltacast-bridge/fc_writer.h
Normal file
50
services/capture/deltacast-bridge/fc_writer.h
Normal file
|
|
@ -0,0 +1,50 @@
|
||||||
|
/**
|
||||||
|
* fc_writer.h — Lightweight framecache slot writer for deltacast-bridge.
|
||||||
|
*
|
||||||
|
* Registers a slot with the framecache HTTP API on signal lock, then writes
|
||||||
|
* raw UYVY422 frames directly into the shared memory ring buffer.
|
||||||
|
*
|
||||||
|
* Compile with -DLEGACY_FIFO to disable shm writes and fall back to the
|
||||||
|
* original named-FIFO path (useful during transition / on nodes without the
|
||||||
|
* framecache container running).
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct fc_writer fc_writer_t;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a slot with the framecache service and open the shm region for
|
||||||
|
* writing. fc_url is the HTTP base URL, e.g. "http://localhost:7435".
|
||||||
|
* slot_id must be unique per port, e.g. "deltacast-0-3" (device-port).
|
||||||
|
*
|
||||||
|
* Returns writer handle on success, NULL on failure (falls back to FIFO).
|
||||||
|
*/
|
||||||
|
fc_writer_t *fc_writer_open(const char *fc_url,
|
||||||
|
const char *slot_id,
|
||||||
|
uint32_t width, uint32_t height,
|
||||||
|
uint32_t fps_num, uint32_t fps_den);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write one raw UYVY422 frame into the ring buffer.
|
||||||
|
* Non-blocking — slow consumers are skipped, not waited on.
|
||||||
|
* pts_us: presentation timestamp in microseconds (0 if unknown).
|
||||||
|
*/
|
||||||
|
void fc_writer_write(fc_writer_t *w,
|
||||||
|
const uint8_t *data, uint32_t size,
|
||||||
|
uint64_t pts_us);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deregister slot from framecache service and unmap shm.
|
||||||
|
*/
|
||||||
|
void fc_writer_close(fc_writer_t *w);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
@ -3,20 +3,32 @@
|
||||||
* Deltacast VideoMaster SDI shared multi-port bridge daemon.
|
* Deltacast VideoMaster SDI shared multi-port bridge daemon.
|
||||||
*
|
*
|
||||||
* Opens the board ONCE, opens RX streams for all requested ports, and
|
* Opens the board ONCE, opens RX streams for all requested ports, and
|
||||||
* writes each port's video/audio to named FIFOs in a shared directory.
|
* writes each port's video frames into a shared-memory framecache slot
|
||||||
* One reader thread + one audio thread per port run concurrently.
|
* (and audio to a named FIFO — audio-in-shm is a future roadmap item).
|
||||||
|
*
|
||||||
|
* Signal fan-out architecture:
|
||||||
|
* Board → video_thread → fc_writer → /dev/shm/framecache/<slot>
|
||||||
|
* └→ N consumers (recording, proxy,
|
||||||
|
* HLS preview) each read with
|
||||||
|
* their own cursor — zero-copy,
|
||||||
|
* no bandwidth splitting.
|
||||||
*
|
*
|
||||||
* Usage:
|
* Usage:
|
||||||
* deltacast-bridge --device <N> --ports <csv>
|
* deltacast-bridge --device <N> --ports <csv>
|
||||||
* [--video-pipe-dir /dev/shm/deltacast]
|
* [--video-pipe-dir /dev/shm/deltacast]
|
||||||
* [--audio-pipe-dir /dev/shm/deltacast]
|
* [--audio-pipe-dir /dev/shm/deltacast]
|
||||||
|
* [--fc-url http://framecache:7435]
|
||||||
* [--signal-timeout <sec>]
|
* [--signal-timeout <sec>]
|
||||||
*
|
*
|
||||||
* Compat alias: --port <N> treated as --ports <N> (single port).
|
* Compat alias: --port <N> treated as --ports <N> (single port).
|
||||||
*
|
*
|
||||||
* For each port that acquires signal, emits one JSON line to stderr:
|
* For each port that acquires signal, emits one JSON line to stderr:
|
||||||
* {"port":N,"width":W,"height":H,"fps_num":N,"fps_den":D,
|
* {"port":N,"width":W,"height":H,"fps_num":N,"fps_den":D,
|
||||||
* "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2}
|
* "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2,
|
||||||
|
* "slot_id":"deltacast-<device>-<port>"}
|
||||||
|
*
|
||||||
|
* Compile with -DLEGACY_FIFO=1 to disable shm writes and fall back to
|
||||||
|
* the original named-FIFO path (for nodes without framecache running).
|
||||||
*
|
*
|
||||||
* Runs until SIGTERM/SIGINT, then closes all streams and the board.
|
* Runs until SIGTERM/SIGINT, then closes all streams and the board.
|
||||||
*/
|
*/
|
||||||
|
|
@ -37,10 +49,17 @@
|
||||||
#include "VideoMasterHD_Sdi.h"
|
#include "VideoMasterHD_Sdi.h"
|
||||||
#include "VideoMasterHD_Sdi_Audio.h"
|
#include "VideoMasterHD_Sdi_Audio.h"
|
||||||
|
|
||||||
|
#ifndef LEGACY_FIFO
|
||||||
|
# include "fc_writer.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifndef F_SETPIPE_SZ
|
#ifndef F_SETPIPE_SZ
|
||||||
#define F_SETPIPE_SZ 1031
|
#define F_SETPIPE_SZ 1031
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/* Default framecache URL — overridden by FC_URL env var or --fc-url arg */
|
||||||
|
#define FC_URL_DEFAULT "http://localhost:7435"
|
||||||
|
|
||||||
/* ── Constants ────────────────────────────────────────────────────────── */
|
/* ── Constants ────────────────────────────────────────────────────────── */
|
||||||
#define MAX_PORTS 8
|
#define MAX_PORTS 8
|
||||||
|
|
||||||
|
|
@ -154,11 +173,16 @@ typedef struct {
|
||||||
VideoInfo vi;
|
VideoInfo vi;
|
||||||
char video_fifo[256];
|
char video_fifo[256];
|
||||||
char audio_fifo[256];
|
char audio_fifo[256];
|
||||||
|
char slot_id[128]; /* framecache slot id: "deltacast-<dev>-<port>" */
|
||||||
|
char fc_url[256]; /* framecache HTTP base URL */
|
||||||
/* threads */
|
/* threads */
|
||||||
pthread_t video_tid;
|
pthread_t video_tid;
|
||||||
pthread_t audio_tid;
|
pthread_t audio_tid;
|
||||||
/* streams (owned by threads, set before thread launch) */
|
/* streams (owned by threads, set before thread launch) */
|
||||||
HANDLE video_stream;
|
HANDLE video_stream;
|
||||||
|
#ifndef LEGACY_FIFO
|
||||||
|
fc_writer_t *fc_writer; /* shm ring buffer writer (NULL = use FIFO fallback) */
|
||||||
|
#endif
|
||||||
} PortState;
|
} PortState;
|
||||||
|
|
||||||
/* ── Audio thread ──────────────────────────────────────────────────────
|
/* ── Audio thread ──────────────────────────────────────────────────────
|
||||||
|
|
@ -343,10 +367,67 @@ static void *audio_thread(void *arg) {
|
||||||
static void *video_thread(void *arg) {
|
static void *video_thread(void *arg) {
|
||||||
PortState *ps = (PortState *)arg;
|
PortState *ps = (PortState *)arg;
|
||||||
|
|
||||||
/* Outer loop: reopen the FIFO writer each time a reader connects.
|
#ifndef LEGACY_FIFO
|
||||||
* Mirror the audio thread pattern — EPIPE means the ffmpeg sidecar for
|
/* ── Framecache shm path (primary) ──────────────────────────────────
|
||||||
* this port died (session stop/restart), NOT a hardware fault. We reopen
|
* Write frames directly into the shared memory ring buffer.
|
||||||
* and block until the next recorder start; other ports are unaffected. */
|
* Multiple consumers (growing recorder, proxy encoder, HLS preview)
|
||||||
|
* each hold their own read cursor and read independently — no FIFO
|
||||||
|
* splitting, no bandwidth halving.
|
||||||
|
*
|
||||||
|
* The fc_writer was opened by main() after signal lock. If it is
|
||||||
|
* NULL the framecache service was unavailable and we fall through to
|
||||||
|
* the legacy FIFO path automatically.
|
||||||
|
*/
|
||||||
|
if (ps->fc_writer) {
|
||||||
|
uint64_t frame_seq = 0;
|
||||||
|
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
|
||||||
|
HANDLE slot = NULL;
|
||||||
|
ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot);
|
||||||
|
if (r == VHDERR_NOERROR) {
|
||||||
|
BYTE *buf = NULL;
|
||||||
|
ULONG sz = 0;
|
||||||
|
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
|
||||||
|
ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2;
|
||||||
|
if (sz != expected) {
|
||||||
|
fprintf(stderr,
|
||||||
|
"[video:%u] WARN: sz=%lu != expected %lu — packing mismatch, skipping\n",
|
||||||
|
ps->port, (unsigned long)sz, (unsigned long)expected);
|
||||||
|
VHD_UnlockSlotHandle(slot);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
/* pts: frame index × frame duration in µs */
|
||||||
|
uint64_t pts_us = 0;
|
||||||
|
if (ps->vi.fps_num > 0) {
|
||||||
|
pts_us = frame_seq * 1000000ULL
|
||||||
|
* (uint64_t)ps->vi.fps_den
|
||||||
|
/ (uint64_t)ps->vi.fps_num;
|
||||||
|
}
|
||||||
|
fc_writer_write(ps->fc_writer, buf, (uint32_t)sz, pts_us);
|
||||||
|
frame_seq++;
|
||||||
|
}
|
||||||
|
VHD_UnlockSlotHandle(slot);
|
||||||
|
} else if (r != VHDERR_TIMEOUT) {
|
||||||
|
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n",
|
||||||
|
ps->port, (unsigned long)r);
|
||||||
|
atomic_store(&g_port_stop[ps->port], 1);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
/* fc_writer == NULL → fall through to FIFO path */
|
||||||
|
fprintf(stderr, "[video:%u] fc_writer unavailable — falling back to FIFO\n", ps->port);
|
||||||
|
#endif /* !LEGACY_FIFO */
|
||||||
|
|
||||||
|
/* ── Legacy FIFO path ────────────────────────────────────────────────
|
||||||
|
* Kept as compile-time fallback (-DLEGACY_FIFO=1) or when the
|
||||||
|
* framecache service is not reachable at startup.
|
||||||
|
*
|
||||||
|
* Outer loop: reopen the FIFO writer each time a reader connects.
|
||||||
|
* EPIPE means the ffmpeg sidecar for this port died (session
|
||||||
|
* stop/restart), NOT a hardware fault. Reopen and block until the
|
||||||
|
* next recorder start; other ports are unaffected.
|
||||||
|
*/
|
||||||
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
|
while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) {
|
||||||
|
|
||||||
int fd = open(ps->video_fifo, O_WRONLY);
|
int fd = open(ps->video_fifo, O_WRONLY);
|
||||||
|
|
@ -359,7 +440,8 @@ static void *video_thread(void *arg) {
|
||||||
{
|
{
|
||||||
int pipe_sz = 64 * 1024 * 1024; /* 64 MB — ~16 frames of 1080p UYVY */
|
int pipe_sz = 64 * 1024 * 1024; /* 64 MB — ~16 frames of 1080p UYVY */
|
||||||
if (fcntl(fd, F_SETPIPE_SZ, pipe_sz) < 0) {
|
if (fcntl(fd, F_SETPIPE_SZ, pipe_sz) < 0) {
|
||||||
fprintf(stderr, "[video:%u] fcntl F_SETPIPE_SZ failed: %s\n", ps->port, strerror(errno));
|
fprintf(stderr, "[video:%u] fcntl F_SETPIPE_SZ failed: %s\n",
|
||||||
|
ps->port, strerror(errno));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -373,14 +455,14 @@ static void *video_thread(void *arg) {
|
||||||
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
|
if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) {
|
||||||
ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2;
|
ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2;
|
||||||
if (sz != expected) {
|
if (sz != expected) {
|
||||||
fprintf(stderr, "[video:%u] WARN: slot sz=%lu != expected %lu (w=%d h=%d) -- packing mismatch; skipping frame\n",
|
fprintf(stderr,
|
||||||
ps->port, sz, expected, ps->vi.width, ps->vi.height);
|
"[video:%u] WARN: slot sz=%lu != expected %lu (w=%d h=%d) -- packing mismatch; skipping frame\n",
|
||||||
|
ps->port, (unsigned long)sz, (unsigned long)expected,
|
||||||
|
ps->vi.width, ps->vi.height);
|
||||||
VHD_UnlockSlotHandle(slot);
|
VHD_UnlockSlotHandle(slot);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (write_all(fd, buf, sz) < 0) {
|
if (write_all(fd, buf, sz) < 0) {
|
||||||
/* EPIPE: sidecar died (session stop/restart).
|
|
||||||
* Break to outer loop — reopen for next session. */
|
|
||||||
fprintf(stderr, "[video:%u] EPIPE — waiting for next reader\n", ps->port);
|
fprintf(stderr, "[video:%u] EPIPE — waiting for next reader\n", ps->port);
|
||||||
VHD_UnlockSlotHandle(slot);
|
VHD_UnlockSlotHandle(slot);
|
||||||
break;
|
break;
|
||||||
|
|
@ -389,7 +471,7 @@ static void *video_thread(void *arg) {
|
||||||
VHD_UnlockSlotHandle(slot);
|
VHD_UnlockSlotHandle(slot);
|
||||||
} else if (r != VHDERR_TIMEOUT) {
|
} else if (r != VHDERR_TIMEOUT) {
|
||||||
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n",
|
fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n",
|
||||||
ps->port, r);
|
ps->port, (unsigned long)r);
|
||||||
atomic_store(&g_port_stop[ps->port], 1);
|
atomic_store(&g_port_stop[ps->port], 1);
|
||||||
fatal = 1;
|
fatal = 1;
|
||||||
break;
|
break;
|
||||||
|
|
@ -419,12 +501,15 @@ static int parse_ports(const char *csv, unsigned *ports, int max) {
|
||||||
|
|
||||||
/* ── Main ─────────────────────────────────────────────────────────────── */
|
/* ── Main ─────────────────────────────────────────────────────────────── */
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
unsigned device_id = 0;
|
unsigned device_id = 0;
|
||||||
unsigned ports[MAX_PORTS] = {0};
|
unsigned ports[MAX_PORTS] = {0};
|
||||||
int port_count = 0;
|
int port_count = 0;
|
||||||
int sig_timeout = 30;
|
int sig_timeout = 30;
|
||||||
const char *video_pipe_dir = "/dev/shm/deltacast";
|
const char *video_pipe_dir = "/dev/shm/deltacast";
|
||||||
const char *audio_pipe_dir = "/dev/shm/deltacast";
|
const char *audio_pipe_dir = "/dev/shm/deltacast";
|
||||||
|
/* Framecache URL: CLI arg > FC_URL env var > default */
|
||||||
|
const char *fc_url_env = getenv("FC_URL");
|
||||||
|
const char *fc_url = fc_url_env ? fc_url_env : FC_URL_DEFAULT;
|
||||||
|
|
||||||
for (int i = 1; i < argc; i++) {
|
for (int i = 1; i < argc; i++) {
|
||||||
if (!strcmp(argv[i], "--device") && i+1 < argc) {
|
if (!strcmp(argv[i], "--device") && i+1 < argc) {
|
||||||
|
|
@ -441,6 +526,8 @@ int main(int argc, char *argv[]) {
|
||||||
audio_pipe_dir = argv[++i];
|
audio_pipe_dir = argv[++i];
|
||||||
} else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) {
|
} else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) {
|
||||||
sig_timeout = atoi(argv[++i]);
|
sig_timeout = atoi(argv[++i]);
|
||||||
|
} else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) {
|
||||||
|
fc_url = argv[++i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -601,17 +688,38 @@ int main(int argc, char *argv[]) {
|
||||||
"%s/video-%u.fifo", video_pipe_dir, ports[pi]);
|
"%s/video-%u.fifo", video_pipe_dir, ports[pi]);
|
||||||
snprintf(p->audio_fifo, sizeof(p->audio_fifo),
|
snprintf(p->audio_fifo, sizeof(p->audio_fifo),
|
||||||
"%s/audio-%u.fifo", audio_pipe_dir, ports[pi]);
|
"%s/audio-%u.fifo", audio_pipe_dir, ports[pi]);
|
||||||
|
snprintf(p->slot_id, sizeof(p->slot_id),
|
||||||
|
"deltacast-%u-%u", device_id, ports[pi]);
|
||||||
|
strncpy(p->fc_url, fc_url, sizeof(p->fc_url) - 1);
|
||||||
|
|
||||||
/* Create FIFOs (mkfifo; ignore EEXIST). */
|
/* Create audio FIFO (always needed — audio stays in FIFO for now). */
|
||||||
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
|
||||||
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
|
if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) {
|
||||||
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
|
fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef LEGACY_FIFO
|
||||||
|
/* Open framecache slot for video frames.
|
||||||
|
* Fall back to FIFO if framecache is unreachable. */
|
||||||
|
p->fc_writer = fc_writer_open(p->fc_url, p->slot_id,
|
||||||
|
(uint32_t)p->vi.width, (uint32_t)p->vi.height,
|
||||||
|
(uint32_t)p->vi.fps_num, (uint32_t)p->vi.fps_den);
|
||||||
|
if (!p->fc_writer) {
|
||||||
|
fprintf(stderr, "[port:%u] framecache unavailable — creating video FIFO fallback\n",
|
||||||
|
ports[pi]);
|
||||||
|
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
||||||
|
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
/* Legacy: always use video FIFO */
|
||||||
|
if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) {
|
||||||
|
fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Open video stream. */
|
/* Open video stream. */
|
||||||
HANDLE vs = NULL;
|
HANDLE vs = NULL;
|
||||||
ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]),
|
ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]),
|
||||||
|
|
@ -644,19 +752,23 @@ int main(int argc, char *argv[]) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Emit format JSON to stderr (one line per port on signal lock). */
|
/* Emit format JSON to stderr (one line per port on signal lock).
|
||||||
|
* Includes slot_id so node-agent / capture-manager can identify
|
||||||
|
* the framecache slot for this port. */
|
||||||
fprintf(stderr,
|
fprintf(stderr,
|
||||||
"{\"port\":%u,\"width\":%d,\"height\":%d,"
|
"{\"port\":%u,\"width\":%d,\"height\":%d,"
|
||||||
"\"fps_num\":%d,\"fps_den\":%d,"
|
"\"fps_num\":%d,\"fps_den\":%d,"
|
||||||
"\"interlaced\":%s,"
|
"\"interlaced\":%s,"
|
||||||
"\"pix_fmt\":\"uyvy422\","
|
"\"pix_fmt\":\"uyvy422\","
|
||||||
"\"audio_channels\":2,\"audio_rate\":48000,"
|
"\"audio_channels\":2,\"audio_rate\":48000,"
|
||||||
"\"device\":%u}\n",
|
"\"device\":%u,"
|
||||||
|
"\"slot_id\":\"%s\"}\n",
|
||||||
ports[pi],
|
ports[pi],
|
||||||
p->vi.width, p->vi.height,
|
p->vi.width, p->vi.height,
|
||||||
p->vi.fps_num, p->vi.fps_den,
|
p->vi.fps_num, p->vi.fps_den,
|
||||||
p->vi.interlaced ? "true" : "false",
|
p->vi.interlaced ? "true" : "false",
|
||||||
device_id);
|
device_id,
|
||||||
|
p->slot_id);
|
||||||
fflush(stderr);
|
fflush(stderr);
|
||||||
|
|
||||||
/* Launch audio thread (blocks until reader connects to audio FIFO). */
|
/* Launch audio thread (blocks until reader connects to audio FIFO). */
|
||||||
|
|
@ -686,6 +798,12 @@ int main(int argc, char *argv[]) {
|
||||||
VHD_StopStream(ps[i].video_stream);
|
VHD_StopStream(ps[i].video_stream);
|
||||||
VHD_CloseStreamHandle(ps[i].video_stream);
|
VHD_CloseStreamHandle(ps[i].video_stream);
|
||||||
}
|
}
|
||||||
|
#ifndef LEGACY_FIFO
|
||||||
|
if (ps[i].fc_writer) {
|
||||||
|
fc_writer_close(ps[i].fc_writer);
|
||||||
|
ps[i].fc_writer = NULL;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
VHD_CloseBoardHandle(board);
|
VHD_CloseBoardHandle(board);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue