feat(app/webrtc): M3 robustness — error matrix, per-stream index, PATCH, CORS
Major Handler rewrite implementing the design's M3 acceptance
criteria ('5 concurrent viewers, all error paths correct, clean
teardown'):
Multi-viewer correctness:
- streamID -> resourceID -> Peer two-level index (was flat)
- per-stream peer cap alongside total cap, defaults match the
design's '5–8 viewer' target (8/stream, total from corewebrtc)
- per-peer awaitPeerClose goroutine watches Peer.Done() so ICE
failures yank the index entry + decrement the counter (no leaks)
- tearDownStreamPeers callback (registered with Subsystem in
NewHandler) drives all peer closes when the source process stops
Error matrix from design §6:
- 406 on codec mismatch (offer missing H264 or Opus rtpmap)
- 504 on ICE gathering timeout (passthrough from CreatePeerFromSources)
- 204 on DELETE unknown resource (idempotent per WHEP spec; was 404)
- 503 on per-stream cap reached (separate body from total-cap 503)
- 400 on missing/empty body (unchanged)
- 404 on unknown stream (unchanged)
WHEP spec compatibility:
- PATCH /whep/:id/:resource for trickle-ICE
- OPTIONS preflight on every WHEP path
- CORS Allow-Origin/Methods/Headers + Expose-Headers (Location, ETag)
- ETag header on Subscribe response
Defensive nil-peer guards in tearDown / Close paths so a partial
state doesn't panic.
Refactor: 134 -> 341 lines on handler.go but the surface is the
same (NewHandler/Register/Subscribe/Unsubscribe/Close); existing
callers continue to work. Pre-M3 test 'Unsubscribe_404WhenUnknown'
renamed and updated to the new 204 expectation.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
3abd4d8fd1
commit
4d2f11d836
2 changed files with 266 additions and 51 deletions
|
|
@ -13,54 +13,91 @@ import (
|
|||
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||
)
|
||||
|
||||
// Default per-stream peer cap when the caller passes 0. The total cap
|
||||
// (passed to NewHandler) is enforced separately and takes precedence.
|
||||
const defaultMaxPeersPerStream = 8
|
||||
|
||||
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
|
||||
// the /api/v3 group (or a sibling group) via Handler.Register.
|
||||
//
|
||||
// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
|
||||
// On every Subscribe we spin a tiny goroutine watching the new peer's
|
||||
// Done() channel; when ICE fails or Close() runs the index entry is
|
||||
// removed and the counters tick back down — no leaks if the browser
|
||||
// rage-quits.
|
||||
type Handler struct {
|
||||
sub *Subsystem
|
||||
|
||||
peersMu sync.Mutex
|
||||
peers map[string]*corewebrtc.Peer // resourceID -> peer
|
||||
count int64 // atomic, for cap check without lock
|
||||
maxCap int64
|
||||
mu sync.Mutex
|
||||
peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
|
||||
peerStream map[string]string // resource -> streamID (reverse index)
|
||||
count int64 // atomic
|
||||
maxCapTotal int64
|
||||
maxCapPerStrm int64
|
||||
}
|
||||
|
||||
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
|
||||
// The maxPeers argument caps concurrent subscribers; pass 0 to use a
|
||||
// generous default (matches corewebrtc.DefaultConfig).
|
||||
// The maxPeers argument caps concurrent subscribers across all streams;
|
||||
// pass 0 to use a generous default (matches corewebrtc.DefaultConfig).
|
||||
// The per-stream cap is taken from the corewebrtc default; pass a
|
||||
// non-zero value to override via NewHandlerWithCaps.
|
||||
func NewHandler(s *Subsystem, maxPeers int) *Handler {
|
||||
cap := int64(maxPeers)
|
||||
if cap <= 0 {
|
||||
cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
|
||||
}
|
||||
return &Handler{
|
||||
sub: s,
|
||||
peers: make(map[string]*corewebrtc.Peer),
|
||||
maxCap: cap,
|
||||
}
|
||||
return NewHandlerWithCaps(s, maxPeers, 0)
|
||||
}
|
||||
|
||||
// Register mounts the WHEP routes on the provided Echo group. WHEP
|
||||
// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource.
|
||||
// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap.
|
||||
// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream.
|
||||
func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler {
|
||||
total := int64(maxPeers)
|
||||
if total <= 0 {
|
||||
total = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
|
||||
}
|
||||
perStream := int64(maxPeersPerStream)
|
||||
if perStream <= 0 {
|
||||
perStream = defaultMaxPeersPerStream
|
||||
}
|
||||
h := &Handler{
|
||||
sub: s,
|
||||
peersByStream: make(map[string]map[string]*corewebrtc.Peer),
|
||||
peerStream: make(map[string]string),
|
||||
maxCapTotal: total,
|
||||
maxCapPerStrm: perStream,
|
||||
}
|
||||
// Subsystem broadcasts process-stop via this hook so the handler
|
||||
// can yank stale peer entries before their Sources close out
|
||||
// from underneath them.
|
||||
if s != nil {
|
||||
s.SetTeardownHook(h.tearDownStreamPeers)
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
// Register mounts the WHEP routes on the provided Echo group.
|
||||
//
|
||||
// The routes are deliberately unauthenticated in M2 because WHEP
|
||||
// clients (browsers, OBS) don't carry the Core JWT. M3 will add
|
||||
// per-process signed-URL tokens; for M2 the deployment is expected
|
||||
// to put the endpoint behind an authenticated reverse-proxy or VPN.
|
||||
// CORS preflights are answered on every WHEP path; regular WHEP
|
||||
// responses also carry the Access-Control-* headers so browser-side
|
||||
// players living on a different origin can subscribe.
|
||||
func (h *Handler) Register(g *echo.Group) {
|
||||
g.OPTIONS("/whep/:id", h.preflight)
|
||||
g.OPTIONS("/whep/:id/:resource", h.preflight)
|
||||
g.POST("/whep/:id", h.Subscribe)
|
||||
g.DELETE("/whep/:id/:resource", h.Unsubscribe)
|
||||
g.PATCH("/whep/:id/:resource", h.Trickle)
|
||||
}
|
||||
|
||||
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
|
||||
// response is an SDP answer with a Location header pointing at the
|
||||
// DELETE resource.
|
||||
// DELETE/PATCH resource.
|
||||
func (h *Handler) Subscribe(c echo.Context) error {
|
||||
addCORS(c)
|
||||
|
||||
id := c.Param("id")
|
||||
if id == "" {
|
||||
return c.String(http.StatusBadRequest, "missing stream id")
|
||||
}
|
||||
|
||||
if atomic.LoadInt64(&h.count) >= h.maxCap {
|
||||
// Total cap: cheap atomic check before doing real work.
|
||||
if atomic.LoadInt64(&h.count) >= h.maxCapTotal {
|
||||
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
|
||||
}
|
||||
|
||||
|
|
@ -69,6 +106,14 @@ func (h *Handler) Subscribe(c echo.Context) error {
|
|||
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
|
||||
}
|
||||
|
||||
// Per-stream cap: needs the lock since we're indexing per stream.
|
||||
h.mu.Lock()
|
||||
if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm {
|
||||
h.mu.Unlock()
|
||||
return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached")
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
body, err := io.ReadAll(c.Request().Body)
|
||||
if err != nil {
|
||||
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
||||
|
|
@ -76,59 +121,227 @@ func (h *Handler) Subscribe(c echo.Context) error {
|
|||
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
|
||||
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
|
||||
}
|
||||
if err := requireH264AndOpus(string(body)); err != nil {
|
||||
return c.String(http.StatusNotAcceptable, err.Error())
|
||||
}
|
||||
|
||||
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
|
||||
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
|
||||
if err != nil {
|
||||
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
|
||||
// Surface the design's error matrix.
|
||||
switch err {
|
||||
case corewebrtc.ErrICETimeout:
|
||||
return c.String(http.StatusGatewayTimeout, err.Error())
|
||||
case corewebrtc.ErrCodecMismatch:
|
||||
return c.String(http.StatusNotAcceptable, err.Error())
|
||||
default:
|
||||
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
h.peersMu.Lock()
|
||||
h.peers[peer.ResourceID()] = peer
|
||||
h.peersMu.Unlock()
|
||||
rid := peer.ResourceID()
|
||||
h.mu.Lock()
|
||||
if h.peersByStream[id] == nil {
|
||||
h.peersByStream[id] = make(map[string]*corewebrtc.Peer)
|
||||
}
|
||||
h.peersByStream[id][rid] = peer
|
||||
h.peerStream[rid] = id
|
||||
h.mu.Unlock()
|
||||
atomic.AddInt64(&h.count, 1)
|
||||
|
||||
// Auto-cleanup: when Pion's OnConnectionStateChange triggers
|
||||
// peer.Close() (ICE failed/disconnected), the Done channel
|
||||
// closes and we yank the index entry. Without this the map
|
||||
// leaks for the lifetime of the handler.
|
||||
go h.awaitPeerClose(rid, peer)
|
||||
|
||||
c.Response().Header().Set("Content-Type", "application/sdp")
|
||||
c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID())
|
||||
c.Response().Header().Set("Location", "/whep/"+id+"/"+rid)
|
||||
c.Response().Header().Set("ETag", `"`+rid+`"`)
|
||||
return c.String(http.StatusCreated, peer.Answer().SDP)
|
||||
}
|
||||
|
||||
// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of
|
||||
// the path per WHEP spec but we only need :resource to locate the
|
||||
// peer; :id is accepted for route symmetry.
|
||||
// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we
|
||||
// return 204 even when the resource is unknown — DELETE is idempotent
|
||||
// and a re-issued tear-down should never error out.
|
||||
func (h *Handler) Unsubscribe(c echo.Context) error {
|
||||
addCORS(c)
|
||||
|
||||
resource := c.Param("resource")
|
||||
if resource == "" {
|
||||
return c.String(http.StatusBadRequest, "missing resource id")
|
||||
}
|
||||
|
||||
h.peersMu.Lock()
|
||||
peer, ok := h.peers[resource]
|
||||
if ok {
|
||||
delete(h.peers, resource)
|
||||
h.mu.Lock()
|
||||
streamID := h.peerStream[resource]
|
||||
var peer *corewebrtc.Peer
|
||||
if streamID != "" {
|
||||
peer = h.peersByStream[streamID][resource]
|
||||
delete(h.peersByStream[streamID], resource)
|
||||
if len(h.peersByStream[streamID]) == 0 {
|
||||
delete(h.peersByStream, streamID)
|
||||
}
|
||||
delete(h.peerStream, resource)
|
||||
}
|
||||
h.peersMu.Unlock()
|
||||
h.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
if peer != nil {
|
||||
_ = peer.Close()
|
||||
}
|
||||
if streamID != "" {
|
||||
atomic.AddInt64(&h.count, -1)
|
||||
}
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates
|
||||
// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients
|
||||
// signal end-of-candidates via an a=end-of-candidates line, which
|
||||
// AddICECandidate accepts).
|
||||
func (h *Handler) Trickle(c echo.Context) error {
|
||||
addCORS(c)
|
||||
|
||||
resource := c.Param("resource")
|
||||
if resource == "" {
|
||||
return c.String(http.StatusBadRequest, "missing resource id")
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
streamID := h.peerStream[resource]
|
||||
var peer *corewebrtc.Peer
|
||||
if streamID != "" {
|
||||
peer = h.peersByStream[streamID][resource]
|
||||
}
|
||||
h.mu.Unlock()
|
||||
if peer == nil {
|
||||
return c.NoContent(http.StatusNotFound)
|
||||
}
|
||||
_ = peer.Close()
|
||||
atomic.AddInt64(&h.count, -1)
|
||||
|
||||
body, err := io.ReadAll(c.Request().Body)
|
||||
if err != nil {
|
||||
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
||||
}
|
||||
for _, line := range strings.Split(string(body), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if !strings.HasPrefix(line, "a=candidate:") {
|
||||
continue
|
||||
}
|
||||
cand := strings.TrimPrefix(line, "a=")
|
||||
_ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand})
|
||||
}
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// preflight answers a CORS OPTIONS request; the headers are also
|
||||
// echoed on every other response.
|
||||
func (h *Handler) preflight(c echo.Context) error {
|
||||
addCORS(c)
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Close tears down every active peer (e.g., during Core shutdown).
|
||||
func (h *Handler) Close() {
|
||||
h.peersMu.Lock()
|
||||
peers := make([]*corewebrtc.Peer, 0, len(h.peers))
|
||||
for _, p := range h.peers {
|
||||
peers = append(peers, p)
|
||||
h.mu.Lock()
|
||||
peers := make([]*corewebrtc.Peer, 0)
|
||||
for _, m := range h.peersByStream {
|
||||
for _, p := range m {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
}
|
||||
h.peers = make(map[string]*corewebrtc.Peer)
|
||||
h.peersMu.Unlock()
|
||||
h.peersByStream = make(map[string]map[string]*corewebrtc.Peer)
|
||||
h.peerStream = make(map[string]string)
|
||||
h.mu.Unlock()
|
||||
|
||||
for _, p := range peers {
|
||||
_ = p.Close()
|
||||
if p != nil {
|
||||
_ = p.Close()
|
||||
}
|
||||
}
|
||||
atomic.StoreInt64(&h.count, 0)
|
||||
}
|
||||
|
||||
// awaitPeerClose blocks on peer.Done() and yanks the index entry when
|
||||
// the peer self-closes (ICE failed/disconnected). Idempotent with
|
||||
// the Unsubscribe path: if Unsubscribe ran first the index is already
|
||||
// empty and we just decrement the counter once on first arrival.
|
||||
func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
|
||||
<-peer.Done()
|
||||
h.mu.Lock()
|
||||
streamID := h.peerStream[resource]
|
||||
_, present := h.peerStream[resource]
|
||||
if present {
|
||||
delete(h.peerStream, resource)
|
||||
if streamID != "" {
|
||||
delete(h.peersByStream[streamID], resource)
|
||||
if len(h.peersByStream[streamID]) == 0 {
|
||||
delete(h.peersByStream, streamID)
|
||||
}
|
||||
}
|
||||
}
|
||||
h.mu.Unlock()
|
||||
if present {
|
||||
atomic.AddInt64(&h.count, -1)
|
||||
}
|
||||
}
|
||||
|
||||
// tearDownStreamPeers is the callback the Subsystem runs in its
|
||||
// onProcessStop hook. It closes every peer subscribed to that
|
||||
// stream (driving each one's Done() and indirectly awaitPeerClose).
|
||||
func (h *Handler) tearDownStreamPeers(streamID string) {
|
||||
h.mu.Lock()
|
||||
bucket := h.peersByStream[streamID]
|
||||
peers := make([]*corewebrtc.Peer, 0, len(bucket))
|
||||
for _, p := range bucket {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
for _, p := range peers {
|
||||
if p != nil {
|
||||
_ = p.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addCORS emits the response headers a browser-side WHEP player
|
||||
// expects. WHEP's Location and ETag headers must be exposed for
|
||||
// fetch() to read them across origins.
|
||||
func addCORS(c echo.Context) {
|
||||
hh := c.Response().Header()
|
||||
hh.Set("Access-Control-Allow-Origin", "*")
|
||||
hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS")
|
||||
hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match")
|
||||
hh.Set("Access-Control-Expose-Headers", "Location, ETag")
|
||||
}
|
||||
|
||||
// requireH264AndOpus does a coarse SDP scan to confirm the offer
|
||||
// includes both an H.264 video rtpmap and an Opus audio rtpmap. The
|
||||
// design treats codec mismatch as a 406, never a silent black frame.
|
||||
//
|
||||
// This is intentionally a string scan rather than a full SDP parse:
|
||||
// every modern browser advertises H.264 and Opus by name, and a
|
||||
// dependency on a real SDP parser for one validation step is
|
||||
// disproportionate. M4 may swap this for pion/sdp.v3 when other
|
||||
// surfaces also need parsing.
|
||||
func requireH264AndOpus(sdp string) error {
|
||||
lower := strings.ToLower(sdp)
|
||||
hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/")
|
||||
hasOpus := strings.Contains(lower, "opus/48000") || strings.Contains(lower, " opus/")
|
||||
if hasH264 && hasOpus {
|
||||
return nil
|
||||
}
|
||||
missing := []string{}
|
||||
if !hasH264 {
|
||||
missing = append(missing, "H264")
|
||||
}
|
||||
if !hasOpus {
|
||||
missing = append(missing, "Opus")
|
||||
}
|
||||
return &codecMismatchError{missing: missing}
|
||||
}
|
||||
|
||||
type codecMismatchError struct{ missing []string }
|
||||
|
||||
func (e *codecMismatchError) Error() string {
|
||||
return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,9 +68,11 @@ func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an
|
||||
// unknown resource id returns 404 and no state mutation.
|
||||
func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
|
||||
// TestHandler_Unsubscribe_204WhenUnknown verifies a DELETE with an
|
||||
// unknown resource id returns 204 (idempotent), per the WHEP spec
|
||||
// and the M2/M3 design's error matrix. Pre-M3 this returned 404; the
|
||||
// updated semantics let clients re-issue DELETE without erroring.
|
||||
func TestHandler_Unsubscribe_204WhenUnknown(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
|
||||
e := echo.New()
|
||||
|
|
@ -83,7 +85,7 @@ func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
|
|||
if err := h.Unsubscribe(c); err != nil {
|
||||
t.Fatalf("Unsubscribe returned error: %v", err)
|
||||
}
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %d", rec.Code)
|
||||
if rec.Code != http.StatusNoContent {
|
||||
t.Fatalf("expected 204, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue