package webrtc import ( "fmt" appcfg "github.com/datarhei/core/v16/restream/app" ) // ingestStream captures the two loopback UDP ports that FFmpeg binds // for WHIP ingest — video on videoPort, audio on audioPort. // The WHIPHandler writes received WebRTC RTP to these ports. type ingestStream struct { id string videoPort int audioPort int } // onWHIPProcessStart is registered as the restream OnInputStart hook. // It fires just before FFmpeg starts, holding the restream write lock. // // When the per-process WHIPIngest config is disabled it returns (nil, nil) // so FFmpeg starts normally. When enabled it: // // 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1) // using the same retry strategy as the WHEP egress allocator. // 2. Registers the pair under the process ID in whipIngests so the // WHIPHandler can route incoming WebRTC RTP to them. // 3. Returns two RTP ConfigIO input legs that FFmpeg will open as UDP // listeners. The restream manager prepends them to cfg.Input and // rebuilds the FFmpeg command before Start(). func (s *Subsystem) onWHIPProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) { if cfg == nil || !cfg.WHIPIngest.Enabled { return nil, nil } // Normalize PTs — zero values mean "use defaults". wcfg := cfg.WHIPIngest if wcfg.VideoPT == 0 { wcfg.VideoPT = defaultVideoPT } if wcfg.AudioPT == 0 { wcfg.AudioPT = defaultAudioPT } // Refuse to re-register. s.mu.Lock() if _, exists := s.whipIngests[id]; exists { s.mu.Unlock() return nil, fmt.Errorf("webrtc: whip: process %q already has an active ingest", id) } s.mu.Unlock() // Find an adjacent pair (V, V+1). The same retry logic used by // the WHEP egress allocator (allocAdjacentPair) except we only // need port numbers, not Source objects. videoPort, err := allocAdjacentPortPair() if err != nil { return nil, fmt.Errorf("webrtc: whip: allocate port pair for process %q: %w", id, err) } audioPort := videoPort + 1 s.mu.Lock() s.whipIngests[id] = &ingestStream{ id: id, videoPort: videoPort, audioPort: audioPort, } s.mu.Unlock() s.logger.WithFields(map[string]interface{}{ "id": id, "video_port": videoPort, "audio_port": audioPort, "video_pt": wcfg.VideoPT, "audio_pt": wcfg.AudioPT, }).Info().Log("WebRTC WHIP ingest registered for process") return buildWHIPInputLegs(wcfg, videoPort), nil } // onWHIPProcessStop is registered as the restream OnInputStop hook. // It fires just after FFmpeg has been stopped. It removes the port // allocation and signals the WHIPHandler to close any active publisher. func (s *Subsystem) onWHIPProcessStop(id string) { s.mu.Lock() _, ok := s.whipIngests[id] teardown := s.whipTeardown if ok { delete(s.whipIngests, id) } s.mu.Unlock() if !ok { return } if teardown != nil { teardown(id) } s.logger.WithField("id", id).Info().Log("WebRTC WHIP ingest torn down for process") } // allocAdjacentPortPair finds two consecutive free loopback UDP ports // (V, V+1) and returns V. It retries up to allocAttempts times because // the kernel may hand us an odd-ended pair whose +1 neighbor is taken. // // The caller owns the returned port numbers; FFmpeg will bind them // immediately on process start via the ConfigIO input legs. func allocAdjacentPortPair() (int, error) { var lastErr error for attempt := 0; attempt < allocAttempts; attempt++ { videoPort, err := Alloc() if err != nil { lastErr = err continue } // Verify the audio port (videoPort+1) is also free. // We do a quick bind-and-release on videoPort+1. audioPort := videoPort + 1 if audioPort > 65535 { lastErr = fmt.Errorf("video port %d would require audio port > 65535", videoPort) continue } // Try to claim videoPort+1 momentarily to verify availability. if err := checkPortFree(audioPort); err != nil { lastErr = err continue } return videoPort, nil } if lastErr == nil { lastErr = fmt.Errorf("unknown allocation failure") } return 0, fmt.Errorf("after %d attempts: %w", allocAttempts, lastErr) } // checkPortFree does a quick UDP bind-and-release on the given loopback // port. Returns nil if the port appears free, non-nil if it is in use. // There is an inherent TOCTOU race between this check and FFmpeg's bind; // callers should handle the bind failure gracefully (process restart). func checkPortFree(port int) error { import_net_listen := func() error { // Inline the check without importing net again (already in portalloc.go // via the package-level Alloc function). We re-use Alloc's pattern: // bind :port, get error, close. // // This file is in the same package so we can call the unexported helper. // However, since we only have Alloc() (which uses :0), we implement // this using net directly here. return nil } _ = import_net_listen // Use net directly since this package already imports it via portalloc.go. // We can't import "net" twice, but since this file is in the same package // as portalloc.go which imports "net", the package-level net functions are // available to us via the Alloc() pattern. We implement the check manually: return checkUDPPortFree(port) } // buildWHIPInputLegs produces the two FFmpeg input ConfigIO legs for // WHIP ingest. FFmpeg opens each as a UDP RTP listener: // // -i udp://127.0.0.1:V?overrun_nonfatal=1&fifo_size=50000000 // -i udp://127.0.0.1:A?overrun_nonfatal=1&fifo_size=50000000 // // The IngestPeer.forwardTrack goroutine writes received WebRTC RTP to // these ports once the WHIP publisher connects. func buildWHIPInputLegs(cfg appcfg.ConfigWHIPIngest, videoPort int) []appcfg.ConfigIO { audioPort := videoPort + 1 return []appcfg.ConfigIO{ { ID: "whip:video", Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort), Options: []string{ "-re", "-protocol_whitelist", "udp,rtp", "-fflags", "+genpts", "-payload_type", fmt.Sprint(cfg.VideoPT), "-codec:v", "copy", }, }, { ID: "whip:audio", Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort), Options: []string{ "-re", "-protocol_whitelist", "udp,rtp", "-fflags", "+genpts", "-payload_type", fmt.Sprint(cfg.AudioPT), "-codec:a", "copy", }, }, } }