feat(framecache): phase 1 — framecache container + consumer library
- services/framecache/: new standalone container
- slot.h/slot.c: shm ring buffer (120 frames, FC_MAGIC header, atomic
write_cursor, POSIX semaphore per slot)
- registry.h/registry.c: in-memory slot registry + /dev/shm/framecache/
registry.json persistence
- framecache.c: HTTP API server (libmicrohttpd, port 7435)
POST /slots, GET /slots, GET /slots/:id, DELETE /slots/:id, GET /health
- fc_client.h/fc_client.c: consumer library — fc_consumer_open/read/close
with per-consumer cursor, timeout via sem_timedwait, automatic skip+count
when consumer falls behind writer by > ring_depth frames
- fc_test_consumer.c: dev utility to attach to any slot and print fps/stats
- CMakeLists.txt: framecache server + fc_client static lib + test consumer
- Dockerfile: builder + slim runtime stages
- docker-compose.worker.yml: add framecache service (profile: capture,
ipc: host, shm_size from FC_SHM_SIZE_GB env var, healthcheck)
- .env.example: document FC_SHM_SIZE_GB with per-node guidance
2026-06-03 10:53:51 -04:00
|
|
|
/**
|
|
|
|
|
* slot.c — Framecache slot lifecycle: create, destroy, open.
|
|
|
|
|
*/
|
|
|
|
|
#include "slot.h"
|
|
|
|
|
|
|
|
|
|
#include <stdio.h>
|
|
|
|
|
#include <stdlib.h>
|
|
|
|
|
#include <string.h>
|
|
|
|
|
#include <errno.h>
|
|
|
|
|
#include <fcntl.h>
|
2026-06-03 14:05:38 -04:00
|
|
|
#include <time.h>
|
feat(framecache): phase 1 — framecache container + consumer library
- services/framecache/: new standalone container
- slot.h/slot.c: shm ring buffer (120 frames, FC_MAGIC header, atomic
write_cursor, POSIX semaphore per slot)
- registry.h/registry.c: in-memory slot registry + /dev/shm/framecache/
registry.json persistence
- framecache.c: HTTP API server (libmicrohttpd, port 7435)
POST /slots, GET /slots, GET /slots/:id, DELETE /slots/:id, GET /health
- fc_client.h/fc_client.c: consumer library — fc_consumer_open/read/close
with per-consumer cursor, timeout via sem_timedwait, automatic skip+count
when consumer falls behind writer by > ring_depth frames
- fc_test_consumer.c: dev utility to attach to any slot and print fps/stats
- CMakeLists.txt: framecache server + fc_client static lib + test consumer
- Dockerfile: builder + slim runtime stages
- docker-compose.worker.yml: add framecache service (profile: capture,
ipc: host, shm_size from FC_SHM_SIZE_GB env var, healthcheck)
- .env.example: document FC_SHM_SIZE_GB with per-node guidance
2026-06-03 10:53:51 -04:00
|
|
|
#include <sys/mman.h>
|
|
|
|
|
#include <sys/stat.h>
|
|
|
|
|
#include <unistd.h>
|
|
|
|
|
|
|
|
|
|
#define SHM_DIR "/dev/shm/framecache"
|
|
|
|
|
#define SEM_PREFIX "/framecache-"
|
|
|
|
|
#define SEM_SUFFIX "-write"
|
|
|
|
|
|
|
|
|
|
/* ── helpers ─────────────────────────────────────────────────────────── */
|
|
|
|
|
|
|
|
|
|
static void build_paths(const char *slot_id,
|
|
|
|
|
char *shm_path, size_t sp_len,
|
|
|
|
|
char *sem_name, size_t sn_len)
|
|
|
|
|
{
|
|
|
|
|
snprintf(shm_path, sp_len, "%s/%s", SHM_DIR, slot_id);
|
|
|
|
|
snprintf(sem_name, sn_len, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* ── server-side: create / destroy ───────────────────────────────────── */
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Create a new slot. Allocates and initialises the shm region.
|
|
|
|
|
* Returns handle on success, NULL on error (errno set).
|
|
|
|
|
*/
|
|
|
|
|
struct fc_slot *fc_slot_create(const char *slot_id,
|
|
|
|
|
uint32_t width, uint32_t height,
|
|
|
|
|
uint32_t fps_num, uint32_t fps_den,
|
|
|
|
|
uint32_t pixel_format,
|
|
|
|
|
const char *source_type)
|
|
|
|
|
{
|
|
|
|
|
char shm_path[128], sem_name[128];
|
|
|
|
|
build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name);
|
|
|
|
|
|
|
|
|
|
uint32_t frame_size = width * height * 2; /* UYVY422 */
|
|
|
|
|
size_t total = fc_slot_shm_size(frame_size);
|
|
|
|
|
|
|
|
|
|
/* Ensure directory exists */
|
|
|
|
|
mkdir(SHM_DIR, 0755);
|
|
|
|
|
|
|
|
|
|
/* Create shm file */
|
|
|
|
|
int fd = open(shm_path, O_RDWR | O_CREAT | O_TRUNC, 0666);
|
|
|
|
|
if (fd < 0) {
|
|
|
|
|
perror("[framecache] open shm");
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
if (ftruncate(fd, (off_t)total) < 0) {
|
|
|
|
|
perror("[framecache] ftruncate");
|
|
|
|
|
close(fd);
|
|
|
|
|
unlink(shm_path);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
|
|
|
|
if (base == MAP_FAILED) {
|
|
|
|
|
perror("[framecache] mmap");
|
|
|
|
|
close(fd);
|
|
|
|
|
unlink(shm_path);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
memset(base, 0, total);
|
|
|
|
|
|
|
|
|
|
/* Initialise header */
|
|
|
|
|
fc_header_t *hdr = (fc_header_t *)base;
|
|
|
|
|
hdr->magic = FC_MAGIC;
|
|
|
|
|
hdr->version = FC_VERSION;
|
|
|
|
|
hdr->width = width;
|
|
|
|
|
hdr->height = height;
|
|
|
|
|
hdr->fps_num = fps_num;
|
|
|
|
|
hdr->fps_den = fps_den;
|
|
|
|
|
hdr->pixel_format = pixel_format;
|
|
|
|
|
hdr->frame_size = frame_size;
|
|
|
|
|
hdr->ring_depth = FC_RING_DEPTH;
|
|
|
|
|
atomic_store(&hdr->write_cursor, 0);
|
|
|
|
|
atomic_store(&hdr->dropped_frames, 0);
|
|
|
|
|
strncpy(hdr->source_type, source_type ? source_type : "unknown",
|
|
|
|
|
sizeof hdr->source_type - 1);
|
|
|
|
|
strncpy(hdr->slot_id, slot_id, sizeof hdr->slot_id - 1);
|
|
|
|
|
|
|
|
|
|
/* Create semaphore */
|
|
|
|
|
sem_unlink(sem_name); /* remove stale */
|
|
|
|
|
sem_t *sem = sem_open(sem_name, O_CREAT | O_EXCL, 0666, 0);
|
|
|
|
|
if (sem == SEM_FAILED) {
|
|
|
|
|
perror("[framecache] sem_open");
|
|
|
|
|
munmap(base, total);
|
|
|
|
|
close(fd);
|
|
|
|
|
unlink(shm_path);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct fc_slot *s = calloc(1, sizeof *s);
|
|
|
|
|
if (!s) {
|
|
|
|
|
sem_close(sem); sem_unlink(sem_name);
|
|
|
|
|
munmap(base, total);
|
|
|
|
|
close(fd);
|
|
|
|
|
unlink(shm_path);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
s->shm_fd = fd;
|
|
|
|
|
s->base = base;
|
|
|
|
|
s->shm_size = total;
|
|
|
|
|
s->sem = sem;
|
|
|
|
|
strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1);
|
|
|
|
|
strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1);
|
|
|
|
|
strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1);
|
|
|
|
|
|
|
|
|
|
fprintf(stderr, "[framecache] slot created: %s (%ux%u %.2ffps %zuMB)\n",
|
|
|
|
|
slot_id, width, height,
|
|
|
|
|
fps_den ? (double)fps_num / fps_den : 0.0,
|
|
|
|
|
total / 1024 / 1024);
|
|
|
|
|
return s;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Destroy a slot: unmap, close fd, delete files, free handle.
|
|
|
|
|
*/
|
|
|
|
|
void fc_slot_destroy(struct fc_slot *s)
|
|
|
|
|
{
|
|
|
|
|
if (!s) return;
|
|
|
|
|
sem_close(s->sem);
|
|
|
|
|
sem_unlink(s->sem_name);
|
|
|
|
|
munmap(s->base, s->shm_size);
|
|
|
|
|
close(s->shm_fd);
|
|
|
|
|
unlink(s->shm_path);
|
|
|
|
|
fprintf(stderr, "[framecache] slot destroyed: %s\n", s->slot_id);
|
|
|
|
|
free(s);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* ── writer: called by ingest bridges ───────────────────────────────── */
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Write one frame into the ring. Never blocks — advances write_cursor
|
|
|
|
|
* atomically and posts the semaphore. Slow consumers will be skipped.
|
|
|
|
|
*/
|
|
|
|
|
void fc_slot_write_frame(struct fc_slot *s,
|
|
|
|
|
const uint8_t *data, uint32_t size,
|
|
|
|
|
uint64_t pts_us)
|
|
|
|
|
{
|
|
|
|
|
fc_header_t *hdr = (fc_header_t *)s->base;
|
|
|
|
|
uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
|
|
|
|
|
fc_frame_t *frame = fc_frame_at(s->base, hdr->frame_size, cur);
|
|
|
|
|
|
|
|
|
|
frame->pts_us = pts_us;
|
|
|
|
|
frame->wall_us = (uint64_t)({ struct timespec ts;
|
|
|
|
|
clock_gettime(CLOCK_REALTIME, &ts);
|
|
|
|
|
ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000; });
|
|
|
|
|
frame->size = size < hdr->frame_size ? size : hdr->frame_size;
|
|
|
|
|
memcpy(frame->data, data, frame->size);
|
|
|
|
|
|
|
|
|
|
atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
|
|
|
|
|
sem_post(s->sem);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* ── client-side open / read / close (also used by capture-manager) ── */
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Open an existing slot for reading.
|
|
|
|
|
* Returns NULL if slot not found or header magic mismatch.
|
|
|
|
|
*/
|
|
|
|
|
struct fc_slot *fc_slot_open(const char *slot_id)
|
|
|
|
|
{
|
|
|
|
|
char shm_path[128], sem_name[128];
|
|
|
|
|
build_paths(slot_id, shm_path, sizeof shm_path, sem_name, sizeof sem_name);
|
|
|
|
|
|
|
|
|
|
int fd = open(shm_path, O_RDONLY);
|
|
|
|
|
if (fd < 0) return NULL;
|
|
|
|
|
|
|
|
|
|
/* Read header first to get frame_size */
|
|
|
|
|
fc_header_t tmp_hdr;
|
|
|
|
|
if (pread(fd, &tmp_hdr, sizeof tmp_hdr, 0) != sizeof tmp_hdr) {
|
|
|
|
|
close(fd); return NULL;
|
|
|
|
|
}
|
|
|
|
|
if (tmp_hdr.magic != FC_MAGIC) {
|
|
|
|
|
close(fd); return NULL;
|
|
|
|
|
}
|
|
|
|
|
size_t total = fc_slot_shm_size(tmp_hdr.frame_size);
|
|
|
|
|
|
|
|
|
|
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
|
|
|
|
|
if (base == MAP_FAILED) { close(fd); return NULL; }
|
|
|
|
|
|
|
|
|
|
sem_t *sem = sem_open(sem_name, 0);
|
|
|
|
|
if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; }
|
|
|
|
|
|
|
|
|
|
struct fc_slot *s = calloc(1, sizeof *s);
|
|
|
|
|
if (!s) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
|
|
|
|
|
s->shm_fd = fd;
|
|
|
|
|
s->base = base;
|
|
|
|
|
s->shm_size = total;
|
|
|
|
|
s->sem = sem;
|
|
|
|
|
strncpy(s->slot_id, slot_id, sizeof s->slot_id - 1);
|
|
|
|
|
strncpy(s->shm_path, shm_path, sizeof s->shm_path - 1);
|
|
|
|
|
strncpy(s->sem_name, sem_name, sizeof s->sem_name - 1);
|
|
|
|
|
return s;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Close a client-side slot handle. Does not destroy the slot.
|
|
|
|
|
*/
|
|
|
|
|
void fc_slot_close(struct fc_slot *s)
|
|
|
|
|
{
|
|
|
|
|
if (!s) return;
|
|
|
|
|
sem_close(s->sem);
|
|
|
|
|
munmap(s->base, s->shm_size);
|
|
|
|
|
close(s->shm_fd);
|
|
|
|
|
free(s);
|
|
|
|
|
}
|