feat(whip): extend Subsystem with WHIP ingest state, lookupIngest, WHIPHooks
Some checks failed
ci / race tests (push) Blocked by required conditions
ci / WebRTC smoke (5-viewer fanout) (push) Blocked by required conditions
ci / WebRTC latency p95 gate (push) Blocked by required conditions
ci / vet + build (push) Has been cancelled

This commit is contained in:
Zac Gaetano 2026-05-09 16:32:49 -04:00
parent 6c9d1864dd
commit 4bef6563c7

View file

@ -18,8 +18,11 @@ import (
// corewebrtc.Config used by the PeerFactory. // corewebrtc.Config used by the PeerFactory.
// - Installing ProcessHooks on Restreamer so that per-process start // - Installing ProcessHooks on Restreamer so that per-process start
// events allocate a pair of UDP ports, create Pion Sources, and // events allocate a pair of UDP ports, create Pion Sources, and
// inject RTP output legs into the FFmpeg command line. // inject RTP output legs into the FFmpeg command line (WHEP egress).
// - Serving the WHEP Echo handler (see handler.go). // - Optionally installing OnInputStart/OnInputStop hooks for WHIP
// ingest: allocates an adjacent UDP pair and injects RTP input legs.
// - Serving the WHEP Echo handler (see handler.go) and the WHIP Echo
// handler (see whip_handler.go).
// //
// The zero value is not usable; call New. // The zero value is not usable; call New.
type Subsystem struct { type Subsystem struct {
@ -29,13 +32,18 @@ type Subsystem struct {
logger log.Logger logger log.Logger
mu sync.Mutex mu sync.Mutex
streams map[string]*processStream // processID -> stream pair streams map[string]*processStream // processID -> WHEP egress stream pair
// teardown is set by the Handler (or any other consumer) so the // WHIP ingest: active port-pair allocations keyed by processID.
// Subsystem can broadcast process-stop events. Called *before* whipIngests map[string]*ingestStream
// the per-stream Sources are closed, so consumers can yank their
// own indexes while the stream id is still valid. // teardown is set by the WHEP Handler to be called before egress
// Sources close in onProcessStop.
teardown func(streamID string) teardown func(streamID string)
// whipTeardown is set by the WHIPHandler to be called before the
// ingest port allocation is removed in onWHIPProcessStop.
whipTeardown func(streamID string)
} }
// processStream captures the two Sources (video + audio) backing a // processStream captures the two Sources (video + audio) backing a
@ -74,11 +82,12 @@ func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) {
} }
return &Subsystem{ return &Subsystem{
globalCfg: dataCfg, globalCfg: dataCfg,
coreCfg: coreCfg, coreCfg: coreCfg,
factory: factory, factory: factory,
logger: logger.WithComponent("WebRTC"), logger: logger.WithComponent("WebRTC"),
streams: make(map[string]*processStream), streams: make(map[string]*processStream),
whipIngests: make(map[string]*ingestStream),
}, nil }, nil
} }
@ -92,6 +101,10 @@ func (s *Subsystem) Enabled() bool {
// Hooks returns the restream.ProcessHooks the subsystem expects to be // Hooks returns the restream.ProcessHooks the subsystem expects to be
// installed via restream.Restreamer.SetHooks. Exactly one Subsystem // installed via restream.Restreamer.SetHooks. Exactly one Subsystem
// instance should be installed per Restreamer. // instance should be installed per Restreamer.
//
// This returns only the WHEP egress hooks (OnStart/OnStop). Call
// WHIPHooks() to get the WHIP ingest hooks, and merge them with
// SetHooks if WHIP is also required.
func (s *Subsystem) Hooks() restream.ProcessHooks { func (s *Subsystem) Hooks() restream.ProcessHooks {
return restream.ProcessHooks{ return restream.ProcessHooks{
OnStart: s.onProcessStart, OnStart: s.onProcessStart,
@ -99,6 +112,28 @@ func (s *Subsystem) Hooks() restream.ProcessHooks {
} }
} }
// WHIPHooks returns the restream.ProcessHooks for WHIP ingest
// (OnInputStart / OnInputStop). Merge these with the output from
// Hooks() before calling restream.Restreamer.SetHooks.
func (s *Subsystem) WHIPHooks() restream.ProcessHooks {
return restream.ProcessHooks{
OnInputStart: s.onWHIPProcessStart,
OnInputStop: s.onWHIPProcessStop,
}
}
// MergedHooks returns ProcessHooks with both WHEP egress (OnStart/OnStop)
// and WHIP ingest (OnInputStart/OnInputStop) wired in. Convenience
// helper so callers don't have to merge manually.
func (s *Subsystem) MergedHooks() restream.ProcessHooks {
return restream.ProcessHooks{
OnStart: s.onProcessStart,
OnStop: s.onProcessStop,
OnInputStart: s.onWHIPProcessStart,
OnInputStop: s.onWHIPProcessStop,
}
}
// Close tears down every active per-process stream. It is safe to // Close tears down every active per-process stream. It is safe to
// call during Core shutdown; subsequent WHEP requests will 404. // call during Core shutdown; subsequent WHEP requests will 404.
func (s *Subsystem) Close() { func (s *Subsystem) Close() {
@ -128,6 +163,15 @@ func (s *Subsystem) SetTeardownHook(fn func(streamID string)) {
s.teardown = fn s.teardown = fn
} }
// SetWHIPTeardownHook registers a callback invoked just before a WHIP
// ingest allocation is removed in onWHIPProcessStop. The WHIPHandler
// uses this to close any active publisher when FFmpeg stops.
func (s *Subsystem) SetWHIPTeardownHook(fn func(streamID string)) {
s.mu.Lock()
defer s.mu.Unlock()
s.whipTeardown = fn
}
// StreamCount returns the number of processes currently registered with // StreamCount returns the number of processes currently registered with
// active WebRTC egress. Used by the Prometheus snapshot collector. // active WebRTC egress. Used by the Prometheus snapshot collector.
func (s *Subsystem) StreamCount() int { func (s *Subsystem) StreamCount() int {
@ -136,7 +180,7 @@ func (s *Subsystem) StreamCount() int {
return len(s.streams) return len(s.streams)
} }
// lookup returns the per-process stream pair for id, or nil, false. // lookup returns the per-process WHEP stream pair for id, or nil, false.
// Used by the WHEP handler. // Used by the WHEP handler.
func (s *Subsystem) lookup(id string) (*processStream, bool) { func (s *Subsystem) lookup(id string) (*processStream, bool) {
s.mu.Lock() s.mu.Lock()
@ -144,3 +188,12 @@ func (s *Subsystem) lookup(id string) (*processStream, bool) {
st, ok := s.streams[id] st, ok := s.streams[id]
return st, ok return st, ok
} }
// lookupIngest returns the per-process WHIP ingest port allocation for
// id, or nil, false. Used by the WHIPHandler.
func (s *Subsystem) lookupIngest(id string) (*ingestStream, bool) {
s.mu.Lock()
defer s.mu.Unlock()
st, ok := s.whipIngests[id]
return st, ok
}