From b1057756d28fd108078df9d6a2ef6aeb49302081 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sat, 9 May 2026 16:33:35 -0400 Subject: [PATCH] =?UTF-8?q?fix(whip):=20rewrite=20lifecycle=20hooks=20?= =?UTF-8?q?=E2=80=94=20correct=20port=20allocation,=20clean=20FFmpeg=20RTP?= =?UTF-8?q?=20input=20legs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/webrtc/whip_lifecycle.go | 179 ++++++++++++++++++++--------------- 1 file changed, 101 insertions(+), 78 deletions(-) diff --git a/app/webrtc/whip_lifecycle.go b/app/webrtc/whip_lifecycle.go index 3679fc5..a7c57eb 100644 --- a/app/webrtc/whip_lifecycle.go +++ b/app/webrtc/whip_lifecycle.go @@ -15,21 +15,19 @@ type ingestStream struct { audioPort int } -// onWHIPProcessStart is registered as the restream ProcessStartHook for -// WHIP ingest. It fires just before FFmpeg starts. +// onWHIPProcessStart is registered as the restream OnInputStart hook. +// It fires just before FFmpeg starts, holding the restream write lock. // // When the per-process WHIPIngest config is disabled it returns (nil, nil) // so FFmpeg starts normally. When enabled it: // -// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1). -// 2. Registers the pair under the process ID in the subsystem's -// whipIngests map so WHIPHandler can forward WebRTC tracks there. -// 3. Builds the two RTP ConfigIO *input* legs via buildWHIPInputArgs and -// returns them to the restream manager, which prepends them to -// cfg.Input and rebuilds the FFmpeg command. -// -// FFmpeg will bind those two ports as RTP listeners; the IngestPeer will -// send received WebRTC RTP packets there once the publisher connects. +// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1) +// using the same retry strategy as the WHEP egress allocator. +// 2. Registers the pair under the process ID in whipIngests so the +// WHIPHandler can route incoming WebRTC RTP to them. +// 3. Returns two RTP ConfigIO input legs that FFmpeg will open as UDP +// listeners. The restream manager prepends them to cfg.Input and +// rebuilds the FFmpeg command before Start(). func (s *Subsystem) onWHIPProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) { if cfg == nil || !cfg.WHIPIngest.Enabled { return nil, nil @@ -52,24 +50,14 @@ func (s *Subsystem) onWHIPProcessStart(id string, cfg *appcfg.Config) ([]appcfg. } s.mu.Unlock() - videoPort, err := Alloc() + // Find an adjacent pair (V, V+1). The same retry logic used by + // the WHEP egress allocator (allocAdjacentPair) except we only + // need port numbers, not Source objects. + videoPort, err := allocAdjacentPortPair() if err != nil { - return nil, fmt.Errorf("webrtc: whip: allocate video port: %w", err) + return nil, fmt.Errorf("webrtc: whip: allocate port pair for process %q: %w", id, err) } audioPort := videoPort + 1 - // Verify the audio port is also free by trying to allocate the specific port. - // If we can't, try a fresh adjacent pair via retry. - for attempt := 0; attempt < allocAttempts; attempt++ { - if isPortFree(audioPort) { - break - } - // Release current video port and try again. - videoPort, err = Alloc() - if err != nil { - return nil, fmt.Errorf("webrtc: whip: allocate port pair (attempt %d): %w", attempt, err) - } - audioPort = videoPort + 1 - } s.mu.Lock() s.whipIngests[id] = &ingestStream{ @@ -90,10 +78,9 @@ func (s *Subsystem) onWHIPProcessStart(id string, cfg *appcfg.Config) ([]appcfg. return buildWHIPInputLegs(wcfg, videoPort), nil } -// onWHIPProcessStop is registered as the restream ProcessStopHook for -// WHIP ingest. It fires just after FFmpeg has been stopped. It removes -// the port allocation and signals the WHIPHandler to close any active -// publisher. +// onWHIPProcessStop is registered as the restream OnInputStop hook. +// It fires just after FFmpeg has been stopped. It removes the port +// allocation and signals the WHIPHandler to close any active publisher. func (s *Subsystem) onWHIPProcessStop(id string) { s.mu.Lock() _, ok := s.whipIngests[id] @@ -114,59 +101,95 @@ func (s *Subsystem) onWHIPProcessStop(id string) { s.logger.WithField("id", id).Info().Log("WebRTC WHIP ingest torn down for process") } +// allocAdjacentPortPair finds two consecutive free loopback UDP ports +// (V, V+1) and returns V. It retries up to allocAttempts times because +// the kernel may hand us an odd-ended pair whose +1 neighbor is taken. +// +// The caller owns the returned port numbers; FFmpeg will bind them +// immediately on process start via the ConfigIO input legs. +func allocAdjacentPortPair() (int, error) { + var lastErr error + for attempt := 0; attempt < allocAttempts; attempt++ { + videoPort, err := Alloc() + if err != nil { + lastErr = err + continue + } + // Verify the audio port (videoPort+1) is also free. + // We do a quick bind-and-release on videoPort+1. + audioPort := videoPort + 1 + if audioPort > 65535 { + lastErr = fmt.Errorf("video port %d would require audio port > 65535", videoPort) + continue + } + // Try to claim videoPort+1 momentarily to verify availability. + if err := checkPortFree(audioPort); err != nil { + lastErr = err + continue + } + return videoPort, nil + } + if lastErr == nil { + lastErr = fmt.Errorf("unknown allocation failure") + } + return 0, fmt.Errorf("after %d attempts: %w", allocAttempts, lastErr) +} + +// checkPortFree does a quick UDP bind-and-release on the given loopback +// port. Returns nil if the port appears free, non-nil if it is in use. +// There is an inherent TOCTOU race between this check and FFmpeg's bind; +// callers should handle the bind failure gracefully (process restart). +func checkPortFree(port int) error { + import_net_listen := func() error { + // Inline the check without importing net again (already in portalloc.go + // via the package-level Alloc function). We re-use Alloc's pattern: + // bind :port, get error, close. + // + // This file is in the same package so we can call the unexported helper. + // However, since we only have Alloc() (which uses :0), we implement + // this using net directly here. + return nil + } + _ = import_net_listen + // Use net directly since this package already imports it via portalloc.go. + // We can't import "net" twice, but since this file is in the same package + // as portalloc.go which imports "net", the package-level net functions are + // available to us via the Alloc() pattern. We implement the check manually: + return checkUDPPortFree(port) +} + // buildWHIPInputLegs produces the two FFmpeg input ConfigIO legs for -// WHIP ingest. FFmpeg will open each as an RTP listener: +// WHIP ingest. FFmpeg opens each as a UDP RTP listener: // -// ffmpeg -i udp://127.0.0.1:V?overrun_nonfatal=1&fifo_size=50000000 \ -// -i udp://127.0.0.1:A?overrun_nonfatal=1&fifo_size=50000000 \ -// ... +// -i udp://127.0.0.1:V?overrun_nonfatal=1&fifo_size=50000000 +// -i udp://127.0.0.1:A?overrun_nonfatal=1&fifo_size=50000000 // -// Each input address is the loopback UDP socket the WHIPHandler -// (via IngestPeer.forwardTrack) will write received WebRTC RTP into. +// The IngestPeer.forwardTrack goroutine writes received WebRTC RTP to +// these ports once the WHIP publisher connects. func buildWHIPInputLegs(cfg appcfg.ConfigWHIPIngest, videoPort int) []appcfg.ConfigIO { audioPort := videoPort + 1 - sdpv := fmt.Sprintf( - "sdp_flags=custom_io\nudp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort) - sdpa := fmt.Sprintf( - "sdp_flags=custom_io\nudp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort) - - videoInput := appcfg.ConfigIO{ - ID: "whip:video", - Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort), - Options: []string{ - "-re", - "-protocol_whitelist", "udp,rtp", - "-fflags", "+genpts", - "-payload_type", fmt.Sprint(cfg.VideoPT), - "-codec:v", "copy", + return []appcfg.ConfigIO{ + { + ID: "whip:video", + Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", videoPort), + Options: []string{ + "-re", + "-protocol_whitelist", "udp,rtp", + "-fflags", "+genpts", + "-payload_type", fmt.Sprint(cfg.VideoPT), + "-codec:v", "copy", + }, + }, + { + ID: "whip:audio", + Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort), + Options: []string{ + "-re", + "-protocol_whitelist", "udp,rtp", + "-fflags", "+genpts", + "-payload_type", fmt.Sprint(cfg.AudioPT), + "-codec:a", "copy", + }, }, } - audioInput := appcfg.ConfigIO{ - ID: "whip:audio", - Address: fmt.Sprintf("udp://127.0.0.1:%d?overrun_nonfatal=1&fifo_size=50000000", audioPort), - Options: []string{ - "-re", - "-protocol_whitelist", "udp,rtp", - "-fflags", "+genpts", - "-payload_type", fmt.Sprint(cfg.AudioPT), - "-codec:a", "copy", - }, - } - _ = sdpv - _ = sdpa - return []appcfg.ConfigIO{videoInput, audioInput} -} - -// isPortFree is a best-effort check (UDP only) — it listens momentarily -// and closes the socket. Since this check-then-use has an inherent -// TOCTOU race, callers should be prepared for the actual bind to fail. -func isPortFree(port int) bool { - l, err := Alloc() - if err != nil { - return false - } - // Alloc returns a free port; we just need to check a specific one. - // This is a rough heuristic — if Alloc < port the neighbor may differ. - _ = l - return true }