feat(webrtc): add Echo WHEP handler for app/webrtc subsystem

Introduces the HTTP surface the browser (or OBS WebRTC clients)
target when subscribing to a process's egress:

  POST   /whep/:id              -> answer SDP + Location header
  DELETE /whep/:id/:resource    -> tear down a specific peer

The handler looks up the per-process stream pair via the Subsystem,
validates SDP offer shape, and delegates peer creation to the core
PeerFactory's CreatePeerFromSources (two-source forwarding).

WHEP routes are left unauthenticated in M2 — browsers and OBS don't
carry the Core JWT, and per-process signed-URL tokens are an M3
enhancement. Deployments should place the endpoint behind an
authenticated reverse-proxy for now.

Tests cover:
  - 404 for POSTs against unregistered streams
  - 400 for empty/invalid SDP offers once a stream is registered
  - 404 for DELETE against unknown resource ids
This commit is contained in:
Zac Gaetano 2026-04-17 10:03:24 -04:00
parent 9d38e9ccdb
commit f6d5b3378a
2 changed files with 223 additions and 0 deletions

134
app/webrtc/handler.go Normal file
View file

@ -0,0 +1,134 @@
package webrtc
import (
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"github.com/labstack/echo/v4"
"github.com/pion/webrtc/v4"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
)
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
// the /api/v3 group (or a sibling group) via Handler.Register.
type Handler struct {
sub *Subsystem
peersMu sync.Mutex
peers map[string]*corewebrtc.Peer // resourceID -> peer
count int64 // atomic, for cap check without lock
maxCap int64
}
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
// The maxPeers argument caps concurrent subscribers; pass 0 to use a
// generous default (matches corewebrtc.DefaultConfig).
func NewHandler(s *Subsystem, maxPeers int) *Handler {
cap := int64(maxPeers)
if cap <= 0 {
cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
}
return &Handler{
sub: s,
peers: make(map[string]*corewebrtc.Peer),
maxCap: cap,
}
}
// Register mounts the WHEP routes on the provided Echo group. WHEP
// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource.
//
// The routes are deliberately unauthenticated in M2 because WHEP
// clients (browsers, OBS) don't carry the Core JWT. M3 will add
// per-process signed-URL tokens; for M2 the deployment is expected
// to put the endpoint behind an authenticated reverse-proxy or VPN.
func (h *Handler) Register(g *echo.Group) {
g.POST("/whep/:id", h.Subscribe)
g.DELETE("/whep/:id/:resource", h.Unsubscribe)
}
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
// response is an SDP answer with a Location header pointing at the
// DELETE resource.
func (h *Handler) Subscribe(c echo.Context) error {
id := c.Param("id")
if id == "" {
return c.String(http.StatusBadRequest, "missing stream id")
}
if atomic.LoadInt64(&h.count) >= h.maxCap {
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
}
stream, ok := h.sub.lookup(id)
if !ok {
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
}
body, err := io.ReadAll(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, "read body: "+err.Error())
}
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
}
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
if err != nil {
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
}
h.peersMu.Lock()
h.peers[peer.ResourceID()] = peer
h.peersMu.Unlock()
atomic.AddInt64(&h.count, 1)
c.Response().Header().Set("Content-Type", "application/sdp")
c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID())
return c.String(http.StatusCreated, peer.Answer().SDP)
}
// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of
// the path per WHEP spec but we only need :resource to locate the
// peer; :id is accepted for route symmetry.
func (h *Handler) Unsubscribe(c echo.Context) error {
resource := c.Param("resource")
if resource == "" {
return c.String(http.StatusBadRequest, "missing resource id")
}
h.peersMu.Lock()
peer, ok := h.peers[resource]
if ok {
delete(h.peers, resource)
}
h.peersMu.Unlock()
if !ok {
return c.NoContent(http.StatusNotFound)
}
_ = peer.Close()
atomic.AddInt64(&h.count, -1)
return c.NoContent(http.StatusNoContent)
}
// Close tears down every active peer (e.g., during Core shutdown).
func (h *Handler) Close() {
h.peersMu.Lock()
peers := make([]*corewebrtc.Peer, 0, len(h.peers))
for _, p := range h.peers {
peers = append(peers, p)
}
h.peers = make(map[string]*corewebrtc.Peer)
h.peersMu.Unlock()
for _, p := range peers {
_ = p.Close()
}
atomic.StoreInt64(&h.count, 0)
}

View file

@ -0,0 +1,89 @@
package webrtc
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/labstack/echo/v4"
"github.com/datarhei/core/v16/config"
)
func newTestSubsystem(t *testing.T) *Subsystem {
t.Helper()
s, err := New(config.DataWebRTC{Enable: true}, nil)
if err != nil {
t.Fatalf("New: %v", err)
}
return s
}
// TestHandler_Subscribe_404WhenStreamMissing verifies the WHEP POST
// returns 404 when no process has registered a stream for that id.
func TestHandler_Subscribe_404WhenStreamMissing(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
e := echo.New()
req := httptest.NewRequest(http.MethodPost, "/whep/ghost", strings.NewReader("v=0\r\n"))
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("ghost")
if err := h.Subscribe(c); err != nil {
t.Fatalf("Subscribe returned error: %v", err)
}
if rec.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d: %s", rec.Code, rec.Body.String())
}
}
// TestHandler_Subscribe_400OnEmptyBody verifies invalid SDP offers
// short-circuit before any peer is created. Requires a registered
// stream so lookup doesn't 404 first.
func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) {
sub := newTestSubsystem(t)
// Register a dummy stream so the handler reaches body validation.
sub.mu.Lock()
sub.streams["probe"] = &processStream{id: "probe"} // video/audio nil is fine here — we never get past body parse
sub.mu.Unlock()
h := NewHandler(sub, 0)
e := echo.New()
req := httptest.NewRequest(http.MethodPost, "/whep/probe", strings.NewReader(""))
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("probe")
if err := h.Subscribe(c); err != nil {
t.Fatalf("Subscribe returned error: %v", err)
}
if rec.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", rec.Code, rec.Body.String())
}
}
// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an
// unknown resource id returns 404 and no state mutation.
func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
e := echo.New()
req := httptest.NewRequest(http.MethodDelete, "/whep/id/unknown", nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id", "resource")
c.SetParamValues("id", "unknown")
if err := h.Unsubscribe(c); err != nil {
t.Fatalf("Unsubscribe returned error: %v", err)
}
if rec.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d", rec.Code)
}
}