From eaf62b7397a94b183e54d15713a5ca0236d37163 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Wed, 6 May 2026 15:56:12 -0400 Subject: [PATCH] feat(webrtc): add WebRTC Prometheus metrics (direct instrumentation) --- app/webrtc/metrics.go | 110 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 app/webrtc/metrics.go diff --git a/app/webrtc/metrics.go b/app/webrtc/metrics.go new file mode 100644 index 0000000..5df3aca --- /dev/null +++ b/app/webrtc/metrics.go @@ -0,0 +1,110 @@ +package webrtc + +import ( + "time" + + corewebrtc "github.com/datarhei/core/v16/core/webrtc" + coreprom "github.com/datarhei/core/v16/prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var iceHistBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + +type webrtcMetrics struct { + whepRequests *prometheus.CounterVec + whepRequestDuration *prometheus.HistogramVec + iceEstablishment *prometheus.HistogramVec + iceFailures *prometheus.CounterVec + codecMismatches *prometheus.CounterVec + capRejections *prometheus.CounterVec + ffmpegLegFailures *prometheus.CounterVec +} + +func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics { + f := promauto.With(reg) + cl := prometheus.Labels{"core": core} + return &webrtcMetrics{ + whepRequests: f.NewCounterVec(prometheus.CounterOpts{ + Name: "dragonfork_webrtc_whep_requests_total", + Help: "Count of WHEP HTTP requests by route, HTTP status code, and stream.", + ConstLabels: cl, + }, []string{"route", "code", "stream_id"}), + whepRequestDuration: f.NewHistogramVec(prometheus.HistogramOpts{ + Name: "dragonfork_webrtc_whep_request_duration_seconds", + Help: "Server-side WHEP request latency in seconds, by route and stream.", + ConstLabels: cl, + Buckets: iceHistBuckets, + }, []string{"route", "stream_id"}), + iceEstablishment: f.NewHistogramVec(prometheus.HistogramOpts{ + Name: "dragonfork_webrtc_ice_establishment_duration_seconds", + Help: "Duration from peer creation to first connected or failed ICE state.", + ConstLabels: cl, + Buckets: iceHistBuckets, + }, []string{"stream_id", "result"}), + iceFailures: f.NewCounterVec(prometheus.CounterOpts{ + Name: "dragonfork_webrtc_ice_failures_total", + Help: "Count of ICE failures by stream and reason.", + ConstLabels: cl, + }, []string{"stream_id", "reason"}), + codecMismatches: f.NewCounterVec(prometheus.CounterOpts{ + Name: "dragonfork_webrtc_codec_mismatches_total", + Help: "Count of 406 codec-mismatch rejections by stream and codec kind.", + ConstLabels: cl, + }, []string{"stream_id", "kind"}), + capRejections: f.NewCounterVec(prometheus.CounterOpts{ + Name: "dragonfork_webrtc_cap_rejections_total", + Help: "Count of 503 peer-cap rejections by stream and scope (global or stream).", + ConstLabels: cl, + }, []string{"stream_id", "scope"}), + ffmpegLegFailures: f.NewCounterVec(prometheus.CounterOpts{ + Name: "dragonfork_webrtc_ffmpeg_leg_failures_total", + Help: "Count of FFmpeg RTP output leg failures (process stopped while peers were active).", + ConstLabels: cl, + }, []string{"stream_id", "leg"}), + } +} + +// InitMetrics initialises WebRTC direct-instrumentation metrics on h and +// registers the snapshot collector with reg. Call once after construction, +// before the handler serves requests. Panics on duplicate registration. +func (h *Handler) InitMetrics(reg prometheus.Registerer, core string) { + h.met = initMetrics(reg, core) +} + +// Stats implements coreprom.WebRTCStatsSource for the Prometheus snapshot +// collector. Returns a consistent snapshot under h.mu. +func (h *Handler) Stats() coreprom.WebRTCStats { + h.mu.Lock() + peers := make(map[string]int, len(h.peersByStream)) + for id, pm := range h.peersByStream { + peers[id] = len(pm) + } + sc := 0 + if h.sub != nil { + sc = h.sub.StreamCount() + } + h.mu.Unlock() + return coreprom.WebRTCStats{ + StreamCount: sc, + PeersByStream: peers, + UDPPortsInUse: sc * 2, + } +} + +// trackICE waits for the first terminal ICE event and records establishment +// duration and failure metrics. t0 should be captured immediately before +// CreatePeerFromSources returns. Runs in a goroutine per Subscribe call. +func (h *Handler) trackICE(streamID string, peer *corewebrtc.Peer, t0 time.Time) { + if h.met == nil { + return + } + select { + case <-peer.Connected(): + h.met.iceEstablishment.WithLabelValues(streamID, "connected").Observe(time.Since(t0).Seconds()) + case <-peer.Done(): + dur := time.Since(t0) + h.met.iceEstablishment.WithLabelValues(streamID, "failed").Observe(dur.Seconds()) + h.met.iceFailures.WithLabelValues(streamID, "failed").Inc() + } +}