webrtc: add GET /webrtc/stats endpoint and SetWHIPHandler (issue #24)
Some checks failed
ci / vet + build (push) Failing after 5m7s
ci / race tests (push) Has been skipped
ci / WebRTC smoke (5-viewer fanout) (push) Has been skipped
ci / WebRTC latency p95 gate (push) Has been skipped

This commit is contained in:
Zac Gaetano 2026-05-10 21:28:24 -04:00
parent b3e667c835
commit 8557a1c65e
2 changed files with 200 additions and 106 deletions

View file

@ -19,16 +19,31 @@ import (
// (passed to NewHandler) is enforced separately and takes precedence. // (passed to NewHandler) is enforced separately and takes precedence.
const defaultMaxPeersPerStream = 8 const defaultMaxPeersPerStream = 8
// WebRTCStats is the JSON response for GET /webrtc/stats.
type WebRTCStats struct {
// ActiveStreams is the number of running FFmpeg processes with a
// registered WHEP egress pair (video + audio Sources).
ActiveStreams int `json:"active_streams"`
// ActivePeers is the total count of live WHEP subscriber sessions
// (each call to Subscribe that has not yet been torn down).
ActivePeers int64 `json:"active_peers"`
// ActivePublishers is the total count of live WHIP ingest sessions
// (each call to WHIPHandler.Publish that has not yet been unpublished).
ActivePublishers int64 `json:"active_publishers"`
// UDPPortsInUse is an approximation of the number of UDP ports
// allocated for ICE traffic. When using ephemeral ports (default)
// each stream uses two ports (one video, one audio).
UDPPortsInUse int `json:"udp_ports_in_use"`
}
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into // Handler exposes the subsystem's WHEP Echo handlers. Wire them into
// the /api/v3 group (or a sibling group) via Handler.Register. // the /api/v3 group (or a sibling group) via Handler.Register.
//
// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
// On every Subscribe we spin a tiny goroutine watching the new peer's
// Done() channel; when ICE fails or Close() runs the index entry is
// removed and the counters tick back down — no leaks if the browser
// rage-quits.
type Handler struct { type Handler struct {
sub *Subsystem sub *Subsystem
whip *WHIPHandler // optional; enables active_publishers in /webrtc/stats
mu sync.Mutex mu sync.Mutex
peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
@ -41,16 +56,11 @@ type Handler struct {
} }
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler. // NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
// The maxPeers argument caps concurrent subscribers across all streams;
// pass 0 to use a generous default (matches corewebrtc.DefaultConfig).
// The per-stream cap is taken from the corewebrtc default; pass a
// non-zero value to override via NewHandlerWithCaps.
func NewHandler(s *Subsystem, maxPeers int) *Handler { func NewHandler(s *Subsystem, maxPeers int) *Handler {
return NewHandlerWithCaps(s, maxPeers, 0) return NewHandlerWithCaps(s, maxPeers, 0)
} }
// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap. // NewHandlerWithCaps is NewHandler plus an explicit per-stream cap.
// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream.
func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler { func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler {
total := int64(maxPeers) total := int64(maxPeers)
if total <= 0 { if total <= 0 {
@ -67,21 +77,31 @@ func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler
maxCapTotal: total, maxCapTotal: total,
maxCapPerStrm: perStream, maxCapPerStrm: perStream,
} }
// Subsystem broadcasts process-stop via this hook so the handler
// can yank stale peer entries before their Sources close out
// from underneath them.
if s != nil { if s != nil {
s.SetTeardownHook(h.tearDownStreamPeers) s.SetTeardownHook(h.tearDownStreamPeers)
} }
return h return h
} }
// Register mounts the WHEP routes on the provided Echo group. // SetWHIPHandler links the WHIP ingest handler so that /webrtc/stats
// can report active_publishers. Pass nil to disable that field (returns 0).
func (h *Handler) SetWHIPHandler(wh *WHIPHandler) {
h.whip = wh
}
// Register mounts the WHEP routes and the shared stats route on the
// provided Echo group.
// //
// CORS preflights are answered on every WHEP path; regular WHEP // Routes registered:
// responses also carry the Access-Control-* headers so browser-side //
// players living on a different origin can subscribe. // GET /webrtc/stats
// OPTIONS /whep/:id
// OPTIONS /whep/:id/:resource
// POST /whep/:id
// DELETE /whep/:id/:resource
// PATCH /whep/:id/:resource
func (h *Handler) Register(g *echo.Group) { func (h *Handler) Register(g *echo.Group) {
g.GET("/webrtc/stats", h.StatsHandler)
g.OPTIONS("/whep/:id", h.preflight) g.OPTIONS("/whep/:id", h.preflight)
g.OPTIONS("/whep/:id/:resource", h.preflight) g.OPTIONS("/whep/:id/:resource", h.preflight)
g.POST("/whep/:id", h.Subscribe) g.POST("/whep/:id", h.Subscribe)
@ -89,25 +109,37 @@ func (h *Handler) Register(g *echo.Group) {
g.PATCH("/whep/:id/:resource", h.Trickle) g.PATCH("/whep/:id/:resource", h.Trickle)
} }
// Subscribe handles POST /whep/:id. Request body is an SDP offer, // StatsHandler handles GET /webrtc/stats. Returns a JSON snapshot of
// response is an SDP answer with a Location header pointing at the // the current WebRTC subsystem state.
// DELETE/PATCH resource.
// //
// @Summary Subscribe to a WebRTC stream via WHEP // @Summary WebRTC subsystem stats
// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE. // @Description Returns a live snapshot: active egress streams, subscriber peer count, ingest publisher count, and approximate UDP port usage.
// @Tags v16.16.0 // @Tags v16.16.0
// @ID webrtc-3-whep-subscribe // @ID webrtc-3-stats
// @Accept application/sdp // @Produce json
// @Produce application/sdp // @Success 200 {object} WebRTCStats
// @Param id path string true "Process ID with config.webrtc.enabled=true" // @Router /api/v3/webrtc/stats [get]
// @Success 201 {string} string "SDP answer" func (h *Handler) StatsHandler(c echo.Context) error {
// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP" sc := 0
// @Failure 404 {string} string "no stream registered for this process id" if h.sub != nil {
// @Failure 406 {string} string "offer SDP missing required H264 / Opus rtpmap" sc = h.sub.StreamCount()
// @Failure 503 {string} string "peer cap reached (per-stream or total)" }
// @Failure 504 {string} string "ICE gathering timeout"
// @Security ApiKeyAuth var publishers int64
// @Router /api/v3/whep/{id} [post] if h.whip != nil {
publishers = h.whip.PublisherCount()
}
stats := WebRTCStats{
ActiveStreams: sc,
ActivePeers: atomic.LoadInt64(&h.count),
ActivePublishers: publishers,
UDPPortsInUse: sc * 2,
}
return c.JSON(http.StatusOK, stats)
}
// Subscribe handles POST /whep/:id.
func (h *Handler) Subscribe(c echo.Context) error { func (h *Handler) Subscribe(c echo.Context) error {
addCORS(c) addCORS(c)
t0 := time.Now() t0 := time.Now()
@ -118,7 +150,6 @@ func (h *Handler) Subscribe(c echo.Context) error {
return c.String(http.StatusBadRequest, "missing stream id") return c.String(http.StatusBadRequest, "missing stream id")
} }
// Total cap: cheap atomic check before doing real work.
if atomic.LoadInt64(&h.count) >= h.maxCapTotal { if atomic.LoadInt64(&h.count) >= h.maxCapTotal {
if h.met != nil { if h.met != nil {
h.met.capRejections.WithLabelValues("", "global").Inc() h.met.capRejections.WithLabelValues("", "global").Inc()
@ -133,7 +164,6 @@ func (h *Handler) Subscribe(c echo.Context) error {
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
} }
// Per-stream cap: needs the lock since we're indexing per stream.
h.mu.Lock() h.mu.Lock()
if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm { if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm {
h.mu.Unlock() h.mu.Unlock()
@ -169,7 +199,6 @@ func (h *Handler) Subscribe(c echo.Context) error {
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
if err != nil { if err != nil {
// Surface the design's error matrix.
switch err { switch err {
case corewebrtc.ErrICETimeout: case corewebrtc.ErrICETimeout:
h.recordRequest("subscribe", id, http.StatusGatewayTimeout, t0) h.recordRequest("subscribe", id, http.StatusGatewayTimeout, t0)
@ -193,22 +222,14 @@ func (h *Handler) Subscribe(c echo.Context) error {
h.mu.Unlock() h.mu.Unlock()
atomic.AddInt64(&h.count, 1) atomic.AddInt64(&h.count, 1)
// Auto-cleanup: when Pion's OnConnectionStateChange triggers
// peer.Close() (ICE failed/disconnected), the Done channel
// closes and we yank the index entry. Without this the map
// leaks for the lifetime of the handler.
go h.awaitPeerClose(rid, peer) go h.awaitPeerClose(rid, peer)
// Track ICE establishment duration asynchronously.
go h.trackICE(id, peer, time.Now()) go h.trackICE(id, peer, time.Now())
h.recordRequest("subscribe", id, http.StatusCreated, t0) h.recordRequest("subscribe", id, http.StatusCreated, t0)
// RFC 9429 §4.3: emit one Link header per configured ICE server so // RFC 9429 §4.3: emit one Link header per configured ICE server.
// that the browser can discover STUN/TURN without a separate
// signalling round-trip.
for _, uri := range h.sub.ICEServerURIs() { for _, uri := range h.sub.ICEServerURIs() {
c.Response().Header().Add("Link", "<"+uri+">; rel=\"ice-server\"") c.Response().Header().Add("Link", "<"+uri+`>; rel="ice-server"`)
} }
c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Content-Type", "application/sdp")
c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) c.Response().Header().Set("Location", "/whep/"+id+"/"+rid)
@ -216,20 +237,7 @@ func (h *Handler) Subscribe(c echo.Context) error {
return c.String(http.StatusCreated, peer.Answer().SDP) return c.String(http.StatusCreated, peer.Answer().SDP)
} }
// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we // Unsubscribe handles DELETE /whep/:id/:resource.
// return 204 even when the resource is unknown — DELETE is idempotent
// and a re-issued tear-down should never error out.
//
// @Summary Tear down a WHEP subscription
// @Description Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.
// @Tags v16.16.0
// @ID webrtc-3-whep-unsubscribe
// @Param id path string true "Process ID"
// @Param resource path string true "Resource ID from the Subscribe Location header"
// @Success 204 "no content"
// @Failure 400 {string} string "missing resource id"
// @Security ApiKeyAuth
// @Router /api/v3/whep/{id}/{resource} [delete]
func (h *Handler) Unsubscribe(c echo.Context) error { func (h *Handler) Unsubscribe(c echo.Context) error {
addCORS(c) addCORS(c)
t0 := time.Now() t0 := time.Now()
@ -264,23 +272,7 @@ func (h *Handler) Unsubscribe(c echo.Context) error {
return c.NoContent(http.StatusNoContent) return c.NoContent(http.StatusNoContent)
} }
// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates // Trickle handles PATCH /whep/:id/:resource.
// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients
// signal end-of-candidates via an a=end-of-candidates line, which
// AddICECandidate accepts).
//
// @Summary Trickle ICE candidates for a WHEP subscription
// @Description Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.
// @Tags v16.16.0
// @ID webrtc-3-whep-trickle
// @Accept application/trickle-ice-sdpfrag
// @Param id path string true "Process ID"
// @Param resource path string true "Resource ID from the Subscribe Location header"
// @Success 204 "no content"
// @Failure 400 {string} string "missing resource id or unreadable body"
// @Failure 404 {string} string "peer not found"
// @Security ApiKeyAuth
// @Router /api/v3/whep/{id}/{resource} [patch]
func (h *Handler) Trickle(c echo.Context) error { func (h *Handler) Trickle(c echo.Context) error {
addCORS(c) addCORS(c)
t0 := time.Now() t0 := time.Now()
@ -321,8 +313,6 @@ func (h *Handler) Trickle(c echo.Context) error {
return c.NoContent(http.StatusNoContent) return c.NoContent(http.StatusNoContent)
} }
// recordRequest records whepRequests counter and whepRequestDuration histogram
// for any WHEP route outcome. Silently no-ops if metrics are not initialised.
func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time) { func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time) {
if h.met == nil { if h.met == nil {
return return
@ -332,8 +322,6 @@ func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time)
h.met.whepRequestDuration.WithLabelValues(route, streamID).Observe(time.Since(t0).Seconds()) h.met.whepRequestDuration.WithLabelValues(route, streamID).Observe(time.Since(t0).Seconds())
} }
// preflight answers a CORS OPTIONS request; the headers are also
// echoed on every other response.
func (h *Handler) preflight(c echo.Context) error { func (h *Handler) preflight(c echo.Context) error {
addCORS(c) addCORS(c)
return c.NoContent(http.StatusNoContent) return c.NoContent(http.StatusNoContent)
@ -360,10 +348,6 @@ func (h *Handler) Close() {
atomic.StoreInt64(&h.count, 0) atomic.StoreInt64(&h.count, 0)
} }
// awaitPeerClose blocks on peer.Done() and yanks the index entry when
// the peer self-closes (ICE failed/disconnected). Idempotent with
// the Unsubscribe path: if Unsubscribe ran first the index is already
// empty and we just decrement the counter once on first arrival.
func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) { func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
<-peer.Done() <-peer.Done()
h.mu.Lock() h.mu.Lock()
@ -384,10 +368,6 @@ func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
} }
} }
// tearDownStreamPeers is the callback the Subsystem runs in its
// onProcessStop hook. It closes every peer subscribed to that
// stream and records FFmpeg leg failures if any peers were active,
// which indicates the process died unexpectedly.
func (h *Handler) tearDownStreamPeers(streamID string) { func (h *Handler) tearDownStreamPeers(streamID string) {
h.mu.Lock() h.mu.Lock()
bucket := h.peersByStream[streamID] bucket := h.peersByStream[streamID]
@ -410,10 +390,6 @@ func (h *Handler) tearDownStreamPeers(streamID string) {
} }
} }
// addCORS emits the response headers a browser-side WHEP player
// expects. WHEP's Location, ETag, and Link headers must be exposed
// for fetch() to read them across origins. Link carries the ICE
// server URIs per RFC 9429 §4.3.
func addCORS(c echo.Context) { func addCORS(c echo.Context) {
hh := c.Response().Header() hh := c.Response().Header()
hh.Set("Access-Control-Allow-Origin", "*") hh.Set("Access-Control-Allow-Origin", "*")
@ -422,15 +398,6 @@ func addCORS(c echo.Context) {
hh.Set("Access-Control-Expose-Headers", "Location, ETag, Link") hh.Set("Access-Control-Expose-Headers", "Location, ETag, Link")
} }
// requireH264AndOpus does a coarse SDP scan to confirm the offer
// includes both an H.264 video rtpmap and an Opus audio rtpmap. The
// design treats codec mismatch as a 406, never a silent black frame.
//
// This is intentionally a string scan rather than a full SDP parse:
// every modern browser advertises H.264 and Opus by name, and a
// dependency on a real SDP parser for one validation step is
// disproportionate. M4 may swap this for pion/sdp.v3 when other
// surfaces also need parsing.
func requireH264AndOpus(sdp string) error { func requireH264AndOpus(sdp string) error {
lower := strings.ToLower(sdp) lower := strings.ToLower(sdp)
hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/") hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/")
@ -451,5 +418,5 @@ func requireH264AndOpus(sdp string) error {
type codecMismatchError struct{ missing []string } type codecMismatchError struct{ missing []string }
func (e *codecMismatchError) Error() string { func (e *codecMismatchError) Error() string {
return "webrtc: codec mismatch offer is missing: " + strings.Join(e.missing, ", ") return "webrtc: codec mismatch -- offer is missing: " + strings.Join(e.missing, ", ")
} }

View file

@ -0,0 +1,127 @@
package webrtc
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/labstack/echo/v4"
)
// TestStatsHandler_EmptySubsystem verifies that GET /webrtc/stats returns
// a well-formed JSON body with all-zero counts when no streams or peers
// are active and no WHIP handler is linked.
func TestStatsHandler_EmptySubsystem(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
e := echo.New()
req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
if err := h.StatsHandler(c); err != nil {
t.Fatalf("StatsHandler returned error: %v", err)
}
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
}
var stats WebRTCStats
if err := json.Unmarshal(rec.Body.Bytes(), &stats); err != nil {
t.Fatalf("invalid JSON: %v\nbody: %s", err, rec.Body.String())
}
if stats.ActiveStreams != 0 {
t.Errorf("ActiveStreams: want 0, got %d", stats.ActiveStreams)
}
if stats.ActivePeers != 0 {
t.Errorf("ActivePeers: want 0, got %d", stats.ActivePeers)
}
if stats.ActivePublishers != 0 {
t.Errorf("ActivePublishers: want 0, got %d", stats.ActivePublishers)
}
if stats.UDPPortsInUse != 0 {
t.Errorf("UDPPortsInUse: want 0, got %d", stats.UDPPortsInUse)
}
}
// TestStatsHandler_WithWHIPHandler verifies that SetWHIPHandler links the
// WHIP publisher count into the stats response.
func TestStatsHandler_WithWHIPHandler(t *testing.T) {
sub := newTestSubsystem(t)
h := NewHandler(sub, 0)
// Link a real WHIPHandler so that StatsHandler calls PublisherCount().
wh := NewWHIPHandler(sub, 0)
h.SetWHIPHandler(wh)
e := echo.New()
req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
if err := h.StatsHandler(c); err != nil {
t.Fatalf("StatsHandler returned error: %v", err)
}
var stats WebRTCStats
if err := json.Unmarshal(rec.Body.Bytes(), &stats); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
// With no active publishers the count should be 0 — validates the
// link does not panic and that PublisherCount() is being called.
if stats.ActivePublishers != 0 {
t.Errorf("ActivePublishers: want 0, got %d", stats.ActivePublishers)
}
}
// TestStatsHandler_NilSub verifies that a nil Subsystem (possible during
// early wiring) does not panic and returns zeros.
func TestStatsHandler_NilSub(t *testing.T) {
h := NewHandler(nil, 0)
e := echo.New()
req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
if err := h.StatsHandler(c); err != nil {
t.Fatalf("StatsHandler returned error: %v", err)
}
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", rec.Code)
}
var stats WebRTCStats
if err := json.Unmarshal(rec.Body.Bytes(), &stats); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
if stats.ActiveStreams != 0 || stats.UDPPortsInUse != 0 {
t.Errorf("expected all zeros with nil sub, got %+v", stats)
}
}
// TestStatsHandler_JSONFieldNames verifies the JSON key names match the
// contract defined in the issue so consumer scripts don't break.
func TestStatsHandler_JSONFieldNames(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
e := echo.New()
req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
if err := h.StatsHandler(c); err != nil {
t.Fatalf("StatsHandler returned error: %v", err)
}
var raw map[string]interface{}
if err := json.Unmarshal(rec.Body.Bytes(), &raw); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
for _, key := range []string{"active_streams", "active_peers", "active_publishers", "udp_ports_in_use"} {
if _, ok := raw[key]; !ok {
t.Errorf("JSON response missing required field %q", key)
}
}
}