package webrtc import ( "io" "net/http" "strings" "sync" "sync/atomic" "github.com/labstack/echo/v4" "github.com/pion/webrtc/v4" corewebrtc "github.com/datarhei/core/v16/core/webrtc" ) // Handler exposes the subsystem's WHEP Echo handlers. Wire them into // the /api/v3 group (or a sibling group) via Handler.Register. type Handler struct { sub *Subsystem peersMu sync.Mutex peers map[string]*corewebrtc.Peer // resourceID -> peer count int64 // atomic, for cap check without lock maxCap int64 } // NewHandler wraps the subsystem in an Echo-compatible HTTP handler. // The maxPeers argument caps concurrent subscribers; pass 0 to use a // generous default (matches corewebrtc.DefaultConfig). func NewHandler(s *Subsystem, maxPeers int) *Handler { cap := int64(maxPeers) if cap <= 0 { cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal) } return &Handler{ sub: s, peers: make(map[string]*corewebrtc.Peer), maxCap: cap, } } // Register mounts the WHEP routes on the provided Echo group. WHEP // POST is /whep/:id, WHEP DELETE is /whep/:id/:resource. // // The routes are deliberately unauthenticated in M2 because WHEP // clients (browsers, OBS) don't carry the Core JWT. M3 will add // per-process signed-URL tokens; for M2 the deployment is expected // to put the endpoint behind an authenticated reverse-proxy or VPN. func (h *Handler) Register(g *echo.Group) { g.POST("/whep/:id", h.Subscribe) g.DELETE("/whep/:id/:resource", h.Unsubscribe) } // Subscribe handles POST /whep/:id. Request body is an SDP offer, // response is an SDP answer with a Location header pointing at the // DELETE resource. func (h *Handler) Subscribe(c echo.Context) error { id := c.Param("id") if id == "" { return c.String(http.StatusBadRequest, "missing stream id") } if atomic.LoadInt64(&h.count) >= h.maxCap { return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) } stream, ok := h.sub.lookup(id) if !ok { return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) } body, err := io.ReadAll(c.Request().Body) if err != nil { return c.String(http.StatusBadRequest, "read body: "+err.Error()) } if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) } offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) if err != nil { return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) } h.peersMu.Lock() h.peers[peer.ResourceID()] = peer h.peersMu.Unlock() atomic.AddInt64(&h.count, 1) c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID()) return c.String(http.StatusCreated, peer.Answer().SDP) } // Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of // the path per WHEP spec but we only need :resource to locate the // peer; :id is accepted for route symmetry. func (h *Handler) Unsubscribe(c echo.Context) error { resource := c.Param("resource") if resource == "" { return c.String(http.StatusBadRequest, "missing resource id") } h.peersMu.Lock() peer, ok := h.peers[resource] if ok { delete(h.peers, resource) } h.peersMu.Unlock() if !ok { return c.NoContent(http.StatusNotFound) } _ = peer.Close() atomic.AddInt64(&h.count, -1) return c.NoContent(http.StatusNoContent) } // Close tears down every active peer (e.g., during Core shutdown). func (h *Handler) Close() { h.peersMu.Lock() peers := make([]*corewebrtc.Peer, 0, len(h.peers)) for _, p := range h.peers { peers = append(peers, p) } h.peers = make(map[string]*corewebrtc.Peer) h.peersMu.Unlock() for _, p := range peers { _ = p.Close() } atomic.StoreInt64(&h.count, 0) }