package webrtc import ( "time" corewebrtc "github.com/datarhei/core/v16/core/webrtc" coreprom "github.com/datarhei/core/v16/prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) var iceHistBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} type webrtcMetrics struct { whepRequests *prometheus.CounterVec whepRequestDuration *prometheus.HistogramVec iceEstablishment *prometheus.HistogramVec iceFailures *prometheus.CounterVec codecMismatches *prometheus.CounterVec capRejections *prometheus.CounterVec ffmpegLegFailures *prometheus.CounterVec } func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics { f := promauto.With(reg) cl := prometheus.Labels{"core": core} return &webrtcMetrics{ whepRequests: f.NewCounterVec(prometheus.CounterOpts{ Name: "dragonfork_webrtc_whep_requests_total", Help: "Count of WHEP HTTP requests by route, HTTP status code, and stream.", ConstLabels: cl, }, []string{"route", "code", "stream_id"}), whepRequestDuration: f.NewHistogramVec(prometheus.HistogramOpts{ Name: "dragonfork_webrtc_whep_request_duration_seconds", Help: "Server-side WHEP request latency in seconds, by route and stream.", ConstLabels: cl, Buckets: iceHistBuckets, }, []string{"route", "stream_id"}), iceEstablishment: f.NewHistogramVec(prometheus.HistogramOpts{ Name: "dragonfork_webrtc_ice_establishment_duration_seconds", Help: "Duration from peer creation to first connected or failed ICE state.", ConstLabels: cl, Buckets: iceHistBuckets, }, []string{"stream_id", "result"}), iceFailures: f.NewCounterVec(prometheus.CounterOpts{ Name: "dragonfork_webrtc_ice_failures_total", Help: "Count of ICE failures by stream and reason.", ConstLabels: cl, }, []string{"stream_id", "reason"}), codecMismatches: f.NewCounterVec(prometheus.CounterOpts{ Name: "dragonfork_webrtc_codec_mismatches_total", Help: "Count of 406 codec-mismatch rejections by stream and codec kind.", ConstLabels: cl, }, []string{"stream_id", "kind"}), capRejections: f.NewCounterVec(prometheus.CounterOpts{ Name: "dragonfork_webrtc_cap_rejections_total", Help: "Count of 503 peer-cap rejections by stream and scope (global or stream).", ConstLabels: cl, }, []string{"stream_id", "scope"}), ffmpegLegFailures: f.NewCounterVec(prometheus.CounterOpts{ Name: "dragonfork_webrtc_ffmpeg_leg_failures_total", Help: "Count of FFmpeg RTP output leg failures (process stopped while peers were active).", ConstLabels: cl, }, []string{"stream_id", "leg"}), } } // InitMetrics initialises WebRTC direct-instrumentation metrics on h and // registers the snapshot collector with reg. Call once after construction, // before the handler serves requests. Panics on duplicate registration. func (h *Handler) InitMetrics(reg prometheus.Registerer, core string) { h.met = initMetrics(reg, core) } // Stats implements coreprom.WebRTCStatsSource for the Prometheus snapshot // collector. Returns a consistent snapshot under h.mu. func (h *Handler) Stats() coreprom.WebRTCStats { h.mu.Lock() peers := make(map[string]int, len(h.peersByStream)) for id, pm := range h.peersByStream { peers[id] = len(pm) } sc := 0 if h.sub != nil { sc = h.sub.StreamCount() } h.mu.Unlock() return coreprom.WebRTCStats{ StreamCount: sc, PeersByStream: peers, UDPPortsInUse: sc * 2, } } // trackICE waits for the first terminal ICE event and records establishment // duration and failure metrics. t0 should be captured immediately before // CreatePeerFromSources returns. Runs in a goroutine per Subscribe call. func (h *Handler) trackICE(streamID string, peer *corewebrtc.Peer, t0 time.Time) { if h.met == nil { return } select { case <-peer.Connected(): h.met.iceEstablishment.WithLabelValues(streamID, "connected").Observe(time.Since(t0).Seconds()) case <-peer.Done(): dur := time.Since(t0) h.met.iceEstablishment.WithLabelValues(streamID, "failed").Observe(dur.Seconds()) h.met.iceFailures.WithLabelValues(streamID, "failed").Inc() } }