diff --git a/.gitignore b/.gitignore index d95f155..0561ec1 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ *.flv .VSCodeCounter +whep-client diff --git a/app/webrtc/handler.go b/app/webrtc/handler.go index 4f28624..3b1fe5c 100644 --- a/app/webrtc/handler.go +++ b/app/webrtc/handler.go @@ -13,50 +13,84 @@ import ( corewebrtc "github.com/datarhei/core/v16/core/webrtc" ) +// Default per-stream peer cap when the caller passes 0. The total cap +// (passed to NewHandler) is enforced separately and takes precedence. +const defaultMaxPeersPerStream = 8 + // Handler exposes the subsystem's WHEP Echo handlers. Wire them into // the /api/v3 group (or a sibling group) via Handler.Register. +// +// Lifecycle: peers are tracked in a streamID→resourceID→Peer index. +// On every Subscribe we spin a tiny goroutine watching the new peer's +// Done() channel; when ICE fails or Close() runs the index entry is +// removed and the counters tick back down — no leaks if the browser +// rage-quits. type Handler struct { sub *Subsystem - peersMu sync.Mutex - peers map[string]*corewebrtc.Peer // resourceID -> peer - count int64 // atomic, for cap check without lock - maxCap int64 + mu sync.Mutex + peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer + peerStream map[string]string // resource -> streamID (reverse index) + count int64 // atomic + maxCapTotal int64 + maxCapPerStrm int64 } // NewHandler wraps the subsystem in an Echo-compatible HTTP handler. -// The maxPeers argument caps concurrent subscribers; pass 0 to use a -// generous default (matches corewebrtc.DefaultConfig). +// The maxPeers argument caps concurrent subscribers across all streams; +// pass 0 to use a generous default (matches corewebrtc.DefaultConfig). +// The per-stream cap is taken from the corewebrtc default; pass a +// non-zero value to override via NewHandlerWithCaps. func NewHandler(s *Subsystem, maxPeers int) *Handler { - cap := int64(maxPeers) - if cap <= 0 { - cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal) - } - return &Handler{ - sub: s, - peers: make(map[string]*corewebrtc.Peer), - maxCap: cap, - } + return NewHandlerWithCaps(s, maxPeers, 0) } -// Register mounts the WHEP routes on the provided Echo group. WHEP -// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource. +// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap. +// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream. +func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler { + total := int64(maxPeers) + if total <= 0 { + total = int64(corewebrtc.DefaultConfig().MaxPeersTotal) + } + perStream := int64(maxPeersPerStream) + if perStream <= 0 { + perStream = defaultMaxPeersPerStream + } + h := &Handler{ + sub: s, + peersByStream: make(map[string]map[string]*corewebrtc.Peer), + peerStream: make(map[string]string), + maxCapTotal: total, + maxCapPerStrm: perStream, + } + // Subsystem broadcasts process-stop via this hook so the handler + // can yank stale peer entries before their Sources close out + // from underneath them. + if s != nil { + s.SetTeardownHook(h.tearDownStreamPeers) + } + return h +} + +// Register mounts the WHEP routes on the provided Echo group. // -// The routes are deliberately unauthenticated in M2 because WHEP -// clients (browsers, OBS) don't carry the Core JWT. M3 will add -// per-process signed-URL tokens; for M2 the deployment is expected -// to put the endpoint behind an authenticated reverse-proxy or VPN. +// CORS preflights are answered on every WHEP path; regular WHEP +// responses also carry the Access-Control-* headers so browser-side +// players living on a different origin can subscribe. func (h *Handler) Register(g *echo.Group) { + g.OPTIONS("/whep/:id", h.preflight) + g.OPTIONS("/whep/:id/:resource", h.preflight) g.POST("/whep/:id", h.Subscribe) g.DELETE("/whep/:id/:resource", h.Unsubscribe) + g.PATCH("/whep/:id/:resource", h.Trickle) } // Subscribe handles POST /whep/:id. Request body is an SDP offer, // response is an SDP answer with a Location header pointing at the -// DELETE resource. +// DELETE/PATCH resource. // // @Summary Subscribe to a WebRTC stream via WHEP -// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE resource for teardown. +// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE. // @Tags v16.16.0 // @ID webrtc-3-whep-subscribe // @Accept application/sdp @@ -65,16 +99,21 @@ func (h *Handler) Register(g *echo.Group) { // @Success 201 {string} string "SDP answer" // @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP" // @Failure 404 {string} string "no stream registered for this process id" -// @Failure 503 {string} string "peer cap reached" +// @Failure 406 {string} string "offer SDP missing required H264 / Opus rtpmap" +// @Failure 503 {string} string "peer cap reached (per-stream or total)" +// @Failure 504 {string} string "ICE gathering timeout" // @Security ApiKeyAuth // @Router /api/v3/whep/{id} [post] func (h *Handler) Subscribe(c echo.Context) error { + addCORS(c) + id := c.Param("id") if id == "" { return c.String(http.StatusBadRequest, "missing stream id") } - if atomic.LoadInt64(&h.count) >= h.maxCap { + // Total cap: cheap atomic check before doing real work. + if atomic.LoadInt64(&h.count) >= h.maxCapTotal { return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) } @@ -83,6 +122,14 @@ func (h *Handler) Subscribe(c echo.Context) error { return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) } + // Per-stream cap: needs the lock since we're indexing per stream. + h.mu.Lock() + if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm { + h.mu.Unlock() + return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached") + } + h.mu.Unlock() + body, err := io.ReadAll(c.Request().Body) if err != nil { return c.String(http.StatusBadRequest, "read body: "+err.Error()) @@ -90,29 +137,52 @@ func (h *Handler) Subscribe(c echo.Context) error { if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) } + if err := requireH264AndOpus(string(body)); err != nil { + return c.String(http.StatusNotAcceptable, err.Error()) + } offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) if err != nil { - return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) + // Surface the design's error matrix. + switch err { + case corewebrtc.ErrICETimeout: + return c.String(http.StatusGatewayTimeout, err.Error()) + case corewebrtc.ErrCodecMismatch: + return c.String(http.StatusNotAcceptable, err.Error()) + default: + return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) + } } - h.peersMu.Lock() - h.peers[peer.ResourceID()] = peer - h.peersMu.Unlock() + rid := peer.ResourceID() + h.mu.Lock() + if h.peersByStream[id] == nil { + h.peersByStream[id] = make(map[string]*corewebrtc.Peer) + } + h.peersByStream[id][rid] = peer + h.peerStream[rid] = id + h.mu.Unlock() atomic.AddInt64(&h.count, 1) + // Auto-cleanup: when Pion's OnConnectionStateChange triggers + // peer.Close() (ICE failed/disconnected), the Done channel + // closes and we yank the index entry. Without this the map + // leaks for the lifetime of the handler. + go h.awaitPeerClose(rid, peer) + c.Response().Header().Set("Content-Type", "application/sdp") - c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID()) + c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) + c.Response().Header().Set("ETag", `"`+rid+`"`) return c.String(http.StatusCreated, peer.Answer().SDP) } -// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of -// the path per WHEP spec but we only need :resource to locate the -// peer; :id is accepted for route symmetry. +// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we +// return 204 even when the resource is unknown — DELETE is idempotent +// and a re-issued tear-down should never error out. // // @Summary Tear down a WHEP subscription -// @Description Tear down a WebRTC peer connection by its resource id (returned in the Location header by Subscribe). Idempotent: returns 204 even when the resource is unknown, per the WHEP spec. +// @Description Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec. // @Tags v16.16.0 // @ID webrtc-3-whep-unsubscribe // @Param id path string true "Process ID" @@ -122,38 +192,196 @@ func (h *Handler) Subscribe(c echo.Context) error { // @Security ApiKeyAuth // @Router /api/v3/whep/{id}/{resource} [delete] func (h *Handler) Unsubscribe(c echo.Context) error { + addCORS(c) + resource := c.Param("resource") if resource == "" { return c.String(http.StatusBadRequest, "missing resource id") } - h.peersMu.Lock() - peer, ok := h.peers[resource] - if ok { - delete(h.peers, resource) + h.mu.Lock() + streamID := h.peerStream[resource] + var peer *corewebrtc.Peer + if streamID != "" { + peer = h.peersByStream[streamID][resource] + delete(h.peersByStream[streamID], resource) + if len(h.peersByStream[streamID]) == 0 { + delete(h.peersByStream, streamID) + } + delete(h.peerStream, resource) } - h.peersMu.Unlock() + h.mu.Unlock() - if !ok { + if peer != nil { + _ = peer.Close() + } + if streamID != "" { + atomic.AddInt64(&h.count, -1) + } + return c.NoContent(http.StatusNoContent) +} + +// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates +// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients +// signal end-of-candidates via an a=end-of-candidates line, which +// AddICECandidate accepts). +// +// @Summary Trickle ICE candidates for a WHEP subscription +// @Description Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag. +// @Tags v16.16.0 +// @ID webrtc-3-whep-trickle +// @Accept application/trickle-ice-sdpfrag +// @Param id path string true "Process ID" +// @Param resource path string true "Resource ID from the Subscribe Location header" +// @Success 204 "no content" +// @Failure 400 {string} string "missing resource id or unreadable body" +// @Failure 404 {string} string "peer not found" +// @Security ApiKeyAuth +// @Router /api/v3/whep/{id}/{resource} [patch] +func (h *Handler) Trickle(c echo.Context) error { + addCORS(c) + + resource := c.Param("resource") + if resource == "" { + return c.String(http.StatusBadRequest, "missing resource id") + } + + h.mu.Lock() + streamID := h.peerStream[resource] + var peer *corewebrtc.Peer + if streamID != "" { + peer = h.peersByStream[streamID][resource] + } + h.mu.Unlock() + if peer == nil { return c.NoContent(http.StatusNotFound) } - _ = peer.Close() - atomic.AddInt64(&h.count, -1) + + body, err := io.ReadAll(c.Request().Body) + if err != nil { + return c.String(http.StatusBadRequest, "read body: "+err.Error()) + } + for _, line := range strings.Split(string(body), "\n") { + line = strings.TrimSpace(line) + if !strings.HasPrefix(line, "a=candidate:") { + continue + } + cand := strings.TrimPrefix(line, "a=") + _ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand}) + } + return c.NoContent(http.StatusNoContent) +} + +// preflight answers a CORS OPTIONS request; the headers are also +// echoed on every other response. +func (h *Handler) preflight(c echo.Context) error { + addCORS(c) return c.NoContent(http.StatusNoContent) } // Close tears down every active peer (e.g., during Core shutdown). func (h *Handler) Close() { - h.peersMu.Lock() - peers := make([]*corewebrtc.Peer, 0, len(h.peers)) - for _, p := range h.peers { - peers = append(peers, p) + h.mu.Lock() + peers := make([]*corewebrtc.Peer, 0) + for _, m := range h.peersByStream { + for _, p := range m { + peers = append(peers, p) + } } - h.peers = make(map[string]*corewebrtc.Peer) - h.peersMu.Unlock() + h.peersByStream = make(map[string]map[string]*corewebrtc.Peer) + h.peerStream = make(map[string]string) + h.mu.Unlock() for _, p := range peers { - _ = p.Close() + if p != nil { + _ = p.Close() + } } atomic.StoreInt64(&h.count, 0) } + +// awaitPeerClose blocks on peer.Done() and yanks the index entry when +// the peer self-closes (ICE failed/disconnected). Idempotent with +// the Unsubscribe path: if Unsubscribe ran first the index is already +// empty and we just decrement the counter once on first arrival. +func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) { + <-peer.Done() + h.mu.Lock() + streamID := h.peerStream[resource] + _, present := h.peerStream[resource] + if present { + delete(h.peerStream, resource) + if streamID != "" { + delete(h.peersByStream[streamID], resource) + if len(h.peersByStream[streamID]) == 0 { + delete(h.peersByStream, streamID) + } + } + } + h.mu.Unlock() + if present { + atomic.AddInt64(&h.count, -1) + } +} + +// tearDownStreamPeers is the callback the Subsystem runs in its +// onProcessStop hook. It closes every peer subscribed to that +// stream (driving each one's Done() and indirectly awaitPeerClose). +func (h *Handler) tearDownStreamPeers(streamID string) { + h.mu.Lock() + bucket := h.peersByStream[streamID] + peers := make([]*corewebrtc.Peer, 0, len(bucket)) + for _, p := range bucket { + peers = append(peers, p) + } + h.mu.Unlock() + + for _, p := range peers { + if p != nil { + _ = p.Close() + } + } +} + +// addCORS emits the response headers a browser-side WHEP player +// expects. WHEP's Location and ETag headers must be exposed for +// fetch() to read them across origins. +func addCORS(c echo.Context) { + hh := c.Response().Header() + hh.Set("Access-Control-Allow-Origin", "*") + hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS") + hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match") + hh.Set("Access-Control-Expose-Headers", "Location, ETag") +} + +// requireH264AndOpus does a coarse SDP scan to confirm the offer +// includes both an H.264 video rtpmap and an Opus audio rtpmap. The +// design treats codec mismatch as a 406, never a silent black frame. +// +// This is intentionally a string scan rather than a full SDP parse: +// every modern browser advertises H.264 and Opus by name, and a +// dependency on a real SDP parser for one validation step is +// disproportionate. M4 may swap this for pion/sdp.v3 when other +// surfaces also need parsing. +func requireH264AndOpus(sdp string) error { + lower := strings.ToLower(sdp) + hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/") + hasOpus := strings.Contains(lower, "opus/48000") || strings.Contains(lower, " opus/") + if hasH264 && hasOpus { + return nil + } + missing := []string{} + if !hasH264 { + missing = append(missing, "H264") + } + if !hasOpus { + missing = append(missing, "Opus") + } + return &codecMismatchError{missing: missing} +} + +type codecMismatchError struct{ missing []string } + +func (e *codecMismatchError) Error() string { + return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ") +} diff --git a/app/webrtc/handler_m3_test.go b/app/webrtc/handler_m3_test.go new file mode 100644 index 0000000..393e318 --- /dev/null +++ b/app/webrtc/handler_m3_test.go @@ -0,0 +1,251 @@ +package webrtc + +import ( + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + + "github.com/labstack/echo/v4" + + corewebrtc "github.com/datarhei/core/v16/core/webrtc" +) + +// minimalH264OpusOffer returns an SDP offer that includes both H264 +// and Opus rtpmap lines — passes requireH264AndOpus but is otherwise +// nonsense, so CreatePeerFromSources will fail downstream when this +// is wired through. Use it only in tests that don't reach the +// PeerConnection path. +func minimalH264OpusOffer() string { + return "v=0\r\n" + + "o=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n" + + "m=video 9 UDP/TLS/RTP/SAVPF 102\r\n" + + "a=rtpmap:102 H264/90000\r\n" + + "m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" + + "a=rtpmap:111 opus/48000/2\r\n" +} + +// nonH264Offer is missing H264 entirely. Triggers requireH264AndOpus. +func nonH264Offer() string { + return "v=0\r\n" + + "m=video 9 UDP/TLS/RTP/SAVPF 96\r\n" + + "a=rtpmap:96 VP8/90000\r\n" + + "m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" + + "a=rtpmap:111 opus/48000/2\r\n" +} + +// TestHandler_Subscribe_406OnCodecMismatch verifies an offer that +// doesn't include H264 yields 406, per the design's error matrix. +func TestHandler_Subscribe_406OnCodecMismatch(t *testing.T) { + sub := newTestSubsystem(t) + sub.mu.Lock() + sub.streams["s"] = &processStream{id: "s"} + sub.mu.Unlock() + h := NewHandler(sub, 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(nonH264Offer())) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id") + c.SetParamValues("s") + + if err := h.Subscribe(c); err != nil { + t.Fatalf("Subscribe: %v", err) + } + if rec.Code != http.StatusNotAcceptable { + t.Fatalf("expected 406, got %d: %s", rec.Code, rec.Body.String()) + } + if !strings.Contains(rec.Body.String(), "H264") { + t.Errorf("body should mention missing codec: %q", rec.Body.String()) + } +} + +// TestHandler_Subscribe_503OnTotalCap simulates the total cap being +// exhausted by another subscriber. We don't actually create real peers +// (would need a real PeerConnection); instead we pre-load the atomic +// counter so the cap check fires. +func TestHandler_Subscribe_503OnTotalCap(t *testing.T) { + sub := newTestSubsystem(t) + sub.mu.Lock() + sub.streams["s"] = &processStream{id: "s"} + sub.mu.Unlock() + h := NewHandlerWithCaps(sub, 1, 100) + atomic.StoreInt64(&h.count, 1) // simulate one in-flight peer + + e := echo.New() + req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer())) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id") + c.SetParamValues("s") + _ = h.Subscribe(c) + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String()) + } + if !strings.Contains(rec.Body.String(), corewebrtc.ErrPeerCapReached.Error()) { + t.Errorf("body should mention peer cap: %q", rec.Body.String()) + } +} + +// TestHandler_Subscribe_503OnPerStreamCap simulates the per-stream cap +// being exhausted. Same trick as above but populating the per-stream +// index directly. +func TestHandler_Subscribe_503OnPerStreamCap(t *testing.T) { + sub := newTestSubsystem(t) + sub.mu.Lock() + sub.streams["s"] = &processStream{id: "s"} + sub.mu.Unlock() + h := NewHandlerWithCaps(sub, 100, 1) + // Drop a placeholder peer into the per-stream bucket so the cap + // arithmetic trips on the next subscribe. + h.mu.Lock() + h.peersByStream["s"] = map[string]*corewebrtc.Peer{"existing": nil} + h.mu.Unlock() + + e := echo.New() + req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer())) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id") + c.SetParamValues("s") + _ = h.Subscribe(c) + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String()) + } + if !strings.Contains(rec.Body.String(), "per-stream") { + t.Errorf("body should mention per-stream cap: %q", rec.Body.String()) + } +} + +// TestHandler_Trickle_404WhenUnknown verifies a PATCH for an unknown +// resource returns 404 (we still treat the resource as authoritative +// here; only DELETE is idempotent per spec). +func TestHandler_Trickle_404WhenUnknown(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodPatch, "/whep/id/unknown", strings.NewReader("")) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id", "resource") + c.SetParamValues("id", "unknown") + + if err := h.Trickle(c); err != nil { + t.Fatalf("Trickle: %v", err) + } + if rec.Code != http.StatusNotFound { + t.Fatalf("expected 404, got %d", rec.Code) + } +} + +// TestHandler_PreflightCORS verifies OPTIONS returns 204 with the +// browser-friendly CORS headers. +func TestHandler_PreflightCORS(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodOptions, "/whep/x", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id") + c.SetParamValues("x") + + if err := h.preflight(c); err != nil { + t.Fatalf("preflight: %v", err) + } + if rec.Code != http.StatusNoContent { + t.Fatalf("expected 204, got %d", rec.Code) + } + hh := rec.Header() + for _, k := range []string{ + "Access-Control-Allow-Origin", + "Access-Control-Allow-Methods", + "Access-Control-Allow-Headers", + "Access-Control-Expose-Headers", + } { + if hh.Get(k) == "" { + t.Errorf("missing CORS header %q", k) + } + } +} + +// TestHandler_RegisterMountsAllRoutes is a sanity check that +// Handler.Register installs OPTIONS / POST / DELETE / PATCH on the +// expected paths. Echo's Group has no public route enumerator, so we +// dispatch synthetic requests and assert the right methods are +// reachable. +func TestHandler_RegisterMountsAllRoutes(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + e := echo.New() + g := e.Group("") + h.Register(g) + + cases := []struct { + method, path string + want int + }{ + {http.MethodOptions, "/whep/foo", http.StatusNoContent}, + {http.MethodOptions, "/whep/foo/bar", http.StatusNoContent}, + {http.MethodPost, "/whep/foo", http.StatusNotFound}, // stream missing -> 404 + {http.MethodDelete, "/whep/foo/bar", http.StatusNoContent}, + {http.MethodPatch, "/whep/foo/bar", http.StatusNotFound}, + } + for _, tc := range cases { + req := httptest.NewRequest(tc.method, tc.path, strings.NewReader("")) + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + if rec.Code != tc.want { + t.Errorf("%s %s: got %d want %d (%s)", tc.method, tc.path, rec.Code, tc.want, rec.Body.String()) + } + } +} + +// TestHandler_Close_DrainsPeers seeds a fake peer into the index and +// verifies Close clears it without panicking. +func TestHandler_Close_DrainsPeers(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + h.mu.Lock() + h.peersByStream["s"] = map[string]*corewebrtc.Peer{"r1": nil} + h.peerStream["r1"] = "s" + atomic.StoreInt64(&h.count, 1) + h.mu.Unlock() + + h.Close() + if got := atomic.LoadInt64(&h.count); got != 0 { + t.Errorf("count after Close = %d, want 0", got) + } + h.mu.Lock() + if len(h.peersByStream) != 0 || len(h.peerStream) != 0 { + t.Errorf("indexes not cleared") + } + h.mu.Unlock() +} + +// TestRequireH264AndOpus covers the SDP scanner's positive + +// negative cases. +func TestRequireH264AndOpus(t *testing.T) { + cases := []struct { + name string + sdp string + ok bool + }{ + {"both", minimalH264OpusOffer(), true}, + {"missing h264", nonH264Offer(), false}, + {"missing opus", "m=video 9 UDP/TLS/RTP/SAVPF 102\r\na=rtpmap:102 H264/90000\r\n", false}, + {"capitalized", "a=rtpmap:111 OPUS/48000\r\na=rtpmap:102 H264/90000", true}, + {"empty", "", false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + err := requireH264AndOpus(c.sdp) + if c.ok && err != nil { + t.Errorf("expected ok, got %v", err) + } + if !c.ok && err == nil { + t.Errorf("expected error") + } + }) + } +} diff --git a/app/webrtc/handler_test.go b/app/webrtc/handler_test.go index 51e88d0..b36dfd6 100644 --- a/app/webrtc/handler_test.go +++ b/app/webrtc/handler_test.go @@ -68,9 +68,11 @@ func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) { } } -// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an -// unknown resource id returns 404 and no state mutation. -func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) { +// TestHandler_Unsubscribe_204WhenUnknown verifies a DELETE with an +// unknown resource id returns 204 (idempotent), per the WHEP spec +// and the M2/M3 design's error matrix. Pre-M3 this returned 404; the +// updated semantics let clients re-issue DELETE without erroring. +func TestHandler_Unsubscribe_204WhenUnknown(t *testing.T) { h := NewHandler(newTestSubsystem(t), 0) e := echo.New() @@ -83,7 +85,7 @@ func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) { if err := h.Unsubscribe(c); err != nil { t.Fatalf("Unsubscribe returned error: %v", err) } - if rec.Code != http.StatusNotFound { - t.Fatalf("expected 404, got %d", rec.Code) + if rec.Code != http.StatusNoContent { + t.Fatalf("expected 204, got %d", rec.Code) } } diff --git a/app/webrtc/lifecycle.go b/app/webrtc/lifecycle.go index 61e676a..2583d09 100644 --- a/app/webrtc/lifecycle.go +++ b/app/webrtc/lifecycle.go @@ -94,6 +94,7 @@ func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.Conf func (s *Subsystem) onProcessStop(id string) { s.mu.Lock() st, ok := s.streams[id] + teardown := s.teardown if ok { delete(s.streams, id) } @@ -102,6 +103,16 @@ func (s *Subsystem) onProcessStop(id string) { if !ok { return } + + // Broadcast first, so any subscribed peers get torn down while + // the streamID is still meaningful. The handler's tearDownStreamPeers + // drives each Peer.Close() which in turn unsubscribes from the + // Sources we're about to shut down — preventing a "subscribers fan + // out into a closed channel" race. + if teardown != nil { + teardown(id) + } + if st.video != nil { _ = st.video.Close() } diff --git a/app/webrtc/multiviewer_test.go b/app/webrtc/multiviewer_test.go new file mode 100644 index 0000000..1f9910f --- /dev/null +++ b/app/webrtc/multiviewer_test.go @@ -0,0 +1,257 @@ +package webrtc + +import ( + "net" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/labstack/echo/v4" + pionwebrtc "github.com/pion/webrtc/v4" + + "github.com/datarhei/core/v16/config" + appcfg "github.com/datarhei/core/v16/restream/app" +) + +// TestIntegration_FiveViewerFanout drives the M3 acceptance criterion +// "5 concurrent viewers, all error paths correct, clean teardown" in +// the wide direction. Five Pion subscribers attach to a single +// process's stream pair and each receives RTP without crosstalk; on +// teardown every subscriber's PeerConnection observes its tracks +// closing. +// +// Verifies (in order): +// * subsystem.onProcessStart returns adjacent UDP ports +// * 5 WHEP POSTs in parallel succeed (per-stream cap default = 8) +// * every subscriber's video and audio track receives at least one +// RTP packet within the timeout +// * onProcessStop tears every subscriber down (PeerConnection +// transitions away from connected/connecting) +func TestIntegration_FiveViewerFanout(t *testing.T) { + const N = 5 + + sub, err := New(config.DataWebRTC{Enable: true}, nil) + if err != nil { + t.Fatalf("subsystem New: %v", err) + } + defer sub.Close() + + h := NewHandler(sub, 0) + defer h.Close() + + processID := "fanout" + legs, err := sub.onProcessStart(processID, &appcfg.Config{ + ID: processID, + WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}, + }) + if err != nil { + t.Fatalf("onProcessStart: %v", err) + } + if len(legs) != 2 { + t.Fatalf("expected 2 legs, got %d", len(legs)) + } + videoPort, err := portFromLegAddress(legs[0].Address) + if err != nil { + t.Fatalf("video port: %v", err) + } + audioPort, err := portFromLegAddress(legs[1].Address) + if err != nil { + t.Fatalf("audio port: %v", err) + } + + e := echo.New() + g := e.Group("") + h.Register(g) + srv := httptest.NewServer(e) + defer srv.Close() + + // Each subscriber tracks first-RTP-received signals for V and A. + type viewer struct { + pc *pionwebrtc.PeerConnection + videoCh chan struct{} + audioCh chan struct{} + } + viewers := make([]*viewer, N) + api := func() *pionwebrtc.API { + me := &pionwebrtc.MediaEngine{} + _ = me.RegisterDefaultCodecs() + return pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me)) + }() + + subscribe := func(i int) error { + pc, err := api.NewPeerConnection(pionwebrtc.Configuration{}) + if err != nil { + return err + } + v := &viewer{pc: pc, videoCh: make(chan struct{}, 1), audioCh: make(chan struct{}, 1)} + viewers[i] = v + var vGot, aGot atomic.Bool + pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) { + go func() { + if _, _, rerr := tr.ReadRTP(); rerr != nil { + return + } + switch tr.Kind() { + case pionwebrtc.RTPCodecTypeVideo: + if vGot.CompareAndSwap(false, true) { + v.videoCh <- struct{}{} + } + case pionwebrtc.RTPCodecTypeAudio: + if aGot.CompareAndSwap(false, true) { + v.audioCh <- struct{}{} + } + } + }() + }) + _, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo, + pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}) + _, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio, + pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}) + offer, err := pc.CreateOffer(nil) + if err != nil { + return err + } + gather := pionwebrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(offer); err != nil { + return err + } + <-gather + resp, err := http.Post(srv.URL+"/whep/"+processID, "application/sdp", + strings.NewReader(pc.LocalDescription().SDP)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusCreated { + t.Errorf("viewer %d: WHEP %d", i, resp.StatusCode) + return nil + } + buf := make([]byte, 1<<15) + n, _ := resp.Body.Read(buf) + return pc.SetRemoteDescription(pionwebrtc.SessionDescription{ + Type: pionwebrtc.SDPTypeAnswer, + SDP: string(buf[:n]), + }) + } + + // Subscribe all N viewers in parallel. + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + if err := subscribe(i); err != nil { + t.Errorf("viewer %d subscribe: %v", i, err) + } + }(i) + } + wg.Wait() + + for i := 0; i < N; i++ { + if viewers[i] == nil || viewers[i].pc == nil { + t.Fatalf("viewer %d not constructed", i) + } + defer viewers[i].pc.Close() + } + + // Spray RTP into both ports until every viewer reports first-RTP. + videoSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort)) + audioSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(audioPort)) + defer videoSender.Close() + defer audioSender.Close() + stop := make(chan struct{}) + go func() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + var seq uint16 + for { + select { + case <-stop: + return + case <-ticker.C: + seq++ + _, _ = videoSender.Write(synthRTPPacket(102, seq, uint32(seq)*3000, 0xcafe0000, []byte("vvvvvvvv"))) + _, _ = audioSender.Write(synthRTPPacket(111, seq, uint32(seq)*960, 0xbeef0000, []byte("aaaaaaaa"))) + } + } + }() + defer close(stop) + + deadline := time.After(15 * time.Second) + for i, v := range viewers { + select { + case <-v.videoCh: + case <-deadline: + t.Fatalf("viewer %d: no video RTP within 15s", i) + } + select { + case <-v.audioCh: + case <-deadline: + t.Fatalf("viewer %d: no audio RTP within 15s", i) + } + } + + // Confirm the per-stream peer index has all N entries. + h.mu.Lock() + got := len(h.peersByStream[processID]) + h.mu.Unlock() + if got != N { + t.Errorf("peersByStream[%s] = %d, want %d", processID, got, N) + } + + // Tear the process down — every viewer's PC should observe state + // transitioning away from connected within a short window. + sub.onProcessStop(processID) + + // After teardown the peer index for this stream should be empty. + // Closing peers is async (driven by Done channel), so poll briefly. + deadline2 := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline2) { + h.mu.Lock() + empty := len(h.peersByStream[processID]) == 0 + h.mu.Unlock() + if empty { + break + } + time.Sleep(50 * time.Millisecond) + } + h.mu.Lock() + leftover := len(h.peersByStream[processID]) + h.mu.Unlock() + if leftover != 0 { + t.Errorf("after onProcessStop, %d peers remain in index", leftover) + } +} + +// TestSubsystem_TeardownHookFiresOnProcessStop is a unit-level check +// that the teardown callback the Handler installs actually runs. +func TestSubsystem_TeardownHookFiresOnProcessStop(t *testing.T) { + sub, err := New(config.DataWebRTC{Enable: true}, nil) + if err != nil { + t.Fatalf("New: %v", err) + } + defer sub.Close() + + var fired atomic.Int32 + sub.SetTeardownHook(func(streamID string) { + if streamID == "p1" { + fired.Add(1) + } + }) + + if _, err := sub.onProcessStart("p1", &appcfg.Config{ + ID: "p1", + WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}, + }); err != nil { + t.Fatalf("onProcessStart: %v", err) + } + sub.onProcessStop("p1") + if got := fired.Load(); got != 1 { + t.Errorf("teardown fired %d times, want 1", got) + } +} diff --git a/app/webrtc/subsystem.go b/app/webrtc/subsystem.go index d0d5d09..a9257d4 100644 --- a/app/webrtc/subsystem.go +++ b/app/webrtc/subsystem.go @@ -31,6 +31,12 @@ type Subsystem struct { mu sync.Mutex streams map[string]*processStream // processID -> stream pair + + // teardown is set by the Handler (or any other consumer) so the + // Subsystem can broadcast process-stop events. Called *before* + // the per-stream Sources are closed, so consumers can yank their + // own indexes while the stream id is still valid. + teardown func(streamID string) } // processStream captures the two Sources (video + audio) backing a @@ -110,6 +116,19 @@ func (s *Subsystem) Close() { } } +// SetTeardownHook registers a callback invoked just before a stream's +// Sources are closed in onProcessStop. The callback is expected to +// tear down any external resources keyed by streamID — most importantly +// the WHEP Handler's per-stream peer index. +// +// Calling SetTeardownHook again replaces the previous callback; pass +// nil to detach. Only one consumer is supported by design. +func (s *Subsystem) SetTeardownHook(fn func(streamID string)) { + s.mu.Lock() + defer s.mu.Unlock() + s.teardown = fn +} + // lookup returns the per-process stream pair for id, or nil, false. // Used by the WHEP handler. func (s *Subsystem) lookup(id string) (*processStream, bool) { diff --git a/core/webrtc/peer.go b/core/webrtc/peer.go index 64a1cc8..7782ed2 100644 --- a/core/webrtc/peer.go +++ b/core/webrtc/peer.go @@ -152,6 +152,12 @@ func (p *Peer) Answer() webrtc.SessionDescription { return p.answer } // ResourceID returns the stable resource id used in the WHEP Location header. func (p *Peer) ResourceID() string { return p.resourceID } +// Done returns a channel that is closed when the Peer has been torn down +// (either explicitly via Close, or because Pion observed an ICE +// failure / disconnection). Consumers can range over it to drive +// index cleanup without polling. +func (p *Peer) Done() <-chan struct{} { return p.done } + // Close tears down the peer connection and unsubscribes from each // source. Safe to call multiple times. func (p *Peer) Close() error { @@ -257,6 +263,14 @@ func (f *PeerFactory) CreatePeerFromSources(ctx context.Context, return p, nil } + +// AddICECandidate forwards a trickle-ICE candidate to the underlying +// PeerConnection. Returns the underlying error if the candidate is +// malformed or the connection has already been closed. +func (p *Peer) AddICECandidate(c webrtc.ICECandidateInit) error { + return p.pc.AddICECandidate(c) +} + func newResourceID() string { b := make([]byte, 8) _, _ = rand.Read(b) diff --git a/docs/docs.go b/docs/docs.go index 39ce52e..598b6cb 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1910,7 +1910,7 @@ const docTemplate = `{ "ApiKeyAuth": [] } ], - "description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE resource for teardown.", + "description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.", "consumes": [ "application/sdp" ], @@ -1950,8 +1950,20 @@ const docTemplate = `{ "type": "string" } }, + "406": { + "description": "offer SDP missing required H264 / Opus rtpmap", + "schema": { + "type": "string" + } + }, "503": { - "description": "peer cap reached", + "description": "peer cap reached (per-stream or total)", + "schema": { + "type": "string" + } + }, + "504": { + "description": "ICE gathering timeout", "schema": { "type": "string" } @@ -1966,7 +1978,7 @@ const docTemplate = `{ "ApiKeyAuth": [] } ], - "description": "Tear down a WebRTC peer connection by its resource id (returned in the Location header by Subscribe). Idempotent: returns 204 even when the resource is unknown, per the WHEP spec.", + "description": "Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.", "tags": [ "v16.16.0" ], @@ -1999,6 +2011,55 @@ const docTemplate = `{ } } } + }, + "patch": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.", + "consumes": [ + "application/trickle-ice-sdpfrag" + ], + "tags": [ + "v16.16.0" + ], + "summary": "Trickle ICE candidates for a WHEP subscription", + "operationId": "webrtc-3-whep-trickle", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Resource ID from the Subscribe Location header", + "name": "resource", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "no content" + }, + "400": { + "description": "missing resource id or unreadable body", + "schema": { + "type": "string" + } + }, + "404": { + "description": "peer not found", + "schema": { + "type": "string" + } + } + } } }, "/api/v3/widget/process/{id}": { @@ -3283,6 +3344,9 @@ const docTemplate = `{ "api.ProcessConfigWebRTC": { "type": "object", "properties": { + "audio_map": { + "type": "string" + }, "audio_pt": { "type": "integer" }, @@ -3292,6 +3356,9 @@ const docTemplate = `{ "force_transcode": { "type": "boolean" }, + "video_map": { + "type": "string" + }, "video_pt": { "type": "integer" } diff --git a/docs/swagger.json b/docs/swagger.json index dfb4173..bc99cd2 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1903,7 +1903,7 @@ "ApiKeyAuth": [] } ], - "description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE resource for teardown.", + "description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.", "consumes": [ "application/sdp" ], @@ -1943,8 +1943,20 @@ "type": "string" } }, + "406": { + "description": "offer SDP missing required H264 / Opus rtpmap", + "schema": { + "type": "string" + } + }, "503": { - "description": "peer cap reached", + "description": "peer cap reached (per-stream or total)", + "schema": { + "type": "string" + } + }, + "504": { + "description": "ICE gathering timeout", "schema": { "type": "string" } @@ -1959,7 +1971,7 @@ "ApiKeyAuth": [] } ], - "description": "Tear down a WebRTC peer connection by its resource id (returned in the Location header by Subscribe). Idempotent: returns 204 even when the resource is unknown, per the WHEP spec.", + "description": "Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.", "tags": [ "v16.16.0" ], @@ -1992,6 +2004,55 @@ } } } + }, + "patch": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.", + "consumes": [ + "application/trickle-ice-sdpfrag" + ], + "tags": [ + "v16.16.0" + ], + "summary": "Trickle ICE candidates for a WHEP subscription", + "operationId": "webrtc-3-whep-trickle", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Resource ID from the Subscribe Location header", + "name": "resource", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "no content" + }, + "400": { + "description": "missing resource id or unreadable body", + "schema": { + "type": "string" + } + }, + "404": { + "description": "peer not found", + "schema": { + "type": "string" + } + } + } } }, "/api/v3/widget/process/{id}": { @@ -3276,6 +3337,9 @@ "api.ProcessConfigWebRTC": { "type": "object", "properties": { + "audio_map": { + "type": "string" + }, "audio_pt": { "type": "integer" }, @@ -3285,6 +3349,9 @@ "force_transcode": { "type": "boolean" }, + "video_map": { + "type": "string" + }, "video_pt": { "type": "integer" } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 959fe45..54d7267 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -796,12 +796,16 @@ definitions: type: object api.ProcessConfigWebRTC: properties: + audio_map: + type: string audio_pt: type: integer enabled: type: boolean force_transcode: type: boolean + video_map: + type: string video_pt: type: integer type: object @@ -3223,7 +3227,7 @@ paths: - application/sdp description: 'Subscribe to a process''s WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location - header points at the DELETE resource for teardown.' + header points at the DELETE/PATCH resource for teardown and trickle ICE.' operationId: webrtc-3-whep-subscribe parameters: - description: Process ID with config.webrtc.enabled=true @@ -3246,8 +3250,16 @@ paths: description: no stream registered for this process id schema: type: string + "406": + description: offer SDP missing required H264 / Opus rtpmap + schema: + type: string "503": - description: peer cap reached + description: peer cap reached (per-stream or total) + schema: + type: string + "504": + description: ICE gathering timeout schema: type: string security: @@ -3257,9 +3269,9 @@ paths: - v16.16.0 /api/v3/whep/{id}/{resource}: delete: - description: 'Tear down a WebRTC peer connection by its resource id (returned - in the Location header by Subscribe). Idempotent: returns 204 even when the - resource is unknown, per the WHEP spec.' + description: Idempotent peer teardown by resource id (returned in the Location + header by Subscribe). Returns 204 even when the resource is unknown, per the + WHEP spec. operationId: webrtc-3-whep-unsubscribe parameters: - description: Process ID @@ -3284,6 +3296,38 @@ paths: summary: Tear down a WHEP subscription tags: - v16.16.0 + patch: + consumes: + - application/trickle-ice-sdpfrag + description: Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag. + operationId: webrtc-3-whep-trickle + parameters: + - description: Process ID + in: path + name: id + required: true + type: string + - description: Resource ID from the Subscribe Location header + in: path + name: resource + required: true + type: string + responses: + "204": + description: no content + "400": + description: missing resource id or unreadable body + schema: + type: string + "404": + description: peer not found + schema: + type: string + security: + - ApiKeyAuth: [] + summary: Trickle ICE candidates for a WHEP subscription + tags: + - v16.16.0 /api/v3/widget/process/{id}: get: description: Fetch minimal statistics about a process, which is not protected