From 46531bb479a9c84f3c2c6a2ad47cadf7d5fa952c Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Fri, 17 Apr 2026 09:57:14 -0400 Subject: [PATCH] feat(restream): add ProcessHooks for WebRTC subsystem integration Adds a pair of lifecycle callbacks the app/webrtc subsystem installs via SetHooks: - OnStart fires synchronously just before ffmpeg.Start(). It receives the task config and may return []ConfigIO extras to append to the output list. When extras are appended, startProcess rebuilds the FFmpeg command and the underlying process.Process before starting. A non-nil error aborts the start. - OnStop fires synchronously just after ffmpeg.Stop() so subsystems can tear down per-process state. Hooks run with the restream write lock held; they must not call back into Restreamer methods or they will deadlock. This is the pattern app/webrtc uses to inject per-process RTP output legs without having to reach into restream internals from outside. --- restream/restream.go | 76 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/restream/restream.go b/restream/restream.go index f3ce897..f844f6c 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -55,6 +55,30 @@ type Restreamer interface { GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process SetMetadata(key string, data interface{}) error // Set general metadata GetMetadata(key string) (interface{}, error) // Get previously set general metadata + SetHooks(hooks ProcessHooks) // Install per-process lifecycle hooks (e.g., WebRTC subsystem) +} + +// ProcessStartHook is invoked synchronously inside startProcess just +// before FFmpeg is started. It receives a pointer to the task config; +// returning a non-empty slice of ConfigIO appends those output legs to +// cfg.Output and causes the FFmpeg command to be rebuilt before +// Start(). Returning a non-nil error aborts the start. +// +// Hooks run with the restream write lock held, so they must not call +// back into the Restreamer interface (it would deadlock). They can, +// however, mutate cfg.WebRTC metadata or read cfg fields freely. +type ProcessStartHook func(id string, cfg *app.Config) ([]app.ConfigIO, error) + +// ProcessStopHook is invoked synchronously inside stopProcess just +// after FFmpeg has been stopped. It is a notification for subsystems +// to tear down any per-process state they attached at start. +type ProcessStopHook func(id string) + +// ProcessHooks bundles the lifecycle callbacks a sibling subsystem +// (currently: app/webrtc) installs via SetHooks. +type ProcessHooks struct { + OnStart ProcessStartHook + OnStop ProcessStopHook } // Config is the required configuration for a new restreamer instance. @@ -102,12 +126,24 @@ type restream struct { logger log.Logger metadata map[string]interface{} + hooks ProcessHooks + lock sync.RWMutex startOnce sync.Once stopOnce sync.Once } +// SetHooks installs the process lifecycle hooks. The caller is +// responsible for installing hooks before Start() is invoked; calling +// SetHooks on a running instance is safe but only affects subsequent +// start/stop transitions (not the one currently in flight). +func (r *restream) SetHooks(hooks ProcessHooks) { + r.lock.Lock() + defer r.lock.Unlock() + r.hooks = hooks +} + // New returns a new instance that implements the Restreamer interface func New(config Config) (Restreamer, error) { r := &restream{ @@ -1062,6 +1098,39 @@ func (r *restream) startProcess(id string) error { task.process.Order = "start" + // Invoke the per-process start hook (used by app/webrtc to append + // RTP output legs). If it returns ConfigIO entries, append them to + // the output list and rebuild the FFmpeg process with the new + // command before we start it. + if r.hooks.OnStart != nil { + extras, err := r.hooks.OnStart(task.id, task.config) + if err != nil { + r.logger.WithField("id", task.id).WithError(err).Error().Log("Start hook aborted process start") + return err + } + if len(extras) > 0 { + task.config.Output = append(task.config.Output, extras...) + task.command = task.config.CreateCommand() + + newFFmpeg, ferr := r.ffmpeg.New(ffmpeg.ProcessConfig{ + Reconnect: task.config.Reconnect, + ReconnectDelay: time.Duration(task.config.ReconnectDelay) * time.Second, + StaleTimeout: time.Duration(task.config.StaleTimeout) * time.Second, + LimitCPU: task.config.LimitCPU, + LimitMemory: task.config.LimitMemory, + LimitDuration: time.Duration(task.config.LimitWaitFor) * time.Second, + Command: task.command, + Parser: task.parser, + Logger: task.logger, + }) + if ferr != nil { + r.logger.WithField("id", task.id).WithError(ferr).Error().Log("Failed to rebuild FFmpeg after start hook") + return ferr + } + task.ffmpeg = newFFmpeg + } + } + task.ffmpeg.Start() r.nProc++ @@ -1105,6 +1174,13 @@ func (r *restream) stopProcess(id string) error { r.nProc-- + // Notify subsystems (app/webrtc) that this process has been + // stopped so they can tear down any per-process state. Hook is + // best-effort: errors are the hook's problem to log. + if r.hooks.OnStop != nil { + r.hooks.OnStop(task.id) + } + return nil }