package webrtc import ( "fmt" "net" appcfg "github.com/datarhei/core/v16/restream/app" ) // ingestStream captures the two loopback UDP ports that FFmpeg binds // for WHIP ingest — video on videoPort, audio on audioPort. // The WHIPHandler writes received WebRTC RTP to these ports. type ingestStream struct { id string videoPort int audioPort int } // onWHIPProcessStart is registered as the restream OnInputStart hook. // It fires just before FFmpeg starts, holding the restream write lock. // // When the per-process WHIPIngest config is disabled it returns (nil, nil) // so FFmpeg starts normally. When enabled it: // // 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1) // using the same retry strategy as the WHEP egress allocator. // 2. Registers the pair under the process ID in whipIngests so the // WHIPHandler can route incoming WebRTC RTP to them. // 3. Returns two RTP ConfigIO input legs that FFmpeg will open as UDP // listeners. The restream manager prepends them to cfg.Input and // rebuilds the FFmpeg command before Start(). func (s *Subsystem) onWHIPProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) { if cfg == nil || !cfg.WHIPIngest.Enabled { return nil, nil } // Normalize PTs — zero values mean "use defaults". wcfg := cfg.WHIPIngest if wcfg.VideoPT == 0 { wcfg.VideoPT = defaultVideoPT } if wcfg.AudioPT == 0 { wcfg.AudioPT = defaultAudioPT } // Refuse to re-register. s.mu.Lock() if _, exists := s.whipIngests[id]; exists { s.mu.Unlock() return nil, fmt.Errorf("webrtc: whip: process %q already has an active ingest", id) } s.mu.Unlock() // Find an adjacent pair (V, V+1). The same retry logic used by // the WHEP egress allocator (allocAdjacentPair) except we only // need port numbers, not Source objects. videoPort, err := allocAdjacentPortPair() if err != nil { return nil, fmt.Errorf("webrtc: whip: allocate port pair for process %q: %w", id, err) } audioPort := videoPort + 1 s.mu.Lock() s.whipIngests[id] = &ingestStream{ id: id, videoPort: videoPort, audioPort: audioPort, } s.mu.Unlock() s.logger.WithFields(map[string]interface{}{ "id": id, "video_port": videoPort, "audio_port": audioPort, "video_pt": wcfg.VideoPT, "audio_pt": wcfg.AudioPT, }).Info().Log("WebRTC WHIP ingest registered for process") return buildWHIPInputLegs(wcfg, videoPort), nil } // onWHIPProcessStop is registered as the restream OnInputStop hook. // It fires just after FFmpeg has been stopped. It removes the port // allocation and signals the WHIPHandler to close any active publisher. func (s *Subsystem) onWHIPProcessStop(id string) { s.mu.Lock() _, ok := s.whipIngests[id] teardown := s.whipTeardown if ok { delete(s.whipIngests, id) } s.mu.Unlock() if !ok { return } if teardown != nil { teardown(id) } s.logger.WithField("id", id).Info().Log("WebRTC WHIP ingest torn down for process") } // allocAdjacentPortPair finds two consecutive free loopback UDP ports // (V, V+1) and returns V. It retries up to allocAttempts times because // the kernel may hand us a port whose +1 neighbor is already taken. // // The caller owns the returned port numbers; FFmpeg will bind them // immediately on process start via the ConfigIO input legs. func allocAdjacentPortPair() (int, error) { var lastErr error for attempt := 0; attempt < allocAttempts; attempt++ { videoPort, err := Alloc() if err != nil { lastErr = err continue } audioPort := videoPort + 1 if audioPort > 65535 { lastErr = fmt.Errorf("video port %d would push audio port above 65535", videoPort) continue } // Verify the audio port (videoPort+1) is also free by attempting // a momentary bind. TOCTOU race is accepted; FFmpeg will fail-fast // on the actual bind and the process will restart cleanly. if err := checkPortFree(audioPort); err != nil { lastErr = err continue } return videoPort, nil } if lastErr == nil { lastErr = fmt.Errorf("unknown allocation failure") } return 0, fmt.Errorf("after %d attempts: %w", allocAttempts, lastErr) } // checkPortFree attempts a momentary UDP bind on the given loopback // port. Returns nil if the port appears available, non-nil otherwise. func checkPortFree(port int) error { c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}) if err != nil { return fmt.Errorf("port %d not free: %w", port, err) } _ = c.Close() return nil } // buildWHIPInputLegs produces the two FFmpeg input ConfigIO legs for // WHIP ingest. FFmpeg opens each as a UDP RTP listener: // // -i udp://127.0.0.1:V?overrun_nonfatal=1&fifo_size=50000000 // -i udp://127.0.0.1:A?overrun_nonfatal=1&fifo_size=50000000 // // The IngestPeer.forwardTrack goroutine writes received WebRTC RTP to // these ports once the WHIP publisher connects. func buildWHIPInputLegs(cfg appcfg.ConfigWHIPIngest, videoPort int) []appcfg.ConfigIO { audioPort := videoPort + 1 return []appcfg.ConfigIO{ { ID: "whip:video", Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort), Options: []string{ "-re", "-protocol_whitelist", "udp,rtp", "-fflags", "+genpts", "-payload_type", fmt.Sprint(cfg.VideoPT), "-codec:v", "copy", }, }, { ID: "whip:audio", Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort), Options: []string{ "-re", "-protocol_whitelist", "udp,rtp", "-fflags", "+genpts", "-payload_type", fmt.Sprint(cfg.AudioPT), "-codec:a", "copy", }, }, } }