feat(webrtc): add app/webrtc subsystem + lifecycle hooks

Introduces the subsystem layer that sits alongside api.API and wires
the M1 core/webrtc primitives into the per-process restream lifecycle.

app/webrtc/subsystem.go:
- Subsystem struct holding the global WebRTC config, core PeerFactory,
  per-process stream map, and logger
- New(config.DataWebRTC, logger) constructor
- Enabled(), Hooks(), Close(), lookup() methods

app/webrtc/lifecycle.go:
- onProcessStart: allocates an adjacent UDP port pair, binds two
  Pion Sources (video on V, audio on V+1), registers them under the
  process id, and returns the two RTP output legs to append to the
  FFmpeg command.
- onProcessStop: tears down the pair.
- allocAdjacentPair: retries up to 10 times to find a free (V, V+1)
  pair since the kernel's ephemeral picker can hand us an odd port.
- splitRTPLegs: converts BuildArgs' flat []string into two ConfigIO
  entries by splitting on the second -map token.

core/webrtc/peer.go + forward.go:
- Adds PeerFactory.CreatePeerFromSources for the M2 two-source
  forwarding mode (video and audio on separate UDP ports, no
  payload-type sniffing). Leaves CreatePeer intact for the M1 PoC.
- Adds forwardRTPSplit companion goroutine.

config/data.go:
- Promote anonymous WebRTC struct to named type DataWebRTC so
  app/webrtc can accept it by value.

2026-04-17 10:02:00 -04:00
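
The splitRTPLegs step named in the commit message can be sketched roughly as follows. This is a minimal illustration and not the code from lifecycle.go: the function name splitAtSecondMap, the sample FFmpeg arguments, and the ports 5004/5005 are hypothetical, and the real function builds ConfigIO entries rather than returning raw slices.

```go
package main

import "fmt"

// splitAtSecondMap is a hypothetical helper: it splits a flat FFmpeg
// argument list into two legs at the second "-map" token. It reports
// ok=false when fewer than two "-map" tokens are present.
func splitAtSecondMap(args []string) (first, second []string, ok bool) {
	seen := 0
	for i, a := range args {
		if a != "-map" {
			continue
		}
		seen++
		if seen == 2 {
			// Cap the first leg's capacity so a later append to it
			// cannot overwrite the second leg's backing storage.
			return args[:i:i], args[i:], true
		}
	}
	return args, nil, false
}

func main() {
	// Assumed layout: one video leg followed by one audio leg.
	args := []string{
		"-map", "0:v:0", "-f", "rtp", "rtp://127.0.0.1:5004",
		"-map", "0:a:0", "-f", "rtp", "rtp://127.0.0.1:5005",
	}
	video, audio, ok := splitAtSecondMap(args)
	fmt.Println(ok, video, audio)
}
```

Splitting on the token position rather than on a fixed offset keeps the sketch independent of how many options each leg carries.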

package webrtc

import (
	"fmt"
	"sync"

	"github.com/datarhei/core/v16/config"
	corewebrtc "github.com/datarhei/core/v16/core/webrtc"
	"github.com/datarhei/core/v16/log"
	"github.com/datarhei/core/v16/restream"
)

// Subsystem is the app-level WebRTC egress manager. It sits alongside
// api.API as a sibling: both consume the Restream service, and both wire
// themselves into the Echo HTTP router. The subsystem is responsible for:
//
//   - Translating the global config.DataWebRTC into the core-level
//     corewebrtc.Config used by the PeerFactory.
//   - Installing ProcessHooks on Restreamer so that per-process start
//     events allocate a pair of UDP ports, create Pion Sources, and
//     inject RTP output legs into the FFmpeg command line (WHEP egress).
//   - Optionally installing OnInputStart/OnInputStop hooks for WHIP
//     ingest: allocates an adjacent UDP pair and injects RTP input legs.
//   - Serving the WHEP Echo handler (see handler.go) and the WHIP Echo
//     handler (see whip_handler.go).
//
// The zero value is not usable; call New.
type Subsystem struct {
	globalCfg config.DataWebRTC
	coreCfg   corewebrtc.Config
	factory   *corewebrtc.PeerFactory
	logger    log.Logger

	mu      sync.Mutex
	streams map[string]*processStream // processID -> WHEP egress stream pair

	// WHIP ingest: active port-pair allocations keyed by processID.
	whipIngests map[string]*ingestStream

	// teardown is set by the WHEP Handler to be called before egress
	// Sources close in onProcessStop.
	teardown func(streamID string)

	// whipTeardown is set by the WHIPHandler to be called before the
	// ingest port allocation is removed in onWHIPProcessStop.
	whipTeardown func(streamID string)
}

// processStream captures the two Sources (video + audio) backing a
// single running process's WHEP egress.
type processStream struct {
	id    string
	video *corewebrtc.Source
	audio *corewebrtc.Source
}

// New constructs a Subsystem from the global WebRTC config section.
// M2 allocates UDP ports from the OS's ephemeral range via Alloc, so
// New takes no port-range argument. Returns an error if the PeerFactory
// cannot be built (e.g., bad NAT1To1 IPs).
func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) {
	if logger == nil {
		logger = log.New("")
	}

	coreCfg := corewebrtc.DefaultConfig()
	coreCfg.Enabled = dataCfg.Enable
	coreCfg.PublicIP = dataCfg.PublicIP

	// core/webrtc currently reads a single PublicIP. If the operator
	// configured NAT1To1 IPs (e.g., dual LAN/public) but left PublicIP
	// empty, fall back to the first NAT1To1 entry; an explicitly set
	// PublicIP is used as-is. (Multi-IP NAT1To1 is an M3 enhancement.)
	if len(dataCfg.NAT1To1IPs) > 0 && coreCfg.PublicIP == "" {
		coreCfg.PublicIP = dataCfg.NAT1To1IPs[0]
	}

	factory, err := corewebrtc.NewPeerFactory(coreCfg)
	if err != nil {
		return nil, fmt.Errorf("webrtc subsystem: build peer factory: %w", err)
	}

	return &Subsystem{
		globalCfg:   dataCfg,
		coreCfg:     coreCfg,
		factory:     factory,
		logger:      logger.WithComponent("WebRTC"),
		streams:     make(map[string]*processStream),
		whipIngests: make(map[string]*ingestStream),
	}, nil
}

// Enabled reports whether the subsystem should register hooks and
// serve the WHEP endpoint. Called by the API wiring layer to decide
// whether to install anything.
func (s *Subsystem) Enabled() bool {
	return s.globalCfg.Enable
}

// Hooks returns the restream.ProcessHooks the subsystem expects to be
// installed via restream.Restreamer.SetHooks. Exactly one Subsystem
// instance should be installed per Restreamer.
//
// This returns only the WHEP egress hooks (OnStart/OnStop). If WHIP is
// also required, merge these with the hooks from WHIPHooks() before
// calling SetHooks, or use MergedHooks().
func (s *Subsystem) Hooks() restream.ProcessHooks {
	return restream.ProcessHooks{
		OnStart: s.onProcessStart,
		OnStop:  s.onProcessStop,
	}
}

// WHIPHooks returns the restream.ProcessHooks for WHIP ingest
// (OnInputStart / OnInputStop). Merge these with the output from
// Hooks() before calling restream.Restreamer.SetHooks.
func (s *Subsystem) WHIPHooks() restream.ProcessHooks {
	return restream.ProcessHooks{
		OnInputStart: s.onWHIPProcessStart,
		OnInputStop:  s.onWHIPProcessStop,
	}
}

// MergedHooks returns ProcessHooks with both WHEP egress (OnStart/OnStop)
// and WHIP ingest (OnInputStart/OnInputStop) wired in. Convenience
// helper so callers don't have to merge manually.
func (s *Subsystem) MergedHooks() restream.ProcessHooks {
	return restream.ProcessHooks{
		OnStart:      s.onProcessStart,
		OnStop:       s.onProcessStop,
		OnInputStart: s.onWHIPProcessStart,
		OnInputStop:  s.onWHIPProcessStop,
	}
}

// Close tears down every active per-process stream. It is safe to
// call during Core shutdown; subsequent WHEP requests will 404.
func (s *Subsystem) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, st := range s.streams {
		if st.video != nil {
			_ = st.video.Close()
		}
		if st.audio != nil {
			_ = st.audio.Close()
		}
		delete(s.streams, id)
	}
}

// SetTeardownHook registers a callback invoked just before a stream's
// Sources are closed in onProcessStop. The callback is expected to
// tear down any external resources keyed by streamID, most importantly
// the WHEP Handler's per-stream peer index.
//
// Calling SetTeardownHook again replaces the previous callback; pass
// nil to detach. Only one consumer is supported by design.
func (s *Subsystem) SetTeardownHook(fn func(streamID string)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.teardown = fn
}

// SetWHIPTeardownHook registers a callback invoked just before a WHIP
// ingest allocation is removed in onWHIPProcessStop. The WHIPHandler
// uses this to close any active publisher when FFmpeg stops.
func (s *Subsystem) SetWHIPTeardownHook(fn func(streamID string)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.whipTeardown = fn
}

// StreamCount returns the number of processes currently registered with
// active WebRTC egress. Used by the Prometheus snapshot collector.
func (s *Subsystem) StreamCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.streams)
}

// ICEServerURIs returns the ICE server URI list from the core config.
// Used by the WHEP handler to emit RFC 9429 §4.3 Link headers so that
// browsers can discover STUN/TURN servers without a separate signalling
// round-trip.
func (s *Subsystem) ICEServerURIs() []string {
	return s.coreCfg.ICEServers
}

// lookup returns the per-process WHEP stream pair for id, or nil, false.
// Used by the WHEP handler.
func (s *Subsystem) lookup(id string) (*processStream, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	st, ok := s.streams[id]
	return st, ok
}

// lookupIngest returns the per-process WHIP ingest port allocation for
// id, or nil, false. Used by the WHIPHandler.
func (s *Subsystem) lookupIngest(id string) (*ingestStream, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	st, ok := s.whipIngests[id]
	return st, ok
}