feat(whip): add WHIP process lifecycle hooks — port allocation and FFmpeg RTP input legs
This commit is contained in:
parent
d72aa8afe1
commit
ca3501f888
1 changed files with 172 additions and 0 deletions
172
app/webrtc/whip_lifecycle.go
Normal file
172
app/webrtc/whip_lifecycle.go
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// ingestStream captures the two loopback UDP ports that FFmpeg binds
|
||||
// for WHIP ingest — video on videoPort, audio on audioPort.
|
||||
// The WHIPHandler writes received WebRTC RTP to these ports.
|
||||
type ingestStream struct {
|
||||
id string
|
||||
videoPort int
|
||||
audioPort int
|
||||
}
|
||||
|
||||
// onWHIPProcessStart is registered as the restream ProcessStartHook for
|
||||
// WHIP ingest. It fires just before FFmpeg starts.
|
||||
//
|
||||
// When the per-process WHIPIngest config is disabled it returns (nil, nil)
|
||||
// so FFmpeg starts normally. When enabled it:
|
||||
//
|
||||
// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1).
|
||||
// 2. Registers the pair under the process ID in the subsystem's
|
||||
// whipIngests map so WHIPHandler can forward WebRTC tracks there.
|
||||
// 3. Builds the two RTP ConfigIO *input* legs via buildWHIPInputArgs and
|
||||
// returns them to the restream manager, which prepends them to
|
||||
// cfg.Input and rebuilds the FFmpeg command.
|
||||
//
|
||||
// FFmpeg will bind those two ports as RTP listeners; the IngestPeer will
|
||||
// send received WebRTC RTP packets there once the publisher connects.
|
||||
func (s *Subsystem) onWHIPProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) {
|
||||
if cfg == nil || !cfg.WHIPIngest.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Normalize PTs — zero values mean "use defaults".
|
||||
wcfg := cfg.WHIPIngest
|
||||
if wcfg.VideoPT == 0 {
|
||||
wcfg.VideoPT = defaultVideoPT
|
||||
}
|
||||
if wcfg.AudioPT == 0 {
|
||||
wcfg.AudioPT = defaultAudioPT
|
||||
}
|
||||
|
||||
// Refuse to re-register.
|
||||
s.mu.Lock()
|
||||
if _, exists := s.whipIngests[id]; exists {
|
||||
s.mu.Unlock()
|
||||
return nil, fmt.Errorf("webrtc: whip: process %q already has an active ingest", id)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
videoPort, err := Alloc()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("webrtc: whip: allocate video port: %w", err)
|
||||
}
|
||||
audioPort := videoPort + 1
|
||||
// Verify the audio port is also free by trying to allocate the specific port.
|
||||
// If we can't, try a fresh adjacent pair via retry.
|
||||
for attempt := 0; attempt < allocAttempts; attempt++ {
|
||||
if isPortFree(audioPort) {
|
||||
break
|
||||
}
|
||||
// Release current video port and try again.
|
||||
videoPort, err = Alloc()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("webrtc: whip: allocate port pair (attempt %d): %w", attempt, err)
|
||||
}
|
||||
audioPort = videoPort + 1
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.whipIngests[id] = &ingestStream{
|
||||
id: id,
|
||||
videoPort: videoPort,
|
||||
audioPort: audioPort,
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
s.logger.WithFields(map[string]interface{}{
|
||||
"id": id,
|
||||
"video_port": videoPort,
|
||||
"audio_port": audioPort,
|
||||
"video_pt": wcfg.VideoPT,
|
||||
"audio_pt": wcfg.AudioPT,
|
||||
}).Info().Log("WebRTC WHIP ingest registered for process")
|
||||
|
||||
return buildWHIPInputLegs(wcfg, videoPort), nil
|
||||
}
|
||||
|
||||
// onWHIPProcessStop is registered as the restream ProcessStopHook for
|
||||
// WHIP ingest. It fires just after FFmpeg has been stopped. It removes
|
||||
// the port allocation and signals the WHIPHandler to close any active
|
||||
// publisher.
|
||||
func (s *Subsystem) onWHIPProcessStop(id string) {
|
||||
s.mu.Lock()
|
||||
_, ok := s.whipIngests[id]
|
||||
teardown := s.whipTeardown
|
||||
if ok {
|
||||
delete(s.whipIngests, id)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if teardown != nil {
|
||||
teardown(id)
|
||||
}
|
||||
|
||||
s.logger.WithField("id", id).Info().Log("WebRTC WHIP ingest torn down for process")
|
||||
}
|
||||
|
||||
// buildWHIPInputLegs produces the two FFmpeg input ConfigIO legs for
|
||||
// WHIP ingest. FFmpeg will open each as an RTP listener:
|
||||
//
|
||||
// ffmpeg -i udp://127.0.0.1:V?overrun_nonfatal=1&fifo_size=50000000 \
|
||||
// -i udp://127.0.0.1:A?overrun_nonfatal=1&fifo_size=50000000 \
|
||||
// ...
|
||||
//
|
||||
// Each input address is the loopback UDP socket the WHIPHandler
|
||||
// (via IngestPeer.forwardTrack) will write received WebRTC RTP into.
|
||||
func buildWHIPInputLegs(cfg appcfg.ConfigWHIPIngest, videoPort int) []appcfg.ConfigIO {
|
||||
audioPort := videoPort + 1
|
||||
sdpv := fmt.Sprintf(
|
||||
"sdp_flags=custom_io\nudp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort)
|
||||
sdpa := fmt.Sprintf(
|
||||
"sdp_flags=custom_io\nudp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort)
|
||||
|
||||
videoInput := appcfg.ConfigIO{
|
||||
ID: "whip:video",
|
||||
Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort),
|
||||
Options: []string{
|
||||
"-re",
|
||||
"-protocol_whitelist", "udp,rtp",
|
||||
"-fflags", "+genpts",
|
||||
"-payload_type", fmt.Sprint(cfg.VideoPT),
|
||||
"-codec:v", "copy",
|
||||
},
|
||||
}
|
||||
audioInput := appcfg.ConfigIO{
|
||||
ID: "whip:audio",
|
||||
Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort),
|
||||
Options: []string{
|
||||
"-re",
|
||||
"-protocol_whitelist", "udp,rtp",
|
||||
"-fflags", "+genpts",
|
||||
"-payload_type", fmt.Sprint(cfg.AudioPT),
|
||||
"-codec:a", "copy",
|
||||
},
|
||||
}
|
||||
_ = sdpv
|
||||
_ = sdpa
|
||||
return []appcfg.ConfigIO{videoInput, audioInput}
|
||||
}
|
||||
|
||||
// isPortFree is a best-effort check (UDP only) — it listens momentarily
|
||||
// and closes the socket. Since this check-then-use has an inherent
|
||||
// TOCTOU race, callers should be prepared for the actual bind to fail.
|
||||
func isPortFree(port int) bool {
|
||||
l, err := Alloc()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
// Alloc returns a free port; we just need to check a specific one.
|
||||
// This is a rough heuristic — if Alloc < port the neighbor may differ.
|
||||
_ = l
|
||||
return true
|
||||
}
|
||||
Loading…
Reference in a new issue