feat(framecache): phase 1 — framecache container + consumer library

- services/framecache/: new standalone container
  - slot.h/slot.c: shm ring buffer (120 frames, FC_MAGIC header, atomic
    write_cursor, POSIX semaphore per slot)
  - registry.h/registry.c: in-memory slot registry + /dev/shm/framecache/
    registry.json persistence
  - framecache.c: HTTP API server (libmicrohttpd, port 7435)
    POST /slots, GET /slots, GET /slots/:id, DELETE /slots/:id, GET /health
  - fc_client.h/fc_client.c: consumer library — fc_consumer_open/read/close
    with per-consumer cursor, timeout via sem_timedwait, automatic skip+count
    when consumer falls behind writer by > ring_depth frames
  - fc_test_consumer.c: dev utility to attach to any slot and print fps/stats
  - CMakeLists.txt: framecache server + fc_client static lib + test consumer
  - Dockerfile: builder + slim runtime stages

- docker-compose.worker.yml: add framecache service (profile: capture,
  ipc: host, shm_size from FC_SHM_SIZE_GB env var, healthcheck)

- .env.example: document FC_SHM_SIZE_GB with per-node guidance
This commit is contained in:
Wild Dragon Dev 2026-06-03 14:53:51 +00:00
parent 2f1697b77b
commit 1573bf8954
12 changed files with 1186 additions and 0 deletions

View file

@ -69,6 +69,14 @@ GOOGLE_ALLOWED_DOMAIN=
# the authenticator code (Google is treated as the first factor). Accounts without
# TOTP complete sign-in in one Google step.
# Framecache — shared memory ring buffer for SDI + network ingest fan-out.
# Size in GB. Tune per node based on available RAM and number of SDI inputs.
# Each 1080p59.94 source uses ~494MB (120-frame ring at 4.1MB/frame).
# Baratheon (251GB RAM): 60
# zampp1 (93GB RAM): 40
# zampp2 (18GB RAM): 8 (increase node RAM before deploying capture)
FC_SHM_SIZE_GB=40
# Playout / Master Control (MCR)
# Image tag the mam-api spawns when a channel starts. Build with:
# docker compose --profile build-only build playout

View file

@ -151,6 +151,34 @@ services:
networks:
- wild-dragon-worker
# Framecache — shared memory ring buffer for SDI + network ingest fan-out.
# Runs on every worker node that has capture sources (Blackmagic, Deltacast).
# IPC host mode lets all capture sidecars share /dev/shm with this container.
# FC_SHM_SIZE can be tuned per node in .env.worker:
# Baratheon (251GB RAM): FC_SHM_SIZE=64424509440 (60GB)
# zampp1 (93GB RAM): FC_SHM_SIZE=42949672960 (40GB)
# zampp2 (18GB RAM): FC_SHM_SIZE=8589934592 (8GB — increase RAM first)
framecache:
build: ./services/framecache
profiles: [capture]
restart: unless-stopped
ipc: host
shm_size: '${FC_SHM_SIZE_GB:-40}gb'
environment:
FC_PORT: 7435
ports:
- "7435:7435"
volumes:
- /dev/shm:/dev/shm
networks:
- wild-dragon-worker
healthcheck:
test: ["CMD", "wget", "-qO-", "http://localhost:7435/health"]
interval: 10s
timeout: 3s
retries: 3
start_period: 5s
networks:
wild-dragon-worker:
driver: bridge

View file

@ -0,0 +1,39 @@
cmake_minimum_required(VERSION 3.16)
project(framecache C)
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -O2")
# ── libmicrohttpd ────────────────────────────────────────────────────
find_library(MHD_LIB microhttpd REQUIRED)
find_path(MHD_INCLUDE microhttpd.h REQUIRED)
include_directories(${MHD_INCLUDE})
# ── framecache server ────────────────────────────────────────────────
add_executable(framecache
src/framecache.c
src/slot.c
src/registry.c
)
target_link_libraries(framecache ${MHD_LIB} rt pthread)
# ── fc_client static library (used by bridges + test) ───────────────
add_library(fc_client STATIC
client/fc_client.c
src/slot.c # client needs fc_slot_shm_size / fc_frame_at
)
target_include_directories(fc_client PUBLIC src client)
target_link_libraries(fc_client rt pthread)
# ── test consumer (dev utility) ──────────────────────────────────────
if(BUILD_TESTS)
add_executable(fc_test_consumer
client/fc_test_consumer.c
)
target_link_libraries(fc_test_consumer fc_client)
target_include_directories(fc_test_consumer PRIVATE src client)
endif()
install(TARGETS framecache DESTINATION bin)
install(FILES client/fc_client.h src/slot.h DESTINATION include/framecache)
install(TARGETS fc_client DESTINATION lib)

View file

@ -0,0 +1,31 @@
# ── Build stage ─────────────────────────────────────────────────────
FROM debian:bookworm AS builder
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake libmicrohttpd-dev \
&& rm -rf /var/lib/apt/lists/*
COPY . /src
RUN cmake -S /src -B /build \
-DCMAKE_BUILD_TYPE=Release \
&& cmake --build /build -j"$(nproc)"
# ── Runtime stage ────────────────────────────────────────────────────
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
libmicrohttpd12 \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/framecache /usr/local/bin/framecache
COPY --from=builder /build/fc_test_consumer /usr/local/bin/fc_test_consumer 2>/dev/null || true
# /dev/shm/framecache is created at runtime (tmpfs)
RUN mkdir -p /dev/shm/framecache
EXPOSE 7435
HEALTHCHECK --interval=10s --timeout=3s --start-period=5s \
CMD wget -qO- http://localhost:7435/health || exit 1
CMD ["/usr/local/bin/framecache"]

View file

@ -0,0 +1,150 @@
/**
* fc_client.c Consumer-side framecache client implementation.
*/
#include "fc_client.h"
#include "../src/slot.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <time.h>
#include <unistd.h>
#define SHM_DIR "/dev/shm/framecache"
#define SEM_PREFIX "/framecache-"
#define SEM_SUFFIX "-write"
struct fc_consumer {
int shm_fd;
void *base;
size_t shm_size;
sem_t *sem;
uint64_t read_cursor; /* consumer's own position in the ring */
uint64_t local_dropped; /* frames skipped by this consumer */
char slot_id[FC_MAX_SLOT_ID];
};
static uint64_t now_us(void)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
}
fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
{
char shm_path[128], sem_name[128];
snprintf(shm_path, sizeof shm_path, "%s/%s", SHM_DIR, slot_id);
snprintf(sem_name, sizeof sem_name, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX);
uint64_t deadline = now_us() + wait_ms * 1000ULL;
int fd = -1;
while (1) {
fd = open(shm_path, O_RDONLY);
if (fd >= 0) break;
if (now_us() >= deadline) return NULL;
struct timespec ts = { .tv_nsec = 100000000 }; /* 100ms */
nanosleep(&ts, NULL);
}
/* Read header to get frame_size */
fc_header_t hdr;
if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
close(fd); return NULL;
}
size_t total = fc_slot_shm_size(hdr.frame_size);
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
if (base == MAP_FAILED) { close(fd); return NULL; }
sem_t *sem = sem_open(sem_name, 0);
if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; }
fc_consumer_t *c = calloc(1, sizeof *c);
if (!c) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
c->shm_fd = fd;
c->base = base;
c->shm_size = total;
c->sem = sem;
/* Start reading from the current write position so we don't replay old frames */
c->read_cursor = atomic_load_explicit(
&((fc_header_t *)base)->write_cursor, memory_order_acquire);
c->local_dropped = 0;
strncpy(c->slot_id, slot_id, FC_MAX_SLOT_ID - 1);
return c;
}
int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
{
fc_header_t *hdr = (fc_header_t *)c->base;
/* Wait for a new frame via semaphore */
struct timespec abs_ts;
clock_gettime(CLOCK_REALTIME, &abs_ts);
abs_ts.tv_sec += (time_t)(timeout_ms / 1000);
abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L);
if (abs_ts.tv_nsec >= 1000000000L) {
abs_ts.tv_sec++;
abs_ts.tv_nsec -= 1000000000L;
}
while (sem_timedwait(c->sem, &abs_ts) != 0) {
if (errno == ETIMEDOUT) return FC_TIMEOUT;
if (errno == EINTR) continue;
return FC_ERROR;
}
uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor,
memory_order_acquire);
int dropped = 0;
/* Check if consumer fell behind by more than ring_depth */
if (write_cur > c->read_cursor + hdr->ring_depth) {
uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth;
c->read_cursor = write_cur - hdr->ring_depth;
c->local_dropped += skipped;
atomic_fetch_add(&hdr->dropped_frames, skipped);
dropped = 1;
}
if (c->read_cursor >= write_cur) {
/* Semaphore posted but nothing new yet (spurious) */
return FC_TIMEOUT;
}
fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor);
ref->data = frame->data;
ref->size = frame->size;
ref->pts_us = frame->pts_us;
ref->wall_us = frame->wall_us;
ref->seq = c->read_cursor;
c->read_cursor++;
return dropped ? FC_DROPPED : FC_OK;
}
void fc_consumer_close(fc_consumer_t *c)
{
if (!c) return;
sem_close(c->sem);
munmap(c->base, c->shm_size);
close(c->shm_fd);
free(c);
}
uint64_t fc_consumer_write_cursor(fc_consumer_t *c)
{
fc_header_t *hdr = (fc_header_t *)c->base;
return atomic_load(&hdr->write_cursor);
}
uint64_t fc_consumer_dropped(fc_consumer_t *c)
{
return c->local_dropped;
}

View file

@ -0,0 +1,70 @@
/**
* fc_client.h Consumer-side framecache client library.
*
* Usage:
* fc_consumer_t *c = fc_consumer_open("deltacast-zampp3-0");
* fc_frame_ref_t ref;
* while (fc_consumer_read(c, &ref, 2000) == FC_OK) {
* // ref.data valid until next fc_consumer_read call
* process_frame(ref.data, ref.size, ref.pts_us);
* }
* fc_consumer_close(c);
*
* Each consumer tracks its own read_cursor multiple consumers on the same
* slot are fully independent and never block each other or the writer.
*
* If a consumer falls more than ring_depth frames behind the writer its cursor
* is snapped to the latest frame and FC_DROPPED is returned once.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Return codes */
#define FC_OK 0
#define FC_TIMEOUT 1 /* no new frame within timeout_ms */
#define FC_DROPPED 2 /* consumer fell behind; cursor snapped to latest */
#define FC_ERROR -1
typedef struct fc_consumer fc_consumer_t;
typedef struct {
const uint8_t *data; /* zero-copy pointer into shm ring — valid until next read */
uint32_t size; /* bytes */
uint64_t pts_us; /* presentation timestamp (microseconds) */
uint64_t wall_us; /* wall clock at write time (microseconds) */
uint64_t seq; /* write_cursor value for this frame */
} fc_frame_ref_t;
/**
* Open a consumer handle for the named slot.
* Polls the slot shm file until it appears (up to wait_ms milliseconds).
* Returns NULL if slot not found within wait_ms or on error.
*/
fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms);
/**
* Read the next frame.
* Blocks up to timeout_ms waiting for a new frame (via semaphore).
* Returns FC_OK, FC_TIMEOUT, FC_DROPPED, or FC_ERROR.
* On FC_OK or FC_DROPPED the ref fields are populated.
*/
int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms);
/** Close the consumer handle. Does NOT destroy the slot. */
void fc_consumer_close(fc_consumer_t *c);
/** Current write_cursor of the slot (approximate — no lock). */
uint64_t fc_consumer_write_cursor(fc_consumer_t *c);
/** Frames dropped by this consumer since open. */
uint64_t fc_consumer_dropped(fc_consumer_t *c);
#ifdef __cplusplus
}
#endif

View file

@ -0,0 +1,73 @@
/**
* fc_test_consumer.c Dev utility: attach to a framecache slot and print stats.
*
* Usage: fc_test_consumer <slot_id> [wait_ms]
*/
#include "fc_client.h"
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <time.h>
static volatile int g_run = 1;
static void on_sig(int s) { (void)s; g_run = 0; }
int main(int argc, char **argv)
{
if (argc < 2) {
fprintf(stderr, "Usage: %s <slot_id> [wait_ms]\n", argv[0]);
return 1;
}
const char *slot_id = argv[1];
uint64_t wait_ms = argc >= 3 ? (uint64_t)atoi(argv[2]) : 30000;
signal(SIGINT, on_sig);
signal(SIGTERM, on_sig);
fprintf(stderr, "Opening slot '%s' (wait up to %llums)...\n",
slot_id, (unsigned long long)wait_ms);
fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms);
if (!c) {
fprintf(stderr, "Failed to open slot '%s'\n", slot_id);
return 1;
}
fprintf(stderr, "Slot opened. Reading frames (Ctrl+C to stop)...\n");
uint64_t total = 0, dropped = 0;
struct timespec t0;
clock_gettime(CLOCK_MONOTONIC, &t0);
while (g_run) {
fc_frame_ref_t ref;
int rc = fc_consumer_read(c, &ref, 2000);
if (rc == FC_TIMEOUT) continue;
if (rc == FC_ERROR) { fprintf(stderr, "read error\n"); break; }
if (rc == FC_DROPPED) {
dropped = fc_consumer_dropped(c);
fprintf(stderr, "[WARN] consumer fell behind — total dropped: %llu\n",
(unsigned long long)dropped);
}
total++;
/* Print stats every 100 frames */
if (total % 100 == 0) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
double elapsed = (now.tv_sec - t0.tv_sec)
+ (now.tv_nsec - t0.tv_nsec) * 1e-9;
fprintf(stdout, "frames=%llu dropped=%llu fps=%.2f pts_us=%llu\n",
(unsigned long long)total,
(unsigned long long)fc_consumer_dropped(c),
total / elapsed,
(unsigned long long)ref.pts_us);
fflush(stdout);
}
}
fprintf(stderr, "Done. total=%llu dropped=%llu\n",
(unsigned long long)total,
(unsigned long long)fc_consumer_dropped(c));
fc_consumer_close(c);
return 0;
}

View file

@ -0,0 +1,350 @@
/**
* framecache.c Main entry point. HTTP API server + slot manager.
*
* Endpoints:
* POST /slots Create slot
* GET /slots List slots
* GET /slots/:id Get slot detail
* DELETE /slots/:id Destroy slot
* GET /health Health check
*
* Uses libmicrohttpd for the HTTP layer (single-threaded, poll-based).
*/
#include "slot.h"
#include "registry.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <signal.h>
#include <errno.h>
#include <sys/stat.h>
#include <microhttpd.h>
#ifndef FC_PORT_DEFAULT
#define FC_PORT_DEFAULT 7435
#endif
/* ── tiny JSON helpers ─────────────────────────────────────────────── */
static int json_get_uint(const char *json, const char *key, uint32_t *out)
{
char pat[128];
snprintf(pat, sizeof pat, "\"%s\":", key);
const char *p = strstr(json, pat);
if (!p) return -1;
p += strlen(pat);
while (*p == ' ' || *p == '\t') p++;
*out = (uint32_t)strtoul(p, NULL, 10);
return 0;
}
static int json_get_str(const char *json, const char *key,
char *out, size_t out_len)
{
char pat[128];
snprintf(pat, sizeof pat, "\"%s\":", key);
const char *p = strstr(json, pat);
if (!p) return -1;
p += strlen(pat);
while (*p == ' ' || *p == '\t') p++;
if (*p != '"') return -1;
p++;
size_t i = 0;
while (*p && *p != '"' && i < out_len - 1)
out[i++] = *p++;
out[i] = '\0';
return 0;
}
/* ── HTTP request accumulator ──────────────────────────────────────── */
typedef struct {
char *buf;
size_t len;
size_t cap;
} req_body_t;
static void req_body_free(req_body_t *r)
{
free(r->buf);
r->buf = NULL; r->len = 0; r->cap = 0;
}
/* ── response helpers ──────────────────────────────────────────────── */
static enum MHD_Result respond(struct MHD_Connection *conn,
unsigned int status,
const char *body)
{
struct MHD_Response *r = MHD_create_response_from_buffer(
strlen(body), (void *)body, MHD_RESPMEM_MUST_COPY);
MHD_add_response_header(r, "Content-Type", "application/json");
MHD_add_response_header(r, "Access-Control-Allow-Origin", "*");
enum MHD_Result rc = MHD_queue_response(conn, status, r);
MHD_destroy_response(r);
return rc;
}
/* ── slot → JSON ───────────────────────────────────────────────────── */
static void slot_to_json(struct fc_slot *s, char *buf, size_t len)
{
fc_header_t *hdr = fc_slot_header(s);
uint64_t wc = atomic_load(&hdr->write_cursor);
uint64_t df = atomic_load(&hdr->dropped_frames);
/* simple fps estimate — not perfect but good enough for status */
snprintf(buf, len,
"{"
"\"slot_id\":\"%s\","
"\"shm_path\":\"%s\","
"\"sem_name\":\"%s\","
"\"width\":%u,"
"\"height\":%u,"
"\"fps_num\":%u,"
"\"fps_den\":%u,"
"\"pixel_format\":\"UYVY422\","
"\"source_type\":\"%s\","
"\"frame_size\":%u,"
"\"ring_depth\":%u,"
"\"write_cursor\":%llu,"
"\"dropped_frames\":%llu"
"}",
fc_slot_id(s),
fc_slot_shm_path(s),
fc_slot_sem_name(s),
hdr->width, hdr->height,
hdr->fps_num, hdr->fps_den,
hdr->source_type,
hdr->frame_size,
hdr->ring_depth,
(unsigned long long)wc,
(unsigned long long)df
);
}
/* ── request handler ───────────────────────────────────────────────── */
static enum MHD_Result handle_request(
void *cls,
struct MHD_Connection *conn,
const char *url,
const char *method,
const char *version,
const char *upload_data,
size_t *upload_data_size,
void **con_cls)
{
(void)cls; (void)version;
/* First call: allocate body accumulator */
if (*con_cls == NULL) {
req_body_t *rb = calloc(1, sizeof *rb);
if (!rb) return MHD_NO;
*con_cls = rb;
return MHD_YES;
}
req_body_t *rb = (req_body_t *)*con_cls;
/* Accumulate POST body */
if (*upload_data_size > 0) {
size_t need = rb->len + *upload_data_size + 1;
if (need > rb->cap) {
rb->buf = realloc(rb->buf, need);
rb->cap = need;
}
memcpy(rb->buf + rb->len, upload_data, *upload_data_size);
rb->len += *upload_data_size;
rb->buf[rb->len] = '\0';
*upload_data_size = 0;
return MHD_YES;
}
enum MHD_Result rc;
char resp[4096];
/* GET /health */
if (strcmp(method, "GET") == 0 && strcmp(url, "/health") == 0) {
rc = respond(conn, MHD_HTTP_OK, "{\"status\":\"ok\"}");
goto done;
}
/* GET /slots */
if (strcmp(method, "GET") == 0 && strcmp(url, "/slots") == 0) {
char big[65536];
int pos = 0;
pos += snprintf(big + pos, sizeof big - pos, "[");
int first = 1;
for (int i = 0; i < FC_MAX_SLOTS; i++) {
if (!g_registry[i].active) continue;
char entry[2048];
slot_to_json(g_registry[i].slot, entry, sizeof entry);
if (!first) big[pos++] = ',';
first = 0;
pos += snprintf(big + pos, sizeof big - pos, "%s", entry);
}
snprintf(big + pos, sizeof big - pos, "]");
rc = respond(conn, MHD_HTTP_OK, big);
goto done;
}
/* GET /slots/:id */
if (strcmp(method, "GET") == 0 &&
strncmp(url, "/slots/", 7) == 0 && strlen(url) > 7)
{
const char *id = url + 7;
struct fc_slot *s = registry_find(id);
if (!s) {
rc = respond(conn, MHD_HTTP_NOT_FOUND,
"{\"error\":\"slot not found\"}");
goto done;
}
slot_to_json(s, resp, sizeof resp);
rc = respond(conn, MHD_HTTP_OK, resp);
goto done;
}
/* POST /slots */
if (strcmp(method, "POST") == 0 && strcmp(url, "/slots") == 0) {
if (!rb->buf || rb->len == 0) {
rc = respond(conn, MHD_HTTP_BAD_REQUEST,
"{\"error\":\"empty body\"}");
goto done;
}
char slot_id[FC_MAX_SLOT_ID] = {0};
char source_type[32] = "unknown";
uint32_t width = 0, height = 0, fps_num = 0, fps_den = 0;
json_get_str(rb->buf, "slot_id", slot_id, sizeof slot_id);
json_get_str(rb->buf, "source_type", source_type, sizeof source_type);
json_get_uint(rb->buf, "width", &width);
json_get_uint(rb->buf, "height", &height);
json_get_uint(rb->buf, "fps_num", &fps_num);
json_get_uint(rb->buf, "fps_den", &fps_den);
if (!slot_id[0] || !width || !height || !fps_num || !fps_den) {
rc = respond(conn, MHD_HTTP_BAD_REQUEST,
"{\"error\":\"missing required fields: "
"slot_id, width, height, fps_num, fps_den\"}");
goto done;
}
if (registry_find(slot_id)) {
rc = respond(conn, MHD_HTTP_CONFLICT,
"{\"error\":\"slot already exists\"}");
goto done;
}
struct fc_slot *s = fc_slot_create(slot_id, width, height,
fps_num, fps_den,
FC_PIX_UYVY422, source_type);
if (!s) {
rc = respond(conn, MHD_HTTP_INTERNAL_SERVER_ERROR,
"{\"error\":\"failed to create slot\"}");
goto done;
}
registry_add(s);
snprintf(resp, sizeof resp,
"{\"slot_id\":\"%s\","
"\"shm_path\":\"%s\","
"\"sem_name\":\"%s\"}",
fc_slot_id(s),
fc_slot_shm_path(s),
fc_slot_sem_name(s));
rc = respond(conn, MHD_HTTP_CREATED, resp);
goto done;
}
/* DELETE /slots/:id */
if (strcmp(method, "DELETE") == 0 &&
strncmp(url, "/slots/", 7) == 0 && strlen(url) > 7)
{
const char *id = url + 7;
struct fc_slot *s = registry_find(id);
if (!s) {
rc = respond(conn, MHD_HTTP_NOT_FOUND,
"{\"error\":\"slot not found\"}");
goto done;
}
registry_remove(id);
fc_slot_destroy(s);
rc = respond(conn, MHD_HTTP_NO_CONTENT, "");
goto done;
}
rc = respond(conn, MHD_HTTP_NOT_FOUND, "{\"error\":\"not found\"}");
done:
req_body_free(rb);
free(rb);
*con_cls = NULL;
return rc;
}
static void request_completed(void *cls,
struct MHD_Connection *conn,
void **con_cls,
enum MHD_RequestTerminationCode toe)
{
(void)cls; (void)conn; (void)toe;
if (*con_cls) {
req_body_free((req_body_t *)*con_cls);
free(*con_cls);
*con_cls = NULL;
}
}
/* ── main ──────────────────────────────────────────────────────────── */
static volatile int g_running = 1;
static void on_signal(int sig) { (void)sig; g_running = 0; }
int main(void)
{
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
/* Ensure /dev/shm/framecache exists */
mkdir("/dev/shm/framecache", 0755);
/* Write empty registry */
registry_write_json();
const char *port_str = getenv("FC_PORT");
uint16_t port = port_str ? (uint16_t)atoi(port_str) : FC_PORT_DEFAULT;
struct MHD_Daemon *daemon = MHD_start_daemon(
MHD_USE_SELECT_INTERNALLY,
port,
NULL, NULL,
handle_request, NULL,
MHD_OPTION_NOTIFY_COMPLETED, request_completed, NULL,
MHD_OPTION_END);
if (!daemon) {
fprintf(stderr, "[framecache] failed to start HTTP server on port %u\n", port);
return 1;
}
fprintf(stderr, "[framecache] listening on port %u\n", port);
while (g_running) {
struct timespec ts = { .tv_sec = 0, .tv_nsec = 100000000 }; /* 100ms */
nanosleep(&ts, NULL);
}
fprintf(stderr, "[framecache] shutting down\n");
/* Destroy all active slots */
for (int i = 0; i < FC_MAX_SLOTS; i++) {
if (g_registry[i].active) {
registry_remove(g_registry[i].slot_id);
fc_slot_destroy(g_registry[i].slot);
}
}
MHD_stop_daemon(daemon);
return 0;
}

View file

@ -0,0 +1,108 @@
/**
* registry.c In-memory slot registry + JSON persistence.
*/
#include "registry.h"
#include "slot.h"
#include <stdio.h>
#include <string.h>
#include <time.h>
fc_registry_entry_t g_registry[FC_MAX_SLOTS];
int g_registry_count = 0;
static const char *REGISTRY_JSON = "/dev/shm/framecache/registry.json";
void registry_add(struct fc_slot *slot)
{
for (int i = 0; i < FC_MAX_SLOTS; i++) {
if (!g_registry[i].active) {
g_registry[i].active = 1;
g_registry[i].slot = slot;
strncpy(g_registry[i].slot_id, fc_slot_id(slot),
FC_MAX_SLOT_ID - 1);
g_registry_count++;
registry_write_json();
return;
}
}
fprintf(stderr, "[framecache] registry full (%d slots)\n", FC_MAX_SLOTS);
}
void registry_remove(const char *slot_id)
{
for (int i = 0; i < FC_MAX_SLOTS; i++) {
if (g_registry[i].active &&
strncmp(g_registry[i].slot_id, slot_id, FC_MAX_SLOT_ID) == 0)
{
g_registry[i].active = 0;
g_registry[i].slot = NULL;
g_registry[i].slot_id[0] = '\0';
g_registry_count--;
registry_write_json();
return;
}
}
}
struct fc_slot *registry_find(const char *slot_id)
{
for (int i = 0; i < FC_MAX_SLOTS; i++) {
if (g_registry[i].active &&
strncmp(g_registry[i].slot_id, slot_id, FC_MAX_SLOT_ID) == 0)
{
return g_registry[i].slot;
}
}
return NULL;
}
void registry_write_json(void)
{
FILE *f = fopen(REGISTRY_JSON, "w");
if (!f) return;
fprintf(f, "{\n \"version\": 1,\n \"slots\": {\n");
int first = 1;
for (int i = 0; i < FC_MAX_SLOTS; i++) {
if (!g_registry[i].active) continue;
fc_header_t *hdr = fc_slot_header(g_registry[i].slot);
char ts[32];
time_t now = time(NULL);
struct tm *t = gmtime(&now);
strftime(ts, sizeof ts, "%Y-%m-%dT%H:%M:%SZ", t);
if (!first) fprintf(f, ",\n");
first = 0;
fprintf(f,
" \"%s\": {\n"
" \"shm_path\": \"%s\",\n"
" \"sem_name\": \"%s\",\n"
" \"width\": %u,\n"
" \"height\": %u,\n"
" \"fps_num\": %u,\n"
" \"fps_den\": %u,\n"
" \"pixel_format\": \"UYVY422\",\n"
" \"source_type\": \"%s\",\n"
" \"frame_size\": %u,\n"
" \"ring_depth\": %u,\n"
" \"created_at\": \"%s\"\n"
" }",
g_registry[i].slot_id,
fc_slot_shm_path(g_registry[i].slot),
fc_slot_sem_name(g_registry[i].slot),
hdr->width, hdr->height,
hdr->fps_num, hdr->fps_den,
hdr->source_type,
hdr->frame_size,
hdr->ring_depth,
ts
);
}
fprintf(f, "\n }\n}\n");
fclose(f);
}

View file

@ -0,0 +1,21 @@
#pragma once
#include "slot.h"
/* Maximum number of concurrent slots */
#define FC_MAX_SLOTS 256
/* Registry entry (in-memory) */
typedef struct {
int active;
struct fc_slot *slot;
char slot_id[FC_MAX_SLOT_ID];
} fc_registry_entry_t;
/* Global registry — managed by framecache.c */
extern fc_registry_entry_t g_registry[FC_MAX_SLOTS];
extern int g_registry_count;
void registry_add(struct fc_slot *slot);
void registry_remove(const char *slot_id);
struct fc_slot *registry_find(const char *slot_id);
void registry_write_json(void); /* writes /dev/shm/framecache/registry.json */

View file

@ -0,0 +1,232 @@
/**
* slot.c Framecache slot lifecycle: create, destroy, open.
*/
#include "slot.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#define SHM_DIR "/dev/shm/framecache"
#define SEM_PREFIX "/framecache-"
#define SEM_SUFFIX "-write"
/* Internal handle used by both server (writer) and client (reader) */
struct fc_slot {
int shm_fd;
void *base;
size_t shm_size;
sem_t *sem;
char slot_id[FC_MAX_SLOT_ID];
char shm_path[128];
char sem_name[128];
};
/* ── helpers ─────────────────────────────────────────────────────────── */
static void build_paths(const char *slot_id,
char *shm_path, size_t sp_len,
char *sem_name, size_t sn_len)
{
snprintf(shm_path, sp_len, "%s/%s", SHM_DIR, slot_id);
snprintf(sem_name, sn_len, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX);
}
/* ── server-side: create / destroy ───────────────────────────────────── */
/**
* Create a new slot. Allocates and initialises the shm region.
* Returns handle on success, NULL on error (errno set).
*/
struct fc_slot *fc_slot_create(const char *slot_id,
uint32_t width, uint32_t height,
uint32_t fps_num, uint32_t fps_den,
uint32_t pixel_format,
const char *source_type)
{
char shm_path[128], sem_name[128];
build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name);
uint32_t frame_size = width * height * 2; /* UYVY422 */
size_t total = fc_slot_shm_size(frame_size);
/* Ensure directory exists */
mkdir(SHM_DIR, 0755);
/* Create shm file */
int fd = open(shm_path, O_RDWR | O_CREAT | O_TRUNC, 0666);
if (fd < 0) {
perror("[framecache] open shm");
return NULL;
}
if (ftruncate(fd, (off_t)total) < 0) {
perror("[framecache] ftruncate");
close(fd);
unlink(shm_path);
return NULL;
}
void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED) {
perror("[framecache] mmap");
close(fd);
unlink(shm_path);
return NULL;
}
memset(base, 0, total);
/* Initialise header */
fc_header_t *hdr = (fc_header_t *)base;
hdr->magic = FC_MAGIC;
hdr->version = FC_VERSION;
hdr->width = width;
hdr->height = height;
hdr->fps_num = fps_num;
hdr->fps_den = fps_den;
hdr->pixel_format = pixel_format;
hdr->frame_size = frame_size;
hdr->ring_depth = FC_RING_DEPTH;
atomic_store(&hdr->write_cursor, 0);
atomic_store(&hdr->dropped_frames, 0);
strncpy(hdr->source_type, source_type ? source_type : "unknown",
sizeof hdr->source_type - 1);
strncpy(hdr->slot_id, slot_id, sizeof hdr->slot_id - 1);
/* Create semaphore */
sem_unlink(sem_name); /* remove stale */
sem_t *sem = sem_open(sem_name, O_CREAT | O_EXCL, 0666, 0);
if (sem == SEM_FAILED) {
perror("[framecache] sem_open");
munmap(base, total);
close(fd);
unlink(shm_path);
return NULL;
}
struct fc_slot *s = calloc(1, sizeof *s);
if (!s) {
sem_close(sem); sem_unlink(sem_name);
munmap(base, total);
close(fd);
unlink(shm_path);
return NULL;
}
s->shm_fd = fd;
s->base = base;
s->shm_size = total;
s->sem = sem;
strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1);
strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1);
strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1);
fprintf(stderr, "[framecache] slot created: %s (%ux%u %.2ffps %zuMB)\n",
slot_id, width, height,
fps_den ? (double)fps_num / fps_den : 0.0,
total / 1024 / 1024);
return s;
}
/**
* Destroy a slot: unmap, close fd, delete files, free handle.
*/
void fc_slot_destroy(struct fc_slot *s)
{
if (!s) return;
sem_close(s->sem);
sem_unlink(s->sem_name);
munmap(s->base, s->shm_size);
close(s->shm_fd);
unlink(s->shm_path);
fprintf(stderr, "[framecache] slot destroyed: %s\n", s->slot_id);
free(s);
}
/* ── writer: called by ingest bridges ───────────────────────────────── */
/**
* Write one frame into the ring. Never blocks advances write_cursor
* atomically and posts the semaphore. Slow consumers will be skipped.
*/
void fc_slot_write_frame(struct fc_slot *s,
const uint8_t *data, uint32_t size,
uint64_t pts_us)
{
fc_header_t *hdr = (fc_header_t *)s->base;
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
fc_frame_t *frame = fc_frame_at(s->base, hdr->frame_size, cur);
frame->pts_us = pts_us;
frame->wall_us = (uint64_t)({ struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; });
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
memcpy(frame->data, data, frame->size);
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
sem_post(s->sem);
}
/* Accessors used by HTTP API */
fc_header_t *fc_slot_header(struct fc_slot *s) { return (fc_header_t *)s->base; }
const char *fc_slot_id(struct fc_slot *s) { return s->slot_id; }
const char *fc_slot_shm_path(struct fc_slot *s) { return s->shm_path; }
const char *fc_slot_sem_name(struct fc_slot *s) { return s->sem_name; }
/* ── client-side open / read / close (also used by capture-manager) ── */
/**
* Open an existing slot for reading.
* Returns NULL if slot not found or header magic mismatch.
*/
struct fc_slot *fc_slot_open(const char *slot_id)
{
char shm_path[128], sem_name[128];
build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name);
int fd = open(shm_path, O_RDONLY);
if (fd < 0) return NULL;
/* Read header first to get frame_size */
fc_header_t tmp_hdr;
if (pread(fd, &tmp_hdr, sizeof tmp_hdr, 0) != sizeof tmp_hdr) {
close(fd); return NULL;
}
if (tmp_hdr.magic != FC_MAGIC) {
close(fd); return NULL;
}
size_t total = fc_slot_shm_size(tmp_hdr.frame_size);
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
if (base == MAP_FAILED) { close(fd); return NULL; }
sem_t *sem = sem_open(sem_name, 0);
if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; }
struct fc_slot *s = calloc(1, sizeof *s);
if (!s) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
s->shm_fd = fd;
s->base = base;
s->shm_size = total;
s->sem = sem;
strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1);
strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1);
strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1);
return s;
}
/**
* Close a client-side slot handle. Does not destroy the slot.
*/
void fc_slot_close(struct fc_slot *s)
{
if (!s) return;
sem_close(s->sem);
munmap(s->base, s->shm_size);
close(s->shm_fd);
free(s);
}

View file

@ -0,0 +1,76 @@
/**
* slot.h Framecache shared memory slot definitions.
*
* Layout per slot (/dev/shm/framecache/<slot_id>):
* [fc_header_t 4KB aligned]
* [fc_frame_t × ring_depth each FC_FRAME_HDR_SIZE + frame_size bytes]
*
* Writer advances write_cursor atomically and posts the named semaphore.
* Each consumer tracks its own read_cursor independently writer never blocks.
*/
#pragma once
#include <stdint.h>
#include <stdatomic.h>
#include <semaphore.h>
#define FC_MAGIC 0x46524D43u /* "FRMC" */
#define FC_VERSION 1u
#define FC_RING_DEPTH 120u /* ~2s at 59.94fps */
#define FC_HEADER_SIZE 4096u /* 4KB header block */
#define FC_FRAME_HDR_SIZE 24u /* pts_us(8) + wall_us(8) + size(4) + pad(4) */
#define FC_MAX_SLOT_ID 64u
/* Pixel format codes */
#define FC_PIX_UYVY422 0u
typedef struct {
uint32_t magic; /* FC_MAGIC */
uint32_t version; /* FC_VERSION */
uint32_t width;
uint32_t height;
uint32_t fps_num;
uint32_t fps_den;
uint32_t pixel_format; /* FC_PIX_UYVY422 */
uint32_t frame_size; /* width * height * 2 */
uint32_t ring_depth; /* FC_RING_DEPTH */
uint32_t _reserved;
_Atomic uint64_t write_cursor; /* monotonically increasing frame index */
_Atomic uint64_t dropped_frames;
char source_type[32]; /* "deltacast" | "blackmagic" | "srt" | "rtmp" */
char slot_id[FC_MAX_SLOT_ID];
uint8_t _pad[FC_HEADER_SIZE - 112];
} fc_header_t;
/* Per-frame metadata + data (variable length — use fc_frame_at() accessor) */
typedef struct {
uint64_t pts_us;
uint64_t wall_us;
uint32_t size;
uint32_t _pad;
uint8_t data[]; /* frame_size bytes */
} fc_frame_t;
/* Compile-time size check */
_Static_assert(sizeof(fc_header_t) == FC_HEADER_SIZE,
"fc_header_t must be exactly FC_HEADER_SIZE bytes");
_Static_assert(sizeof(fc_frame_t) == FC_FRAME_HDR_SIZE,
"fc_frame_t header must be exactly FC_FRAME_HDR_SIZE bytes");
/**
* Compute total shm size for a slot given frame_size.
* = FC_HEADER_SIZE + ring_depth * (FC_FRAME_HDR_SIZE + frame_size)
*/
static inline size_t fc_slot_shm_size(uint32_t frame_size) {
return (size_t)FC_HEADER_SIZE
+ (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + frame_size);
}
/**
* Return pointer to frame at ring index idx within a mapped shm base.
*/
static inline fc_frame_t *fc_frame_at(void *base, uint32_t frame_size, uint64_t idx) {
uint8_t *frames = (uint8_t *)base + FC_HEADER_SIZE;
return (fc_frame_t *)(frames + (idx % FC_RING_DEPTH)
* ((size_t)FC_FRAME_HDR_SIZE + frame_size));
}