Compare commits

..

No commits in common. "de4b21512375e2f7478265bb3c5c909e1169c887" and "0417aff3b109a59ac31de0b813cf8d26df945273" have entirely different histories.

8 changed files with 51 additions and 819 deletions

1
.gitignore vendored
View file

@ -25,4 +25,3 @@
*.flv *.flv
.VSCodeCounter .VSCodeCounter
whep-client

View file

@ -13,91 +13,54 @@ import (
corewebrtc "github.com/datarhei/core/v16/core/webrtc" corewebrtc "github.com/datarhei/core/v16/core/webrtc"
) )
// Default per-stream peer cap when the caller passes 0. The total cap
// (passed to NewHandler) is enforced separately and takes precedence.
const defaultMaxPeersPerStream = 8
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into // Handler exposes the subsystem's WHEP Echo handlers. Wire them into
// the /api/v3 group (or a sibling group) via Handler.Register. // the /api/v3 group (or a sibling group) via Handler.Register.
//
// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
// On every Subscribe we spin a tiny goroutine watching the new peer's
// Done() channel; when ICE fails or Close() runs the index entry is
// removed and the counters tick back down — no leaks if the browser
// rage-quits.
type Handler struct { type Handler struct {
sub *Subsystem sub *Subsystem
mu sync.Mutex peersMu sync.Mutex
peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer peers map[string]*corewebrtc.Peer // resourceID -> peer
peerStream map[string]string // resource -> streamID (reverse index) count int64 // atomic, for cap check without lock
count int64 // atomic maxCap int64
maxCapTotal int64
maxCapPerStrm int64
} }
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler. // NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
// The maxPeers argument caps concurrent subscribers across all streams; // The maxPeers argument caps concurrent subscribers; pass 0 to use a
// pass 0 to use a generous default (matches corewebrtc.DefaultConfig). // generous default (matches corewebrtc.DefaultConfig).
// The per-stream cap is taken from the corewebrtc default; pass a
// non-zero value to override via NewHandlerWithCaps.
func NewHandler(s *Subsystem, maxPeers int) *Handler { func NewHandler(s *Subsystem, maxPeers int) *Handler {
return NewHandlerWithCaps(s, maxPeers, 0) cap := int64(maxPeers)
if cap <= 0 {
cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
}
return &Handler{
sub: s,
peers: make(map[string]*corewebrtc.Peer),
maxCap: cap,
}
} }
// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap. // Register mounts the WHEP routes on the provided Echo group. WHEP
// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream. // POST is /whep/:id, WHEP DELETE is /whep/:id/:resource.
func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler {
total := int64(maxPeers)
if total <= 0 {
total = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
}
perStream := int64(maxPeersPerStream)
if perStream <= 0 {
perStream = defaultMaxPeersPerStream
}
h := &Handler{
sub: s,
peersByStream: make(map[string]map[string]*corewebrtc.Peer),
peerStream: make(map[string]string),
maxCapTotal: total,
maxCapPerStrm: perStream,
}
// Subsystem broadcasts process-stop via this hook so the handler
// can yank stale peer entries before their Sources close out
// from underneath them.
if s != nil {
s.SetTeardownHook(h.tearDownStreamPeers)
}
return h
}
// Register mounts the WHEP routes on the provided Echo group.
// //
// CORS preflights are answered on every WHEP path; regular WHEP // The routes are deliberately unauthenticated in M2 because WHEP
// responses also carry the Access-Control-* headers so browser-side // clients (browsers, OBS) don't carry the Core JWT. M3 will add
// players living on a different origin can subscribe. // per-process signed-URL tokens; for M2 the deployment is expected
// to put the endpoint behind an authenticated reverse-proxy or VPN.
func (h *Handler) Register(g *echo.Group) { func (h *Handler) Register(g *echo.Group) {
g.OPTIONS("/whep/:id", h.preflight)
g.OPTIONS("/whep/:id/:resource", h.preflight)
g.POST("/whep/:id", h.Subscribe) g.POST("/whep/:id", h.Subscribe)
g.DELETE("/whep/:id/:resource", h.Unsubscribe) g.DELETE("/whep/:id/:resource", h.Unsubscribe)
g.PATCH("/whep/:id/:resource", h.Trickle)
} }
// Subscribe handles POST /whep/:id. Request body is an SDP offer, // Subscribe handles POST /whep/:id. Request body is an SDP offer,
// response is an SDP answer with a Location header pointing at the // response is an SDP answer with a Location header pointing at the
// DELETE/PATCH resource. // DELETE resource.
func (h *Handler) Subscribe(c echo.Context) error { func (h *Handler) Subscribe(c echo.Context) error {
addCORS(c)
id := c.Param("id") id := c.Param("id")
if id == "" { if id == "" {
return c.String(http.StatusBadRequest, "missing stream id") return c.String(http.StatusBadRequest, "missing stream id")
} }
// Total cap: cheap atomic check before doing real work. if atomic.LoadInt64(&h.count) >= h.maxCap {
if atomic.LoadInt64(&h.count) >= h.maxCapTotal {
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
} }
@ -106,14 +69,6 @@ func (h *Handler) Subscribe(c echo.Context) error {
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
} }
// Per-stream cap: needs the lock since we're indexing per stream.
h.mu.Lock()
if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm {
h.mu.Unlock()
return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached")
}
h.mu.Unlock()
body, err := io.ReadAll(c.Request().Body) body, err := io.ReadAll(c.Request().Body)
if err != nil { if err != nil {
return c.String(http.StatusBadRequest, "read body: "+err.Error()) return c.String(http.StatusBadRequest, "read body: "+err.Error())
@ -121,227 +76,59 @@ func (h *Handler) Subscribe(c echo.Context) error {
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
} }
if err := requireH264AndOpus(string(body)); err != nil {
return c.String(http.StatusNotAcceptable, err.Error())
}
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
if err != nil { if err != nil {
// Surface the design's error matrix. return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
switch err {
case corewebrtc.ErrICETimeout:
return c.String(http.StatusGatewayTimeout, err.Error())
case corewebrtc.ErrCodecMismatch:
return c.String(http.StatusNotAcceptable, err.Error())
default:
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
}
} }
rid := peer.ResourceID() h.peersMu.Lock()
h.mu.Lock() h.peers[peer.ResourceID()] = peer
if h.peersByStream[id] == nil { h.peersMu.Unlock()
h.peersByStream[id] = make(map[string]*corewebrtc.Peer)
}
h.peersByStream[id][rid] = peer
h.peerStream[rid] = id
h.mu.Unlock()
atomic.AddInt64(&h.count, 1) atomic.AddInt64(&h.count, 1)
// Auto-cleanup: when Pion's OnConnectionStateChange triggers
// peer.Close() (ICE failed/disconnected), the Done channel
// closes and we yank the index entry. Without this the map
// leaks for the lifetime of the handler.
go h.awaitPeerClose(rid, peer)
c.Response().Header().Set("Content-Type", "application/sdp") c.Response().Header().Set("Content-Type", "application/sdp")
c.Response().Header().Set("Location", "/whep/"+id+"/"+rid) c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID())
c.Response().Header().Set("ETag", `"`+rid+`"`)
return c.String(http.StatusCreated, peer.Answer().SDP) return c.String(http.StatusCreated, peer.Answer().SDP)
} }
// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we // Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of
// return 204 even when the resource is unknown — DELETE is idempotent // the path per WHEP spec but we only need :resource to locate the
// and a re-issued tear-down should never error out. // peer; :id is accepted for route symmetry.
func (h *Handler) Unsubscribe(c echo.Context) error { func (h *Handler) Unsubscribe(c echo.Context) error {
addCORS(c)
resource := c.Param("resource") resource := c.Param("resource")
if resource == "" { if resource == "" {
return c.String(http.StatusBadRequest, "missing resource id") return c.String(http.StatusBadRequest, "missing resource id")
} }
h.mu.Lock() h.peersMu.Lock()
streamID := h.peerStream[resource] peer, ok := h.peers[resource]
var peer *corewebrtc.Peer if ok {
if streamID != "" { delete(h.peers, resource)
peer = h.peersByStream[streamID][resource]
delete(h.peersByStream[streamID], resource)
if len(h.peersByStream[streamID]) == 0 {
delete(h.peersByStream, streamID)
}
delete(h.peerStream, resource)
} }
h.mu.Unlock() h.peersMu.Unlock()
if peer != nil { if !ok {
_ = peer.Close()
}
if streamID != "" {
atomic.AddInt64(&h.count, -1)
}
return c.NoContent(http.StatusNoContent)
}
// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates
// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients
// signal end-of-candidates via an a=end-of-candidates line, which
// AddICECandidate accepts).
func (h *Handler) Trickle(c echo.Context) error {
addCORS(c)
resource := c.Param("resource")
if resource == "" {
return c.String(http.StatusBadRequest, "missing resource id")
}
h.mu.Lock()
streamID := h.peerStream[resource]
var peer *corewebrtc.Peer
if streamID != "" {
peer = h.peersByStream[streamID][resource]
}
h.mu.Unlock()
if peer == nil {
return c.NoContent(http.StatusNotFound) return c.NoContent(http.StatusNotFound)
} }
_ = peer.Close()
body, err := io.ReadAll(c.Request().Body) atomic.AddInt64(&h.count, -1)
if err != nil {
return c.String(http.StatusBadRequest, "read body: "+err.Error())
}
for _, line := range strings.Split(string(body), "\n") {
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "a=candidate:") {
continue
}
cand := strings.TrimPrefix(line, "a=")
_ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand})
}
return c.NoContent(http.StatusNoContent)
}
// preflight answers a CORS OPTIONS request; the headers are also
// echoed on every other response.
func (h *Handler) preflight(c echo.Context) error {
addCORS(c)
return c.NoContent(http.StatusNoContent) return c.NoContent(http.StatusNoContent)
} }
// Close tears down every active peer (e.g., during Core shutdown). // Close tears down every active peer (e.g., during Core shutdown).
func (h *Handler) Close() { func (h *Handler) Close() {
h.mu.Lock() h.peersMu.Lock()
peers := make([]*corewebrtc.Peer, 0) peers := make([]*corewebrtc.Peer, 0, len(h.peers))
for _, m := range h.peersByStream { for _, p := range h.peers {
for _, p := range m { peers = append(peers, p)
peers = append(peers, p)
}
} }
h.peersByStream = make(map[string]map[string]*corewebrtc.Peer) h.peers = make(map[string]*corewebrtc.Peer)
h.peerStream = make(map[string]string) h.peersMu.Unlock()
h.mu.Unlock()
for _, p := range peers { for _, p := range peers {
if p != nil { _ = p.Close()
_ = p.Close()
}
} }
atomic.StoreInt64(&h.count, 0) atomic.StoreInt64(&h.count, 0)
} }
// awaitPeerClose blocks on peer.Done() and yanks the index entry when
// the peer self-closes (ICE failed/disconnected). Idempotent with
// the Unsubscribe path: if Unsubscribe ran first the index is already
// empty and we just decrement the counter once on first arrival.
func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
<-peer.Done()
h.mu.Lock()
streamID := h.peerStream[resource]
_, present := h.peerStream[resource]
if present {
delete(h.peerStream, resource)
if streamID != "" {
delete(h.peersByStream[streamID], resource)
if len(h.peersByStream[streamID]) == 0 {
delete(h.peersByStream, streamID)
}
}
}
h.mu.Unlock()
if present {
atomic.AddInt64(&h.count, -1)
}
}
// tearDownStreamPeers is the callback the Subsystem runs in its
// onProcessStop hook. It closes every peer subscribed to that
// stream (driving each one's Done() and indirectly awaitPeerClose).
func (h *Handler) tearDownStreamPeers(streamID string) {
h.mu.Lock()
bucket := h.peersByStream[streamID]
peers := make([]*corewebrtc.Peer, 0, len(bucket))
for _, p := range bucket {
peers = append(peers, p)
}
h.mu.Unlock()
for _, p := range peers {
if p != nil {
_ = p.Close()
}
}
}
// addCORS emits the response headers a browser-side WHEP player
// expects. WHEP's Location and ETag headers must be exposed for
// fetch() to read them across origins.
func addCORS(c echo.Context) {
hh := c.Response().Header()
hh.Set("Access-Control-Allow-Origin", "*")
hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS")
hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match")
hh.Set("Access-Control-Expose-Headers", "Location, ETag")
}
// requireH264AndOpus does a coarse SDP scan to confirm the offer
// includes both an H.264 video rtpmap and an Opus audio rtpmap. The
// design treats codec mismatch as a 406, never a silent black frame.
//
// This is intentionally a string scan rather than a full SDP parse:
// every modern browser advertises H.264 and Opus by name, and a
// dependency on a real SDP parser for one validation step is
// disproportionate. M4 may swap this for pion/sdp.v3 when other
// surfaces also need parsing.
func requireH264AndOpus(sdp string) error {
lower := strings.ToLower(sdp)
hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/")
hasOpus := strings.Contains(lower, "opus/48000") || strings.Contains(lower, " opus/")
if hasH264 && hasOpus {
return nil
}
missing := []string{}
if !hasH264 {
missing = append(missing, "H264")
}
if !hasOpus {
missing = append(missing, "Opus")
}
return &codecMismatchError{missing: missing}
}
type codecMismatchError struct{ missing []string }
func (e *codecMismatchError) Error() string {
return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ")
}

View file

@ -1,251 +0,0 @@
package webrtc
import (
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"github.com/labstack/echo/v4"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
)
// minimalH264OpusOffer returns an SDP offer that includes both H264
// and Opus rtpmap lines — passes requireH264AndOpus but is otherwise
// nonsense, so CreatePeerFromSources will fail downstream when this
// is wired through. Use it only in tests that don't reach the
// PeerConnection path.
func minimalH264OpusOffer() string {
return "v=0\r\n" +
"o=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n" +
"m=video 9 UDP/TLS/RTP/SAVPF 102\r\n" +
"a=rtpmap:102 H264/90000\r\n" +
"m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" +
"a=rtpmap:111 opus/48000/2\r\n"
}
// nonH264Offer is missing H264 entirely. Triggers requireH264AndOpus.
func nonH264Offer() string {
return "v=0\r\n" +
"m=video 9 UDP/TLS/RTP/SAVPF 96\r\n" +
"a=rtpmap:96 VP8/90000\r\n" +
"m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" +
"a=rtpmap:111 opus/48000/2\r\n"
}
// TestHandler_Subscribe_406OnCodecMismatch verifies an offer that
// doesn't include H264 yields 406, per the design's error matrix.
func TestHandler_Subscribe_406OnCodecMismatch(t *testing.T) {
sub := newTestSubsystem(t)
sub.mu.Lock()
sub.streams["s"] = &processStream{id: "s"}
sub.mu.Unlock()
h := NewHandler(sub, 0)
e := echo.New()
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(nonH264Offer()))
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("s")
if err := h.Subscribe(c); err != nil {
t.Fatalf("Subscribe: %v", err)
}
if rec.Code != http.StatusNotAcceptable {
t.Fatalf("expected 406, got %d: %s", rec.Code, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), "H264") {
t.Errorf("body should mention missing codec: %q", rec.Body.String())
}
}
// TestHandler_Subscribe_503OnTotalCap simulates the total cap being
// exhausted by another subscriber. We don't actually create real peers
// (would need a real PeerConnection); instead we pre-load the atomic
// counter so the cap check fires.
func TestHandler_Subscribe_503OnTotalCap(t *testing.T) {
sub := newTestSubsystem(t)
sub.mu.Lock()
sub.streams["s"] = &processStream{id: "s"}
sub.mu.Unlock()
h := NewHandlerWithCaps(sub, 1, 100)
atomic.StoreInt64(&h.count, 1) // simulate one in-flight peer
e := echo.New()
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer()))
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("s")
_ = h.Subscribe(c)
if rec.Code != http.StatusServiceUnavailable {
t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), corewebrtc.ErrPeerCapReached.Error()) {
t.Errorf("body should mention peer cap: %q", rec.Body.String())
}
}
// TestHandler_Subscribe_503OnPerStreamCap simulates the per-stream cap
// being exhausted. Same trick as above but populating the per-stream
// index directly.
func TestHandler_Subscribe_503OnPerStreamCap(t *testing.T) {
sub := newTestSubsystem(t)
sub.mu.Lock()
sub.streams["s"] = &processStream{id: "s"}
sub.mu.Unlock()
h := NewHandlerWithCaps(sub, 100, 1)
// Drop a placeholder peer into the per-stream bucket so the cap
// arithmetic trips on the next subscribe.
h.mu.Lock()
h.peersByStream["s"] = map[string]*corewebrtc.Peer{"existing": nil}
h.mu.Unlock()
e := echo.New()
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer()))
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("s")
_ = h.Subscribe(c)
if rec.Code != http.StatusServiceUnavailable {
t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String())
}
if !strings.Contains(rec.Body.String(), "per-stream") {
t.Errorf("body should mention per-stream cap: %q", rec.Body.String())
}
}
// TestHandler_Trickle_404WhenUnknown verifies a PATCH for an unknown
// resource returns 404 (we still treat the resource as authoritative
// here; only DELETE is idempotent per spec).
func TestHandler_Trickle_404WhenUnknown(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
e := echo.New()
req := httptest.NewRequest(http.MethodPatch, "/whep/id/unknown", strings.NewReader(""))
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id", "resource")
c.SetParamValues("id", "unknown")
if err := h.Trickle(c); err != nil {
t.Fatalf("Trickle: %v", err)
}
if rec.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d", rec.Code)
}
}
// TestHandler_PreflightCORS verifies OPTIONS returns 204 with the
// browser-friendly CORS headers.
func TestHandler_PreflightCORS(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
e := echo.New()
req := httptest.NewRequest(http.MethodOptions, "/whep/x", nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("x")
if err := h.preflight(c); err != nil {
t.Fatalf("preflight: %v", err)
}
if rec.Code != http.StatusNoContent {
t.Fatalf("expected 204, got %d", rec.Code)
}
hh := rec.Header()
for _, k := range []string{
"Access-Control-Allow-Origin",
"Access-Control-Allow-Methods",
"Access-Control-Allow-Headers",
"Access-Control-Expose-Headers",
} {
if hh.Get(k) == "" {
t.Errorf("missing CORS header %q", k)
}
}
}
// TestHandler_RegisterMountsAllRoutes is a sanity check that
// Handler.Register installs OPTIONS / POST / DELETE / PATCH on the
// expected paths. Echo's Group has no public route enumerator, so we
// dispatch synthetic requests and assert the right methods are
// reachable.
func TestHandler_RegisterMountsAllRoutes(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
e := echo.New()
g := e.Group("")
h.Register(g)
cases := []struct {
method, path string
want int
}{
{http.MethodOptions, "/whep/foo", http.StatusNoContent},
{http.MethodOptions, "/whep/foo/bar", http.StatusNoContent},
{http.MethodPost, "/whep/foo", http.StatusNotFound}, // stream missing -> 404
{http.MethodDelete, "/whep/foo/bar", http.StatusNoContent},
{http.MethodPatch, "/whep/foo/bar", http.StatusNotFound},
}
for _, tc := range cases {
req := httptest.NewRequest(tc.method, tc.path, strings.NewReader(""))
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
if rec.Code != tc.want {
t.Errorf("%s %s: got %d want %d (%s)", tc.method, tc.path, rec.Code, tc.want, rec.Body.String())
}
}
}
// TestHandler_Close_DrainsPeers seeds a fake peer into the index and
// verifies Close clears it without panicking.
func TestHandler_Close_DrainsPeers(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0)
h.mu.Lock()
h.peersByStream["s"] = map[string]*corewebrtc.Peer{"r1": nil}
h.peerStream["r1"] = "s"
atomic.StoreInt64(&h.count, 1)
h.mu.Unlock()
h.Close()
if got := atomic.LoadInt64(&h.count); got != 0 {
t.Errorf("count after Close = %d, want 0", got)
}
h.mu.Lock()
if len(h.peersByStream) != 0 || len(h.peerStream) != 0 {
t.Errorf("indexes not cleared")
}
h.mu.Unlock()
}
// TestRequireH264AndOpus covers the SDP scanner's positive +
// negative cases.
func TestRequireH264AndOpus(t *testing.T) {
cases := []struct {
name string
sdp string
ok bool
}{
{"both", minimalH264OpusOffer(), true},
{"missing h264", nonH264Offer(), false},
{"missing opus", "m=video 9 UDP/TLS/RTP/SAVPF 102\r\na=rtpmap:102 H264/90000\r\n", false},
{"capitalized", "a=rtpmap:111 OPUS/48000\r\na=rtpmap:102 H264/90000", true},
{"empty", "", false},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := requireH264AndOpus(c.sdp)
if c.ok && err != nil {
t.Errorf("expected ok, got %v", err)
}
if !c.ok && err == nil {
t.Errorf("expected error")
}
})
}
}

View file

@ -68,11 +68,9 @@ func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) {
} }
} }
// TestHandler_Unsubscribe_204WhenUnknown verifies a DELETE with an // TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an
// unknown resource id returns 204 (idempotent), per the WHEP spec // unknown resource id returns 404 and no state mutation.
// and the M2/M3 design's error matrix. Pre-M3 this returned 404; the func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
// updated semantics let clients re-issue DELETE without erroring.
func TestHandler_Unsubscribe_204WhenUnknown(t *testing.T) {
h := NewHandler(newTestSubsystem(t), 0) h := NewHandler(newTestSubsystem(t), 0)
e := echo.New() e := echo.New()
@ -85,7 +83,7 @@ func TestHandler_Unsubscribe_204WhenUnknown(t *testing.T) {
if err := h.Unsubscribe(c); err != nil { if err := h.Unsubscribe(c); err != nil {
t.Fatalf("Unsubscribe returned error: %v", err) t.Fatalf("Unsubscribe returned error: %v", err)
} }
if rec.Code != http.StatusNoContent { if rec.Code != http.StatusNotFound {
t.Fatalf("expected 204, got %d", rec.Code) t.Fatalf("expected 404, got %d", rec.Code)
} }
} }

View file

@ -94,7 +94,6 @@ func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.Conf
func (s *Subsystem) onProcessStop(id string) { func (s *Subsystem) onProcessStop(id string) {
s.mu.Lock() s.mu.Lock()
st, ok := s.streams[id] st, ok := s.streams[id]
teardown := s.teardown
if ok { if ok {
delete(s.streams, id) delete(s.streams, id)
} }
@ -103,16 +102,6 @@ func (s *Subsystem) onProcessStop(id string) {
if !ok { if !ok {
return return
} }
// Broadcast first, so any subscribed peers get torn down while
// the streamID is still meaningful. The handler's tearDownStreamPeers
// drives each Peer.Close() which in turn unsubscribes from the
// Sources we're about to shut down — preventing a "subscribers fan
// out into a closed channel" race.
if teardown != nil {
teardown(id)
}
if st.video != nil { if st.video != nil {
_ = st.video.Close() _ = st.video.Close()
} }

View file

@ -1,257 +0,0 @@
package webrtc
import (
"net"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/labstack/echo/v4"
pionwebrtc "github.com/pion/webrtc/v4"
"github.com/datarhei/core/v16/config"
appcfg "github.com/datarhei/core/v16/restream/app"
)
// TestIntegration_FiveViewerFanout drives the M3 acceptance criterion
// "5 concurrent viewers, all error paths correct, clean teardown" in
// the wide direction. Five Pion subscribers attach to a single
// process's stream pair and each receives RTP without crosstalk; on
// teardown every subscriber's PeerConnection observes its tracks
// closing.
//
// Verifies (in order):
// * subsystem.onProcessStart returns adjacent UDP ports
// * 5 WHEP POSTs in parallel succeed (per-stream cap default = 8)
// * every subscriber's video and audio track receives at least one
// RTP packet within the timeout
// * onProcessStop tears every subscriber down (PeerConnection
// transitions away from connected/connecting)
func TestIntegration_FiveViewerFanout(t *testing.T) {
const N = 5
sub, err := New(config.DataWebRTC{Enable: true}, nil)
if err != nil {
t.Fatalf("subsystem New: %v", err)
}
defer sub.Close()
h := NewHandler(sub, 0)
defer h.Close()
processID := "fanout"
legs, err := sub.onProcessStart(processID, &appcfg.Config{
ID: processID,
WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111},
})
if err != nil {
t.Fatalf("onProcessStart: %v", err)
}
if len(legs) != 2 {
t.Fatalf("expected 2 legs, got %d", len(legs))
}
videoPort, err := portFromLegAddress(legs[0].Address)
if err != nil {
t.Fatalf("video port: %v", err)
}
audioPort, err := portFromLegAddress(legs[1].Address)
if err != nil {
t.Fatalf("audio port: %v", err)
}
e := echo.New()
g := e.Group("")
h.Register(g)
srv := httptest.NewServer(e)
defer srv.Close()
// Each subscriber tracks first-RTP-received signals for V and A.
type viewer struct {
pc *pionwebrtc.PeerConnection
videoCh chan struct{}
audioCh chan struct{}
}
viewers := make([]*viewer, N)
api := func() *pionwebrtc.API {
me := &pionwebrtc.MediaEngine{}
_ = me.RegisterDefaultCodecs()
return pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me))
}()
subscribe := func(i int) error {
pc, err := api.NewPeerConnection(pionwebrtc.Configuration{})
if err != nil {
return err
}
v := &viewer{pc: pc, videoCh: make(chan struct{}, 1), audioCh: make(chan struct{}, 1)}
viewers[i] = v
var vGot, aGot atomic.Bool
pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) {
go func() {
if _, _, rerr := tr.ReadRTP(); rerr != nil {
return
}
switch tr.Kind() {
case pionwebrtc.RTPCodecTypeVideo:
if vGot.CompareAndSwap(false, true) {
v.videoCh <- struct{}{}
}
case pionwebrtc.RTPCodecTypeAudio:
if aGot.CompareAndSwap(false, true) {
v.audioCh <- struct{}{}
}
}
}()
})
_, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo,
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly})
_, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio,
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly})
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
gather := pionwebrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(offer); err != nil {
return err
}
<-gather
resp, err := http.Post(srv.URL+"/whep/"+processID, "application/sdp",
strings.NewReader(pc.LocalDescription().SDP))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
t.Errorf("viewer %d: WHEP %d", i, resp.StatusCode)
return nil
}
buf := make([]byte, 1<<15)
n, _ := resp.Body.Read(buf)
return pc.SetRemoteDescription(pionwebrtc.SessionDescription{
Type: pionwebrtc.SDPTypeAnswer,
SDP: string(buf[:n]),
})
}
// Subscribe all N viewers in parallel.
var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
if err := subscribe(i); err != nil {
t.Errorf("viewer %d subscribe: %v", i, err)
}
}(i)
}
wg.Wait()
for i := 0; i < N; i++ {
if viewers[i] == nil || viewers[i].pc == nil {
t.Fatalf("viewer %d not constructed", i)
}
defer viewers[i].pc.Close()
}
// Spray RTP into both ports until every viewer reports first-RTP.
videoSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort))
audioSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(audioPort))
defer videoSender.Close()
defer audioSender.Close()
stop := make(chan struct{})
go func() {
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
var seq uint16
for {
select {
case <-stop:
return
case <-ticker.C:
seq++
_, _ = videoSender.Write(synthRTPPacket(102, seq, uint32(seq)*3000, 0xcafe0000, []byte("vvvvvvvv")))
_, _ = audioSender.Write(synthRTPPacket(111, seq, uint32(seq)*960, 0xbeef0000, []byte("aaaaaaaa")))
}
}
}()
defer close(stop)
deadline := time.After(15 * time.Second)
for i, v := range viewers {
select {
case <-v.videoCh:
case <-deadline:
t.Fatalf("viewer %d: no video RTP within 15s", i)
}
select {
case <-v.audioCh:
case <-deadline:
t.Fatalf("viewer %d: no audio RTP within 15s", i)
}
}
// Confirm the per-stream peer index has all N entries.
h.mu.Lock()
got := len(h.peersByStream[processID])
h.mu.Unlock()
if got != N {
t.Errorf("peersByStream[%s] = %d, want %d", processID, got, N)
}
// Tear the process down — every viewer's PC should observe state
// transitioning away from connected within a short window.
sub.onProcessStop(processID)
// After teardown the peer index for this stream should be empty.
// Closing peers is async (driven by Done channel), so poll briefly.
deadline2 := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline2) {
h.mu.Lock()
empty := len(h.peersByStream[processID]) == 0
h.mu.Unlock()
if empty {
break
}
time.Sleep(50 * time.Millisecond)
}
h.mu.Lock()
leftover := len(h.peersByStream[processID])
h.mu.Unlock()
if leftover != 0 {
t.Errorf("after onProcessStop, %d peers remain in index", leftover)
}
}
// TestSubsystem_TeardownHookFiresOnProcessStop is a unit-level check
// that the teardown callback the Handler installs actually runs.
func TestSubsystem_TeardownHookFiresOnProcessStop(t *testing.T) {
sub, err := New(config.DataWebRTC{Enable: true}, nil)
if err != nil {
t.Fatalf("New: %v", err)
}
defer sub.Close()
var fired atomic.Int32
sub.SetTeardownHook(func(streamID string) {
if streamID == "p1" {
fired.Add(1)
}
})
if _, err := sub.onProcessStart("p1", &appcfg.Config{
ID: "p1",
WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111},
}); err != nil {
t.Fatalf("onProcessStart: %v", err)
}
sub.onProcessStop("p1")
if got := fired.Load(); got != 1 {
t.Errorf("teardown fired %d times, want 1", got)
}
}

View file

@ -31,12 +31,6 @@ type Subsystem struct {
mu sync.Mutex mu sync.Mutex
streams map[string]*processStream // processID -> stream pair streams map[string]*processStream // processID -> stream pair
// teardown is set by the Handler (or any other consumer) so the
// Subsystem can broadcast process-stop events. Called *before*
// the per-stream Sources are closed, so consumers can yank their
// own indexes while the stream id is still valid.
teardown func(streamID string)
} }
// processStream captures the two Sources (video + audio) backing a // processStream captures the two Sources (video + audio) backing a
@ -116,19 +110,6 @@ func (s *Subsystem) Close() {
} }
} }
// SetTeardownHook registers a callback invoked just before a stream's
// Sources are closed in onProcessStop. The callback is expected to
// tear down any external resources keyed by streamID — most importantly
// the WHEP Handler's per-stream peer index.
//
// Calling SetTeardownHook again replaces the previous callback; pass
// nil to detach. Only one consumer is supported by design.
func (s *Subsystem) SetTeardownHook(fn func(streamID string)) {
s.mu.Lock()
defer s.mu.Unlock()
s.teardown = fn
}
// lookup returns the per-process stream pair for id, or nil, false. // lookup returns the per-process stream pair for id, or nil, false.
// Used by the WHEP handler. // Used by the WHEP handler.
func (s *Subsystem) lookup(id string) (*processStream, bool) { func (s *Subsystem) lookup(id string) (*processStream, bool) {

View file

@ -152,12 +152,6 @@ func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }
// ResourceID returns the stable resource id used in the WHEP Location header. // ResourceID returns the stable resource id used in the WHEP Location header.
func (p *Peer) ResourceID() string { return p.resourceID } func (p *Peer) ResourceID() string { return p.resourceID }
// Done returns a channel that is closed when the Peer has been torn down
// (either explicitly via Close, or because Pion observed an ICE
// failure / disconnection). Consumers can range over it to drive
// index cleanup without polling.
func (p *Peer) Done() <-chan struct{} { return p.done }
// Close tears down the peer connection and unsubscribes from each // Close tears down the peer connection and unsubscribes from each
// source. Safe to call multiple times. // source. Safe to call multiple times.
func (p *Peer) Close() error { func (p *Peer) Close() error {
@ -263,14 +257,6 @@ func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
return p, nil return p, nil
} }
// AddICECandidate forwards a trickle-ICE candidate to the underlying
// PeerConnection. Returns the underlying error if the candidate is
// malformed or the connection has already been closed.
func (p *Peer) AddICECandidate(c webrtc.ICECandidateInit) error {
return p.pc.AddICECandidate(c)
}
func newResourceID() string { func newResourceID() string {
b := make([]byte, 8) b := make([]byte, 8)
_, _ = rand.Read(b) _, _ = rand.Read(b)