From 6c9d1864dd078233b03ae4e3616769d479cf0017 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sat, 9 May 2026 16:32:03 -0400 Subject: [PATCH] feat(restream): extend ProcessHooks with OnInputStart for WHIP ingest input legs --- restream/restream.go | 85 +++++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 28 deletions(-) diff --git a/restream/restream.go b/restream/restream.go index f844f6c..300e6d7 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -60,9 +60,11 @@ type Restreamer interface { // ProcessStartHook is invoked synchronously inside startProcess just // before FFmpeg is started. It receives a pointer to the task config; -// returning a non-empty slice of ConfigIO appends those output legs to -// cfg.Output and causes the FFmpeg command to be rebuilt before -// Start(). Returning a non-nil error aborts the start. +// returning a non-empty slice of ConfigIO causes the command to be +// rebuilt before Start(). Returning a non-nil error aborts the start. +// +// For OnStart the returned ConfigIO slices are appended to cfg.Output. +// For OnInputStart the returned ConfigIO slices are prepended to cfg.Input. // // Hooks run with the restream write lock held, so they must not call // back into the Restreamer interface (it would deadlock). They can, @@ -76,9 +78,15 @@ type ProcessStopHook func(id string) // ProcessHooks bundles the lifecycle callbacks a sibling subsystem // (currently: app/webrtc) installs via SetHooks. +// +// OnStart returns ConfigIO entries appended to cfg.Output (WHEP RTP egress legs). +// OnInputStart returns ConfigIO entries prepended to cfg.Input (WHIP RTP ingest legs). +// OnStop and OnInputStop are called after FFmpeg stops. type ProcessHooks struct { - OnStart ProcessStartHook - OnStop ProcessStopHook + OnStart ProcessStartHook + OnStop ProcessStopHook + OnInputStart ProcessStartHook // WHIP ingest: returned legs prepended to cfg.Input + OnInputStop ProcessStopHook // WHIP ingest: teardown notification } // Config is the required configuration for a new restreamer instance. @@ -1098,10 +1106,24 @@ func (r *restream) startProcess(id string) error { task.process.Order = "start" - // Invoke the per-process start hook (used by app/webrtc to append - // RTP output legs). If it returns ConfigIO entries, append them to - // the output list and rebuild the FFmpeg process with the new - // command before we start it. + // Invoke the per-process lifecycle hooks. OnInputStart returns + // ConfigIO entries prepended to cfg.Input (WHIP ingest legs); + // OnStart returns ConfigIO entries appended to cfg.Output (WHEP + // egress legs). Both rebuild the FFmpeg process if non-empty. + needsRebuild := false + + if r.hooks.OnInputStart != nil { + inputExtras, err := r.hooks.OnInputStart(task.id, task.config) + if err != nil { + r.logger.WithField("id", task.id).WithError(err).Error().Log("WHIP input hook aborted process start") + return err + } + if len(inputExtras) > 0 { + task.config.Input = append(inputExtras, task.config.Input...) + needsRebuild = true + } + } + if r.hooks.OnStart != nil { extras, err := r.hooks.OnStart(task.id, task.config) if err != nil { @@ -1110,27 +1132,31 @@ func (r *restream) startProcess(id string) error { } if len(extras) > 0 { task.config.Output = append(task.config.Output, extras...) - task.command = task.config.CreateCommand() - - newFFmpeg, ferr := r.ffmpeg.New(ffmpeg.ProcessConfig{ - Reconnect: task.config.Reconnect, - ReconnectDelay: time.Duration(task.config.ReconnectDelay) * time.Second, - StaleTimeout: time.Duration(task.config.StaleTimeout) * time.Second, - LimitCPU: task.config.LimitCPU, - LimitMemory: task.config.LimitMemory, - LimitDuration: time.Duration(task.config.LimitWaitFor) * time.Second, - Command: task.command, - Parser: task.parser, - Logger: task.logger, - }) - if ferr != nil { - r.logger.WithField("id", task.id).WithError(ferr).Error().Log("Failed to rebuild FFmpeg after start hook") - return ferr - } - task.ffmpeg = newFFmpeg + needsRebuild = true } } + if needsRebuild { + task.command = task.config.CreateCommand() + + newFFmpeg, ferr := r.ffmpeg.New(ffmpeg.ProcessConfig{ + Reconnect: task.config.Reconnect, + ReconnectDelay: time.Duration(task.config.ReconnectDelay) * time.Second, + StaleTimeout: time.Duration(task.config.StaleTimeout) * time.Second, + LimitCPU: task.config.LimitCPU, + LimitMemory: task.config.LimitMemory, + LimitDuration: time.Duration(task.config.LimitWaitFor) * time.Second, + Command: task.command, + Parser: task.parser, + Logger: task.logger, + }) + if ferr != nil { + r.logger.WithField("id", task.id).WithError(ferr).Error().Log("Failed to rebuild FFmpeg after hooks") + return ferr + } + task.ffmpeg = newFFmpeg + } + task.ffmpeg.Start() r.nProc++ @@ -1175,11 +1201,14 @@ func (r *restream) stopProcess(id string) error { r.nProc-- // Notify subsystems (app/webrtc) that this process has been - // stopped so they can tear down any per-process state. Hook is + // stopped so they can tear down any per-process state. Hooks are // best-effort: errors are the hook's problem to log. if r.hooks.OnStop != nil { r.hooks.OnStop(task.id) } + if r.hooks.OnInputStop != nil { + r.hooks.OnInputStop(task.id) + } return nil }