feat(webrtc): wire app/webrtc subsystem into Core lifecycle
Installs the WebRTC egress subsystem at Core boot when cfg.WebRTC.Enable is true and the subsystem constructs cleanly: - http.Config gains an optional WebRTC *appwebrtc.Handler field; server.setRoutesV3 mounts its WHEP routes on the JWT-protected /api/v3 group. - api.start() constructs the Subsystem, registers its ProcessHooks with the restreamer, and builds a Handler. A construction failure is logged and Core continues without WebRTC — consistent with disabling the subsystem outright. - api.stop() closes the Handler (tearing down active peers) before closing the Subsystem (releasing per-process UDP sockets), mirroring the RTMP/SRT teardown pattern. Verified: go build ./... clean; go test ./app/webrtc/... ./core/webrtc/... ./restream/... ./http/... all pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
f6d5b3378a
commit
83eaa28601
2 changed files with 44 additions and 0 deletions
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/datarhei/core/v16/app"
|
"github.com/datarhei/core/v16/app"
|
||||||
|
appwebrtc "github.com/datarhei/core/v16/app/webrtc"
|
||||||
"github.com/datarhei/core/v16/config"
|
"github.com/datarhei/core/v16/config"
|
||||||
configstore "github.com/datarhei/core/v16/config/store"
|
configstore "github.com/datarhei/core/v16/config/store"
|
||||||
configvars "github.com/datarhei/core/v16/config/vars"
|
configvars "github.com/datarhei/core/v16/config/vars"
|
||||||
|
|
@ -73,6 +74,8 @@ type api struct {
|
||||||
s3fs map[string]fs.Filesystem
|
s3fs map[string]fs.Filesystem
|
||||||
rtmpserver rtmp.Server
|
rtmpserver rtmp.Server
|
||||||
srtserver srt.Server
|
srtserver srt.Server
|
||||||
|
webrtcsub *appwebrtc.Subsystem
|
||||||
|
webrtchandler *appwebrtc.Handler
|
||||||
metrics monitor.HistoryMonitor
|
metrics monitor.HistoryMonitor
|
||||||
prom prometheus.Metrics
|
prom prometheus.Metrics
|
||||||
service service.Service
|
service service.Service
|
||||||
|
|
@ -617,6 +620,22 @@ func (a *api) start() error {
|
||||||
|
|
||||||
a.restream = restream
|
a.restream = restream
|
||||||
|
|
||||||
|
// Build the WebRTC egress subsystem if the operator enabled it.
|
||||||
|
// Failure to construct the subsystem (e.g., invalid NAT1To1 IP)
|
||||||
|
// is logged and the subsystem declines to install hooks — Core
|
||||||
|
// starts normally without WebRTC support, consistent with how
|
||||||
|
// disabling the subsystem at runtime is handled.
|
||||||
|
if cfg.WebRTC.Enable {
|
||||||
|
webrtcSub, werr := appwebrtc.New(cfg.WebRTC, a.log.logger.core)
|
||||||
|
if werr != nil {
|
||||||
|
a.log.logger.core.Warn().WithError(werr).Log("WebRTC subsystem disabled: construction failed")
|
||||||
|
} else {
|
||||||
|
a.restream.SetHooks(webrtcSub.Hooks())
|
||||||
|
a.webrtcsub = webrtcSub
|
||||||
|
a.webrtchandler = appwebrtc.NewHandler(webrtcSub, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var httpjwt jwt.JWT
|
var httpjwt jwt.JWT
|
||||||
|
|
||||||
if cfg.API.Auth.Enable {
|
if cfg.API.Auth.Enable {
|
||||||
|
|
@ -1014,6 +1033,7 @@ func (a *api) start() error {
|
||||||
},
|
},
|
||||||
RTMP: a.rtmpserver,
|
RTMP: a.rtmpserver,
|
||||||
SRT: a.srtserver,
|
SRT: a.srtserver,
|
||||||
|
WebRTC: a.webrtchandler,
|
||||||
JWT: a.httpjwt,
|
JWT: a.httpjwt,
|
||||||
Config: a.config.store,
|
Config: a.config.store,
|
||||||
Sessions: a.sessions,
|
Sessions: a.sessions,
|
||||||
|
|
@ -1354,6 +1374,17 @@ func (a *api) stop() {
|
||||||
a.srtserver = nil
|
a.srtserver = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tear down the WebRTC subsystem: close any active WHEP peers
|
||||||
|
// first, then release all per-process UDP sockets.
|
||||||
|
if a.webrtchandler != nil {
|
||||||
|
a.webrtchandler.Close()
|
||||||
|
a.webrtchandler = nil
|
||||||
|
}
|
||||||
|
if a.webrtcsub != nil {
|
||||||
|
a.webrtcsub.Close()
|
||||||
|
a.webrtcsub = nil
|
||||||
|
}
|
||||||
|
|
||||||
// Stop the RTMP server
|
// Stop the RTMP server
|
||||||
if a.rtmpserver != nil {
|
if a.rtmpserver != nil {
|
||||||
a.log.logger.rtmp.Info().Log("Stopping ...")
|
a.log.logger.rtmp.Info().Log("Stopping ...")
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
appwebrtc "github.com/datarhei/core/v16/app/webrtc"
|
||||||
cfgstore "github.com/datarhei/core/v16/config/store"
|
cfgstore "github.com/datarhei/core/v16/config/store"
|
||||||
"github.com/datarhei/core/v16/http/cache"
|
"github.com/datarhei/core/v16/http/cache"
|
||||||
"github.com/datarhei/core/v16/http/errorhandler"
|
"github.com/datarhei/core/v16/http/errorhandler"
|
||||||
|
|
@ -86,6 +87,7 @@ type Config struct {
|
||||||
Cors CorsConfig
|
Cors CorsConfig
|
||||||
RTMP rtmp.Server
|
RTMP rtmp.Server
|
||||||
SRT srt.Server
|
SRT srt.Server
|
||||||
|
WebRTC *appwebrtc.Handler
|
||||||
JWT jwt.JWT
|
JWT jwt.JWT
|
||||||
Config cfgstore.Store
|
Config cfgstore.Store
|
||||||
Cache cache.Cacher
|
Cache cache.Cacher
|
||||||
|
|
@ -124,6 +126,7 @@ type server struct {
|
||||||
session *api.SessionHandler
|
session *api.SessionHandler
|
||||||
widget *api.WidgetHandler
|
widget *api.WidgetHandler
|
||||||
resources *api.MetricsHandler
|
resources *api.MetricsHandler
|
||||||
|
webrtc *appwebrtc.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
middleware struct {
|
middleware struct {
|
||||||
|
|
@ -238,6 +241,10 @@ func NewServer(config Config) (Server, error) {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.WebRTC != nil {
|
||||||
|
s.v3handler.webrtc = config.WebRTC
|
||||||
|
}
|
||||||
|
|
||||||
if config.Prometheus != nil {
|
if config.Prometheus != nil {
|
||||||
s.handler.prometheus = handler.NewPrometheus(
|
s.handler.prometheus = handler.NewPrometheus(
|
||||||
config.Prometheus.HTTPHandler(),
|
config.Prometheus.HTTPHandler(),
|
||||||
|
|
@ -545,6 +552,12 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
|
||||||
s.router.GET("/api/v3/widget/process/:id", s.v3handler.widget.Get)
|
s.router.GET("/api/v3/widget/process/:id", s.v3handler.widget.Get)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// v3 WebRTC (WHEP egress). Mounted on the v3 group so JWT auth
|
||||||
|
// covers it in M2; public embed tokens will ship in M3.
|
||||||
|
if s.v3handler.webrtc != nil {
|
||||||
|
s.v3handler.webrtc.Register(v3)
|
||||||
|
}
|
||||||
|
|
||||||
// v3 Restreamer
|
// v3 Restreamer
|
||||||
if s.v3handler.restream != nil {
|
if s.v3handler.restream != nil {
|
||||||
v3.GET("/skills", s.v3handler.restream.Skills)
|
v3.GET("/skills", s.v3handler.restream.Skills)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue