//go:build latency // +build latency package webrtc // Server-hop latency benchmark. Build-tagged off the default test // suite because it's a load test, not a unit test: // // go test -tags latency -timeout 60s -count=1 ./app/webrtc/... \ // -run TestLatencyServerHop -v // // What this measures // ------------------- // RTP packet arrival latency end-to-end through the Core WebRTC // egress path: // // publisher (this test) ── UDP ──▶ corewebrtc.Source // │ // ▼ subscriber fan-out // Peer ── ICE+SRTP ──▶ Pion subscriber // │ // ▼ ReadRTP // // What it does NOT measure (and why) // ---------------------------------- // The design (docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md // §7) calls for true glass-to-glass latency: publisher embeds a frame // counter via FFmpeg drawtext, subscriber decodes H.264 and samples a // pixel bounding box, diff is the e2e number. Implementing that in // pure Go would require a cgo H.264 decoder or an FFmpeg-as-sidecar // pipe. Both are heavier than the ~150 LOC this test costs and add a // dependency that doesn't pay off for the dominant CI question // ("did anybody regress the server hop?"). Encode/decode latency // is roughly fixed by the codec stack and isn't something Core code // changes can move. // // We sidestep the decoder by embedding a wall-clock timestamp in the // RTP packet payload (first 8 bytes, big-endian UnixNano). The // subscriber reads it via track.ReadRTP() and diffs against time.Now() // at arrival. This gives us a true server-hop measurement that // exercises: // // - Source.readLoop unmarshalling // - Source.subscribers fan-out // - forwardRTPSplit goroutine // - Pion's TrackLocalStaticRTP.WriteRTP // - DTLS-SRTP encrypt // - ICE socket write // - DTLS-SRTP decrypt at the subscriber // - subscriber TrackRemote.ReadRTP unmarshal // // Threshold // --------- // p95 < 50ms on a quiet Linux host (loopback + Pion). The CI runner // is shared so we set the gate at 200ms — generous, but a regression // that crosses it indicates a genuine slowdown rather than runner // noise. import ( "encoding/binary" "net" "net/http" "net/http/httptest" "sort" "strconv" "strings" "sync" "sync/atomic" "testing" "time" "github.com/labstack/echo/v4" "github.com/pion/rtp" pionwebrtc "github.com/pion/webrtc/v4" "github.com/datarhei/core/v16/config" appcfg "github.com/datarhei/core/v16/restream/app" ) const ( latencyPackets = 1000 latencyRateHz = 60 latencyP95Budget = 50 * time.Millisecond // CI gate; p95 is sub-ms locally ) func TestLatencyServerHop(t *testing.T) { sub, err := New(config.DataWebRTC{Enable: true}, nil) if err != nil { t.Fatalf("subsystem New: %v", err) } defer sub.Close() h := NewHandler(sub, 0) defer h.Close() processID := "latency-probe" legs, err := sub.onProcessStart(processID, &appcfg.Config{ ID: processID, WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}, }) if err != nil { t.Fatalf("onProcessStart: %v", err) } defer sub.onProcessStop(processID) videoPort, err := portFromLegAddress(legs[0].Address) if err != nil { t.Fatalf("video port: %v", err) } e := echo.New() g := e.Group("") h.Register(g) srv := httptest.NewServer(e) defer srv.Close() pc, samples := buildSubscriber(t, srv.URL, processID) defer pc.Close() // Sender: synthetic RTP packets with UnixNano in the first 8 bytes // of payload. We only stream video (latency on audio is identical // in this path). conn, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort)) if err != nil { t.Fatalf("dial: %v", err) } defer conn.Close() tick := time.NewTicker(time.Second / latencyRateHz) defer tick.Stop() var seq uint16 for i := 0; i < latencyPackets; i++ { <-tick.C seq++ payload := make([]byte, 200) binary.BigEndian.PutUint64(payload, uint64(time.Now().UnixNano())) pkt := &rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 102, SequenceNumber: seq, Timestamp: uint32(seq) * 3000, SSRC: 0xdeadbeef, }, Payload: payload, } b, _ := pkt.Marshal() _, _ = conn.Write(b) } // Wait for the receiver to drain — give it 2× the send window. deadline := time.After(time.Duration(latencyPackets*2) * time.Second / latencyRateHz) for { if int(samples.Load()) >= latencyPackets-50 { break // 5% tolerance for in-flight loss; loopback rarely loses } select { case <-deadline: break case <-time.After(10 * time.Millisecond): continue } break } got := samples.Drain() if len(got) < latencyPackets/2 { t.Fatalf("only %d/%d samples received — too lossy to gate", len(got), latencyPackets) } p50, p95, p99 := percentile(got, 50), percentile(got, 95), percentile(got, 99) t.Logf("latency over %d samples: p50=%v p95=%v p99=%v", len(got), p50, p95, p99) if p95 > latencyP95Budget { t.Fatalf("p95 latency %v exceeds budget %v (%d samples)", p95, latencyP95Budget, len(got)) } } // latencySamples is a goroutine-safe append-only sample buffer. The // receiver goroutine appends; the test goroutine reads via Drain // after the run completes. type latencySamples struct { mu sync.Mutex samples []time.Duration count atomic.Int32 } func (s *latencySamples) Add(d time.Duration) { s.mu.Lock() s.samples = append(s.samples, d) s.mu.Unlock() s.count.Add(1) } func (s *latencySamples) Load() int32 { return s.count.Load() } func (s *latencySamples) Drain() []time.Duration { s.mu.Lock() defer s.mu.Unlock() out := make([]time.Duration, len(s.samples)) copy(out, s.samples) return out } // buildSubscriber spins up a Pion peer, performs the WHEP handshake, // returns a samples buffer that latencyArrival fills as packets land. func buildSubscriber(t *testing.T, srvURL, processID string) (*pionwebrtc.PeerConnection, *latencySamples) { t.Helper() me := &pionwebrtc.MediaEngine{} if err := me.RegisterDefaultCodecs(); err != nil { t.Fatalf("register codecs: %v", err) } api := pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me)) pc, err := api.NewPeerConnection(pionwebrtc.Configuration{}) if err != nil { t.Fatalf("new PC: %v", err) } if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo, pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { t.Fatalf("add video tx: %v", err) } if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio, pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { t.Fatalf("add audio tx: %v", err) } samples := &latencySamples{} pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) { if tr.Kind() != pionwebrtc.RTPCodecTypeVideo { return } go func() { for { p, _, err := tr.ReadRTP() if err != nil { return } if len(p.Payload) < 8 { continue } sentNs := int64(binary.BigEndian.Uint64(p.Payload[:8])) samples.Add(time.Duration(time.Now().UnixNano() - sentNs)) } }() }) offer, err := pc.CreateOffer(nil) if err != nil { t.Fatalf("offer: %v", err) } gather := pionwebrtc.GatheringCompletePromise(pc) if err := pc.SetLocalDescription(offer); err != nil { t.Fatalf("set local: %v", err) } <-gather resp, err := http.Post(srvURL+"/whep/"+processID, "application/sdp", strings.NewReader(pc.LocalDescription().SDP)) if err != nil { t.Fatalf("POST /whep: %v", err) } if resp.StatusCode != http.StatusCreated { t.Fatalf("WHEP status = %d", resp.StatusCode) } buf := make([]byte, 1<<15) n, _ := resp.Body.Read(buf) resp.Body.Close() if err := pc.SetRemoteDescription(pionwebrtc.SessionDescription{ Type: pionwebrtc.SDPTypeAnswer, SDP: string(buf[:n]), }); err != nil { t.Fatalf("set remote: %v", err) } // Give ICE a moment to settle before the publisher fires. time.Sleep(500 * time.Millisecond) return pc, samples } func percentile(samples []time.Duration, p int) time.Duration { if len(samples) == 0 { return 0 } sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] }) idx := (p * len(samples)) / 100 if idx >= len(samples) { idx = len(samples) - 1 } return samples[idx] }