webrtc: add WHIP request metrics to webrtcMetrics, expose SetMetrics on WHIPHandler (issue #22)
This commit is contained in:
parent
07db6ebb4e
commit
eaa798e77b
1 changed files with 64 additions and 3 deletions
|
|
@ -11,6 +11,7 @@ import (
|
||||||
var iceHistBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
|
var iceHistBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
|
||||||
|
|
||||||
type webrtcMetrics struct {
|
type webrtcMetrics struct {
|
||||||
|
// WHEP egress metrics
|
||||||
whepRequests *prometheus.CounterVec
|
whepRequests *prometheus.CounterVec
|
||||||
whepRequestDuration *prometheus.HistogramVec
|
whepRequestDuration *prometheus.HistogramVec
|
||||||
iceEstablishment *prometheus.HistogramVec
|
iceEstablishment *prometheus.HistogramVec
|
||||||
|
|
@ -18,6 +19,12 @@ type webrtcMetrics struct {
|
||||||
codecMismatches *prometheus.CounterVec
|
codecMismatches *prometheus.CounterVec
|
||||||
capRejections *prometheus.CounterVec
|
capRejections *prometheus.CounterVec
|
||||||
ffmpegLegFailures *prometheus.CounterVec
|
ffmpegLegFailures *prometheus.CounterVec
|
||||||
|
|
||||||
|
// WHIP ingest metrics — symmetric with WHEP where applicable.
|
||||||
|
// ICE establishment/failure reuse the WHEP histograms (shared labels).
|
||||||
|
whipRequests *prometheus.CounterVec
|
||||||
|
whipRequestDuration *prometheus.HistogramVec
|
||||||
|
whipCapRejections *prometheus.CounterVec
|
||||||
}
|
}
|
||||||
|
|
||||||
// mustRegisterCounter creates a CounterVec and registers it with reg.
|
// mustRegisterCounter creates a CounterVec and registers it with reg.
|
||||||
|
|
@ -38,6 +45,7 @@ func mustRegisterHistogram(reg prometheus.Registerer, opts prometheus.HistogramO
|
||||||
func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
|
func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
|
||||||
cl := prometheus.Labels{"core": core}
|
cl := prometheus.Labels{"core": core}
|
||||||
return &webrtcMetrics{
|
return &webrtcMetrics{
|
||||||
|
// --- WHEP ---
|
||||||
whepRequests: mustRegisterCounter(reg, prometheus.CounterOpts{
|
whepRequests: mustRegisterCounter(reg, prometheus.CounterOpts{
|
||||||
Name: "dragonfork_webrtc_whep_requests_total",
|
Name: "dragonfork_webrtc_whep_requests_total",
|
||||||
Help: "Count of WHEP HTTP requests by route, HTTP status code, and stream.",
|
Help: "Count of WHEP HTTP requests by route, HTTP status code, and stream.",
|
||||||
|
|
@ -53,14 +61,14 @@ func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
|
||||||
|
|
||||||
iceEstablishment: mustRegisterHistogram(reg, prometheus.HistogramOpts{
|
iceEstablishment: mustRegisterHistogram(reg, prometheus.HistogramOpts{
|
||||||
Name: "dragonfork_webrtc_ice_establishment_duration_seconds",
|
Name: "dragonfork_webrtc_ice_establishment_duration_seconds",
|
||||||
Help: "Duration from peer creation to first connected or failed ICE state.",
|
Help: "Duration from peer creation to first connected or failed ICE state (shared by WHEP and WHIP).",
|
||||||
ConstLabels: cl,
|
ConstLabels: cl,
|
||||||
Buckets: iceHistBuckets,
|
Buckets: iceHistBuckets,
|
||||||
}, []string{"stream_id", "result"}),
|
}, []string{"stream_id", "result"}),
|
||||||
|
|
||||||
iceFailures: mustRegisterCounter(reg, prometheus.CounterOpts{
|
iceFailures: mustRegisterCounter(reg, prometheus.CounterOpts{
|
||||||
Name: "dragonfork_webrtc_ice_failures_total",
|
Name: "dragonfork_webrtc_ice_failures_total",
|
||||||
Help: "Count of ICE failures by stream and reason.",
|
Help: "Count of ICE failures by stream and reason (shared by WHEP and WHIP).",
|
||||||
ConstLabels: cl,
|
ConstLabels: cl,
|
||||||
}, []string{"stream_id", "reason"}),
|
}, []string{"stream_id", "reason"}),
|
||||||
|
|
||||||
|
|
@ -72,7 +80,7 @@ func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
|
||||||
|
|
||||||
capRejections: mustRegisterCounter(reg, prometheus.CounterOpts{
|
capRejections: mustRegisterCounter(reg, prometheus.CounterOpts{
|
||||||
Name: "dragonfork_webrtc_cap_rejections_total",
|
Name: "dragonfork_webrtc_cap_rejections_total",
|
||||||
Help: "Count of 503 peer-cap rejections by stream and scope (global or stream).",
|
Help: "Count of 503 WHEP peer-cap rejections by stream and scope (global or stream).",
|
||||||
ConstLabels: cl,
|
ConstLabels: cl,
|
||||||
}, []string{"stream_id", "scope"}),
|
}, []string{"stream_id", "scope"}),
|
||||||
|
|
||||||
|
|
@ -81,6 +89,26 @@ func initMetrics(reg prometheus.Registerer, core string) *webrtcMetrics {
|
||||||
Help: "Count of FFmpeg RTP output leg failures (process stopped while peers were active).",
|
Help: "Count of FFmpeg RTP output leg failures (process stopped while peers were active).",
|
||||||
ConstLabels: cl,
|
ConstLabels: cl,
|
||||||
}, []string{"stream_id", "leg"}),
|
}, []string{"stream_id", "leg"}),
|
||||||
|
|
||||||
|
// --- WHIP ---
|
||||||
|
whipRequests: mustRegisterCounter(reg, prometheus.CounterOpts{
|
||||||
|
Name: "dragonfork_webrtc_whip_requests_total",
|
||||||
|
Help: "Count of WHIP HTTP requests by route, HTTP status code, and stream.",
|
||||||
|
ConstLabels: cl,
|
||||||
|
}, []string{"route", "code", "stream_id"}),
|
||||||
|
|
||||||
|
whipRequestDuration: mustRegisterHistogram(reg, prometheus.HistogramOpts{
|
||||||
|
Name: "dragonfork_webrtc_whip_request_duration_seconds",
|
||||||
|
Help: "Server-side WHIP request latency in seconds, by route and stream.",
|
||||||
|
ConstLabels: cl,
|
||||||
|
Buckets: iceHistBuckets,
|
||||||
|
}, []string{"route", "stream_id"}),
|
||||||
|
|
||||||
|
whipCapRejections: mustRegisterCounter(reg, prometheus.CounterOpts{
|
||||||
|
Name: "dragonfork_webrtc_whip_cap_rejections_total",
|
||||||
|
Help: "Count of 503/409 WHIP publisher-cap or conflict rejections by stream and scope.",
|
||||||
|
ConstLabels: cl,
|
||||||
|
}, []string{"stream_id", "scope"}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -91,6 +119,15 @@ func (h *Handler) InitMetrics(reg prometheus.Registerer, core string) {
|
||||||
h.met = initMetrics(reg, core)
|
h.met = initMetrics(reg, core)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetMetrics attaches a shared *webrtcMetrics to the WHIPHandler so that
|
||||||
|
// WHIP ingest routes emit Prometheus observations. If both the WHEP Handler
|
||||||
|
// and the WHIP Handler are in use, call Handler.InitMetrics once and pass
|
||||||
|
// the result to WHIPHandler.SetMetrics — registering the metrics twice
|
||||||
|
// on the same Registerer panics.
|
||||||
|
func (h *WHIPHandler) SetMetrics(met *webrtcMetrics) {
|
||||||
|
h.met = met
|
||||||
|
}
|
||||||
|
|
||||||
// Stats implements coreprom.WebRTCStatsSource for the Prometheus snapshot
|
// Stats implements coreprom.WebRTCStatsSource for the Prometheus snapshot
|
||||||
// collector. Returns a consistent snapshot under h.mu.
|
// collector. Returns a consistent snapshot under h.mu.
|
||||||
func (h *Handler) Stats() coreprom.WebRTCStats {
|
func (h *Handler) Stats() coreprom.WebRTCStats {
|
||||||
|
|
@ -111,6 +148,12 @@ func (h *Handler) Stats() coreprom.WebRTCStats {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PublisherCount returns the number of currently active WHIP publishers.
|
||||||
|
// Safe to call from any goroutine (atomic read).
|
||||||
|
func (h *WHIPHandler) PublisherCount() int64 {
|
||||||
|
return h.count
|
||||||
|
}
|
||||||
|
|
||||||
// trackICE waits for the first terminal ICE event and records establishment
|
// trackICE waits for the first terminal ICE event and records establishment
|
||||||
// duration and failure metrics. t0 should be captured immediately before
|
// duration and failure metrics. t0 should be captured immediately before
|
||||||
// CreatePeerFromSources returns. Runs in a goroutine per Subscribe call.
|
// CreatePeerFromSources returns. Runs in a goroutine per Subscribe call.
|
||||||
|
|
@ -127,3 +170,21 @@ func (h *Handler) trackICE(streamID string, peer *corewebrtc.Peer, t0 time.Time)
|
||||||
h.met.iceFailures.WithLabelValues(streamID, "reason").Inc()
|
h.met.iceFailures.WithLabelValues(streamID, "reason").Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// trackICE waits for the first terminal ICE event on a WHIP IngestPeer and
|
||||||
|
// records establishment duration using the same shared histograms as the
|
||||||
|
// WHEP egress ICE tracker. This gives a unified ICE health view across
|
||||||
|
// both publish and subscribe paths.
|
||||||
|
func (h *WHIPHandler) trackICE(streamID string, peer *corewebrtc.IngestPeer, t0 time.Time) {
|
||||||
|
if h.met == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-peer.Connected():
|
||||||
|
h.met.iceEstablishment.WithLabelValues(streamID, "connected").Observe(time.Since(t0).Seconds())
|
||||||
|
case <-peer.Done():
|
||||||
|
dur := time.Since(t0)
|
||||||
|
h.met.iceEstablishment.WithLabelValues(streamID, "failed").Observe(dur.Seconds())
|
||||||
|
h.met.iceFailures.WithLabelValues(streamID, "ingest").Inc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue