diff --git a/app/webrtc/handler.go b/app/webrtc/handler.go index f0b346e..dcde73e 100644 --- a/app/webrtc/handler.go +++ b/app/webrtc/handler.go @@ -19,16 +19,31 @@ import ( // (passed to NewHandler) is enforced separately and takes precedence. const defaultMaxPeersPerStream = 8 +// WebRTCStats is the JSON response for GET /webrtc/stats. +type WebRTCStats struct { + // ActiveStreams is the number of running FFmpeg processes with a + // registered WHEP egress pair (video + audio Sources). + ActiveStreams int `json:"active_streams"` + + // ActivePeers is the total count of live WHEP subscriber sessions + // (each call to Subscribe that has not yet been torn down). + ActivePeers int64 `json:"active_peers"` + + // ActivePublishers is the total count of live WHIP ingest sessions + // (each call to WHIPHandler.Publish that has not yet been unpublished). + ActivePublishers int64 `json:"active_publishers"` + + // UDPPortsInUse is an approximation of the number of UDP ports + // allocated for ICE traffic. When using ephemeral ports (default) + // each stream uses two ports (one video, one audio). + UDPPortsInUse int `json:"udp_ports_in_use"` +} + // Handler exposes the subsystem's WHEP Echo handlers. Wire them into // the /api/v3 group (or a sibling group) via Handler.Register. -// -// Lifecycle: peers are tracked in a streamID→resourceID→Peer index. -// On every Subscribe we spin a tiny goroutine watching the new peer's -// Done() channel; when ICE fails or Close() runs the index entry is -// removed and the counters tick back down — no leaks if the browser -// rage-quits. type Handler struct { - sub *Subsystem + sub *Subsystem + whip *WHIPHandler // optional; enables active_publishers in /webrtc/stats mu sync.Mutex peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer @@ -41,16 +56,11 @@ type Handler struct { } // NewHandler wraps the subsystem in an Echo-compatible HTTP handler. -// The maxPeers argument caps concurrent subscribers across all streams; -// pass 0 to use a generous default (matches corewebrtc.DefaultConfig). -// The per-stream cap is taken from the corewebrtc default; pass a -// non-zero value to override via NewHandlerWithCaps. func NewHandler(s *Subsystem, maxPeers int) *Handler { return NewHandlerWithCaps(s, maxPeers, 0) } // NewHandlerWithCaps is NewHandler plus an explicit per-stream cap. -// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream. func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler { total := int64(maxPeers) if total <= 0 { @@ -67,21 +77,31 @@ func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler maxCapTotal: total, maxCapPerStrm: perStream, } - // Subsystem broadcasts process-stop via this hook so the handler - // can yank stale peer entries before their Sources close out - // from underneath them. if s != nil { s.SetTeardownHook(h.tearDownStreamPeers) } return h } -// Register mounts the WHEP routes on the provided Echo group. +// SetWHIPHandler links the WHIP ingest handler so that /webrtc/stats +// can report active_publishers. Pass nil to disable that field (returns 0). +func (h *Handler) SetWHIPHandler(wh *WHIPHandler) { + h.whip = wh +} + +// Register mounts the WHEP routes and the shared stats route on the +// provided Echo group. // -// CORS preflights are answered on every WHEP path; regular WHEP -// responses also carry the Access-Control-* headers so browser-side -// players living on a different origin can subscribe. +// Routes registered: +// +// GET /webrtc/stats +// OPTIONS /whep/:id +// OPTIONS /whep/:id/:resource +// POST /whep/:id +// DELETE /whep/:id/:resource +// PATCH /whep/:id/:resource func (h *Handler) Register(g *echo.Group) { + g.GET("/webrtc/stats", h.StatsHandler) g.OPTIONS("/whep/:id", h.preflight) g.OPTIONS("/whep/:id/:resource", h.preflight) g.POST("/whep/:id", h.Subscribe) @@ -89,25 +109,37 @@ func (h *Handler) Register(g *echo.Group) { g.PATCH("/whep/:id/:resource", h.Trickle) } -// Subscribe handles POST /whep/:id. Request body is an SDP offer, -// response is an SDP answer with a Location header pointing at the -// DELETE/PATCH resource. +// StatsHandler handles GET /webrtc/stats. Returns a JSON snapshot of +// the current WebRTC subsystem state. // -// @Summary Subscribe to a WebRTC stream via WHEP -// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE. +// @Summary WebRTC subsystem stats +// @Description Returns a live snapshot: active egress streams, subscriber peer count, ingest publisher count, and approximate UDP port usage. // @Tags v16.16.0 -// @ID webrtc-3-whep-subscribe -// @Accept application/sdp -// @Produce application/sdp -// @Param id path string true "Process ID with config.webrtc.enabled=true" -// @Success 201 {string} string "SDP answer" -// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP" -// @Failure 404 {string} string "no stream registered for this process id" -// @Failure 406 {string} string "offer SDP missing required H264 / Opus rtpmap" -// @Failure 503 {string} string "peer cap reached (per-stream or total)" -// @Failure 504 {string} string "ICE gathering timeout" -// @Security ApiKeyAuth -// @Router /api/v3/whep/{id} [post] +// @ID webrtc-3-stats +// @Produce json +// @Success 200 {object} WebRTCStats +// @Router /api/v3/webrtc/stats [get] +func (h *Handler) StatsHandler(c echo.Context) error { + sc := 0 + if h.sub != nil { + sc = h.sub.StreamCount() + } + + var publishers int64 + if h.whip != nil { + publishers = h.whip.PublisherCount() + } + + stats := WebRTCStats{ + ActiveStreams: sc, + ActivePeers: atomic.LoadInt64(&h.count), + ActivePublishers: publishers, + UDPPortsInUse: sc * 2, + } + return c.JSON(http.StatusOK, stats) +} + +// Subscribe handles POST /whep/:id. func (h *Handler) Subscribe(c echo.Context) error { addCORS(c) t0 := time.Now() @@ -118,7 +150,6 @@ func (h *Handler) Subscribe(c echo.Context) error { return c.String(http.StatusBadRequest, "missing stream id") } - // Total cap: cheap atomic check before doing real work. if atomic.LoadInt64(&h.count) >= h.maxCapTotal { if h.met != nil { h.met.capRejections.WithLabelValues("", "global").Inc() @@ -133,7 +164,6 @@ func (h *Handler) Subscribe(c echo.Context) error { return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) } - // Per-stream cap: needs the lock since we're indexing per stream. h.mu.Lock() if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm { h.mu.Unlock() @@ -169,7 +199,6 @@ func (h *Handler) Subscribe(c echo.Context) error { offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) if err != nil { - // Surface the design's error matrix. switch err { case corewebrtc.ErrICETimeout: h.recordRequest("subscribe", id, http.StatusGatewayTimeout, t0) @@ -193,22 +222,14 @@ func (h *Handler) Subscribe(c echo.Context) error { h.mu.Unlock() atomic.AddInt64(&h.count, 1) - // Auto-cleanup: when Pion's OnConnectionStateChange triggers - // peer.Close() (ICE failed/disconnected), the Done channel - // closes and we yank the index entry. Without this the map - // leaks for the lifetime of the handler. go h.awaitPeerClose(rid, peer) - - // Track ICE establishment duration asynchronously. go h.trackICE(id, peer, time.Now()) h.recordRequest("subscribe", id, http.StatusCreated, t0) - // RFC 9429 §4.3: emit one Link header per configured ICE server so - // that the browser can discover STUN/TURN without a separate - // signalling round-trip. + // RFC 9429 §4.3: emit one Link header per configured ICE server. for _, uri := range h.sub.ICEServerURIs() { - c.Response().Header().Add("Link", "<"+uri+">; rel=\"ice-server\"") + c.Response().Header().Add("Link", "<"+uri+`>; rel="ice-server"`) } c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) @@ -216,20 +237,7 @@ func (h *Handler) Subscribe(c echo.Context) error { return c.String(http.StatusCreated, peer.Answer().SDP) } -// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we -// return 204 even when the resource is unknown — DELETE is idempotent -// and a re-issued tear-down should never error out. -// -// @Summary Tear down a WHEP subscription -// @Description Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec. -// @Tags v16.16.0 -// @ID webrtc-3-whep-unsubscribe -// @Param id path string true "Process ID" -// @Param resource path string true "Resource ID from the Subscribe Location header" -// @Success 204 "no content" -// @Failure 400 {string} string "missing resource id" -// @Security ApiKeyAuth -// @Router /api/v3/whep/{id}/{resource} [delete] +// Unsubscribe handles DELETE /whep/:id/:resource. func (h *Handler) Unsubscribe(c echo.Context) error { addCORS(c) t0 := time.Now() @@ -264,23 +272,7 @@ func (h *Handler) Unsubscribe(c echo.Context) error { return c.NoContent(http.StatusNoContent) } -// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates -// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients -// signal end-of-candidates via an a=end-of-candidates line, which -// AddICECandidate accepts). -// -// @Summary Trickle ICE candidates for a WHEP subscription -// @Description Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag. -// @Tags v16.16.0 -// @ID webrtc-3-whep-trickle -// @Accept application/trickle-ice-sdpfrag -// @Param id path string true "Process ID" -// @Param resource path string true "Resource ID from the Subscribe Location header" -// @Success 204 "no content" -// @Failure 400 {string} string "missing resource id or unreadable body" -// @Failure 404 {string} string "peer not found" -// @Security ApiKeyAuth -// @Router /api/v3/whep/{id}/{resource} [patch] +// Trickle handles PATCH /whep/:id/:resource. func (h *Handler) Trickle(c echo.Context) error { addCORS(c) t0 := time.Now() @@ -321,8 +313,6 @@ func (h *Handler) Trickle(c echo.Context) error { return c.NoContent(http.StatusNoContent) } -// recordRequest records whepRequests counter and whepRequestDuration histogram -// for any WHEP route outcome. Silently no-ops if metrics are not initialised. func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time) { if h.met == nil { return @@ -332,8 +322,6 @@ func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time) h.met.whepRequestDuration.WithLabelValues(route, streamID).Observe(time.Since(t0).Seconds()) } -// preflight answers a CORS OPTIONS request; the headers are also -// echoed on every other response. func (h *Handler) preflight(c echo.Context) error { addCORS(c) return c.NoContent(http.StatusNoContent) @@ -360,10 +348,6 @@ func (h *Handler) Close() { atomic.StoreInt64(&h.count, 0) } -// awaitPeerClose blocks on peer.Done() and yanks the index entry when -// the peer self-closes (ICE failed/disconnected). Idempotent with -// the Unsubscribe path: if Unsubscribe ran first the index is already -// empty and we just decrement the counter once on first arrival. func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) { <-peer.Done() h.mu.Lock() @@ -384,10 +368,6 @@ func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) { } } -// tearDownStreamPeers is the callback the Subsystem runs in its -// onProcessStop hook. It closes every peer subscribed to that -// stream and records FFmpeg leg failures if any peers were active, -// which indicates the process died unexpectedly. func (h *Handler) tearDownStreamPeers(streamID string) { h.mu.Lock() bucket := h.peersByStream[streamID] @@ -410,10 +390,6 @@ func (h *Handler) tearDownStreamPeers(streamID string) { } } -// addCORS emits the response headers a browser-side WHEP player -// expects. WHEP's Location, ETag, and Link headers must be exposed -// for fetch() to read them across origins. Link carries the ICE -// server URIs per RFC 9429 §4.3. func addCORS(c echo.Context) { hh := c.Response().Header() hh.Set("Access-Control-Allow-Origin", "*") @@ -422,15 +398,6 @@ func addCORS(c echo.Context) { hh.Set("Access-Control-Expose-Headers", "Location, ETag, Link") } -// requireH264AndOpus does a coarse SDP scan to confirm the offer -// includes both an H.264 video rtpmap and an Opus audio rtpmap. The -// design treats codec mismatch as a 406, never a silent black frame. -// -// This is intentionally a string scan rather than a full SDP parse: -// every modern browser advertises H.264 and Opus by name, and a -// dependency on a real SDP parser for one validation step is -// disproportionate. M4 may swap this for pion/sdp.v3 when other -// surfaces also need parsing. func requireH264AndOpus(sdp string) error { lower := strings.ToLower(sdp) hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/") @@ -451,5 +418,5 @@ func requireH264AndOpus(sdp string) error { type codecMismatchError struct{ missing []string } func (e *codecMismatchError) Error() string { - return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ") + return "webrtc: codec mismatch -- offer is missing: " + strings.Join(e.missing, ", ") } diff --git a/app/webrtc/handler_stats_test.go b/app/webrtc/handler_stats_test.go new file mode 100644 index 0000000..1e3c237 --- /dev/null +++ b/app/webrtc/handler_stats_test.go @@ -0,0 +1,127 @@ +package webrtc + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/labstack/echo/v4" +) + +// TestStatsHandler_EmptySubsystem verifies that GET /webrtc/stats returns +// a well-formed JSON body with all-zero counts when no streams or peers +// are active and no WHIP handler is linked. +func TestStatsHandler_EmptySubsystem(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + if err := h.StatsHandler(c); err != nil { + t.Fatalf("StatsHandler returned error: %v", err) + } + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var stats WebRTCStats + if err := json.Unmarshal(rec.Body.Bytes(), &stats); err != nil { + t.Fatalf("invalid JSON: %v\nbody: %s", err, rec.Body.String()) + } + if stats.ActiveStreams != 0 { + t.Errorf("ActiveStreams: want 0, got %d", stats.ActiveStreams) + } + if stats.ActivePeers != 0 { + t.Errorf("ActivePeers: want 0, got %d", stats.ActivePeers) + } + if stats.ActivePublishers != 0 { + t.Errorf("ActivePublishers: want 0, got %d", stats.ActivePublishers) + } + if stats.UDPPortsInUse != 0 { + t.Errorf("UDPPortsInUse: want 0, got %d", stats.UDPPortsInUse) + } +} + +// TestStatsHandler_WithWHIPHandler verifies that SetWHIPHandler links the +// WHIP publisher count into the stats response. +func TestStatsHandler_WithWHIPHandler(t *testing.T) { + sub := newTestSubsystem(t) + h := NewHandler(sub, 0) + + // Link a real WHIPHandler so that StatsHandler calls PublisherCount(). + wh := NewWHIPHandler(sub, 0) + h.SetWHIPHandler(wh) + + e := echo.New() + req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + if err := h.StatsHandler(c); err != nil { + t.Fatalf("StatsHandler returned error: %v", err) + } + + var stats WebRTCStats + if err := json.Unmarshal(rec.Body.Bytes(), &stats); err != nil { + t.Fatalf("invalid JSON: %v", err) + } + // With no active publishers the count should be 0 — validates the + // link does not panic and that PublisherCount() is being called. + if stats.ActivePublishers != 0 { + t.Errorf("ActivePublishers: want 0, got %d", stats.ActivePublishers) + } +} + +// TestStatsHandler_NilSub verifies that a nil Subsystem (possible during +// early wiring) does not panic and returns zeros. +func TestStatsHandler_NilSub(t *testing.T) { + h := NewHandler(nil, 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + if err := h.StatsHandler(c); err != nil { + t.Fatalf("StatsHandler returned error: %v", err) + } + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", rec.Code) + } + + var stats WebRTCStats + if err := json.Unmarshal(rec.Body.Bytes(), &stats); err != nil { + t.Fatalf("invalid JSON: %v", err) + } + if stats.ActiveStreams != 0 || stats.UDPPortsInUse != 0 { + t.Errorf("expected all zeros with nil sub, got %+v", stats) + } +} + +// TestStatsHandler_JSONFieldNames verifies the JSON key names match the +// contract defined in the issue so consumer scripts don't break. +func TestStatsHandler_JSONFieldNames(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodGet, "/webrtc/stats", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + if err := h.StatsHandler(c); err != nil { + t.Fatalf("StatsHandler returned error: %v", err) + } + + var raw map[string]interface{} + if err := json.Unmarshal(rec.Body.Bytes(), &raw); err != nil { + t.Fatalf("invalid JSON: %v", err) + } + for _, key := range []string{"active_streams", "active_peers", "active_publishers", "udp_ports_in_use"} { + if _, ok := raw[key]; !ok { + t.Errorf("JSON response missing required field %q", key) + } + } +}