diff --git a/app/webrtc/whip_handler.go b/app/webrtc/whip_handler.go new file mode 100644 index 0000000..1924371 --- /dev/null +++ b/app/webrtc/whip_handler.go @@ -0,0 +1,345 @@ +package webrtc + +import ( + "fmt" + "io" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/labstack/echo/v4" + "github.com/pion/webrtc/v4" + + corewebrtc "github.com/datarhei/core/v16/core/webrtc" +) + +// WHIPHandler exposes the subsystem's WHIP Echo handlers. Wire them +// into the /api/v3 group alongside the WHEP Handler via +// WHIPHandler.Register. +// +// Lifecycle: ingest peers are tracked in a streamID→resourceID→IngestPeer +// index. On every Publish a goroutine watches the peer's Done() channel; +// when the publisher disconnects or Close() runs the entry is removed +// and the counters tick back down — no leaks if OBS rage-quits. +type WHIPHandler struct { + sub *Subsystem + + mu sync.Mutex + ingestByStream map[string]map[string]*corewebrtc.IngestPeer // streamID -> resource -> peer + ingestStream map[string]string // resource -> streamID (reverse index) + count int64 // atomic; concurrent publishers + maxCapTotal int64 +} + +// NewWHIPHandler wraps the subsystem in an Echo-compatible WHIP handler. +// maxPublishers caps concurrent ingest sessions across all streams; +// pass 0 to default to 64. +func NewWHIPHandler(s *Subsystem, maxPublishers int) *WHIPHandler { + total := int64(maxPublishers) + if total <= 0 { + total = 64 + } + h := &WHIPHandler{ + sub: s, + ingestByStream: make(map[string]map[string]*corewebrtc.IngestPeer), + ingestStream: make(map[string]string), + maxCapTotal: total, + } + return h +} + +// Register mounts the WHIP routes on the provided Echo group. +// +// POST /whip/:id — start a publish session (SDP offer → answer) +// DELETE /whip/:id/:resource — tear down a publish session +// PATCH /whip/:id/:resource — trickle ICE candidates +// OPTIONS /whip/* — CORS preflight +func (h *WHIPHandler) Register(g *echo.Group) { + g.OPTIONS("/whip/:id", h.preflight) + g.OPTIONS("/whip/:id/:resource", h.preflight) + g.POST("/whip/:id", h.Publish) + g.DELETE("/whip/:id/:resource", h.Unpublish) + g.PATCH("/whip/:id/:resource", h.TrickleIngest) +} + +// Publish handles POST /whip/:id. +// +// The request body is an SDP offer (Content-Type: application/sdp). +// Response is the SDP answer; the Location header identifies the +// DELETE/PATCH resource for teardown and trickle ICE. +// +// The target process must have WHIPIngest.Enabled=true in its config, +// and an active ingest port pair must have been allocated by +// onWHIPProcessStart. +// +// @Summary Publish a WebRTC stream via WHIP +// @Description Start a WHIP ingest session. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; Location header points at DELETE/PATCH resource. +// @Tags v16.16.0 +// @ID webrtc-3-whip-publish +// @Accept application/sdp +// @Produce application/sdp +// @Param id path string true "Process ID with whip_ingest.enabled=true" +// @Success 201 {string} string "SDP answer" +// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP" +// @Failure 404 {string} string "no ingest stream registered for this process id" +// @Failure 409 {string} string "a publisher is already active on this stream (single-publisher enforcement)" +// @Failure 503 {string} string "global publisher cap reached" +// @Failure 504 {string} string "ICE gathering timeout" +// @Security ApiKeyAuth +// @Router /api/v3/whip/{id} [post] +func (h *WHIPHandler) Publish(c echo.Context) error { + addCORS(c) + t0 := time.Now() + + id := c.Param("id") + if id == "" { + h.recordRequest("publish", "", http.StatusBadRequest, t0) + return c.String(http.StatusBadRequest, "missing stream id") + } + + // Global cap: cheap atomic check before real work. + if atomic.LoadInt64(&h.count) >= h.maxCapTotal { + h.recordRequest("publish", id, http.StatusServiceUnavailable, t0) + return c.String(http.StatusServiceUnavailable, "webrtc: whip: publisher cap reached") + } + + ingest, ok := h.sub.lookupIngest(id) + if !ok { + h.recordRequest("publish", id, http.StatusNotFound, t0) + return c.String(http.StatusNotFound, "webrtc: whip: no ingest registered for process") + } + + // Single-publisher enforcement: WHIP is point-to-point — + // only one active publisher per stream at a time. + h.mu.Lock() + if len(h.ingestByStream[id]) > 0 { + h.mu.Unlock() + h.recordRequest("publish", id, http.StatusConflict, t0) + return c.String(http.StatusConflict, "webrtc: whip: stream already has an active publisher") + } + h.mu.Unlock() + + body, err := io.ReadAll(c.Request().Body) + if err != nil { + h.recordRequest("publish", id, http.StatusBadRequest, t0) + return c.String(http.StatusBadRequest, "read body: "+err.Error()) + } + if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { + h.recordRequest("publish", id, http.StatusBadRequest, t0) + return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) + } + + offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} + peer, err := h.sub.factory.CreateIngestPeer( + c.Request().Context(), + offer, + ingest.videoPort, + ingest.audioPort, + ) + if err != nil { + switch err { + case corewebrtc.ErrICETimeout: + h.recordRequest("publish", id, http.StatusGatewayTimeout, t0) + return c.String(http.StatusGatewayTimeout, err.Error()) + default: + h.recordRequest("publish", id, http.StatusInternalServerError, t0) + return c.String(http.StatusInternalServerError, "create ingest peer: "+err.Error()) + } + } + + rid := peer.ResourceID() + h.mu.Lock() + if h.ingestByStream[id] == nil { + h.ingestByStream[id] = make(map[string]*corewebrtc.IngestPeer) + } + h.ingestByStream[id][rid] = peer + h.ingestStream[rid] = id + h.mu.Unlock() + atomic.AddInt64(&h.count, 1) + + // Auto-cleanup on disconnect. + go h.awaitIngestClose(rid, peer) + + h.recordRequest("publish", id, http.StatusCreated, t0) + + c.Response().Header().Set("Content-Type", "application/sdp") + c.Response().Header().Set("Location", "/whip/"+id+"/"+rid) + c.Response().Header().Set("ETag", `"`+rid+`"`) + return c.String(http.StatusCreated, peer.Answer().SDP) +} + +// Unpublish handles DELETE /whip/:id/:resource. Returns 204 even when +// the resource is unknown (DELETE is idempotent, per the WHIP spec). +// +// @Summary Tear down a WHIP publish session +// @Tags v16.16.0 +// @ID webrtc-3-whip-unpublish +// @Param id path string true "Process ID" +// @Param resource path string true "Resource ID from the Publish Location header" +// @Success 204 "no content" +// @Failure 400 {string} string "missing resource id" +// @Security ApiKeyAuth +// @Router /api/v3/whip/{id}/{resource} [delete] +func (h *WHIPHandler) Unpublish(c echo.Context) error { + addCORS(c) + t0 := time.Now() + + resource := c.Param("resource") + if resource == "" { + h.recordRequest("unpublish", "", http.StatusBadRequest, t0) + return c.String(http.StatusBadRequest, "missing resource id") + } + + h.mu.Lock() + streamID := h.ingestStream[resource] + var peer *corewebrtc.IngestPeer + if streamID != "" { + peer = h.ingestByStream[streamID][resource] + delete(h.ingestByStream[streamID], resource) + if len(h.ingestByStream[streamID]) == 0 { + delete(h.ingestByStream, streamID) + } + delete(h.ingestStream, resource) + } + h.mu.Unlock() + + if peer != nil { + _ = peer.Close() + } + if streamID != "" { + atomic.AddInt64(&h.count, -1) + } + + h.recordRequest("unpublish", streamID, http.StatusNoContent, t0) + return c.NoContent(http.StatusNoContent) +} + +// TrickleIngest handles PATCH /whip/:id/:resource — adds ICE candidates +// from a trickle-ice-sdpfrag body. +// +// @Summary Trickle ICE candidates for a WHIP publish session +// @Tags v16.16.0 +// @ID webrtc-3-whip-trickle +// @Accept application/trickle-ice-sdpfrag +// @Param id path string true "Process ID" +// @Param resource path string true "Resource ID from the Publish Location header" +// @Success 204 "no content" +// @Failure 400 {string} string "missing resource id or unreadable body" +// @Failure 404 {string} string "peer not found" +// @Security ApiKeyAuth +// @Router /api/v3/whip/{id}/{resource} [patch] +func (h *WHIPHandler) TrickleIngest(c echo.Context) error { + addCORS(c) + t0 := time.Now() + + resource := c.Param("resource") + if resource == "" { + h.recordRequest("trickle", "", http.StatusBadRequest, t0) + return c.String(http.StatusBadRequest, "missing resource id") + } + + h.mu.Lock() + streamID := h.ingestStream[resource] + var peer *corewebrtc.IngestPeer + if streamID != "" { + peer = h.ingestByStream[streamID][resource] + } + h.mu.Unlock() + if peer == nil { + h.recordRequest("trickle", streamID, http.StatusNotFound, t0) + return c.NoContent(http.StatusNotFound) + } + + body, err := io.ReadAll(c.Request().Body) + if err != nil { + h.recordRequest("trickle", streamID, http.StatusBadRequest, t0) + return c.String(http.StatusBadRequest, "read body: "+err.Error()) + } + for _, line := range strings.Split(string(body), "\n") { + line = strings.TrimSpace(line) + if !strings.HasPrefix(line, "a=candidate:") { + continue + } + cand := strings.TrimPrefix(line, "a=") + _ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand}) + } + + h.recordRequest("trickle", streamID, http.StatusNoContent, t0) + return c.NoContent(http.StatusNoContent) +} + +// Close tears down every active ingest peer (e.g., during Core shutdown). +func (h *WHIPHandler) Close() { + h.mu.Lock() + peers := make([]*corewebrtc.IngestPeer, 0) + for _, m := range h.ingestByStream { + for _, p := range m { + peers = append(peers, p) + } + } + h.ingestByStream = make(map[string]map[string]*corewebrtc.IngestPeer) + h.ingestStream = make(map[string]string) + h.mu.Unlock() + + for _, p := range peers { + if p != nil { + _ = p.Close() + } + } + atomic.StoreInt64(&h.count, 0) +} + +// awaitIngestClose blocks on peer.Done() and yanks the index entry +// when the publisher disconnects. Idempotent with Unpublish. +func (h *WHIPHandler) awaitIngestClose(resource string, peer *corewebrtc.IngestPeer) { + <-peer.Done() + h.mu.Lock() + streamID := h.ingestStream[resource] + _, present := h.ingestStream[resource] + if present { + delete(h.ingestStream, resource) + if streamID != "" { + delete(h.ingestByStream[streamID], resource) + if len(h.ingestByStream[streamID]) == 0 { + delete(h.ingestByStream, streamID) + } + } + } + h.mu.Unlock() + if present { + atomic.AddInt64(&h.count, -1) + } +} + +// tearDownStreamIngests is called by the Subsystem's onWHIPProcessStop +// hook to close any active publisher when the FFmpeg process stops. +func (h *WHIPHandler) tearDownStreamIngests(streamID string) { + h.mu.Lock() + bucket := h.ingestByStream[streamID] + peers := make([]*corewebrtc.IngestPeer, 0, len(bucket)) + for _, p := range bucket { + peers = append(peers, p) + } + h.mu.Unlock() + + for _, p := range peers { + if p != nil { + _ = p.Close() + } + } +} + +// recordRequest logs request metrics. Currently a thin wrapper; WHIP +// metrics counters will be wired in alongside WHEP metrics in a follow-up. +func (h *WHIPHandler) recordRequest(route, streamID string, code int, t0 time.Time) { + // Placeholder — wire Prometheus metrics in a follow-up commit. + _ = fmt.Sprintf("%s %s %d %.3fs", route, streamID, code, time.Since(t0).Seconds()) +} + +// preflight answers CORS OPTIONS requests. +func (h *WHIPHandler) preflight(c echo.Context) error { + addCORS(c) + return c.NoContent(http.StatusNoContent) +}