package webrtc

import (
	"fmt"

	corewebrtc "github.com/datarhei/core/v16/core/webrtc"
	appcfg "github.com/datarhei/core/v16/restream/app"
)

// Default payload types. These match the values the M1 PoC / M2
// forwarder expects (H.264 = 102, Opus = 111). Operators can override
// per-process via the restream Config.
const (
	defaultVideoPT = 102
	defaultAudioPT = 111
)

// allocAttempts is the maximum number of times onProcessStart will
// retry port allocation to find two adjacent free loopback UDP ports.
// The kernel sometimes hands us an odd port for video, making V+1
// unavailable — in practice 2-3 retries is plenty.
const allocAttempts = 10

// onProcessStart is registered as the restream ProcessStartHook. It
// fires with the restream write lock held, just before FFmpeg Start.
//
// When cfg is nil or the per-process WebRTC config is disabled, it
// returns (nil, nil) — FFmpeg starts normally without any extra output
// legs. When enabled it:
//
//  1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1).
//  2. Binds Pion Sources on those ports and registers the pair under
//     the process ID.
//  3. Builds the two RTP ConfigIO output legs via BuildArgs and returns
//     them to the restream manager, which appends them to cfg.Output
//     and rebuilds the FFmpeg command.
//
// Any error aborts the process start. On partial allocation failure, or
// when the ID turns out to be registered already, all resources
// allocated by this call are cleaned up before returning.
func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) {
	if cfg == nil || !cfg.WebRTC.Enabled {
		return nil, nil
	}

	// Normalize PTs — zero values mean "use defaults". wcfg is a local
	// copy, so the caller's cfg is left untouched.
	wcfg := cfg.WebRTC
	if wcfg.VideoPT == 0 {
		wcfg.VideoPT = defaultVideoPT
	}
	if wcfg.AudioPT == 0 {
		wcfg.AudioPT = defaultAudioPT
	}

	// Refuse to re-register — the restream manager should never
	// double-start a process but defensive check avoids a silent
	// Source leak if it does. This early check only saves us the port
	// allocation; the authoritative check happens at registration.
	s.mu.Lock()
	_, exists := s.streams[id]
	s.mu.Unlock()
	if exists {
		return nil, fmt.Errorf("webrtc: process %q already has an active stream", id)
	}

	videoPort, videoSrc, audioSrc, err := s.allocAdjacentPair(id)
	if err != nil {
		return nil, err
	}

	// Start the UDP readers so they're draining packets the moment
	// FFmpeg comes online.
	videoSrc.Start()
	audioSrc.Start()

	// The lock was released while allocating, so another start for the
	// same ID may have registered in the meantime. Re-check under the
	// lock: overwriting the entry would orphan the other call's Sources
	// with nobody left to close them.
	s.mu.Lock()
	if _, exists := s.streams[id]; exists {
		s.mu.Unlock()
		// Best-effort cleanup of what this call allocated; the
		// registration conflict is the error worth reporting.
		_ = videoSrc.Close()
		_ = audioSrc.Close()
		return nil, fmt.Errorf("webrtc: process %q already has an active stream", id)
	}
	s.streams[id] = &processStream{id: id, video: videoSrc, audio: audioSrc}
	s.mu.Unlock()

	s.logger.WithFields(map[string]interface{}{
		"id":         id,
		"video_port": videoPort,
		"audio_port": videoPort + 1,
		"video_pt":   wcfg.VideoPT,
		"audio_pt":   wcfg.AudioPT,
	}).Info().Log("WebRTC egress registered for process")

	args := BuildArgs(wcfg, videoPort)
	return splitRTPLegs(args), nil
}

// onProcessStop is registered as the restream ProcessStopHook. It
// fires with the restream write lock held, just after FFmpeg has been
// stopped. It tears down the per-process Sources (which closes their
// sockets and hangs up any subscribed peers). Stopping an ID that has
// no registered stream is a no-op.
func (s *Subsystem) onProcessStop(id string) {
	// Detach the entry and snapshot the teardown callback under the
	// lock; the callback and the Close calls run without it held.
	s.mu.Lock()
	st, ok := s.streams[id]
	teardown := s.teardown
	if ok {
		delete(s.streams, id)
	}
	s.mu.Unlock()

	if !ok {
		return
	}

	// Broadcast first, so any subscribed peers get torn down while
	// the streamID is still meaningful. The handler's tearDownStreamPeers
	// drives each Peer.Close() which in turn unsubscribes from the
	// Sources we're about to shut down — preventing a "subscribers fan
	// out into a closed channel" race.
	if teardown != nil {
		teardown(id)
	}

	// Close errors are deliberately ignored: teardown is best-effort
	// and there is no caller to report them to.
	if st.video != nil {
		_ = st.video.Close()
	}
	if st.audio != nil {
		_ = st.audio.Close()
	}

	s.logger.WithField("id", id).Info().Log("WebRTC egress torn down for process")
}

// allocAdjacentPair finds a pair of free loopback UDP ports (V, V+1)
// and binds a Source to each. It retries up to allocAttempts times
// because the kernel's ephemeral picker may hand us a port whose +1
// neighbor is already taken. Caller owns the returned Sources; on
// error all partial allocations are cleaned up.
//
// It returns the video port V together with the video and audio
// Sources. When every attempt fails, the error wraps the failure of
// the last attempt.
func (s *Subsystem) allocAdjacentPair(id string) (int, *corewebrtc.Source, *corewebrtc.Source, error) {
	var lastErr error

	for attempt := 0; attempt < allocAttempts; attempt++ {
		port, err := Alloc()
		if err != nil {
			lastErr = err
			continue
		}

		videoSrc, err := corewebrtc.NewSourceOn(id, "127.0.0.1", port)
		if err != nil {
			lastErr = err
			continue
		}

		audioSrc, err := corewebrtc.NewSourceOn(id+":audio", "127.0.0.1", port+1)
		if err != nil {
			// V is bound but V+1 is not available: release V and try
			// a fresh pair.
			_ = videoSrc.Close()
			lastErr = err
			continue
		}

		return port, videoSrc, audioSrc, nil
	}

	if lastErr == nil {
		lastErr = fmt.Errorf("unknown allocation failure")
	}
	return 0, nil, nil, fmt.Errorf("webrtc: allocate adjacent UDP port pair after %d attempts: %w", allocAttempts, lastErr)
}

// splitRTPLegs converts the flat BuildArgs output into two ConfigIO
// entries — one per RTP output leg. It splits on the second "-map"
// token, which marks the audio leg's start (see ffmpeg_args_test.go).
// The Address of each ConfigIO is the last argument (the udp:// URL);
// everything preceding it forms that output's Options.
//
// The video leg starts at the first "-map" token. When args does not
// contain exactly two "-map" tokens, a single leg built from all of
// args is returned instead.
func splitRTPLegs(args []string) []appcfg.ConfigIO {
	// Find the two -map indices.
	var mapIdx []int
	for i, a := range args {
		if a == "-map" {
			mapIdx = append(mapIdx, i)
		}
	}

	if len(mapIdx) != 2 {
		// BuildArgs always emits exactly 2 -maps; a different count
		// means an upstream bug. Return a single leg covering
		// everything to avoid silent truncation.
		return []appcfg.ConfigIO{toLeg(args)}
	}

	videoTokens := args[mapIdx[0]:mapIdx[1]]
	audioTokens := args[mapIdx[1]:]

	return []appcfg.ConfigIO{
		toLeg(videoTokens),
		toLeg(audioTokens),
	}
}

// toLeg splits a contiguous RTP-output token slice into a ConfigIO:
// the trailing token is the udp:// Address; everything before is the
// Options slice.
func toLeg(tokens []string) appcfg.ConfigIO { if len(tokens) == 0 { return appcfg.ConfigIO{} } addr := tokens[len(tokens)-1] opts := make([]string, len(tokens)-1) copy(opts, tokens[:len(tokens)-1]) return appcfg.ConfigIO{ ID: "webrtc", Address: addr, Options: opts, } }