Merge branch 'm3-robustness' into m2-webrtc-core-integration
Conflict resolution: keep M3's full handler.go rewrite (per-stream index, error matrix, PATCH, CORS, auto-cleanup) and re-apply the swagger annotations from #7 onto the new function declarations, including a fresh annotation for the M3-introduced Trickle endpoint. Swagger docs regenerated to pick up all three. Race-clean: go test -race ./app/webrtc/... green.
This commit is contained in:
commit
6eaf346d06
11 changed files with 1026 additions and 65 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -25,3 +25,4 @@
|
||||||
*.flv
|
*.flv
|
||||||
|
|
||||||
.VSCodeCounter
|
.VSCodeCounter
|
||||||
|
whep-client
|
||||||
|
|
|
||||||
|
|
@ -13,50 +13,84 @@ import (
|
||||||
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Default per-stream peer cap when the caller passes 0. The total cap
|
||||||
|
// (passed to NewHandler) is enforced separately and takes precedence.
|
||||||
|
const defaultMaxPeersPerStream = 8
|
||||||
|
|
||||||
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
|
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
|
||||||
// the /api/v3 group (or a sibling group) via Handler.Register.
|
// the /api/v3 group (or a sibling group) via Handler.Register.
|
||||||
|
//
|
||||||
|
// Lifecycle: peers are tracked in a streamID→resourceID→Peer index.
|
||||||
|
// On every Subscribe we spin a tiny goroutine watching the new peer's
|
||||||
|
// Done() channel; when ICE fails or Close() runs the index entry is
|
||||||
|
// removed and the counters tick back down — no leaks if the browser
|
||||||
|
// rage-quits.
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
sub *Subsystem
|
sub *Subsystem
|
||||||
|
|
||||||
peersMu sync.Mutex
|
mu sync.Mutex
|
||||||
peers map[string]*corewebrtc.Peer // resourceID -> peer
|
peersByStream map[string]map[string]*corewebrtc.Peer // streamID -> resource -> peer
|
||||||
count int64 // atomic, for cap check without lock
|
peerStream map[string]string // resource -> streamID (reverse index)
|
||||||
maxCap int64
|
count int64 // atomic
|
||||||
|
maxCapTotal int64
|
||||||
|
maxCapPerStrm int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
|
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
|
||||||
// The maxPeers argument caps concurrent subscribers; pass 0 to use a
|
// The maxPeers argument caps concurrent subscribers across all streams;
|
||||||
// generous default (matches corewebrtc.DefaultConfig).
|
// pass 0 to use a generous default (matches corewebrtc.DefaultConfig).
|
||||||
|
// The per-stream cap is taken from the corewebrtc default; pass a
|
||||||
|
// non-zero value to override via NewHandlerWithCaps.
|
||||||
func NewHandler(s *Subsystem, maxPeers int) *Handler {
|
func NewHandler(s *Subsystem, maxPeers int) *Handler {
|
||||||
cap := int64(maxPeers)
|
return NewHandlerWithCaps(s, maxPeers, 0)
|
||||||
if cap <= 0 {
|
|
||||||
cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
|
|
||||||
}
|
|
||||||
return &Handler{
|
|
||||||
sub: s,
|
|
||||||
peers: make(map[string]*corewebrtc.Peer),
|
|
||||||
maxCap: cap,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register mounts the WHEP routes on the provided Echo group. WHEP
|
// NewHandlerWithCaps is NewHandler plus an explicit per-stream cap.
|
||||||
// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource.
|
// maxPeersPerStream <= 0 falls back to defaultMaxPeersPerStream.
|
||||||
|
func NewHandlerWithCaps(s *Subsystem, maxPeers, maxPeersPerStream int) *Handler {
|
||||||
|
total := int64(maxPeers)
|
||||||
|
if total <= 0 {
|
||||||
|
total = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
|
||||||
|
}
|
||||||
|
perStream := int64(maxPeersPerStream)
|
||||||
|
if perStream <= 0 {
|
||||||
|
perStream = defaultMaxPeersPerStream
|
||||||
|
}
|
||||||
|
h := &Handler{
|
||||||
|
sub: s,
|
||||||
|
peersByStream: make(map[string]map[string]*corewebrtc.Peer),
|
||||||
|
peerStream: make(map[string]string),
|
||||||
|
maxCapTotal: total,
|
||||||
|
maxCapPerStrm: perStream,
|
||||||
|
}
|
||||||
|
// Subsystem broadcasts process-stop via this hook so the handler
|
||||||
|
// can yank stale peer entries before their Sources close out
|
||||||
|
// from underneath them.
|
||||||
|
if s != nil {
|
||||||
|
s.SetTeardownHook(h.tearDownStreamPeers)
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register mounts the WHEP routes on the provided Echo group.
|
||||||
//
|
//
|
||||||
// The routes are deliberately unauthenticated in M2 because WHEP
|
// CORS preflights are answered on every WHEP path; regular WHEP
|
||||||
// clients (browsers, OBS) don't carry the Core JWT. M3 will add
|
// responses also carry the Access-Control-* headers so browser-side
|
||||||
// per-process signed-URL tokens; for M2 the deployment is expected
|
// players living on a different origin can subscribe.
|
||||||
// to put the endpoint behind an authenticated reverse-proxy or VPN.
|
|
||||||
func (h *Handler) Register(g *echo.Group) {
|
func (h *Handler) Register(g *echo.Group) {
|
||||||
|
g.OPTIONS("/whep/:id", h.preflight)
|
||||||
|
g.OPTIONS("/whep/:id/:resource", h.preflight)
|
||||||
g.POST("/whep/:id", h.Subscribe)
|
g.POST("/whep/:id", h.Subscribe)
|
||||||
g.DELETE("/whep/:id/:resource", h.Unsubscribe)
|
g.DELETE("/whep/:id/:resource", h.Unsubscribe)
|
||||||
|
g.PATCH("/whep/:id/:resource", h.Trickle)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
|
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
|
||||||
// response is an SDP answer with a Location header pointing at the
|
// response is an SDP answer with a Location header pointing at the
|
||||||
// DELETE resource.
|
// DELETE/PATCH resource.
|
||||||
//
|
//
|
||||||
// @Summary Subscribe to a WebRTC stream via WHEP
|
// @Summary Subscribe to a WebRTC stream via WHEP
|
||||||
// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE resource for teardown.
|
// @Description Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.
|
||||||
// @Tags v16.16.0
|
// @Tags v16.16.0
|
||||||
// @ID webrtc-3-whep-subscribe
|
// @ID webrtc-3-whep-subscribe
|
||||||
// @Accept application/sdp
|
// @Accept application/sdp
|
||||||
|
|
@ -65,16 +99,21 @@ func (h *Handler) Register(g *echo.Group) {
|
||||||
// @Success 201 {string} string "SDP answer"
|
// @Success 201 {string} string "SDP answer"
|
||||||
// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP"
|
// @Failure 400 {string} string "missing stream id, malformed body, or invalid SDP"
|
||||||
// @Failure 404 {string} string "no stream registered for this process id"
|
// @Failure 404 {string} string "no stream registered for this process id"
|
||||||
// @Failure 503 {string} string "peer cap reached"
|
// @Failure 406 {string} string "offer SDP missing required H264 / Opus rtpmap"
|
||||||
|
// @Failure 503 {string} string "peer cap reached (per-stream or total)"
|
||||||
|
// @Failure 504 {string} string "ICE gathering timeout"
|
||||||
// @Security ApiKeyAuth
|
// @Security ApiKeyAuth
|
||||||
// @Router /api/v3/whep/{id} [post]
|
// @Router /api/v3/whep/{id} [post]
|
||||||
func (h *Handler) Subscribe(c echo.Context) error {
|
func (h *Handler) Subscribe(c echo.Context) error {
|
||||||
|
addCORS(c)
|
||||||
|
|
||||||
id := c.Param("id")
|
id := c.Param("id")
|
||||||
if id == "" {
|
if id == "" {
|
||||||
return c.String(http.StatusBadRequest, "missing stream id")
|
return c.String(http.StatusBadRequest, "missing stream id")
|
||||||
}
|
}
|
||||||
|
|
||||||
if atomic.LoadInt64(&h.count) >= h.maxCap {
|
// Total cap: cheap atomic check before doing real work.
|
||||||
|
if atomic.LoadInt64(&h.count) >= h.maxCapTotal {
|
||||||
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
|
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -83,6 +122,14 @@ func (h *Handler) Subscribe(c echo.Context) error {
|
||||||
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
|
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Per-stream cap: needs the lock since we're indexing per stream.
|
||||||
|
h.mu.Lock()
|
||||||
|
if int64(len(h.peersByStream[id])) >= h.maxCapPerStrm {
|
||||||
|
h.mu.Unlock()
|
||||||
|
return c.String(http.StatusServiceUnavailable, "webrtc: per-stream peer cap reached")
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
body, err := io.ReadAll(c.Request().Body)
|
body, err := io.ReadAll(c.Request().Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
||||||
|
|
@ -90,29 +137,52 @@ func (h *Handler) Subscribe(c echo.Context) error {
|
||||||
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
|
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
|
||||||
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
|
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
|
||||||
}
|
}
|
||||||
|
if err := requireH264AndOpus(string(body)); err != nil {
|
||||||
|
return c.String(http.StatusNotAcceptable, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
|
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
|
||||||
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
|
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
|
// Surface the design's error matrix.
|
||||||
|
switch err {
|
||||||
|
case corewebrtc.ErrICETimeout:
|
||||||
|
return c.String(http.StatusGatewayTimeout, err.Error())
|
||||||
|
case corewebrtc.ErrCodecMismatch:
|
||||||
|
return c.String(http.StatusNotAcceptable, err.Error())
|
||||||
|
default:
|
||||||
|
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
h.peersMu.Lock()
|
rid := peer.ResourceID()
|
||||||
h.peers[peer.ResourceID()] = peer
|
h.mu.Lock()
|
||||||
h.peersMu.Unlock()
|
if h.peersByStream[id] == nil {
|
||||||
|
h.peersByStream[id] = make(map[string]*corewebrtc.Peer)
|
||||||
|
}
|
||||||
|
h.peersByStream[id][rid] = peer
|
||||||
|
h.peerStream[rid] = id
|
||||||
|
h.mu.Unlock()
|
||||||
atomic.AddInt64(&h.count, 1)
|
atomic.AddInt64(&h.count, 1)
|
||||||
|
|
||||||
|
// Auto-cleanup: when Pion's OnConnectionStateChange triggers
|
||||||
|
// peer.Close() (ICE failed/disconnected), the Done channel
|
||||||
|
// closes and we yank the index entry. Without this the map
|
||||||
|
// leaks for the lifetime of the handler.
|
||||||
|
go h.awaitPeerClose(rid, peer)
|
||||||
|
|
||||||
c.Response().Header().Set("Content-Type", "application/sdp")
|
c.Response().Header().Set("Content-Type", "application/sdp")
|
||||||
c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID())
|
c.Response().Header().Set("Location", "/whep/"+id+"/"+rid)
|
||||||
|
c.Response().Header().Set("ETag", `"`+rid+`"`)
|
||||||
return c.String(http.StatusCreated, peer.Answer().SDP)
|
return c.String(http.StatusCreated, peer.Answer().SDP)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of
|
// Unsubscribe handles DELETE /whep/:id/:resource. Per WHEP spec we
|
||||||
// the path per WHEP spec but we only need :resource to locate the
|
// return 204 even when the resource is unknown — DELETE is idempotent
|
||||||
// peer; :id is accepted for route symmetry.
|
// and a re-issued tear-down should never error out.
|
||||||
//
|
//
|
||||||
// @Summary Tear down a WHEP subscription
|
// @Summary Tear down a WHEP subscription
|
||||||
// @Description Tear down a WebRTC peer connection by its resource id (returned in the Location header by Subscribe). Idempotent: returns 204 even when the resource is unknown, per the WHEP spec.
|
// @Description Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.
|
||||||
// @Tags v16.16.0
|
// @Tags v16.16.0
|
||||||
// @ID webrtc-3-whep-unsubscribe
|
// @ID webrtc-3-whep-unsubscribe
|
||||||
// @Param id path string true "Process ID"
|
// @Param id path string true "Process ID"
|
||||||
|
|
@ -122,38 +192,196 @@ func (h *Handler) Subscribe(c echo.Context) error {
|
||||||
// @Security ApiKeyAuth
|
// @Security ApiKeyAuth
|
||||||
// @Router /api/v3/whep/{id}/{resource} [delete]
|
// @Router /api/v3/whep/{id}/{resource} [delete]
|
||||||
func (h *Handler) Unsubscribe(c echo.Context) error {
|
func (h *Handler) Unsubscribe(c echo.Context) error {
|
||||||
|
addCORS(c)
|
||||||
|
|
||||||
resource := c.Param("resource")
|
resource := c.Param("resource")
|
||||||
if resource == "" {
|
if resource == "" {
|
||||||
return c.String(http.StatusBadRequest, "missing resource id")
|
return c.String(http.StatusBadRequest, "missing resource id")
|
||||||
}
|
}
|
||||||
|
|
||||||
h.peersMu.Lock()
|
h.mu.Lock()
|
||||||
peer, ok := h.peers[resource]
|
streamID := h.peerStream[resource]
|
||||||
if ok {
|
var peer *corewebrtc.Peer
|
||||||
delete(h.peers, resource)
|
if streamID != "" {
|
||||||
|
peer = h.peersByStream[streamID][resource]
|
||||||
|
delete(h.peersByStream[streamID], resource)
|
||||||
|
if len(h.peersByStream[streamID]) == 0 {
|
||||||
|
delete(h.peersByStream, streamID)
|
||||||
|
}
|
||||||
|
delete(h.peerStream, resource)
|
||||||
}
|
}
|
||||||
h.peersMu.Unlock()
|
h.mu.Unlock()
|
||||||
|
|
||||||
if !ok {
|
if peer != nil {
|
||||||
|
_ = peer.Close()
|
||||||
|
}
|
||||||
|
if streamID != "" {
|
||||||
|
atomic.AddInt64(&h.count, -1)
|
||||||
|
}
|
||||||
|
return c.NoContent(http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trickle handles PATCH /whep/:id/:resource — adds ICE candidates
|
||||||
|
// from a trickle-ice-sdpfrag body. Empty body is a no-op (clients
|
||||||
|
// signal end-of-candidates via an a=end-of-candidates line, which
|
||||||
|
// AddICECandidate accepts).
|
||||||
|
//
|
||||||
|
// @Summary Trickle ICE candidates for a WHEP subscription
|
||||||
|
// @Description Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.
|
||||||
|
// @Tags v16.16.0
|
||||||
|
// @ID webrtc-3-whep-trickle
|
||||||
|
// @Accept application/trickle-ice-sdpfrag
|
||||||
|
// @Param id path string true "Process ID"
|
||||||
|
// @Param resource path string true "Resource ID from the Subscribe Location header"
|
||||||
|
// @Success 204 "no content"
|
||||||
|
// @Failure 400 {string} string "missing resource id or unreadable body"
|
||||||
|
// @Failure 404 {string} string "peer not found"
|
||||||
|
// @Security ApiKeyAuth
|
||||||
|
// @Router /api/v3/whep/{id}/{resource} [patch]
|
||||||
|
func (h *Handler) Trickle(c echo.Context) error {
|
||||||
|
addCORS(c)
|
||||||
|
|
||||||
|
resource := c.Param("resource")
|
||||||
|
if resource == "" {
|
||||||
|
return c.String(http.StatusBadRequest, "missing resource id")
|
||||||
|
}
|
||||||
|
|
||||||
|
h.mu.Lock()
|
||||||
|
streamID := h.peerStream[resource]
|
||||||
|
var peer *corewebrtc.Peer
|
||||||
|
if streamID != "" {
|
||||||
|
peer = h.peersByStream[streamID][resource]
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
if peer == nil {
|
||||||
return c.NoContent(http.StatusNotFound)
|
return c.NoContent(http.StatusNotFound)
|
||||||
}
|
}
|
||||||
_ = peer.Close()
|
|
||||||
atomic.AddInt64(&h.count, -1)
|
body, err := io.ReadAll(c.Request().Body)
|
||||||
|
if err != nil {
|
||||||
|
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
||||||
|
}
|
||||||
|
for _, line := range strings.Split(string(body), "\n") {
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if !strings.HasPrefix(line, "a=candidate:") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cand := strings.TrimPrefix(line, "a=")
|
||||||
|
_ = peer.AddICECandidate(webrtc.ICECandidateInit{Candidate: cand})
|
||||||
|
}
|
||||||
|
return c.NoContent(http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
|
// preflight answers a CORS OPTIONS request; the headers are also
|
||||||
|
// echoed on every other response.
|
||||||
|
func (h *Handler) preflight(c echo.Context) error {
|
||||||
|
addCORS(c)
|
||||||
return c.NoContent(http.StatusNoContent)
|
return c.NoContent(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close tears down every active peer (e.g., during Core shutdown).
|
// Close tears down every active peer (e.g., during Core shutdown).
|
||||||
func (h *Handler) Close() {
|
func (h *Handler) Close() {
|
||||||
h.peersMu.Lock()
|
h.mu.Lock()
|
||||||
peers := make([]*corewebrtc.Peer, 0, len(h.peers))
|
peers := make([]*corewebrtc.Peer, 0)
|
||||||
for _, p := range h.peers {
|
for _, m := range h.peersByStream {
|
||||||
peers = append(peers, p)
|
for _, p := range m {
|
||||||
|
peers = append(peers, p)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
h.peers = make(map[string]*corewebrtc.Peer)
|
h.peersByStream = make(map[string]map[string]*corewebrtc.Peer)
|
||||||
h.peersMu.Unlock()
|
h.peerStream = make(map[string]string)
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
_ = p.Close()
|
if p != nil {
|
||||||
|
_ = p.Close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(&h.count, 0)
|
atomic.StoreInt64(&h.count, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// awaitPeerClose blocks on peer.Done() and yanks the index entry when
|
||||||
|
// the peer self-closes (ICE failed/disconnected). Idempotent with
|
||||||
|
// the Unsubscribe path: if Unsubscribe ran first the index is already
|
||||||
|
// empty and we just decrement the counter once on first arrival.
|
||||||
|
func (h *Handler) awaitPeerClose(resource string, peer *corewebrtc.Peer) {
|
||||||
|
<-peer.Done()
|
||||||
|
h.mu.Lock()
|
||||||
|
streamID := h.peerStream[resource]
|
||||||
|
_, present := h.peerStream[resource]
|
||||||
|
if present {
|
||||||
|
delete(h.peerStream, resource)
|
||||||
|
if streamID != "" {
|
||||||
|
delete(h.peersByStream[streamID], resource)
|
||||||
|
if len(h.peersByStream[streamID]) == 0 {
|
||||||
|
delete(h.peersByStream, streamID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
if present {
|
||||||
|
atomic.AddInt64(&h.count, -1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// tearDownStreamPeers is the callback the Subsystem runs in its
|
||||||
|
// onProcessStop hook. It closes every peer subscribed to that
|
||||||
|
// stream (driving each one's Done() and indirectly awaitPeerClose).
|
||||||
|
func (h *Handler) tearDownStreamPeers(streamID string) {
|
||||||
|
h.mu.Lock()
|
||||||
|
bucket := h.peersByStream[streamID]
|
||||||
|
peers := make([]*corewebrtc.Peer, 0, len(bucket))
|
||||||
|
for _, p := range bucket {
|
||||||
|
peers = append(peers, p)
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
for _, p := range peers {
|
||||||
|
if p != nil {
|
||||||
|
_ = p.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// addCORS emits the response headers a browser-side WHEP player
|
||||||
|
// expects. WHEP's Location and ETag headers must be exposed for
|
||||||
|
// fetch() to read them across origins.
|
||||||
|
func addCORS(c echo.Context) {
|
||||||
|
hh := c.Response().Header()
|
||||||
|
hh.Set("Access-Control-Allow-Origin", "*")
|
||||||
|
hh.Set("Access-Control-Allow-Methods", "POST, DELETE, PATCH, OPTIONS")
|
||||||
|
hh.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, If-Match, If-None-Match")
|
||||||
|
hh.Set("Access-Control-Expose-Headers", "Location, ETag")
|
||||||
|
}
|
||||||
|
|
||||||
|
// requireH264AndOpus does a coarse SDP scan to confirm the offer
|
||||||
|
// includes both an H.264 video rtpmap and an Opus audio rtpmap. The
|
||||||
|
// design treats codec mismatch as a 406, never a silent black frame.
|
||||||
|
//
|
||||||
|
// This is intentionally a string scan rather than a full SDP parse:
|
||||||
|
// every modern browser advertises H.264 and Opus by name, and a
|
||||||
|
// dependency on a real SDP parser for one validation step is
|
||||||
|
// disproportionate. M4 may swap this for pion/sdp.v3 when other
|
||||||
|
// surfaces also need parsing.
|
||||||
|
func requireH264AndOpus(sdp string) error {
|
||||||
|
lower := strings.ToLower(sdp)
|
||||||
|
hasH264 := strings.Contains(lower, "h264/90000") || strings.Contains(lower, " h264/")
|
||||||
|
hasOpus := strings.Contains(lower, "opus/48000") || strings.Contains(lower, " opus/")
|
||||||
|
if hasH264 && hasOpus {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
missing := []string{}
|
||||||
|
if !hasH264 {
|
||||||
|
missing = append(missing, "H264")
|
||||||
|
}
|
||||||
|
if !hasOpus {
|
||||||
|
missing = append(missing, "Opus")
|
||||||
|
}
|
||||||
|
return &codecMismatchError{missing: missing}
|
||||||
|
}
|
||||||
|
|
||||||
|
type codecMismatchError struct{ missing []string }
|
||||||
|
|
||||||
|
func (e *codecMismatchError) Error() string {
|
||||||
|
return "webrtc: codec mismatch — offer is missing: " + strings.Join(e.missing, ", ")
|
||||||
|
}
|
||||||
|
|
|
||||||
251
app/webrtc/handler_m3_test.go
Normal file
251
app/webrtc/handler_m3_test.go
Normal file
|
|
@ -0,0 +1,251 @@
|
||||||
|
package webrtc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
|
||||||
|
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// minimalH264OpusOffer returns an SDP offer that includes both H264
|
||||||
|
// and Opus rtpmap lines — passes requireH264AndOpus but is otherwise
|
||||||
|
// nonsense, so CreatePeerFromSources will fail downstream when this
|
||||||
|
// is wired through. Use it only in tests that don't reach the
|
||||||
|
// PeerConnection path.
|
||||||
|
func minimalH264OpusOffer() string {
|
||||||
|
return "v=0\r\n" +
|
||||||
|
"o=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n" +
|
||||||
|
"m=video 9 UDP/TLS/RTP/SAVPF 102\r\n" +
|
||||||
|
"a=rtpmap:102 H264/90000\r\n" +
|
||||||
|
"m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" +
|
||||||
|
"a=rtpmap:111 opus/48000/2\r\n"
|
||||||
|
}
|
||||||
|
|
||||||
|
// nonH264Offer is missing H264 entirely. Triggers requireH264AndOpus.
|
||||||
|
func nonH264Offer() string {
|
||||||
|
return "v=0\r\n" +
|
||||||
|
"m=video 9 UDP/TLS/RTP/SAVPF 96\r\n" +
|
||||||
|
"a=rtpmap:96 VP8/90000\r\n" +
|
||||||
|
"m=audio 9 UDP/TLS/RTP/SAVPF 111\r\n" +
|
||||||
|
"a=rtpmap:111 opus/48000/2\r\n"
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandler_Subscribe_406OnCodecMismatch verifies an offer that
|
||||||
|
// doesn't include H264 yields 406, per the design's error matrix.
|
||||||
|
func TestHandler_Subscribe_406OnCodecMismatch(t *testing.T) {
|
||||||
|
sub := newTestSubsystem(t)
|
||||||
|
sub.mu.Lock()
|
||||||
|
sub.streams["s"] = &processStream{id: "s"}
|
||||||
|
sub.mu.Unlock()
|
||||||
|
h := NewHandler(sub, 0)
|
||||||
|
|
||||||
|
e := echo.New()
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(nonH264Offer()))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c := e.NewContext(req, rec)
|
||||||
|
c.SetParamNames("id")
|
||||||
|
c.SetParamValues("s")
|
||||||
|
|
||||||
|
if err := h.Subscribe(c); err != nil {
|
||||||
|
t.Fatalf("Subscribe: %v", err)
|
||||||
|
}
|
||||||
|
if rec.Code != http.StatusNotAcceptable {
|
||||||
|
t.Fatalf("expected 406, got %d: %s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), "H264") {
|
||||||
|
t.Errorf("body should mention missing codec: %q", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandler_Subscribe_503OnTotalCap simulates the total cap being
|
||||||
|
// exhausted by another subscriber. We don't actually create real peers
|
||||||
|
// (would need a real PeerConnection); instead we pre-load the atomic
|
||||||
|
// counter so the cap check fires.
|
||||||
|
func TestHandler_Subscribe_503OnTotalCap(t *testing.T) {
|
||||||
|
sub := newTestSubsystem(t)
|
||||||
|
sub.mu.Lock()
|
||||||
|
sub.streams["s"] = &processStream{id: "s"}
|
||||||
|
sub.mu.Unlock()
|
||||||
|
h := NewHandlerWithCaps(sub, 1, 100)
|
||||||
|
atomic.StoreInt64(&h.count, 1) // simulate one in-flight peer
|
||||||
|
|
||||||
|
e := echo.New()
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer()))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c := e.NewContext(req, rec)
|
||||||
|
c.SetParamNames("id")
|
||||||
|
c.SetParamValues("s")
|
||||||
|
_ = h.Subscribe(c)
|
||||||
|
if rec.Code != http.StatusServiceUnavailable {
|
||||||
|
t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), corewebrtc.ErrPeerCapReached.Error()) {
|
||||||
|
t.Errorf("body should mention peer cap: %q", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandler_Subscribe_503OnPerStreamCap simulates the per-stream cap
|
||||||
|
// being exhausted. Same trick as above but populating the per-stream
|
||||||
|
// index directly.
|
||||||
|
func TestHandler_Subscribe_503OnPerStreamCap(t *testing.T) {
|
||||||
|
sub := newTestSubsystem(t)
|
||||||
|
sub.mu.Lock()
|
||||||
|
sub.streams["s"] = &processStream{id: "s"}
|
||||||
|
sub.mu.Unlock()
|
||||||
|
h := NewHandlerWithCaps(sub, 100, 1)
|
||||||
|
// Drop a placeholder peer into the per-stream bucket so the cap
|
||||||
|
// arithmetic trips on the next subscribe.
|
||||||
|
h.mu.Lock()
|
||||||
|
h.peersByStream["s"] = map[string]*corewebrtc.Peer{"existing": nil}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
e := echo.New()
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/whep/s", strings.NewReader(minimalH264OpusOffer()))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c := e.NewContext(req, rec)
|
||||||
|
c.SetParamNames("id")
|
||||||
|
c.SetParamValues("s")
|
||||||
|
_ = h.Subscribe(c)
|
||||||
|
if rec.Code != http.StatusServiceUnavailable {
|
||||||
|
t.Fatalf("expected 503, got %d: %s", rec.Code, rec.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), "per-stream") {
|
||||||
|
t.Errorf("body should mention per-stream cap: %q", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandler_Trickle_404WhenUnknown verifies a PATCH for an unknown
|
||||||
|
// resource returns 404 (we still treat the resource as authoritative
|
||||||
|
// here; only DELETE is idempotent per spec).
|
||||||
|
func TestHandler_Trickle_404WhenUnknown(t *testing.T) {
|
||||||
|
h := NewHandler(newTestSubsystem(t), 0)
|
||||||
|
|
||||||
|
e := echo.New()
|
||||||
|
req := httptest.NewRequest(http.MethodPatch, "/whep/id/unknown", strings.NewReader(""))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c := e.NewContext(req, rec)
|
||||||
|
c.SetParamNames("id", "resource")
|
||||||
|
c.SetParamValues("id", "unknown")
|
||||||
|
|
||||||
|
if err := h.Trickle(c); err != nil {
|
||||||
|
t.Fatalf("Trickle: %v", err)
|
||||||
|
}
|
||||||
|
if rec.Code != http.StatusNotFound {
|
||||||
|
t.Fatalf("expected 404, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandler_PreflightCORS verifies OPTIONS returns 204 with the
|
||||||
|
// browser-friendly CORS headers.
|
||||||
|
func TestHandler_PreflightCORS(t *testing.T) {
|
||||||
|
h := NewHandler(newTestSubsystem(t), 0)
|
||||||
|
|
||||||
|
e := echo.New()
|
||||||
|
req := httptest.NewRequest(http.MethodOptions, "/whep/x", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c := e.NewContext(req, rec)
|
||||||
|
c.SetParamNames("id")
|
||||||
|
c.SetParamValues("x")
|
||||||
|
|
||||||
|
if err := h.preflight(c); err != nil {
|
||||||
|
t.Fatalf("preflight: %v", err)
|
||||||
|
}
|
||||||
|
if rec.Code != http.StatusNoContent {
|
||||||
|
t.Fatalf("expected 204, got %d", rec.Code)
|
||||||
|
}
|
||||||
|
hh := rec.Header()
|
||||||
|
for _, k := range []string{
|
||||||
|
"Access-Control-Allow-Origin",
|
||||||
|
"Access-Control-Allow-Methods",
|
||||||
|
"Access-Control-Allow-Headers",
|
||||||
|
"Access-Control-Expose-Headers",
|
||||||
|
} {
|
||||||
|
if hh.Get(k) == "" {
|
||||||
|
t.Errorf("missing CORS header %q", k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandler_RegisterMountsAllRoutes is a sanity check that
|
||||||
|
// Handler.Register installs OPTIONS / POST / DELETE / PATCH on the
|
||||||
|
// expected paths. Echo's Group has no public route enumerator, so we
|
||||||
|
// dispatch synthetic requests and assert the right methods are
|
||||||
|
// reachable.
|
||||||
|
func TestHandler_RegisterMountsAllRoutes(t *testing.T) {
|
||||||
|
h := NewHandler(newTestSubsystem(t), 0)
|
||||||
|
e := echo.New()
|
||||||
|
g := e.Group("")
|
||||||
|
h.Register(g)
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
method, path string
|
||||||
|
want int
|
||||||
|
}{
|
||||||
|
{http.MethodOptions, "/whep/foo", http.StatusNoContent},
|
||||||
|
{http.MethodOptions, "/whep/foo/bar", http.StatusNoContent},
|
||||||
|
{http.MethodPost, "/whep/foo", http.StatusNotFound}, // stream missing -> 404
|
||||||
|
{http.MethodDelete, "/whep/foo/bar", http.StatusNoContent},
|
||||||
|
{http.MethodPatch, "/whep/foo/bar", http.StatusNotFound},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
req := httptest.NewRequest(tc.method, tc.path, strings.NewReader(""))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
e.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != tc.want {
|
||||||
|
t.Errorf("%s %s: got %d want %d (%s)", tc.method, tc.path, rec.Code, tc.want, rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandler_Close_DrainsPeers seeds a fake peer into the index and
|
||||||
|
// verifies Close clears it without panicking.
|
||||||
|
func TestHandler_Close_DrainsPeers(t *testing.T) {
|
||||||
|
h := NewHandler(newTestSubsystem(t), 0)
|
||||||
|
h.mu.Lock()
|
||||||
|
h.peersByStream["s"] = map[string]*corewebrtc.Peer{"r1": nil}
|
||||||
|
h.peerStream["r1"] = "s"
|
||||||
|
atomic.StoreInt64(&h.count, 1)
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
h.Close()
|
||||||
|
if got := atomic.LoadInt64(&h.count); got != 0 {
|
||||||
|
t.Errorf("count after Close = %d, want 0", got)
|
||||||
|
}
|
||||||
|
h.mu.Lock()
|
||||||
|
if len(h.peersByStream) != 0 || len(h.peerStream) != 0 {
|
||||||
|
t.Errorf("indexes not cleared")
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRequireH264AndOpus covers the SDP scanner's positive +
|
||||||
|
// negative cases.
|
||||||
|
func TestRequireH264AndOpus(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
sdp string
|
||||||
|
ok bool
|
||||||
|
}{
|
||||||
|
{"both", minimalH264OpusOffer(), true},
|
||||||
|
{"missing h264", nonH264Offer(), false},
|
||||||
|
{"missing opus", "m=video 9 UDP/TLS/RTP/SAVPF 102\r\na=rtpmap:102 H264/90000\r\n", false},
|
||||||
|
{"capitalized", "a=rtpmap:111 OPUS/48000\r\na=rtpmap:102 H264/90000", true},
|
||||||
|
{"empty", "", false},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
err := requireH264AndOpus(c.sdp)
|
||||||
|
if c.ok && err != nil {
|
||||||
|
t.Errorf("expected ok, got %v", err)
|
||||||
|
}
|
||||||
|
if !c.ok && err == nil {
|
||||||
|
t.Errorf("expected error")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -68,9 +68,11 @@ func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an
|
// TestHandler_Unsubscribe_204WhenUnknown verifies a DELETE with an
|
||||||
// unknown resource id returns 404 and no state mutation.
|
// unknown resource id returns 204 (idempotent), per the WHEP spec
|
||||||
func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
|
// and the M2/M3 design's error matrix. Pre-M3 this returned 404; the
|
||||||
|
// updated semantics let clients re-issue DELETE without erroring.
|
||||||
|
func TestHandler_Unsubscribe_204WhenUnknown(t *testing.T) {
|
||||||
h := NewHandler(newTestSubsystem(t), 0)
|
h := NewHandler(newTestSubsystem(t), 0)
|
||||||
|
|
||||||
e := echo.New()
|
e := echo.New()
|
||||||
|
|
@ -83,7 +85,7 @@ func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
|
||||||
if err := h.Unsubscribe(c); err != nil {
|
if err := h.Unsubscribe(c); err != nil {
|
||||||
t.Fatalf("Unsubscribe returned error: %v", err)
|
t.Fatalf("Unsubscribe returned error: %v", err)
|
||||||
}
|
}
|
||||||
if rec.Code != http.StatusNotFound {
|
if rec.Code != http.StatusNoContent {
|
||||||
t.Fatalf("expected 404, got %d", rec.Code)
|
t.Fatalf("expected 204, got %d", rec.Code)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -94,6 +94,7 @@ func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.Conf
|
||||||
func (s *Subsystem) onProcessStop(id string) {
|
func (s *Subsystem) onProcessStop(id string) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
st, ok := s.streams[id]
|
st, ok := s.streams[id]
|
||||||
|
teardown := s.teardown
|
||||||
if ok {
|
if ok {
|
||||||
delete(s.streams, id)
|
delete(s.streams, id)
|
||||||
}
|
}
|
||||||
|
|
@ -102,6 +103,16 @@ func (s *Subsystem) onProcessStop(id string) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast first, so any subscribed peers get torn down while
|
||||||
|
// the streamID is still meaningful. The handler's tearDownStreamPeers
|
||||||
|
// drives each Peer.Close() which in turn unsubscribes from the
|
||||||
|
// Sources we're about to shut down — preventing a "subscribers fan
|
||||||
|
// out into a closed channel" race.
|
||||||
|
if teardown != nil {
|
||||||
|
teardown(id)
|
||||||
|
}
|
||||||
|
|
||||||
if st.video != nil {
|
if st.video != nil {
|
||||||
_ = st.video.Close()
|
_ = st.video.Close()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
257
app/webrtc/multiviewer_test.go
Normal file
257
app/webrtc/multiviewer_test.go
Normal file
|
|
@ -0,0 +1,257 @@
|
||||||
|
package webrtc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
pionwebrtc "github.com/pion/webrtc/v4"
|
||||||
|
|
||||||
|
"github.com/datarhei/core/v16/config"
|
||||||
|
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestIntegration_FiveViewerFanout drives the M3 acceptance criterion
|
||||||
|
// "5 concurrent viewers, all error paths correct, clean teardown" in
|
||||||
|
// the wide direction. Five Pion subscribers attach to a single
|
||||||
|
// process's stream pair and each receives RTP without crosstalk; on
|
||||||
|
// teardown every subscriber's PeerConnection observes its tracks
|
||||||
|
// closing.
|
||||||
|
//
|
||||||
|
// Verifies (in order):
|
||||||
|
// * subsystem.onProcessStart returns adjacent UDP ports
|
||||||
|
// * 5 WHEP POSTs in parallel succeed (per-stream cap default = 8)
|
||||||
|
// * every subscriber's video and audio track receives at least one
|
||||||
|
// RTP packet within the timeout
|
||||||
|
// * onProcessStop tears every subscriber down (PeerConnection
|
||||||
|
// transitions away from connected/connecting)
|
||||||
|
func TestIntegration_FiveViewerFanout(t *testing.T) {
|
||||||
|
const N = 5
|
||||||
|
|
||||||
|
sub, err := New(config.DataWebRTC{Enable: true}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("subsystem New: %v", err)
|
||||||
|
}
|
||||||
|
defer sub.Close()
|
||||||
|
|
||||||
|
h := NewHandler(sub, 0)
|
||||||
|
defer h.Close()
|
||||||
|
|
||||||
|
processID := "fanout"
|
||||||
|
legs, err := sub.onProcessStart(processID, &appcfg.Config{
|
||||||
|
ID: processID,
|
||||||
|
WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("onProcessStart: %v", err)
|
||||||
|
}
|
||||||
|
if len(legs) != 2 {
|
||||||
|
t.Fatalf("expected 2 legs, got %d", len(legs))
|
||||||
|
}
|
||||||
|
videoPort, err := portFromLegAddress(legs[0].Address)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("video port: %v", err)
|
||||||
|
}
|
||||||
|
audioPort, err := portFromLegAddress(legs[1].Address)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("audio port: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e := echo.New()
|
||||||
|
g := e.Group("")
|
||||||
|
h.Register(g)
|
||||||
|
srv := httptest.NewServer(e)
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
// Each subscriber tracks first-RTP-received signals for V and A.
|
||||||
|
type viewer struct {
|
||||||
|
pc *pionwebrtc.PeerConnection
|
||||||
|
videoCh chan struct{}
|
||||||
|
audioCh chan struct{}
|
||||||
|
}
|
||||||
|
viewers := make([]*viewer, N)
|
||||||
|
api := func() *pionwebrtc.API {
|
||||||
|
me := &pionwebrtc.MediaEngine{}
|
||||||
|
_ = me.RegisterDefaultCodecs()
|
||||||
|
return pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me))
|
||||||
|
}()
|
||||||
|
|
||||||
|
subscribe := func(i int) error {
|
||||||
|
pc, err := api.NewPeerConnection(pionwebrtc.Configuration{})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
v := &viewer{pc: pc, videoCh: make(chan struct{}, 1), audioCh: make(chan struct{}, 1)}
|
||||||
|
viewers[i] = v
|
||||||
|
var vGot, aGot atomic.Bool
|
||||||
|
pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) {
|
||||||
|
go func() {
|
||||||
|
if _, _, rerr := tr.ReadRTP(); rerr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch tr.Kind() {
|
||||||
|
case pionwebrtc.RTPCodecTypeVideo:
|
||||||
|
if vGot.CompareAndSwap(false, true) {
|
||||||
|
v.videoCh <- struct{}{}
|
||||||
|
}
|
||||||
|
case pionwebrtc.RTPCodecTypeAudio:
|
||||||
|
if aGot.CompareAndSwap(false, true) {
|
||||||
|
v.audioCh <- struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
_, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo,
|
||||||
|
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly})
|
||||||
|
_, _ = pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio,
|
||||||
|
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly})
|
||||||
|
offer, err := pc.CreateOffer(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
gather := pionwebrtc.GatheringCompletePromise(pc)
|
||||||
|
if err := pc.SetLocalDescription(offer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
<-gather
|
||||||
|
resp, err := http.Post(srv.URL+"/whep/"+processID, "application/sdp",
|
||||||
|
strings.NewReader(pc.LocalDescription().SDP))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != http.StatusCreated {
|
||||||
|
t.Errorf("viewer %d: WHEP %d", i, resp.StatusCode)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
buf := make([]byte, 1<<15)
|
||||||
|
n, _ := resp.Body.Read(buf)
|
||||||
|
return pc.SetRemoteDescription(pionwebrtc.SessionDescription{
|
||||||
|
Type: pionwebrtc.SDPTypeAnswer,
|
||||||
|
SDP: string(buf[:n]),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe all N viewers in parallel.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := subscribe(i); err != nil {
|
||||||
|
t.Errorf("viewer %d subscribe: %v", i, err)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
if viewers[i] == nil || viewers[i].pc == nil {
|
||||||
|
t.Fatalf("viewer %d not constructed", i)
|
||||||
|
}
|
||||||
|
defer viewers[i].pc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spray RTP into both ports until every viewer reports first-RTP.
|
||||||
|
videoSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort))
|
||||||
|
audioSender, _ := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(audioPort))
|
||||||
|
defer videoSender.Close()
|
||||||
|
defer audioSender.Close()
|
||||||
|
stop := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(20 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
var seq uint16
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
seq++
|
||||||
|
_, _ = videoSender.Write(synthRTPPacket(102, seq, uint32(seq)*3000, 0xcafe0000, []byte("vvvvvvvv")))
|
||||||
|
_, _ = audioSender.Write(synthRTPPacket(111, seq, uint32(seq)*960, 0xbeef0000, []byte("aaaaaaaa")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
defer close(stop)
|
||||||
|
|
||||||
|
deadline := time.After(15 * time.Second)
|
||||||
|
for i, v := range viewers {
|
||||||
|
select {
|
||||||
|
case <-v.videoCh:
|
||||||
|
case <-deadline:
|
||||||
|
t.Fatalf("viewer %d: no video RTP within 15s", i)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-v.audioCh:
|
||||||
|
case <-deadline:
|
||||||
|
t.Fatalf("viewer %d: no audio RTP within 15s", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Confirm the per-stream peer index has all N entries.
|
||||||
|
h.mu.Lock()
|
||||||
|
got := len(h.peersByStream[processID])
|
||||||
|
h.mu.Unlock()
|
||||||
|
if got != N {
|
||||||
|
t.Errorf("peersByStream[%s] = %d, want %d", processID, got, N)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tear the process down — every viewer's PC should observe state
|
||||||
|
// transitioning away from connected within a short window.
|
||||||
|
sub.onProcessStop(processID)
|
||||||
|
|
||||||
|
// After teardown the peer index for this stream should be empty.
|
||||||
|
// Closing peers is async (driven by Done channel), so poll briefly.
|
||||||
|
deadline2 := time.Now().Add(3 * time.Second)
|
||||||
|
for time.Now().Before(deadline2) {
|
||||||
|
h.mu.Lock()
|
||||||
|
empty := len(h.peersByStream[processID]) == 0
|
||||||
|
h.mu.Unlock()
|
||||||
|
if empty {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
h.mu.Lock()
|
||||||
|
leftover := len(h.peersByStream[processID])
|
||||||
|
h.mu.Unlock()
|
||||||
|
if leftover != 0 {
|
||||||
|
t.Errorf("after onProcessStop, %d peers remain in index", leftover)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSubsystem_TeardownHookFiresOnProcessStop is a unit-level check
|
||||||
|
// that the teardown callback the Handler installs actually runs.
|
||||||
|
func TestSubsystem_TeardownHookFiresOnProcessStop(t *testing.T) {
|
||||||
|
sub, err := New(config.DataWebRTC{Enable: true}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("New: %v", err)
|
||||||
|
}
|
||||||
|
defer sub.Close()
|
||||||
|
|
||||||
|
var fired atomic.Int32
|
||||||
|
sub.SetTeardownHook(func(streamID string) {
|
||||||
|
if streamID == "p1" {
|
||||||
|
fired.Add(1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if _, err := sub.onProcessStart("p1", &appcfg.Config{
|
||||||
|
ID: "p1",
|
||||||
|
WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111},
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("onProcessStart: %v", err)
|
||||||
|
}
|
||||||
|
sub.onProcessStop("p1")
|
||||||
|
if got := fired.Load(); got != 1 {
|
||||||
|
t.Errorf("teardown fired %d times, want 1", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -31,6 +31,12 @@ type Subsystem struct {
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
streams map[string]*processStream // processID -> stream pair
|
streams map[string]*processStream // processID -> stream pair
|
||||||
|
|
||||||
|
// teardown is set by the Handler (or any other consumer) so the
|
||||||
|
// Subsystem can broadcast process-stop events. Called *before*
|
||||||
|
// the per-stream Sources are closed, so consumers can yank their
|
||||||
|
// own indexes while the stream id is still valid.
|
||||||
|
teardown func(streamID string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// processStream captures the two Sources (video + audio) backing a
|
// processStream captures the two Sources (video + audio) backing a
|
||||||
|
|
@ -110,6 +116,19 @@ func (s *Subsystem) Close() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetTeardownHook registers a callback invoked just before a stream's
|
||||||
|
// Sources are closed in onProcessStop. The callback is expected to
|
||||||
|
// tear down any external resources keyed by streamID — most importantly
|
||||||
|
// the WHEP Handler's per-stream peer index.
|
||||||
|
//
|
||||||
|
// Calling SetTeardownHook again replaces the previous callback; pass
|
||||||
|
// nil to detach. Only one consumer is supported by design.
|
||||||
|
func (s *Subsystem) SetTeardownHook(fn func(streamID string)) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.teardown = fn
|
||||||
|
}
|
||||||
|
|
||||||
// lookup returns the per-process stream pair for id, or nil, false.
|
// lookup returns the per-process stream pair for id, or nil, false.
|
||||||
// Used by the WHEP handler.
|
// Used by the WHEP handler.
|
||||||
func (s *Subsystem) lookup(id string) (*processStream, bool) {
|
func (s *Subsystem) lookup(id string) (*processStream, bool) {
|
||||||
|
|
|
||||||
|
|
@ -152,6 +152,12 @@ func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }
|
||||||
// ResourceID returns the stable resource id used in the WHEP Location header.
|
// ResourceID returns the stable resource id used in the WHEP Location header.
|
||||||
func (p *Peer) ResourceID() string { return p.resourceID }
|
func (p *Peer) ResourceID() string { return p.resourceID }
|
||||||
|
|
||||||
|
// Done returns a channel that is closed when the Peer has been torn down
|
||||||
|
// (either explicitly via Close, or because Pion observed an ICE
|
||||||
|
// failure / disconnection). Consumers can range over it to drive
|
||||||
|
// index cleanup without polling.
|
||||||
|
func (p *Peer) Done() <-chan struct{} { return p.done }
|
||||||
|
|
||||||
// Close tears down the peer connection and unsubscribes from each
|
// Close tears down the peer connection and unsubscribes from each
|
||||||
// source. Safe to call multiple times.
|
// source. Safe to call multiple times.
|
||||||
func (p *Peer) Close() error {
|
func (p *Peer) Close() error {
|
||||||
|
|
@ -257,6 +263,14 @@ func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// AddICECandidate forwards a trickle-ICE candidate to the underlying
|
||||||
|
// PeerConnection. Returns the underlying error if the candidate is
|
||||||
|
// malformed or the connection has already been closed.
|
||||||
|
func (p *Peer) AddICECandidate(c webrtc.ICECandidateInit) error {
|
||||||
|
return p.pc.AddICECandidate(c)
|
||||||
|
}
|
||||||
|
|
||||||
func newResourceID() string {
|
func newResourceID() string {
|
||||||
b := make([]byte, 8)
|
b := make([]byte, 8)
|
||||||
_, _ = rand.Read(b)
|
_, _ = rand.Read(b)
|
||||||
|
|
|
||||||
73
docs/docs.go
73
docs/docs.go
|
|
@ -1910,7 +1910,7 @@ const docTemplate = `{
|
||||||
"ApiKeyAuth": []
|
"ApiKeyAuth": []
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE resource for teardown.",
|
"description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.",
|
||||||
"consumes": [
|
"consumes": [
|
||||||
"application/sdp"
|
"application/sdp"
|
||||||
],
|
],
|
||||||
|
|
@ -1950,8 +1950,20 @@ const docTemplate = `{
|
||||||
"type": "string"
|
"type": "string"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"406": {
|
||||||
|
"description": "offer SDP missing required H264 / Opus rtpmap",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
"503": {
|
"503": {
|
||||||
"description": "peer cap reached",
|
"description": "peer cap reached (per-stream or total)",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"504": {
|
||||||
|
"description": "ICE gathering timeout",
|
||||||
"schema": {
|
"schema": {
|
||||||
"type": "string"
|
"type": "string"
|
||||||
}
|
}
|
||||||
|
|
@ -1966,7 +1978,7 @@ const docTemplate = `{
|
||||||
"ApiKeyAuth": []
|
"ApiKeyAuth": []
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"description": "Tear down a WebRTC peer connection by its resource id (returned in the Location header by Subscribe). Idempotent: returns 204 even when the resource is unknown, per the WHEP spec.",
|
"description": "Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.",
|
||||||
"tags": [
|
"tags": [
|
||||||
"v16.16.0"
|
"v16.16.0"
|
||||||
],
|
],
|
||||||
|
|
@ -1999,6 +2011,55 @@ const docTemplate = `{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"patch": {
|
||||||
|
"security": [
|
||||||
|
{
|
||||||
|
"ApiKeyAuth": []
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": "Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.",
|
||||||
|
"consumes": [
|
||||||
|
"application/trickle-ice-sdpfrag"
|
||||||
|
],
|
||||||
|
"tags": [
|
||||||
|
"v16.16.0"
|
||||||
|
],
|
||||||
|
"summary": "Trickle ICE candidates for a WHEP subscription",
|
||||||
|
"operationId": "webrtc-3-whep-trickle",
|
||||||
|
"parameters": [
|
||||||
|
{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Process ID",
|
||||||
|
"name": "id",
|
||||||
|
"in": "path",
|
||||||
|
"required": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Resource ID from the Subscribe Location header",
|
||||||
|
"name": "resource",
|
||||||
|
"in": "path",
|
||||||
|
"required": true
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"responses": {
|
||||||
|
"204": {
|
||||||
|
"description": "no content"
|
||||||
|
},
|
||||||
|
"400": {
|
||||||
|
"description": "missing resource id or unreadable body",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"404": {
|
||||||
|
"description": "peer not found",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"/api/v3/widget/process/{id}": {
|
"/api/v3/widget/process/{id}": {
|
||||||
|
|
@ -3283,6 +3344,9 @@ const docTemplate = `{
|
||||||
"api.ProcessConfigWebRTC": {
|
"api.ProcessConfigWebRTC": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
|
"audio_map": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
"audio_pt": {
|
"audio_pt": {
|
||||||
"type": "integer"
|
"type": "integer"
|
||||||
},
|
},
|
||||||
|
|
@ -3292,6 +3356,9 @@ const docTemplate = `{
|
||||||
"force_transcode": {
|
"force_transcode": {
|
||||||
"type": "boolean"
|
"type": "boolean"
|
||||||
},
|
},
|
||||||
|
"video_map": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
"video_pt": {
|
"video_pt": {
|
||||||
"type": "integer"
|
"type": "integer"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1903,7 +1903,7 @@
|
||||||
"ApiKeyAuth": []
|
"ApiKeyAuth": []
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE resource for teardown.",
|
"description": "Subscribe to a process's WebRTC egress stream. Body is the SDP offer (Content-Type: application/sdp). Response is the SDP answer; the Location header points at the DELETE/PATCH resource for teardown and trickle ICE.",
|
||||||
"consumes": [
|
"consumes": [
|
||||||
"application/sdp"
|
"application/sdp"
|
||||||
],
|
],
|
||||||
|
|
@ -1943,8 +1943,20 @@
|
||||||
"type": "string"
|
"type": "string"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"406": {
|
||||||
|
"description": "offer SDP missing required H264 / Opus rtpmap",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
"503": {
|
"503": {
|
||||||
"description": "peer cap reached",
|
"description": "peer cap reached (per-stream or total)",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"504": {
|
||||||
|
"description": "ICE gathering timeout",
|
||||||
"schema": {
|
"schema": {
|
||||||
"type": "string"
|
"type": "string"
|
||||||
}
|
}
|
||||||
|
|
@ -1959,7 +1971,7 @@
|
||||||
"ApiKeyAuth": []
|
"ApiKeyAuth": []
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"description": "Tear down a WebRTC peer connection by its resource id (returned in the Location header by Subscribe). Idempotent: returns 204 even when the resource is unknown, per the WHEP spec.",
|
"description": "Idempotent peer teardown by resource id (returned in the Location header by Subscribe). Returns 204 even when the resource is unknown, per the WHEP spec.",
|
||||||
"tags": [
|
"tags": [
|
||||||
"v16.16.0"
|
"v16.16.0"
|
||||||
],
|
],
|
||||||
|
|
@ -1992,6 +2004,55 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"patch": {
|
||||||
|
"security": [
|
||||||
|
{
|
||||||
|
"ApiKeyAuth": []
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": "Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.",
|
||||||
|
"consumes": [
|
||||||
|
"application/trickle-ice-sdpfrag"
|
||||||
|
],
|
||||||
|
"tags": [
|
||||||
|
"v16.16.0"
|
||||||
|
],
|
||||||
|
"summary": "Trickle ICE candidates for a WHEP subscription",
|
||||||
|
"operationId": "webrtc-3-whep-trickle",
|
||||||
|
"parameters": [
|
||||||
|
{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Process ID",
|
||||||
|
"name": "id",
|
||||||
|
"in": "path",
|
||||||
|
"required": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Resource ID from the Subscribe Location header",
|
||||||
|
"name": "resource",
|
||||||
|
"in": "path",
|
||||||
|
"required": true
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"responses": {
|
||||||
|
"204": {
|
||||||
|
"description": "no content"
|
||||||
|
},
|
||||||
|
"400": {
|
||||||
|
"description": "missing resource id or unreadable body",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"404": {
|
||||||
|
"description": "peer not found",
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"/api/v3/widget/process/{id}": {
|
"/api/v3/widget/process/{id}": {
|
||||||
|
|
@ -3276,6 +3337,9 @@
|
||||||
"api.ProcessConfigWebRTC": {
|
"api.ProcessConfigWebRTC": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
|
"audio_map": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
"audio_pt": {
|
"audio_pt": {
|
||||||
"type": "integer"
|
"type": "integer"
|
||||||
},
|
},
|
||||||
|
|
@ -3285,6 +3349,9 @@
|
||||||
"force_transcode": {
|
"force_transcode": {
|
||||||
"type": "boolean"
|
"type": "boolean"
|
||||||
},
|
},
|
||||||
|
"video_map": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
"video_pt": {
|
"video_pt": {
|
||||||
"type": "integer"
|
"type": "integer"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -796,12 +796,16 @@ definitions:
|
||||||
type: object
|
type: object
|
||||||
api.ProcessConfigWebRTC:
|
api.ProcessConfigWebRTC:
|
||||||
properties:
|
properties:
|
||||||
|
audio_map:
|
||||||
|
type: string
|
||||||
audio_pt:
|
audio_pt:
|
||||||
type: integer
|
type: integer
|
||||||
enabled:
|
enabled:
|
||||||
type: boolean
|
type: boolean
|
||||||
force_transcode:
|
force_transcode:
|
||||||
type: boolean
|
type: boolean
|
||||||
|
video_map:
|
||||||
|
type: string
|
||||||
video_pt:
|
video_pt:
|
||||||
type: integer
|
type: integer
|
||||||
type: object
|
type: object
|
||||||
|
|
@ -3223,7 +3227,7 @@ paths:
|
||||||
- application/sdp
|
- application/sdp
|
||||||
description: 'Subscribe to a process''s WebRTC egress stream. Body is the SDP
|
description: 'Subscribe to a process''s WebRTC egress stream. Body is the SDP
|
||||||
offer (Content-Type: application/sdp). Response is the SDP answer; the Location
|
offer (Content-Type: application/sdp). Response is the SDP answer; the Location
|
||||||
header points at the DELETE resource for teardown.'
|
header points at the DELETE/PATCH resource for teardown and trickle ICE.'
|
||||||
operationId: webrtc-3-whep-subscribe
|
operationId: webrtc-3-whep-subscribe
|
||||||
parameters:
|
parameters:
|
||||||
- description: Process ID with config.webrtc.enabled=true
|
- description: Process ID with config.webrtc.enabled=true
|
||||||
|
|
@ -3246,8 +3250,16 @@ paths:
|
||||||
description: no stream registered for this process id
|
description: no stream registered for this process id
|
||||||
schema:
|
schema:
|
||||||
type: string
|
type: string
|
||||||
|
"406":
|
||||||
|
description: offer SDP missing required H264 / Opus rtpmap
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
"503":
|
"503":
|
||||||
description: peer cap reached
|
description: peer cap reached (per-stream or total)
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
"504":
|
||||||
|
description: ICE gathering timeout
|
||||||
schema:
|
schema:
|
||||||
type: string
|
type: string
|
||||||
security:
|
security:
|
||||||
|
|
@ -3257,9 +3269,9 @@ paths:
|
||||||
- v16.16.0
|
- v16.16.0
|
||||||
/api/v3/whep/{id}/{resource}:
|
/api/v3/whep/{id}/{resource}:
|
||||||
delete:
|
delete:
|
||||||
description: 'Tear down a WebRTC peer connection by its resource id (returned
|
description: Idempotent peer teardown by resource id (returned in the Location
|
||||||
in the Location header by Subscribe). Idempotent: returns 204 even when the
|
header by Subscribe). Returns 204 even when the resource is unknown, per the
|
||||||
resource is unknown, per the WHEP spec.'
|
WHEP spec.
|
||||||
operationId: webrtc-3-whep-unsubscribe
|
operationId: webrtc-3-whep-unsubscribe
|
||||||
parameters:
|
parameters:
|
||||||
- description: Process ID
|
- description: Process ID
|
||||||
|
|
@ -3284,6 +3296,38 @@ paths:
|
||||||
summary: Tear down a WHEP subscription
|
summary: Tear down a WHEP subscription
|
||||||
tags:
|
tags:
|
||||||
- v16.16.0
|
- v16.16.0
|
||||||
|
patch:
|
||||||
|
consumes:
|
||||||
|
- application/trickle-ice-sdpfrag
|
||||||
|
description: Add ICE candidates to an existing WebRTC peer. Body is application/trickle-ice-sdpfrag.
|
||||||
|
operationId: webrtc-3-whep-trickle
|
||||||
|
parameters:
|
||||||
|
- description: Process ID
|
||||||
|
in: path
|
||||||
|
name: id
|
||||||
|
required: true
|
||||||
|
type: string
|
||||||
|
- description: Resource ID from the Subscribe Location header
|
||||||
|
in: path
|
||||||
|
name: resource
|
||||||
|
required: true
|
||||||
|
type: string
|
||||||
|
responses:
|
||||||
|
"204":
|
||||||
|
description: no content
|
||||||
|
"400":
|
||||||
|
description: missing resource id or unreadable body
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
"404":
|
||||||
|
description: peer not found
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
security:
|
||||||
|
- ApiKeyAuth: []
|
||||||
|
summary: Trickle ICE candidates for a WHEP subscription
|
||||||
|
tags:
|
||||||
|
- v16.16.0
|
||||||
/api/v3/widget/process/{id}:
|
/api/v3/widget/process/{id}:
|
||||||
get:
|
get:
|
||||||
description: Fetch minimal statistics about a process, which is not protected
|
description: Fetch minimal statistics about a process, which is not protected
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue