datarhei-dragonfork-core/app/webrtc/handler.go
Zac Gaetano 6eaf346d06 Merge branch 'm3-robustness' into m2-webrtc-core-integration
Conflict resolution: keep M3's full handler.go rewrite (per-stream
index, error matrix, PATCH, CORS, auto-cleanup) and re-apply the
swagger annotations from #7 onto the new function declarations,
including a fresh annotation for the M3-introduced Trickle endpoint.
Swagger docs regenerated to pick up all three.

Race-clean: go test -race ./app/webrtc/... green.
2026-05-03 12:26:15 +00:00

387 lines
13 KiB
Go

package webrtc
import (
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"github.com/labstack/echo/v4"
"github.com/pion/webrtc/v4"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
)
// Default per-stream peer cap when the caller passes 0. The total cap
// (passed to NewHandler) is enforced separately and takes precedence.
const defaultMaxPeersPerStream = 8
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
// the /api/v3 group (or a sibling group) via Handler.Register.
//
// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
// On every Subscribe we spin a tiny goroutine watching the new peer's
// Done() channel; when ICE fails or Close() runs the index entry is
// removed and the counters tick back down — no leaks if the browser
// rage-quits.
type Handler struct {
sub *Subsystem
mu sync.Mutex
peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
peerStream map[string]string // resource -> streamID (reverse index)
count int64 // atomic
maxCapTotal int64
maxCapPerStrm int64
}
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
// The maxPeers argument caps concurrent subscribers across all streams;
// pass 0 to use a generous default (matches corewebrtc.DefaultConfig).
// The per-stream cap is taken from the corewebrtc default; pass a
// non-zero value to override via NewHandlerWithCaps.
func NewHandler(s *Subsystem, maxPeers int) *Handler {
return NewHandlerWithCaps(s, maxPeers, 0)
}
// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap.
// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream.
func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler {
total := int64(maxPeers)
if total <= 0 {
total = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
}
perStream := int64(maxPeersPerStream)
if perStream <= 0 {
perStream = defaultMaxPeersPerStream
}
h := &Handler{
sub: s,
peersByStream: make(map[string]map[string]*corewebrtc.Peer),
peerStream: make(map[string]string),
maxCapTotal: total,
maxCapPerStrm: perStream,
}
// Subsystem broadcasts process-stop via this hook so the handler
// can yank stale peer entries before their Sources close out
// from underneath them.
if s != nil {
s.SetTeardownHook(h.tearDownStreamPeers)
}
return h
}
// Register mounts the WHEP routes on the provided Echo group.
//
// CORS preflights are answered on every WHEP path; regular WHEP
// responses also carry the Access-Control-* headers so browser-side
// players living on a different origin can subscribe.
func (h *Handler) Register(g *echo.Group) {
g.OPTIONS("/whep/:id", h.preflight)
g.OPTIONS("/whep/:id/:resource", h.preflight)
g.POST("/whep/:id", h.Subscribe)
g.DELETE("/whep/:id/:resource", h.Unsubscribe)
g.PATCH("/whep/:id/:resource", h.Trickle)
}
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
// response is an SDP answer with a Location header pointing at the
// DELETE/PATCH resource.
//
// @Summary Subscribe to a WebRTC stream via WHEP
// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.
// @Tags v16.16.0
// @ID webrtc-3-whep-subscribe
// @Accept application/sdp
// @Produce application/sdp
// @Param id path string true "Process ID with config.webrtc.enabled=true"
// @Success 201 {string} string "SDP answer"
// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP"
// @Failure 404 {string} string "no stream registered for this process id"
// @Failure 406 {string} string "offer SDP missing required H264 / Opus rtpmap"
// @Failure 503 {string} string "peer cap reached (per-stream or total)"
// @Failure 504 {string} string "ICE gathering timeout"
// @Security ApiKeyAuth
// @Router /api/v3/whep/{id} [post]
func (h *Handler) Subscribe(c echo.Context) error {
addCORS(c)
id := c.Param("id")
if id == "" {
return c.String(http.StatusBadRequest, "missing stream id")
}
// Total cap: cheap atomic check before doing real work.
if atomic.LoadInt64(&h.count) >= h.maxCapTotal {
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
}
stream, ok := h.sub.lookup(id)
if !ok {
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
}
// Per-stream cap: needs the lock since we're indexing per stream.
h.mu.Lock()
if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm {
h.mu.Unlock()
return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached")
}
h.mu.Unlock()
body, err := io.ReadAll(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, "read body: "+err.Error())
}
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
}
if err := requireH264AndOpus(string(body)); err != nil {
return c.String(http.StatusNotAcceptable, err.Error())
}
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
if err != nil {
// Surface the design's error matrix.
switch err {
case corewebrtc.ErrICETimeout:
return c.String(http.StatusGatewayTimeout, err.Error())
case corewebrtc.ErrCodecMismatch:
return c.String(http.StatusNotAcceptable, err.Error())
default:
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
}
}
rid := peer.ResourceID()
h.mu.Lock()
if h.peersByStream[id] == nil {
h.peersByStream[id] = make(map[string]*corewebrtc.Peer)
}
h.peersByStream[id][rid] = peer
h.peerStream[rid] = id
h.mu.Unlock()
atomic.AddInt64(&h.count, 1)
// Auto-cleanup: when Pion's OnConnectionStateChange triggers
// peer.Close() (ICE failed/disconnected), the Done channel
// closes and we yank the index entry. Without this the map
// leaks for the lifetime of the handler.
go h.awaitPeerClose(rid, peer)
c.Response().Header().Set("Content-Type", "application/sdp")
c.Response().Header().Set("Location", "/whep/"+id+"/"+rid)
c.Response().Header().Set("ETag", `"`+rid+`"`)
return c.String(http.StatusCreated, peer.Answer().SDP)
}
// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we
// return 204 even when the resource is unknown — DELETE is idempotent
// and a re-issued tear-down should never error out.
//
// @Summary Tear down a WHEP subscription
// @Description Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.
// @Tags v16.16.0
// @ID webrtc-3-whep-unsubscribe
// @Param id path string true "Process ID"
// @Param resource path string true "Resource ID from the Subscribe Location header"
// @Success 204 "no content"
// @Failure 400 {string} string "missing resource id"
// @Security ApiKeyAuth
// @Router /api/v3/whep/{id}/{resource} [delete]
func (h *Handler) Unsubscribe(c echo.Context) error {
addCORS(c)
resource := c.Param("resource")
if resource == "" {
return c.String(http.StatusBadRequest, "missing resource id")
}
h.mu.Lock()
streamID := h.peerStream[resource]
var peer *corewebrtc.Peer
if streamID != "" {
peer = h.peersByStream[streamID][resource]
delete(h.peersByStream[streamID], resource)
if len(h.peersByStream[streamID]) == 0 {
delete(h.peersByStream, streamID)
}
delete(h.peerStream, resource)
}
h.mu.Unlock()
if peer != nil {
_ = peer.Close()
}
if streamID != "" {
atomic.AddInt64(&h.count, -1)
}
return c.NoContent(http.StatusNoContent)
}
// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates
// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients
// signal end-of-candidates via an a=end-of-candidates line, which
// AddICECandidate accepts).
//
// @Summary Trickle ICE candidates for a WHEP subscription
// @Description Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.
// @Tags v16.16.0
// @ID webrtc-3-whep-trickle
// @Accept application/trickle-ice-sdpfrag
// @Param id path string true "Process ID"
// @Param resource path string true "Resource ID from the Subscribe Location header"
// @Success 204 "no content"
// @Failure 400 {string} string "missing resource id or unreadable body"
// @Failure 404 {string} string "peer not found"
// @Security ApiKeyAuth
// @Router /api/v3/whep/{id}/{resource} [patch]
func (h *Handler) Trickle(c echo.Context) error {
addCORS(c)
resource := c.Param("resource")
if resource == "" {
return c.String(http.StatusBadRequest, "missing resource id")
}
h.mu.Lock()
streamID := h.peerStream[resource]
var peer *corewebrtc.Peer
if streamID != "" {
peer = h.peersByStream[streamID][resource]
}
h.mu.Unlock()
if peer == nil {
return c.NoContent(http.StatusNotFound)
}
body, err := io.ReadAll(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, "read body: "+err.Error())
}
for _, line := range strings.Split(string(body), "\n") {
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "a=candidate:") {
continue
}
cand := strings.TrimPrefix(line, "a=")
_ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand})
}
return c.NoContent(http.StatusNoContent)
}
// preflight answers a CORS OPTIONS request; the headers are also
// echoed on every other response.
func (h *Handler) preflight(c echo.Context) error {
addCORS(c)
return c.NoContent(http.StatusNoContent)
}
// Close tears down every active peer (e.g., during Core shutdown).
func (h *Handler) Close() {
h.mu.Lock()
peers := make([]*corewebrtc.Peer, 0)
for _, m := range h.peersByStream {
for _, p := range m {
peers = append(peers, p)
}
}
h.peersByStream = make(map[string]map[string]*corewebrtc.Peer)
h.peerStream = make(map[string]string)
h.mu.Unlock()
for _, p := range peers {
if p != nil {
_ = p.Close()
}
}
atomic.StoreInt64(&h.count, 0)
}
// awaitPeerClose blocks on peer.Done() and yanks the index entry when
// the peer self-closes (ICE failed/disconnected). Idempotent with
// the Unsubscribe path: if Unsubscribe ran first the index is already
// empty and we just decrement the counter once on first arrival.
func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
<-peer.Done()
h.mu.Lock()
streamID := h.peerStream[resource]
_, present := h.peerStream[resource]
if present {
delete(h.peerStream, resource)
if streamID != "" {
delete(h.peersByStream[streamID], resource)
if len(h.peersByStream[streamID]) == 0 {
delete(h.peersByStream, streamID)
}
}
}
h.mu.Unlock()
if present {
atomic.AddInt64(&h.count, -1)
}
}
// tearDownStreamPeers is the callback the Subsystem runs in its
// onProcessStop hook. It closes every peer subscribed to that
// stream (driving each one's Done() and indirectly awaitPeerClose).
func (h *Handler) tearDownStreamPeers(streamID string) {
h.mu.Lock()
bucket := h.peersByStream[streamID]
peers := make([]*corewebrtc.Peer, 0, len(bucket))
for _, p := range bucket {
peers = append(peers, p)
}
h.mu.Unlock()
for _, p := range peers {
if p != nil {
_ = p.Close()
}
}
}
// addCORS emits the response headers a browser-side WHEP player
// expects. WHEP's Location and ETag headers must be exposed for
// fetch() to read them across origins.
func addCORS(c echo.Context) {
hh := c.Response().Header()
hh.Set("Access-Control-Allow-Origin", "*")
hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS")
hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match")
hh.Set("Access-Control-Expose-Headers", "Location, ETag")
}
// requireH264AndOpus does a coarse SDP scan to confirm the offer
// includes both an H.264 video rtpmap and an Opus audio rtpmap. The
// design treats codec mismatch as a 406, never a silent black frame.
//
// This is intentionally a string scan rather than a full SDP parse:
// every modern browser advertises H.264 and Opus by name, and a
// dependency on a real SDP parser for one validation step is
// disproportionate. M4 may swap this for pion/sdp.v3 when other
// surfaces also need parsing.
func requireH264AndOpus(sdp string) error {
lower := strings.ToLower(sdp)
hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/")
hasOpus := strings.Contains(lower, "opus/48000") || strings.Contains(lower, " opus/")
if hasH264 && hasOpus {
return nil
}
missing := []string{}
if !hasH264 {
missing = append(missing, "H264")
}
if !hasOpus {
missing = append(missing, "Opus")
}
return &codecMismatchError{missing: missing}
}
type codecMismatchError struct{ missing []string }
func (e *codecMismatchError) Error() string {
return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ")
}