datarhei-dragonfork-core/app/webrtc/subsystem.go

121 lines
3.6 KiB
Go
Raw Normal View History

package webrtc
import (
"fmt"
"sync"
"github.com/datarhei/core/v16/config"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream"
)
// Subsystem manages WebRTC egress at the application level. It is a
// sibling of api.API: each consumes the Restream service and each
// registers itself with the Echo HTTP router. Its duties are:
//
//   - Deriving the core-level corewebrtc.Config (consumed by the
//     PeerFactory) from the global config.DataWebRTC section.
//   - Providing ProcessHooks for the Restreamer so that a process
//     start allocates a pair of UDP ports, creates Pion Sources, and
//     injects RTP output legs into the FFmpeg command line.
//   - Serving the WHEP Echo handler (see handler.go).
//
// A zero-value Subsystem must not be used; obtain one from New.
type Subsystem struct {
	// globalCfg is the WebRTC config section as handed to New.
	globalCfg config.DataWebRTC
	// coreCfg is the core-level config New derived from globalCfg.
	coreCfg corewebrtc.Config
	// factory is the PeerFactory New built from coreCfg.
	factory *corewebrtc.PeerFactory
	// logger is the logger tagged with the "WebRTC" component.
	logger log.Logger

	// mu is held by Close and lookup while they touch streams.
	mu sync.Mutex
	// streams maps a process ID to that process's stream pair.
	streams map[string]*processStream
}
// processStream bundles the video and audio Sources that back the
// WHEP egress of one running process.
type processStream struct {
	// id identifies the stream; presumably the owning process ID
	// (it is not assigned anywhere in this file — verify at the
	// construction site).
	id string
	// video and audio are the two Sources of the pair. Close treats
	// a nil entry as "nothing to close".
	video, audio *corewebrtc.Source
}
// New constructs a Subsystem from the global WebRTC config section.
//
// dataCfg is stored as-is and is also translated into a
// corewebrtc.Config (starting from corewebrtc.DefaultConfig) that is
// used to build the PeerFactory. logger may be nil, in which case
// log.New("") is used instead; either way the stored logger is tagged
// with the "WebRTC" component.
//
// It returns a wrapped error if corewebrtc.NewPeerFactory rejects the
// derived configuration.
func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) {
	if logger == nil {
		logger = log.New("")
	}

	coreCfg := corewebrtc.DefaultConfig()
	coreCfg.Enabled = dataCfg.Enable
	coreCfg.PublicIP = dataCfg.PublicIP

	// core/webrtc currently reads a single PublicIP, so NAT1To1IPs
	// acts purely as a fallback: only when PublicIP is empty is the
	// first list entry used, and any further entries are ignored. An
	// explicitly configured PublicIP always wins. (Multi-IP NAT1To1
	// is an M3 enhancement.)
	//
	// NOTE(review): an earlier version of this comment claimed the
	// list takes precedence over PublicIP, which the condition below
	// has never implemented — confirm which precedence is intended.
	if coreCfg.PublicIP == "" && len(dataCfg.NAT1To1IPs) > 0 {
		coreCfg.PublicIP = dataCfg.NAT1To1IPs[0]
	}

	factory, err := corewebrtc.NewPeerFactory(coreCfg)
	if err != nil {
		return nil, fmt.Errorf("webrtc subsystem: build peer factory: %w", err)
	}

	return &Subsystem{
		globalCfg: dataCfg,
		coreCfg:   coreCfg,
		factory:   factory,
		logger:    logger.WithComponent("WebRTC"),
		streams:   make(map[string]*processStream),
	}, nil
}
// Enabled returns the Enable flag of the global WebRTC config the
// subsystem was built with. The API wiring layer consults it to decide
// whether hooks and the WHEP endpoint should be installed at all.
func (s *Subsystem) Enabled() bool {
	enabled := s.globalCfg.Enable
	return enabled
}
// Hooks builds the restream.ProcessHooks that route process start and
// stop events to this subsystem. The result is meant to be passed to
// restream.Restreamer.SetHooks; install exactly one Subsystem per
// Restreamer.
func (s *Subsystem) Hooks() restream.ProcessHooks {
	var hooks restream.ProcessHooks
	hooks.OnStart = s.onProcessStart
	hooks.OnStop = s.onProcessStop
	return hooks
}
// Close shuts down every registered per-process stream and empties the
// stream table, holding the subsystem lock for the whole sweep. It is
// intended for Core shutdown; once it returns, lookup no longer finds
// any stream, so later WHEP requests are expected to 404.
func (s *Subsystem) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	for id, st := range s.streams {
		// Video first, then audio; a nil Source is simply skipped.
		for _, src := range []*corewebrtc.Source{st.video, st.audio} {
			if src == nil {
				continue
			}
			// Best-effort teardown: a failing Close is deliberately ignored.
			_ = src.Close()
		}
		// Removing the entry currently being visited is allowed during range.
		delete(s.streams, id)
	}
}
// lookup fetches the stream pair registered under id. The boolean is
// false (and the pointer nil) when no such entry exists. The WHEP
// handler is its consumer.
func (s *Subsystem) lookup(id string) (*processStream, bool) {
	s.mu.Lock()
	stream, found := s.streams[id]
	s.mu.Unlock()

	return stream, found
}