// Dragon Fork — headless WHEP subscriber load test. // // Drives N concurrent WHEP peers against a single stream for a configurable // duration and produces a markdown report suitable for committing to // test/load/results/. // // Usage: // // go run ./test/load/sustained.go \ // -url http://10.0.0.25:8080 \ // -stream mystream \ // -peers 5 \ // -duration 10m \ // -auth "Bearer " \ // -out test/load/results/ // // The program exits 0 on success or 1 if any peer failed to connect. // Run the Grafana dashboard alongside to observe Prometheus metrics // during the test (see deploy/truenas/core/docker-compose.yml). //go:build ignore package main import ( "bytes" "context" "flag" "fmt" "io" "math" "net/http" "os" "path/filepath" "sort" "strings" "sync" "sync/atomic" "time" "github.com/pion/webrtc/v4" ) // peerResult holds per-peer stats collected over the run. type peerResult struct { index int connected bool iceEstablishMs float64 packetsReceived uint64 seqGaps uint64 // proxy for packet loss jitterSamplesMs []float64 disconnectedAt time.Time err string } func main() { coreURL := flag.String("url", "http://localhost:8080", "Dragon Fork Core base URL") streamID := flag.String("stream", "", "Process ID with webrtc.enabled=true") nPeers := flag.Int("peers", 5, "Number of concurrent WHEP subscribers") duration := flag.Duration("duration", 10*time.Minute, "Test duration") auth := flag.String("auth", "", "Authorization header value (e.g. 'Bearer TOKEN')") outDir := flag.String("out", "test/load/results", "Output directory for markdown report") flag.Parse() if *streamID == "" { fmt.Fprintln(os.Stderr, "error: -stream is required") os.Exit(1) } fmt.Printf("Dragon Fork WHEP load test\n") fmt.Printf(" target: %s\n", *coreURL) fmt.Printf(" stream: %s\n", *streamID) fmt.Printf(" peers: %d\n", *nPeers) fmt.Printf(" duration: %s\n", *duration) fmt.Println() ctx, cancel := context.WithTimeout(context.Background(), *duration+30*time.Second) defer cancel() results := make([]peerResult, *nPeers) var wg sync.WaitGroup var anyFail int32 for i := 0; i < *nPeers; i++ { wg.Add(1) go func(idx int) { defer wg.Done() res := runPeer(ctx, idx, *coreURL, *streamID, *auth, *duration) results[idx] = res if !res.connected { atomic.StoreInt32(&anyFail, 1) } }(i) // Stagger connection attempts 200ms apart to avoid thundering herd. time.Sleep(200 * time.Millisecond) } wg.Wait() report := buildReport(*coreURL, *streamID, *nPeers, *duration, results) fmt.Print(report) if err := os.MkdirAll(*outDir, 0o755); err != nil { fmt.Fprintf(os.Stderr, "mkdir %s: %v\n", *outDir, err) } else { ts := time.Now().UTC().Format("2006-01-02T150405Z") fname := filepath.Join(*outDir, fmt.Sprintf("%s-%s-%dp.md", ts, *streamID, *nPeers)) if err := os.WriteFile(fname, []byte(report), 0o644); err != nil { fmt.Fprintf(os.Stderr, "write report: %v\n", err) } else { fmt.Printf("\nReport written to %s\n", fname) } } if anyFail != 0 { os.Exit(1) } } // runPeer connects one WHEP subscriber, reads packets for the test duration, // and collects statistics. func runPeer(ctx context.Context, idx int, coreURL, streamID, auth string, dur time.Duration) peerResult { res := peerResult{index: idx} // Build a minimal SDP offer using Pion. me := &webrtc.MediaEngine{} if err := me.RegisterDefaultCodecs(); err != nil { res.err = fmt.Sprintf("media engine: %v", err) return res } api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) cfg := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}} pc, err := api.NewPeerConnection(cfg) if err != nil { res.err = fmt.Sprintf("new peer connection: %v", err) return res } defer pc.Close() // Add receive-only transceivers so the SDP offer contains the right m= lines. if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { res.err = fmt.Sprintf("add video transceiver: %v", err) return res } if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { res.err = fmt.Sprintf("add audio transceiver: %v", err) return res } offer, err := pc.CreateOffer(nil) if err != nil { res.err = fmt.Sprintf("create offer: %v", err) return res } if err := pc.SetLocalDescription(offer); err != nil { res.err = fmt.Sprintf("set local desc: %v", err) return res } // POST the offer to the WHEP endpoint. whepURL := strings.TrimRight(coreURL, "/") + "/api/v3/whep/" + streamID t0 := time.Now() httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, whepURL, bytes.NewReader([]byte(offer.SDP))) if err != nil { res.err = fmt.Sprintf("build http request: %v", err) return res } httpReq.Header.Set("Content-Type", "application/sdp") if auth != "" { httpReq.Header.Set("Authorization", auth) } resp, err := http.DefaultClient.Do(httpReq) if err != nil { res.err = fmt.Sprintf("POST /whep: %v", err) return res } defer resp.Body.Close() if resp.StatusCode != http.StatusCreated { body, _ := io.ReadAll(resp.Body) res.err = fmt.Sprintf("WHEP POST returned %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) return res } answerSDP, err := io.ReadAll(resp.Body) if err != nil { res.err = fmt.Sprintf("read answer: %v", err) return res } answer := webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: string(answerSDP)} if err := pc.SetRemoteDescription(answer); err != nil { res.err = fmt.Sprintf("set remote desc: %v", err) return res } // Wait for ICE connected. connCh := make(chan struct{}) var connOnce sync.Once pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { if s == webrtc.PeerConnectionStateConnected { connOnce.Do(func() { close(connCh) }) } }) select { case <-connCh: res.iceEstablishMs = float64(time.Since(t0).Milliseconds()) res.connected = true case <-time.After(15 * time.Second): res.err = "ICE connection timeout (15s)" return res case <-ctx.Done(): res.err = "context cancelled before ICE connected" return res } fmt.Printf(" peer %d connected (ICE: %.0fms)\n", idx, res.iceEstablishMs) // Collect RTP statistics for the test duration. var mu sync.Mutex var lastSeq uint16 var seenFirst bool pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { prevArrival := time.Now() var prevRTPTimestamp uint32 var jitter float64 clockRate := float64(track.Codec().ClockRate) buf := make([]byte, 1500) for { n, _, err := track.Read(buf) if err != nil || n < 12 { return } arrivalTime := time.Now() // Sequence number gap tracking. seq := uint16(buf[2])<<8 | uint16(buf[3]) mu.Lock() atomic.AddUint64(&res.packetsReceived, 1) if seenFirst { expected := lastSeq + 1 if seq != expected { gaps := uint64(seq - expected) if gaps < 1000 { atomic.AddUint64(&res.seqGaps, gaps) } } } else { seenFirst = true } lastSeq = seq mu.Unlock() // RFC 3550 jitter (simplified: interarrival time deviation). rtpTS := uint32(buf[4])<<24 | uint32(buf[5])<<16 | uint32(buf[6])<<8 | uint32(buf[7]) if prevRTPTimestamp != 0 { sendDiff := float64(int32(rtpTS-prevRTPTimestamp)) / clockRate recvDiff := arrivalTime.Sub(prevArrival).Seconds() d := math.Abs(recvDiff - sendDiff) jitter += (d - jitter) / 16 // running average mu.Lock() res.jitterSamplesMs = append(res.jitterSamplesMs, jitter*1000) mu.Unlock() } prevRTPTimestamp = rtpTS prevArrival = arrivalTime } }) // Wait for test duration or context cancellation. testTimer := time.NewTimer(dur) defer testTimer.Stop() select { case <-testTimer.C: case <-ctx.Done(): res.disconnectedAt = time.Now() } // DELETE the WHEP resource. location := resp.Header.Get("Location") if location != "" { delURL := strings.TrimRight(coreURL, "/") + location delReq, _ := http.NewRequest(http.MethodDelete, delURL, nil) if auth != "" { delReq.Header.Set("Authorization", auth) } _, _ = http.DefaultClient.Do(delReq) } return res } func buildReport(coreURL, streamID string, nPeers int, dur time.Duration, results []peerResult) string { var b strings.Builder ts := time.Now().UTC().Format(time.RFC3339) b.WriteString("# Dragon Fork WHEP Sustained Load Test\n\n") fmt.Fprintf(&b, "**Date:** %s \n", ts) fmt.Fprintf(&b, "**Target:** %s \n", coreURL) fmt.Fprintf(&b, "**Stream:** %s \n", streamID) fmt.Fprintf(&b, "**Peers:** %d \n", nPeers) fmt.Fprintf(&b, "**Duration:** %s \n\n", dur) // Summary table. connected := 0 var iceMs []float64 var allJitter []float64 var totalPkts, totalGaps uint64 for _, r := range results { if r.connected { connected++ iceMs = append(iceMs, r.iceEstablishMs) allJitter = append(allJitter, r.jitterSamplesMs...) totalPkts += r.packetsReceived totalGaps += r.seqGaps } } b.WriteString("## Summary\n\n") fmt.Fprintf(&b, "| Metric | Value |\n|---|---|\n") fmt.Fprintf(&b, "| Peers connected | %d / %d |\n", connected, nPeers) fmt.Fprintf(&b, "| Total packets received | %d |\n", totalPkts) lossRate := 0.0 if totalPkts+totalGaps > 0 { lossRate = float64(totalGaps) / float64(totalPkts+totalGaps) * 100 } fmt.Fprintf(&b, "| Packet loss estimate | %.2f%% |\n", lossRate) if len(iceMs) > 0 { fmt.Fprintf(&b, "| ICE establishment p50 | %.0fms |\n", percentile(iceMs, 50)) fmt.Fprintf(&b, "| ICE establishment p95 | %.0fms |\n", percentile(iceMs, 95)) } if len(allJitter) > 0 { fmt.Fprintf(&b, "| Jitter p50 | %.2fms |\n", percentile(allJitter, 50)) fmt.Fprintf(&b, "| Jitter p95 | %.2fms |\n", percentile(allJitter, 95)) } b.WriteString("\n") // Per-peer detail. b.WriteString("## Per-Peer Detail\n\n") b.WriteString("| Peer | Connected | ICE ms | Packets | Loss est. | Jitter p95 |\n") b.WriteString("|---|---|---|---|---|---|\n") for _, r := range results { if r.err != "" { fmt.Fprintf(&b, "| %d | ❌ | — | — | — | %s |\n", r.index, r.err) continue } lossEst := 0.0 if r.packetsReceived+r.seqGaps > 0 { lossEst = float64(r.seqGaps) / float64(r.packetsReceived+r.seqGaps) * 100 } jP95 := 0.0 if len(r.jitterSamplesMs) > 0 { jP95 = percentile(r.jitterSamplesMs, 95) } fmt.Fprintf(&b, "| %d | ✓ | %.0f | %d | %.2f%% | %.2fms |\n", r.index, r.iceEstablishMs, r.packetsReceived, lossEst, jP95) } b.WriteString("\n") b.WriteString("---\n") b.WriteString("*Generated by `test/load/sustained.go`. Observe Prometheus metrics during run via Grafana.*\n") return b.String() } func percentile(data []float64, p float64) float64 { if len(data) == 0 { return 0 } sorted := make([]float64, len(data)) copy(sorted, data) sort.Float64s(sorted) idx := (p / 100) * float64(len(sorted)-1) lo := int(idx) hi := lo + 1 if hi >= len(sorted) { return sorted[lo] } frac := idx - float64(lo) return sorted[lo]*(1-frac) + sorted[hi]*frac }