feat(framecache): phase 1 — framecache container + consumer library
- services/framecache/: new standalone container
- slot.h/slot.c: shm ring buffer (120 frames, FC_MAGIC header, atomic
write_cursor, POSIX semaphore per slot)
- registry.h/registry.c: in-memory slot registry + /dev/shm/framecache/
registry.json persistence
- framecache.c: HTTP API server (libmicrohttpd, port 7435)
POST /slots, GET /slots, GET /slots/:id, DELETE /slots/:id, GET /health
- fc_client.h/fc_client.c: consumer library — fc_consumer_open/read/close
with per-consumer cursor, timeout via sem_timedwait, automatic skip+count
when consumer falls behind writer by > ring_depth frames
- fc_test_consumer.c: dev utility to attach to any slot and print fps/stats
- CMakeLists.txt: framecache server + fc_client static lib + test consumer
- Dockerfile: builder + slim runtime stages
- docker-compose.worker.yml: add framecache service (profile: capture,
ipc: host, shm_size from FC_SHM_SIZE_GB env var, healthcheck)
- .env.example: document FC_SHM_SIZE_GB with per-node guidance
2026-06-03 10:53:51 -04:00
|
|
|
/**
|
|
|
|
|
* fc_client.c — Consumer-side framecache client implementation.
|
|
|
|
|
*/
|
|
|
|
|
#include "fc_client.h"
|
|
|
|
|
#include "../src/slot.h"
|
|
|
|
|
|
|
|
|
|
#include <stdio.h>
|
|
|
|
|
#include <stdlib.h>
|
|
|
|
|
#include <string.h>
|
|
|
|
|
#include <errno.h>
|
|
|
|
|
#include <fcntl.h>
|
|
|
|
|
#include <sys/mman.h>
|
|
|
|
|
#include <sys/stat.h>
|
|
|
|
|
#include <semaphore.h>
|
|
|
|
|
#include <time.h>
|
|
|
|
|
#include <unistd.h>
|
|
|
|
|
|
|
|
|
|
/* Directory that holds one shared-memory file per slot. */
#define SHM_DIR     "/dev/shm/framecache"
/* A slot's named semaphore is SEM_PREFIX + slot id + SEM_SUFFIX. */
#define SEM_PREFIX  "/framecache-"
#define SEM_SUFFIX  "-write"
|
|
|
|
|
|
|
|
|
|
struct fc_consumer {
|
|
|
|
|
int shm_fd;
|
|
|
|
|
void *base;
|
|
|
|
|
size_t shm_size;
|
|
|
|
|
sem_t *sem;
|
|
|
|
|
uint64_t read_cursor; /* consumer's own position in the ring */
|
|
|
|
|
uint64_t local_dropped; /* frames skipped by this consumer */
|
2026-06-03 12:25:34 -04:00
|
|
|
uint8_t *copy_buf; /* consumer-owned frame copy buffer (frame_size bytes) */
|
|
|
|
|
uint32_t frame_size; /* cached from header */
|
feat(framecache): phase 1 — framecache container + consumer library
- services/framecache/: new standalone container
- slot.h/slot.c: shm ring buffer (120 frames, FC_MAGIC header, atomic
write_cursor, POSIX semaphore per slot)
- registry.h/registry.c: in-memory slot registry + /dev/shm/framecache/
registry.json persistence
- framecache.c: HTTP API server (libmicrohttpd, port 7435)
POST /slots, GET /slots, GET /slots/:id, DELETE /slots/:id, GET /health
- fc_client.h/fc_client.c: consumer library — fc_consumer_open/read/close
with per-consumer cursor, timeout via sem_timedwait, automatic skip+count
when consumer falls behind writer by > ring_depth frames
- fc_test_consumer.c: dev utility to attach to any slot and print fps/stats
- CMakeLists.txt: framecache server + fc_client static lib + test consumer
- Dockerfile: builder + slim runtime stages
- docker-compose.worker.yml: add framecache service (profile: capture,
ipc: host, shm_size from FC_SHM_SIZE_GB env var, healthcheck)
- .env.example: document FC_SHM_SIZE_GB with per-node guidance
2026-06-03 10:53:51 -04:00
|
|
|
char slot_id[FC_MAX_SLOT_ID];
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* Current CLOCK_REALTIME reading, expressed in microseconds. */
static uint64_t now_us(void)
{
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);

    uint64_t whole_us = (uint64_t)now.tv_sec * 1000000ULL;
    uint64_t frac_us  = now.tv_nsec / 1000ULL;
    return whole_us + frac_us;
}
|
|
|
|
|
|
|
|
|
|
fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms)
|
|
|
|
|
{
|
|
|
|
|
char shm_path[128], sem_name[128];
|
|
|
|
|
snprintf(shm_path, sizeof shm_path, "%s/%s", SHM_DIR, slot_id);
|
|
|
|
|
snprintf(sem_name, sizeof sem_name, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX);
|
|
|
|
|
|
|
|
|
|
uint64_t deadline = now_us() + wait_ms * 1000ULL;
|
|
|
|
|
int fd = -1;
|
|
|
|
|
while (1) {
|
|
|
|
|
fd = open(shm_path, O_RDONLY);
|
|
|
|
|
if (fd >= 0) break;
|
|
|
|
|
if (now_us() >= deadline) return NULL;
|
|
|
|
|
struct timespec ts = { .tv_nsec = 100000000 }; /* 100ms */
|
|
|
|
|
nanosleep(&ts, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Read header to get frame_size */
|
|
|
|
|
fc_header_t hdr;
|
|
|
|
|
if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) {
|
|
|
|
|
close(fd); return NULL;
|
|
|
|
|
}
|
|
|
|
|
size_t total = fc_slot_shm_size(hdr.frame_size);
|
|
|
|
|
|
|
|
|
|
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
|
|
|
|
|
if (base == MAP_FAILED) { close(fd); return NULL; }
|
|
|
|
|
|
|
|
|
|
sem_t *sem = sem_open(sem_name, 0);
|
|
|
|
|
if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; }
|
|
|
|
|
|
|
|
|
|
fc_consumer_t *c = calloc(1, sizeof *c);
|
|
|
|
|
if (!c) { sem_close(sem); munmap(base, total); close(fd); return NULL; }
|
|
|
|
|
|
2026-06-03 12:25:34 -04:00
|
|
|
/* Consumer-owned copy buffer — fc_consumer_read copies the frame here and
|
|
|
|
|
* re-validates the cursor afterward, so a writer lapping a slow consumer
|
|
|
|
|
* cannot corrupt the frame the caller is using. */
|
|
|
|
|
c->copy_buf = malloc(hdr.frame_size);
|
|
|
|
|
if (!c->copy_buf) {
|
|
|
|
|
free(c); sem_close(sem); munmap(base, total); close(fd); return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c->shm_fd = fd;
|
|
|
|
|
c->base = base;
|
|
|
|
|
c->shm_size = total;
|
|
|
|
|
c->sem = sem;
|
|
|
|
|
c->frame_size = hdr.frame_size;
|
feat(framecache): phase 1 — framecache container + consumer library
- services/framecache/: new standalone container
- slot.h/slot.c: shm ring buffer (120 frames, FC_MAGIC header, atomic
write_cursor, POSIX semaphore per slot)
- registry.h/registry.c: in-memory slot registry + /dev/shm/framecache/
registry.json persistence
- framecache.c: HTTP API server (libmicrohttpd, port 7435)
POST /slots, GET /slots, GET /slots/:id, DELETE /slots/:id, GET /health
- fc_client.h/fc_client.c: consumer library — fc_consumer_open/read/close
with per-consumer cursor, timeout via sem_timedwait, automatic skip+count
when consumer falls behind writer by > ring_depth frames
- fc_test_consumer.c: dev utility to attach to any slot and print fps/stats
- CMakeLists.txt: framecache server + fc_client static lib + test consumer
- Dockerfile: builder + slim runtime stages
- docker-compose.worker.yml: add framecache service (profile: capture,
ipc: host, shm_size from FC_SHM_SIZE_GB env var, healthcheck)
- .env.example: document FC_SHM_SIZE_GB with per-node guidance
2026-06-03 10:53:51 -04:00
|
|
|
/* Start reading from the current write position so we don't replay old frames */
|
|
|
|
|
c->read_cursor = atomic_load_explicit(
|
|
|
|
|
&((fc_header_t *)base)->write_cursor, memory_order_acquire);
|
|
|
|
|
c->local_dropped = 0;
|
|
|
|
|
strncpy(c->slot_id, slot_id, FC_MAX_SLOT_ID - 1);
|
|
|
|
|
return c;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
 * fc_consumer_read — copy the next unread frame into the consumer's buffer.
 *
 * @param c           Handle returned by fc_consumer_open().
 * @param ref         Filled on FC_OK / FC_DROPPED. ref->data points at the
 *                    consumer-owned copy buffer, which the next
 *                    fc_consumer_read() overwrites and fc_consumer_close() frees.
 * @param timeout_ms  Upper bound on the total time spent blocked waiting for
 *                    a frame.
 * @return FC_OK       a frame was copied and no frames were skipped;
 *         FC_DROPPED  a frame was copied, but older unread frames had been
 *                     overwritten and were skipped first;
 *         FC_LAPPED   the writer overtook the frame while it was being copied;
 *                     ref is not filled, the caller should read again;
 *         FC_TIMEOUT  no frame became available within timeout_ms;
 *         FC_ERROR    c or ref is NULL, or waiting on the semaphore failed.
 *
 * Frames skipped by this consumer are accumulated in c->local_dropped.
 */
int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms)
{
    if (!c || !ref) return FC_ERROR;

    fc_header_t *hdr = (fc_header_t *)c->base;
    const uint64_t depth = hdr->ring_depth;
    int dropped = 0;           /* set when this call skipped one or more frames */
    int have_deadline = 0;
    struct timespec deadline = {0};

    /* ── Wait for new data ──────────────────────────────────────────────
     * The cursor difference (write_cursor - read_cursor) decides whether a
     * frame is available. The semaphore is only a wake-up hint: it is drained
     * to zero on every pass so its count cannot accumulate, and it is waited
     * on only when no frame is available. */
    for (;;) {
        uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor,
                                                  memory_order_acquire);

        /* Lap detection: if the writer is more than ring_depth ahead, the
         * oldest unread frames have been overwritten — skip forward.
         * The header is never written from here: the mapping is PROT_READ, so
         * per-consumer drops live in c->local_dropped only.
         * NOTE(review): if the writer fills slot (write_cursor % ring_depth)
         * before publishing the incremented cursor, the frame at
         * write_cursor - ring_depth is already being overwritten while it
         * still passes this check and the one after the copy — confirm
         * against the writer implementation. */
        if (write_cur > c->read_cursor + depth) {
            c->local_dropped += write_cur - c->read_cursor - depth;
            c->read_cursor = write_cur - depth;
            dropped = 1;
        }

        while (sem_trywait(c->sem) == 0) { /* drain */ }

        if (c->read_cursor < write_cur) break;

        /* Compute the absolute deadline once. Recomputing it on every pass
         * would restart the timeout after each EINTR or stale wake-up.
         * sem_timedwait measures its timeout against CLOCK_REALTIME. */
        if (!have_deadline) {
            clock_gettime(CLOCK_REALTIME, &deadline);
            deadline.tv_sec  += (time_t)(timeout_ms / 1000);
            deadline.tv_nsec += (long)((timeout_ms % 1000) * 1000000L);
            if (deadline.tv_nsec >= 1000000000L) {
                deadline.tv_sec++;
                deadline.tv_nsec -= 1000000000L;
            }
            have_deadline = 1;
        }

        if (sem_timedwait(c->sem, &deadline) == 0) continue; /* woken: re-check */
        if (errno == EINTR) continue;
        if (errno != ETIMEDOUT) return FC_ERROR;

        /* Timed out — look at the cursor once more before giving up, the
         * writer may have advanced between our check and the wait. */
        if (c->read_cursor < atomic_load_explicit(&hdr->write_cursor,
                                                  memory_order_acquire))
            continue;
        return FC_TIMEOUT;
    }

    /* ── Copy the frame into the consumer-owned buffer ────────────────────
     * Offsets and the copy length are bounded by c->frame_size, the value
     * copy_buf was allocated with, not by whatever the shared header holds
     * now. */
    fc_frame_t *frame = fc_frame_at(c->base, c->frame_size, c->read_cursor);
    uint32_t fsz = frame->size;
    if (fsz > c->frame_size) fsz = c->frame_size;
    uint64_t pts  = frame->pts_us;
    uint64_t wall = frame->wall_us;
    memcpy(c->copy_buf, frame->data, fsz);

    /* ── Re-validate AFTER the copy ───────────────────────────────────────
     * The fence keeps the frame reads above from being reordered past the
     * cursor load below. If the writer lapped us during the copy, the copy
     * may be torn — discard it so the caller reads again. */
    atomic_thread_fence(memory_order_acquire);
    uint64_t write_after = atomic_load_explicit(&hdr->write_cursor,
                                                memory_order_acquire);
    if (write_after > c->read_cursor + depth) {
        c->local_dropped += write_after - c->read_cursor - depth;
        c->read_cursor = write_after - depth;
        return FC_LAPPED; /* copy torn — ref not valid, caller reads again */
    }

    /* Copy is valid. */
    ref->data    = c->copy_buf;
    ref->size    = fsz;
    ref->pts_us  = pts;
    ref->wall_us = wall;
    ref->seq     = c->read_cursor;

    c->read_cursor++;
    return dropped ? FC_DROPPED : FC_OK;
}
|
|
|
|
|
|
|
|
|
|
/**
 * fc_consumer_close — release everything a consumer handle owns.
 *
 * Frees the copy buffer, closes the semaphore handle, unmaps the slot, closes
 * the slot file descriptor and frees the handle itself. A NULL handle is a
 * no-op. Buffers previously handed out through fc_consumer_read() are invalid
 * afterwards.
 */
void fc_consumer_close(fc_consumer_t *c)
{
    if (c == NULL) {
        return;
    }

    free(c->copy_buf); /* free(NULL) is a no-op, no guard needed */
    sem_close(c->sem);
    munmap(c->base, c->shm_size);
    close(c->shm_fd);
    free(c);
}
|
|
|
|
|
|
|
|
|
|
/**
 * fc_consumer_write_cursor — current value of the slot's write cursor.
 *
 * @param c  Consumer handle; NULL yields 0.
 * @return The write_cursor field of the mapped slot header, loaded atomically.
 */
uint64_t fc_consumer_write_cursor(fc_consumer_t *c)
{
    if (!c) return 0;

    fc_header_t *slot_hdr = (fc_header_t *)c->base;
    return atomic_load(&slot_hdr->write_cursor);
}
|
|
|
|
|
|
|
|
|
|
/**
 * fc_consumer_dropped — number of frames this consumer has skipped so far.
 *
 * @param c  Consumer handle; NULL yields 0.
 * @return The handle's local_dropped counter.
 */
uint64_t fc_consumer_dropped(fc_consumer_t *c)
{
    return c ? c->local_dropped : 0;
}
|