From 47a28bf9d4d06e0734944b5ee22f3737ad906efd Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Wed, 6 May 2026 15:58:26 -0400 Subject: [PATCH] feat(webrtc): instrument WHEP handler with Prometheus metrics --- app/webrtc/handler.go | 75 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 68 insertions(+), 7 deletions(-) diff --git a/app/webrtc/handler.go b/app/webrtc/handler.go index 3b1fe5c..2ca9305 100644 --- a/app/webrtc/handler.go +++ b/app/webrtc/handler.go @@ -1,11 +1,13 @@ package webrtc import ( + "fmt" "io" "net/http" "strings" "sync" "sync/atomic" + "time" "github.com/labstack/echo/v4" "github.com/pion/webrtc/v4" @@ -28,12 +30,14 @@ const defaultMaxPeersPerStream = 8 type Handler struct { sub *Subsystem - mu sync.Mutex - peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer - peerStream map[string]string // resource -> streamID (reverse index) - count int64 // atomic - maxCapTotal int64 - maxCapPerStrm int64 + mu sync.Mutex + peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer + peerStream map[string]string // resource -> streamID (reverse index) + count int64 // atomic + maxCapTotal int64 + maxCapPerStrm int64 + + met *webrtcMetrics } // NewHandler wraps the subsystem in an Echo-compatible HTTP handler. @@ -106,19 +110,26 @@ func (h *Handler) Register(g *echo.Group) { // @Router /api/v3/whep/{id} [post] func (h *Handler) Subscribe(c echo.Context) error { addCORS(c) + t0 := time.Now() id := c.Param("id") if id == "" { + h.recordRequest("subscribe", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing stream id") } // Total cap: cheap atomic check before doing real work. if atomic.LoadInt64(&h.count) >= h.maxCapTotal { + if h.met != nil { + h.met.capRejections.WithLabelValues("", "global").Inc() + } + h.recordRequest("subscribe", id, http.StatusServiceUnavailable, t0) return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) } stream, ok := h.sub.lookup(id) if !ok { + h.recordRequest("subscribe", id, http.StatusNotFound, t0) return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) } @@ -126,18 +137,32 @@ func (h *Handler) Subscribe(c echo.Context) error { h.mu.Lock() if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm { h.mu.Unlock() + if h.met != nil { + h.met.capRejections.WithLabelValues(id, "stream").Inc() + } + h.recordRequest("subscribe", id, http.StatusServiceUnavailable, t0) return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached") } h.mu.Unlock() body, err := io.ReadAll(c.Request().Body) if err != nil { + h.recordRequest("subscribe", id, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "read body: "+err.Error()) } if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { + h.recordRequest("subscribe", id, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) } if err := requireH264AndOpus(string(body)); err != nil { + if h.met != nil { + if cme, ok2 := err.(*codecMismatchError); ok2 { + for _, kind := range cme.missing { + h.met.codecMismatches.WithLabelValues(id, strings.ToLower(kind)).Inc() + } + } + } + h.recordRequest("subscribe", id, http.StatusNotAcceptable, t0) return c.String(http.StatusNotAcceptable, err.Error()) } @@ -147,10 +172,13 @@ func (h *Handler) Subscribe(c echo.Context) error { // Surface the design's error matrix. switch err { case corewebrtc.ErrICETimeout: + h.recordRequest("subscribe", id, http.StatusGatewayTimeout, t0) return c.String(http.StatusGatewayTimeout, err.Error()) case corewebrtc.ErrCodecMismatch: + h.recordRequest("subscribe", id, http.StatusNotAcceptable, t0) return c.String(http.StatusNotAcceptable, err.Error()) default: + h.recordRequest("subscribe", id, http.StatusInternalServerError, t0) return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) } } @@ -171,6 +199,11 @@ func (h *Handler) Subscribe(c echo.Context) error { // leaks for the lifetime of the handler. go h.awaitPeerClose(rid, peer) + // Track ICE establishment duration asynchronously. + go h.trackICE(id, peer, time.Now()) + + h.recordRequest("subscribe", id, http.StatusCreated, t0) + c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) c.Response().Header().Set("ETag", `"`+rid+`"`) @@ -193,9 +226,11 @@ func (h *Handler) Subscribe(c echo.Context) error { // @Router /api/v3/whep/{id}/{resource} [delete] func (h *Handler) Unsubscribe(c echo.Context) error { addCORS(c) + t0 := time.Now() resource := c.Param("resource") if resource == "" { + h.recordRequest("unsubscribe", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing resource id") } @@ -218,6 +253,8 @@ func (h *Handler) Unsubscribe(c echo.Context) error { if streamID != "" { atomic.AddInt64(&h.count, -1) } + + h.recordRequest("unsubscribe", streamID, http.StatusNoContent, t0) return c.NoContent(http.StatusNoContent) } @@ -240,9 +277,11 @@ func (h *Handler) Unsubscribe(c echo.Context) error { // @Router /api/v3/whep/{id}/{resource} [patch] func (h *Handler) Trickle(c echo.Context) error { addCORS(c) + t0 := time.Now() resource := c.Param("resource") if resource == "" { + h.recordRequest("trickle", "", http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "missing resource id") } @@ -254,11 +293,13 @@ func (h *Handler) Trickle(c echo.Context) error { } h.mu.Unlock() if peer == nil { + h.recordRequest("trickle", streamID, http.StatusNotFound, t0) return c.NoContent(http.StatusNotFound) } body, err := io.ReadAll(c.Request().Body) if err != nil { + h.recordRequest("trickle", streamID, http.StatusBadRequest, t0) return c.String(http.StatusBadRequest, "read body: "+err.Error()) } for _, line := range strings.Split(string(body), "\n") { @@ -269,9 +310,22 @@ func (h *Handler) Trickle(c echo.Context) error { cand := strings.TrimPrefix(line, "a=") _ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand}) } + + h.recordRequest("trickle", streamID, http.StatusNoContent, t0) return c.NoContent(http.StatusNoContent) } +// recordRequest records whepRequests counter and whepRequestDuration histogram +// for any WHEP route outcome. Silently no-ops if metrics are not initialised. +func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time) { + if h.met == nil { + return + } + codeStr := fmt.Sprintf("%d", code) + h.met.whepRequests.WithLabelValues(route, codeStr, streamID).Inc() + h.met.whepRequestDuration.WithLabelValues(route, streamID).Observe(time.Since(t0).Seconds()) +} + // preflight answers a CORS OPTIONS request; the headers are also // echoed on every other response. func (h *Handler) preflight(c echo.Context) error { @@ -326,10 +380,12 @@ func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) { // tearDownStreamPeers is the callback the Subsystem runs in its // onProcessStop hook. It closes every peer subscribed to that -// stream (driving each one's Done() and indirectly awaitPeerClose). +// stream and records FFmpeg leg failures if any peers were active, +// which indicates the process died unexpectedly. func (h *Handler) tearDownStreamPeers(streamID string) { h.mu.Lock() bucket := h.peersByStream[streamID] + hadPeers := len(bucket) > 0 peers := make([]*corewebrtc.Peer, 0, len(bucket)) for _, p := range bucket { peers = append(peers, p) @@ -341,6 +397,11 @@ func (h *Handler) tearDownStreamPeers(streamID string) { _ = p.Close() } } + + if hadPeers && h.met != nil { + h.met.ffmpegLegFailures.WithLabelValues(streamID, "video").Inc() + h.met.ffmpegLegFailures.WithLabelValues(streamID, "audio").Inc() + } } // addCORS emits the response headers a browser-side WHEP player