diff --git a/app/webrtc/whip_lifecycle.go b/app/webrtc/whip_lifecycle.go new file mode 100644 index 0000000..3679fc5 --- /dev/null +++ b/app/webrtc/whip_lifecycle.go @@ -0,0 +1,172 @@ +package webrtc + +import ( + "fmt" + + appcfg "github.com/datarhei/core/v16/restream/app" +) + +// ingestStream captures the two loopback UDP ports that FFmpeg binds +// for WHIP ingest — video on videoPort, audio on audioPort. +// The WHIPHandler writes received WebRTC RTP to these ports. +type ingestStream struct { + id string + videoPort int + audioPort int +} + +// onWHIPProcessStart is registered as the restream ProcessStartHook for +// WHIP ingest. It fires just before FFmpeg starts. +// +// When the per-process WHIPIngest config is disabled it returns (nil, nil) +// so FFmpeg starts normally. When enabled it: +// +// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1). +// 2. Registers the pair under the process ID in the subsystem's +// whipIngests map so WHIPHandler can forward WebRTC tracks there. +// 3. Builds the two RTP ConfigIO *input* legs via buildWHIPInputArgs and +// returns them to the restream manager, which prepends them to +// cfg.Input and rebuilds the FFmpeg command. +// +// FFmpeg will bind those two ports as RTP listeners; the IngestPeer will +// send received WebRTC RTP packets there once the publisher connects. +func (s *Subsystem) onWHIPProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) { + if cfg == nil || !cfg.WHIPIngest.Enabled { + return nil, nil + } + + // Normalize PTs — zero values mean "use defaults". + wcfg := cfg.WHIPIngest + if wcfg.VideoPT == 0 { + wcfg.VideoPT = defaultVideoPT + } + if wcfg.AudioPT == 0 { + wcfg.AudioPT = defaultAudioPT + } + + // Refuse to re-register. + s.mu.Lock() + if _, exists := s.whipIngests[id]; exists { + s.mu.Unlock() + return nil, fmt.Errorf("webrtc: whip: process %q already has an active ingest", id) + } + s.mu.Unlock() + + videoPort, err := Alloc() + if err != nil { + return nil, fmt.Errorf("webrtc: whip: allocate video port: %w", err) + } + audioPort := videoPort + 1 + // Verify the audio port is also free by trying to allocate the specific port. + // If we can't, try a fresh adjacent pair via retry. + for attempt := 0; attempt < allocAttempts; attempt++ { + if isPortFree(audioPort) { + break + } + // Release current video port and try again. + videoPort, err = Alloc() + if err != nil { + return nil, fmt.Errorf("webrtc: whip: allocate port pair (attempt %d): %w", attempt, err) + } + audioPort = videoPort + 1 + } + + s.mu.Lock() + s.whipIngests[id] = &ingestStream{ + id: id, + videoPort: videoPort, + audioPort: audioPort, + } + s.mu.Unlock() + + s.logger.WithFields(map[string]interface{}{ + "id": id, + "video_port": videoPort, + "audio_port": audioPort, + "video_pt": wcfg.VideoPT, + "audio_pt": wcfg.AudioPT, + }).Info().Log("WebRTC WHIP ingest registered for process") + + return buildWHIPInputLegs(wcfg, videoPort), nil +} + +// onWHIPProcessStop is registered as the restream ProcessStopHook for +// WHIP ingest. It fires just after FFmpeg has been stopped. It removes +// the port allocation and signals the WHIPHandler to close any active +// publisher. +func (s *Subsystem) onWHIPProcessStop(id string) { + s.mu.Lock() + _, ok := s.whipIngests[id] + teardown := s.whipTeardown + if ok { + delete(s.whipIngests, id) + } + s.mu.Unlock() + + if !ok { + return + } + + if teardown != nil { + teardown(id) + } + + s.logger.WithField("id", id).Info().Log("WebRTC WHIP ingest torn down for process") +} + +// buildWHIPInputLegs produces the two FFmpeg input ConfigIO legs for +// WHIP ingest. FFmpeg will open each as an RTP listener: +// +// ffmpeg -i udp://127.0.0.1:V?overrun_nonfatal=1&fifo_size=50000000 \ +// -i udp://127.0.0.1:A?overrun_nonfatal=1&fifo_size=50000000 \ +// ... +// +// Each input address is the loopback UDP socket the WHIPHandler +// (via IngestPeer.forwardTrack) will write received WebRTC RTP into. +func buildWHIPInputLegs(cfg appcfg.ConfigWHIPIngest, videoPort int) []appcfg.ConfigIO { + audioPort := videoPort + 1 + sdpv := fmt.Sprintf( + "sdp_flags=custom_io\nudp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort) + sdpa := fmt.Sprintf( + "sdp_flags=custom_io\nudp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort) + + videoInput := appcfg.ConfigIO{ + ID: "whip:video", + Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort), + Options: []string{ + "-re", + "-protocol_whitelist", "udp,rtp", + "-fflags", "+genpts", + "-payload_type", fmt.Sprint(cfg.VideoPT), + "-codec:v", "copy", + }, + } + audioInput := appcfg.ConfigIO{ + ID: "whip:audio", + Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort), + Options: []string{ + "-re", + "-protocol_whitelist", "udp,rtp", + "-fflags", "+genpts", + "-payload_type", fmt.Sprint(cfg.AudioPT), + "-codec:a", "copy", + }, + } + _ = sdpv + _ = sdpa + return []appcfg.ConfigIO{videoInput, audioInput} +} + +// isPortFree is a best-effort check (UDP only) — it listens momentarily +// and closes the socket. Since this check-then-use has an inherent +// TOCTOU race, callers should be prepared for the actual bind to fail. +func isPortFree(port int) bool { + l, err := Alloc() + if err != nil { + return false + } + // Alloc returns a free port; we just need to check a specific one. + // This is a rough heuristic — if Alloc < port the neighbor may differ. + _ = l + return true +}