/** * fc_writer.c — Framecache slot writer for deltacast-bridge. * * Uses only POSIX + libc — no external dependencies beyond what the bridge * already links. HTTP calls are done with raw sockets (tiny GET/POST/DELETE) * to avoid pulling in libcurl. */ #include "fc_writer.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* Re-use the shared memory layout from the framecache service */ #define FC_MAGIC 0x46524D43u #define FC_VERSION 1u #define FC_RING_DEPTH 120u #define FC_HEADER_SIZE 4096u #define FC_FRAME_HDR_SIZE 24u typedef struct { uint32_t magic; uint32_t version; uint32_t width; uint32_t height; uint32_t fps_num; uint32_t fps_den; uint32_t pixel_format; uint32_t frame_size; uint32_t ring_depth; uint32_t _reserved; _Atomic uint64_t write_cursor; _Atomic uint64_t dropped_frames; char source_type[32]; char slot_id[64]; uint8_t _pad[FC_HEADER_SIZE - 112]; } fc_hdr_t; typedef struct { uint64_t pts_us; uint64_t wall_us; uint32_t size; uint32_t _pad; uint8_t data[]; } fc_frm_t; struct fc_writer { void *base; size_t shm_size; int shm_fd; sem_t *sem; char slot_id[64]; char fc_url[256]; /* base URL for DELETE on close */ char shm_path[128]; char sem_name[128]; }; /* ── tiny HTTP helper ──────────────────────────────────────────────── */ static int http_request(const char *method, const char *host, int port, const char *path, const char *body, /* NULL for GET/DELETE */ char *resp_buf, size_t resp_len) { struct sockaddr_in sa; memset(&sa, 0, sizeof sa); sa.sin_family = AF_INET; sa.sin_port = htons((uint16_t)port); struct hostent *he = gethostbyname(host); if (!he) return -1; memcpy(&sa.sin_addr, he->h_addr_list[0], (size_t)he->h_length); int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) return -1; struct timeval tv = { .tv_sec = 5 }; setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv); setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv); if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) { close(fd); return -1; } char req[4096]; int req_len; if (body) { req_len = snprintf(req, sizeof req, "%s %s HTTP/1.0\r\n" "Host: %s:%d\r\n" "Content-Type: application/json\r\n" "Content-Length: %zu\r\n" "Connection: close\r\n\r\n" "%s", method, path, host, port, strlen(body), body); } else { req_len = snprintf(req, sizeof req, "%s %s HTTP/1.0\r\n" "Host: %s:%d\r\n" "Connection: close\r\n\r\n", method, path, host, port); } if (send(fd, req, (size_t)req_len, 0) < 0) { close(fd); return -1; } int status = -1; size_t got = 0; char buf[8192]; ssize_t n; while ((n = recv(fd, buf + got, sizeof buf - got - 1, 0)) > 0) got += (size_t)n; buf[got] = '\0'; /* Parse status line */ if (sscanf(buf, "HTTP/%*s %d", &status) != 1) status = -1; /* Copy body (after \r\n\r\n) into resp_buf */ if (resp_buf && resp_len > 0) { const char *body_start = strstr(buf, "\r\n\r\n"); if (body_start) { strncpy(resp_buf, body_start + 4, resp_len - 1); resp_buf[resp_len - 1] = '\0'; } } close(fd); return status; } /* Parse "host:port" or just "host" from a URL like "http://host:port" */ static void parse_url(const char *url, char *host, size_t hlen, int *port) { const char *p = url; if (strncmp(p, "http://", 7) == 0) p += 7; *port = 7435; const char *colon = strchr(p, ':'); if (colon) { size_t n = (size_t)(colon - p); if (n >= hlen) n = hlen - 1; strncpy(host, p, n); host[n] = '\0'; *port = atoi(colon + 1); } else { strncpy(host, p, hlen - 1); host[hlen - 1] = '\0'; } } static int json_str(const char *json, const char *key, char *out, size_t len) { char pat[128]; snprintf(pat, sizeof pat, "\"%s\":", key); const char *p = strstr(json, pat); if (!p) return -1; p += strlen(pat); while (*p == ' ') p++; if (*p != '"') return -1; p++; size_t i = 0; while (*p && *p != '"' && i < len - 1) out[i++] = *p++; out[i] = '\0'; return 0; } /* ── public API ────────────────────────────────────────────────────── */ fc_writer_t *fc_writer_open(const char *fc_url, const char *slot_id, uint32_t width, uint32_t height, uint32_t fps_num, uint32_t fps_den) { char host[128]; int port; parse_url(fc_url, host, sizeof host, &port); /* POST /slots */ char body[512]; snprintf(body, sizeof body, "{\"slot_id\":\"%s\"," "\"width\":%u,\"height\":%u," "\"fps_num\":%u,\"fps_den\":%u," "\"source_type\":\"deltacast\"}", slot_id, width, height, fps_num, fps_den); char resp[1024] = {0}; int status = http_request("POST", host, port, "/slots", body, resp, sizeof resp); if (status != 201) { fprintf(stderr, "[fc_writer:%s] POST /slots failed (HTTP %d): %s\n", slot_id, status, resp); return NULL; } char shm_path[128] = {0}, sem_name[128] = {0}; json_str(resp, "shm_path", shm_path, sizeof shm_path); json_str(resp, "sem_name", sem_name, sizeof sem_name); if (!shm_path[0] || !sem_name[0]) { fprintf(stderr, "[fc_writer:%s] bad response (missing shm_path/sem_name)\n", slot_id); return NULL; } /* mmap the shm file */ int fd = open(shm_path, O_RDWR); if (fd < 0) { fprintf(stderr, "[fc_writer:%s] open %s: %s\n", slot_id, shm_path, strerror(errno)); return NULL; } /* Read header to get frame_size */ fc_hdr_t hdr; if (pread(fd, &hdr, sizeof hdr, 0) != sizeof hdr || hdr.magic != FC_MAGIC) { fprintf(stderr, "[fc_writer:%s] bad shm header\n", slot_id); close(fd); return NULL; } size_t total = (size_t)FC_HEADER_SIZE + (size_t)FC_RING_DEPTH * ((size_t)FC_FRAME_HDR_SIZE + hdr.frame_size); void *base = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) { fprintf(stderr, "[fc_writer:%s] mmap: %s\n", slot_id, strerror(errno)); close(fd); return NULL; } sem_t *sem = sem_open(sem_name, 0); if (sem == SEM_FAILED) { fprintf(stderr, "[fc_writer:%s] sem_open %s: %s\n", slot_id, sem_name, strerror(errno)); munmap(base, total); close(fd); return NULL; } fc_writer_t *w = calloc(1, sizeof *w); if (!w) { sem_close(sem); munmap(base, total); close(fd); return NULL; } w->base = base; w->shm_size = total; w->shm_fd = fd; w->sem = sem; strncpy(w->slot_id, slot_id, sizeof w->slot_id - 1); strncpy(w->fc_url, fc_url, sizeof w->fc_url - 1); strncpy(w->shm_path, shm_path, sizeof w->shm_path - 1); strncpy(w->sem_name, sem_name, sizeof w->sem_name - 1); fprintf(stderr, "[fc_writer:%s] slot open (%ux%u %.2ffps shm=%s)\n", slot_id, width, height, fps_den ? (double)fps_num / fps_den : 0.0, shm_path); return w; } void fc_writer_write(fc_writer_t *w, const uint8_t *data, uint32_t size, uint64_t pts_us) { fc_hdr_t *hdr = (fc_hdr_t *)w->base; uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed); uint64_t idx = cur % FC_RING_DEPTH; /* Locate frame in ring */ uint8_t *frames = (uint8_t *)w->base + FC_HEADER_SIZE; fc_frm_t *frame = (fc_frm_t *)(frames + idx * ((size_t)FC_FRAME_HDR_SIZE + hdr->frame_size)); struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); uint64_t wall = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL; frame->pts_us = pts_us; frame->wall_us = wall; frame->size = size < hdr->frame_size ? size : hdr->frame_size; memcpy(frame->data, data, frame->size); atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release); sem_post(w->sem); } void fc_writer_close(fc_writer_t *w) { if (!w) return; /* DELETE /slots/:id */ char host[128]; int port; parse_url(w->fc_url, host, sizeof host, &port); char path[192]; snprintf(path, sizeof path, "/slots/%s", w->slot_id); http_request("DELETE", host, port, path, NULL, NULL, 0); sem_close(w->sem); munmap(w->base, w->shm_size); close(w->shm_fd); fprintf(stderr, "[fc_writer:%s] slot closed\n", w->slot_id); free(w); }