datarhei-dragonfork-core/app/webrtc/subsystem.go
ZGaetano 4364d9176f
Some checks failed
ci / race tests (push) Blocked by required conditions
ci / WebRTC smoke (5-viewer fanout) (push) Blocked by required conditions
ci / WebRTC latency p95 gate (push) Blocked by required conditions
ci / vet + build (push) Has been cancelled
webrtc: apply operator ICEServers override in subsystem.New for issue #23
2026-05-10 21:15:42 -04:00

226 lines
7.6 KiB
Go

package webrtc
import (
"fmt"
"sync"
"github.com/datarhei/core/v16/config"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream"
)
// Subsystem is the app-level WebRTC egress manager. It sits alongside
// api.API as a sibling — both consume the Restream service, both wire
// themselves into the Echo HTTP router. The subsystem is responsible for:
//
//   - Translating the global config.DataWebRTC into the core-level
//     corewebrtc.Config used by the PeerFactory.
//   - Installing ProcessHooks on Restreamer so that per-process start
//     events allocate a pair of UDP ports, create Pion Sources, and
//     inject RTP output legs into the FFmpeg command line (WHEP egress).
//   - Optionally installing OnInputStart/OnInputStop hooks for WHIP
//     ingest: allocates an adjacent UDP pair and injects RTP input legs.
//   - Serving the WHEP Echo handler (see handler.go) and the WHIP Echo
//     handler (see whip_handler.go).
//
// The zero value is not usable (its maps are nil and it has no factory);
// call New.
type Subsystem struct {
	// globalCfg is the WebRTC config section exactly as passed to New.
	globalCfg config.DataWebRTC
	// coreCfg is the core-level config New derived from globalCfg and
	// handed to corewebrtc.NewPeerFactory.
	coreCfg corewebrtc.Config
	// factory is the PeerFactory built by New from coreCfg.
	factory *corewebrtc.PeerFactory
	// logger is the caller's logger tagged with the "WebRTC" component.
	logger log.Logger
	// mu is taken by the accessors in this file before they touch
	// streams, whipIngests, teardown, or whipTeardown.
	mu sync.Mutex
	// streams maps processID -> WHEP egress stream pair.
	streams map[string]*processStream // processID -> WHEP egress stream pair
	// WHIP ingest: active port-pair allocations keyed by processID.
	whipIngests map[string]*ingestStream
	// teardown is set by the WHEP Handler (via SetTeardownHook) to be
	// called before egress Sources close in onProcessStop. May be nil.
	teardown func(streamID string)
	// whipTeardown is set by the WHIPHandler (via SetWHIPTeardownHook) to
	// be called before the ingest port allocation is removed in
	// onWHIPProcessStop. May be nil.
	whipTeardown func(streamID string)
}
// processStream captures the two Sources (video + audio) backing a
// single running process's WHEP egress. Instances are stored in
// Subsystem.streams keyed by process ID.
type processStream struct {
	// id identifies the stream.
	// NOTE(review): presumably the same process ID used as the
	// Subsystem.streams key — confirm in onProcessStart.
	id string
	// video is the video Source. Subsystem.Close nil-checks it before
	// closing, so it may be absent.
	video *corewebrtc.Source
	// audio is the audio Source. Subsystem.Close nil-checks it before
	// closing, so it may be absent.
	audio *corewebrtc.Source
}
// New constructs a Subsystem from the global WebRTC config section.
//
// dataCfg is translated into a corewebrtc.Config, starting from
// corewebrtc.DefaultConfig, and handed to corewebrtc.NewPeerFactory.
// logger may be nil, in which case log.New("") is used; either way the
// Subsystem logs under the "WebRTC" component.
//
// Returns an error wrapping the underlying cause if the PeerFactory
// cannot be built (e.g., bad NAT1To1 IPs).
func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) {
	if logger == nil {
		logger = log.New("")
	}

	coreCfg := corewebrtc.DefaultConfig()
	coreCfg.Enabled = dataCfg.Enable
	coreCfg.PublicIP = dataCfg.PublicIP

	// Build the NAT1To1IPs list that Pion will use for host candidates.
	// This replaces the old single-IP workaround and allows dual-homed
	// servers (e.g., a LAN IP + a public IP) to advertise host candidates
	// on all interfaces simultaneously.
	coreCfg.NAT1To1IPs = mergeNAT1To1IPs(dataCfg.PublicIP, dataCfg.NAT1To1IPs)

	// If the operator supplied explicit ICE server URIs via config/env,
	// override the built-in defaults (typically Google's public STUN servers).
	// An empty list means "keep the built-in defaults". The list is copied
	// so later changes to dataCfg's slice cannot leak into coreCfg.
	if len(dataCfg.ICEServers) > 0 {
		coreCfg.ICEServers = make([]string, len(dataCfg.ICEServers))
		copy(coreCfg.ICEServers, dataCfg.ICEServers)
	}

	factory, err := corewebrtc.NewPeerFactory(coreCfg)
	if err != nil {
		return nil, fmt.Errorf("webrtc subsystem: build peer factory: %w", err)
	}

	return &Subsystem{
		globalCfg:   dataCfg,
		coreCfg:     coreCfg,
		factory:     factory,
		logger:      logger.WithComponent("WebRTC"),
		streams:     make(map[string]*processStream),
		whipIngests: make(map[string]*ingestStream),
	}, nil
}

// mergeNAT1To1IPs returns publicIP (when non-empty) followed by the entries
// of extra, in order, with empty strings and repeated addresses dropped.
// The result is always a fresh, non-nil slice.
func mergeNAT1To1IPs(publicIP string, extra []string) []string {
	merged := make([]string, 0, len(extra)+1)
	seen := make(map[string]struct{}, len(extra)+1)

	if publicIP != "" {
		merged = append(merged, publicIP)
		seen[publicIP] = struct{}{}
	}

	for _, ip := range extra {
		// A blank entry (e.g. from a trailing comma in an env var) is
		// never a usable address.
		if ip == "" {
			continue
		}
		// Skip anything already listed, whether it repeats publicIP or
		// an earlier entry of extra.
		if _, dup := seen[ip]; dup {
			continue
		}
		seen[ip] = struct{}{}
		merged = append(merged, ip)
	}

	return merged
}
// Enabled reports the Enable flag of the config section the Subsystem was
// built from. The API wiring layer consults it to decide whether hooks and
// the WHEP endpoint should be installed at all.
func (s *Subsystem) Enabled() bool {
	enabled := s.globalCfg.Enable
	return enabled
}
// Hooks returns the restream.ProcessHooks the subsystem expects to be
// installed via restream.Restreamer.SetHooks. Exactly one Subsystem
// instance should be installed per Restreamer.
//
// Only the WHEP egress callbacks (OnStart/OnStop) are populated. Use
// WHIPHooks for the WHIP ingest callbacks, or MergedHooks for both.
func (s *Subsystem) Hooks() restream.ProcessHooks {
	var egress restream.ProcessHooks
	egress.OnStart = s.onProcessStart
	egress.OnStop = s.onProcessStop
	return egress
}
// WHIPHooks returns the restream.ProcessHooks for WHIP ingest. Only
// OnInputStart and OnInputStop are populated; merge the result with the
// output of Hooks before calling restream.Restreamer.SetHooks.
func (s *Subsystem) WHIPHooks() restream.ProcessHooks {
	var ingest restream.ProcessHooks
	ingest.OnInputStart = s.onWHIPProcessStart
	ingest.OnInputStop = s.onWHIPProcessStop
	return ingest
}
// MergedHooks returns ProcessHooks carrying both the WHEP egress callbacks
// (OnStart/OnStop) and the WHIP ingest callbacks (OnInputStart/OnInputStop),
// so callers do not have to combine Hooks and WHIPHooks by hand.
func (s *Subsystem) MergedHooks() restream.ProcessHooks {
	var merged restream.ProcessHooks

	// WHEP egress.
	merged.OnStart = s.onProcessStart
	merged.OnStop = s.onProcessStop

	// WHIP ingest.
	merged.OnInputStart = s.onWHIPProcessStart
	merged.OnInputStop = s.onWHIPProcessStop

	return merged
}
// Close tears down every active per-process WHEP egress stream: each
// stream's video and audio Sources are closed (when present) and the entry
// is removed, so later lookups miss and WHEP requests will 404. It is safe
// to call during Core shutdown.
//
// Only the egress streams are handled here; WHIP ingest allocations and
// the teardown hooks are left untouched.
func (s *Subsystem) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	for procID, stream := range s.streams {
		for _, src := range [...]*corewebrtc.Source{stream.video, stream.audio} {
			if src == nil {
				continue
			}
			// Best-effort shutdown: a Close error is deliberately
			// ignored because there is nothing left to recover.
			_ = src.Close()
		}
		delete(s.streams, procID)
	}
}
// SetTeardownHook stores fn as the callback to be invoked just before a
// stream's Sources are closed in onProcessStop. The callback is expected
// to release any external resources keyed by streamID — most importantly
// the WHEP Handler's per-stream peer index.
//
// Only one consumer is supported by design: each call replaces the
// previously stored callback, and passing nil detaches it.
func (s *Subsystem) SetTeardownHook(fn func(streamID string)) {
	s.mu.Lock()
	s.teardown = fn
	s.mu.Unlock()
}
// SetWHIPTeardownHook stores fn as the callback to be invoked just before
// a WHIP ingest allocation is removed in onWHIPProcessStop. The WHIPHandler
// uses it to close any active publisher when FFmpeg stops. Each call
// replaces the previously stored callback.
func (s *Subsystem) SetWHIPTeardownHook(fn func(streamID string)) {
	s.mu.Lock()
	s.whipTeardown = fn
	s.mu.Unlock()
}
// StreamCount reports how many processes currently have a registered
// WebRTC egress stream. Used by the Prometheus snapshot collector.
func (s *Subsystem) StreamCount() int {
	s.mu.Lock()
	active := len(s.streams)
	s.mu.Unlock()
	return active
}
// ICEServerURIs returns the ICE server URI list from the core config.
// Used by the WHEP and WHIP handlers to emit ICE-server Link headers (the
// mechanism WHIP defines in RFC 9725) so that browsers can discover
// STUN/TURN servers without a separate signalling round-trip. If the
// operator configured explicit servers (config.DataWebRTC.ICEServers)
// those are returned; otherwise the defaults from
// corewebrtc.DefaultConfig are returned.
//
// The result is a copy, so callers may modify it without affecting the
// subsystem's configuration. It is nil when no servers are configured.
func (s *Subsystem) ICEServerURIs() []string {
	// Appending to a nil slice always allocates fresh backing storage,
	// which keeps coreCfg.ICEServers private to the Subsystem.
	return append([]string(nil), s.coreCfg.ICEServers...)
}
// lookup fetches the WHEP stream pair registered for process id. When no
// stream is registered it returns nil, false. Used by the WHEP handler.
func (s *Subsystem) lookup(id string) (*processStream, bool) {
	s.mu.Lock()
	stream, found := s.streams[id]
	s.mu.Unlock()
	return stream, found
}
// lookupIngest fetches the WHIP ingest port allocation registered for
// process id. When none is registered it returns nil, false. Used by the
// WHIPHandler.
func (s *Subsystem) lookupIngest(id string) (*ingestStream, bool) {
	s.mu.Lock()
	ingest, found := s.whipIngests[id]
	s.mu.Unlock()
	return ingest, found
}