diff --git a/app/webrtc/handler.go b/app/webrtc/handler.go new file mode 100644 index 0000000..34db336 --- /dev/null +++ b/app/webrtc/handler.go @@ -0,0 +1,134 @@ +package webrtc + +import ( + "io" + "net/http" + "strings" + "sync" + "sync/atomic" + + "github.com/labstack/echo/v4" + "github.com/pion/webrtc/v4" + + corewebrtc "github.com/datarhei/core/v16/core/webrtc" +) + +// Handler exposes the subsystem's WHEP Echo handlers. Wire them into +// the /api/v3 group (or a sibling group) via Handler.Register. +type Handler struct { + sub *Subsystem + + peersMu sync.Mutex + peers map[string]*corewebrtc.Peer // resourceID -> peer + count int64 // atomic, for cap check without lock + maxCap int64 +} + +// NewHandler wraps the subsystem in an Echo-compatible HTTP handler. +// The maxPeers argument caps concurrent subscribers; pass 0 to use a +// generous default (matches corewebrtc.DefaultConfig). +func NewHandler(s *Subsystem, maxPeers int) *Handler { + cap := int64(maxPeers) + if cap <= 0 { + cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal) + } + return &Handler{ + sub: s, + peers: make(map[string]*corewebrtc.Peer), + maxCap: cap, + } +} + +// Register mounts the WHEP routes on the provided Echo group. WHEP +// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource. +// +// The routes are deliberately unauthenticated in M2 because WHEP +// clients (browsers, OBS) don't carry the Core JWT. M3 will add +// per-process signed-URL tokens; for M2 the deployment is expected +// to put the endpoint behind an authenticated reverse-proxy or VPN. +func (h *Handler) Register(g *echo.Group) { + g.POST("/whep/:id", h.Subscribe) + g.DELETE("/whep/:id/:resource", h.Unsubscribe) +} + +// Subscribe handles POST /whep/:id. Request body is an SDP offer, +// response is an SDP answer with a Location header pointing at the +// DELETE resource. +func (h *Handler) Subscribe(c echo.Context) error { + id := c.Param("id") + if id == "" { + return c.String(http.StatusBadRequest, "missing stream id") + } + + if atomic.LoadInt64(&h.count) >= h.maxCap { + return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error()) + } + + stream, ok := h.sub.lookup(id) + if !ok { + return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error()) + } + + body, err := io.ReadAll(c.Request().Body) + if err != nil { + return c.String(http.StatusBadRequest, "read body: "+err.Error()) + } + if len(body) == 0 || !strings.HasPrefix(string(body), "v=") { + return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error()) + } + + offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} + peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer) + if err != nil { + return c.String(http.StatusInternalServerError, "create peer: "+err.Error()) + } + + h.peersMu.Lock() + h.peers[peer.ResourceID()] = peer + h.peersMu.Unlock() + atomic.AddInt64(&h.count, 1) + + c.Response().Header().Set("Content-Type", "application/sdp") + c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID()) + return c.String(http.StatusCreated, peer.Answer().SDP) +} + +// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of +// the path per WHEP spec but we only need :resource to locate the +// peer; :id is accepted for route symmetry. +func (h *Handler) Unsubscribe(c echo.Context) error { + resource := c.Param("resource") + if resource == "" { + return c.String(http.StatusBadRequest, "missing resource id") + } + + h.peersMu.Lock() + peer, ok := h.peers[resource] + if ok { + delete(h.peers, resource) + } + h.peersMu.Unlock() + + if !ok { + return c.NoContent(http.StatusNotFound) + } + _ = peer.Close() + atomic.AddInt64(&h.count, -1) + return c.NoContent(http.StatusNoContent) +} + +// Close tears down every active peer (e.g., during Core shutdown). +func (h *Handler) Close() { + h.peersMu.Lock() + peers := make([]*corewebrtc.Peer, 0, len(h.peers)) + for _, p := range h.peers { + peers = append(peers, p) + } + h.peers = make(map[string]*corewebrtc.Peer) + h.peersMu.Unlock() + + for _, p := range peers { + _ = p.Close() + } + atomic.StoreInt64(&h.count, 0) +} diff --git a/app/webrtc/handler_test.go b/app/webrtc/handler_test.go new file mode 100644 index 0000000..51e88d0 --- /dev/null +++ b/app/webrtc/handler_test.go @@ -0,0 +1,89 @@ +package webrtc + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/labstack/echo/v4" + + "github.com/datarhei/core/v16/config" +) + +func newTestSubsystem(t *testing.T) *Subsystem { + t.Helper() + s, err := New(config.DataWebRTC{Enable: true}, nil) + if err != nil { + t.Fatalf("New: %v", err) + } + return s +} + +// TestHandler_Subscribe_404WhenStreamMissing verifies the WHEP POST +// returns 404 when no process has registered a stream for that id. +func TestHandler_Subscribe_404WhenStreamMissing(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodPost, "/whep/ghost", strings.NewReader("v=0\r\n")) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id") + c.SetParamValues("ghost") + + if err := h.Subscribe(c); err != nil { + t.Fatalf("Subscribe returned error: %v", err) + } + + if rec.Code != http.StatusNotFound { + t.Fatalf("expected 404, got %d: %s", rec.Code, rec.Body.String()) + } +} + +// TestHandler_Subscribe_400OnEmptyBody verifies invalid SDP offers +// short-circuit before any peer is created. Requires a registered +// stream so lookup doesn't 404 first. +func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) { + sub := newTestSubsystem(t) + // Register a dummy stream so the handler reaches body validation. + sub.mu.Lock() + sub.streams["probe"] = &processStream{id: "probe"} // video/audio nil is fine here — we never get past body parse + sub.mu.Unlock() + + h := NewHandler(sub, 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodPost, "/whep/probe", strings.NewReader("")) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id") + c.SetParamValues("probe") + + if err := h.Subscribe(c); err != nil { + t.Fatalf("Subscribe returned error: %v", err) + } + if rec.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d: %s", rec.Code, rec.Body.String()) + } +} + +// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an +// unknown resource id returns 404 and no state mutation. +func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) { + h := NewHandler(newTestSubsystem(t), 0) + + e := echo.New() + req := httptest.NewRequest(http.MethodDelete, "/whep/id/unknown", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id", "resource") + c.SetParamValues("id", "unknown") + + if err := h.Unsubscribe(c); err != nil { + t.Fatalf("Unsubscribe returned error: %v", err) + } + if rec.Code != http.StatusNotFound { + t.Fatalf("expected 404, got %d", rec.Code) + } +}