feat(test): add 5-peer sustained WHEP load test (closes #14)
This commit is contained in:
parent
60f64fe76b
commit
561a93e044
1 changed files with 385 additions and 0 deletions
385
test/load/sustained.go
Normal file
385
test/load/sustained.go
Normal file
|
|
@ -0,0 +1,385 @@
|
|||
// Dragon Fork — headless WHEP subscriber load test.
|
||||
//
|
||||
// Drives N concurrent WHEP peers against a single stream for a configurable
|
||||
// duration and produces a markdown report suitable for committing to
|
||||
// test/load/results/.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// go run ./test/load/sustained.go \
|
||||
// -url http://10.0.0.25:8080 \
|
||||
// -stream mystream \
|
||||
// -peers 5 \
|
||||
// -duration 10m \
|
||||
// -auth "Bearer <TOKEN>" \
|
||||
// -out test/load/results/
|
||||
//
|
||||
// The program exits 0 on success or 1 if any peer failed to connect.
|
||||
// Run the Grafana dashboard alongside to observe Prometheus metrics
|
||||
// during the test (see deploy/truenas/core/docker-compose.yml).
|
||||
|
||||
//go:build ignore
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// peerResult holds per-peer stats collected over the run.
|
||||
type peerResult struct {
|
||||
index int
|
||||
connected bool
|
||||
iceEstablishMs float64
|
||||
packetsReceived uint64
|
||||
seqGaps uint64 // proxy for packet loss
|
||||
jitterSamplesMs []float64
|
||||
disconnectedAt time.Time
|
||||
err string
|
||||
}
|
||||
|
||||
func main() {
|
||||
coreURL := flag.String("url", "http://localhost:8080", "Dragon Fork Core base URL")
|
||||
streamID := flag.String("stream", "", "Process ID with webrtc.enabled=true")
|
||||
nPeers := flag.Int("peers", 5, "Number of concurrent WHEP subscribers")
|
||||
duration := flag.Duration("duration", 10*time.Minute, "Test duration")
|
||||
auth := flag.String("auth", "", "Authorization header value (e.g. 'Bearer TOKEN')")
|
||||
outDir := flag.String("out", "test/load/results", "Output directory for markdown report")
|
||||
flag.Parse()
|
||||
|
||||
if *streamID == "" {
|
||||
fmt.Fprintln(os.Stderr, "error: -stream is required")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Printf("Dragon Fork WHEP load test\n")
|
||||
fmt.Printf(" target: %s\n", *coreURL)
|
||||
fmt.Printf(" stream: %s\n", *streamID)
|
||||
fmt.Printf(" peers: %d\n", *nPeers)
|
||||
fmt.Printf(" duration: %s\n", *duration)
|
||||
fmt.Println()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), *duration+30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
results := make([]peerResult, *nPeers)
|
||||
var wg sync.WaitGroup
|
||||
var anyFail int32
|
||||
|
||||
for i := 0; i < *nPeers; i++ {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
res := runPeer(ctx, idx, *coreURL, *streamID, *auth, *duration)
|
||||
results[idx] = res
|
||||
if !res.connected {
|
||||
atomic.StoreInt32(&anyFail, 1)
|
||||
}
|
||||
}(i)
|
||||
// Stagger connection attempts 200ms apart to avoid thundering herd.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
report := buildReport(*coreURL, *streamID, *nPeers, *duration, results)
|
||||
fmt.Print(report)
|
||||
|
||||
if err := os.MkdirAll(*outDir, 0o755); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "mkdir %s: %v\n", *outDir, err)
|
||||
} else {
|
||||
ts := time.Now().UTC().Format("2006-01-02T150405Z")
|
||||
fname := filepath.Join(*outDir, fmt.Sprintf("%s-%s-%dp.md", ts, *streamID, *nPeers))
|
||||
if err := os.WriteFile(fname, []byte(report), 0o644); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "write report: %v\n", err)
|
||||
} else {
|
||||
fmt.Printf("\nReport written to %s\n", fname)
|
||||
}
|
||||
}
|
||||
|
||||
if anyFail != 0 {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// runPeer connects one WHEP subscriber, reads packets for the test duration,
|
||||
// and collects statistics.
|
||||
func runPeer(ctx context.Context, idx int, coreURL, streamID, auth string, dur time.Duration) peerResult {
|
||||
res := peerResult{index: idx}
|
||||
|
||||
// Build a minimal SDP offer using Pion.
|
||||
me := &webrtc.MediaEngine{}
|
||||
if err := me.RegisterDefaultCodecs(); err != nil {
|
||||
res.err = fmt.Sprintf("media engine: %v", err)
|
||||
return res
|
||||
}
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
|
||||
cfg := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
|
||||
pc, err := api.NewPeerConnection(cfg)
|
||||
if err != nil {
|
||||
res.err = fmt.Sprintf("new peer connection: %v", err)
|
||||
return res
|
||||
}
|
||||
defer pc.Close()
|
||||
|
||||
// Add receive-only transceivers so the SDP offer contains the right m= lines.
|
||||
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
res.err = fmt.Sprintf("add video transceiver: %v", err)
|
||||
return res
|
||||
}
|
||||
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
res.err = fmt.Sprintf("add audio transceiver: %v", err)
|
||||
return res
|
||||
}
|
||||
|
||||
offer, err := pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
res.err = fmt.Sprintf("create offer: %v", err)
|
||||
return res
|
||||
}
|
||||
if err := pc.SetLocalDescription(offer); err != nil {
|
||||
res.err = fmt.Sprintf("set local desc: %v", err)
|
||||
return res
|
||||
}
|
||||
|
||||
// POST the offer to the WHEP endpoint.
|
||||
whepURL := strings.TrimRight(coreURL, "/") + "/api/v3/whep/" + streamID
|
||||
t0 := time.Now()
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, whepURL, bytes.NewReader([]byte(offer.SDP)))
|
||||
if err != nil {
|
||||
res.err = fmt.Sprintf("build http request: %v", err)
|
||||
return res
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/sdp")
|
||||
if auth != "" {
|
||||
httpReq.Header.Set("Authorization", auth)
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
if err != nil {
|
||||
res.err = fmt.Sprintf("POST /whep: %v", err)
|
||||
return res
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
res.err = fmt.Sprintf("WHEP POST returned %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
|
||||
return res
|
||||
}
|
||||
|
||||
answerSDP, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
res.err = fmt.Sprintf("read answer: %v", err)
|
||||
return res
|
||||
}
|
||||
|
||||
answer := webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: string(answerSDP)}
|
||||
if err := pc.SetRemoteDescription(answer); err != nil {
|
||||
res.err = fmt.Sprintf("set remote desc: %v", err)
|
||||
return res
|
||||
}
|
||||
|
||||
// Wait for ICE connected.
|
||||
connCh := make(chan struct{})
|
||||
var connOnce sync.Once
|
||||
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
|
||||
if s == webrtc.PeerConnectionStateConnected {
|
||||
connOnce.Do(func() { close(connCh) })
|
||||
}
|
||||
})
|
||||
|
||||
select {
|
||||
case <-connCh:
|
||||
res.iceEstablishMs = float64(time.Since(t0).Milliseconds())
|
||||
res.connected = true
|
||||
case <-time.After(15 * time.Second):
|
||||
res.err = "ICE connection timeout (15s)"
|
||||
return res
|
||||
case <-ctx.Done():
|
||||
res.err = "context cancelled before ICE connected"
|
||||
return res
|
||||
}
|
||||
|
||||
fmt.Printf(" peer %d connected (ICE: %.0fms)\n", idx, res.iceEstablishMs)
|
||||
|
||||
// Collect RTP statistics for the test duration.
|
||||
var mu sync.Mutex
|
||||
var lastSeq uint16
|
||||
var seenFirst bool
|
||||
|
||||
pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||||
prevArrival := time.Now()
|
||||
var prevRTPTimestamp uint32
|
||||
var jitter float64
|
||||
clockRate := float64(track.Codec().ClockRate)
|
||||
|
||||
buf := make([]byte, 1500)
|
||||
for {
|
||||
n, _, err := track.Read(buf)
|
||||
if err != nil || n < 12 {
|
||||
return
|
||||
}
|
||||
arrivalTime := time.Now()
|
||||
|
||||
// Sequence number gap tracking.
|
||||
seq := uint16(buf[2])<<8 | uint16(buf[3])
|
||||
mu.Lock()
|
||||
atomic.AddUint64(&res.packetsReceived, 1)
|
||||
if seenFirst {
|
||||
expected := lastSeq + 1
|
||||
if seq != expected {
|
||||
gaps := uint64(seq - expected)
|
||||
if gaps < 1000 {
|
||||
atomic.AddUint64(&res.seqGaps, gaps)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
seenFirst = true
|
||||
}
|
||||
lastSeq = seq
|
||||
mu.Unlock()
|
||||
|
||||
// RFC 3550 jitter (simplified: interarrival time deviation).
|
||||
rtpTS := uint32(buf[4])<<24 | uint32(buf[5])<<16 | uint32(buf[6])<<8 | uint32(buf[7])
|
||||
if prevRTPTimestamp != 0 {
|
||||
sendDiff := float64(int32(rtpTS-prevRTPTimestamp)) / clockRate
|
||||
recvDiff := arrivalTime.Sub(prevArrival).Seconds()
|
||||
d := math.Abs(recvDiff - sendDiff)
|
||||
jitter += (d - jitter) / 16 // running average
|
||||
mu.Lock()
|
||||
res.jitterSamplesMs = append(res.jitterSamplesMs, jitter*1000)
|
||||
mu.Unlock()
|
||||
}
|
||||
prevRTPTimestamp = rtpTS
|
||||
prevArrival = arrivalTime
|
||||
}
|
||||
})
|
||||
|
||||
// Wait for test duration or context cancellation.
|
||||
testTimer := time.NewTimer(dur)
|
||||
defer testTimer.Stop()
|
||||
select {
|
||||
case <-testTimer.C:
|
||||
case <-ctx.Done():
|
||||
res.disconnectedAt = time.Now()
|
||||
}
|
||||
|
||||
// DELETE the WHEP resource.
|
||||
location := resp.Header.Get("Location")
|
||||
if location != "" {
|
||||
delURL := strings.TrimRight(coreURL, "/") + location
|
||||
delReq, _ := http.NewRequest(http.MethodDelete, delURL, nil)
|
||||
if auth != "" {
|
||||
delReq.Header.Set("Authorization", auth)
|
||||
}
|
||||
_, _ = http.DefaultClient.Do(delReq)
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func buildReport(coreURL, streamID string, nPeers int, dur time.Duration, results []peerResult) string {
|
||||
var b strings.Builder
|
||||
ts := time.Now().UTC().Format(time.RFC3339)
|
||||
|
||||
b.WriteString("# Dragon Fork WHEP Sustained Load Test\n\n")
|
||||
fmt.Fprintf(&b, "**Date:** %s \n", ts)
|
||||
fmt.Fprintf(&b, "**Target:** %s \n", coreURL)
|
||||
fmt.Fprintf(&b, "**Stream:** %s \n", streamID)
|
||||
fmt.Fprintf(&b, "**Peers:** %d \n", nPeers)
|
||||
fmt.Fprintf(&b, "**Duration:** %s \n\n", dur)
|
||||
|
||||
// Summary table.
|
||||
connected := 0
|
||||
var iceMs []float64
|
||||
var allJitter []float64
|
||||
var totalPkts, totalGaps uint64
|
||||
for _, r := range results {
|
||||
if r.connected {
|
||||
connected++
|
||||
iceMs = append(iceMs, r.iceEstablishMs)
|
||||
allJitter = append(allJitter, r.jitterSamplesMs...)
|
||||
totalPkts += r.packetsReceived
|
||||
totalGaps += r.seqGaps
|
||||
}
|
||||
}
|
||||
|
||||
b.WriteString("## Summary\n\n")
|
||||
fmt.Fprintf(&b, "| Metric | Value |\n|---|---|\n")
|
||||
fmt.Fprintf(&b, "| Peers connected | %d / %d |\n", connected, nPeers)
|
||||
fmt.Fprintf(&b, "| Total packets received | %d |\n", totalPkts)
|
||||
lossRate := 0.0
|
||||
if totalPkts+totalGaps > 0 {
|
||||
lossRate = float64(totalGaps) / float64(totalPkts+totalGaps) * 100
|
||||
}
|
||||
fmt.Fprintf(&b, "| Packet loss estimate | %.2f%% |\n", lossRate)
|
||||
if len(iceMs) > 0 {
|
||||
fmt.Fprintf(&b, "| ICE establishment p50 | %.0fms |\n", percentile(iceMs, 50))
|
||||
fmt.Fprintf(&b, "| ICE establishment p95 | %.0fms |\n", percentile(iceMs, 95))
|
||||
}
|
||||
if len(allJitter) > 0 {
|
||||
fmt.Fprintf(&b, "| Jitter p50 | %.2fms |\n", percentile(allJitter, 50))
|
||||
fmt.Fprintf(&b, "| Jitter p95 | %.2fms |\n", percentile(allJitter, 95))
|
||||
}
|
||||
b.WriteString("\n")
|
||||
|
||||
// Per-peer detail.
|
||||
b.WriteString("## Per-Peer Detail\n\n")
|
||||
b.WriteString("| Peer | Connected | ICE ms | Packets | Loss est. | Jitter p95 |\n")
|
||||
b.WriteString("|---|---|---|---|---|---|\n")
|
||||
for _, r := range results {
|
||||
if r.err != "" {
|
||||
fmt.Fprintf(&b, "| %d | ❌ | — | — | — | %s |\n", r.index, r.err)
|
||||
continue
|
||||
}
|
||||
lossEst := 0.0
|
||||
if r.packetsReceived+r.seqGaps > 0 {
|
||||
lossEst = float64(r.seqGaps) / float64(r.packetsReceived+r.seqGaps) * 100
|
||||
}
|
||||
jP95 := 0.0
|
||||
if len(r.jitterSamplesMs) > 0 {
|
||||
jP95 = percentile(r.jitterSamplesMs, 95)
|
||||
}
|
||||
fmt.Fprintf(&b, "| %d | ✓ | %.0f | %d | %.2f%% | %.2fms |\n",
|
||||
r.index, r.iceEstablishMs, r.packetsReceived, lossEst, jP95)
|
||||
}
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("---\n")
|
||||
b.WriteString("*Generated by `test/load/sustained.go`. Observe Prometheus metrics during run via Grafana.*\n")
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func percentile(data []float64, p float64) float64 {
|
||||
if len(data) == 0 {
|
||||
return 0
|
||||
}
|
||||
sorted := make([]float64, len(data))
|
||||
copy(sorted, data)
|
||||
sort.Float64s(sorted)
|
||||
idx := (p / 100) * float64(len(sorted)-1)
|
||||
lo := int(idx)
|
||||
hi := lo + 1
|
||||
if hi >= len(sorted) {
|
||||
return sorted[lo]
|
||||
}
|
||||
frac := idx - float64(lo)
|
||||
return sorted[lo]*(1-frac) + sorted[hi]*frac
|
||||
}
|
||||
Loading…
Reference in a new issue