2026-04-17 10:03:24 -04:00
package webrtc
import (
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"github.com/labstack/echo/v4"
"github.com/pion/webrtc/v4"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
)
// Handler serves the subsystem's WHEP endpoints as Echo handlers.
// Mount them on the /api/v3 group (or a sibling group) with
// Handler.Register.
type Handler struct {
	// count is the number of currently admitted peers. It is only
	// touched through sync/atomic so the cap check needs no lock.
	// It sits first in the struct so that it stays 64-bit aligned on
	// 32-bit platforms regardless of the fields that follow.
	count int64
	// maxCap is the ceiling that count is compared against.
	maxCap int64

	// sub provides stream lookup and the peer factory.
	sub *Subsystem

	// peersMu guards peers.
	peersMu sync.Mutex
	peers   map[string]*corewebrtc.Peer // resourceID -> peer
}
// NewHandler builds a Handler that serves WHEP requests for s.
//
// maxPeers bounds the number of concurrent subscribers. A zero or
// negative value selects corewebrtc.DefaultConfig().MaxPeersTotal
// instead.
func NewHandler(s *Subsystem, maxPeers int) *Handler {
	limit := int64(maxPeers)
	if limit <= 0 {
		limit = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
	}

	h := &Handler{sub: s, maxCap: limit}
	h.peers = make(map[string]*corewebrtc.Peer)
	return h
}
// Register attaches the WHEP routes to g:
//
//	POST   /whep/:id            -> Subscribe
//	DELETE /whep/:id/:resource  -> Unsubscribe
//
// No authentication is applied here on purpose for M2: WHEP clients
// (browsers, OBS) do not carry the Core JWT. Per-process signed-URL
// tokens are planned for M3; until then the deployment is expected to
// place the endpoint behind an authenticated reverse-proxy or a VPN.
func (h *Handler) Register(g *echo.Group) {
	const (
		subscribeRoute   = "/whep/:id"
		unsubscribeRoute = subscribeRoute + "/:resource"
	)

	g.POST(subscribeRoute, h.Subscribe)
	g.DELETE(unsubscribeRoute, h.Unsubscribe)
}
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
// response is an SDP answer with a Location header pointing at the
// DELETE resource.
2026-05-03 08:12:05 -04:00
//
// @Summary Subscribe to a WebRTC stream via WHEP
// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE resource for teardown.
// @Tags v16.16.0
// @ID webrtc-3-whep-subscribe
// @Accept application/sdp
// @Produce application/sdp
// @Param id path string true "Process ID with config.webrtc.enabled=true"
// @Success 201 {string} string "SDP answer"
// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP"
// @Failure 404 {string} string "no stream registered for this process id"
// @Failure 503 {string} string "peer cap reached"
// @Security ApiKeyAuth
// @Router /api/v3/whep/{id} [post]
2026-04-17 10:03:24 -04:00
func (h *Handler) Subscribe(c echo.Context) error {
	// maxOfferBytes caps how much of the request body is buffered.
	// The WHEP routes are mounted without Core authentication, so an
	// unbounded io.ReadAll would let any client exhaust memory.
	const maxOfferBytes = 1 << 20

	id := c.Param("id")
	if id == "" {
		return c.String(http.StatusBadRequest, "missing stream id")
	}

	// Claim a slot atomically before doing any work. A separate
	// load-then-increment would let concurrent requests all pass the
	// check and push the peer count past maxCap.
	if atomic.AddInt64(&h.count, 1) > h.maxCap {
		atomic.AddInt64(&h.count, -1)
		return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
	}
	// Hand the slot back on every failure path below; it is kept only
	// once the peer has been stored in the registry.
	admitted := false
	defer func() {
		if !admitted {
			atomic.AddInt64(&h.count, -1)
		}
	}()

	stream, ok := h.sub.lookup(id)
	if !ok {
		return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
	}

	// Read one byte past the limit so an oversized offer can be told
	// apart from one that is exactly maxOfferBytes long.
	body, err := io.ReadAll(io.LimitReader(c.Request().Body, maxOfferBytes+1))
	if err != nil {
		return c.String(http.StatusBadRequest, "read body: "+err.Error())
	}
	if len(body) > maxOfferBytes {
		return c.String(http.StatusRequestEntityTooLarge, "SDP offer too large")
	}

	// Cheap sanity check: an SDP document starts with its version
	// line. An empty body fails this test as well.
	sdp := string(body)
	if !strings.HasPrefix(sdp, "v=") {
		return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
	}

	offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sdp}
	peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
	if err != nil {
		return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
	}

	resourceID := peer.ResourceID()
	h.peersMu.Lock()
	h.peers[resourceID] = peer
	h.peersMu.Unlock()
	admitted = true

	// The DELETE route is the POST route plus "/:resource", so derive
	// the Location from the path this request arrived on. That keeps
	// whatever prefix the Echo group is mounted under (e.g. /api/v3),
	// which a hard-coded "/whep/..." would drop.
	location := strings.TrimSuffix(c.Request().URL.EscapedPath(), "/") + "/" + resourceID

	c.Response().Header().Set("Content-Type", "application/sdp")
	c.Response().Header().Set("Location", location)
	return c.String(http.StatusCreated, peer.Answer().SDP)
}
// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of
// the path per WHEP spec but we only need :resource to locate the
// peer; :id is accepted for route symmetry.
2026-05-03 08:12:05 -04:00
//
// @Summary Tear down a WHEP subscription
// @Description Tear down a WebRTC peer connection by its resource id (returned in the Location header by Subscribe). Returns 204 once the peer has been closed and 404 when the resource id is unknown, for example because it was already torn down.
// @Tags v16.16.0
// @ID webrtc-3-whep-unsubscribe
// @Param id path string true "Process ID"
// @Param resource path string true "Resource ID from the Subscribe Location header"
// @Success 204 "no content"
// @Failure 400 {string} string "missing resource id"
// @Failure 404 "unknown resource id"
// @Security ApiKeyAuth
// @Router /api/v3/whep/{id}/{resource} [delete]
2026-04-17 10:03:24 -04:00
func (h *Handler) Unsubscribe(c echo.Context) error {
	resourceID := c.Param("resource")
	if resourceID == "" {
		return c.String(http.StatusBadRequest, "missing resource id")
	}

	// Detach the peer from the registry while holding the lock and
	// close it afterwards, so the lock is not held during teardown.
	// Deleting a key that is not present is a no-op.
	h.peersMu.Lock()
	target, found := h.peers[resourceID]
	delete(h.peers, resourceID)
	h.peersMu.Unlock()

	if found {
		// Best-effort teardown: the close error is intentionally
		// dropped and the slot is released either way.
		_ = target.Close()
		atomic.AddInt64(&h.count, -1)
		return c.NoContent(http.StatusNoContent)
	}
	return c.NoContent(http.StatusNotFound)
}
// Close tears down every peer that is registered at the time of the
// call (e.g., during Core shutdown). Close errors are ignored because
// teardown is best-effort.
func (h *Handler) Close() {
	// Swap in a fresh registry under the lock. The old map is private
	// to this call afterwards, so peers can be closed without holding
	// the lock.
	h.peersMu.Lock()
	closing := h.peers
	h.peers = make(map[string]*corewebrtc.Peer)
	h.peersMu.Unlock()

	for _, p := range closing {
		_ = p.Close()
		// Release exactly the slot this peer held. Resetting the
		// counter to zero instead would also discard slots taken by
		// Subscribe calls running concurrently with Close, leaving the
		// counter out of step with the registry.
		atomic.AddInt64(&h.count, -1)
	}
}