/**
 * net_ingest.c — Network source (RTMP/SRT) → framecache slot ingest.
 *
 * Spawns ffmpeg to decode a network stream to raw UYVY422 on stdout, then
 * reads those frames and writes them into a framecache slot via the shm
 * ring buffer. Registers the slot with the framecache HTTP API on startup
 * and deregisters it on exit.
 *
 * Usage:
 *   net_ingest --url URL
 *              --slot-id ID
 *              --fc-url http://framecache:7435
 *              --width W --height H
 *              --fps-num N --fps-den D
 *              [--source-type srt|rtmp]
 *              [--listen]             # SRT/RTMP listener mode
 *              [--listen-port PORT]   # listener port (SRT default 9000, RTMP 1935)
 *              [--stream-key KEY]     # RTMP stream key (default "stream")
 *
 * Emits one JSON line to stderr on first frame:
 *   {"slot_id":"ID","width":W,"height":H,"fps_num":N,"fps_den":D,
 *    "source_type":"srt","pix_fmt":"uyvy422"}
 *
 * Exits 0 on clean stop (SIGTERM/SIGINT) or end of stream, 1 on error
 * (bad arguments, registration/shm failure, ffmpeg could not be spawned).
 *
 * The framecache slot stays alive between ffmpeg reconnects (listener mode):
 * net_ingest keeps the slot open and restarts ffmpeg on disconnect.
 */

#include "slot.h"

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <semaphore.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* Re-use fc_writer helpers inline (no external dep) */

#define FC_URL_DEFAULT  "http://localhost:7435"
#define FC_PORT_DEFAULT 7435

/* Set by the signal handler; polled by every loop in main(). */
static volatile sig_atomic_t g_stop = 0;
/* PID of the running ffmpeg child, or 0 when there is none. */
static volatile sig_atomic_t g_child_pid = 0;

/*
 * SIGTERM/SIGINT handler: request shutdown and forward SIGTERM to ffmpeg.
 * Forwarding makes the pipe reach EOF, so a read() that is blocked (or about
 * to block) in main() is guaranteed to return even when ffmpeg is idle.
 * Only async-signal-safe calls are used; errno is preserved.
 */
static void on_signal(int s)
{
    (void)s;
    int saved_errno = errno;
    g_stop = 1;
    pid_t child = (pid_t)g_child_pid;
    if (child > 0)
        kill(child, SIGTERM);
    errno = saved_errno;
}

/*
 * Install handlers. sigaction() without SA_RESTART is used on purpose so that
 * blocking syscalls return EINTR and the loops can re-check g_stop.
 */
static void install_signal_handlers(void)
{
    struct sigaction sa;
    memset(&sa, 0, sizeof sa);
    sa.sa_handler = on_signal;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGTERM, &sa, NULL);
    sigaction(SIGINT, &sa, NULL);

    signal(SIGPIPE, SIG_IGN);
    signal(SIGCHLD, SIG_DFL);
}

/* ── Tiny HTTP POST/DELETE (same approach as fc_writer.c) ─────────── */

/*
 * Send one HTTP/1.0 request and read the whole response.
 *
 *   method/path  request line components
 *   host/port    IPv4 endpoint (resolved with gethostbyname)
 *   body         JSON body, or NULL for a body-less request
 *   resp         optional buffer receiving the response body (NUL-terminated,
 *                truncated to resp_len - 1 bytes); may be NULL
 *
 * Returns the HTTP status code, or -1 on resolve/connect/send failure, when
 * the request does not fit the internal buffer, or when no status line could
 * be parsed.
 */
static int http_req(const char *method, const char *host, int port,
                    const char *path, const char *body,
                    char *resp, size_t resp_len)
{
    if (resp && resp_len)
        resp[0] = '\0';

    /* Build the request first so a truncated request is never sent. */
    char req[4096];
    int rlen;
    if (body)
        rlen = snprintf(req, sizeof req,
            "%s %s HTTP/1.0\r\nHost: %s:%d\r\n"
            "Content-Type: application/json\r\nContent-Length: %zu\r\n"
            "Connection: close\r\n\r\n%s",
            method, path, host, port, strlen(body), body);
    else
        rlen = snprintf(req, sizeof req,
            "%s %s HTTP/1.0\r\nHost: %s:%d\r\nConnection: close\r\n\r\n",
            method, path, host, port);
    if (rlen < 0 || (size_t)rlen >= sizeof req)
        return -1;

    struct hostent *he = gethostbyname(host);
    if (!he || he->h_addrtype != AF_INET || !he->h_addr_list[0])
        return -1;

    struct sockaddr_in sa;
    memset(&sa, 0, sizeof sa);
    sa.sin_family = AF_INET;
    sa.sin_port   = htons((uint16_t)port);
    memcpy(&sa.sin_addr, he->h_addr_list[0], sizeof sa.sin_addr);

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    /* Bound every blocking socket call to 5 seconds. */
    struct timeval tv = { .tv_sec = 5 };
    setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv);
    setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv);

    if (connect(fd, (struct sockaddr *)&sa, sizeof sa) < 0) {
        close(fd);
        return -1;
    }

    /* send() may write fewer bytes than asked; loop until all are out. */
    size_t sent = 0;
    while (sent < (size_t)rlen) {
        ssize_t w = send(fd, req + sent, (size_t)rlen - sent, 0);
        if (w < 0) {
            if (errno == EINTR)
                continue;
            close(fd);
            return -1;
        }
        sent += (size_t)w;
    }

    /* Read until EOF, error, or the buffer is full (one byte kept for NUL). */
    char buf[8192];
    size_t got = 0;
    for (;;) {
        ssize_t n = recv(fd, buf + got, sizeof buf - got - 1, 0);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            break;
        got += (size_t)n;
    }
    buf[got] = '\0';
    close(fd);

    int status = -1;
    sscanf(buf, "HTTP/%*s %d", &status);

    if (resp && resp_len) {
        const char *b = strstr(buf, "\r\n\r\n");
        if (b)
            snprintf(resp, resp_len, "%s", b + 4);
    }
    return status;
}

/*
 * Split "http://host[:port][/path]" into host and port.
 * The host ends at the first ':' or '/'; any path is ignored. The port
 * defaults to FC_PORT_DEFAULT when absent or not in 1..65535. The host is
 * truncated to hl - 1 bytes and always NUL-terminated (hl must be > 0).
 */
static void parse_url(const char *url, char *host, size_t hl, int *port)
{
    const char *p = url;
    if (!strncmp(p, "http://", 7))
        p += 7;

    *port = FC_PORT_DEFAULT;
    if (hl == 0)
        return;

    size_t host_len = strcspn(p, ":/");
    size_t n = host_len < hl ? host_len : hl - 1;
    memcpy(host, p, n);
    host[n] = '\0';

    if (p[host_len] == ':') {
        long v = strtol(p + host_len + 1, NULL, 10);
        if (v > 0 && v <= 65535)
            *port = (int)v;
    }
}

/*
 * Minimal JSON string lookup: finds "k": followed by a quoted value in j and
 * copies the value (no unescaping) into out, truncated to len - 1 bytes.
 * Returns 0 on success, -1 if the key is missing, the value is not a string,
 * or len is 0. out is left untouched on failure.
 */
static int json_str(const char *j, const char *k, char *out, size_t len)
{
    if (len == 0)
        return -1;

    char pat[128];
    snprintf(pat, sizeof pat, "\"%s\":", k);

    const char *p = strstr(j, pat);
    if (!p)
        return -1;
    p += strlen(pat);
    while (*p == ' ')
        p++;
    if (*p != '"')
        return -1;
    p++;

    size_t i = 0;
    while (*p && *p != '"' && i < len - 1)
        out[i++] = *p++;
    out[i] = '\0';
    return 0;
}

/* ── Frame size helpers ────────────────────────────────────────────── */

/* Bytes in one packed UYVY422 frame (2 bytes per pixel). */
static inline size_t frame_bytes(uint32_t w, uint32_t h)
{
    return (size_t)w * h * 2;
}

/* ── Register / deregister slot with framecache ───────────────────── */

/* Best-effort DELETE /slots/<slot_id>; the HTTP result is ignored. */
static void deregister_slot(const char *fc_url, const char *slot_id)
{
    char host[128];
    int port;
    parse_url(fc_url, host, sizeof host, &port);

    char path[192];
    snprintf(path, sizeof path, "/slots/%s", slot_id);
    http_req("DELETE", host, port, path, NULL, NULL, 0);
}

/*
 * POST /slots and extract "shm_path" and "sem_name" from the response.
 *
 * Returns 0 on success. Returns -1 if the request body does not fit, the
 * server does not answer 201, or the response lacks either field; in the
 * last case the just-created slot is deregistered again so it is not leaked.
 */
static int register_slot(const char *fc_url, const char *slot_id,
                         uint32_t w, uint32_t h,
                         uint32_t fps_num, uint32_t fps_den,
                         const char *source_type,
                         char *shm_path, size_t sp_len,
                         char *sem_name, size_t sn_len)
{
    char host[128];
    int port;
    parse_url(fc_url, host, sizeof host, &port);

    char body[512];
    int blen = snprintf(body, sizeof body,
        "{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u,"
        "\"fps_num\":%u,\"fps_den\":%u,\"source_type\":\"%s\"}",
        slot_id, w, h, fps_num, fps_den, source_type);
    if (blen < 0 || (size_t)blen >= sizeof body) {
        fprintf(stderr, "[net_ingest] POST /slots body too long\n");
        return -1;
    }

    char resp[1024] = {0};
    int st = http_req("POST", host, port, "/slots", body, resp, sizeof resp);
    if (st != 201) {
        fprintf(stderr, "[net_ingest] POST /slots failed HTTP %d: %s\n", st, resp);
        return -1;
    }

    if (json_str(resp, "shm_path", shm_path, sp_len) < 0 ||
        json_str(resp, "sem_name", sem_name, sn_len) < 0 ||
        shm_path[0] == '\0' || sem_name[0] == '\0') {
        fprintf(stderr, "[net_ingest] POST /slots response missing "
                        "shm_path/sem_name: %s\n", resp);
        deregister_slot(fc_url, slot_id);
        return -1;
    }
    return 0;
}

/* ── Open shm + semaphore for writing ─────────────────────────────── */

typedef struct {
    void   *base;   /* mmap'd slot, NULL when closed   */
    size_t  size;   /* length of the mapping           */
    int     fd;     /* shm file descriptor, -1 closed  */
    sem_t  *sem;    /* frame-ready semaphore, NULL closed */
} ShmWriter;

/*
 * Open and map an existing slot and its named semaphore.
 * The header is read first to validate FC_MAGIC and to size the mapping.
 * Returns 0 on success, -1 on failure. On failure *sw is left in the closed
 * state (base NULL, fd -1, sem NULL), so shm_writer_close() stays safe.
 */
static int shm_writer_open(const char *shm_path, const char *sem_name, ShmWriter *sw)
{
    sw->base = NULL;
    sw->size = 0;
    sw->fd   = -1;
    sw->sem  = NULL;

    int fd = open(shm_path, O_RDWR);
    if (fd < 0)
        return -1;

    fc_header_t hdr;
    if (pread(fd, &hdr, sizeof hdr, 0) != (ssize_t)sizeof hdr ||
        hdr.magic != FC_MAGIC) {
        close(fd);
        return -1;
    }

    size_t size = fc_slot_shm_size(hdr.frame_size);
    void *base = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        close(fd);
        return -1;
    }

    sem_t *sem = sem_open(sem_name, 0);
    if (sem == SEM_FAILED) {
        munmap(base, size);
        close(fd);
        return -1;
    }

    sw->base = base;
    sw->size = size;
    sw->fd   = fd;
    sw->sem  = sem;
    return 0;
}

/*
 * Copy one frame into the ring entry at the current write cursor, then
 * publish it: the cursor is advanced with release ordering after the payload
 * is written, and the semaphore is posted. The payload is clamped to the
 * slot's frame_size.
 */
static void shm_write_frame(ShmWriter *sw, const uint8_t *data,
                            uint32_t size, uint64_t pts_us)
{
    fc_header_t *hdr = (fc_header_t *)sw->base;
    uint64_t cur = atomic_load_explicit(&hdr->write_cursor, memory_order_relaxed);
    fc_frame_t *frame = fc_frame_at(sw->base, hdr->frame_size, cur);

    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);

    frame->pts_us  = pts_us;
    frame->wall_us = (uint64_t)ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000ULL;
    frame->size    = size < hdr->frame_size ? size : hdr->frame_size;
    memcpy(frame->data, data, frame->size);

    atomic_store_explicit(&hdr->write_cursor, cur + 1, memory_order_release);
    sem_post(sw->sem);
}

/* Release semaphore handle, mapping and fd; safe to call more than once. */
static void shm_writer_close(ShmWriter *sw)
{
    if (sw->sem)     { sem_close(sw->sem);          sw->sem  = NULL; }
    if (sw->base)    { munmap(sw->base, sw->size);  sw->base = NULL; }
    if (sw->fd >= 0) { close(sw->fd);               sw->fd   = -1;   }
}

/* ── Build ffmpeg args for network decode → rawvideo stdout ──────────
 * All dynamic strings are written into CALLER-OWNED buffers (passed in) so
 * nothing is heap-allocated here. The video filter forces the EXACT target
 * W:H (scale=W:H, not iw:ih) so a mid-stream source resolution change cannot
 * desync the fixed-size frame reassembly in main().
 *
 * Caller must provide:
 *   url_buf — at least 320 bytes (built listener URL)
 *   vf_buf  — at least 64 bytes  (scale/format filter)
 *
 * argv entries point at string literals, at url/url_buf/vf_buf, so those
 * must outlive the use of argv.
 *
 * Returns the number of arguments written (argv is NULL-terminated), or -1
 * if argv would exceed max_args, a buffer is too small, or no input URL is
 * available (url is NULL and no srt/rtmp listener URL applies).
 */
static int build_ffmpeg_args(
    char **argv, int max_args,
    const char *url, const char *source_type,
    int listen, int listen_port, const char *stream_key,
    uint32_t w, uint32_t h,
    char *url_buf, size_t url_buf_len,
    char *vf_buf,  size_t vf_buf_len)
{
    int i = 0;
    int n;

/* Append one argument, always leaving room for the terminating NULL. */
#define FF_PUSH(s) \
    do { if (i >= max_args - 1) return -1; argv[i++] = (char *)(s); } while (0)

    FF_PUSH("ffmpeg");
    FF_PUSH("-hide_banner");
    FF_PUSH("-loglevel");        FF_PUSH("warning");

    /* Input */
    FF_PUSH("-probesize");       FF_PUSH("32M");
    FF_PUSH("-analyzeduration"); FF_PUSH("10M");
    FF_PUSH("-fflags");          FF_PUSH("+genpts");

    if (listen && !strcmp(source_type, "srt")) {
        n = snprintf(url_buf, url_buf_len, "srt://0.0.0.0:%d?mode=listener",
                     listen_port ? listen_port : 9000);
        if (n < 0 || (size_t)n >= url_buf_len)
            return -1;
        FF_PUSH("-i");
        FF_PUSH(url_buf);
    } else if (listen && !strcmp(source_type, "rtmp")) {
        n = snprintf(url_buf, url_buf_len, "rtmp://0.0.0.0:%d/live/%s",
                     listen_port ? listen_port : 1935,
                     stream_key ? stream_key : "stream");
        if (n < 0 || (size_t)n >= url_buf_len)
            return -1;
        FF_PUSH("-listen");
        FF_PUSH("1");
        FF_PUSH("-i");
        FF_PUSH(url_buf);
    } else {
        /* A NULL here would silently terminate argv right after "-i". */
        if (!url)
            return -1;
        FF_PUSH("-i");
        FF_PUSH(url);
    }

    /* Force EXACT output dimensions so every frame is exactly w*h*2 bytes,
     * even if the source resolution changes mid-stream. */
    n = snprintf(vf_buf, vf_buf_len, "scale=%u:%u,format=uyvy422", w, h);
    if (n < 0 || (size_t)n >= vf_buf_len)
        return -1;

    /* Video output: raw UYVY422 to stdout */
    FF_PUSH("-map");     FF_PUSH("0:v:0");
    FF_PUSH("-vf");      FF_PUSH(vf_buf);
    FF_PUSH("-f");       FF_PUSH("rawvideo");
    FF_PUSH("-pix_fmt"); FF_PUSH("uyvy422");
    FF_PUSH("pipe:1");

#undef FF_PUSH

    argv[i] = NULL;
    return i;
}

/* ── Process helpers ──────────────────────────────────────────────── */

/*
 * Parse a non-negative decimal option value in [0, max] into *out.
 * Prints a diagnostic and returns -1 on empty, non-numeric, trailing-garbage
 * or out-of-range input.
 */
static int parse_u32(const char *opt, const char *text, uint32_t max, uint32_t *out)
{
    char *end = NULL;
    errno = 0;
    long long v = strtoll(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE ||
        v < 0 || (unsigned long long)v > max) {
        fprintf(stderr, "[net_ingest] invalid value for %s: %s\n", opt, text);
        return -1;
    }
    *out = (uint32_t)v;
    return 0;
}

/*
 * Fork ffmpeg with its stdout connected to a pipe.
 * On success returns the child PID, stores the pipe read end in *rfd and
 * records the PID in g_child_pid. Returns -1 if pipe() or fork() fails.
 *
 * SIGTERM/SIGINT are blocked across fork(): the child restores default
 * dispositions before unblocking, so a signal sent before exec terminates it
 * instead of being swallowed by the inherited handler, and the parent
 * publishes g_child_pid before its handler can run.
 */
static pid_t spawn_ffmpeg(char *const ff_argv[], int *rfd)
{
    int pfd[2];
    if (pipe(pfd) < 0)
        return -1;

    sigset_t block, prev;
    sigemptyset(&block);
    sigaddset(&block, SIGTERM);
    sigaddset(&block, SIGINT);
    sigprocmask(SIG_BLOCK, &block, &prev);

    pid_t pid = fork();
    if (pid == 0) {
        /* Child: default signal handling, stdout → pipe write end. */
        signal(SIGTERM, SIG_DFL);
        signal(SIGINT, SIG_DFL);
        sigprocmask(SIG_SETMASK, &prev, NULL);
        if (dup2(pfd[1], STDOUT_FILENO) < 0)
            _exit(127);
        close(pfd[0]);
        close(pfd[1]);
        execvp("ffmpeg", ff_argv);
        _exit(127);
    }

    if (pid > 0)
        g_child_pid = (sig_atomic_t)pid;
    sigprocmask(SIG_SETMASK, &prev, NULL);

    if (pid < 0) {
        close(pfd[0]);
        close(pfd[1]);
        return -1;
    }

    close(pfd[1]);
    *rfd = pfd[0];
    return pid;
}

/* ── Main ──────────────────────────────────────────────────────────── */

int main(int argc, char *argv[])
{
    const char *url         = NULL;
    const char *slot_id     = NULL;
    const char *env_fc_url  = getenv("FC_URL");
    const char *fc_url      = env_fc_url ? env_fc_url : FC_URL_DEFAULT;
    const char *source_type = "srt";
    uint32_t    width = 1920, height = 1080;
    uint32_t    fps_num = 30000, fps_den = 1001;
    int         listen = 0, listen_port = 0;
    const char *stream_key  = "stream";
    int         bad_args = 0;

    for (int i = 1; i < argc; i++) {
        const char *opt = argv[i];
        const char *val = (i + 1 < argc) ? argv[i + 1] : NULL;
        int takes_val = 1;

        if (!strcmp(opt, "--listen")) {
            listen = 1;
            takes_val = 0;
        } else if (!val) {
            /* Unknown flag or option missing its value: ignored, as before. */
            fprintf(stderr, "[net_ingest] ignoring argument: %s\n", opt);
            takes_val = 0;
        } else if (!strcmp(opt, "--url")) {
            url = val;
        } else if (!strcmp(opt, "--slot-id")) {
            slot_id = val;
        } else if (!strcmp(opt, "--fc-url")) {
            fc_url = val;
        } else if (!strcmp(opt, "--source-type")) {
            source_type = val;
        } else if (!strcmp(opt, "--width")) {
            bad_args |= parse_u32(opt, val, 65535, &width) < 0;
        } else if (!strcmp(opt, "--height")) {
            bad_args |= parse_u32(opt, val, 65535, &height) < 0;
        } else if (!strcmp(opt, "--fps-num")) {
            bad_args |= parse_u32(opt, val, UINT32_MAX, &fps_num) < 0;
        } else if (!strcmp(opt, "--fps-den")) {
            bad_args |= parse_u32(opt, val, UINT32_MAX, &fps_den) < 0;
        } else if (!strcmp(opt, "--listen-port")) {
            uint32_t port = 0;
            if (parse_u32(opt, val, 65535, &port) < 0)
                bad_args = 1;
            else
                listen_port = (int)port;
        } else if (!strcmp(opt, "--stream-key")) {
            stream_key = val;
        } else {
            fprintf(stderr, "[net_ingest] ignoring argument: %s\n", opt);
            takes_val = 0;
        }

        if (takes_val)
            i++;
    }

    if (bad_args)
        return 1;
    if (!slot_id) {
        fprintf(stderr, "[net_ingest] --slot-id required\n");
        return 1;
    }
    if (!url && !listen) {
        fprintf(stderr, "[net_ingest] --url or --listen required\n");
        return 1;
    }

    /* The frame size is later passed around as uint32_t; reject anything
     * that is empty or does not fit. */
    uint64_t fsz64 = (uint64_t)width * height * 2;
    if (width == 0 || height == 0 || fsz64 > UINT32_MAX) {
        fprintf(stderr, "[net_ingest] invalid frame size %ux%u\n", width, height);
        return 1;
    }

    /* ffmpeg argv depends only on the command line, so build it once, before
     * any slot is registered. Buffers are owned here and outlive every spawn. */
    char  ff_url_buf[320];
    char  ff_vf_buf[64];
    char *ff_argv[64];
    if (build_ffmpeg_args(ff_argv, 64, url, source_type, listen, listen_port,
                          stream_key, width, height,
                          ff_url_buf, sizeof ff_url_buf,
                          ff_vf_buf,  sizeof ff_vf_buf) < 0) {
        fprintf(stderr, "[net_ingest] cannot build ffmpeg command line "
                        "(missing --url or argument too long)\n");
        return 1;
    }

    install_signal_handlers();

    /* ── Register slot ──────────────────────────────────────────────── */
    char shm_path[128] = {0}, sem_name[128] = {0};
    if (register_slot(fc_url, slot_id, width, height, fps_num, fps_den,
                      source_type, shm_path, sizeof shm_path,
                      sem_name, sizeof sem_name) < 0) {
        return 1;
    }

    ShmWriter sw = { .fd = -1 };
    if (shm_writer_open(shm_path, sem_name, &sw) < 0) {
        fprintf(stderr, "[net_ingest] failed to open shm %s\n", shm_path);
        deregister_slot(fc_url, slot_id);
        return 1;
    }

    size_t   fsz = frame_bytes(width, height);
    uint8_t *frame_buf = malloc(fsz);
    if (!frame_buf) {
        shm_writer_close(&sw);
        deregister_slot(fc_url, slot_id);
        return 1;
    }

    uint64_t frame_seq = 0;
    int reported = 0;
    int rc = 0;

    fprintf(stderr, "[net_ingest] slot=%s %ux%u %.2ffps source=%s%s\n",
            slot_id, width, height,
            fps_den ? (double)fps_num / fps_den : 0.0,
            source_type, listen ? " (listener)" : "");

    /* ── Outer reconnect loop (listener mode stays alive between sessions) */
    while (!g_stop) {
        int rfd = -1;
        pid_t pid = spawn_ffmpeg(ff_argv, &rfd);
        if (pid < 0) {
            fprintf(stderr, "[net_ingest] failed to spawn ffmpeg\n");
            rc = 1;
            break;
        }

        /* Reassemble fixed-size frames from the byte stream. */
        size_t buf_off = 0;
        while (!g_stop) {
            ssize_t n = read(rfd, frame_buf + buf_off, fsz - buf_off);
            if (n < 0 && errno == EINTR)
                continue;           /* interrupted: re-check g_stop */
            if (n <= 0)
                break;              /* ffmpeg exited or pipe closed */
            buf_off += (size_t)n;
            if (buf_off < fsz)
                continue;           /* incomplete frame — keep reading */

            /* Full frame assembled; PTS is synthesized from the frame count. */
            uint64_t pts_us = fps_num > 0
                ? frame_seq * 1000000ULL * fps_den / fps_num
                : 0;
            shm_write_frame(&sw, frame_buf, (uint32_t)fsz, pts_us);
            frame_seq++;
            buf_off = 0;

            if (!reported) {
                fprintf(stderr,
                    "{\"slot_id\":\"%s\",\"width\":%u,\"height\":%u,"
                    "\"fps_num\":%u,\"fps_den\":%u,"
                    "\"source_type\":\"%s\",\"pix_fmt\":\"uyvy422\"}\n",
                    slot_id, width, height, fps_num, fps_den, source_type);
                fflush(stderr);
                reported = 1;
            }
        }
        close(rfd);

        /* Reap ffmpeg. The handler must stop signalling this PID before it is
         * reaped, hence the clear comes first. */
        g_child_pid = 0;
        kill(pid, SIGTERM);
        int wstatus = 0;
        pid_t reaped;
        do {
            reaped = waitpid(pid, &wstatus, 0);
        } while (reaped < 0 && errno == EINTR);

        /* 127 is what the child returns when dup2/execvp failed; retrying
         * would only respawn the same failure forever. */
        if (reaped == pid && WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 127) {
            fprintf(stderr, "[net_ingest] ffmpeg could not be started\n");
            rc = 1;
            break;
        }

        if (!listen || g_stop)
            break;

        /* Listener mode: wait 1s then reconnect */
        fprintf(stderr, "[net_ingest] listener: waiting for next connection\n");
        struct timespec ts = { .tv_sec = 1 };
        nanosleep(&ts, NULL);
    }

    free(frame_buf);
    shm_writer_close(&sw);
    deregister_slot(fc_url, slot_id);
    fprintf(stderr, "[net_ingest] done frames=%llu\n", (unsigned long long)frame_seq);
    return rc;
}