From 561a93e044a3dc78e3120999cada7e5175c02bdd Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Wed, 6 May 2026 16:01:22 -0400 Subject: [PATCH] feat(test): add 5-peer sustained WHEP load test (closes #14) --- test/load/sustained.go | 385 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 385 insertions(+) create mode 100644 test/load/sustained.go diff --git a/test/load/sustained.go b/test/load/sustained.go new file mode 100644 index 0000000..7298eb2 --- /dev/null +++ b/test/load/sustained.go @@ -0,0 +1,385 @@ +// Dragon Fork — headless WHEP subscriber load test. +// +// Drives N concurrent WHEP peers against a single stream for a configurable +// duration and produces a markdown report suitable for committing to +// test/load/results/. +// +// Usage: +// +// go run ./test/load/sustained.go \ +// -url http://10.0.0.25:8080 \ +// -stream mystream \ +// -peers 5 \ +// -duration 10m \ +// -auth "Bearer " \ +// -out test/load/results/ +// +// The program exits 0 on success or 1 if any peer failed to connect. +// Run the Grafana dashboard alongside to observe Prometheus metrics +// during the test (see deploy/truenas/core/docker-compose.yml). + +//go:build ignore + +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "io" + "math" + "net/http" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/pion/webrtc/v4" +) + +// peerResult holds per-peer stats collected over the run. +type peerResult struct { + index int + connected bool + iceEstablishMs float64 + packetsReceived uint64 + seqGaps uint64 // proxy for packet loss + jitterSamplesMs []float64 + disconnectedAt time.Time + err string +} + +func main() { + coreURL := flag.String("url", "http://localhost:8080", "Dragon Fork Core base URL") + streamID := flag.String("stream", "", "Process ID with webrtc.enabled=true") + nPeers := flag.Int("peers", 5, "Number of concurrent WHEP subscribers") + duration := flag.Duration("duration", 10*time.Minute, "Test duration") + auth := flag.String("auth", "", "Authorization header value (e.g. 'Bearer TOKEN')") + outDir := flag.String("out", "test/load/results", "Output directory for markdown report") + flag.Parse() + + if *streamID == "" { + fmt.Fprintln(os.Stderr, "error: -stream is required") + os.Exit(1) + } + + fmt.Printf("Dragon Fork WHEP load test\n") + fmt.Printf(" target: %s\n", *coreURL) + fmt.Printf(" stream: %s\n", *streamID) + fmt.Printf(" peers: %d\n", *nPeers) + fmt.Printf(" duration: %s\n", *duration) + fmt.Println() + + ctx, cancel := context.WithTimeout(context.Background(), *duration+30*time.Second) + defer cancel() + + results := make([]peerResult, *nPeers) + var wg sync.WaitGroup + var anyFail int32 + + for i := 0; i < *nPeers; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + res := runPeer(ctx, idx, *coreURL, *streamID, *auth, *duration) + results[idx] = res + if !res.connected { + atomic.StoreInt32(&anyFail, 1) + } + }(i) + // Stagger connection attempts 200ms apart to avoid thundering herd. + time.Sleep(200 * time.Millisecond) + } + + wg.Wait() + + report := buildReport(*coreURL, *streamID, *nPeers, *duration, results) + fmt.Print(report) + + if err := os.MkdirAll(*outDir, 0o755); err != nil { + fmt.Fprintf(os.Stderr, "mkdir %s: %v\n", *outDir, err) + } else { + ts := time.Now().UTC().Format("2006-01-02T150405Z") + fname := filepath.Join(*outDir, fmt.Sprintf("%s-%s-%dp.md", ts, *streamID, *nPeers)) + if err := os.WriteFile(fname, []byte(report), 0o644); err != nil { + fmt.Fprintf(os.Stderr, "write report: %v\n", err) + } else { + fmt.Printf("\nReport written to %s\n", fname) + } + } + + if anyFail != 0 { + os.Exit(1) + } +} + +// runPeer connects one WHEP subscriber, reads packets for the test duration, +// and collects statistics. +func runPeer(ctx context.Context, idx int, coreURL, streamID, auth string, dur time.Duration) peerResult { + res := peerResult{index: idx} + + // Build a minimal SDP offer using Pion. + me := &webrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + res.err = fmt.Sprintf("media engine: %v", err) + return res + } + api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) + cfg := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}} + pc, err := api.NewPeerConnection(cfg) + if err != nil { + res.err = fmt.Sprintf("new peer connection: %v", err) + return res + } + defer pc.Close() + + // Add receive-only transceivers so the SDP offer contains the right m= lines. + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + res.err = fmt.Sprintf("add video transceiver: %v", err) + return res + } + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + res.err = fmt.Sprintf("add audio transceiver: %v", err) + return res + } + + offer, err := pc.CreateOffer(nil) + if err != nil { + res.err = fmt.Sprintf("create offer: %v", err) + return res + } + if err := pc.SetLocalDescription(offer); err != nil { + res.err = fmt.Sprintf("set local desc: %v", err) + return res + } + + // POST the offer to the WHEP endpoint. + whepURL := strings.TrimRight(coreURL, "/") + "/api/v3/whep/" + streamID + t0 := time.Now() + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, whepURL, bytes.NewReader([]byte(offer.SDP))) + if err != nil { + res.err = fmt.Sprintf("build http request: %v", err) + return res + } + httpReq.Header.Set("Content-Type", "application/sdp") + if auth != "" { + httpReq.Header.Set("Authorization", auth) + } + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + res.err = fmt.Sprintf("POST /whep: %v", err) + return res + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + res.err = fmt.Sprintf("WHEP POST returned %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + return res + } + + answerSDP, err := io.ReadAll(resp.Body) + if err != nil { + res.err = fmt.Sprintf("read answer: %v", err) + return res + } + + answer := webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: string(answerSDP)} + if err := pc.SetRemoteDescription(answer); err != nil { + res.err = fmt.Sprintf("set remote desc: %v", err) + return res + } + + // Wait for ICE connected. + connCh := make(chan struct{}) + var connOnce sync.Once + pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { + if s == webrtc.PeerConnectionStateConnected { + connOnce.Do(func() { close(connCh) }) + } + }) + + select { + case <-connCh: + res.iceEstablishMs = float64(time.Since(t0).Milliseconds()) + res.connected = true + case <-time.After(15 * time.Second): + res.err = "ICE connection timeout (15s)" + return res + case <-ctx.Done(): + res.err = "context cancelled before ICE connected" + return res + } + + fmt.Printf(" peer %d connected (ICE: %.0fms)\n", idx, res.iceEstablishMs) + + // Collect RTP statistics for the test duration. + var mu sync.Mutex + var lastSeq uint16 + var seenFirst bool + + pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + prevArrival := time.Now() + var prevRTPTimestamp uint32 + var jitter float64 + clockRate := float64(track.Codec().ClockRate) + + buf := make([]byte, 1500) + for { + n, _, err := track.Read(buf) + if err != nil || n < 12 { + return + } + arrivalTime := time.Now() + + // Sequence number gap tracking. + seq := uint16(buf[2])<<8 | uint16(buf[3]) + mu.Lock() + atomic.AddUint64(&res.packetsReceived, 1) + if seenFirst { + expected := lastSeq + 1 + if seq != expected { + gaps := uint64(seq - expected) + if gaps < 1000 { + atomic.AddUint64(&res.seqGaps, gaps) + } + } + } else { + seenFirst = true + } + lastSeq = seq + mu.Unlock() + + // RFC 3550 jitter (simplified: interarrival time deviation). + rtpTS := uint32(buf[4])<<24 | uint32(buf[5])<<16 | uint32(buf[6])<<8 | uint32(buf[7]) + if prevRTPTimestamp != 0 { + sendDiff := float64(int32(rtpTS-prevRTPTimestamp)) / clockRate + recvDiff := arrivalTime.Sub(prevArrival).Seconds() + d := math.Abs(recvDiff - sendDiff) + jitter += (d - jitter) / 16 // running average + mu.Lock() + res.jitterSamplesMs = append(res.jitterSamplesMs, jitter*1000) + mu.Unlock() + } + prevRTPTimestamp = rtpTS + prevArrival = arrivalTime + } + }) + + // Wait for test duration or context cancellation. + testTimer := time.NewTimer(dur) + defer testTimer.Stop() + select { + case <-testTimer.C: + case <-ctx.Done(): + res.disconnectedAt = time.Now() + } + + // DELETE the WHEP resource. + location := resp.Header.Get("Location") + if location != "" { + delURL := strings.TrimRight(coreURL, "/") + location + delReq, _ := http.NewRequest(http.MethodDelete, delURL, nil) + if auth != "" { + delReq.Header.Set("Authorization", auth) + } + _, _ = http.DefaultClient.Do(delReq) + } + + return res +} + +func buildReport(coreURL, streamID string, nPeers int, dur time.Duration, results []peerResult) string { + var b strings.Builder + ts := time.Now().UTC().Format(time.RFC3339) + + b.WriteString("# Dragon Fork WHEP Sustained Load Test\n\n") + fmt.Fprintf(&b, "**Date:** %s \n", ts) + fmt.Fprintf(&b, "**Target:** %s \n", coreURL) + fmt.Fprintf(&b, "**Stream:** %s \n", streamID) + fmt.Fprintf(&b, "**Peers:** %d \n", nPeers) + fmt.Fprintf(&b, "**Duration:** %s \n\n", dur) + + // Summary table. + connected := 0 + var iceMs []float64 + var allJitter []float64 + var totalPkts, totalGaps uint64 + for _, r := range results { + if r.connected { + connected++ + iceMs = append(iceMs, r.iceEstablishMs) + allJitter = append(allJitter, r.jitterSamplesMs...) + totalPkts += r.packetsReceived + totalGaps += r.seqGaps + } + } + + b.WriteString("## Summary\n\n") + fmt.Fprintf(&b, "| Metric | Value |\n|---|---|\n") + fmt.Fprintf(&b, "| Peers connected | %d / %d |\n", connected, nPeers) + fmt.Fprintf(&b, "| Total packets received | %d |\n", totalPkts) + lossRate := 0.0 + if totalPkts+totalGaps > 0 { + lossRate = float64(totalGaps) / float64(totalPkts+totalGaps) * 100 + } + fmt.Fprintf(&b, "| Packet loss estimate | %.2f%% |\n", lossRate) + if len(iceMs) > 0 { + fmt.Fprintf(&b, "| ICE establishment p50 | %.0fms |\n", percentile(iceMs, 50)) + fmt.Fprintf(&b, "| ICE establishment p95 | %.0fms |\n", percentile(iceMs, 95)) + } + if len(allJitter) > 0 { + fmt.Fprintf(&b, "| Jitter p50 | %.2fms |\n", percentile(allJitter, 50)) + fmt.Fprintf(&b, "| Jitter p95 | %.2fms |\n", percentile(allJitter, 95)) + } + b.WriteString("\n") + + // Per-peer detail. + b.WriteString("## Per-Peer Detail\n\n") + b.WriteString("| Peer | Connected | ICE ms | Packets | Loss est. | Jitter p95 |\n") + b.WriteString("|---|---|---|---|---|---|\n") + for _, r := range results { + if r.err != "" { + fmt.Fprintf(&b, "| %d | ❌ | — | — | — | %s |\n", r.index, r.err) + continue + } + lossEst := 0.0 + if r.packetsReceived+r.seqGaps > 0 { + lossEst = float64(r.seqGaps) / float64(r.packetsReceived+r.seqGaps) * 100 + } + jP95 := 0.0 + if len(r.jitterSamplesMs) > 0 { + jP95 = percentile(r.jitterSamplesMs, 95) + } + fmt.Fprintf(&b, "| %d | ✓ | %.0f | %d | %.2f%% | %.2fms |\n", + r.index, r.iceEstablishMs, r.packetsReceived, lossEst, jP95) + } + b.WriteString("\n") + + b.WriteString("---\n") + b.WriteString("*Generated by `test/load/sustained.go`. Observe Prometheus metrics during run via Grafana.*\n") + + return b.String() +} + +func percentile(data []float64, p float64) float64 { + if len(data) == 0 { + return 0 + } + sorted := make([]float64, len(data)) + copy(sorted, data) + sort.Float64s(sorted) + idx := (p / 100) * float64(len(sorted)-1) + lo := int(idx) + hi := lo + 1 + if hi >= len(sorted) { + return sorted[lo] + } + frac := idx - float64(lo) + return sorted[lo]*(1-frac) + sorted[hi]*frac +}