datarhei-dragonfork-core/app/webrtc/lifecycle.go
Zac Gaetano 3abd4d8fd1 feat(app/webrtc): broadcast process-stop via SetTeardownHook
Subsystem.SetTeardownHook installs a callback the subsystem invokes
just before closing per-stream Sources in onProcessStop. Used by the
WHEP Handler in M3 to drain its per-stream peer index before the
underlying Sources go away — closes the 'subscribers fan out into a
closed channel' race the design's §6 error matrix calls out as
'Publisher disconnects / FFmpeg exits'.

Single consumer by design (one subsystem, one handler). Calling
SetTeardownHook again replaces the previous callback; nil detaches.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-03 11:23:55 +00:00

202 lines
6 KiB
Go

package webrtc
import (
"fmt"
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
appcfg "github.com/datarhei/core/v16/restream/app"
)
// Default RTP payload types. onProcessStart substitutes these when the
// per-process WebRTC config leaves VideoPT / AudioPT at zero. They match
// the values the M1 PoC / M2 forwarder expects (H.264 = 102, Opus = 111).
// Operators can override them per-process via the restream Config.
const (
defaultVideoPT = 102
defaultAudioPT = 111
)
// allocAttempts is the maximum number of attempts allocAdjacentPair
// makes (on behalf of onProcessStart) to find two adjacent free
// loopback UDP ports. The kernel may hand us a video port V whose
// neighbor V+1 is already in use, in which case the attempt is
// discarded and a new port is requested — in practice 2-3 attempts is
// plenty.
const allocAttempts = 10
// onProcessStart is the restream ProcessStartHook. It is invoked with
// the restream write lock held, right before FFmpeg is started.
//
// If cfg is nil or its WebRTC section is disabled, the hook returns
// (nil, nil) and FFmpeg starts without additional output legs.
// Otherwise it:
//
//  1. Obtains two adjacent loopback UDP ports (video on V, audio on
//     V+1) with a Source bound to each.
//  2. Starts both Sources and records them in s.streams under the
//     process ID.
//  3. Builds the RTP output arguments via BuildArgs and returns them,
//     split into ConfigIO legs, for the restream manager to append to
//     the process outputs.
//
// A non-nil error aborts the process start. It is returned when the ID
// already has a registered stream or when port allocation fails; in the
// latter case allocAdjacentPair has already released anything it bound.
func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) {
	if cfg == nil {
		return nil, nil
	}
	if !cfg.WebRTC.Enabled {
		return nil, nil
	}

	// A zero payload type means "not configured": fall back to defaults.
	webrtcCfg := cfg.WebRTC
	if webrtcCfg.VideoPT == 0 {
		webrtcCfg.VideoPT = defaultVideoPT
	}
	if webrtcCfg.AudioPT == 0 {
		webrtcCfg.AudioPT = defaultAudioPT
	}

	// Defensive: the restream manager is not expected to start the same
	// process twice, but overwriting an existing entry would silently
	// leak its Sources, so refuse instead.
	s.mu.Lock()
	_, registered := s.streams[id]
	s.mu.Unlock()
	if registered {
		return nil, fmt.Errorf("webrtc: process %q already has an active stream", id)
	}

	basePort, video, audio, err := s.allocAdjacentPair(id)
	if err != nil {
		return nil, err
	}

	// Begin reading before FFmpeg exists so packets are drained from
	// the very first one it sends.
	video.Start()
	audio.Start()

	stream := &processStream{id: id, video: video, audio: audio}
	s.mu.Lock()
	s.streams[id] = stream
	s.mu.Unlock()

	fields := map[string]interface{}{
		"id":         id,
		"video_port": basePort,
		"audio_port": basePort + 1,
		"video_pt":   webrtcCfg.VideoPT,
		"audio_pt":   webrtcCfg.AudioPT,
	}
	s.logger.WithFields(fields).Info().Log("WebRTC egress registered for process")

	return splitRTPLegs(BuildArgs(webrtcCfg, basePort)), nil
}
// onProcessStop is the restream ProcessStopHook. It is invoked with the
// restream write lock held, right after FFmpeg has been stopped.
//
// It removes the stream registered under id (doing nothing if there is
// none), invokes the teardown hook if one is installed, and then closes
// the stream's video and audio Sources.
func (s *Subsystem) onProcessStop(id string) {
	// Detach the entry and snapshot the hook under the lock; the hook
	// and the Close calls below run without holding s.mu.
	s.mu.Lock()
	stream, found := s.streams[id]
	hook := s.teardown
	delete(s.streams, id) // no-op when id is not registered
	s.mu.Unlock()

	if !found {
		return
	}

	// Notify the hook before closing the Sources so subscribed peers
	// can be torn down — and unsubscribe — while the Sources are still
	// open, avoiding fan-out into an already-closed Source.
	if hook != nil {
		hook(id)
	}

	// Close results are intentionally discarded: teardown is best-effort.
	if stream.video != nil {
		_ = stream.video.Close()
	}
	if stream.audio != nil {
		_ = stream.audio.Close()
	}

	s.logger.WithField("id", id).Info().Log("WebRTC egress torn down for process")
}
// allocAdjacentPair binds a video Source on a loopback UDP port V and an
// audio Source on V+1, returning V together with both Sources. The
// video Source is created with name id, the audio Source with name
// id+":audio".
//
// Each attempt asks Alloc for a candidate port and tries to bind both
// Sources; if any step fails the attempt is abandoned (closing the
// video Source when only the audio bind failed) and a new one begins,
// up to allocAttempts in total. On success the caller owns both
// Sources. On failure nothing remains bound and the returned error
// wraps the error of the last attempt.
func (s *Subsystem) allocAdjacentPair(id string) (int, *corewebrtc.Source, *corewebrtc.Source, error) {
	const loopback = "127.0.0.1"

	var failure error
	for remaining := allocAttempts; remaining > 0; remaining-- {
		base, err := Alloc()
		if err != nil {
			failure = err
			continue
		}

		video, err := corewebrtc.NewSourceOn(id, loopback, base)
		if err != nil {
			failure = err
			continue
		}

		audio, err := corewebrtc.NewSourceOn(id+":audio", loopback, base+1)
		if err == nil {
			return base, video, audio, nil
		}

		// V+1 could not be bound: release V before trying another pair.
		_ = video.Close()
		failure = err
	}

	if failure == nil {
		failure = fmt.Errorf("unknown allocation failure")
	}
	return 0, nil, nil, fmt.Errorf("webrtc: allocate adjacent UDP port pair after %d attempts: %w", allocAttempts, failure)
}
// splitRTPLegs converts the flat BuildArgs output into two ConfigIO
// entries — one per RTP output leg. It splits on the second "-map"
// token, which marks the audio leg's start (see ffmpeg_args_test.go):
// everything before that token forms the video leg, everything from it
// onward forms the audio leg. Within each leg, toLeg takes the last
// token as the Address (the udp:// URL) and the preceding tokens as
// Options.
//
// If args does not contain exactly two "-map" tokens, the whole slice
// is returned as a single leg so that no token is silently dropped.
func splitRTPLegs(args []string) []appcfg.ConfigIO {
	// Record the position of every -map token.
	var mapIdx []int
	for i, a := range args {
		if a == "-map" {
			mapIdx = append(mapIdx, i)
		}
	}

	if len(mapIdx) != 2 {
		// BuildArgs is expected to emit exactly 2 -maps; a different
		// count means an upstream bug. Return a single leg covering
		// everything to avoid silent truncation.
		return []appcfg.ConfigIO{toLeg(args)}
	}

	// The video leg starts at the beginning of args rather than at the
	// first -map, so tokens preceding the first -map (if any) stay
	// attached to the first output instead of being discarded.
	audioStart := mapIdx[1]
	return []appcfg.ConfigIO{
		toLeg(args[:audioStart]),
		toLeg(args[audioStart:]),
	}
}
// toLeg turns the tokens of one RTP output into a ConfigIO. The final
// token becomes the Address (the udp:// URL) and all tokens before it
// are copied into Options, so the result does not alias the caller's
// slice. An empty token slice yields the zero ConfigIO.
//
// NOTE(review): every non-empty leg is given the same ID, "webrtc" —
// confirm the restream manager accepts duplicate output IDs.
func toLeg(tokens []string) appcfg.ConfigIO {
	last := len(tokens) - 1
	if last < 0 {
		return appcfg.ConfigIO{}
	}

	options := make([]string, last)
	copy(options, tokens[:last])

	return appcfg.ConfigIO{
		ID:      "webrtc",
		Address: tokens[last],
		Options: options,
	}
}