feat(restream): extend ProcessHooks with OnInputStart for WHIP ingest input legs
Some checks failed
ci / race tests (push) Blocked by required conditions
ci / WebRTC smoke (5-viewer fanout) (push) Blocked by required conditions
ci / WebRTC latency p95 gate (push) Blocked by required conditions
ci / vet + build (push) Has been cancelled

This commit is contained in:
Zac Gaetano 2026-05-09 16:32:03 -04:00
parent ca3501f888
commit 6c9d1864dd

View file

@ -60,9 +60,11 @@ type Restreamer interface {
// ProcessStartHook is invoked synchronously inside startProcess just
// before FFmpeg is started. It receives a pointer to the task config;
// returning a non-empty slice of ConfigIO appends those output legs to
// cfg.Output and causes the FFmpeg command to be rebuilt before
// Start(). Returning a non-nil error aborts the start.
// returning a non-empty slice of ConfigIO causes the command to be
// rebuilt before Start(). Returning a non-nil error aborts the start.
//
// For OnStart the returned ConfigIO slices are appended to cfg.Output.
// For OnInputStart the returned ConfigIO slices are prepended to cfg.Input.
//
// Hooks run with the restream write lock held, so they must not call
// back into the Restreamer interface (it would deadlock). They can,
@ -76,9 +78,15 @@ type ProcessStopHook func(id string)
// ProcessHooks bundles the lifecycle callbacks a sibling subsystem
// (currently: app/webrtc) installs via SetHooks.
//
// OnStart returns ConfigIO entries appended to cfg.Output (WHEP RTP egress legs).
// OnInputStart returns ConfigIO entries prepended to cfg.Input (WHIP RTP ingest legs).
// OnStop and OnInputStop are called after FFmpeg stops.
type ProcessHooks struct {
OnStart ProcessStartHook
OnStop ProcessStopHook
OnInputStart ProcessStartHook // WHIP ingest: returned legs prepended to cfg.Input
OnInputStop ProcessStopHook // WHIP ingest: teardown notification
}
// Config is the required configuration for a new restreamer instance.
@ -1098,10 +1106,24 @@ func (r *restream) startProcess(id string) error {
task.process.Order = "start"
// Invoke the per-process start hook (used by app/webrtc to append
// RTP output legs). If it returns ConfigIO entries, append them to
// the output list and rebuild the FFmpeg process with the new
// command before we start it.
// Invoke the per-process lifecycle hooks. OnInputStart returns
// ConfigIO entries prepended to cfg.Input (WHIP ingest legs);
// OnStart returns ConfigIO entries appended to cfg.Output (WHEP
// egress legs). Both rebuild the FFmpeg process if non-empty.
needsRebuild := false
if r.hooks.OnInputStart != nil {
inputExtras, err := r.hooks.OnInputStart(task.id, task.config)
if err != nil {
r.logger.WithField("id", task.id).WithError(err).Error().Log("WHIP input hook aborted process start")
return err
}
if len(inputExtras) > 0 {
task.config.Input = append(inputExtras, task.config.Input...)
needsRebuild = true
}
}
if r.hooks.OnStart != nil {
extras, err := r.hooks.OnStart(task.id, task.config)
if err != nil {
@ -1110,6 +1132,11 @@ func (r *restream) startProcess(id string) error {
}
if len(extras) > 0 {
task.config.Output = append(task.config.Output, extras...)
needsRebuild = true
}
}
if needsRebuild {
task.command = task.config.CreateCommand()
newFFmpeg, ferr := r.ffmpeg.New(ffmpeg.ProcessConfig{
@ -1124,12 +1151,11 @@ func (r *restream) startProcess(id string) error {
Logger: task.logger,
})
if ferr != nil {
r.logger.WithField("id", task.id).WithError(ferr).Error().Log("Failed to rebuild FFmpeg after start hook")
r.logger.WithField("id", task.id).WithError(ferr).Error().Log("Failed to rebuild FFmpeg after hooks")
return ferr
}
task.ffmpeg = newFFmpeg
}
}
task.ffmpeg.Start()
@ -1175,11 +1201,14 @@ func (r *restream) stopProcess(id string) error {
r.nProc--
// Notify subsystems (app/webrtc) that this process has been
// stopped so they can tear down any per-process state. Hook is
// stopped so they can tear down any per-process state. Hooks are
// best-effort: errors are the hook's problem to log.
if r.hooks.OnStop != nil {
r.hooks.OnStop(task.id)
}
if r.hooks.OnInputStop != nil {
r.hooks.OnInputStop(task.id)
}
return nil
}