/** * fc_client.c — Consumer-side framecache client implementation. */ #include "fc_client.h" #include "../src/slot.h" #include #include #include #include #include #include #include #include #include #include #define SHM_DIR "/dev/shm/framecache" #define SEM_PREFIX "/framecache-" #define SEM_SUFFIX "-write" struct fc_consumer { int shm_fd; void *base; size_t shm_size; sem_t *sem; uint64_t read_cursor; /* consumer's own position in the ring */ uint64_t local_dropped; /* frames skipped by this consumer */ uint8_t *copy_buf; /* consumer-owned copy buffer: [video frame_size][audio audio_max] */ uint32_t frame_size; /* cached from header (video bytes) */ uint32_t audio_max; /* cached from header (audio region capacity) */ char slot_id[FC_MAX_SLOT_ID]; }; static uint64_t now_us(void) { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); return (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL; } fc_consumer_t *fc_consumer_open(const char *slot_id, uint64_t wait_ms) { char shm_path[128], sem_name[128]; snprintf(shm_path, sizeof shm_path, "%s/%s", SHM_DIR, slot_id); snprintf(sem_name, sizeof sem_name, "%s%s%s", SEM_PREFIX, slot_id, SEM_SUFFIX); uint64_t deadline = now_us() + wait_ms * 1000ULL; int fd = -1; while (1) { fd = open(shm_path, O_RDONLY); if (fd >= 0) break; if (now_us() >= deadline) return NULL; struct timespec ts = { .tv_nsec = 100000000 }; /* 100ms */ nanosleep(&ts, NULL); } /* Read header to get frame_size */ fc_header_t hdr; if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) { close(fd); return NULL; } /* Version gate: an old reader against a new writer (or vice-versa) computes * the wrong per-entry stride and would misparse. Fail safe. */ if (hdr.version != FC_VERSION) { fprintf(stderr, "[fc_client] slot %s version %u != expected %u — refusing\n", slot_id, hdr.version, FC_VERSION); close(fd); return NULL; } size_t total = fc_slot_shm_size(hdr.frame_size); void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0); if (base == MAP_FAILED) { close(fd); return NULL; } sem_t *sem = sem_open(sem_name, 0); if (sem == SEM_FAILED) { munmap(base, total); close(fd); return NULL; } fc_consumer_t *c = calloc(1, sizeof *c); if (!c) { sem_close(sem); munmap(base, total); close(fd); return NULL; } /* Consumer-owned copy buffer — fc_consumer_read copies the frame here and * re-validates the cursor afterward, so a writer lapping a slow consumer * cannot corrupt the frame the caller is using. Sized for video + audio so * the frame-coupled audio is copied atomically with its video. */ c->copy_buf = malloc((size_t)hdr.frame_size + hdr.audio_max_bytes); if (!c->copy_buf) { free(c); sem_close(sem); munmap(base, total); close(fd); return NULL; } c->shm_fd = fd; c->base = base; c->shm_size = total; c->sem = sem; c->frame_size = hdr.frame_size; c->audio_max = hdr.audio_max_bytes; /* Start reading from the current write position so we don't replay old frames */ c->read_cursor = atomic_load_explicit( &((fc_header_t *)base)->write_cursor, memory_order_acquire); c->local_dropped = 0; strncpy(c->slot_id, slot_id, FC_MAX_SLOT_ID - 1); return c; } int fc_consumer_read(fc_consumer_t *c, fc_frame_ref_t *ref, uint64_t timeout_ms) { fc_header_t *hdr = (fc_header_t *)c->base; int dropped = 0; /* set when this call skipped one or more frames */ /* ── Wait for new data ────────────────────────────────────────────── * The semaphore is used ONLY as an edge-wakeup hint, never as a frame * counter. The writer posts once per frame, but a consumer that skips * frames (lap) or reads less often than the writer posts would otherwise * leave the count climbing unbounded — causing sem_timedwait to never * block (100% CPU busy-spin) and eventually EOVERFLOW. So: * - cursor-diff (write_cursor - read_cursor) is the SOURCE OF TRUTH for * whether a frame is available. * - we drain the semaphore to zero (sem_trywait loop) so the count never * accumulates. * - if no frame is available we block on ONE sem_timedwait for wakeup. */ for (;;) { uint64_t write_cur = atomic_load_explicit(&hdr->write_cursor, memory_order_acquire); /* Lap detection: if the writer is more than ring_depth ahead, the * oldest unread frames have been overwritten — skip to the oldest * still-valid frame. */ if (write_cur > c->read_cursor + hdr->ring_depth) { uint64_t skipped = write_cur - c->read_cursor - hdr->ring_depth; c->read_cursor = write_cur - hdr->ring_depth; c->local_dropped += skipped; /* NOTE: do NOT write hdr->dropped_frames here — the consumer maps * the shm PROT_READ (read-only), so an atomic write would SIGSEGV. * Per-consumer drops are tracked in c->local_dropped and exposed * via fc_consumer_dropped(). The writer owns hdr->dropped_frames. */ dropped = 1; } if (c->read_cursor < write_cur) { /* A frame is available — drain the semaphore so its count never * accumulates, then read+copy below. */ while (sem_trywait(c->sem) == 0) { /* drain */ } break; } /* No frame yet — drain stale posts, then block for a wakeup. */ while (sem_trywait(c->sem) == 0) { /* drain */ } struct timespec abs_ts; clock_gettime(CLOCK_REALTIME, &abs_ts); abs_ts.tv_sec += (time_t)(timeout_ms / 1000); abs_ts.tv_nsec += (long)((timeout_ms % 1000) * 1000000L); if (abs_ts.tv_nsec >= 1000000000L) { abs_ts.tv_sec++; abs_ts.tv_nsec -= 1000000000L; } int w = sem_timedwait(c->sem, &abs_ts); if (w != 0) { if (errno == ETIMEDOUT) { /* Re-check the cursor once more before giving up — the writer * may have advanced between our check and the wait. */ uint64_t wc2 = atomic_load_explicit(&hdr->write_cursor, memory_order_acquire); if (c->read_cursor < wc2) continue; return FC_TIMEOUT; } if (errno == EINTR) continue; return FC_ERROR; } /* Woken — loop to re-evaluate cursor-diff. */ } /* ── Copy the entry (video + this frame's audio) into the owned buffer ─ * Both are copied from the SAME ring entry in the SAME iteration, so the * audio handed to the caller is exactly this video frame's embedded audio — * frame-coupled, no second buffer to drift. copy_buf layout mirrors the * shm entry: [video frame_size][audio audio_max]. */ fc_frame_t *frame = fc_frame_at(c->base, hdr->frame_size, c->read_cursor); uint32_t fsz = frame->size; if (fsz > hdr->frame_size) fsz = hdr->frame_size; uint32_t asz = frame->audio_size; if (asz > c->audio_max) asz = c->audio_max; uint64_t pts = frame->pts_us; uint64_t wall = frame->wall_us; memcpy(c->copy_buf, frame->data, fsz); if (asz) memcpy(c->copy_buf + c->frame_size, fc_frame_audio(frame, hdr->frame_size), asz); /* ── Re-validate AFTER the copy ───────────────────────────────────── * If the writer lapped us during the copy (overwrote this slot), the copy * may be torn — discard it and signal DROPPED so the caller reads again. */ uint64_t write_after = atomic_load_explicit(&hdr->write_cursor, memory_order_acquire); if (write_after > c->read_cursor + hdr->ring_depth) { uint64_t skipped = write_after - c->read_cursor - hdr->ring_depth; c->read_cursor = write_after - hdr->ring_depth; c->local_dropped += skipped; return FC_LAPPED; /* copy torn — ref not valid, caller reads again */ } /* Copy is valid. */ ref->data = c->copy_buf; ref->size = fsz; ref->audio = asz ? (c->copy_buf + c->frame_size) : NULL; ref->audio_size = asz; ref->pts_us = pts; ref->wall_us = wall; ref->seq = c->read_cursor; c->read_cursor++; return dropped ? FC_DROPPED : FC_OK; } void fc_consumer_close(fc_consumer_t *c) { if (!c) return; if (c->copy_buf) free(c->copy_buf); sem_close(c->sem); munmap(c->base, c->shm_size); close(c->shm_fd); free(c); } uint64_t fc_consumer_write_cursor(fc_consumer_t *c) { fc_header_t *hdr = (fc_header_t *)c->base; return atomic_load(&hdr->write_cursor); } uint64_t fc_consumer_dropped(fc_consumer_t *c) { return c->local_dropped; } int fc_consumer_info(fc_consumer_t *c, fc_stream_info_t *info) { if (!c || !info) return -1; fc_header_t *hdr = (fc_header_t *)c->base; info->width = hdr->width; info->height = hdr->height; info->fps_num = hdr->fps_num; info->fps_den = hdr->fps_den; info->pixel_format = hdr->pixel_format; info->frame_size = hdr->frame_size; info->audio_rate = hdr->audio_rate ? hdr->audio_rate : FC_AUDIO_RATE; info->audio_channels = hdr->audio_channels ? hdr->audio_channels : FC_AUDIO_CHANNELS; info->audio_sample_bytes = FC_AUDIO_SAMPLE_BYTES; /* s16le */ return 0; }