386 lines
11 KiB
Go
386 lines
11 KiB
Go
|
|
// Dragon Fork — headless WHEP subscriber load test.
|
||
|
|
//
|
||
|
|
// Drives N concurrent WHEP peers against a single stream for a configurable
|
||
|
|
// duration and produces a markdown report suitable for committing to
|
||
|
|
// test/load/results/.
|
||
|
|
//
|
||
|
|
// Usage:
|
||
|
|
//
|
||
|
|
// go run ./test/load/sustained.go \
|
||
|
|
// -url http://10.0.0.25:8080 \
|
||
|
|
// -stream mystream \
|
||
|
|
// -peers 5 \
|
||
|
|
// -duration 10m \
|
||
|
|
// -auth "Bearer <TOKEN>" \
|
||
|
|
// -out test/load/results/
|
||
|
|
//
|
||
|
|
// The program exits 0 on success or 1 if any peer failed to connect.
|
||
|
|
// Run the Grafana dashboard alongside to observe Prometheus metrics
|
||
|
|
// during the test (see deploy/truenas/core/docker-compose.yml).
|
||
|
|
|
||
|
|
//go:build ignore
|
||
|
|
|
||
|
|
package main
|
||
|
|
|
||
|
|
import (
|
||
|
|
"bytes"
|
||
|
|
"context"
|
||
|
|
"flag"
|
||
|
|
"fmt"
|
||
|
|
"io"
|
||
|
|
"math"
|
||
|
|
"net/http"
|
||
|
|
"os"
|
||
|
|
"path/filepath"
|
||
|
|
"sort"
|
||
|
|
"strings"
|
||
|
|
"sync"
|
||
|
|
"sync/atomic"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/pion/webrtc/v4"
|
||
|
|
)
|
||
|
|
|
||
|
|
// peerResult holds per-peer stats collected over the run.
|
||
|
|
type peerResult struct {
|
||
|
|
index int
|
||
|
|
connected bool
|
||
|
|
iceEstablishMs float64
|
||
|
|
packetsReceived uint64
|
||
|
|
seqGaps uint64 // proxy for packet loss
|
||
|
|
jitterSamplesMs []float64
|
||
|
|
disconnectedAt time.Time
|
||
|
|
err string
|
||
|
|
}
|
||
|
|
|
||
|
|
func main() {
|
||
|
|
coreURL := flag.String("url", "http://localhost:8080", "Dragon Fork Core base URL")
|
||
|
|
streamID := flag.String("stream", "", "Process ID with webrtc.enabled=true")
|
||
|
|
nPeers := flag.Int("peers", 5, "Number of concurrent WHEP subscribers")
|
||
|
|
duration := flag.Duration("duration", 10*time.Minute, "Test duration")
|
||
|
|
auth := flag.String("auth", "", "Authorization header value (e.g. 'Bearer TOKEN')")
|
||
|
|
outDir := flag.String("out", "test/load/results", "Output directory for markdown report")
|
||
|
|
flag.Parse()
|
||
|
|
|
||
|
|
if *streamID == "" {
|
||
|
|
fmt.Fprintln(os.Stderr, "error: -stream is required")
|
||
|
|
os.Exit(1)
|
||
|
|
}
|
||
|
|
|
||
|
|
fmt.Printf("Dragon Fork WHEP load test\n")
|
||
|
|
fmt.Printf(" target: %s\n", *coreURL)
|
||
|
|
fmt.Printf(" stream: %s\n", *streamID)
|
||
|
|
fmt.Printf(" peers: %d\n", *nPeers)
|
||
|
|
fmt.Printf(" duration: %s\n", *duration)
|
||
|
|
fmt.Println()
|
||
|
|
|
||
|
|
ctx, cancel := context.WithTimeout(context.Background(), *duration+30*time.Second)
|
||
|
|
defer cancel()
|
||
|
|
|
||
|
|
results := make([]peerResult, *nPeers)
|
||
|
|
var wg sync.WaitGroup
|
||
|
|
var anyFail int32
|
||
|
|
|
||
|
|
for i := 0; i < *nPeers; i++ {
|
||
|
|
wg.Add(1)
|
||
|
|
go func(idx int) {
|
||
|
|
defer wg.Done()
|
||
|
|
res := runPeer(ctx, idx, *coreURL, *streamID, *auth, *duration)
|
||
|
|
results[idx] = res
|
||
|
|
if !res.connected {
|
||
|
|
atomic.StoreInt32(&anyFail, 1)
|
||
|
|
}
|
||
|
|
}(i)
|
||
|
|
// Stagger connection attempts 200ms apart to avoid thundering herd.
|
||
|
|
time.Sleep(200 * time.Millisecond)
|
||
|
|
}
|
||
|
|
|
||
|
|
wg.Wait()
|
||
|
|
|
||
|
|
report := buildReport(*coreURL, *streamID, *nPeers, *duration, results)
|
||
|
|
fmt.Print(report)
|
||
|
|
|
||
|
|
if err := os.MkdirAll(*outDir, 0o755); err != nil {
|
||
|
|
fmt.Fprintf(os.Stderr, "mkdir %s: %v\n", *outDir, err)
|
||
|
|
} else {
|
||
|
|
ts := time.Now().UTC().Format("2006-01-02T150405Z")
|
||
|
|
fname := filepath.Join(*outDir, fmt.Sprintf("%s-%s-%dp.md", ts, *streamID, *nPeers))
|
||
|
|
if err := os.WriteFile(fname, []byte(report), 0o644); err != nil {
|
||
|
|
fmt.Fprintf(os.Stderr, "write report: %v\n", err)
|
||
|
|
} else {
|
||
|
|
fmt.Printf("\nReport written to %s\n", fname)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if anyFail != 0 {
|
||
|
|
os.Exit(1)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// runPeer connects one WHEP subscriber, reads packets for the test duration,
|
||
|
|
// and collects statistics.
|
||
|
|
func runPeer(ctx context.Context, idx int, coreURL, streamID, auth string, dur time.Duration) peerResult {
|
||
|
|
res := peerResult{index: idx}
|
||
|
|
|
||
|
|
// Build a minimal SDP offer using Pion.
|
||
|
|
me := &webrtc.MediaEngine{}
|
||
|
|
if err := me.RegisterDefaultCodecs(); err != nil {
|
||
|
|
res.err = fmt.Sprintf("media engine: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
|
||
|
|
cfg := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
|
||
|
|
pc, err := api.NewPeerConnection(cfg)
|
||
|
|
if err != nil {
|
||
|
|
res.err = fmt.Sprintf("new peer connection: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
defer pc.Close()
|
||
|
|
|
||
|
|
// Add receive-only transceivers so the SDP offer contains the right m= lines.
|
||
|
|
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||
|
|
res.err = fmt.Sprintf("add video transceiver: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||
|
|
res.err = fmt.Sprintf("add audio transceiver: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
offer, err := pc.CreateOffer(nil)
|
||
|
|
if err != nil {
|
||
|
|
res.err = fmt.Sprintf("create offer: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
if err := pc.SetLocalDescription(offer); err != nil {
|
||
|
|
res.err = fmt.Sprintf("set local desc: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
// POST the offer to the WHEP endpoint.
|
||
|
|
whepURL := strings.TrimRight(coreURL, "/") + "/api/v3/whep/" + streamID
|
||
|
|
t0 := time.Now()
|
||
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, whepURL, bytes.NewReader([]byte(offer.SDP)))
|
||
|
|
if err != nil {
|
||
|
|
res.err = fmt.Sprintf("build http request: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
httpReq.Header.Set("Content-Type", "application/sdp")
|
||
|
|
if auth != "" {
|
||
|
|
httpReq.Header.Set("Authorization", auth)
|
||
|
|
}
|
||
|
|
|
||
|
|
resp, err := http.DefaultClient.Do(httpReq)
|
||
|
|
if err != nil {
|
||
|
|
res.err = fmt.Sprintf("POST /whep: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
defer resp.Body.Close()
|
||
|
|
|
||
|
|
if resp.StatusCode != http.StatusCreated {
|
||
|
|
body, _ := io.ReadAll(resp.Body)
|
||
|
|
res.err = fmt.Sprintf("WHEP POST returned %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
answerSDP, err := io.ReadAll(resp.Body)
|
||
|
|
if err != nil {
|
||
|
|
res.err = fmt.Sprintf("read answer: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
answer := webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: string(answerSDP)}
|
||
|
|
if err := pc.SetRemoteDescription(answer); err != nil {
|
||
|
|
res.err = fmt.Sprintf("set remote desc: %v", err)
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
// Wait for ICE connected.
|
||
|
|
connCh := make(chan struct{})
|
||
|
|
var connOnce sync.Once
|
||
|
|
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
|
||
|
|
if s == webrtc.PeerConnectionStateConnected {
|
||
|
|
connOnce.Do(func() { close(connCh) })
|
||
|
|
}
|
||
|
|
})
|
||
|
|
|
||
|
|
select {
|
||
|
|
case <-connCh:
|
||
|
|
res.iceEstablishMs = float64(time.Since(t0).Milliseconds())
|
||
|
|
res.connected = true
|
||
|
|
case <-time.After(15 * time.Second):
|
||
|
|
res.err = "ICE connection timeout (15s)"
|
||
|
|
return res
|
||
|
|
case <-ctx.Done():
|
||
|
|
res.err = "context cancelled before ICE connected"
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
fmt.Printf(" peer %d connected (ICE: %.0fms)\n", idx, res.iceEstablishMs)
|
||
|
|
|
||
|
|
// Collect RTP statistics for the test duration.
|
||
|
|
var mu sync.Mutex
|
||
|
|
var lastSeq uint16
|
||
|
|
var seenFirst bool
|
||
|
|
|
||
|
|
pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||
|
|
prevArrival := time.Now()
|
||
|
|
var prevRTPTimestamp uint32
|
||
|
|
var jitter float64
|
||
|
|
clockRate := float64(track.Codec().ClockRate)
|
||
|
|
|
||
|
|
buf := make([]byte, 1500)
|
||
|
|
for {
|
||
|
|
n, _, err := track.Read(buf)
|
||
|
|
if err != nil || n < 12 {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
arrivalTime := time.Now()
|
||
|
|
|
||
|
|
// Sequence number gap tracking.
|
||
|
|
seq := uint16(buf[2])<<8 | uint16(buf[3])
|
||
|
|
mu.Lock()
|
||
|
|
atomic.AddUint64(&res.packetsReceived, 1)
|
||
|
|
if seenFirst {
|
||
|
|
expected := lastSeq + 1
|
||
|
|
if seq != expected {
|
||
|
|
gaps := uint64(seq - expected)
|
||
|
|
if gaps < 1000 {
|
||
|
|
atomic.AddUint64(&res.seqGaps, gaps)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
seenFirst = true
|
||
|
|
}
|
||
|
|
lastSeq = seq
|
||
|
|
mu.Unlock()
|
||
|
|
|
||
|
|
// RFC 3550 jitter (simplified: interarrival time deviation).
|
||
|
|
rtpTS := uint32(buf[4])<<24 | uint32(buf[5])<<16 | uint32(buf[6])<<8 | uint32(buf[7])
|
||
|
|
if prevRTPTimestamp != 0 {
|
||
|
|
sendDiff := float64(int32(rtpTS-prevRTPTimestamp)) / clockRate
|
||
|
|
recvDiff := arrivalTime.Sub(prevArrival).Seconds()
|
||
|
|
d := math.Abs(recvDiff - sendDiff)
|
||
|
|
jitter += (d - jitter) / 16 // running average
|
||
|
|
mu.Lock()
|
||
|
|
res.jitterSamplesMs = append(res.jitterSamplesMs, jitter*1000)
|
||
|
|
mu.Unlock()
|
||
|
|
}
|
||
|
|
prevRTPTimestamp = rtpTS
|
||
|
|
prevArrival = arrivalTime
|
||
|
|
}
|
||
|
|
})
|
||
|
|
|
||
|
|
// Wait for test duration or context cancellation.
|
||
|
|
testTimer := time.NewTimer(dur)
|
||
|
|
defer testTimer.Stop()
|
||
|
|
select {
|
||
|
|
case <-testTimer.C:
|
||
|
|
case <-ctx.Done():
|
||
|
|
res.disconnectedAt = time.Now()
|
||
|
|
}
|
||
|
|
|
||
|
|
// DELETE the WHEP resource.
|
||
|
|
location := resp.Header.Get("Location")
|
||
|
|
if location != "" {
|
||
|
|
delURL := strings.TrimRight(coreURL, "/") + location
|
||
|
|
delReq, _ := http.NewRequest(http.MethodDelete, delURL, nil)
|
||
|
|
if auth != "" {
|
||
|
|
delReq.Header.Set("Authorization", auth)
|
||
|
|
}
|
||
|
|
_, _ = http.DefaultClient.Do(delReq)
|
||
|
|
}
|
||
|
|
|
||
|
|
return res
|
||
|
|
}
|
||
|
|
|
||
|
|
func buildReport(coreURL, streamID string, nPeers int, dur time.Duration, results []peerResult) string {
|
||
|
|
var b strings.Builder
|
||
|
|
ts := time.Now().UTC().Format(time.RFC3339)
|
||
|
|
|
||
|
|
b.WriteString("# Dragon Fork WHEP Sustained Load Test\n\n")
|
||
|
|
fmt.Fprintf(&b, "**Date:** %s \n", ts)
|
||
|
|
fmt.Fprintf(&b, "**Target:** %s \n", coreURL)
|
||
|
|
fmt.Fprintf(&b, "**Stream:** %s \n", streamID)
|
||
|
|
fmt.Fprintf(&b, "**Peers:** %d \n", nPeers)
|
||
|
|
fmt.Fprintf(&b, "**Duration:** %s \n\n", dur)
|
||
|
|
|
||
|
|
// Summary table.
|
||
|
|
connected := 0
|
||
|
|
var iceMs []float64
|
||
|
|
var allJitter []float64
|
||
|
|
var totalPkts, totalGaps uint64
|
||
|
|
for _, r := range results {
|
||
|
|
if r.connected {
|
||
|
|
connected++
|
||
|
|
iceMs = append(iceMs, r.iceEstablishMs)
|
||
|
|
allJitter = append(allJitter, r.jitterSamplesMs...)
|
||
|
|
totalPkts += r.packetsReceived
|
||
|
|
totalGaps += r.seqGaps
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
b.WriteString("## Summary\n\n")
|
||
|
|
fmt.Fprintf(&b, "| Metric | Value |\n|---|---|\n")
|
||
|
|
fmt.Fprintf(&b, "| Peers connected | %d / %d |\n", connected, nPeers)
|
||
|
|
fmt.Fprintf(&b, "| Total packets received | %d |\n", totalPkts)
|
||
|
|
lossRate := 0.0
|
||
|
|
if totalPkts+totalGaps > 0 {
|
||
|
|
lossRate = float64(totalGaps) / float64(totalPkts+totalGaps) * 100
|
||
|
|
}
|
||
|
|
fmt.Fprintf(&b, "| Packet loss estimate | %.2f%% |\n", lossRate)
|
||
|
|
if len(iceMs) > 0 {
|
||
|
|
fmt.Fprintf(&b, "| ICE establishment p50 | %.0fms |\n", percentile(iceMs, 50))
|
||
|
|
fmt.Fprintf(&b, "| ICE establishment p95 | %.0fms |\n", percentile(iceMs, 95))
|
||
|
|
}
|
||
|
|
if len(allJitter) > 0 {
|
||
|
|
fmt.Fprintf(&b, "| Jitter p50 | %.2fms |\n", percentile(allJitter, 50))
|
||
|
|
fmt.Fprintf(&b, "| Jitter p95 | %.2fms |\n", percentile(allJitter, 95))
|
||
|
|
}
|
||
|
|
b.WriteString("\n")
|
||
|
|
|
||
|
|
// Per-peer detail.
|
||
|
|
b.WriteString("## Per-Peer Detail\n\n")
|
||
|
|
b.WriteString("| Peer | Connected | ICE ms | Packets | Loss est. | Jitter p95 |\n")
|
||
|
|
b.WriteString("|---|---|---|---|---|---|\n")
|
||
|
|
for _, r := range results {
|
||
|
|
if r.err != "" {
|
||
|
|
fmt.Fprintf(&b, "| %d | ❌ | — | — | — | %s |\n", r.index, r.err)
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
lossEst := 0.0
|
||
|
|
if r.packetsReceived+r.seqGaps > 0 {
|
||
|
|
lossEst = float64(r.seqGaps) / float64(r.packetsReceived+r.seqGaps) * 100
|
||
|
|
}
|
||
|
|
jP95 := 0.0
|
||
|
|
if len(r.jitterSamplesMs) > 0 {
|
||
|
|
jP95 = percentile(r.jitterSamplesMs, 95)
|
||
|
|
}
|
||
|
|
fmt.Fprintf(&b, "| %d | ✓ | %.0f | %d | %.2f%% | %.2fms |\n",
|
||
|
|
r.index, r.iceEstablishMs, r.packetsReceived, lossEst, jP95)
|
||
|
|
}
|
||
|
|
b.WriteString("\n")
|
||
|
|
|
||
|
|
b.WriteString("---\n")
|
||
|
|
b.WriteString("*Generated by `test/load/sustained.go`. Observe Prometheus metrics during run via Grafana.*\n")
|
||
|
|
|
||
|
|
return b.String()
|
||
|
|
}
|
||
|
|
|
||
|
|
func percentile(data []float64, p float64) float64 {
|
||
|
|
if len(data) == 0 {
|
||
|
|
return 0
|
||
|
|
}
|
||
|
|
sorted := make([]float64, len(data))
|
||
|
|
copy(sorted, data)
|
||
|
|
sort.Float64s(sorted)
|
||
|
|
idx := (p / 100) * float64(len(sorted)-1)
|
||
|
|
lo := int(idx)
|
||
|
|
hi := lo + 1
|
||
|
|
if hi >= len(sorted) {
|
||
|
|
return sorted[lo]
|
||
|
|
}
|
||
|
|
frac := idx - float64(lo)
|
||
|
|
return sorted[lo]*(1-frac) + sorted[hi]*frac
|
||
|
|
}
|