From 16ae17d2a18029d1e36da134007e8f3ed4769140 Mon Sep 17 00:00:00 2001
From: Zac Gaetano
Date: Fri, 17 Apr 2026 09:52:09 -0400
Subject: [PATCH] feat(app/webrtc): port allocator + FFmpeg arg builder

Adds Alloc(), the ephemeral loopback UDP port grabber the subsystem uses
to pick the RTP port it will hand to FFmpeg and then re-bind with
core/webrtc.NewSourceOn. Covered by a 100x rebind test.

Adds BuildArgs(), which emits the -f rtp output fragments (video on the
passed port, audio on port+1) with copy codecs by default and an H.264
baseline / libopus re-encode leg when ForceTranscode is set. Covered by
three unit tests.
---
 app/webrtc/ffmpeg_args.go      | 61 +++++++++++++++++++++++
 app/webrtc/ffmpeg_args_test.go | 91 ++++++++++++++++++++++++++++++++++
 app/webrtc/portalloc.go        | 34 +++++++++++++
 app/webrtc/portalloc_test.go   | 46 +++++++++++++++++
 4 files changed, 232 insertions(+)
 create mode 100644 app/webrtc/ffmpeg_args.go
 create mode 100644 app/webrtc/ffmpeg_args_test.go
 create mode 100644 app/webrtc/portalloc.go
 create mode 100644 app/webrtc/portalloc_test.go

diff --git a/app/webrtc/ffmpeg_args.go b/app/webrtc/ffmpeg_args.go
new file mode 100644
index 0000000..14f8e6e
--- /dev/null
+++ b/app/webrtc/ffmpeg_args.go
@@ -0,0 +1,61 @@
+package webrtc
+
+import (
+	"fmt"
+
+	appcfg "github.com/datarhei/core/v16/restream/app"
+)
+
+// BuildArgs emits the FFmpeg output-leg args for the WebRTC side of a
+// process. It produces two separate "outputs" — one for video on
+// videoPort, one for audio on videoPort+1. Each output ends with its
+// UDP address so the slice is structured for consumption by
+// restream.AppendOutput after splitting on the track boundary.
+//
+// Copy vs. re-encode: if ForceTranscode is false, we assume the upstream
+// source is already H.264 + Opus and pass them through (copy). When the
+// source doesn't match, FFmpeg will fail at runtime and the process will
+// restart — the user can flip ForceTranscode on to get a baseline-profile
+// H.264 + Opus re-encode.
+//
+// Port contract: videoPort is not validated here. The audio leg always
+// targets videoPort+1, while Alloc only probes a single port, so the
+// caller must make sure videoPort+1 is usable too (which also means
+// videoPort must be at most 65534).
+func BuildArgs(cfg appcfg.ConfigWebRTC, videoPort int) []string {
+	// Stream copy is the default; both legs switch to encoder settings
+	// when a transcode is forced.
+	videoCodec := []string{"-c:v", "copy"}
+	audioCodec := []string{"-c:a", "copy"}
+	if cfg.ForceTranscode {
+		videoCodec = []string{
+			"-c:v", "libx264",
+			"-preset", "veryfast",
+			"-profile:v", "baseline",
+			"-pix_fmt", "yuv420p",
+			"-tune", "zerolatency",
+			"-g", "60",
+		}
+		audioCodec = []string{"-c:a", "libopus", "-b:a", "96k"}
+	}
+
+	// Video leg: first video stream of input 0, sent to videoPort.
+	args := []string{"-map", "0:v:0"}
+	args = append(args, videoCodec...)
+	args = append(args,
+		"-payload_type", fmt.Sprint(cfg.VideoPT),
+		"-f", "rtp",
+		fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort),
+	)
+
+	// Audio leg: first audio stream of input 0, sent to videoPort+1.
+	args = append(args, "-map", "0:a:0")
+	args = append(args, audioCodec...)
+	args = append(args,
+		"-payload_type", fmt.Sprint(cfg.AudioPT),
+		"-f", "rtp",
+		fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort+1),
+	)
+
+	return args
+}
diff --git a/app/webrtc/ffmpeg_args_test.go b/app/webrtc/ffmpeg_args_test.go
new file mode 100644
index 0000000..7e1cb52
--- /dev/null
+++ b/app/webrtc/ffmpeg_args_test.go
@@ -0,0 +1,91 @@
+package webrtc
+
+import (
+	"strings"
+	"testing"
+
+	appcfg "github.com/datarhei/core/v16/restream/app"
+)
+
+func TestBuildArgs_CopyCodecs(t *testing.T) {
+	cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
+	got := BuildArgs(cfg, 49200)
+
+	// Must contain -c:v copy and -c:a copy when ForceTranscode is false.
+	if !contains(got, "-c:v", "copy") {
+		t.Fatalf("expected -c:v copy, got %v", got)
+	}
+	if !contains(got, "-c:a", "copy") {
+		t.Fatalf("expected -c:a copy, got %v", got)
+	}
+
+	// Two UDP addresses, one per track, with port+1 for audio.
+	if !anyHasPrefix(got, "udp://127.0.0.1:49200?") {
+		t.Fatalf("expected video udp on 49200, got %v", got)
+	}
+	if !anyHasPrefix(got, "udp://127.0.0.1:49201?") {
+		t.Fatalf("expected audio udp on 49201, got %v", got)
+	}
+
+	// Payload types must be stringified.
+	if !contains(got, "-payload_type", "102") {
+		t.Fatalf("expected video PT 102, got %v", got)
+	}
+	if !contains(got, "-payload_type", "111") {
+		t.Fatalf("expected audio PT 111, got %v", got)
+	}
+}
+
+func TestBuildArgs_ForceTranscode(t *testing.T) {
+	cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111, ForceTranscode: true}
+	got := BuildArgs(cfg, 49200)
+
+	if !contains(got, "-c:v", "libx264") {
+		t.Fatalf("expected -c:v libx264, got %v", got)
+	}
+	if !contains(got, "-profile:v", "baseline") {
+		t.Fatalf("expected baseline profile, got %v", got)
+	}
+	if !contains(got, "-c:a", "libopus") {
+		t.Fatalf("expected -c:a libopus, got %v", got)
+	}
+}
+
+func TestBuildArgs_TwoTrackBoundary(t *testing.T) {
+	cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
+	got := BuildArgs(cfg, 49200)
+
+	// The second `-map` marks the start of the audio leg — the split
+	// point restream.AppendOutput callers use.
+	mapCount := 0
+	for _, a := range got {
+		if a == "-map" {
+			mapCount++
+		}
+	}
+	if mapCount != 2 {
+		t.Fatalf("expected exactly 2 -map tokens, got %d in %v", mapCount, got)
+	}
+}
+
+// contains reports whether the two-token sequence appears consecutively.
+func contains(haystack []string, a, b string) bool {
+	for i := 0; i+1 < len(haystack); i++ {
+		if haystack[i] == a && haystack[i+1] == b {
+			return true
+		}
+	}
+	return false
+}
+
+// anyHasPrefix reports whether any element of haystack starts with prefix.
+// It is not named "any" because a package-level any would shadow the
+// predeclared any type for every file in this package under go test.
+func anyHasPrefix(haystack []string, prefix string) bool {
+	for _, h := range haystack {
+		if strings.HasPrefix(h, prefix) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/app/webrtc/portalloc.go b/app/webrtc/portalloc.go
new file mode 100644
index 0000000..9dd1d70
--- /dev/null
+++ b/app/webrtc/portalloc.go
@@ -0,0 +1,34 @@
+// Package webrtc is the datarhei Core subsystem that turns WebRTC into
+// a first-class output alongside RTMP, SRT, and HLS. It owns the WHEP
+// HTTP handler, wires FFmpeg's RTP output into per-process Pion
+// Sources, and tracks active peer connections.
+//
+// See docs/design/2026-04-17-datarhei-dragon-fork-m2-webrtc-core-integration.md
+// for the full design.
+package webrtc
+
+import (
+	"fmt"
+	"net"
+)
+
+// Alloc binds :0 on loopback UDPv4, records the port the kernel assigned,
+// closes the socket, and returns the port number.
+//
+// The caller is expected to re-bind that exact port via
+// core/webrtc.NewSourceOn immediately. There is a microsecond-sized race
+// window where another process on the host could grab the port; if that
+// happens, the caller's rebind will fail and the error should be
+// propagated. In practice this is rare enough that a retry loop would be
+// unnecessary churn.
+//
+// Only the returned port is probed; the audio port BuildArgs derives
+// from it (port+1) is not checked here.
+func Alloc() (int, error) {
+	c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
+	if err != nil {
+		return 0, fmt.Errorf("webrtc: portalloc: %w", err)
+	}
+	defer c.Close()
+	return c.LocalAddr().(*net.UDPAddr).Port, nil
+}
diff --git a/app/webrtc/portalloc_test.go b/app/webrtc/portalloc_test.go
new file mode 100644
index 0000000..e8d0f9f
--- /dev/null
+++ b/app/webrtc/portalloc_test.go
@@ -0,0 +1,46 @@
+package webrtc
+
+import (
+	"net"
+	"testing"
+)
+
+// TestAlloc_ReturnsRebindablePort exercises the alloc/close/rebind
+// sequence 100 times. If a fast rebind race existed in normal
+// conditions, this would surface it.
+func TestAlloc_ReturnsRebindablePort(t *testing.T) {
+	for i := 0; i < 100; i++ {
+		p, err := Alloc()
+		if err != nil {
+			t.Fatalf("iter %d: Alloc: %v", i, err)
+		}
+		if p == 0 {
+			t.Fatalf("iter %d: expected non-zero port", i)
+		}
+		c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: p})
+		if err != nil {
+			t.Fatalf("iter %d: rebind port %d: %v", i, p, err)
+		}
+		c.Close()
+	}
+}
+
+// TestAlloc_RepeatedCalls confirms back-to-back Alloc calls keep
+// returning usable port numbers. It deliberately does not require the
+// ports to differ: Alloc closes its socket before returning, so nothing
+// stops the OS from handing out the same ephemeral port again, and
+// failing on a repeat would make the test flaky.
+func TestAlloc_RepeatedCalls(t *testing.T) {
+	seen := make(map[int]struct{}, 10)
+	for i := 0; i < 10; i++ {
+		p, err := Alloc()
+		if err != nil {
+			t.Fatalf("iter %d: Alloc: %v", i, err)
+		}
+		if p < 1 || p > 65535 {
+			t.Fatalf("iter %d: port %d out of range", i, p)
+		}
+		seen[p] = struct{}{}
+	}
+	t.Logf("10 allocations yielded %d distinct ports", len(seen))
+}