feat(restream): add ProcessHooks for WebRTC subsystem integration
Adds a pair of lifecycle callbacks the app/webrtc subsystem installs via SetHooks: - OnStart fires synchronously just before ffmpeg.Start(). It receives the task config and may return []ConfigIO extras to append to the output list. When extras are appended, startProcess rebuilds the FFmpeg command and the underlying process.Process before starting. A non-nil error aborts the start. - OnStop fires synchronously just after ffmpeg.Stop() so subsystems can tear down per-process state. Hooks run with the restream write lock held; they must not call back into Restreamer methods or they will deadlock. This is the pattern app/webrtc uses to inject per-process RTP output legs without having to reach into restream internals from outside.
This commit is contained in:
parent
16ae17d2a1
commit
46531bb479
1 changed files with 76 additions and 0 deletions
|
|
@ -55,6 +55,30 @@ type Restreamer interface {
|
||||||
GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process
|
GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process
|
||||||
SetMetadata(key string, data interface{}) error // Set general metadata
|
SetMetadata(key string, data interface{}) error // Set general metadata
|
||||||
GetMetadata(key string) (interface{}, error) // Get previously set general metadata
|
GetMetadata(key string) (interface{}, error) // Get previously set general metadata
|
||||||
|
SetHooks(hooks ProcessHooks) // Install per-process lifecycle hooks (e.g., WebRTC subsystem)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessStartHook is invoked synchronously inside startProcess just
|
||||||
|
// before FFmpeg is started. It receives a pointer to the task config;
|
||||||
|
// returning a non-empty slice of ConfigIO appends those output legs to
|
||||||
|
// cfg.Output and causes the FFmpeg command to be rebuilt before
|
||||||
|
// Start(). Returning a non-nil error aborts the start.
|
||||||
|
//
|
||||||
|
// Hooks run with the restream write lock held, so they must not call
|
||||||
|
// back into the Restreamer interface (it would deadlock). They can,
|
||||||
|
// however, mutate cfg.WebRTC metadata or read cfg fields freely.
|
||||||
|
type ProcessStartHook func(id string, cfg *app.Config) ([]app.ConfigIO, error)
|
||||||
|
|
||||||
|
// ProcessStopHook is invoked synchronously inside stopProcess just
|
||||||
|
// after FFmpeg has been stopped. It is a notification for subsystems
|
||||||
|
// to tear down any per-process state they attached at start.
|
||||||
|
type ProcessStopHook func(id string)
|
||||||
|
|
||||||
|
// ProcessHooks bundles the lifecycle callbacks a sibling subsystem
|
||||||
|
// (currently: app/webrtc) installs via SetHooks.
|
||||||
|
type ProcessHooks struct {
|
||||||
|
OnStart ProcessStartHook
|
||||||
|
OnStop ProcessStopHook
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config is the required configuration for a new restreamer instance.
|
// Config is the required configuration for a new restreamer instance.
|
||||||
|
|
@ -102,12 +126,24 @@ type restream struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
metadata map[string]interface{}
|
metadata map[string]interface{}
|
||||||
|
|
||||||
|
hooks ProcessHooks
|
||||||
|
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
startOnce sync.Once
|
startOnce sync.Once
|
||||||
stopOnce sync.Once
|
stopOnce sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetHooks installs the process lifecycle hooks. The caller is
|
||||||
|
// responsible for installing hooks before Start() is invoked; calling
|
||||||
|
// SetHooks on a running instance is safe but only affects subsequent
|
||||||
|
// start/stop transitions (not the one currently in flight).
|
||||||
|
func (r *restream) SetHooks(hooks ProcessHooks) {
|
||||||
|
r.lock.Lock()
|
||||||
|
defer r.lock.Unlock()
|
||||||
|
r.hooks = hooks
|
||||||
|
}
|
||||||
|
|
||||||
// New returns a new instance that implements the Restreamer interface
|
// New returns a new instance that implements the Restreamer interface
|
||||||
func New(config Config) (Restreamer, error) {
|
func New(config Config) (Restreamer, error) {
|
||||||
r := &restream{
|
r := &restream{
|
||||||
|
|
@ -1062,6 +1098,39 @@ func (r *restream) startProcess(id string) error {
|
||||||
|
|
||||||
task.process.Order = "start"
|
task.process.Order = "start"
|
||||||
|
|
||||||
|
// Invoke the per-process start hook (used by app/webrtc to append
|
||||||
|
// RTP output legs). If it returns ConfigIO entries, append them to
|
||||||
|
// the output list and rebuild the FFmpeg process with the new
|
||||||
|
// command before we start it.
|
||||||
|
if r.hooks.OnStart != nil {
|
||||||
|
extras, err := r.hooks.OnStart(task.id, task.config)
|
||||||
|
if err != nil {
|
||||||
|
r.logger.WithField("id", task.id).WithError(err).Error().Log("Start hook aborted process start")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(extras) > 0 {
|
||||||
|
task.config.Output = append(task.config.Output, extras...)
|
||||||
|
task.command = task.config.CreateCommand()
|
||||||
|
|
||||||
|
newFFmpeg, ferr := r.ffmpeg.New(ffmpeg.ProcessConfig{
|
||||||
|
Reconnect: task.config.Reconnect,
|
||||||
|
ReconnectDelay: time.Duration(task.config.ReconnectDelay) * time.Second,
|
||||||
|
StaleTimeout: time.Duration(task.config.StaleTimeout) * time.Second,
|
||||||
|
LimitCPU: task.config.LimitCPU,
|
||||||
|
LimitMemory: task.config.LimitMemory,
|
||||||
|
LimitDuration: time.Duration(task.config.LimitWaitFor) * time.Second,
|
||||||
|
Command: task.command,
|
||||||
|
Parser: task.parser,
|
||||||
|
Logger: task.logger,
|
||||||
|
})
|
||||||
|
if ferr != nil {
|
||||||
|
r.logger.WithField("id", task.id).WithError(ferr).Error().Log("Failed to rebuild FFmpeg after start hook")
|
||||||
|
return ferr
|
||||||
|
}
|
||||||
|
task.ffmpeg = newFFmpeg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
task.ffmpeg.Start()
|
task.ffmpeg.Start()
|
||||||
|
|
||||||
r.nProc++
|
r.nProc++
|
||||||
|
|
@ -1105,6 +1174,13 @@ func (r *restream) stopProcess(id string) error {
|
||||||
|
|
||||||
r.nProc--
|
r.nProc--
|
||||||
|
|
||||||
|
// Notify subsystems (app/webrtc) that this process has been
|
||||||
|
// stopped so they can tear down any per-process state. Hook is
|
||||||
|
// best-effort: errors are the hook's problem to log.
|
||||||
|
if r.hooks.OnStop != nil {
|
||||||
|
r.hooks.OnStop(task.id)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue