diff --git a/.env.example b/.env.example index 05b20c3..281cee1 100644 --- a/.env.example +++ b/.env.example @@ -69,6 +69,14 @@ GOOGLE_ALLOWED_DOMAIN= # the authenticator code (Google is treated as the first factor). Accounts without # TOTP complete sign-in in one Google step. +# Framecache — shared memory ring buffer for SDI + network ingest fan-out. +# Size in GB. Tune per node based on available RAM and number of SDI inputs. +# Each 1080p59.94 source uses ~494MB (120-frame ring at 4.1MB/frame). +# Baratheon (251GB RAM): 60 +# zampp1 (93GB RAM): 40 +# zampp2 (18GB RAM): 8 (increase node RAM before deploying capture) +FC_SHM_SIZE_GB=40 + # Playout / Master Control (MCR) # Image tag the mam-api spawns when a channel starts. Build with: # docker compose --profile build-only build playout diff --git a/docker-compose.worker.yml b/docker-compose.worker.yml index a5e6980..95e17bc 100644 --- a/docker-compose.worker.yml +++ b/docker-compose.worker.yml @@ -151,6 +151,34 @@ services: networks: - wild-dragon-worker + # Framecache — shared memory ring buffer for SDI + network ingest fan-out. + # Runs on every worker node that has capture sources (Blackmagic, Deltacast). + # IPC host mode lets all capture sidecars share /dev/shm with this container. + # FC_SHM_SIZE can be tuned per node in .env.worker: + # Baratheon (251GB RAM): FC_SHM_SIZE=64424509440 (60GB) + # zampp1 (93GB RAM): FC_SHM_SIZE=42949672960 (40GB) + # zampp2 (18GB RAM): FC_SHM_SIZE=8589934592 (8GB — increase RAM first) + framecache: + build: ./services/framecache + profiles: [capture] + restart: unless-stopped + ipc: host + shm_size: '${FC_SHM_SIZE_GB:-40}gb' + environment: + FC_PORT: 7435 + ports: + - "7435:7435" + volumes: + - /dev/shm:/dev/shm + networks: + - wild-dragon-worker + healthcheck: + test: ["CMD", "wget", "-qO-", "http://localhost:7435/health"] + interval: 10s + timeout: 3s + retries: 3 + start_period: 5s + networks: wild-dragon-worker: driver: bridge diff --git a/services/framecache/CMakeLists.txt b/services/framecache/CMakeLists.txt new file mode 100644 index 0000000..b085b93 --- /dev/null +++ b/services/framecache/CMakeLists.txt @@ -0,0 +1,39 @@ +cmake_minimum_required(VERSION 3.16) +project(framecache C) + +set(CMAKE_C_STANDARD 11) +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -O2") + +# ── libmicrohttpd ──────────────────────────────────────────────────── +find_library(MHD_LIB microhttpd REQUIRED) +find_path(MHD_INCLUDE microhttpd.h REQUIRED) +include_directories(${MHD_INCLUDE}) + +# ── framecache server ──────────────────────────────────────────────── +add_executable(framecache + src/framecache.c + src/slot.c + src/registry.c +) +target_link_libraries(framecache ${MHD_LIB} rt pthread) + +# ── fc_client static library (used by bridges + test) ─────────────── +add_library(fc_client STATIC + client/fc_client.c + src/slot.c # client needs fc_slot_shm_size / fc_frame_at +) +target_include_directories(fc_client PUBLIC src client) +target_link_libraries(fc_client rt pthread) + +# ── test consumer (dev utility) ────────────────────────────────────── +if(BUILD_TESTS) + add_executable(fc_test_consumer + client/fc_test_consumer.c + ) + target_link_libraries(fc_test_consumer fc_client) + target_include_directories(fc_test_consumer PRIVATE src client) +endif() + +install(TARGETS framecache DESTINATION bin) +install(FILES client/fc_client.h src/slot.h DESTINATION include/framecache) +install(TARGETS fc_client DESTINATION lib) diff --git a/services/framecache/Dockerfile b/services/framecache/Dockerfile new file mode 100644 index 0000000..2be01e1 --- /dev/null +++ b/services/framecache/Dockerfile @@ -0,0 +1,31 @@ +# ── Build stage ───────────────────────────────────────────────────── +FROM debian:bookworm AS builder + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential cmake libmicrohttpd-dev \ + && rm -rf /var/lib/apt/lists/* + +COPY . /src +RUN cmake -S /src -B /build \ + -DCMAKE_BUILD_TYPE=Release \ + && cmake --build /build -j"$(nproc)" + +# ── Runtime stage ──────────────────────────────────────────────────── +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + libmicrohttpd12 \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /build/framecache /usr/local/bin/framecache +COPY --from=builder /build/fc_test_consumer /usr/local/bin/fc_test_consumer 2>/dev/null || true + +# /dev/shm/framecache is created at runtime (tmpfs) +RUN mkdir -p /dev/shm/framecache + +EXPOSE 7435 + +HEALTHCHECK --interval=10s --timeout=3s --start-period=5s \ + CMD wget -qO- http://localhost:7435/health || exit 1 + +CMD ["/usr/local/bin/framecache"] diff --git a/services/framecache/client/fc_client.c b/services/framecache/client/fc_client.c new file mode 100644 index 0000000..978e331 --- /dev/null +++ b/services/framecache/client/fc_client.c @@ -0,0 +1,150 @@ +/** + * fc_client.c — Consumer-side framecache client implementation. + */ +#include "fc_client.h" +#include "../src/slot.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define SHM_DIR "/dev/shm/framecache" +#define SEM_PREFIX "/framecache-" +#define SEM_SUFFIX "-write" + +struct fc_consumer { + int shm_fd; + void *base; + size_t shm_size; + sem_t *sem; + uint64_t read_cursor; /* consumer's own position in the ring */ + uint64_t local_dropped; /* frames skipped by this consumer */ + char slot_id[FC_MAX_SLOT_ID]; +}; + +static uint64_t now_us(void) +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL; +} + +fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms) +{ + char shm_path[128], sem_name[128]; + snprintf(shm_path, sizeof shm_path, "%s/%s", SHM_DIR, slot_id); + snprintf(sem_name, sizeof sem_name, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX); + + uint64_t deadline = now_us() + wait_ms * 1000ULL; + int fd = -1; + while (1) { + fd = open(shm_path, O_RDONLY); + if (fd >= 0) break; + if (now_us() >= deadline) return NULL; + struct timespec ts = { .tv_nsec = 100000000 }; /* 100ms */ + nanosleep(&ts, NULL); + } + + /* Read header to get frame_size */ + fc_header_t hdr; + if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) { + close(fd); return NULL; + } + size_t total = fc_slot_shm_size(hdr.frame_size); + + void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { close(fd); return NULL; } + + sem_t *sem = sem_open(sem_name, 0); + if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; } + + fc_consumer_t *c = calloc(1, sizeof *c); + if (!c) { sem_close(sem); munmap(base, total); close(fd); return NULL; } + + c->shm_fd = fd; + c->base = base; + c->shm_size = total; + c->sem = sem; + /* Start reading from the current write position so we don't replay old frames */ + c->read_cursor = atomic_load_explicit( + &((fc_header_t *)base)->write_cursor, memory_order_acquire); + c->local_dropped = 0; + strncpy(c->slot_id, slot_id, FC_MAX_SLOT_ID - 1); + return c; +} + +int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms) +{ + fc_header_t *hdr = (fc_header_t *)c->base; + + /* Wait for a new frame via semaphore */ + struct timespec abs_ts; + clock_gettime(CLOCK_REALTIME, &abs_ts); + abs_ts.tv_sec += (time_t)(timeout_ms / 1000); + abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L); + if (abs_ts.tv_nsec >= 1000000000L) { + abs_ts.tv_sec++; + abs_ts.tv_nsec -= 1000000000L; + } + + while (sem_timedwait(c->sem, &abs_ts) != 0) { + if (errno == ETIMEDOUT) return FC_TIMEOUT; + if (errno == EINTR) continue; + return FC_ERROR; + } + + uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor, + memory_order_acquire); + int dropped = 0; + + /* Check if consumer fell behind by more than ring_depth */ + if (write_cur > c->read_cursor + hdr->ring_depth) { + uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth; + c->read_cursor = write_cur - hdr->ring_depth; + c->local_dropped += skipped; + atomic_fetch_add(&hdr->dropped_frames, skipped); + dropped = 1; + } + + if (c->read_cursor >= write_cur) { + /* Semaphore posted but nothing new yet (spurious) */ + return FC_TIMEOUT; + } + + fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor); + ref->data = frame->data; + ref->size = frame->size; + ref->pts_us = frame->pts_us; + ref->wall_us = frame->wall_us; + ref->seq = c->read_cursor; + + c->read_cursor++; + return dropped ? FC_DROPPED : FC_OK; +} + +void fc_consumer_close(fc_consumer_t *c) +{ + if (!c) return; + sem_close(c->sem); + munmap(c->base, c->shm_size); + close(c->shm_fd); + free(c); +} + +uint64_t fc_consumer_write_cursor(fc_consumer_t *c) +{ + fc_header_t *hdr = (fc_header_t *)c->base; + return atomic_load(&hdr->write_cursor); +} + +uint64_t fc_consumer_dropped(fc_consumer_t *c) +{ + return c->local_dropped; +} diff --git a/services/framecache/client/fc_client.h b/services/framecache/client/fc_client.h new file mode 100644 index 0000000..442ce16 --- /dev/null +++ b/services/framecache/client/fc_client.h @@ -0,0 +1,70 @@ +/** + * fc_client.h — Consumer-side framecache client library. + * + * Usage: + * fc_consumer_t *c = fc_consumer_open("deltacast-zampp3-0"); + * fc_frame_ref_t ref; + * while (fc_consumer_read(c, &ref, 2000) == FC_OK) { + * // ref.data valid until next fc_consumer_read call + * process_frame(ref.data, ref.size, ref.pts_us); + * } + * fc_consumer_close(c); + * + * Each consumer tracks its own read_cursor — multiple consumers on the same + * slot are fully independent and never block each other or the writer. + * + * If a consumer falls more than ring_depth frames behind the writer its cursor + * is snapped to the latest frame and FC_DROPPED is returned once. + */ +#pragma once + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* Return codes */ +#define FC_OK 0 +#define FC_TIMEOUT 1 /* no new frame within timeout_ms */ +#define FC_DROPPED 2 /* consumer fell behind; cursor snapped to latest */ +#define FC_ERROR -1 + +typedef struct fc_consumer fc_consumer_t; + +typedef struct { + const uint8_t *data; /* zero-copy pointer into shm ring — valid until next read */ + uint32_t size; /* bytes */ + uint64_t pts_us; /* presentation timestamp (microseconds) */ + uint64_t wall_us; /* wall clock at write time (microseconds) */ + uint64_t seq; /* write_cursor value for this frame */ +} fc_frame_ref_t; + +/** + * Open a consumer handle for the named slot. + * Polls the slot shm file until it appears (up to wait_ms milliseconds). + * Returns NULL if slot not found within wait_ms or on error. + */ +fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms); + +/** + * Read the next frame. + * Blocks up to timeout_ms waiting for a new frame (via semaphore). + * Returns FC_OK, FC_TIMEOUT, FC_DROPPED, or FC_ERROR. + * On FC_OK or FC_DROPPED the ref fields are populated. + */ +int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms); + +/** Close the consumer handle. Does NOT destroy the slot. */ +void fc_consumer_close(fc_consumer_t *c); + +/** Current write_cursor of the slot (approximate — no lock). */ +uint64_t fc_consumer_write_cursor(fc_consumer_t *c); + +/** Frames dropped by this consumer since open. */ +uint64_t fc_consumer_dropped(fc_consumer_t *c); + +#ifdef __cplusplus +} +#endif diff --git a/services/framecache/client/fc_test_consumer.c b/services/framecache/client/fc_test_consumer.c new file mode 100644 index 0000000..c809463 --- /dev/null +++ b/services/framecache/client/fc_test_consumer.c @@ -0,0 +1,73 @@ +/** + * fc_test_consumer.c — Dev utility: attach to a framecache slot and print stats. + * + * Usage: fc_test_consumer [wait_ms] + */ +#include "fc_client.h" +#include +#include +#include +#include + +static volatile int g_run = 1; +static void on_sig(int s) { (void)s; g_run = 0; } + +int main(int argc, char **argv) +{ + if (argc < 2) { + fprintf(stderr, "Usage: %s [wait_ms]\n", argv[0]); + return 1; + } + const char *slot_id = argv[1]; + uint64_t wait_ms = argc >= 3 ? (uint64_t)atoi(argv[2]) : 30000; + + signal(SIGINT, on_sig); + signal(SIGTERM, on_sig); + + fprintf(stderr, "Opening slot '%s' (wait up to %llums)...\n", + slot_id, (unsigned long long)wait_ms); + + fc_consumer_t *c = fc_consumer_open(slot_id, wait_ms); + if (!c) { + fprintf(stderr, "Failed to open slot '%s'\n", slot_id); + return 1; + } + fprintf(stderr, "Slot opened. Reading frames (Ctrl+C to stop)...\n"); + + uint64_t total = 0, dropped = 0; + struct timespec t0; + clock_gettime(CLOCK_MONOTONIC, &t0); + + while (g_run) { + fc_frame_ref_t ref; + int rc = fc_consumer_read(c, &ref, 2000); + if (rc == FC_TIMEOUT) continue; + if (rc == FC_ERROR) { fprintf(stderr, "read error\n"); break; } + if (rc == FC_DROPPED) { + dropped = fc_consumer_dropped(c); + fprintf(stderr, "[WARN] consumer fell behind — total dropped: %llu\n", + (unsigned long long)dropped); + } + total++; + + /* Print stats every 100 frames */ + if (total % 100 == 0) { + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + double elapsed = (now.tv_sec - t0.tv_sec) + + (now.tv_nsec - t0.tv_nsec) * 1e-9; + fprintf(stdout, "frames=%llu dropped=%llu fps=%.2f pts_us=%llu\n", + (unsigned long long)total, + (unsigned long long)fc_consumer_dropped(c), + total / elapsed, + (unsigned long long)ref.pts_us); + fflush(stdout); + } + } + + fprintf(stderr, "Done. total=%llu dropped=%llu\n", + (unsigned long long)total, + (unsigned long long)fc_consumer_dropped(c)); + fc_consumer_close(c); + return 0; +} diff --git a/services/framecache/src/framecache.c b/services/framecache/src/framecache.c new file mode 100644 index 0000000..d1631ee --- /dev/null +++ b/services/framecache/src/framecache.c @@ -0,0 +1,350 @@ +/** + * framecache.c — Main entry point. HTTP API server + slot manager. + * + * Endpoints: + * POST /slots Create slot + * GET /slots List slots + * GET /slots/:id Get slot detail + * DELETE /slots/:id Destroy slot + * GET /health Health check + * + * Uses libmicrohttpd for the HTTP layer (single-threaded, poll-based). + */ +#include "slot.h" +#include "registry.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef FC_PORT_DEFAULT +#define FC_PORT_DEFAULT 7435 +#endif + +/* ── tiny JSON helpers ─────────────────────────────────────────────── */ + +static int json_get_uint(const char *json, const char *key, uint32_t *out) +{ + char pat[128]; + snprintf(pat, sizeof pat, "\"%s\":", key); + const char *p = strstr(json, pat); + if (!p) return -1; + p += strlen(pat); + while (*p == ' ' || *p == '\t') p++; + *out = (uint32_t)strtoul(p, NULL, 10); + return 0; +} + +static int json_get_str(const char *json, const char *key, + char *out, size_t out_len) +{ + char pat[128]; + snprintf(pat, sizeof pat, "\"%s\":", key); + const char *p = strstr(json, pat); + if (!p) return -1; + p += strlen(pat); + while (*p == ' ' || *p == '\t') p++; + if (*p != '"') return -1; + p++; + size_t i = 0; + while (*p && *p != '"' && i < out_len - 1) + out[i++] = *p++; + out[i] = '\0'; + return 0; +} + +/* ── HTTP request accumulator ──────────────────────────────────────── */ + +typedef struct { + char *buf; + size_t len; + size_t cap; +} req_body_t; + +static void req_body_free(req_body_t *r) +{ + free(r->buf); + r->buf = NULL; r->len = 0; r->cap = 0; +} + +/* ── response helpers ──────────────────────────────────────────────── */ + +static enum MHD_Result respond(struct MHD_Connection *conn, + unsigned int status, + const char *body) +{ + struct MHD_Response *r = MHD_create_response_from_buffer( + strlen(body), (void *)body, MHD_RESPMEM_MUST_COPY); + MHD_add_response_header(r, "Content-Type", "application/json"); + MHD_add_response_header(r, "Access-Control-Allow-Origin", "*"); + enum MHD_Result rc = MHD_queue_response(conn, status, r); + MHD_destroy_response(r); + return rc; +} + +/* ── slot → JSON ───────────────────────────────────────────────────── */ + +static void slot_to_json(struct fc_slot *s, char *buf, size_t len) +{ + fc_header_t *hdr = fc_slot_header(s); + uint64_t wc = atomic_load(&hdr->write_cursor); + uint64_t df = atomic_load(&hdr->dropped_frames); + /* simple fps estimate — not perfect but good enough for status */ + snprintf(buf, len, + "{" + "\"slot_id\":\"%s\"," + "\"shm_path\":\"%s\"," + "\"sem_name\":\"%s\"," + "\"width\":%u," + "\"height\":%u," + "\"fps_num\":%u," + "\"fps_den\":%u," + "\"pixel_format\":\"UYVY422\"," + "\"source_type\":\"%s\"," + "\"frame_size\":%u," + "\"ring_depth\":%u," + "\"write_cursor\":%llu," + "\"dropped_frames\":%llu" + "}", + fc_slot_id(s), + fc_slot_shm_path(s), + fc_slot_sem_name(s), + hdr->width, hdr->height, + hdr->fps_num, hdr->fps_den, + hdr->source_type, + hdr->frame_size, + hdr->ring_depth, + (unsigned long long)wc, + (unsigned long long)df + ); +} + +/* ── request handler ───────────────────────────────────────────────── */ + +static enum MHD_Result handle_request( + void *cls, + struct MHD_Connection *conn, + const char *url, + const char *method, + const char *version, + const char *upload_data, + size_t *upload_data_size, + void **con_cls) +{ + (void)cls; (void)version; + + /* First call: allocate body accumulator */ + if (*con_cls == NULL) { + req_body_t *rb = calloc(1, sizeof *rb); + if (!rb) return MHD_NO; + *con_cls = rb; + return MHD_YES; + } + req_body_t *rb = (req_body_t *)*con_cls; + + /* Accumulate POST body */ + if (*upload_data_size > 0) { + size_t need = rb->len + *upload_data_size + 1; + if (need > rb->cap) { + rb->buf = realloc(rb->buf, need); + rb->cap = need; + } + memcpy(rb->buf + rb->len, upload_data, *upload_data_size); + rb->len += *upload_data_size; + rb->buf[rb->len] = '\0'; + *upload_data_size = 0; + return MHD_YES; + } + + enum MHD_Result rc; + char resp[4096]; + + /* GET /health */ + if (strcmp(method, "GET") == 0 && strcmp(url, "/health") == 0) { + rc = respond(conn, MHD_HTTP_OK, "{\"status\":\"ok\"}"); + goto done; + } + + /* GET /slots */ + if (strcmp(method, "GET") == 0 && strcmp(url, "/slots") == 0) { + char big[65536]; + int pos = 0; + pos += snprintf(big + pos, sizeof big - pos, "["); + int first = 1; + for (int i = 0; i < FC_MAX_SLOTS; i++) { + if (!g_registry[i].active) continue; + char entry[2048]; + slot_to_json(g_registry[i].slot, entry, sizeof entry); + if (!first) big[pos++] = ','; + first = 0; + pos += snprintf(big + pos, sizeof big - pos, "%s", entry); + } + snprintf(big + pos, sizeof big - pos, "]"); + rc = respond(conn, MHD_HTTP_OK, big); + goto done; + } + + /* GET /slots/:id */ + if (strcmp(method, "GET") == 0 && + strncmp(url, "/slots/", 7) == 0 && strlen(url) > 7) + { + const char *id = url + 7; + struct fc_slot *s = registry_find(id); + if (!s) { + rc = respond(conn, MHD_HTTP_NOT_FOUND, + "{\"error\":\"slot not found\"}"); + goto done; + } + slot_to_json(s, resp, sizeof resp); + rc = respond(conn, MHD_HTTP_OK, resp); + goto done; + } + + /* POST /slots */ + if (strcmp(method, "POST") == 0 && strcmp(url, "/slots") == 0) { + if (!rb->buf || rb->len == 0) { + rc = respond(conn, MHD_HTTP_BAD_REQUEST, + "{\"error\":\"empty body\"}"); + goto done; + } + char slot_id[FC_MAX_SLOT_ID] = {0}; + char source_type[32] = "unknown"; + uint32_t width = 0, height = 0, fps_num = 0, fps_den = 0; + + json_get_str(rb->buf, "slot_id", slot_id, sizeof slot_id); + json_get_str(rb->buf, "source_type", source_type, sizeof source_type); + json_get_uint(rb->buf, "width", &width); + json_get_uint(rb->buf, "height", &height); + json_get_uint(rb->buf, "fps_num", &fps_num); + json_get_uint(rb->buf, "fps_den", &fps_den); + + if (!slot_id[0] || !width || !height || !fps_num || !fps_den) { + rc = respond(conn, MHD_HTTP_BAD_REQUEST, + "{\"error\":\"missing required fields: " + "slot_id, width, height, fps_num, fps_den\"}"); + goto done; + } + if (registry_find(slot_id)) { + rc = respond(conn, MHD_HTTP_CONFLICT, + "{\"error\":\"slot already exists\"}"); + goto done; + } + + struct fc_slot *s = fc_slot_create(slot_id, width, height, + fps_num, fps_den, + FC_PIX_UYVY422, source_type); + if (!s) { + rc = respond(conn, MHD_HTTP_INTERNAL_SERVER_ERROR, + "{\"error\":\"failed to create slot\"}"); + goto done; + } + registry_add(s); + + snprintf(resp, sizeof resp, + "{\"slot_id\":\"%s\"," + "\"shm_path\":\"%s\"," + "\"sem_name\":\"%s\"}", + fc_slot_id(s), + fc_slot_shm_path(s), + fc_slot_sem_name(s)); + rc = respond(conn, MHD_HTTP_CREATED, resp); + goto done; + } + + /* DELETE /slots/:id */ + if (strcmp(method, "DELETE") == 0 && + strncmp(url, "/slots/", 7) == 0 && strlen(url) > 7) + { + const char *id = url + 7; + struct fc_slot *s = registry_find(id); + if (!s) { + rc = respond(conn, MHD_HTTP_NOT_FOUND, + "{\"error\":\"slot not found\"}"); + goto done; + } + registry_remove(id); + fc_slot_destroy(s); + rc = respond(conn, MHD_HTTP_NO_CONTENT, ""); + goto done; + } + + rc = respond(conn, MHD_HTTP_NOT_FOUND, "{\"error\":\"not found\"}"); + +done: + req_body_free(rb); + free(rb); + *con_cls = NULL; + return rc; +} + +static void request_completed(void *cls, + struct MHD_Connection *conn, + void **con_cls, + enum MHD_RequestTerminationCode toe) +{ + (void)cls; (void)conn; (void)toe; + if (*con_cls) { + req_body_free((req_body_t *)*con_cls); + free(*con_cls); + *con_cls = NULL; + } +} + +/* ── main ──────────────────────────────────────────────────────────── */ + +static volatile int g_running = 1; + +static void on_signal(int sig) { (void)sig; g_running = 0; } + +int main(void) +{ + signal(SIGINT, on_signal); + signal(SIGTERM, on_signal); + + /* Ensure /dev/shm/framecache exists */ + mkdir("/dev/shm/framecache", 0755); + + /* Write empty registry */ + registry_write_json(); + + const char *port_str = getenv("FC_PORT"); + uint16_t port = port_str ? (uint16_t)atoi(port_str) : FC_PORT_DEFAULT; + + struct MHD_Daemon *daemon = MHD_start_daemon( + MHD_USE_SELECT_INTERNALLY, + port, + NULL, NULL, + handle_request, NULL, + MHD_OPTION_NOTIFY_COMPLETED, request_completed, NULL, + MHD_OPTION_END); + + if (!daemon) { + fprintf(stderr, "[framecache] failed to start HTTP server on port %u\n", port); + return 1; + } + + fprintf(stderr, "[framecache] listening on port %u\n", port); + + while (g_running) { + struct timespec ts = { .tv_sec = 0, .tv_nsec = 100000000 }; /* 100ms */ + nanosleep(&ts, NULL); + } + + fprintf(stderr, "[framecache] shutting down\n"); + + /* Destroy all active slots */ + for (int i = 0; i < FC_MAX_SLOTS; i++) { + if (g_registry[i].active) { + registry_remove(g_registry[i].slot_id); + fc_slot_destroy(g_registry[i].slot); + } + } + + MHD_stop_daemon(daemon); + return 0; +} diff --git a/services/framecache/src/registry.c b/services/framecache/src/registry.c new file mode 100644 index 0000000..d1c4e20 --- /dev/null +++ b/services/framecache/src/registry.c @@ -0,0 +1,108 @@ +/** + * registry.c — In-memory slot registry + JSON persistence. + */ +#include "registry.h" +#include "slot.h" + +#include +#include +#include + +fc_registry_entry_t g_registry[FC_MAX_SLOTS]; +int g_registry_count = 0; + +static const char *REGISTRY_JSON = "/dev/shm/framecache/registry.json"; + +void registry_add(struct fc_slot *slot) +{ + for (int i = 0; i < FC_MAX_SLOTS; i++) { + if (!g_registry[i].active) { + g_registry[i].active = 1; + g_registry[i].slot = slot; + strncpy(g_registry[i].slot_id, fc_slot_id(slot), + FC_MAX_SLOT_ID - 1); + g_registry_count++; + registry_write_json(); + return; + } + } + fprintf(stderr, "[framecache] registry full (%d slots)\n", FC_MAX_SLOTS); +} + +void registry_remove(const char *slot_id) +{ + for (int i = 0; i < FC_MAX_SLOTS; i++) { + if (g_registry[i].active && + strncmp(g_registry[i].slot_id, slot_id, FC_MAX_SLOT_ID) == 0) + { + g_registry[i].active = 0; + g_registry[i].slot = NULL; + g_registry[i].slot_id[0] = '\0'; + g_registry_count--; + registry_write_json(); + return; + } + } +} + +struct fc_slot *registry_find(const char *slot_id) +{ + for (int i = 0; i < FC_MAX_SLOTS; i++) { + if (g_registry[i].active && + strncmp(g_registry[i].slot_id, slot_id, FC_MAX_SLOT_ID) == 0) + { + return g_registry[i].slot; + } + } + return NULL; +} + +void registry_write_json(void) +{ + FILE *f = fopen(REGISTRY_JSON, "w"); + if (!f) return; + + fprintf(f, "{\n \"version\": 1,\n \"slots\": {\n"); + + int first = 1; + for (int i = 0; i < FC_MAX_SLOTS; i++) { + if (!g_registry[i].active) continue; + fc_header_t *hdr = fc_slot_header(g_registry[i].slot); + + char ts[32]; + time_t now = time(NULL); + struct tm *t = gmtime(&now); + strftime(ts, sizeof ts, "%Y-%m-%dT%H:%M:%SZ", t); + + if (!first) fprintf(f, ",\n"); + first = 0; + + fprintf(f, + " \"%s\": {\n" + " \"shm_path\": \"%s\",\n" + " \"sem_name\": \"%s\",\n" + " \"width\": %u,\n" + " \"height\": %u,\n" + " \"fps_num\": %u,\n" + " \"fps_den\": %u,\n" + " \"pixel_format\": \"UYVY422\",\n" + " \"source_type\": \"%s\",\n" + " \"frame_size\": %u,\n" + " \"ring_depth\": %u,\n" + " \"created_at\": \"%s\"\n" + " }", + g_registry[i].slot_id, + fc_slot_shm_path(g_registry[i].slot), + fc_slot_sem_name(g_registry[i].slot), + hdr->width, hdr->height, + hdr->fps_num, hdr->fps_den, + hdr->source_type, + hdr->frame_size, + hdr->ring_depth, + ts + ); + } + + fprintf(f, "\n }\n}\n"); + fclose(f); +} diff --git a/services/framecache/src/registry.h b/services/framecache/src/registry.h new file mode 100644 index 0000000..aac67a0 --- /dev/null +++ b/services/framecache/src/registry.h @@ -0,0 +1,21 @@ +#pragma once +#include "slot.h" + +/* Maximum number of concurrent slots */ +#define FC_MAX_SLOTS 256 + +/* Registry entry (in-memory) */ +typedef struct { + int active; + struct fc_slot *slot; + char slot_id[FC_MAX_SLOT_ID]; +} fc_registry_entry_t; + +/* Global registry — managed by framecache.c */ +extern fc_registry_entry_t g_registry[FC_MAX_SLOTS]; +extern int g_registry_count; + +void registry_add(struct fc_slot *slot); +void registry_remove(const char *slot_id); +struct fc_slot *registry_find(const char *slot_id); +void registry_write_json(void); /* writes /dev/shm/framecache/registry.json */ diff --git a/services/framecache/src/slot.c b/services/framecache/src/slot.c new file mode 100644 index 0000000..1192c5a --- /dev/null +++ b/services/framecache/src/slot.c @@ -0,0 +1,232 @@ +/** + * slot.c — Framecache slot lifecycle: create, destroy, open. + */ +#include "slot.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#define SHM_DIR "/dev/shm/framecache" +#define SEM_PREFIX "/framecache-" +#define SEM_SUFFIX "-write" + +/* Internal handle used by both server (writer) and client (reader) */ +struct fc_slot { + int shm_fd; + void *base; + size_t shm_size; + sem_t *sem; + char slot_id[FC_MAX_SLOT_ID]; + char shm_path[128]; + char sem_name[128]; +}; + +/* ── helpers ─────────────────────────────────────────────────────────── */ + +static void build_paths(const char *slot_id, + char *shm_path, size_t sp_len, + char *sem_name, size_t sn_len) +{ + snprintf(shm_path, sp_len, "%s/%s", SHM_DIR, slot_id); + snprintf(sem_name, sn_len, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX); +} + +/* ── server-side: create / destroy ───────────────────────────────────── */ + +/** + * Create a new slot. Allocates and initialises the shm region. + * Returns handle on success, NULL on error (errno set). + */ +struct fc_slot *fc_slot_create(const char *slot_id, + uint32_t width, uint32_t height, + uint32_t fps_num, uint32_t fps_den, + uint32_t pixel_format, + const char *source_type) +{ + char shm_path[128], sem_name[128]; + build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name); + + uint32_t frame_size = width * height * 2; /* UYVY422 */ + size_t total = fc_slot_shm_size(frame_size); + + /* Ensure directory exists */ + mkdir(SHM_DIR, 0755); + + /* Create shm file */ + int fd = open(shm_path, O_RDWR | O_CREAT | O_TRUNC, 0666); + if (fd < 0) { + perror("[framecache] open shm"); + return NULL; + } + if (ftruncate(fd, (off_t)total) < 0) { + perror("[framecache] ftruncate"); + close(fd); + unlink(shm_path); + return NULL; + } + + void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + perror("[framecache] mmap"); + close(fd); + unlink(shm_path); + return NULL; + } + memset(base, 0, total); + + /* Initialise header */ + fc_header_t *hdr = (fc_header_t *)base; + hdr->magic = FC_MAGIC; + hdr->version = FC_VERSION; + hdr->width = width; + hdr->height = height; + hdr->fps_num = fps_num; + hdr->fps_den = fps_den; + hdr->pixel_format = pixel_format; + hdr->frame_size = frame_size; + hdr->ring_depth = FC_RING_DEPTH; + atomic_store(&hdr->write_cursor, 0); + atomic_store(&hdr->dropped_frames, 0); + strncpy(hdr->source_type, source_type ? source_type : "unknown", + sizeof hdr->source_type - 1); + strncpy(hdr->slot_id, slot_id, sizeof hdr->slot_id - 1); + + /* Create semaphore */ + sem_unlink(sem_name); /* remove stale */ + sem_t *sem = sem_open(sem_name, O_CREAT | O_EXCL, 0666, 0); + if (sem == SEM_FAILED) { + perror("[framecache] sem_open"); + munmap(base, total); + close(fd); + unlink(shm_path); + return NULL; + } + + struct fc_slot *s = calloc(1, sizeof *s); + if (!s) { + sem_close(sem); sem_unlink(sem_name); + munmap(base, total); + close(fd); + unlink(shm_path); + return NULL; + } + s->shm_fd = fd; + s->base = base; + s->shm_size = total; + s->sem = sem; + strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1); + strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1); + strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1); + + fprintf(stderr, "[framecache] slot created: %s (%ux%u %.2ffps %zuMB)\n", + slot_id, width, height, + fps_den ? (double)fps_num / fps_den : 0.0, + total / 1024 / 1024); + return s; +} + +/** + * Destroy a slot: unmap, close fd, delete files, free handle. + */ +void fc_slot_destroy(struct fc_slot *s) +{ + if (!s) return; + sem_close(s->sem); + sem_unlink(s->sem_name); + munmap(s->base, s->shm_size); + close(s->shm_fd); + unlink(s->shm_path); + fprintf(stderr, "[framecache] slot destroyed: %s\n", s->slot_id); + free(s); +} + +/* ── writer: called by ingest bridges ───────────────────────────────── */ + +/** + * Write one frame into the ring. Never blocks — advances write_cursor + * atomically and posts the semaphore. Slow consumers will be skipped. + */ +void fc_slot_write_frame(struct fc_slot *s, + const uint8_t *data, uint32_t size, + uint64_t pts_us) +{ + fc_header_t *hdr = (fc_header_t *)s->base; + uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed); + fc_frame_t *frame = fc_frame_at(s->base, hdr->frame_size, cur); + + frame->pts_us = pts_us; + frame->wall_us = (uint64_t)({ struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; }); + frame->size = size < hdr->frame_size ? size : hdr->frame_size; + memcpy(frame->data, data, frame->size); + + atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release); + sem_post(s->sem); +} + +/* Accessors used by HTTP API */ +fc_header_t *fc_slot_header(struct fc_slot *s) { return (fc_header_t *)s->base; } +const char *fc_slot_id(struct fc_slot *s) { return s->slot_id; } +const char *fc_slot_shm_path(struct fc_slot *s) { return s->shm_path; } +const char *fc_slot_sem_name(struct fc_slot *s) { return s->sem_name; } + +/* ── client-side open / read / close (also used by capture-manager) ── */ + +/** + * Open an existing slot for reading. + * Returns NULL if slot not found or header magic mismatch. + */ +struct fc_slot *fc_slot_open(const char *slot_id) +{ + char shm_path[128], sem_name[128]; + build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name); + + int fd = open(shm_path, O_RDONLY); + if (fd < 0) return NULL; + + /* Read header first to get frame_size */ + fc_header_t tmp_hdr; + if (pread(fd, &tmp_hdr, sizeof tmp_hdr, 0) != sizeof tmp_hdr) { + close(fd); return NULL; + } + if (tmp_hdr.magic != FC_MAGIC) { + close(fd); return NULL; + } + size_t total = fc_slot_shm_size(tmp_hdr.frame_size); + + void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { close(fd); return NULL; } + + sem_t *sem = sem_open(sem_name, 0); + if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; } + + struct fc_slot *s = calloc(1, sizeof *s); + if (!s) { sem_close(sem); munmap(base, total); close(fd); return NULL; } + s->shm_fd = fd; + s->base = base; + s->shm_size = total; + s->sem = sem; + strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1); + strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1); + strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1); + return s; +} + +/** + * Close a client-side slot handle. Does not destroy the slot. + */ +void fc_slot_close(struct fc_slot *s) +{ + if (!s) return; + sem_close(s->sem); + munmap(s->base, s->shm_size); + close(s->shm_fd); + free(s); +} diff --git a/services/framecache/src/slot.h b/services/framecache/src/slot.h new file mode 100644 index 0000000..057639c --- /dev/null +++ b/services/framecache/src/slot.h @@ -0,0 +1,76 @@ +/** + * slot.h — Framecache shared memory slot definitions. + * + * Layout per slot (/dev/shm/framecache/): + * [fc_header_t — 4KB aligned] + * [fc_frame_t × ring_depth — each FC_FRAME_HDR_SIZE + frame_size bytes] + * + * Writer advances write_cursor atomically and posts the named semaphore. + * Each consumer tracks its own read_cursor independently — writer never blocks. + */ +#pragma once + +#include +#include +#include + +#define FC_MAGIC 0x46524D43u /* "FRMC" */ +#define FC_VERSION 1u +#define FC_RING_DEPTH 120u /* ~2s at 59.94fps */ +#define FC_HEADER_SIZE 4096u /* 4KB header block */ +#define FC_FRAME_HDR_SIZE 24u /* pts_us(8) + wall_us(8) + size(4) + pad(4) */ +#define FC_MAX_SLOT_ID 64u + +/* Pixel format codes */ +#define FC_PIX_UYVY422 0u + +typedef struct { + uint32_t magic; /* FC_MAGIC */ + uint32_t version; /* FC_VERSION */ + uint32_t width; + uint32_t height; + uint32_t fps_num; + uint32_t fps_den; + uint32_t pixel_format; /* FC_PIX_UYVY422 */ + uint32_t frame_size; /* width * height * 2 */ + uint32_t ring_depth; /* FC_RING_DEPTH */ + uint32_t _reserved; + _Atomic uint64_t write_cursor; /* monotonically increasing frame index */ + _Atomic uint64_t dropped_frames; + char source_type[32]; /* "deltacast" | "blackmagic" | "srt" | "rtmp" */ + char slot_id[FC_MAX_SLOT_ID]; + uint8_t _pad[FC_HEADER_SIZE - 112]; +} fc_header_t; + +/* Per-frame metadata + data (variable length — use fc_frame_at() accessor) */ +typedef struct { + uint64_t pts_us; + uint64_t wall_us; + uint32_t size; + uint32_t _pad; + uint8_t data[]; /* frame_size bytes */ +} fc_frame_t; + +/* Compile-time size check */ +_Static_assert(sizeof(fc_header_t) == FC_HEADER_SIZE, + "fc_header_t must be exactly FC_HEADER_SIZE bytes"); +_Static_assert(sizeof(fc_frame_t) == FC_FRAME_HDR_SIZE, + "fc_frame_t header must be exactly FC_FRAME_HDR_SIZE bytes"); + +/** + * Compute total shm size for a slot given frame_size. + * = FC_HEADER_SIZE + ring_depth * (FC_FRAME_HDR_SIZE + frame_size) + */ +static inline size_t fc_slot_shm_size(uint32_t frame_size) { + return (size_t)FC_HEADER_SIZE + + (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + frame_size); +} + +/** + * Return pointer to frame at ring index idx within a mapped shm base. + */ +static inline fc_frame_t *fc_frame_at(void *base, uint32_t frame_size, uint64_t idx) { + uint8_t *frames = (uint8_t *)base + FC_HEADER_SIZE; + return (fc_frame_t *)(frames + (idx % FC_RING_DEPTH) + * ((size_t)FC_FRAME_HDR_SIZE + frame_size)); +}