From 4bef6563c707e4ce9679df2dc9dc0401c66b28cf Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sat, 9 May 2026 16:32:49 -0400 Subject: [PATCH] feat(whip): extend Subsystem with WHIP ingest state, lookupIngest, WHIPHooks --- app/webrtc/subsystem.go | 79 ++++++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/app/webrtc/subsystem.go b/app/webrtc/subsystem.go index c325b46..6a3e902 100644 --- a/app/webrtc/subsystem.go +++ b/app/webrtc/subsystem.go @@ -18,8 +18,11 @@ import ( // corewebrtc.Config used by the PeerFactory. // - Installing ProcessHooks on Restreamer so that per-process start // events allocate a pair of UDP ports, create Pion Sources, and -// inject RTP output legs into the FFmpeg command line. -// - Serving the WHEP Echo handler (see handler.go). +// inject RTP output legs into the FFmpeg command line (WHEP egress). +// - Optionally installing OnInputStart/OnInputStop hooks for WHIP +// ingest: allocates an adjacent UDP pair and injects RTP input legs. +// - Serving the WHEP Echo handler (see handler.go) and the WHIP Echo +// handler (see whip_handler.go). // // The zero value is not usable; call New. type Subsystem struct { @@ -29,13 +32,18 @@ type Subsystem struct { logger log.Logger mu sync.Mutex - streams map[string]*processStream // processID -> stream pair + streams map[string]*processStream // processID -> WHEP egress stream pair - // teardown is set by the Handler (or any other consumer) so the - // Subsystem can broadcast process-stop events. Called *before* - // the per-stream Sources are closed, so consumers can yank their - // own indexes while the stream id is still valid. + // WHIP ingest: active port-pair allocations keyed by processID. + whipIngests map[string]*ingestStream + + // teardown is set by the WHEP Handler to be called before egress + // Sources close in onProcessStop. teardown func(streamID string) + + // whipTeardown is set by the WHIPHandler to be called before the + // ingest port allocation is removed in onWHIPProcessStop. + whipTeardown func(streamID string) } // processStream captures the two Sources (video + audio) backing a @@ -74,11 +82,12 @@ func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) { } return &Subsystem{ - globalCfg: dataCfg, - coreCfg: coreCfg, - factory: factory, - logger: logger.WithComponent("WebRTC"), - streams: make(map[string]*processStream), + globalCfg: dataCfg, + coreCfg: coreCfg, + factory: factory, + logger: logger.WithComponent("WebRTC"), + streams: make(map[string]*processStream), + whipIngests: make(map[string]*ingestStream), }, nil } @@ -92,6 +101,10 @@ func (s *Subsystem) Enabled() bool { // Hooks returns the restream.ProcessHooks the subsystem expects to be // installed via restream.Restreamer.SetHooks. Exactly one Subsystem // instance should be installed per Restreamer. +// +// This returns only the WHEP egress hooks (OnStart/OnStop). Call +// WHIPHooks() to get the WHIP ingest hooks, and merge them with +// SetHooks if WHIP is also required. func (s *Subsystem) Hooks() restream.ProcessHooks { return restream.ProcessHooks{ OnStart: s.onProcessStart, @@ -99,6 +112,28 @@ func (s *Subsystem) Hooks() restream.ProcessHooks { } } +// WHIPHooks returns the restream.ProcessHooks for WHIP ingest +// (OnInputStart / OnInputStop). Merge these with the output from +// Hooks() before calling restream.Restreamer.SetHooks. +func (s *Subsystem) WHIPHooks() restream.ProcessHooks { + return restream.ProcessHooks{ + OnInputStart: s.onWHIPProcessStart, + OnInputStop: s.onWHIPProcessStop, + } +} + +// MergedHooks returns ProcessHooks with both WHEP egress (OnStart/OnStop) +// and WHIP ingest (OnInputStart/OnInputStop) wired in. Convenience +// helper so callers don't have to merge manually. +func (s *Subsystem) MergedHooks() restream.ProcessHooks { + return restream.ProcessHooks{ + OnStart: s.onProcessStart, + OnStop: s.onProcessStop, + OnInputStart: s.onWHIPProcessStart, + OnInputStop: s.onWHIPProcessStop, + } +} + // Close tears down every active per-process stream. It is safe to // call during Core shutdown; subsequent WHEP requests will 404. func (s *Subsystem) Close() { @@ -128,6 +163,15 @@ func (s *Subsystem) SetTeardownHook(fn func(streamID string)) { s.teardown = fn } +// SetWHIPTeardownHook registers a callback invoked just before a WHIP +// ingest allocation is removed in onWHIPProcessStop. The WHIPHandler +// uses this to close any active publisher when FFmpeg stops. +func (s *Subsystem) SetWHIPTeardownHook(fn func(streamID string)) { + s.mu.Lock() + defer s.mu.Unlock() + s.whipTeardown = fn +} + // StreamCount returns the number of processes currently registered with // active WebRTC egress. Used by the Prometheus snapshot collector. func (s *Subsystem) StreamCount() int { @@ -136,7 +180,7 @@ func (s *Subsystem) StreamCount() int { return len(s.streams) } -// lookup returns the per-process stream pair for id, or nil, false. +// lookup returns the per-process WHEP stream pair for id, or nil, false. // Used by the WHEP handler. func (s *Subsystem) lookup(id string) (*processStream, bool) { s.mu.Lock() @@ -144,3 +188,12 @@ func (s *Subsystem) lookup(id string) (*processStream, bool) { st, ok := s.streams[id] return st, ok } + +// lookupIngest returns the per-process WHIP ingest port allocation for +// id, or nil, false. Used by the WHIPHandler. +func (s *Subsystem) lookupIngest(id string) (*ingestStream, bool) { + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.whipIngests[id] + return st, ok +}