package webrtc

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/labstack/echo/v4"
	"github.com/pion/webrtc/v4"

	corewebrtc "github.com/datarhei/core/v16/core/webrtc"
)

// defaultMaxPeersPerStream is the per-stream peer cap applied when the
// caller passes a value <= 0 to NewHandlerWithCaps. The total cap
// (passed to NewHandler) is enforced separately and is checked first.
const defaultMaxPeersPerStream = 8

// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
// the /api/v3 group (or a sibling group) via Handler.Register.
//
// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
// On every Subscribe we spin a tiny goroutine watching the new peer's
// Done() channel; when ICE fails or Close() runs the index entry is
// removed and the counters tick back down — no leaks if the browser
// rage-quits.
type Handler struct {
	sub *Subsystem // owning subsystem; used for stream lookup and peer creation

	mu            sync.Mutex                           // guards peersByStream and peerStream
	peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
	peerStream    map[string]string                    // resource -> streamID (reverse index)

	count         int64 // number of tracked peers; accessed via sync/atomic only
	maxCapTotal   int64 // cap on count across all streams
	maxCapPerStrm int64 // cap on len(peersByStream[streamID])

	// met is the metrics sink; every use is nil-checked, so nil disables
	// recording. It is not assigned by NewHandlerWithCaps — presumably
	// wired elsewhere in the package (TODO confirm).
	met *webrtcMetrics
}

// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
// The maxPeers argument caps concurrent subscribers across all streams;
// pass 0 (or any value <= 0) to use corewebrtc.DefaultConfig().MaxPeersTotal.
// The per-stream cap is defaultMaxPeersPerStream; use NewHandlerWithCaps
// to override it.
func NewHandler(s *Subsystem, maxPeers int) *Handler {
	return NewHandlerWithCaps(s, maxPeers, 0)
}

// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap.
// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream.
func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler { total := int64(maxPeers) if total <= 0 { total = int64(corewebrtc.DefaultConfig().MaxPeersTotal) } perStream := int64(maxPeersPerStream) if perStream <= 0 { perStream = defaultMaxPeersPerStream } h := &Handler{ sub: s, peersByStream: make(map[string]map[string]*corewebrtc.Peer), peerStream: make(map[string]string), maxCapTotal: total, maxCapPerStrm: perStream, } // Subsystem broadcasts process-stop via this hook so the handler // can yank stale peer entries before their Sources close out // from underneath them. if s != nil { s.SetTeardownHook(h.tearDownStreamPeers) } return h } // Register mounts the WHEP routes on the provided Echo group. // // CORS preflights are answered on every WHEP path; regular WHEP // responses also carry the Access-Control-* headers so browser-side // players living on a different origin can subscribe. func (h *Handler) Register(g *echo.Group) { g.OPTIONS("/whep/:id", h.preflight) g.OPTIONS("/whep/:id/:resource", h.preflight) g.POST("/whep/:id", h.Subscribe) g.DELETE("/whep/:id/:resource", h.Unsubscribe) g.PATCH("/whep/:id/:resource", h.Trickle) } // Subscribe handles POST /whep/:id. Request body is an SDP offer, // response is an SDP answer with a Location header pointing at the // DELETE/PATCH resource. // // @Summary Subscribe to a WebRTC stream via WHEP // @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE. 
// @Tags v16.16.0 // @ID webrtc-3-whep-subscribe // @Accept application/sdp // @Produce application/sdp // @Param id path string true "Process ID with config.webrtc.enabled=true" // @Success 201 {string} string "SDP answer" // @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP" // @Failure 404 {string} string "no stream registered for this process id" // @Failure 406 {string} string "offer SDP missing required H264 / Opus rtpmap" // @Failure 503 {string} string "peer cap reached (per-stream or total)" // @Failure 504 {string} string "ICE gathering timeout" // @Security ApiKeyAuth // @Router /api/v3/whep/{id} [post] func (h *Handler) Subscribe(c echo.Context) error { addCORS(c) t0 := time.Now() id := c.Param("id") if id == "" { h.recordRequest("subscribe", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing stream id") } // Total cap: cheap atomic check before doing real work. if atomic.LoadInt64(&h.count) >= h.maxCapTotal { if h.met != nil { h.met.capRejections.WithLabelValues("", "global").Inc() } h.recordRequest("subscribe", id, http.StatusServiceUnavailable, t0) return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) } stream, ok := h.sub.lookup(id) if !ok { h.recordRequest("subscribe", id, http.StatusNotFound, t0) return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) } // Per-stream cap: needs the lock since we're indexing per stream. 
h.mu.Lock() if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm { h.mu.Unlock() if h.met != nil { h.met.capRejections.WithLabelValues(id, "stream").Inc() } h.recordRequest("subscribe", id, http.StatusServiceUnavailable, t0) return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached") } h.mu.Unlock() body, err := io.ReadAll(c.Request().Body) if err != nil { h.recordRequest("subscribe", id, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "read body: "+err.Error()) } if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { h.recordRequest("subscribe", id, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) } if err := requireH264AndOpus(string(body)); err != nil { if h.met != nil { if cme, ok2 := err.(*codecMismatchError); ok2 { for _, kind := range cme.missing { h.met.codecMismatches.WithLabelValues(id, strings.ToLower(kind)).Inc() } } } h.recordRequest("subscribe", id, http.StatusNotAcceptable, t0) return c.String(http.StatusNotAcceptable, err.Error()) } offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) if err != nil { // Surface the design's error matrix. 
switch err { case corewebrtc.ErrICETimeout: h.recordRequest("subscribe", id, http.StatusGatewayTimeout, t0) return c.String(http.StatusGatewayTimeout, err.Error()) case corewebrtc.ErrCodecMismatch: h.recordRequest("subscribe", id, http.StatusNotAcceptable, t0) return c.String(http.StatusNotAcceptable, err.Error()) default: h.recordRequest("subscribe", id, http.StatusInternalServerError, t0) return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) } } rid := peer.ResourceID() h.mu.Lock() if h.peersByStream[id] == nil { h.peersByStream[id] = make(map[string]*corewebrtc.Peer) } h.peersByStream[id][rid] = peer h.peerStream[rid] = id h.mu.Unlock() atomic.AddInt64(&h.count, 1) // Auto-cleanup: when Pion's OnConnectionStateChange triggers // peer.Close() (ICE failed/disconnected), the Done channel // closes and we yank the index entry. Without this the map // leaks for the lifetime of the handler. go h.awaitPeerClose(rid, peer) // Track ICE establishment duration asynchronously. go h.trackICE(id, peer, time.Now()) h.recordRequest("subscribe", id, http.StatusCreated, t0) c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) c.Response().Header().Set("ETag", `"`+rid+`"`) return c.String(http.StatusCreated, peer.Answer().SDP) } // Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we // return 204 even when the resource is unknown — DELETE is idempotent // and a re-issued tear-down should never error out. // // @Summary Tear down a WHEP subscription // @Description Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec. 
// @Tags v16.16.0 // @ID webrtc-3-whep-unsubscribe // @Param id path string true "Process ID" // @Param resource path string true "Resource ID from the Subscribe Location header" // @Success 204 "no content" // @Failure 400 {string} string "missing resource id" // @Security ApiKeyAuth // @Router /api/v3/whep/{id}/{resource} [delete] func (h *Handler) Unsubscribe(c echo.Context) error { addCORS(c) t0 := time.Now() resource := c.Param("resource") if resource == "" { h.recordRequest("unsubscribe", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing resource id") } h.mu.Lock() streamID := h.peerStream[resource] var peer *corewebrtc.Peer if streamID != "" { peer = h.peersByStream[streamID][resource] delete(h.peersByStream[streamID], resource) if len(h.peersByStream[streamID]) == 0 { delete(h.peersByStream, streamID) } delete(h.peerStream, resource) } h.mu.Unlock() if peer != nil { _ = peer.Close() } if streamID != "" { atomic.AddInt64(&h.count, -1) } h.recordRequest("unsubscribe", streamID, http.StatusNoContent, t0) return c.NoContent(http.StatusNoContent) } // Trickle handles PATCH /whep/:id/:resource — adds ICE candidates // from a trickle-ice-sdpfrag body. Empty body is a no-op (clients // signal end-of-candidates via an a=end-of-candidates line, which // AddICECandidate accepts). // // @Summary Trickle ICE candidates for a WHEP subscription // @Description Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag. 
// @Tags v16.16.0 // @ID webrtc-3-whep-trickle // @Accept application/trickle-ice-sdpfrag // @Param id path string true "Process ID" // @Param resource path string true "Resource ID from the Subscribe Location header" // @Success 204 "no content" // @Failure 400 {string} string "missing resource id or unreadable body" // @Failure 404 {string} string "peer not found" // @Security ApiKeyAuth // @Router /api/v3/whep/{id}/{resource} [patch] func (h *Handler) Trickle(c echo.Context) error { addCORS(c) t0 := time.Now() resource := c.Param("resource") if resource == "" { h.recordRequest("trickle", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing resource id") } h.mu.Lock() streamID := h.peerStream[resource] var peer *corewebrtc.Peer if streamID != "" { peer = h.peersByStream[streamID][resource] } h.mu.Unlock() if peer == nil { h.recordRequest("trickle", streamID, http.StatusNotFound, t0) return c.NoContent(http.StatusNotFound) } body, err := io.ReadAll(c.Request().Body) if err != nil { h.recordRequest("trickle", streamID, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "read body: "+err.Error()) } for _, line := range strings.Split(string(body), "\n") { line = strings.TrimSpace(line) if !strings.HasPrefix(line, "a=candidate:") { continue } cand := strings.TrimPrefix(line, "a=") _ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand}) } h.recordRequest("trickle", streamID, http.StatusNoContent, t0) return c.NoContent(http.StatusNoContent) } // recordRequest records whepRequests counter and whepRequestDuration histogram // for any WHEP route outcome. Silently no-ops if metrics are not initialised. 
func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time) { if h.met == nil { return } codeStr := fmt.Sprintf("%d", code) h.met.whepRequests.WithLabelValues(route, codeStr, streamID).Inc() h.met.whepRequestDuration.WithLabelValues(route, streamID).Observe(time.Since(t0).Seconds()) } // preflight answers a CORS OPTIONS request; the headers are also // echoed on every other response. func (h *Handler) preflight(c echo.Context) error { addCORS(c) return c.NoContent(http.StatusNoContent) } // Close tears down every active peer (e.g., during Core shutdown). func (h *Handler) Close() { h.mu.Lock() peers := make([]*corewebrtc.Peer, 0) for _, m := range h.peersByStream { for _, p := range m { peers = append(peers, p) } } h.peersByStream = make(map[string]map[string]*corewebrtc.Peer) h.peerStream = make(map[string]string) h.mu.Unlock() for _, p := range peers { if p != nil { _ = p.Close() } } atomic.StoreInt64(&h.count, 0) } // awaitPeerClose blocks on peer.Done() and yanks the index entry when // the peer self-closes (ICE failed/disconnected). Idempotent with // the Unsubscribe path: if Unsubscribe ran first the index is already // empty and we just decrement the counter once on first arrival. func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) { <-peer.Done() h.mu.Lock() streamID := h.peerStream[resource] _, present := h.peerStream[resource] if present { delete(h.peerStream, resource) if streamID != "" { delete(h.peersByStream[streamID], resource) if len(h.peersByStream[streamID]) == 0 { delete(h.peersByStream, streamID) } } } h.mu.Unlock() if present { atomic.AddInt64(&h.count, -1) } } // tearDownStreamPeers is the callback the Subsystem runs in its // onProcessStop hook. It closes every peer subscribed to that // stream and records FFmpeg leg failures if any peers were active, // which indicates the process died unexpectedly. 
func (h *Handler) tearDownStreamPeers(streamID string) { h.mu.Lock() bucket := h.peersByStream[streamID] hadPeers := len(bucket) > 0 peers := make([]*corewebrtc.Peer, 0, len(bucket)) for _, p := range bucket { peers = append(peers, p) } h.mu.Unlock() for _, p := range peers { if p != nil { _ = p.Close() } } if hadPeers && h.met != nil { h.met.ffmpegLegFailures.WithLabelValues(streamID, "video").Inc() h.met.ffmpegLegFailures.WithLabelValues(streamID, "audio").Inc() } } // addCORS emits the response headers a browser-side WHEP player // expects. WHEP's Location and ETag headers must be exposed for // fetch() to read them across origins. func addCORS(c echo.Context) { hh := c.Response().Header() hh.Set("Access-Control-Allow-Origin", "*") hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS") hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match") hh.Set("Access-Control-Expose-Headers", "Location, ETag") } // requireH264AndOpus does a coarse SDP scan to confirm the offer // includes both an H.264 video rtpmap and an Opus audio rtpmap. The // design treats codec mismatch as a 406, never a silent black frame. // // This is intentionally a string scan rather than a full SDP parse: // every modern browser advertises H.264 and Opus by name, and a // dependency on a real SDP parser for one validation step is // disproportionate. M4 may swap this for pion/sdp.v3 when other // surfaces also need parsing. 
func requireH264AndOpus(sdp string) error {
	// Match case-insensitively by lowering the haystack once.
	haystack := strings.ToLower(sdp)

	// Each required codec is satisfied by either of its two markers.
	required := [...]struct {
		label   string
		markers [2]string
	}{
		{label: "H264", markers: [2]string{"h264/90000", " h264/"}},
		{label: "Opus", markers: [2]string{"opus/48000", " opus/"}},
	}

	var absent []string
	for _, codec := range required {
		if strings.Contains(haystack, codec.markers[0]) || strings.Contains(haystack, codec.markers[1]) {
			continue
		}
		absent = append(absent, codec.label)
	}

	if len(absent) == 0 {
		return nil
	}
	return &codecMismatchError{missing: absent}
}

// codecMismatchError reports which required codecs an offer lacks.
// missing holds the display names ("H264", "Opus") in that order.
type codecMismatchError struct {
	missing []string
}

// Error renders the missing codecs as a comma-separated list.
func (e *codecMismatchError) Error() string {
	const prefix = "webrtc: codec mismatch — offer is missing: "
	return prefix + strings.Join(e.missing, ", ")
}