diff --git a/app/webrtc/metrics.go b/app/webrtc/metrics.go
index 9197575..bc1d49f 100644
--- a/app/webrtc/metrics.go
+++ b/app/webrtc/metrics.go
@@ -11,6 +11,7 @@ import (
 var iceHistBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
 
 type webrtcMetrics struct {
+	// WHEP egress metrics
 	whepRequests        *prometheus.CounterVec
 	whepRequestDuration *prometheus.HistogramVec
 	iceEstablishment    *prometheus.HistogramVec
@@ -18,6 +19,12 @@ type webrtcMetrics struct {
 	codecMismatches     *prometheus.CounterVec
 	capRejections       *prometheus.CounterVec
 	ffmpegLegFailures   *prometheus.CounterVec
+
+	// WHIP ingest metrics — symmetric with WHEP where applicable.
+	// ICE establishment/failure reuse the existing ICE histogram and counter.
+	whipRequests        *prometheus.CounterVec
+	whipRequestDuration *prometheus.HistogramVec
+	whipCapRejections   *prometheus.CounterVec
 }
 
 // mustRegisterCounter creates a CounterVec and registers it with reg.
@@ -38,6 +45,7 @@ func mustRegisterHistogram(reg prometheus.Registerer, opts prometheus.HistogramO
 func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
 	cl := prometheus.Labels{"core": core}
 	return &webrtcMetrics{
+		// --- WHEP ---
 		whepRequests: mustRegisterCounter(reg, prometheus.CounterOpts{
 			Name:        "dragonfork_webrtc_whep_requests_total",
 			Help:        "Count of WHEP HTTP requests by route, HTTP status code, and stream.",
@@ -53,14 +61,14 @@ func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
 
 		iceEstablishment: mustRegisterHistogram(reg, prometheus.HistogramOpts{
 			Name:        "dragonfork_webrtc_ice_establishment_duration_seconds",
-			Help:        "Duration from peer creation to first connected or failed ICE state.",
+			Help:        "Duration from peer creation to first connected or failed ICE state (shared by WHEP and WHIP).",
 			ConstLabels: cl,
 			Buckets:     iceHistBuckets,
 		}, []string{"stream_id", "result"}),
 
 		iceFailures: mustRegisterCounter(reg, prometheus.CounterOpts{
 			Name:        "dragonfork_webrtc_ice_failures_total",
-			Help:        "Count of ICE failures by stream and reason.",
+			Help:        "Count of ICE failures by stream and reason (shared by WHEP and WHIP).",
 			ConstLabels: cl,
 		}, []string{"stream_id", "reason"}),
 
@@ -72,7 +80,7 @@ func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
 
 		capRejections: mustRegisterCounter(reg, prometheus.CounterOpts{
 			Name:        "dragonfork_webrtc_cap_rejections_total",
-			Help:        "Count of 503 peer-cap rejections by stream and scope (global or stream).",
+			Help:        "Count of 503 WHEP peer-cap rejections by stream and scope (global or stream).",
 			ConstLabels: cl,
 		}, []string{"stream_id", "scope"}),
 
@@ -81,6 +89,26 @@ func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
 			Help:        "Count of FFmpeg RTP output leg failures (process stopped while peers were active).",
 			ConstLabels: cl,
 		}, []string{"stream_id", "leg"}),
+
+		// --- WHIP ---
+		whipRequests: mustRegisterCounter(reg, prometheus.CounterOpts{
+			Name:        "dragonfork_webrtc_whip_requests_total",
+			Help:        "Count of WHIP HTTP requests by route, HTTP status code, and stream.",
+			ConstLabels: cl,
+		}, []string{"route", "code", "stream_id"}),
+
+		whipRequestDuration: mustRegisterHistogram(reg, prometheus.HistogramOpts{
+			Name:        "dragonfork_webrtc_whip_request_duration_seconds",
+			Help:        "Server-side WHIP request latency in seconds, by route and stream.",
+			ConstLabels: cl,
+			Buckets:     iceHistBuckets,
+		}, []string{"route", "stream_id"}),
+
+		whipCapRejections: mustRegisterCounter(reg, prometheus.CounterOpts{
+			Name:        "dragonfork_webrtc_whip_cap_rejections_total",
+			Help:        "Count of 503/409 WHIP publisher-cap or conflict rejections by stream and scope.",
+			ConstLabels: cl,
+		}, []string{"stream_id", "scope"}),
 	}
 }
 
@@ -91,6 +119,15 @@ func (h *Handler) InitMetrics(reg prometheus.Registerer, core string) {
 	h.met = initMetrics(reg, core)
 }
 
+// SetMetrics attaches a shared *webrtcMetrics to the WHIPHandler so that
+// WHIP ingest routes emit Prometheus observations. If both the WHEP Handler
+// and the WHIP Handler are in use, call Handler.InitMetrics once and hand
+// that Handler's metrics set to WHIPHandler.SetMetrics — registering the
+// metrics twice on the same Registerer panics.
+func (h *WHIPHandler) SetMetrics(met *webrtcMetrics) {
+	h.met = met
+}
+
 // Stats implements coreprom.WebRTCStatsSource for the Prometheus snapshot
 // collector. Returns a consistent snapshot under h.mu.
 func (h *Handler) Stats() coreprom.WebRTCStats {
@@ -111,6 +148,12 @@ func (h *Handler) Stats() coreprom.WebRTCStats {
 	}
 }
 
+// PublisherCount returns the number of currently active WHIP publishers.
+// Safe to call from any goroutine (atomic read).
+func (h *WHIPHandler) PublisherCount() int64 {
+	return h.count.Load()
+}
+
 // trackICE waits for the first terminal ICE event and records establishment
 // duration and failure metrics. t0 should be captured immediately before
 // CreatePeerFromSources returns. Runs in a goroutine per Subscribe call.
@@ -127,3 +170,21 @@ func (h *Handler) trackICE(streamID string, peer *corewebrtc.Peer, t0 time.Time)
 		h.met.iceFailures.WithLabelValues(streamID, "reason").Inc()
 	}
 }
+
+// trackICE waits for the first terminal ICE event on a WHIP IngestPeer and
+// records establishment duration (and, on failure, an "ingest" ICE failure)
+// in the same shared metrics as the WHEP egress ICE tracker, giving a unified
+// ICE health view across publish and subscribe paths. No-op when h.met is nil.
+func (h *WHIPHandler) trackICE(streamID string, peer *corewebrtc.IngestPeer, t0 time.Time) {
+	if h.met == nil {
+		return
+	}
+	select {
+	case <-peer.Connected():
+		h.met.iceEstablishment.WithLabelValues(streamID, "connected").Observe(time.Since(t0).Seconds())
+	case <-peer.Done():
+		dur := time.Since(t0)
+		h.met.iceEstablishment.WithLabelValues(streamID, "failed").Observe(dur.Seconds())
+		h.met.iceFailures.WithLabelValues(streamID, "ingest").Inc()
+	}
+}