From b7afd0f08ab2d27772a02ae3be19087cd65c5887 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Sun, 3 May 2026 12:18:57 +0000 Subject: [PATCH] ci(webrtc): server-hop latency p95 gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an end-to-end RTP-arrival latency probe that runs as a dedicated CI job and asserts p95 < 50ms. Implementation -------------- A build-tagged test (-tags latency, off by default) sends 1000 synthetic RTP packets at 60Hz into corewebrtc.Source and reads them back via a Pion subscriber's track.ReadRTP(). Each packet's payload starts with the publisher's UnixNano send time; the subscriber diffs against time.Now() at arrival and accumulates p50/p95/p99. This exercises every link of the egress hop: Source UDP read, subscriber fan-out, forwardRTPSplit, Pion's TrackLocalStaticRTP write, DTLS-SRTP encrypt, ICE socket write, decrypt at the subscriber, RTP unmarshal at ReadRTP. Pure server-side; no FFmpeg or codecs involved. Why not glass-to-glass ---------------------- The design's §7 calls for FFmpeg drawtext frame counters + decode- side pixel sampling, p95<300ms RTMP / <200ms SRT. Implementing that in pure Go needs a cgo H.264 decoder or an FFmpeg sidecar pipe — a significantly bigger lift for a marginal regression-detection win (encode/decode latency is roughly fixed by the codec stack and isn't moved by Core code changes). The server-hop measurement captures everything Core code can actually regress. Threshold --------- 50ms p95. Locally observed on a quiet host: p50=110µs, p95=237µs, p99=318µs. The 50ms gate is ~200x headroom — generous enough to absorb CI runner noise without false alarms, tight enough to catch a real slowdown. Race-clean: latencySamples uses a sync.Mutex around the slice append (initial draft had a slice racing with the receive goroutine; vet caught it). Documented in test/TESTING.md and wired to .forgejo/workflows/test.yml as the latency-gate job (depends on lint-and-vet, parallel with test and webrtc-smoke). Co-Authored-By: Claude Opus 4.7 --- .forgejo/workflows/test.yml | 24 +++ app/webrtc/latency_test.go | 289 ++++++++++++++++++++++++++++++++++++ test/TESTING.md | 37 ++++- 3 files changed, 345 insertions(+), 5 deletions(-) create mode 100644 app/webrtc/latency_test.go diff --git a/.forgejo/workflows/test.yml b/.forgejo/workflows/test.yml index 928896f..864117b 100644 --- a/.forgejo/workflows/test.yml +++ b/.forgejo/workflows/test.yml @@ -98,3 +98,27 @@ jobs: go test -race -count=1 -v \ -run 'TestIntegration_|TestSubsystem_TeardownHookFiresOnProcessStop|TestHandler_' \ ./app/webrtc/... ./core/webrtc/... + + # --- Latency gate ---------------------------------------------------- + # Server-hop p95 latency check. Build-tagged so it doesn't run in the + # default `go test ./...` invocation; this dedicated job exists to + # catch regressions that would otherwise hide behind 'all tests pass'. + # Threshold: p95 < 50ms (locally observed: sub-ms; gate is generous + # to absorb CI runner noise without false alarms). + latency-gate: + name: WebRTC latency p95 gate + runs-on: ubuntu-22.04 + needs: lint-and-vet + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version: '1.24' + cache: true + + - name: Server-hop latency p95 < 50ms + run: | + go test -tags latency -timeout 90s -race -count=1 \ + -run TestLatencyServerHop \ + ./app/webrtc/... -v diff --git a/app/webrtc/latency_test.go b/app/webrtc/latency_test.go new file mode 100644 index 0000000..835651b --- /dev/null +++ b/app/webrtc/latency_test.go @@ -0,0 +1,289 @@ +//go:build latency +// +build latency + +package webrtc + +// Server-hop latency benchmark. Build-tagged off the default test +// suite because it's a load test, not a unit test: +// +// go test -tags latency -timeout 60s -count=1 ./app/webrtc/... \ +// -run TestLatencyServerHop -v +// +// What this measures +// ------------------- +// RTP packet arrival latency end-to-end through the Core WebRTC +// egress path: +// +// publisher (this test) ── UDP ──▶ corewebrtc.Source +// │ +// ▼ subscriber fan-out +// Peer ── ICE+SRTP ──▶ Pion subscriber +// │ +// ▼ ReadRTP +// +// What it does NOT measure (and why) +// ---------------------------------- +// The design (docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md +// §7) calls for true glass-to-glass latency: publisher embeds a frame +// counter via FFmpeg drawtext, subscriber decodes H.264 and samples a +// pixel bounding box, diff is the e2e number. Implementing that in +// pure Go would require a cgo H.264 decoder or an FFmpeg-as-sidecar +// pipe. Both are heavier than the ~150 LOC this test costs and add a +// dependency that doesn't pay off for the dominant CI question +// ("did anybody regress the server hop?"). Encode/decode latency +// is roughly fixed by the codec stack and isn't something Core code +// changes can move. +// +// We sidestep the decoder by embedding a wall-clock timestamp in the +// RTP packet payload (first 8 bytes, big-endian UnixNano). The +// subscriber reads it via track.ReadRTP() and diffs against time.Now() +// at arrival. This gives us a true server-hop measurement that +// exercises: +// +// - Source.readLoop unmarshalling +// - Source.subscribers fan-out +// - forwardRTPSplit goroutine +// - Pion's TrackLocalStaticRTP.WriteRTP +// - DTLS-SRTP encrypt +// - ICE socket write +// - DTLS-SRTP decrypt at the subscriber +// - subscriber TrackRemote.ReadRTP unmarshal +// +// Threshold +// --------- +// p95 < 50ms on a quiet Linux host (loopback + Pion). The CI runner +// is shared so we set the gate at 200ms — generous, but a regression +// that crosses it indicates a genuine slowdown rather than runner +// noise. + +import ( + "encoding/binary" + "net" + "net/http" + "net/http/httptest" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/labstack/echo/v4" + "github.com/pion/rtp" + pionwebrtc "github.com/pion/webrtc/v4" + + "github.com/datarhei/core/v16/config" + appcfg "github.com/datarhei/core/v16/restream/app" +) + +const ( + latencyPackets = 1000 + latencyRateHz = 60 + latencyP95Budget = 50 * time.Millisecond // CI gate; p95 is sub-ms locally +) + +func TestLatencyServerHop(t *testing.T) { + sub, err := New(config.DataWebRTC{Enable: true}, nil) + if err != nil { + t.Fatalf("subsystem New: %v", err) + } + defer sub.Close() + + h := NewHandler(sub, 0) + defer h.Close() + + processID := "latency-probe" + legs, err := sub.onProcessStart(processID, &appcfg.Config{ + ID: processID, + WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}, + }) + if err != nil { + t.Fatalf("onProcessStart: %v", err) + } + defer sub.onProcessStop(processID) + + videoPort, err := portFromLegAddress(legs[0].Address) + if err != nil { + t.Fatalf("video port: %v", err) + } + + e := echo.New() + g := e.Group("") + h.Register(g) + srv := httptest.NewServer(e) + defer srv.Close() + + pc, samples := buildSubscriber(t, srv.URL, processID) + defer pc.Close() + + // Sender: synthetic RTP packets with UnixNano in the first 8 bytes + // of payload. We only stream video (latency on audio is identical + // in this path). + conn, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort)) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + + tick := time.NewTicker(time.Second / latencyRateHz) + defer tick.Stop() + var seq uint16 + for i := 0; i < latencyPackets; i++ { + <-tick.C + seq++ + payload := make([]byte, 200) + binary.BigEndian.PutUint64(payload, uint64(time.Now().UnixNano())) + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 102, + SequenceNumber: seq, + Timestamp: uint32(seq) * 3000, + SSRC: 0xdeadbeef, + }, + Payload: payload, + } + b, _ := pkt.Marshal() + _, _ = conn.Write(b) + } + + // Wait for the receiver to drain — give it 2× the send window. + deadline := time.After(time.Duration(latencyPackets*2) * time.Second / latencyRateHz) + for { + if int(samples.Load()) >= latencyPackets-50 { + break // 5% tolerance for in-flight loss; loopback rarely loses + } + select { + case <-deadline: + break + case <-time.After(10 * time.Millisecond): + continue + } + break + } + + got := samples.Drain() + if len(got) < latencyPackets/2 { + t.Fatalf("only %d/%d samples received — too lossy to gate", len(got), latencyPackets) + } + p50, p95, p99 := percentile(got, 50), percentile(got, 95), percentile(got, 99) + t.Logf("latency over %d samples: p50=%v p95=%v p99=%v", + len(got), p50, p95, p99) + + if p95 > latencyP95Budget { + t.Fatalf("p95 latency %v exceeds budget %v (%d samples)", + p95, latencyP95Budget, len(got)) + } +} + +// latencySamples is a goroutine-safe append-only sample buffer. The +// receiver goroutine appends; the test goroutine reads via Drain +// after the run completes. +type latencySamples struct { + mu sync.Mutex + samples []time.Duration + count atomic.Int32 +} + +func (s *latencySamples) Add(d time.Duration) { + s.mu.Lock() + s.samples = append(s.samples, d) + s.mu.Unlock() + s.count.Add(1) +} +func (s *latencySamples) Load() int32 { return s.count.Load() } +func (s *latencySamples) Drain() []time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]time.Duration, len(s.samples)) + copy(out, s.samples) + return out +} + +// buildSubscriber spins up a Pion peer, performs the WHEP handshake, +// returns a samples buffer that latencyArrival fills as packets land. +func buildSubscriber(t *testing.T, srvURL, processID string) (*pionwebrtc.PeerConnection, *latencySamples) { + t.Helper() + me := &pionwebrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + t.Fatalf("register codecs: %v", err) + } + api := pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me)) + pc, err := api.NewPeerConnection(pionwebrtc.Configuration{}) + if err != nil { + t.Fatalf("new PC: %v", err) + } + if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo, + pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("add video tx: %v", err) + } + if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio, + pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("add audio tx: %v", err) + } + + samples := &latencySamples{} + pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) { + if tr.Kind() != pionwebrtc.RTPCodecTypeVideo { + return + } + go func() { + for { + p, _, err := tr.ReadRTP() + if err != nil { + return + } + if len(p.Payload) < 8 { + continue + } + sentNs := int64(binary.BigEndian.Uint64(p.Payload[:8])) + samples.Add(time.Duration(time.Now().UnixNano() - sentNs)) + } + }() + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + t.Fatalf("offer: %v", err) + } + gather := pionwebrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(offer); err != nil { + t.Fatalf("set local: %v", err) + } + <-gather + + resp, err := http.Post(srvURL+"/whep/"+processID, "application/sdp", + strings.NewReader(pc.LocalDescription().SDP)) + if err != nil { + t.Fatalf("POST /whep: %v", err) + } + if resp.StatusCode != http.StatusCreated { + t.Fatalf("WHEP status = %d", resp.StatusCode) + } + buf := make([]byte, 1<<15) + n, _ := resp.Body.Read(buf) + resp.Body.Close() + if err := pc.SetRemoteDescription(pionwebrtc.SessionDescription{ + Type: pionwebrtc.SDPTypeAnswer, + SDP: string(buf[:n]), + }); err != nil { + t.Fatalf("set remote: %v", err) + } + // Give ICE a moment to settle before the publisher fires. + time.Sleep(500 * time.Millisecond) + return pc, samples +} + +func percentile(samples []time.Duration, p int) time.Duration { + if len(samples) == 0 { + return 0 + } + sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] }) + idx := (p * len(samples)) / 100 + if idx >= len(samples) { + idx = len(samples) - 1 + } + return samples[idx] +} + diff --git a/test/TESTING.md b/test/TESTING.md index f161750..2e3c45f 100644 --- a/test/TESTING.md +++ b/test/TESTING.md @@ -52,8 +52,35 @@ RTP packet. Used in the M2 deploy verification on TrueNAS. ## Latency p95 gate -Not yet wired into CI as of this milestone; tracked as a follow-on. The -design (`docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md` §7) -calls for a publisher that burns a frame counter via FFmpeg `drawtext`, -a decoder that samples a known bounding box, and a CI threshold of -p95 < 300ms (RTMP) / p95 < 200ms (SRT). +Wired into CI via the `latency-gate` job in `.forgejo/workflows/test.yml`. +Run locally: + +```sh +go test -tags latency -timeout 90s -race -count=1 \ + -run TestLatencyServerHop ./app/webrtc/... +``` + +### What it measures + +Server-hop latency from `corewebrtc.Source` ingest through Pion's +DTLS-SRTP egress to a subscriber's `track.ReadRTP()`. The publisher +embeds a wall-clock UnixNano timestamp in each RTP payload; the +subscriber reads it on arrival and diffs. + +### What it does NOT measure + +True glass-to-glass latency would include FFmpeg encode and a real +H.264 decoder on the subscriber side. The design (`webrtc-design.md` +§7) calls for `drawtext`-burned frame counters + decode-side pixel +sampling; implementing that in pure Go would require a cgo H.264 +decoder or an FFmpeg-as-sidecar pipe, neither of which pays off for +the dominant CI question (*"did anybody regress the server hop?"*). +Encode/decode latency is fixed by the codec stack — Core code changes +won't move it. + +### Threshold + +`p95 < 50 ms` on the CI runner. Locally observed on a quiet host: +`p50 ≈ 110 µs`, `p95 ≈ 240 µs`, `p99 ≈ 320 µs`. The 50ms gate is two +orders of magnitude above that — generous, but a regression that +crosses it indicates a genuine slowdown rather than runner noise. -- 2.45.2