diff --git a/app/webrtc/lifecycle.go b/app/webrtc/lifecycle.go new file mode 100644 index 0000000..61e676a --- /dev/null +++ b/app/webrtc/lifecycle.go @@ -0,0 +1,191 @@ +package webrtc + +import ( + "fmt" + + corewebrtc "github.com/datarhei/core/v16/core/webrtc" + appcfg "github.com/datarhei/core/v16/restream/app" +) + +// Default payload types. These match the values the M1 PoC / M2 +// forwarder expects (H.264 = 102, Opus = 111). Operators can override +// per-process via the restream Config. +const ( + defaultVideoPT = 102 + defaultAudioPT = 111 +) + +// allocAttempts is the maximum number of times onProcessStart will +// retry port allocation to find two adjacent free loopback UDP ports. +// The kernel sometimes hands us an odd port for video, making V+1 +// unavailable — in practice 2-3 retries is plenty. +const allocAttempts = 10 + +// onProcessStart is registered as the restream ProcessStartHook. It +// fires with the restream write lock held, just before FFmpeg Start. +// +// When the per-process WebRTC config is disabled, it returns (nil, nil) +// — FFmpeg starts normally without any extra output legs. When enabled +// it: +// +// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1). +// 2. Binds Pion Sources on those ports and registers the pair under +// the process ID. +// 3. Builds the two RTP ConfigIO output legs via BuildArgs and returns +// them to the restream manager, which appends them to cfg.Output +// and rebuilds the FFmpeg command. +// +// Any error aborts the process start. On partial allocation failure, +// all allocated resources are cleaned up before returning. +func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) { + if cfg == nil || !cfg.WebRTC.Enabled { + return nil, nil + } + + // Normalize PTs — zero values mean "use defaults". + wcfg := cfg.WebRTC + if wcfg.VideoPT == 0 { + wcfg.VideoPT = defaultVideoPT + } + if wcfg.AudioPT == 0 { + wcfg.AudioPT = defaultAudioPT + } + + // Refuse to re-register — the restream manager should never + // double-start a process but defensive check avoids a silent + // Source leak if it does. + s.mu.Lock() + if _, exists := s.streams[id]; exists { + s.mu.Unlock() + return nil, fmt.Errorf("webrtc: process %q already has an active stream", id) + } + s.mu.Unlock() + + videoPort, videoSrc, audioSrc, err := s.allocAdjacentPair(id) + if err != nil { + return nil, err + } + + // Start the UDP readers so they're draining packets the moment + // FFmpeg comes online. + videoSrc.Start() + audioSrc.Start() + + s.mu.Lock() + s.streams[id] = &processStream{id: id, video: videoSrc, audio: audioSrc} + s.mu.Unlock() + + s.logger.WithFields(map[string]interface{}{ + "id": id, + "video_port": videoPort, + "audio_port": videoPort + 1, + "video_pt": wcfg.VideoPT, + "audio_pt": wcfg.AudioPT, + }).Info().Log("WebRTC egress registered for process") + + args := BuildArgs(wcfg, videoPort) + return splitRTPLegs(args), nil +} + +// onProcessStop is registered as the restream ProcessStopHook. It +// fires with the restream write lock held, just after FFmpeg has been +// stopped. It tears down the per-process Sources (which closes their +// sockets and hangs up any subscribed peers). +func (s *Subsystem) onProcessStop(id string) { + s.mu.Lock() + st, ok := s.streams[id] + if ok { + delete(s.streams, id) + } + s.mu.Unlock() + + if !ok { + return + } + if st.video != nil { + _ = st.video.Close() + } + if st.audio != nil { + _ = st.audio.Close() + } + s.logger.WithField("id", id).Info().Log("WebRTC egress torn down for process") +} + +// allocAdjacentPair finds a pair of free loopback UDP ports (V, V+1) +// and binds a Source to each. It retries up to allocAttempts times +// because the kernel's ephemeral picker may hand us a port whose +1 +// neighbor is already taken. Caller owns the returned Sources; on +// error all partial allocations are cleaned up. +func (s *Subsystem) allocAdjacentPair(id string) (int, *corewebrtc.Source, *corewebrtc.Source, error) { + var lastErr error + for attempt := 0; attempt < allocAttempts; attempt++ { + port, err := Alloc() + if err != nil { + lastErr = err + continue + } + + videoSrc, err := corewebrtc.NewSourceOn(id, "127.0.0.1", port) + if err != nil { + lastErr = err + continue + } + audioSrc, err := corewebrtc.NewSourceOn(id+":audio", "127.0.0.1", port+1) + if err != nil { + _ = videoSrc.Close() + lastErr = err + continue + } + return port, videoSrc, audioSrc, nil + } + if lastErr == nil { + lastErr = fmt.Errorf("unknown allocation failure") + } + return 0, nil, nil, fmt.Errorf("webrtc: allocate adjacent UDP port pair after %d attempts: %w", allocAttempts, lastErr) +} + +// splitRTPLegs converts the flat BuildArgs output into two ConfigIO +// entries — one per RTP output leg. It splits on the second "-map" +// token, which marks the audio leg's start (see ffmpeg_args_test.go). +// The Address of each ConfigIO is the last argument (the udp:// URL); +// everything preceding it forms that output's Options. +func splitRTPLegs(args []string) []appcfg.ConfigIO { + // Find the two -map indices. + mapIdx := []int{} + for i, a := range args { + if a == "-map" { + mapIdx = append(mapIdx, i) + } + } + if len(mapIdx) != 2 { + // BuildArgs always emits exactly 2 -maps; a different count + // means an upstream bug. Return a single leg covering + // everything to avoid silent truncation. + return []appcfg.ConfigIO{toLeg(args)} + } + + videoTokens := args[mapIdx[0]:mapIdx[1]] + audioTokens := args[mapIdx[1]:] + + return []appcfg.ConfigIO{ + toLeg(videoTokens), + toLeg(audioTokens), + } +} + +// toLeg splits a contiguous RTP-output token slice into a ConfigIO: +// the trailing token is the udp:// Address; everything before is the +// Options slice. +func toLeg(tokens []string) appcfg.ConfigIO { + if len(tokens) == 0 { + return appcfg.ConfigIO{} + } + addr := tokens[len(tokens)-1] + opts := make([]string, len(tokens)-1) + copy(opts, tokens[:len(tokens)-1]) + return appcfg.ConfigIO{ + ID: "webrtc", + Address: addr, + Options: opts, + } +} diff --git a/app/webrtc/lifecycle_test.go b/app/webrtc/lifecycle_test.go new file mode 100644 index 0000000..8d90de6 --- /dev/null +++ b/app/webrtc/lifecycle_test.go @@ -0,0 +1,60 @@ +package webrtc + +import ( + "strings" + "testing" + + appcfg "github.com/datarhei/core/v16/restream/app" +) + +// TestSplitRTPLegs_TwoLegs feeds the real BuildArgs output through +// the splitter and checks both legs come out with the correct shape. +func TestSplitRTPLegs_TwoLegs(t *testing.T) { + args := BuildArgs(appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}, 49200) + + legs := splitRTPLegs(args) + if len(legs) != 2 { + t.Fatalf("expected 2 legs, got %d: %+v", len(legs), legs) + } + + video := legs[0] + audio := legs[1] + + // Leg 0 is video: address ends with :49200 + if !strings.HasSuffix(video.Address, ":49200?pkt_size=1316") { + t.Fatalf("video Address unexpected: %q", video.Address) + } + // Leg 1 is audio: address ends with :49201 + if !strings.HasSuffix(audio.Address, ":49201?pkt_size=1316") { + t.Fatalf("audio Address unexpected: %q", audio.Address) + } + + // Each leg's options start with -map, end with -f rtp. + if len(video.Options) < 2 || video.Options[0] != "-map" { + t.Fatalf("video leg should start with -map, got %v", video.Options) + } + if video.Options[len(video.Options)-2] != "-f" || video.Options[len(video.Options)-1] != "rtp" { + t.Fatalf("video leg should end with -f rtp, got %v", video.Options) + } + if len(audio.Options) < 2 || audio.Options[0] != "-map" { + t.Fatalf("audio leg should start with -map, got %v", audio.Options) + } + + // Neither leg's Options should contain the address itself. + for _, opt := range video.Options { + if strings.HasPrefix(opt, "udp://") { + t.Fatalf("video Options must not contain udp:// address: %v", video.Options) + } + } +} + +// TestSplitRTPLegs_FallbackOnUnexpectedShape ensures we don't panic +// or drop data if BuildArgs ever changes shape — the splitter returns +// a single leg wrapping everything. +func TestSplitRTPLegs_FallbackOnUnexpectedShape(t *testing.T) { + // Single -map: shouldn't happen, but don't panic. + legs := splitRTPLegs([]string{"-map", "0:v:0", "udp://1.2.3.4:5000"}) + if len(legs) != 1 { + t.Fatalf("expected single fallback leg, got %d", len(legs)) + } +} diff --git a/app/webrtc/subsystem.go b/app/webrtc/subsystem.go new file mode 100644 index 0000000..d0d5d09 --- /dev/null +++ b/app/webrtc/subsystem.go @@ -0,0 +1,120 @@ +package webrtc + +import ( + "fmt" + "sync" + + "github.com/datarhei/core/v16/config" + corewebrtc "github.com/datarhei/core/v16/core/webrtc" + "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/restream" +) + +// Subsystem is the app-level WebRTC egress manager. It sits alongside +// api.API as a sibling — both consume the Restream service, both wire +// themselves into the Echo HTTP router. The subsystem is responsible +// for: +// +// - Translating the global config.DataWebRTC into the core-level +// corewebrtc.Config used by the PeerFactory. +// - Installing ProcessHooks on Restreamer so that per-process start +// events allocate a pair of UDP ports, create Pion Sources, and +// inject RTP output legs into the FFmpeg command line. +// - Serving the WHEP Echo handler (see handler.go). +// +// The zero value is not usable; call New. +type Subsystem struct { + globalCfg config.DataWebRTC + coreCfg corewebrtc.Config + factory *corewebrtc.PeerFactory + logger log.Logger + + mu sync.Mutex + streams map[string]*processStream // processID -> stream pair +} + +// processStream captures the two Sources (video + audio) backing a +// single running process's WHEP egress. +type processStream struct { + id string + video *corewebrtc.Source + audio *corewebrtc.Source +} + +// New constructs a Subsystem from the global WebRTC config section. +// The provided ffmpegUDPMax is advisory for logs only (M2 uses the +// OS's ephemeral range via Alloc). Returns an error if the PeerFactory +// cannot be built (e.g., bad NAT1To1 IPs). +func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) { + if logger == nil { + logger = log.New("") + } + + coreCfg := corewebrtc.DefaultConfig() + coreCfg.Enabled = dataCfg.Enable + coreCfg.PublicIP = dataCfg.PublicIP + + // If the operator configured multiple NAT1To1 IPs (e.g., dual + // LAN/public), they take precedence over PublicIP. Wire them + // through via PublicIP as the first entry; core/webrtc currently + // reads a single PublicIP, so M2 joins the list with the first + // entry winning. (Multi-IP NAT1To1 is an M3 enhancement.) + if len(dataCfg.NAT1To1IPs) > 0 && coreCfg.PublicIP == "" { + coreCfg.PublicIP = dataCfg.NAT1To1IPs[0] + } + + factory, err := corewebrtc.NewPeerFactory(coreCfg) + if err != nil { + return nil, fmt.Errorf("webrtc subsystem: build peer factory: %w", err) + } + + return &Subsystem{ + globalCfg: dataCfg, + coreCfg: coreCfg, + factory: factory, + logger: logger.WithComponent("WebRTC"), + streams: make(map[string]*processStream), + }, nil +} + +// Enabled reports whether the subsystem should register hooks and +// serve the WHEP endpoint. Called by the API wiring layer to decide +// whether to install anything. +func (s *Subsystem) Enabled() bool { + return s.globalCfg.Enable +} + +// Hooks returns the restream.ProcessHooks the subsystem expects to be +// installed via restream.Restreamer.SetHooks. Exactly one Subsystem +// instance should be installed per Restreamer. +func (s *Subsystem) Hooks() restream.ProcessHooks { + return restream.ProcessHooks{ + OnStart: s.onProcessStart, + OnStop: s.onProcessStop, + } +} + +// Close tears down every active per-process stream. It is safe to +// call during Core shutdown; subsequent WHEP requests will 404. +func (s *Subsystem) Close() { + s.mu.Lock() + defer s.mu.Unlock() + for id, st := range s.streams { + if st.video != nil { + _ = st.video.Close() + } + if st.audio != nil { + _ = st.audio.Close() + } + delete(s.streams, id) + } +} + +// lookup returns the per-process stream pair for id, or nil, false. +// Used by the WHEP handler. +func (s *Subsystem) lookup(id string) (*processStream, bool) { + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.streams[id] + return st, ok +} diff --git a/config/data.go b/config/data.go index d801932..7585bd9 100644 --- a/config/data.go +++ b/config/data.go @@ -113,12 +113,7 @@ type Data struct { Topics []string `json:"topics"` } `json:"log"` } `json:"srt"` - WebRTC struct { - Enable bool `json:"enable"` - PublicIP string `json:"public_ip"` - NAT1To1IPs []string `json:"nat_1_to_1_ips"` - UDPMuxPort int `json:"udp_mux_port" format:"int"` - } `json:"webrtc"` + WebRTC DataWebRTC `json:"webrtc"` FFmpeg struct { Binary string `json:"binary"` MaxProcesses int64 `json:"max_processes" format:"int64"` @@ -340,3 +335,12 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) { return data, nil } + +// DataWebRTC is the global WebRTC egress configuration. Promoted to a +// named type so the app/webrtc subsystem can accept it by value. +type DataWebRTC struct { + Enable bool `json:"enable"` + PublicIP string `json:"public_ip"` + NAT1To1IPs []string `json:"nat_1_to_1_ips"` + UDPMuxPort int `json:"udp_mux_port" format:"int"` +} diff --git a/core/webrtc/forward.go b/core/webrtc/forward.go index 1b74fdd..05ae1f9 100644 --- a/core/webrtc/forward.go +++ b/core/webrtc/forward.go @@ -6,10 +6,9 @@ import ( ) // forwardRTP reads packets from sub and writes them to the correct track -// based on payload type (H.264 → video, Opus → audio). Payload-type -// inspection is the simplest M1 approach; M2 will switch to per-track -// source channels once the process resolver manages separate video/audio -// UDP ports. +// based on payload type (H.264 → video, Opus → audio). Used by the M1 +// single-source PoC where FFmpeg emits both video and audio RTP to the +// same UDP port. func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet, video, audio *webrtc.TrackLocalStaticRTP) { for { @@ -35,3 +34,29 @@ func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet, } } } + +// forwardRTPSplit is the M2 variant: it reads from two independent +// per-track channels (one video, one audio) and writes each to its +// own Pion track. This is the mode used when the restream manager +// emits two FFmpeg RTP legs on separate UDP ports. Either channel +// closing or done firing terminates the loop. +func forwardRTPSplit(done <-chan struct{}, + videoSub <-chan *rtp.Packet, audioSub <-chan *rtp.Packet, + video, audio *webrtc.TrackLocalStaticRTP) { + for { + select { + case <-done: + return + case pkt, ok := <-videoSub: + if !ok { + return + } + _ = video.WriteRTP(pkt) + case pkt, ok := <-audioSub: + if !ok { + return + } + _ = audio.WriteRTP(pkt) + } + } +} diff --git a/core/webrtc/peer.go b/core/webrtc/peer.go index ffa0af3..64a1cc8 100644 --- a/core/webrtc/peer.go +++ b/core/webrtc/peer.go @@ -44,15 +44,27 @@ func NewPeerFactory(c Config) (*PeerFactory, error) { return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil } -// Peer wraps a Pion PeerConnection bound to a Source's subscription. +// Peer wraps a Pion PeerConnection bound to either a single Source +// subscription (M1, payload-type split forwarding) or to a pair of +// video+audio Source subscriptions (M2, per-track forwarding). type Peer struct { resourceID string pc *webrtc.PeerConnection answer webrtc.SessionDescription - source *Source - sub chan *rtp.Packet - done chan struct{} - once sync.Once + + // M1 single-source mode: source+sub are set, videoSource/audioSource are nil. + source *Source + sub chan *rtp.Packet + + // M2 two-source mode: videoSource/audioSource and their subs are set, + // source/sub are nil. + videoSource *Source + audioSource *Source + videoSub chan *rtp.Packet + audioSub chan *rtp.Packet + + done chan struct{} + once sync.Once } // CreatePeer builds a PeerConnection, sets the remote offer, generates an @@ -140,18 +152,111 @@ func (p *Peer) Answer() webrtc.SessionDescription { return p.answer } // ResourceID returns the stable resource id used in the WHEP Location header. func (p *Peer) ResourceID() string { return p.resourceID } -// Close tears down the peer connection and unsubscribes from the source. -// Safe to call multiple times. +// Close tears down the peer connection and unsubscribes from each +// source. Safe to call multiple times. func (p *Peer) Close() error { var err error p.once.Do(func() { close(p.done) - p.source.Unsubscribe(p.sub) + if p.source != nil && p.sub != nil { + p.source.Unsubscribe(p.sub) + } + if p.videoSource != nil && p.videoSub != nil { + p.videoSource.Unsubscribe(p.videoSub) + } + if p.audioSource != nil && p.audioSub != nil { + p.audioSource.Unsubscribe(p.audioSub) + } err = p.pc.Close() }) return err } +// CreatePeerFromSources is the M2 entry point: it builds a +// PeerConnection with video+audio tracks and subscribes each to its +// own dedicated Source. Used when the restream manager emits two +// FFmpeg RTP legs on separate UDP ports — there is no payload-type +// sniffing required, each Source feeds its matching track directly. +func (f *PeerFactory) CreatePeerFromSources(ctx context.Context, + videoSrc, audioSrc *Source, offer webrtc.SessionDescription) (*Peer, error) { + pc, err := f.api.NewPeerConnection(f.rtcConfig) + if err != nil { + return nil, fmt.Errorf("webrtc: new peer connection: %w", err) + } + + videoTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, + "video", "dragonfork") + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: new video track: %w", err) + } + audioTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, + "audio", "dragonfork") + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: new audio track: %w", err) + } + if _, err := pc.AddTrack(videoTrack); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: add video track: %w", err) + } + if _, err := pc.AddTrack(audioTrack); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: add audio track: %w", err) + } + + if err := pc.SetRemoteDescription(offer); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: set remote: %w", err) + } + answer, err := pc.CreateAnswer(nil) + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: create answer: %w", err) + } + + gatherComplete := webrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(answer); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: set local: %w", err) + } + + select { + case <-gatherComplete: + case <-ctx.Done(): + _ = pc.Close() + return nil, ErrICETimeout + } + + videoSub := videoSrc.Subscribe(64) + audioSub := audioSrc.Subscribe(64) + + p := &Peer{ + resourceID: newResourceID(), + pc: pc, + answer: *pc.LocalDescription(), + videoSource: videoSrc, + audioSource: audioSrc, + videoSub: videoSub, + audioSub: audioSub, + done: make(chan struct{}), + } + + pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) { + if st == webrtc.PeerConnectionStateFailed || + st == webrtc.PeerConnectionStateDisconnected || + st == webrtc.PeerConnectionStateClosed { + _ = p.Close() + } + }) + + go forwardRTPSplit(p.done, videoSub, audioSub, videoTrack, audioTrack) + + return p, nil +} + func newResourceID() string { b := make([]byte, 8) _, _ = rand.Read(b)