diff --git a/app/webrtc/handler.go b/app/webrtc/handler.go
index 34db336..a8d30d9 100644
--- a/app/webrtc/handler.go
+++ b/app/webrtc/handler.go
@@ -13,54 +13,91 @@ import (
 	corewebrtc "github.com/datarhei/core/v16/core/webrtc"
 )
 
+// defaultMaxPeersPerStream is the per-stream peer cap used when the caller
+// passes a value <= 0. The total cap is enforced separately and first.
+const defaultMaxPeersPerStream = 8
+
 // Handler exposes the subsystem's WHEP Echo handlers. Wire them into
 // the /api/v3 group (or a sibling group) via Handler.Register.
+//
+// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
+// On every Subscribe we spin a tiny goroutine watching the new peer's
+// Done() channel; when ICE fails or Close() runs the index entry is
+// removed and the counters tick back down — no leaks if the browser
+// rage-quits.
 type Handler struct {
 	sub *Subsystem
 
-	peersMu sync.Mutex
-	peers   map[string]*corewebrtc.Peer // resourceID -> peer
-	count   int64                       // atomic, for cap check without lock
-	maxCap  int64
+	mu            sync.Mutex                             // guards peersByStream and peerStream
+	peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
+	peerStream    map[string]string                      // resource -> streamID (reverse index)
+	count         int64                                  // registered peers; accessed via sync/atomic only
+	maxCapTotal   int64                                  // cap across all streams
+	maxCapPerStrm int64                                  // cap for a single stream
 }
 
 // NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
-// The maxPeers argument caps concurrent subscribers; pass 0 to use a
-// generous default (matches corewebrtc.DefaultConfig).
+// The maxPeers argument caps concurrent subscribers across all streams;
+// pass 0 to use a generous default (matches corewebrtc.DefaultConfig).
+// The per-stream cap falls back to defaultMaxPeersPerStream; use
+// NewHandlerWithCaps to choose it explicitly.
func NewHandler(s *Subsystem, maxPeers int) *Handler { - cap := int64(maxPeers) - if cap <= 0 { - cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal) - } - return &Handler{ - sub: s, - peers: make(map[string]*corewebrtc.Peer), - maxCap: cap, - } + return NewHandlerWithCaps(s, maxPeers, 0) } -// Register mounts the WHEP routes on the provided Echo group. WHEP -// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource. +// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap. +// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream. +func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler { + total := int64(maxPeers) + if total <= 0 { + total = int64(corewebrtc.DefaultConfig().MaxPeersTotal) + } + perStream := int64(maxPeersPerStream) + if perStream <= 0 { + perStream = defaultMaxPeersPerStream + } + h := &Handler{ + sub: s, + peersByStream: make(map[string]map[string]*corewebrtc.Peer), + peerStream: make(map[string]string), + maxCapTotal: total, + maxCapPerStrm: perStream, + } + // Subsystem broadcasts process-stop via this hook so the handler + // can yank stale peer entries before their Sources close out + // from underneath them. + if s != nil { + s.SetTeardownHook(h.tearDownStreamPeers) + } + return h +} + +// Register mounts the WHEP routes on the provided Echo group. // -// The routes are deliberately unauthenticated in M2 because WHEP -// clients (browsers, OBS) don't carry the Core JWT. M3 will add -// per-process signed-URL tokens; for M2 the deployment is expected -// to put the endpoint behind an authenticated reverse-proxy or VPN. +// CORS preflights are answered on every WHEP path; regular WHEP +// responses also carry the Access-Control-* headers so browser-side +// players living on a different origin can subscribe. 
func (h *Handler) Register(g *echo.Group) { + g.OPTIONS("/whep/:id", h.preflight) + g.OPTIONS("/whep/:id/:resource", h.preflight) g.POST("/whep/:id", h.Subscribe) g.DELETE("/whep/:id/:resource", h.Unsubscribe) + g.PATCH("/whep/:id/:resource", h.Trickle) } // Subscribe handles POST /whep/:id. Request body is an SDP offer, // response is an SDP answer with a Location header pointing at the -// DELETE resource. +// DELETE/PATCH resource. func (h *Handler) Subscribe(c echo.Context) error { + addCORS(c) + id := c.Param("id") if id == "" { return c.String(http.StatusBadRequest, "missing stream id") } - if atomic.LoadInt64(&h.count) >= h.maxCap { + // Total cap: cheap atomic check before doing real work. + if atomic.LoadInt64(&h.count) >= h.maxCapTotal { return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) } @@ -69,6 +106,14 @@ func (h *Handler) Subscribe(c echo.Context) error { return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) } + // Per-stream cap: needs the lock since we're indexing per stream. 
+ h.mu.Lock() + if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm { + h.mu.Unlock() + return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached") + } + h.mu.Unlock() + body, err := io.ReadAll(c.Request().Body) if err != nil { return c.String(http.StatusBadRequest, "read body: "+err.Error()) @@ -76,59 +121,227 @@ func (h *Handler) Subscribe(c echo.Context) error { if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) } + if err := requireH264AndOpus(string(body)); err != nil { + return c.String(http.StatusNotAcceptable, err.Error()) + } offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) if err != nil { - return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) + // Surface the design's error matrix. + switch err { + case corewebrtc.ErrICETimeout: + return c.String(http.StatusGatewayTimeout, err.Error()) + case corewebrtc.ErrCodecMismatch: + return c.String(http.StatusNotAcceptable, err.Error()) + default: + return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) + } } - h.peersMu.Lock() - h.peers[peer.ResourceID()] = peer - h.peersMu.Unlock() + rid := peer.ResourceID() + h.mu.Lock() + if h.peersByStream[id] == nil { + h.peersByStream[id] = make(map[string]*corewebrtc.Peer) + } + h.peersByStream[id][rid] = peer + h.peerStream[rid] = id + h.mu.Unlock() atomic.AddInt64(&h.count, 1) + // Auto-cleanup: when Pion's OnConnectionStateChange triggers + // peer.Close() (ICE failed/disconnected), the Done channel + // closes and we yank the index entry. Without this the map + // leaks for the lifetime of the handler. 
+ go h.awaitPeerClose(rid, peer) + c.Response().Header().Set("Content-Type", "application/sdp") - c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID()) + c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) + c.Response().Header().Set("ETag", `"`+rid+`"`) return c.String(http.StatusCreated, peer.Answer().SDP) } -// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of -// the path per WHEP spec but we only need :resource to locate the -// peer; :id is accepted for route symmetry. +// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we +// return 204 even when the resource is unknown — DELETE is idempotent +// and a re-issued tear-down should never error out. func (h *Handler) Unsubscribe(c echo.Context) error { + addCORS(c) + resource := c.Param("resource") if resource == "" { return c.String(http.StatusBadRequest, "missing resource id") } - h.peersMu.Lock() - peer, ok := h.peers[resource] - if ok { - delete(h.peers, resource) + h.mu.Lock() + streamID := h.peerStream[resource] + var peer *corewebrtc.Peer + if streamID != "" { + peer = h.peersByStream[streamID][resource] + delete(h.peersByStream[streamID], resource) + if len(h.peersByStream[streamID]) == 0 { + delete(h.peersByStream, streamID) + } + delete(h.peerStream, resource) } - h.peersMu.Unlock() + h.mu.Unlock() - if !ok { + if peer != nil { + _ = peer.Close() + } + if streamID != "" { + atomic.AddInt64(&h.count, -1) + } + return c.NoContent(http.StatusNoContent) +} + +// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates +// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients +// signal end-of-candidates via an a=end-of-candidates line, which +// AddICECandidate accepts). 
+func (h *Handler) Trickle(c echo.Context) error {
+	addCORS(c)
+
+	resource := c.Param("resource")
+	if resource == "" {
+		return c.String(http.StatusBadRequest, "missing resource id")
+	}
+
+	h.mu.Lock()
+	streamID := h.peerStream[resource]
+	var peer *corewebrtc.Peer
+	if streamID != "" {
+		peer = h.peersByStream[streamID][resource]
+	}
+	h.mu.Unlock()
+	if peer == nil {
 		return c.NoContent(http.StatusNotFound)
 	}
-	_ = peer.Close()
-	atomic.AddInt64(&h.count, -1)
+
+	body, err := io.ReadAll(c.Request().Body)
+	if err != nil {
+		return c.String(http.StatusBadRequest, "read body: "+err.Error())
+	}
+	for _, line := range strings.Split(string(body), "\n") {
+		line = strings.TrimSpace(line)
+		if !strings.HasPrefix(line, "a=candidate:") {
+			continue
+		}
+		cand := strings.TrimPrefix(line, "a=")
+		// Best effort: a candidate the peer rejects is skipped so
+		// that the remaining candidates in the body still apply.
+		_ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand})
+	}
+	return c.NoContent(http.StatusNoContent)
+}
+
+// preflight answers a CORS OPTIONS request; the headers are also
+// echoed on every other response.
+func (h *Handler) preflight(c echo.Context) error {
+	addCORS(c)
 	return c.NoContent(http.StatusNoContent)
 }
 
 // Close tears down every active peer (e.g., during Core shutdown).
+// The index is emptied under the lock first; the peers are closed
+// afterwards so that no lock is held while they shut down.
 func (h *Handler) Close() {
-	h.peersMu.Lock()
-	peers := make([]*corewebrtc.Peer, 0, len(h.peers))
-	for _, p := range h.peers {
-		peers = append(peers, p)
+	h.mu.Lock()
+	// The reverse index holds exactly one entry per registered peer.
+	peers := make([]*corewebrtc.Peer, 0, len(h.peerStream))
+	for _, m := range h.peersByStream {
+		for _, p := range m {
+			peers = append(peers, p)
+		}
 	}
-	h.peers = make(map[string]*corewebrtc.Peer)
-	h.peersMu.Unlock()
+	h.peersByStream = make(map[string]map[string]*corewebrtc.Peer)
+	h.peerStream = make(map[string]string)
+	h.mu.Unlock()
 
 	for _, p := range peers {
-		_ = p.Close()
+		if p != nil {
+			_ = p.Close()
+		}
 	}
-	atomic.StoreInt64(&h.count, 0)
+	// Subtract what was actually removed instead of storing 0: a
+	// Subscribe racing with Close performs its own increment, and a
+	// blind reset would leave the counter permanently out of step
+	// with the index.
+	atomic.AddInt64(&h.count, -int64(len(peers)))
 }
+
+// awaitPeerClose blocks on peer.Done() and yanks the index entry when
+// the peer self-closes (ICE failed/disconnected).
+// Idempotent with the other removal paths: whichever path deletes the
+// index entry first performs the single counter decrement; if the
+// entry is already gone when Done() fires, this is a no-op.
+func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
+	<-peer.Done()
+	h.mu.Lock()
+	streamID, present := h.peerStream[resource]
+	if present {
+		delete(h.peerStream, resource)
+		if streamID != "" {
+			delete(h.peersByStream[streamID], resource)
+			if len(h.peersByStream[streamID]) == 0 {
+				delete(h.peersByStream, streamID)
+			}
+		}
+	}
+	h.mu.Unlock()
+	if present {
+		atomic.AddInt64(&h.count, -1)
+	}
+}
+
+// tearDownStreamPeers is the callback the Subsystem runs in its
+// onProcessStop hook. It removes every peer of that stream from the
+// index, adjusts the counter, and then closes the peers.
+//
+// The entries are dropped here, synchronously, rather than left to
+// each peer's awaitPeerClose goroutine: otherwise a stream that is
+// restarted right away could still look full until those goroutines
+// have run. awaitPeerClose later finds the entries gone and does not
+// decrement a second time.
+func (h *Handler) tearDownStreamPeers(streamID string) {
+	h.mu.Lock()
+	bucket := h.peersByStream[streamID]
+	peers := make([]*corewebrtc.Peer, 0, len(bucket))
+	for rid, p := range bucket {
+		peers = append(peers, p)
+		delete(h.peerStream, rid)
+	}
+	delete(h.peersByStream, streamID)
+	h.mu.Unlock()
+
+	// One decrement per index entry removed above.
+	atomic.AddInt64(&h.count, -int64(len(peers)))
+
+	// Close outside the lock so peer shutdown cannot block other requests.
+	for _, p := range peers {
+		if p != nil {
+			_ = p.Close()
+		}
+	}
+}
+
+// addCORS emits the response headers a browser-side WHEP player
+// expects. WHEP's Location and ETag headers must be exposed for
+// fetch() to read them across origins.
+func addCORS(c echo.Context) {
+	hh := c.Response().Header()
+	hh.Set("Access-Control-Allow-Origin", "*")
+	hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS")
+	hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match")
+	hh.Set("Access-Control-Expose-Headers", "Location, ETag")
+}
+
+// requireH264AndOpus does a coarse SDP scan to confirm the offer
+// includes both an H.264 video rtpmap and an Opus audio rtpmap. The
+// design treats codec mismatch as a 406, never a silent black frame.
+//
+// This is intentionally a string scan rather than a full SDP parse:
+// every modern browser advertises H.264 and Opus by name, and a
+// dependency on a real SDP parser for one validation step is
+// disproportionate. M4 may swap this for pion/sdp.v3 when other
+// surfaces also need parsing.
+//
+// Only "a=rtpmap:<pt> <codec>/<clock>..." lines are considered, so a
+// codec name that merely appears elsewhere in the offer (session
+// name, fmtp parameters, ...) does not satisfy the check. Matching is
+// case-insensitive and tolerates CRLF line endings. It returns nil
+// when both codecs are offered and a *codecMismatchError naming the
+// missing ones otherwise.
+func requireH264AndOpus(sdp string) error {
+	const rtpmap = "a=rtpmap:"
+
+	var hasH264, hasOpus bool
+	for _, line := range strings.Split(sdp, "\n") {
+		line = strings.ToLower(strings.TrimSpace(line))
+		if !strings.HasPrefix(line, rtpmap) {
+			continue
+		}
+		// fields[0] is the payload type, fields[1] is "<codec>/<clock>[/<params>]".
+		fields := strings.Fields(strings.TrimPrefix(line, rtpmap))
+		if len(fields) < 2 {
+			continue
+		}
+		slash := strings.IndexByte(fields[1], '/')
+		if slash < 0 {
+			continue
+		}
+		switch fields[1][:slash] {
+		case "h264":
+			hasH264 = true
+		case "opus":
+			hasOpus = true
+		}
+	}
+	if hasH264 && hasOpus {
+		return nil
+	}
+
+	var missing []string
+	if !hasH264 {
+		missing = append(missing, "H264")
+	}
+	if !hasOpus {
+		missing = append(missing, "Opus")
+	}
+	return &codecMismatchError{missing: missing}
+}
+
+// codecMismatchError reports which required codecs an offer lacks.
+type codecMismatchError struct{ missing []string }
+
+// Error lists the missing codecs in a single human-readable message.
+func (e *codecMismatchError) Error() string {
+	return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ")
+}
diff --git a/app/webrtc/handler_test.go b/app/webrtc/handler_test.go
index 51e88d0..b36dfd6 100644
--- a/app/webrtc/handler_test.go
+++ b/app/webrtc/handler_test.go
@@ -68,9 +68,11 @@ func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) {
 	}
 }
 
-// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an
-// unknown resource id returns 404 and no state mutation.
-func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
+// TestHandler_Unsubscribe_204WhenUnknown verifies a DELETE with an
+// unknown resource id returns 204 (idempotent), per the WHEP spec
+// and the M2/M3 design's error matrix. Pre-M3 this returned 404; the
+// updated semantics let clients re-issue DELETE without erroring.
+func TestHandler_Unsubscribe_204WhenUnknown(t *testing.T) {
 	h := NewHandler(newTestSubsystem(t), 0)
 
 	e := echo.New()
@@ -83,7 +85,7 @@ func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
 	if err := h.Unsubscribe(c); err != nil {
 		t.Fatalf("Unsubscribe returned error: %v", err)
 	}
-	if rec.Code != http.StatusNotFound {
-		t.Fatalf("expected 404, got %d", rec.Code)
+	if rec.Code != http.StatusNoContent {
+		t.Fatalf("expected 204, got %d", rec.Code)
 	}
 }