diff --git a/app/webrtc/whip_handler.go b/app/webrtc/whip_handler.go index 06e187d..b2ddfa1 100644 --- a/app/webrtc/whip_handler.go +++ b/app/webrtc/whip_handler.go @@ -31,6 +31,8 @@ type WHIPHandler struct { ingestStream map[string]string // resource -> streamID (reverse index) count int64 // atomic; concurrent publishers maxCapTotal int64 + + met *webrtcMetrics // nil until SetMetrics is called } // NewWHIPHandler wraps the subsystem in an Echo-compatible WHIP handler. @@ -111,6 +113,9 @@ func (h *WHIPHandler) Publish(c echo.Context) error { // Global cap: cheap atomic check before real work. if atomic.LoadInt64(&h.count) >= h.maxCapTotal { + if h.met != nil { + h.met.whipCapRejections.WithLabelValues(id, "global").Inc() + } h.recordRequest("publish", id, http.StatusServiceUnavailable, t0) return c.String(http.StatusServiceUnavailable, "webrtc: whip: publisher cap reached") } @@ -126,6 +131,9 @@ func (h *WHIPHandler) Publish(c echo.Context) error { h.mu.Lock() if len(h.ingestByStream[id]) > 0 { h.mu.Unlock() + if h.met != nil { + h.met.whipCapRejections.WithLabelValues(id, "conflict").Inc() + } h.recordRequest("publish", id, http.StatusConflict, t0) return c.String(http.StatusConflict, "webrtc: whip: stream already has an active publisher") } @@ -172,6 +180,10 @@ func (h *WHIPHandler) Publish(c echo.Context) error { // Auto-cleanup on disconnect. go h.awaitIngestClose(rid, peer) + // Track ICE establishment duration using the shared ICE histograms + // (same metric family as WHEP egress, disambiguated by result label). + go h.trackICE(id, peer, time.Now()) + h.recordRequest("publish", id, http.StatusCreated, t0) // RFC 9261 §5.2: emit one Link header per configured ICE server so @@ -236,13 +248,6 @@ func (h *WHIPHandler) Unpublish(c echo.Context) error { // TrickleIngest handles PATCH /whip/:id/:resource — adds ICE candidates // from a trickle-ice-sdpfrag body. // -// The body format follows draft-ietf-wish-whip §5: one or more -// "a=candidate:…" lines in application/trickle-ice-sdpfrag. Each -// matching line is forwarded directly to the underlying PeerConnection. -// An empty body or a body with no candidate lines is a no-op (clients -// signal end-of-candidates via an a=end-of-candidates line, which -// AddICECandidate correctly ignores at the Pion level). -// // @Summary Trickle ICE candidates for a WHIP publish session // @Tags v16.16.0 // @ID webrtc-3-whip-trickle @@ -339,7 +344,6 @@ func (h *WHIPHandler) awaitIngestClose(resource string, peer *corewebrtc.IngestP // tearDownStreamIngests is called by the Subsystem's SetWHIPTeardownHook // to close any active publisher when the FFmpeg process stops. -// Not exported — registered internally via NewWHIPHandler. func (h *WHIPHandler) tearDownStreamIngests(streamID string) { h.mu.Lock() bucket := h.ingestByStream[streamID] @@ -356,11 +360,15 @@ func (h *WHIPHandler) tearDownStreamIngests(streamID string) { } } -// recordRequest logs request metrics. Currently a thin wrapper; WHIP -// metrics counters will be wired in alongside WHEP metrics in a follow-up. +// recordRequest logs request metrics to the shared Prometheus metrics +// instance. No-ops if SetMetrics has not been called. func (h *WHIPHandler) recordRequest(route, streamID string, code int, t0 time.Time) { - // Placeholder — wire Prometheus metrics in a follow-up commit. - _ = fmt.Sprintf("%s %s %d %.3fs", route, streamID, code, time.Since(t0).Seconds()) + if h.met == nil { + return + } + codeStr := fmt.Sprintf("%d", code) + h.met.whipRequests.WithLabelValues(route, codeStr, streamID).Inc() + h.met.whipRequestDuration.WithLabelValues(route, streamID).Observe(time.Since(t0).Seconds()) } // preflight answers CORS OPTIONS requests.