feat(webrtc): instrument WHEP handler with Prometheus metrics
Some checks failed
ci / race tests (push) Blocked by required conditions
ci / WebRTC smoke (5-viewer fanout) (push) Blocked by required conditions
ci / WebRTC latency p95 gate (push) Blocked by required conditions
ci / vet + build (push) Has been cancelled

This commit is contained in:
Zac Gaetano 2026-05-06 15:58:26 -04:00
parent 1d7cd5b520
commit 47a28bf9d4

View file

@ -1,11 +1,13 @@
package webrtc package webrtc
import ( import (
"fmt"
"io" "io"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
@ -28,12 +30,14 @@ const defaultMaxPeersPerStream = 8
type Handler struct { type Handler struct {
sub *Subsystem sub *Subsystem
mu sync.Mutex mu sync.Mutex
peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
peerStream map[string]string // resource -> streamID (reverse index) peerStream map[string]string // resource -> streamID (reverse index)
count int64 // atomic count int64 // atomic
maxCapTotal int64 maxCapTotal int64
maxCapPerStrm int64 maxCapPerStrm int64
met *webrtcMetrics
} }
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler. // NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
@ -106,19 +110,26 @@ func (h *Handler) Register(g *echo.Group) {
// @Router /api/v3/whep/{id} [post] // @Router /api/v3/whep/{id} [post]
func (h *Handler) Subscribe(c echo.Context) error { func (h *Handler) Subscribe(c echo.Context) error {
addCORS(c) addCORS(c)
t0 := time.Now()
id := c.Param("id") id := c.Param("id")
if id == "" { if id == "" {
h.recordRequest("subscribe", "", http.StatusBadRequest, t0)
return c.String(http.StatusBadRequest, "missing stream id") return c.String(http.StatusBadRequest, "missing stream id")
} }
// Total cap: cheap atomic check before doing real work. // Total cap: cheap atomic check before doing real work.
if atomic.LoadInt64(&h.count) >= h.maxCapTotal { if atomic.LoadInt64(&h.count) >= h.maxCapTotal {
if h.met != nil {
h.met.capRejections.WithLabelValues("", "global").Inc()
}
h.recordRequest("subscribe", id, http.StatusServiceUnavailable, t0)
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
} }
stream, ok := h.sub.lookup(id) stream, ok := h.sub.lookup(id)
if !ok { if !ok {
h.recordRequest("subscribe", id, http.StatusNotFound, t0)
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
} }
@ -126,18 +137,32 @@ func (h *Handler) Subscribe(c echo.Context) error {
h.mu.Lock() h.mu.Lock()
if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm { if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm {
h.mu.Unlock() h.mu.Unlock()
if h.met != nil {
h.met.capRejections.WithLabelValues(id, "stream").Inc()
}
h.recordRequest("subscribe", id, http.StatusServiceUnavailable, t0)
return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached") return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached")
} }
h.mu.Unlock() h.mu.Unlock()
body, err := io.ReadAll(c.Request().Body) body, err := io.ReadAll(c.Request().Body)
if err != nil { if err != nil {
h.recordRequest("subscribe", id, http.StatusBadRequest, t0)
return c.String(http.StatusBadRequest, "read body: "+err.Error()) return c.String(http.StatusBadRequest, "read body: "+err.Error())
} }
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
h.recordRequest("subscribe", id, http.StatusBadRequest, t0)
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
} }
if err := requireH264AndOpus(string(body)); err != nil { if err := requireH264AndOpus(string(body)); err != nil {
if h.met != nil {
if cme, ok2 := err.(*codecMismatchError); ok2 {
for _, kind := range cme.missing {
h.met.codecMismatches.WithLabelValues(id, strings.ToLower(kind)).Inc()
}
}
}
h.recordRequest("subscribe", id, http.StatusNotAcceptable, t0)
return c.String(http.StatusNotAcceptable, err.Error()) return c.String(http.StatusNotAcceptable, err.Error())
} }
@ -147,10 +172,13 @@ func (h *Handler) Subscribe(c echo.Context) error {
// Surface the design's error matrix. // Surface the design's error matrix.
switch err { switch err {
case corewebrtc.ErrICETimeout: case corewebrtc.ErrICETimeout:
h.recordRequest("subscribe", id, http.StatusGatewayTimeout, t0)
return c.String(http.StatusGatewayTimeout, err.Error()) return c.String(http.StatusGatewayTimeout, err.Error())
case corewebrtc.ErrCodecMismatch: case corewebrtc.ErrCodecMismatch:
h.recordRequest("subscribe", id, http.StatusNotAcceptable, t0)
return c.String(http.StatusNotAcceptable, err.Error()) return c.String(http.StatusNotAcceptable, err.Error())
default: default:
h.recordRequest("subscribe", id, http.StatusInternalServerError, t0)
return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
} }
} }
@ -171,6 +199,11 @@ func (h *Handler) Subscribe(c echo.Context) error {
// leaks for the lifetime of the handler. // leaks for the lifetime of the handler.
go h.awaitPeerClose(rid, peer) go h.awaitPeerClose(rid, peer)
// Track ICE establishment duration asynchronously.
go h.trackICE(id, peer, time.Now())
h.recordRequest("subscribe", id, http.StatusCreated, t0)
c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Content-Type", "application/sdp")
c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) c.Response().Header().Set("Location", "/whep/"+id+"/"+rid)
c.Response().Header().Set("ETag", `"`+rid+`"`) c.Response().Header().Set("ETag", `"`+rid+`"`)
@ -193,9 +226,11 @@ func (h *Handler) Subscribe(c echo.Context) error {
// @Router /api/v3/whep/{id}/{resource} [delete] // @Router /api/v3/whep/{id}/{resource} [delete]
func (h *Handler) Unsubscribe(c echo.Context) error { func (h *Handler) Unsubscribe(c echo.Context) error {
addCORS(c) addCORS(c)
t0 := time.Now()
resource := c.Param("resource") resource := c.Param("resource")
if resource == "" { if resource == "" {
h.recordRequest("unsubscribe", "", http.StatusBadRequest, t0)
return c.String(http.StatusBadRequest, "missing resource id") return c.String(http.StatusBadRequest, "missing resource id")
} }
@ -218,6 +253,8 @@ func (h *Handler) Unsubscribe(c echo.Context) error {
if streamID != "" { if streamID != "" {
atomic.AddInt64(&h.count, -1) atomic.AddInt64(&h.count, -1)
} }
h.recordRequest("unsubscribe", streamID, http.StatusNoContent, t0)
return c.NoContent(http.StatusNoContent) return c.NoContent(http.StatusNoContent)
} }
@ -240,9 +277,11 @@ func (h *Handler) Unsubscribe(c echo.Context) error {
// @Router /api/v3/whep/{id}/{resource} [patch] // @Router /api/v3/whep/{id}/{resource} [patch]
func (h *Handler) Trickle(c echo.Context) error { func (h *Handler) Trickle(c echo.Context) error {
addCORS(c) addCORS(c)
t0 := time.Now()
resource := c.Param("resource") resource := c.Param("resource")
if resource == "" { if resource == "" {
h.recordRequest("trickle", "", http.StatusBadRequest, t0)
return c.String(http.StatusBadRequest, "missing resource id") return c.String(http.StatusBadRequest, "missing resource id")
} }
@ -254,11 +293,13 @@ func (h *Handler) Trickle(c echo.Context) error {
} }
h.mu.Unlock() h.mu.Unlock()
if peer == nil { if peer == nil {
h.recordRequest("trickle", streamID, http.StatusNotFound, t0)
return c.NoContent(http.StatusNotFound) return c.NoContent(http.StatusNotFound)
} }
body, err := io.ReadAll(c.Request().Body) body, err := io.ReadAll(c.Request().Body)
if err != nil { if err != nil {
h.recordRequest("trickle", streamID, http.StatusBadRequest, t0)
return c.String(http.StatusBadRequest, "read body: "+err.Error()) return c.String(http.StatusBadRequest, "read body: "+err.Error())
} }
for _, line := range strings.Split(string(body), "\n") { for _, line := range strings.Split(string(body), "\n") {
@ -269,9 +310,22 @@ func (h *Handler) Trickle(c echo.Context) error {
cand := strings.TrimPrefix(line, "a=") cand := strings.TrimPrefix(line, "a=")
_ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand}) _ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand})
} }
h.recordRequest("trickle", streamID, http.StatusNoContent, t0)
return c.NoContent(http.StatusNoContent) return c.NoContent(http.StatusNoContent)
} }
// recordRequest publishes the outcome of one WHEP request to the handler's
// metrics. It increments the whepRequests counter under the labels
// (route, decimal status code, streamID) and then observes the seconds
// elapsed since t0 on the whepRequestDuration histogram under the labels
// (route, streamID). When the handler has no metrics set (h.met is nil)
// the call does nothing.
func (h *Handler) recordRequest(route, streamID string, code int, t0 time.Time) {
	m := h.met
	if m == nil {
		// Metrics are optional; without them there is nothing to record.
		return
	}

	// The counter carries the HTTP status as a string label.
	status := fmt.Sprintf("%d", code)
	requests := m.whepRequests.WithLabelValues(route, status, streamID)
	requests.Inc()

	// Resolve the histogram child first so the elapsed time is sampled
	// as late as possible, immediately before it is observed.
	latency := m.whepRequestDuration.WithLabelValues(route, streamID)
	latency.Observe(time.Since(t0).Seconds())
}
// preflight answers a CORS OPTIONS request; the headers are also // preflight answers a CORS OPTIONS request; the headers are also
// echoed on every other response. // echoed on every other response.
func (h *Handler) preflight(c echo.Context) error { func (h *Handler) preflight(c echo.Context) error {
@ -326,10 +380,12 @@ func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
// tearDownStreamPeers is the callback the Subsystem runs in its // tearDownStreamPeers is the callback the Subsystem runs in its
// onProcessStop hook. It closes every peer subscribed to that // onProcessStop hook. It closes every peer subscribed to that
// stream (driving each one's Done() and indirectly awaitPeerClose). // stream and records FFmpeg leg failures if any peers were active,
// which indicates the process died unexpectedly.
func (h *Handler) tearDownStreamPeers(streamID string) { func (h *Handler) tearDownStreamPeers(streamID string) {
h.mu.Lock() h.mu.Lock()
bucket := h.peersByStream[streamID] bucket := h.peersByStream[streamID]
hadPeers := len(bucket) > 0
peers := make([]*corewebrtc.Peer, 0, len(bucket)) peers := make([]*corewebrtc.Peer, 0, len(bucket))
for _, p := range bucket { for _, p := range bucket {
peers = append(peers, p) peers = append(peers, p)
@ -341,6 +397,11 @@ func (h *Handler) tearDownStreamPeers(streamID string) {
_ = p.Close() _ = p.Close()
} }
} }
if hadPeers && h.met != nil {
h.met.ffmpegLegFailures.WithLabelValues(streamID, "video").Inc()
h.met.ffmpegLegFailures.WithLabelValues(streamID, "audio").Inc()
}
} }
// addCORS emits the response headers a browser-side WHEP player // addCORS emits the response headers a browser-side WHEP player