2026-05-09 16:26:42 -04:00
package webrtc
import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/labstack/echo/v4"
"github.com/pion/webrtc/v4"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
)
// WHIPHandler exposes the subsystem's WHIP Echo handlers. Wire them
// into the /api/v3 group alongside the WHEP Handler via
// WHIPHandler.Register.
//
// Lifecycle: ingest peers are tracked in a streamID→resourceID→IngestPeer
// index. On every Publish a goroutine watches the peer's Done() channel;
// when the publisher disconnects or Close() runs the entry is removed
// and the counters tick back down — no leaks if OBS rage-quits.
type WHIPHandler struct {
sub * Subsystem
mu sync . Mutex
ingestByStream map [ string ] map [ string ] * corewebrtc . IngestPeer // streamID -> resource -> peer
ingestStream map [ string ] string // resource -> streamID (reverse index)
count int64 // atomic; concurrent publishers
maxCapTotal int64
2026-05-10 21:08:03 -04:00
met * webrtcMetrics // nil until SetMetrics is called
2026-05-09 16:26:42 -04:00
}
// NewWHIPHandler wraps the subsystem in an Echo-compatible WHIP handler.
// maxPublishers caps concurrent ingest sessions across all streams;
// pass 0 to default to 64.
2026-05-09 16:38:29 -04:00
//
// The constructor registers a teardown hook on the Subsystem so that
// when a process stops, any active WHIP publisher is closed automatically
// (mirroring the pattern used by the WHEP NewHandler).
2026-05-09 16:26:42 -04:00
func NewWHIPHandler ( s * Subsystem , maxPublishers int ) * WHIPHandler {
total := int64 ( maxPublishers )
if total <= 0 {
total = 64
}
h := & WHIPHandler {
sub : s ,
ingestByStream : make ( map [ string ] map [ string ] * corewebrtc . IngestPeer ) ,
ingestStream : make ( map [ string ] string ) ,
maxCapTotal : total ,
}
2026-05-09 16:38:29 -04:00
// Wire the WHIP teardown hook so onWHIPProcessStop notifies us
// before releasing the port allocation — same pattern as WHEP's
// NewHandler → s.SetTeardownHook(h.tearDownStreamPeers).
if s != nil {
s . SetWHIPTeardownHook ( h . tearDownStreamIngests )
}
2026-05-09 16:26:42 -04:00
return h
}
// Register mounts the WHIP routes on the provided Echo group.
//
// POST /whip/:id — start a publish session (SDP offer → answer)
// DELETE /whip/:id/:resource — tear down a publish session
// PATCH /whip/:id/:resource — trickle ICE candidates
// OPTIONS /whip/* — CORS preflight
func ( h * WHIPHandler ) Register ( g * echo . Group ) {
g . OPTIONS ( "/whip/:id" , h . preflight )
g . OPTIONS ( "/whip/:id/:resource" , h . preflight )
g . POST ( "/whip/:id" , h . Publish )
g . DELETE ( "/whip/:id/:resource" , h . Unpublish )
g . PATCH ( "/whip/:id/:resource" , h . TrickleIngest )
}
// Publish handles POST /whip/:id.
//
// The request body is an SDP offer (Content-Type: application/sdp).
// Response is the SDP answer; the Location header identifies the
// DELETE/PATCH resource for teardown and trickle ICE.
//
// The target process must have WHIPIngest.Enabled=true in its config,
// and an active ingest port pair must have been allocated by
// onWHIPProcessStart.
//
// @Summary Publish a WebRTC stream via WHIP
// @Description Start a WHIP ingest session. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; Location header points at DELETE/PATCH resource.
// @Tags v16.16.0
// @ID webrtc-3-whip-publish
// @Accept application/sdp
// @Produce application/sdp
// @Param id path string true "Process ID with whip_ingest.enabled=true"
// @Success 201 {string} string "SDP answer"
// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP"
// @Failure 404 {string} string "no ingest stream registered for this process id"
// @Failure 409 {string} string "a publisher is already active on this stream (single-publisher enforcement)"
// @Failure 503 {string} string "global publisher cap reached"
// @Failure 504 {string} string "ICE gathering timeout"
// @Security ApiKeyAuth
// @Router /api/v3/whip/{id} [post]
func ( h * WHIPHandler ) Publish ( c echo . Context ) error {
addCORS ( c )
t0 := time . Now ( )
id := c . Param ( "id" )
if id == "" {
h . recordRequest ( "publish" , "" , http . StatusBadRequest , t0 )
return c . String ( http . StatusBadRequest , "missing stream id" )
}
// Global cap: cheap atomic check before real work.
if atomic . LoadInt64 ( & h . count ) >= h . maxCapTotal {
2026-05-10 21:08:03 -04:00
if h . met != nil {
h . met . whipCapRejections . WithLabelValues ( id , "global" ) . Inc ( )
}
2026-05-09 16:26:42 -04:00
h . recordRequest ( "publish" , id , http . StatusServiceUnavailable , t0 )
return c . String ( http . StatusServiceUnavailable , "webrtc: whip: publisher cap reached" )
}
ingest , ok := h . sub . lookupIngest ( id )
if ! ok {
h . recordRequest ( "publish" , id , http . StatusNotFound , t0 )
return c . String ( http . StatusNotFound , "webrtc: whip: no ingest registered for process" )
}
// Single-publisher enforcement: WHIP is point-to-point —
// only one active publisher per stream at a time.
h . mu . Lock ( )
if len ( h . ingestByStream [ id ] ) > 0 {
h . mu . Unlock ( )
2026-05-10 21:08:03 -04:00
if h . met != nil {
h . met . whipCapRejections . WithLabelValues ( id , "conflict" ) . Inc ( )
}
2026-05-09 16:26:42 -04:00
h . recordRequest ( "publish" , id , http . StatusConflict , t0 )
return c . String ( http . StatusConflict , "webrtc: whip: stream already has an active publisher" )
}
h . mu . Unlock ( )
body , err := io . ReadAll ( c . Request ( ) . Body )
if err != nil {
h . recordRequest ( "publish" , id , http . StatusBadRequest , t0 )
return c . String ( http . StatusBadRequest , "read body: " + err . Error ( ) )
}
if len ( body ) == 0 || ! strings . HasPrefix ( string ( body ) , "v=" ) {
h . recordRequest ( "publish" , id , http . StatusBadRequest , t0 )
return c . String ( http . StatusBadRequest , corewebrtc . ErrInvalidSDP . Error ( ) )
}
offer := webrtc . SessionDescription { Type : webrtc . SDPTypeOffer , SDP : string ( body ) }
peer , err := h . sub . factory . CreateIngestPeer (
c . Request ( ) . Context ( ) ,
offer ,
ingest . videoPort ,
ingest . audioPort ,
)
if err != nil {
switch err {
case corewebrtc . ErrICETimeout :
h . recordRequest ( "publish" , id , http . StatusGatewayTimeout , t0 )
return c . String ( http . StatusGatewayTimeout , err . Error ( ) )
default :
h . recordRequest ( "publish" , id , http . StatusInternalServerError , t0 )
return c . String ( http . StatusInternalServerError , "create ingest peer: " + err . Error ( ) )
}
}
rid := peer . ResourceID ( )
h . mu . Lock ( )
if h . ingestByStream [ id ] == nil {
h . ingestByStream [ id ] = make ( map [ string ] * corewebrtc . IngestPeer )
}
h . ingestByStream [ id ] [ rid ] = peer
h . ingestStream [ rid ] = id
h . mu . Unlock ( )
atomic . AddInt64 ( & h . count , 1 )
// Auto-cleanup on disconnect.
go h . awaitIngestClose ( rid , peer )
2026-05-10 21:08:03 -04:00
// Track ICE establishment duration using the shared ICE histograms
// (same metric family as WHEP egress, disambiguated by result label).
go h . trackICE ( id , peer , time . Now ( ) )
2026-05-09 16:26:42 -04:00
h . recordRequest ( "publish" , id , http . StatusCreated , t0 )
2026-05-10 20:34:47 -04:00
// RFC 9261 §5.2: emit one Link header per configured ICE server so
// that the publisher (OBS, browser, GStreamer, etc.) can discover
// STUN/TURN without a separate signalling round-trip — symmetric
// with the WHEP Subscribe Link header added in issue #19.
for _ , uri := range h . sub . ICEServerURIs ( ) {
c . Response ( ) . Header ( ) . Add ( "Link" , "<" + uri + ">; rel=\"ice-server\"" )
}
2026-05-09 16:26:42 -04:00
c . Response ( ) . Header ( ) . Set ( "Content-Type" , "application/sdp" )
c . Response ( ) . Header ( ) . Set ( "Location" , "/whip/" + id + "/" + rid )
c . Response ( ) . Header ( ) . Set ( "ETag" , ` " ` + rid + ` " ` )
return c . String ( http . StatusCreated , peer . Answer ( ) . SDP )
}
// Unpublish handles DELETE /whip/:id/:resource. Returns 204 even when
// the resource is unknown (DELETE is idempotent, per the WHIP spec).
//
// @Summary Tear down a WHIP publish session
// @Tags v16.16.0
// @ID webrtc-3-whip-unpublish
// @Param id path string true "Process ID"
// @Param resource path string true "Resource ID from the Publish Location header"
// @Success 204 "no content"
// @Failure 400 {string} string "missing resource id"
// @Security ApiKeyAuth
// @Router /api/v3/whip/{id}/{resource} [delete]
func ( h * WHIPHandler ) Unpublish ( c echo . Context ) error {
addCORS ( c )
t0 := time . Now ( )
resource := c . Param ( "resource" )
if resource == "" {
h . recordRequest ( "unpublish" , "" , http . StatusBadRequest , t0 )
return c . String ( http . StatusBadRequest , "missing resource id" )
}
h . mu . Lock ( )
streamID := h . ingestStream [ resource ]
var peer * corewebrtc . IngestPeer
if streamID != "" {
peer = h . ingestByStream [ streamID ] [ resource ]
delete ( h . ingestByStream [ streamID ] , resource )
if len ( h . ingestByStream [ streamID ] ) == 0 {
delete ( h . ingestByStream , streamID )
}
delete ( h . ingestStream , resource )
}
h . mu . Unlock ( )
if peer != nil {
_ = peer . Close ( )
}
if streamID != "" {
atomic . AddInt64 ( & h . count , - 1 )
}
h . recordRequest ( "unpublish" , streamID , http . StatusNoContent , t0 )
return c . NoContent ( http . StatusNoContent )
}
// TrickleIngest handles PATCH /whip/:id/:resource — adds ICE candidates
// from a trickle-ice-sdpfrag body.
//
// @Summary Trickle ICE candidates for a WHIP publish session
// @Tags v16.16.0
// @ID webrtc-3-whip-trickle
// @Accept application/trickle-ice-sdpfrag
// @Param id path string true "Process ID"
// @Param resource path string true "Resource ID from the Publish Location header"
// @Success 204 "no content"
// @Failure 400 {string} string "missing resource id or unreadable body"
// @Failure 404 {string} string "peer not found"
// @Security ApiKeyAuth
// @Router /api/v3/whip/{id}/{resource} [patch]
func ( h * WHIPHandler ) TrickleIngest ( c echo . Context ) error {
addCORS ( c )
t0 := time . Now ( )
resource := c . Param ( "resource" )
if resource == "" {
h . recordRequest ( "trickle" , "" , http . StatusBadRequest , t0 )
return c . String ( http . StatusBadRequest , "missing resource id" )
}
h . mu . Lock ( )
streamID := h . ingestStream [ resource ]
var peer * corewebrtc . IngestPeer
if streamID != "" {
peer = h . ingestByStream [ streamID ] [ resource ]
}
h . mu . Unlock ( )
if peer == nil {
h . recordRequest ( "trickle" , streamID , http . StatusNotFound , t0 )
return c . NoContent ( http . StatusNotFound )
}
body , err := io . ReadAll ( c . Request ( ) . Body )
if err != nil {
h . recordRequest ( "trickle" , streamID , http . StatusBadRequest , t0 )
return c . String ( http . StatusBadRequest , "read body: " + err . Error ( ) )
}
for _ , line := range strings . Split ( string ( body ) , "\n" ) {
line = strings . TrimSpace ( line )
if ! strings . HasPrefix ( line , "a=candidate:" ) {
continue
}
cand := strings . TrimPrefix ( line , "a=" )
_ = peer . AddICECandidate ( webrtc . ICECandidateInit { Candidate : cand } )
}
h . recordRequest ( "trickle" , streamID , http . StatusNoContent , t0 )
return c . NoContent ( http . StatusNoContent )
}
// Close tears down every active ingest peer (e.g., during Core shutdown).
func ( h * WHIPHandler ) Close ( ) {
h . mu . Lock ( )
peers := make ( [ ] * corewebrtc . IngestPeer , 0 )
for _ , m := range h . ingestByStream {
for _ , p := range m {
peers = append ( peers , p )
}
}
h . ingestByStream = make ( map [ string ] map [ string ] * corewebrtc . IngestPeer )
h . ingestStream = make ( map [ string ] string )
h . mu . Unlock ( )
for _ , p := range peers {
if p != nil {
_ = p . Close ( )
}
}
atomic . StoreInt64 ( & h . count , 0 )
}
// awaitIngestClose blocks on peer.Done() and yanks the index entry
// when the publisher disconnects. Idempotent with Unpublish.
func ( h * WHIPHandler ) awaitIngestClose ( resource string , peer * corewebrtc . IngestPeer ) {
<- peer . Done ( )
h . mu . Lock ( )
streamID := h . ingestStream [ resource ]
_ , present := h . ingestStream [ resource ]
if present {
delete ( h . ingestStream , resource )
if streamID != "" {
delete ( h . ingestByStream [ streamID ] , resource )
if len ( h . ingestByStream [ streamID ] ) == 0 {
delete ( h . ingestByStream , streamID )
}
}
}
h . mu . Unlock ( )
if present {
atomic . AddInt64 ( & h . count , - 1 )
}
}
2026-05-09 16:38:29 -04:00
// tearDownStreamIngests is called by the Subsystem's SetWHIPTeardownHook
// to close any active publisher when the FFmpeg process stops.
2026-05-09 16:26:42 -04:00
func ( h * WHIPHandler ) tearDownStreamIngests ( streamID string ) {
h . mu . Lock ( )
bucket := h . ingestByStream [ streamID ]
peers := make ( [ ] * corewebrtc . IngestPeer , 0 , len ( bucket ) )
for _ , p := range bucket {
peers = append ( peers , p )
}
h . mu . Unlock ( )
for _ , p := range peers {
if p != nil {
_ = p . Close ( )
}
}
}
2026-05-10 21:08:03 -04:00
// recordRequest logs request metrics to the shared Prometheus metrics
// instance. No-ops if SetMetrics has not been called.
2026-05-09 16:26:42 -04:00
func ( h * WHIPHandler ) recordRequest ( route , streamID string , code int , t0 time . Time ) {
2026-05-10 21:08:03 -04:00
if h . met == nil {
return
}
codeStr := fmt . Sprintf ( "%d" , code )
h . met . whipRequests . WithLabelValues ( route , codeStr , streamID ) . Inc ( )
h . met . whipRequestDuration . WithLabelValues ( route , streamID ) . Observe ( time . Since ( t0 ) . Seconds ( ) )
2026-05-09 16:26:42 -04:00
}
// preflight answers CORS OPTIONS requests.
func ( h * WHIPHandler ) preflight ( c echo . Context ) error {
addCORS ( c )
return c . NoContent ( http . StatusNoContent )
}