From 83eaa28601460c3432d52bf973dc535baeaa55b2 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Fri, 17 Apr 2026 10:08:54 -0400 Subject: [PATCH] feat(webrtc): wire app/webrtc subsystem into Core lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Installs the WebRTC egress subsystem at Core boot when cfg.WebRTC.Enable is true and the subsystem constructs cleanly: - http.Config gains an optional WebRTC *appwebrtc.Handler field; server.setRoutesV3 mounts its WHEP routes on the JWT-protected /api/v3 group. - api.start() constructs the Subsystem, registers its ProcessHooks with the restreamer, and builds a Handler. A construction failure is logged and Core continues without WebRTC — consistent with disabling the subsystem outright. - api.stop() closes the Handler (tearing down active peers) before closing the Subsystem (releasing per-process UDP sockets), mirroring the RTMP/SRT teardown pattern. Verified: go build ./... clean; go test ./app/webrtc/... ./core/webrtc/... ./restream/... ./http/... all pass. Co-Authored-By: Claude Opus 4.7 --- app/api/api.go | 31 +++++++++++++++++++++++++++++++ http/server.go | 13 +++++++++++++ 2 files changed, 44 insertions(+) diff --git a/app/api/api.go b/app/api/api.go index cfc6d89..f049b81 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -16,6 +16,7 @@ import ( "time" "github.com/datarhei/core/v16/app" + appwebrtc "github.com/datarhei/core/v16/app/webrtc" "github.com/datarhei/core/v16/config" configstore "github.com/datarhei/core/v16/config/store" configvars "github.com/datarhei/core/v16/config/vars" @@ -73,6 +74,8 @@ type api struct { s3fs map[string]fs.Filesystem rtmpserver rtmp.Server srtserver srt.Server + webrtcsub *appwebrtc.Subsystem + webrtchandler *appwebrtc.Handler metrics monitor.HistoryMonitor prom prometheus.Metrics service service.Service @@ -617,6 +620,22 @@ func (a *api) start() error { a.restream = restream + // Build the WebRTC egress subsystem if the operator enabled it. + // Failure to construct the subsystem (e.g., invalid NAT1To1 IP) + // is logged and the subsystem declines to install hooks — Core + // starts normally without WebRTC support, consistent with how + // disabling the subsystem at runtime is handled. + if cfg.WebRTC.Enable { + webrtcSub, werr := appwebrtc.New(cfg.WebRTC, a.log.logger.core) + if werr != nil { + a.log.logger.core.Warn().WithError(werr).Log("WebRTC subsystem disabled: construction failed") + } else { + a.restream.SetHooks(webrtcSub.Hooks()) + a.webrtcsub = webrtcSub + a.webrtchandler = appwebrtc.NewHandler(webrtcSub, 0) + } + } + var httpjwt jwt.JWT if cfg.API.Auth.Enable { @@ -1014,6 +1033,7 @@ func (a *api) start() error { }, RTMP: a.rtmpserver, SRT: a.srtserver, + WebRTC: a.webrtchandler, JWT: a.httpjwt, Config: a.config.store, Sessions: a.sessions, @@ -1354,6 +1374,17 @@ func (a *api) stop() { a.srtserver = nil } + // Tear down the WebRTC subsystem: close any active WHEP peers + // first, then release all per-process UDP sockets. + if a.webrtchandler != nil { + a.webrtchandler.Close() + a.webrtchandler = nil + } + if a.webrtcsub != nil { + a.webrtcsub.Close() + a.webrtcsub = nil + } + // Stop the RTMP server if a.rtmpserver != nil { a.log.logger.rtmp.Info().Log("Stopping ...") diff --git a/http/server.go b/http/server.go index 3b0f02a..330a5dd 100644 --- a/http/server.go +++ b/http/server.go @@ -33,6 +33,7 @@ import ( "net/http" "strings" + appwebrtc "github.com/datarhei/core/v16/app/webrtc" cfgstore "github.com/datarhei/core/v16/config/store" "github.com/datarhei/core/v16/http/cache" "github.com/datarhei/core/v16/http/errorhandler" @@ -86,6 +87,7 @@ type Config struct { Cors CorsConfig RTMP rtmp.Server SRT srt.Server + WebRTC *appwebrtc.Handler JWT jwt.JWT Config cfgstore.Store Cache cache.Cacher @@ -124,6 +126,7 @@ type server struct { session *api.SessionHandler widget *api.WidgetHandler resources *api.MetricsHandler + webrtc *appwebrtc.Handler } middleware struct { @@ -238,6 +241,10 @@ func NewServer(config Config) (Server, error) { ) } + if config.WebRTC != nil { + s.v3handler.webrtc = config.WebRTC + } + if config.Prometheus != nil { s.handler.prometheus = handler.NewPrometheus( config.Prometheus.HTTPHandler(), @@ -545,6 +552,12 @@ func (s *server) setRoutesV3(v3 *echo.Group) { s.router.GET("/api/v3/widget/process/:id", s.v3handler.widget.Get) } + // v3 WebRTC (WHEP egress). Mounted on the v3 group so JWT auth + // covers it in M2; public embed tokens will ship in M3. + if s.v3handler.webrtc != nil { + s.v3handler.webrtc.Register(v3) + } + // v3 Restreamer if s.v3handler.restream != nil { v3.GET("/skills", s.v3handler.restream.Skills)