diff --git a/services/capture/deltacast-bridge/CMakeLists.txt b/services/capture/deltacast-bridge/CMakeLists.txt index a877608..3b54bd0 100644 --- a/services/capture/deltacast-bridge/CMakeLists.txt +++ b/services/capture/deltacast-bridge/CMakeLists.txt @@ -4,8 +4,19 @@ set(CMAKE_C_STANDARD 17) set(SDK_ROOT "/sdk" CACHE PATH "Path to extracted VideoMaster SDK") +# Legacy FIFO mode — set LEGACY_FIFO=ON to disable framecache shm writes +# and fall back to the original named-FIFO path. +option(LEGACY_FIFO "Use named FIFOs instead of framecache shm" OFF) + # Primary binary: deltacast-bridge (shared multi-port daemon) -add_executable(deltacast-bridge main.c) +add_executable(deltacast-bridge main.c fc_writer.c) + +if(LEGACY_FIFO) + target_compile_definitions(deltacast-bridge PRIVATE LEGACY_FIFO=1) + message(STATUS "deltacast-bridge: LEGACY_FIFO mode enabled (shm disabled)") +else() + message(STATUS "deltacast-bridge: framecache shm mode enabled") +endif() target_include_directories(deltacast-bridge PRIVATE ${SDK_ROOT}/include/videomaster @@ -19,6 +30,7 @@ target_link_libraries(deltacast-bridge PRIVATE videomasterhd videomasterhd_audio pthread + rt # shm_open, sem_open ) # Embed the SDK RPATH so the binary finds the .so at runtime diff --git a/services/capture/deltacast-bridge/fc_writer.c b/services/capture/deltacast-bridge/fc_writer.c new file mode 100644 index 0000000..d0d8287 --- /dev/null +++ b/services/capture/deltacast-bridge/fc_writer.c @@ -0,0 +1,300 @@ +/** + * fc_writer.c — Framecache slot writer for deltacast-bridge. + * + * Uses only POSIX + libc — no external dependencies beyond what the bridge + * already links. HTTP calls are done with raw sockets (tiny GET/POST/DELETE) + * to avoid pulling in libcurl. + */ +#include "fc_writer.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Re-use the shared memory layout from the framecache service */ +#define FC_MAGIC 0x46524D43u +#define FC_VERSION 1u +#define FC_RING_DEPTH 120u +#define FC_HEADER_SIZE 4096u +#define FC_FRAME_HDR_SIZE 24u + +typedef struct { + uint32_t magic; + uint32_t version; + uint32_t width; + uint32_t height; + uint32_t fps_num; + uint32_t fps_den; + uint32_t pixel_format; + uint32_t frame_size; + uint32_t ring_depth; + uint32_t _reserved; + _Atomic uint64_t write_cursor; + _Atomic uint64_t dropped_frames; + char source_type[32]; + char slot_id[64]; + uint8_t _pad[FC_HEADER_SIZE - 112]; +} fc_hdr_t; + +typedef struct { + uint64_t pts_us; + uint64_t wall_us; + uint32_t size; + uint32_t _pad; + uint8_t data[]; +} fc_frm_t; + +struct fc_writer { + void *base; + size_t shm_size; + int shm_fd; + sem_t *sem; + char slot_id[64]; + char fc_url[256]; /* base URL for DELETE on close */ + char shm_path[128]; + char sem_name[128]; +}; + +/* ── tiny HTTP helper ──────────────────────────────────────────────── */ + +static int http_request(const char *method, + const char *host, int port, const char *path, + const char *body, /* NULL for GET/DELETE */ + char *resp_buf, size_t resp_len) +{ + struct sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons((uint16_t)port); + + struct hostent *he = gethostbyname(host); + if (!he) return -1; + memcpy(&sa.sin_addr, he->h_addr_list[0], (size_t)he->h_length); + + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) return -1; + + struct timeval tv = { .tv_sec = 5 }; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv); + + if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) { + close(fd); return -1; + } + + char req[4096]; + int req_len; + if (body) { + req_len = snprintf(req, sizeof req, + "%s %s HTTP/1.0\r\n" + "Host: %s:%d\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %zu\r\n" + "Connection: close\r\n\r\n" + "%s", + method, path, host, port, strlen(body), body); + } else { + req_len = snprintf(req, sizeof req, + "%s %s HTTP/1.0\r\n" + "Host: %s:%d\r\n" + "Connection: close\r\n\r\n", + method, path, host, port); + } + + if (send(fd, req, (size_t)req_len, 0) < 0) { close(fd); return -1; } + + int status = -1; + size_t got = 0; + char buf[8192]; + ssize_t n; + while ((n = recv(fd, buf + got, sizeof buf - got - 1, 0)) > 0) + got += (size_t)n; + buf[got] = '\0'; + + /* Parse status line */ + if (sscanf(buf, "HTTP/%*s %d", &status) != 1) status = -1; + + /* Copy body (after \r\n\r\n) into resp_buf */ + if (resp_buf && resp_len > 0) { + const char *body_start = strstr(buf, "\r\n\r\n"); + if (body_start) { + strncpy(resp_buf, body_start + 4, resp_len - 1); + resp_buf[resp_len - 1] = '\0'; + } + } + + close(fd); + return status; +} + +/* Parse "host:port" or just "host" from a URL like "http://host:port" */ +static void parse_url(const char *url, char *host, size_t hlen, int *port) +{ + const char *p = url; + if (strncmp(p, "http://", 7) == 0) p += 7; + *port = 7435; + const char *colon = strchr(p, ':'); + if (colon) { + size_t n = (size_t)(colon - p); + if (n >= hlen) n = hlen - 1; + strncpy(host, p, n); + host[n] = '\0'; + *port = atoi(colon + 1); + } else { + strncpy(host, p, hlen - 1); + host[hlen - 1] = '\0'; + } +} + +static int json_str(const char *json, const char *key, char *out, size_t len) +{ + char pat[128]; + snprintf(pat, sizeof pat, "\"%s\":", key); + const char *p = strstr(json, pat); + if (!p) return -1; + p += strlen(pat); + while (*p == ' ') p++; + if (*p != '"') return -1; + p++; + size_t i = 0; + while (*p && *p != '"' && i < len - 1) out[i++] = *p++; + out[i] = '\0'; + return 0; +} + +/* ── public API ────────────────────────────────────────────────────── */ + +fc_writer_t *fc_writer_open(const char *fc_url, + const char *slot_id, + uint32_t width, uint32_t height, + uint32_t fps_num, uint32_t fps_den) +{ + char host[128]; int port; + parse_url(fc_url, host, sizeof host, &port); + + /* POST /slots */ + char body[512]; + snprintf(body, sizeof body, + "{\"slot_id\":\"%s\"," + "\"width\":%u,\"height\":%u," + "\"fps_num\":%u,\"fps_den\":%u," + "\"source_type\":\"deltacast\"}", + slot_id, width, height, fps_num, fps_den); + + char resp[1024] = {0}; + int status = http_request("POST", host, port, "/slots", body, resp, sizeof resp); + if (status != 201) { + fprintf(stderr, "[fc_writer:%s] POST /slots failed (HTTP %d): %s\n", + slot_id, status, resp); + return NULL; + } + + char shm_path[128] = {0}, sem_name[128] = {0}; + json_str(resp, "shm_path", shm_path, sizeof shm_path); + json_str(resp, "sem_name", sem_name, sizeof sem_name); + + if (!shm_path[0] || !sem_name[0]) { + fprintf(stderr, "[fc_writer:%s] bad response (missing shm_path/sem_name)\n", slot_id); + return NULL; + } + + /* mmap the shm file */ + int fd = open(shm_path, O_RDWR); + if (fd < 0) { + fprintf(stderr, "[fc_writer:%s] open %s: %s\n", slot_id, shm_path, strerror(errno)); + return NULL; + } + /* Read header to get frame_size */ + fc_hdr_t hdr; + if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) { + fprintf(stderr, "[fc_writer:%s] bad shm header\n", slot_id); + close(fd); return NULL; + } + size_t total = (size_t)FC_HEADER_SIZE + + (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + hdr.frame_size); + + void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + fprintf(stderr, "[fc_writer:%s] mmap: %s\n", slot_id, strerror(errno)); + close(fd); return NULL; + } + + sem_t *sem = sem_open(sem_name, 0); + if (sem == SEM_FAILED) { + fprintf(stderr, "[fc_writer:%s] sem_open %s: %s\n", slot_id, sem_name, strerror(errno)); + munmap(base, total); close(fd); return NULL; + } + + fc_writer_t *w = calloc(1, sizeof *w); + if (!w) { sem_close(sem); munmap(base, total); close(fd); return NULL; } + + w->base = base; + w->shm_size = total; + w->shm_fd = fd; + w->sem = sem; + strncpy(w->slot_id, slot_id, sizeof w->slot_id - 1); + strncpy(w->fc_url, fc_url, sizeof w->fc_url - 1); + strncpy(w->shm_path, shm_path, sizeof w->shm_path - 1); + strncpy(w->sem_name, sem_name, sizeof w->sem_name - 1); + + fprintf(stderr, "[fc_writer:%s] slot open (%ux%u %.2ffps shm=%s)\n", + slot_id, width, height, + fps_den ? (double)fps_num / fps_den : 0.0, shm_path); + return w; +} + +void fc_writer_write(fc_writer_t *w, + const uint8_t *data, uint32_t size, + uint64_t pts_us) +{ + fc_hdr_t *hdr = (fc_hdr_t *)w->base; + uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed); + uint64_t idx = cur % FC_RING_DEPTH; + + /* Locate frame in ring */ + uint8_t *frames = (uint8_t *)w->base + FC_HEADER_SIZE; + fc_frm_t *frame = (fc_frm_t *)(frames + idx * ((size_t)FC_FRAME_HDR_SIZE + hdr->frame_size)); + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + uint64_t wall = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL; + + frame->pts_us = pts_us; + frame->wall_us = wall; + frame->size = size < hdr->frame_size ? size : hdr->frame_size; + memcpy(frame->data, data, frame->size); + + atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release); + sem_post(w->sem); +} + +void fc_writer_close(fc_writer_t *w) +{ + if (!w) return; + + /* DELETE /slots/:id */ + char host[128]; int port; + parse_url(w->fc_url, host, sizeof host, &port); + char path[192]; + snprintf(path, sizeof path, "/slots/%s", w->slot_id); + http_request("DELETE", host, port, path, NULL, NULL, 0); + + sem_close(w->sem); + munmap(w->base, w->shm_size); + close(w->shm_fd); + fprintf(stderr, "[fc_writer:%s] slot closed\n", w->slot_id); + free(w); +} diff --git a/services/capture/deltacast-bridge/fc_writer.h b/services/capture/deltacast-bridge/fc_writer.h new file mode 100644 index 0000000..f2497fd --- /dev/null +++ b/services/capture/deltacast-bridge/fc_writer.h @@ -0,0 +1,50 @@ +/** + * fc_writer.h — Lightweight framecache slot writer for deltacast-bridge. + * + * Registers a slot with the framecache HTTP API on signal lock, then writes + * raw UYVY422 frames directly into the shared memory ring buffer. + * + * Compile with -DLEGACY_FIFO to disable shm writes and fall back to the + * original named-FIFO path (useful during transition / on nodes without the + * framecache container running). + */ +#pragma once + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct fc_writer fc_writer_t; + +/** + * Register a slot with the framecache service and open the shm region for + * writing. fc_url is the HTTP base URL, e.g. "http://localhost:7435". + * slot_id must be unique per port, e.g. "deltacast-0-3" (device-port). + * + * Returns writer handle on success, NULL on failure (falls back to FIFO). + */ +fc_writer_t *fc_writer_open(const char *fc_url, + const char *slot_id, + uint32_t width, uint32_t height, + uint32_t fps_num, uint32_t fps_den); + +/** + * Write one raw UYVY422 frame into the ring buffer. + * Non-blocking — slow consumers are skipped, not waited on. + * pts_us: presentation timestamp in microseconds (0 if unknown). + */ +void fc_writer_write(fc_writer_t *w, + const uint8_t *data, uint32_t size, + uint64_t pts_us); + +/** + * Deregister slot from framecache service and unmap shm. + */ +void fc_writer_close(fc_writer_t *w); + +#ifdef __cplusplus +} +#endif diff --git a/services/capture/deltacast-bridge/main.c b/services/capture/deltacast-bridge/main.c index ce41763..9c12c54 100644 --- a/services/capture/deltacast-bridge/main.c +++ b/services/capture/deltacast-bridge/main.c @@ -3,20 +3,32 @@ * Deltacast VideoMaster SDI shared multi-port bridge daemon. * * Opens the board ONCE, opens RX streams for all requested ports, and - * writes each port's video/audio to named FIFOs in a shared directory. - * One reader thread + one audio thread per port run concurrently. + * writes each port's video frames into a shared-memory framecache slot + * (and audio to a named FIFO — audio-in-shm is a future roadmap item). + * + * Signal fan-out architecture: + * Board → video_thread → fc_writer → /dev/shm/framecache/ + * └→ N consumers (recording, proxy, + * HLS preview) each read with + * their own cursor — zero-copy, + * no bandwidth splitting. * * Usage: * deltacast-bridge --device --ports * [--video-pipe-dir /dev/shm/deltacast] * [--audio-pipe-dir /dev/shm/deltacast] + * [--fc-url http://framecache:7435] * [--signal-timeout ] * * Compat alias: --port treated as --ports (single port). * * For each port that acquires signal, emits one JSON line to stderr: * {"port":N,"width":W,"height":H,"fps_num":N,"fps_den":D, - * "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2} + * "pix_fmt":"uyvy422","audio_rate":48000,"audio_channels":2, + * "slot_id":"deltacast--"} + * + * Compile with -DLEGACY_FIFO=1 to disable shm writes and fall back to + * the original named-FIFO path (for nodes without framecache running). * * Runs until SIGTERM/SIGINT, then closes all streams and the board. */ @@ -37,10 +49,17 @@ #include "VideoMasterHD_Sdi.h" #include "VideoMasterHD_Sdi_Audio.h" +#ifndef LEGACY_FIFO +# include "fc_writer.h" +#endif + #ifndef F_SETPIPE_SZ #define F_SETPIPE_SZ 1031 #endif +/* Default framecache URL — overridden by FC_URL env var or --fc-url arg */ +#define FC_URL_DEFAULT "http://localhost:7435" + /* ── Constants ────────────────────────────────────────────────────────── */ #define MAX_PORTS 8 @@ -154,11 +173,16 @@ typedef struct { VideoInfo vi; char video_fifo[256]; char audio_fifo[256]; + char slot_id[128]; /* framecache slot id: "deltacast--" */ + char fc_url[256]; /* framecache HTTP base URL */ /* threads */ pthread_t video_tid; pthread_t audio_tid; /* streams (owned by threads, set before thread launch) */ HANDLE video_stream; +#ifndef LEGACY_FIFO + fc_writer_t *fc_writer; /* shm ring buffer writer (NULL = use FIFO fallback) */ +#endif } PortState; /* ── Audio thread ────────────────────────────────────────────────────── @@ -343,10 +367,67 @@ static void *audio_thread(void *arg) { static void *video_thread(void *arg) { PortState *ps = (PortState *)arg; - /* Outer loop: reopen the FIFO writer each time a reader connects. - * Mirror the audio thread pattern — EPIPE means the ffmpeg sidecar for - * this port died (session stop/restart), NOT a hardware fault. We reopen - * and block until the next recorder start; other ports are unaffected. */ +#ifndef LEGACY_FIFO + /* ── Framecache shm path (primary) ────────────────────────────────── + * Write frames directly into the shared memory ring buffer. + * Multiple consumers (growing recorder, proxy encoder, HLS preview) + * each hold their own read cursor and read independently — no FIFO + * splitting, no bandwidth halving. + * + * The fc_writer was opened by main() after signal lock. If it is + * NULL the framecache service was unavailable and we fall through to + * the legacy FIFO path automatically. + */ + if (ps->fc_writer) { + uint64_t frame_seq = 0; + while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { + HANDLE slot = NULL; + ULONG r = VHD_LockSlotHandle(ps->video_stream, &slot); + if (r == VHDERR_NOERROR) { + BYTE *buf = NULL; + ULONG sz = 0; + if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { + ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2; + if (sz != expected) { + fprintf(stderr, + "[video:%u] WARN: sz=%lu != expected %lu — packing mismatch, skipping\n", + ps->port, (unsigned long)sz, (unsigned long)expected); + VHD_UnlockSlotHandle(slot); + continue; + } + /* pts: frame index × frame duration in µs */ + uint64_t pts_us = 0; + if (ps->vi.fps_num > 0) { + pts_us = frame_seq * 1000000ULL + * (uint64_t)ps->vi.fps_den + / (uint64_t)ps->vi.fps_num; + } + fc_writer_write(ps->fc_writer, buf, (uint32_t)sz, pts_us); + frame_seq++; + } + VHD_UnlockSlotHandle(slot); + } else if (r != VHDERR_TIMEOUT) { + fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n", + ps->port, (unsigned long)r); + atomic_store(&g_port_stop[ps->port], 1); + break; + } + } + return NULL; + } + /* fc_writer == NULL → fall through to FIFO path */ + fprintf(stderr, "[video:%u] fc_writer unavailable — falling back to FIFO\n", ps->port); +#endif /* !LEGACY_FIFO */ + + /* ── Legacy FIFO path ──────────────────────────────────────────────── + * Kept as compile-time fallback (-DLEGACY_FIFO=1) or when the + * framecache service is not reachable at startup. + * + * Outer loop: reopen the FIFO writer each time a reader connects. + * EPIPE means the ffmpeg sidecar for this port died (session + * stop/restart), NOT a hardware fault. Reopen and block until the + * next recorder start; other ports are unaffected. + */ while (!atomic_load(&g_stop) && !atomic_load(&g_port_stop[ps->port])) { int fd = open(ps->video_fifo, O_WRONLY); @@ -359,7 +440,8 @@ static void *video_thread(void *arg) { { int pipe_sz = 64 * 1024 * 1024; /* 64 MB — ~16 frames of 1080p UYVY */ if (fcntl(fd, F_SETPIPE_SZ, pipe_sz) < 0) { - fprintf(stderr, "[video:%u] fcntl F_SETPIPE_SZ failed: %s\n", ps->port, strerror(errno)); + fprintf(stderr, "[video:%u] fcntl F_SETPIPE_SZ failed: %s\n", + ps->port, strerror(errno)); } } @@ -373,14 +455,14 @@ static void *video_thread(void *arg) { if (VHD_GetSlotBuffer(slot, VHD_SDI_BT_VIDEO, &buf, &sz) == VHDERR_NOERROR) { ULONG expected = (ULONG)ps->vi.width * (ULONG)ps->vi.height * 2; if (sz != expected) { - fprintf(stderr, "[video:%u] WARN: slot sz=%lu != expected %lu (w=%d h=%d) -- packing mismatch; skipping frame\n", - ps->port, sz, expected, ps->vi.width, ps->vi.height); + fprintf(stderr, + "[video:%u] WARN: slot sz=%lu != expected %lu (w=%d h=%d) -- packing mismatch; skipping frame\n", + ps->port, (unsigned long)sz, (unsigned long)expected, + ps->vi.width, ps->vi.height); VHD_UnlockSlotHandle(slot); continue; } if (write_all(fd, buf, sz) < 0) { - /* EPIPE: sidecar died (session stop/restart). - * Break to outer loop — reopen for next session. */ fprintf(stderr, "[video:%u] EPIPE — waiting for next reader\n", ps->port); VHD_UnlockSlotHandle(slot); break; @@ -389,7 +471,7 @@ static void *video_thread(void *arg) { VHD_UnlockSlotHandle(slot); } else if (r != VHDERR_TIMEOUT) { fprintf(stderr, "[video:%u] VHD_LockSlotHandle error %lu — stopping port\n", - ps->port, r); + ps->port, (unsigned long)r); atomic_store(&g_port_stop[ps->port], 1); fatal = 1; break; @@ -419,12 +501,15 @@ static int parse_ports(const char *csv, unsigned *ports, int max) { /* ── Main ─────────────────────────────────────────────────────────────── */ int main(int argc, char *argv[]) { - unsigned device_id = 0; - unsigned ports[MAX_PORTS] = {0}; - int port_count = 0; - int sig_timeout = 30; - const char *video_pipe_dir = "/dev/shm/deltacast"; - const char *audio_pipe_dir = "/dev/shm/deltacast"; + unsigned device_id = 0; + unsigned ports[MAX_PORTS] = {0}; + int port_count = 0; + int sig_timeout = 30; + const char *video_pipe_dir = "/dev/shm/deltacast"; + const char *audio_pipe_dir = "/dev/shm/deltacast"; + /* Framecache URL: CLI arg > FC_URL env var > default */ + const char *fc_url_env = getenv("FC_URL"); + const char *fc_url = fc_url_env ? fc_url_env : FC_URL_DEFAULT; for (int i = 1; i < argc; i++) { if (!strcmp(argv[i], "--device") && i+1 < argc) { @@ -441,6 +526,8 @@ int main(int argc, char *argv[]) { audio_pipe_dir = argv[++i]; } else if (!strcmp(argv[i], "--signal-timeout") && i+1 < argc) { sig_timeout = atoi(argv[++i]); + } else if (!strcmp(argv[i], "--fc-url") && i+1 < argc) { + fc_url = argv[++i]; } } @@ -601,17 +688,38 @@ int main(int argc, char *argv[]) { "%s/video-%u.fifo", video_pipe_dir, ports[pi]); snprintf(p->audio_fifo, sizeof(p->audio_fifo), "%s/audio-%u.fifo", audio_pipe_dir, ports[pi]); + snprintf(p->slot_id, sizeof(p->slot_id), + "deltacast-%u-%u", device_id, ports[pi]); + strncpy(p->fc_url, fc_url, sizeof(p->fc_url) - 1); - /* Create FIFOs (mkfifo; ignore EEXIST). */ - if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { - fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); - continue; - } + /* Create audio FIFO (always needed — audio stays in FIFO for now). */ if (mkfifo(p->audio_fifo, 0666) != 0 && errno != EEXIST) { fprintf(stderr, "[port:%u] mkfifo audio failed: %s\n", ports[pi], strerror(errno)); continue; } +#ifndef LEGACY_FIFO + /* Open framecache slot for video frames. + * Fall back to FIFO if framecache is unreachable. */ + p->fc_writer = fc_writer_open(p->fc_url, p->slot_id, + (uint32_t)p->vi.width, (uint32_t)p->vi.height, + (uint32_t)p->vi.fps_num, (uint32_t)p->vi.fps_den); + if (!p->fc_writer) { + fprintf(stderr, "[port:%u] framecache unavailable — creating video FIFO fallback\n", + ports[pi]); + if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { + fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); + continue; + } + } +#else + /* Legacy: always use video FIFO */ + if (mkfifo(p->video_fifo, 0666) != 0 && errno != EEXIST) { + fprintf(stderr, "[port:%u] mkfifo video failed: %s\n", ports[pi], strerror(errno)); + continue; + } +#endif + /* Open video stream. */ HANDLE vs = NULL; ULONG r = VHD_OpenStreamHandle(board, rx_streamtype(ports[pi]), @@ -644,19 +752,23 @@ int main(int argc, char *argv[]) { continue; } - /* Emit format JSON to stderr (one line per port on signal lock). */ + /* Emit format JSON to stderr (one line per port on signal lock). + * Includes slot_id so node-agent / capture-manager can identify + * the framecache slot for this port. */ fprintf(stderr, "{\"port\":%u,\"width\":%d,\"height\":%d," "\"fps_num\":%d,\"fps_den\":%d," "\"interlaced\":%s," "\"pix_fmt\":\"uyvy422\"," "\"audio_channels\":2,\"audio_rate\":48000," - "\"device\":%u}\n", + "\"device\":%u," + "\"slot_id\":\"%s\"}\n", ports[pi], p->vi.width, p->vi.height, p->vi.fps_num, p->vi.fps_den, p->vi.interlaced ? "true" : "false", - device_id); + device_id, + p->slot_id); fflush(stderr); /* Launch audio thread (blocks until reader connects to audio FIFO). */ @@ -686,6 +798,12 @@ int main(int argc, char *argv[]) { VHD_StopStream(ps[i].video_stream); VHD_CloseStreamHandle(ps[i].video_stream); } +#ifndef LEGACY_FIFO + if (ps[i].fc_writer) { + fc_writer_close(ps[i].fc_writer); + ps[i].fc_writer = NULL; + } +#endif } VHD_CloseBoardHandle(board);