Subsystem.SetTeardownHook installs a callback the subsystem invokes just before closing per-stream Sources in onProcessStop. Used by the WHEP Handler in M3 to drain its per-stream peer index before the underlying Sources go away — closes the 'subscribers fan out into a closed channel' race the design's §6 error matrix calls out as 'Publisher disconnects / FFmpeg exits'. Single consumer by design (one subsystem, one handler). Calling SetTeardownHook again replaces the previous callback; nil detaches. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
202 lines
6 KiB
Go
202 lines
6 KiB
Go
package webrtc
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
|
appcfg "github.com/datarhei/core/v16/restream/app"
|
|
)
|
|
|
|
// Default payload types. These match the values the M1 PoC / M2
|
|
// forwarder expects (H.264 = 102, Opus = 111). Operators can override
|
|
// per-process via the restream Config.
|
|
const (
|
|
defaultVideoPT = 102
|
|
defaultAudioPT = 111
|
|
)
|
|
|
|
// allocAttempts is the maximum number of times onProcessStart will
|
|
// retry port allocation to find two adjacent free loopback UDP ports.
|
|
// The kernel sometimes hands us an odd port for video, making V+1
|
|
// unavailable — in practice 2-3 retries is plenty.
|
|
const allocAttempts = 10
|
|
|
|
// onProcessStart is registered as the restream ProcessStartHook. It
|
|
// fires with the restream write lock held, just before FFmpeg Start.
|
|
//
|
|
// When the per-process WebRTC config is disabled, it returns (nil, nil)
|
|
// — FFmpeg starts normally without any extra output legs. When enabled
|
|
// it:
|
|
//
|
|
// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1).
|
|
// 2. Binds Pion Sources on those ports and registers the pair under
|
|
// the process ID.
|
|
// 3. Builds the two RTP ConfigIO output legs via BuildArgs and returns
|
|
// them to the restream manager, which appends them to cfg.Output
|
|
// and rebuilds the FFmpeg command.
|
|
//
|
|
// Any error aborts the process start. On partial allocation failure,
|
|
// all allocated resources are cleaned up before returning.
|
|
func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) {
|
|
if cfg == nil || !cfg.WebRTC.Enabled {
|
|
return nil, nil
|
|
}
|
|
|
|
// Normalize PTs — zero values mean "use defaults".
|
|
wcfg := cfg.WebRTC
|
|
if wcfg.VideoPT == 0 {
|
|
wcfg.VideoPT = defaultVideoPT
|
|
}
|
|
if wcfg.AudioPT == 0 {
|
|
wcfg.AudioPT = defaultAudioPT
|
|
}
|
|
|
|
// Refuse to re-register — the restream manager should never
|
|
// double-start a process but defensive check avoids a silent
|
|
// Source leak if it does.
|
|
s.mu.Lock()
|
|
if _, exists := s.streams[id]; exists {
|
|
s.mu.Unlock()
|
|
return nil, fmt.Errorf("webrtc: process %q already has an active stream", id)
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
videoPort, videoSrc, audioSrc, err := s.allocAdjacentPair(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Start the UDP readers so they're draining packets the moment
|
|
// FFmpeg comes online.
|
|
videoSrc.Start()
|
|
audioSrc.Start()
|
|
|
|
s.mu.Lock()
|
|
s.streams[id] = &processStream{id: id, video: videoSrc, audio: audioSrc}
|
|
s.mu.Unlock()
|
|
|
|
s.logger.WithFields(map[string]interface{}{
|
|
"id": id,
|
|
"video_port": videoPort,
|
|
"audio_port": videoPort + 1,
|
|
"video_pt": wcfg.VideoPT,
|
|
"audio_pt": wcfg.AudioPT,
|
|
}).Info().Log("WebRTC egress registered for process")
|
|
|
|
args := BuildArgs(wcfg, videoPort)
|
|
return splitRTPLegs(args), nil
|
|
}
|
|
|
|
// onProcessStop is registered as the restream ProcessStopHook. It
|
|
// fires with the restream write lock held, just after FFmpeg has been
|
|
// stopped. It tears down the per-process Sources (which closes their
|
|
// sockets and hangs up any subscribed peers).
|
|
func (s *Subsystem) onProcessStop(id string) {
|
|
s.mu.Lock()
|
|
st, ok := s.streams[id]
|
|
teardown := s.teardown
|
|
if ok {
|
|
delete(s.streams, id)
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Broadcast first, so any subscribed peers get torn down while
|
|
// the streamID is still meaningful. The handler's tearDownStreamPeers
|
|
// drives each Peer.Close() which in turn unsubscribes from the
|
|
// Sources we're about to shut down — preventing a "subscribers fan
|
|
// out into a closed channel" race.
|
|
if teardown != nil {
|
|
teardown(id)
|
|
}
|
|
|
|
if st.video != nil {
|
|
_ = st.video.Close()
|
|
}
|
|
if st.audio != nil {
|
|
_ = st.audio.Close()
|
|
}
|
|
s.logger.WithField("id", id).Info().Log("WebRTC egress torn down for process")
|
|
}
|
|
|
|
// allocAdjacentPair finds a pair of free loopback UDP ports (V, V+1)
|
|
// and binds a Source to each. It retries up to allocAttempts times
|
|
// because the kernel's ephemeral picker may hand us a port whose +1
|
|
// neighbor is already taken. Caller owns the returned Sources; on
|
|
// error all partial allocations are cleaned up.
|
|
func (s *Subsystem) allocAdjacentPair(id string) (int, *corewebrtc.Source, *corewebrtc.Source, error) {
|
|
var lastErr error
|
|
for attempt := 0; attempt < allocAttempts; attempt++ {
|
|
port, err := Alloc()
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
|
|
videoSrc, err := corewebrtc.NewSourceOn(id, "127.0.0.1", port)
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
audioSrc, err := corewebrtc.NewSourceOn(id+":audio", "127.0.0.1", port+1)
|
|
if err != nil {
|
|
_ = videoSrc.Close()
|
|
lastErr = err
|
|
continue
|
|
}
|
|
return port, videoSrc, audioSrc, nil
|
|
}
|
|
if lastErr == nil {
|
|
lastErr = fmt.Errorf("unknown allocation failure")
|
|
}
|
|
return 0, nil, nil, fmt.Errorf("webrtc: allocate adjacent UDP port pair after %d attempts: %w", allocAttempts, lastErr)
|
|
}
|
|
|
|
// splitRTPLegs converts the flat BuildArgs output into two ConfigIO
|
|
// entries — one per RTP output leg. It splits on the second "-map"
|
|
// token, which marks the audio leg's start (see ffmpeg_args_test.go).
|
|
// The Address of each ConfigIO is the last argument (the udp:// URL);
|
|
// everything preceding it forms that output's Options.
|
|
func splitRTPLegs(args []string) []appcfg.ConfigIO {
|
|
// Find the two -map indices.
|
|
mapIdx := []int{}
|
|
for i, a := range args {
|
|
if a == "-map" {
|
|
mapIdx = append(mapIdx, i)
|
|
}
|
|
}
|
|
if len(mapIdx) != 2 {
|
|
// BuildArgs always emits exactly 2 -maps; a different count
|
|
// means an upstream bug. Return a single leg covering
|
|
// everything to avoid silent truncation.
|
|
return []appcfg.ConfigIO{toLeg(args)}
|
|
}
|
|
|
|
videoTokens := args[mapIdx[0]:mapIdx[1]]
|
|
audioTokens := args[mapIdx[1]:]
|
|
|
|
return []appcfg.ConfigIO{
|
|
toLeg(videoTokens),
|
|
toLeg(audioTokens),
|
|
}
|
|
}
|
|
|
|
// toLeg splits a contiguous RTP-output token slice into a ConfigIO:
|
|
// the trailing token is the udp:// Address; everything before is the
|
|
// Options slice.
|
|
func toLeg(tokens []string) appcfg.ConfigIO {
|
|
if len(tokens) == 0 {
|
|
return appcfg.ConfigIO{}
|
|
}
|
|
addr := tokens[len(tokens)-1]
|
|
opts := make([]string, len(tokens)-1)
|
|
copy(opts, tokens[:len(tokens)-1])
|
|
return appcfg.ConfigIO{
|
|
ID: "webrtc",
|
|
Address: addr,
|
|
Options: opts,
|
|
}
|
|
}
|