feat(app/webrtc): port allocator + FFmpeg arg builder

Adds Alloc(), the ephemeral loopback UDP port grabber the subsystem
uses to pick the RTP port it will hand to FFmpeg and then re-bind with
core/webrtc.NewSourceOn. Covered by a 100x rebind test.

Adds BuildArgs(), which emits the -f rtp output fragments (video on
the passed port, audio on port+1) with copy codecs by default and an
H.264 baseline / libopus re-encode leg when ForceTranscode is set.
Covered by three unit tests.
This commit is contained in:
Zac Gaetano 2026-04-17 09:52:09 -04:00
parent 80db028281
commit 16ae17d2a1
4 changed files with 215 additions and 0 deletions

52
app/webrtc/ffmpeg_args.go Normal file
View file

@ -0,0 +1,52 @@
package webrtc
import (
"fmt"
appcfg "github.com/datarhei/core/v16/restream/app"
)
// BuildArgs emits the FFmpeg output-leg args for the WebRTC side of a
// process. It produces two separate "outputs" — one for video on
// videoPort, one for audio on videoPort+1. Each output ends with its
// UDP address so the slice is structured for consumption by
// restream.AppendOutput after splitting on the track boundary.
//
// Copy vs. re-encode: if ForceTranscode is false, we assume the upstream
// source is already H.264 + Opus and pass them through (copy). When the
// source doesn't match, FFmpeg will fail at runtime and the process will
// restart — the user can flip ForceTranscode on to get a baseline-profile
// H.264 + Opus re-encode.
func BuildArgs(cfg appcfg.ConfigWebRTC, videoPort int) []string {
vcopy := []string{"-c:v", "copy"}
acopy := []string{"-c:a", "copy"}
if cfg.ForceTranscode {
vcopy = []string{
"-c:v", "libx264",
"-preset", "veryfast",
"-profile:v", "baseline",
"-pix_fmt", "yuv420p",
"-tune", "zerolatency",
"-g", "60",
}
acopy = []string{"-c:a", "libopus", "-b:a", "96k"}
}
args := []string{"-map", "0:v:0"}
args = append(args, vcopy...)
args = append(args,
"-payload_type", fmt.Sprint(cfg.VideoPT),
"-f", "rtp",
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort),
)
args = append(args, "-map", "0:a:0")
args = append(args, acopy...)
args = append(args,
"-payload_type", fmt.Sprint(cfg.AudioPT),
"-f", "rtp",
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort+1),
)
return args
}

View file

@ -0,0 +1,89 @@
package webrtc
import (
"strings"
"testing"
appcfg "github.com/datarhei/core/v16/restream/app"
)
func TestBuildArgs_CopyCodecs(t *testing.T) {
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
got := BuildArgs(cfg, 49200)
// Must contain -c:v copy and -c:a copy when ForceTranscode is false.
if !contains(got, "-c:v", "copy") {
t.Fatalf("expected -c:v copy, got %v", got)
}
if !contains(got, "-c:a", "copy") {
t.Fatalf("expected -c:a copy, got %v", got)
}
// Two UDP addresses, one per track, with port+1 for audio.
if !any(got, "udp://127.0.0.1:49200?") {
t.Fatalf("expected video udp on 49200, got %v", got)
}
if !any(got, "udp://127.0.0.1:49201?") {
t.Fatalf("expected audio udp on 49201, got %v", got)
}
// Payload types must be stringified.
if !contains(got, "-payload_type", "102") {
t.Fatalf("expected video PT 102, got %v", got)
}
if !contains(got, "-payload_type", "111") {
t.Fatalf("expected audio PT 111, got %v", got)
}
}
func TestBuildArgs_ForceTranscode(t *testing.T) {
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111, ForceTranscode: true}
got := BuildArgs(cfg, 49200)
if !contains(got, "-c:v", "libx264") {
t.Fatalf("expected -c:v libx264, got %v", got)
}
if !contains(got, "-profile:v", "baseline") {
t.Fatalf("expected baseline profile, got %v", got)
}
if !contains(got, "-c:a", "libopus") {
t.Fatalf("expected -c:a libopus, got %v", got)
}
}
func TestBuildArgs_TwoTrackBoundary(t *testing.T) {
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
got := BuildArgs(cfg, 49200)
// The second `-map` marks the start of the audio leg — the split
// point restream.AppendOutput callers use.
mapCount := 0
for _, a := range got {
if a == "-map" {
mapCount++
}
}
if mapCount != 2 {
t.Fatalf("expected exactly 2 -map tokens, got %d in %v", mapCount, got)
}
}
// contains reports whether the two-token sequence appears consecutively.
func contains(haystack []string, a, b string) bool {
for i := 0; i+1 < len(haystack); i++ {
if haystack[i] == a && haystack[i+1] == b {
return true
}
}
return false
}
// any reports whether any element of haystack starts with prefix.
func any(haystack []string, prefix string) bool {
for _, h := range haystack {
if strings.HasPrefix(h, prefix) {
return true
}
}
return false
}

31
app/webrtc/portalloc.go Normal file
View file

@ -0,0 +1,31 @@
// Package webrtc is the datarhei Core subsystem that turns WebRTC into
// a first-class output alongside RTMP, SRT, and HLS. It owns the WHEP
// HTTP handler, wires FFmpeg's RTP output into per-process Pion
// Sources, and tracks active peer connections.
//
// See docs/design/2026-04-17-datarhei-dragon-fork-m2-webrtc-core-integration.md
// for the full design.
package webrtc
import (
"fmt"
"net"
)
// Alloc binds :0 on loopback UDPv4, records the port the kernel assigned,
// closes the socket, and returns the port number.
//
// The caller is expected to re-bind that exact port via
// core/webrtc.NewSourceOn immediately. There is a microsecond-sized race
// window where another process on the host could grab the port; if that
// happens, the caller's rebind will fail and the error should be
// propagated. In practice this is rare enough that a retry loop would be
// unnecessary churn.
func Alloc() (int, error) {
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
if err != nil {
return 0, fmt.Errorf("webrtc: portalloc: %w", err)
}
defer c.Close()
return c.LocalAddr().(*net.UDPAddr).Port, nil
}

View file

@ -0,0 +1,43 @@
package webrtc
import (
"net"
"testing"
)
// TestAlloc_ReturnsRebindablePort exercises the alloc/close/rebind
// sequence 100 times. If a fast rebind race existed in normal
// conditions, this would surface it.
func TestAlloc_ReturnsRebindablePort(t *testing.T) {
for i := 0; i < 100; i++ {
p, err := Alloc()
if err != nil {
t.Fatalf("iter %d: Alloc: %v", i, err)
}
if p == 0 {
t.Fatalf("iter %d: expected non-zero port", i)
}
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: p})
if err != nil {
t.Fatalf("iter %d: rebind port %d: %v", i, p, err)
}
c.Close()
}
}
// TestAlloc_DistinctPorts confirms the OS doesn't hand us the same
// ephemeral port twice in quick succession (it shouldn't — the socket
// is briefly held in the bound state on close).
func TestAlloc_DistinctPorts(t *testing.T) {
seen := map[int]bool{}
for i := 0; i < 10; i++ {
p, err := Alloc()
if err != nil {
t.Fatal(err)
}
if seen[p] {
t.Fatalf("duplicate port %d", p)
}
seen[p] = true
}
}