package webrtc

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/labstack/echo/v4"
	"github.com/pion/webrtc/v4"

	corewebrtc "github.com/datarhei/core/v16/core/webrtc"
)

// WHIPHandler exposes the subsystem's WHIP Echo handlers. Wire them
// into the /api/v3 group alongside the WHEP Handler via
// WHIPHandler.Register.
//
// Lifecycle: ingest peers are tracked in a streamID→resourceID→IngestPeer
// index. On every Publish a goroutine watches the peer's Done() channel;
// when the publisher disconnects or Close() runs the entry is removed
// and the counters tick back down — no leaks if OBS rage-quits.
type WHIPHandler struct {
	// count is the number of concurrent publishers. It is read and
	// written with the sync/atomic functions and therefore MUST remain
	// the first field: only the first word of an allocated struct is
	// guaranteed to be 64-bit aligned on 32-bit platforms (ARM, 386),
	// where a misaligned 64-bit atomic operation panics.
	count int64

	sub *Subsystem

	// mu guards ingestByStream and ingestStream.
	mu             sync.Mutex
	ingestByStream map[string]map[string]*corewebrtc.IngestPeer // streamID -> resource -> peer
	ingestStream   map[string]string                            // resource -> streamID (reverse index)

	// maxCapTotal is the global cap on concurrent publishers; it is set
	// once by NewWHIPHandler and never modified afterwards.
	maxCapTotal int64
}

// NewWHIPHandler wraps the subsystem in an Echo-compatible WHIP handler.
// maxPublishers caps concurrent ingest sessions across all streams;
// pass 0 (or any non-positive value) to default to 64.
//
// The constructor registers a teardown hook on the Subsystem so that
// when a process stops, any active WHIP publisher is closed automatically
// (mirroring the pattern used by the WHEP NewHandler). A nil Subsystem is
// tolerated: the hook registration is simply skipped.
func NewWHIPHandler(s *Subsystem, maxPublishers int) *WHIPHandler {
	total := int64(maxPublishers)
	if total <= 0 {
		total = 64
	}

	h := &WHIPHandler{
		sub:            s,
		ingestByStream: make(map[string]map[string]*corewebrtc.IngestPeer),
		ingestStream:   make(map[string]string),
		maxCapTotal:    total,
	}

	// Wire the WHIP teardown hook so onWHIPProcessStop notifies us
	// before releasing the port allocation — same pattern as WHEP's
	// NewHandler → s.SetTeardownHook(h.tearDownStreamPeers).
	if s != nil {
		s.SetWHIPTeardownHook(h.tearDownStreamIngests)
	}

	return h
}

// Register mounts the WHIP routes on the provided Echo group.
// // POST /whip/:id — start a publish session (SDP offer → answer) // DELETE /whip/:id/:resource — tear down a publish session // PATCH /whip/:id/:resource — trickle ICE candidates // OPTIONS /whip/* — CORS preflight func (h *WHIPHandler) Register(g *echo.Group) { g.OPTIONS("/whip/:id", h.preflight) g.OPTIONS("/whip/:id/:resource", h.preflight) g.POST("/whip/:id", h.Publish) g.DELETE("/whip/:id/:resource", h.Unpublish) g.PATCH("/whip/:id/:resource", h.TrickleIngest) } // Publish handles POST /whip/:id. // // The request body is an SDP offer (Content-Type: application/sdp). // Response is the SDP answer; the Location header identifies the // DELETE/PATCH resource for teardown and trickle ICE. // // The target process must have WHIPIngest.Enabled=true in its config, // and an active ingest port pair must have been allocated by // onWHIPProcessStart. // // @Summary Publish a WebRTC stream via WHIP // @Description Start a WHIP ingest session. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; Location header points at DELETE/PATCH resource. 
// @Tags v16.16.0 // @ID webrtc-3-whip-publish // @Accept application/sdp // @Produce application/sdp // @Param id path string true "Process ID with whip_ingest.enabled=true" // @Success 201 {string} string "SDP answer" // @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP" // @Failure 404 {string} string "no ingest stream registered for this process id" // @Failure 409 {string} string "a publisher is already active on this stream (single-publisher enforcement)" // @Failure 503 {string} string "global publisher cap reached" // @Failure 504 {string} string "ICE gathering timeout" // @Security ApiKeyAuth // @Router /api/v3/whip/{id} [post] func (h *WHIPHandler) Publish(c echo.Context) error { addCORS(c) t0 := time.Now() id := c.Param("id") if id == "" { h.recordRequest("publish", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing stream id") } // Global cap: cheap atomic check before real work. if atomic.LoadInt64(&h.count) >= h.maxCapTotal { h.recordRequest("publish", id, http.StatusServiceUnavailable, t0) return c.String(http.StatusServiceUnavailable, "webrtc: whip: publisher cap reached") } ingest, ok := h.sub.lookupIngest(id) if !ok { h.recordRequest("publish", id, http.StatusNotFound, t0) return c.String(http.StatusNotFound, "webrtc: whip: no ingest registered for process") } // Single-publisher enforcement: WHIP is point-to-point — // only one active publisher per stream at a time. 
h.mu.Lock() if len(h.ingestByStream[id]) > 0 { h.mu.Unlock() h.recordRequest("publish", id, http.StatusConflict, t0) return c.String(http.StatusConflict, "webrtc: whip: stream already has an active publisher") } h.mu.Unlock() body, err := io.ReadAll(c.Request().Body) if err != nil { h.recordRequest("publish", id, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "read body: "+err.Error()) } if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { h.recordRequest("publish", id, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) } offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} peer, err := h.sub.factory.CreateIngestPeer( c.Request().Context(), offer, ingest.videoPort, ingest.audioPort, ) if err != nil { switch err { case corewebrtc.ErrICETimeout: h.recordRequest("publish", id, http.StatusGatewayTimeout, t0) return c.String(http.StatusGatewayTimeout, err.Error()) default: h.recordRequest("publish", id, http.StatusInternalServerError, t0) return c.String(http.StatusInternalServerError, "create ingest peer: "+err.Error()) } } rid := peer.ResourceID() h.mu.Lock() if h.ingestByStream[id] == nil { h.ingestByStream[id] = make(map[string]*corewebrtc.IngestPeer) } h.ingestByStream[id][rid] = peer h.ingestStream[rid] = id h.mu.Unlock() atomic.AddInt64(&h.count, 1) // Auto-cleanup on disconnect. go h.awaitIngestClose(rid, peer) h.recordRequest("publish", id, http.StatusCreated, t0) // RFC 9261 §5.2: emit one Link header per configured ICE server so // that the publisher (OBS, browser, GStreamer, etc.) can discover // STUN/TURN without a separate signalling round-trip — symmetric // with the WHEP Subscribe Link header added in issue #19. 
for _, uri := range h.sub.ICEServerURIs() { c.Response().Header().Add("Link", "<"+uri+">; rel=\"ice-server\"") } c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Location", "/whip/"+id+"/"+rid) c.Response().Header().Set("ETag", `"`+rid+`"`) return c.String(http.StatusCreated, peer.Answer().SDP) } // Unpublish handles DELETE /whip/:id/:resource. Returns 204 even when // the resource is unknown (DELETE is idempotent, per the WHIP spec). // // @Summary Tear down a WHIP publish session // @Tags v16.16.0 // @ID webrtc-3-whip-unpublish // @Param id path string true "Process ID" // @Param resource path string true "Resource ID from the Publish Location header" // @Success 204 "no content" // @Failure 400 {string} string "missing resource id" // @Security ApiKeyAuth // @Router /api/v3/whip/{id}/{resource} [delete] func (h *WHIPHandler) Unpublish(c echo.Context) error { addCORS(c) t0 := time.Now() resource := c.Param("resource") if resource == "" { h.recordRequest("unpublish", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing resource id") } h.mu.Lock() streamID := h.ingestStream[resource] var peer *corewebrtc.IngestPeer if streamID != "" { peer = h.ingestByStream[streamID][resource] delete(h.ingestByStream[streamID], resource) if len(h.ingestByStream[streamID]) == 0 { delete(h.ingestByStream, streamID) } delete(h.ingestStream, resource) } h.mu.Unlock() if peer != nil { _ = peer.Close() } if streamID != "" { atomic.AddInt64(&h.count, -1) } h.recordRequest("unpublish", streamID, http.StatusNoContent, t0) return c.NoContent(http.StatusNoContent) } // TrickleIngest handles PATCH /whip/:id/:resource — adds ICE candidates // from a trickle-ice-sdpfrag body. // // The body format follows draft-ietf-wish-whip §5: one or more // "a=candidate:…" lines in application/trickle-ice-sdpfrag. Each // matching line is forwarded directly to the underlying PeerConnection. 
// An empty body or a body with no candidate lines is a no-op (clients // signal end-of-candidates via an a=end-of-candidates line, which // AddICECandidate correctly ignores at the Pion level). // // @Summary Trickle ICE candidates for a WHIP publish session // @Tags v16.16.0 // @ID webrtc-3-whip-trickle // @Accept application/trickle-ice-sdpfrag // @Param id path string true "Process ID" // @Param resource path string true "Resource ID from the Publish Location header" // @Success 204 "no content" // @Failure 400 {string} string "missing resource id or unreadable body" // @Failure 404 {string} string "peer not found" // @Security ApiKeyAuth // @Router /api/v3/whip/{id}/{resource} [patch] func (h *WHIPHandler) TrickleIngest(c echo.Context) error { addCORS(c) t0 := time.Now() resource := c.Param("resource") if resource == "" { h.recordRequest("trickle", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing resource id") } h.mu.Lock() streamID := h.ingestStream[resource] var peer *corewebrtc.IngestPeer if streamID != "" { peer = h.ingestByStream[streamID][resource] } h.mu.Unlock() if peer == nil { h.recordRequest("trickle", streamID, http.StatusNotFound, t0) return c.NoContent(http.StatusNotFound) } body, err := io.ReadAll(c.Request().Body) if err != nil { h.recordRequest("trickle", streamID, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "read body: "+err.Error()) } for _, line := range strings.Split(string(body), "\n") { line = strings.TrimSpace(line) if !strings.HasPrefix(line, "a=candidate:") { continue } cand := strings.TrimPrefix(line, "a=") _ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand}) } h.recordRequest("trickle", streamID, http.StatusNoContent, t0) return c.NoContent(http.StatusNoContent) } // Close tears down every active ingest peer (e.g., during Core shutdown). 
func (h *WHIPHandler) Close() { h.mu.Lock() peers := make([]*corewebrtc.IngestPeer, 0) for _, m := range h.ingestByStream { for _, p := range m { peers = append(peers, p) } } h.ingestByStream = make(map[string]map[string]*corewebrtc.IngestPeer) h.ingestStream = make(map[string]string) h.mu.Unlock() for _, p := range peers { if p != nil { _ = p.Close() } } atomic.StoreInt64(&h.count, 0) } // awaitIngestClose blocks on peer.Done() and yanks the index entry // when the publisher disconnects. Idempotent with Unpublish. func (h *WHIPHandler) awaitIngestClose(resource string, peer *corewebrtc.IngestPeer) { <-peer.Done() h.mu.Lock() streamID := h.ingestStream[resource] _, present := h.ingestStream[resource] if present { delete(h.ingestStream, resource) if streamID != "" { delete(h.ingestByStream[streamID], resource) if len(h.ingestByStream[streamID]) == 0 { delete(h.ingestByStream, streamID) } } } h.mu.Unlock() if present { atomic.AddInt64(&h.count, -1) } } // tearDownStreamIngests is called by the Subsystem's SetWHIPTeardownHook // to close any active publisher when the FFmpeg process stops. // Not exported — registered internally via NewWHIPHandler. func (h *WHIPHandler) tearDownStreamIngests(streamID string) { h.mu.Lock() bucket := h.ingestByStream[streamID] peers := make([]*corewebrtc.IngestPeer, 0, len(bucket)) for _, p := range bucket { peers = append(peers, p) } h.mu.Unlock() for _, p := range peers { if p != nil { _ = p.Close() } } } // recordRequest logs request metrics. Currently a thin wrapper; WHIP // metrics counters will be wired in alongside WHEP metrics in a follow-up. func (h *WHIPHandler) recordRequest(route, streamID string, code int, t0 time.Time) { // Placeholder — wire Prometheus metrics in a follow-up commit. _ = fmt.Sprintf("%s %s %d %.3fs", route, streamID, code, time.Since(t0).Seconds()) } // preflight answers CORS OPTIONS requests. func (h *WHIPHandler) preflight(c echo.Context) error { addCORS(c) return c.NoContent(http.StatusNoContent) }