test(webrtc): add Pion WHEP subscriber client + e2e test
Some checks failed
tests / build (push) Failing after 13s
CodeQL / Analyze (pull_request) Failing after 2s
tests / build (pull_request) Failing after 2s

whep-client/main.go: minimal Pion subscriber that POSTs a recvonly
offer, applies the answer, and waits for one RTP packet on each of
the video and audio tracks. Used as M1's end-to-end verifier.

whep-client/main_test.go: in-process e2e wiring — stands up Source,
Registry, PeerFactory and WHEPHandler behind an httptest server,
injects synthetic PT=102/111 RTP on the Source's UDP port and calls
Subscribe. Validates the full egress pipeline without requiring
FFmpeg or external network. Skipped under -short.
This commit is contained in:
Zac Gaetano 2026-04-17 08:52:40 -04:00
parent e471bd02b2
commit 413d0f24b6
2 changed files with 262 additions and 0 deletions

152
test/whep-client/main.go Normal file
View file

@ -0,0 +1,152 @@
// Command whep-client is a minimal Pion-based WHEP subscriber used for
// M1 end-to-end verification. It POSTs a recvonly SDP offer to a WHEP
// endpoint, applies the answer, then reports whether the video and
// audio tracks receive at least one RTP packet before a timeout.
//
// This is a test helper; it is NOT part of the Core binary.
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
"github.com/pion/webrtc/v4"
)
func main() {
var (
whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL")
timeout = flag.Duration("timeout", 10*time.Second, "overall subscribe+receive timeout")
)
flag.Parse()
ctx, cancel := context.WithTimeout(context.Background(), *timeout)
defer cancel()
if err := Subscribe(ctx, *whepURL); err != nil {
log.Fatalf("subscribe failed: %v", err)
}
fmt.Println("OK: received video and audio RTP")
os.Exit(0)
}
// Subscribe performs a full WHEP subscribe against whepURL and returns
// nil when both a video and an audio RTP packet have been observed
// before ctx expires. It is exported so tests can exercise it.
func Subscribe(ctx context.Context, whepURL string) error {
me := &webrtc.MediaEngine{}
if err := me.RegisterDefaultCodecs(); err != nil {
return fmt.Errorf("register codecs: %w", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return fmt.Errorf("new peer connection: %w", err)
}
defer pc.Close()
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
return fmt.Errorf("add video transceiver: %w", err)
}
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
return fmt.Errorf("add audio transceiver: %w", err)
}
videoDone := make(chan struct{})
audioDone := make(chan struct{})
pc.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
kind := t.Kind()
log.Printf("OnTrack: kind=%s codec=%s pt=%d", kind, t.Codec().MimeType, t.PayloadType())
go func() {
buf := make([]byte, 1500)
// One successful ReadRTP is enough to prove egress.
if _, _, err := t.Read(buf); err != nil {
log.Printf("read %s: %v", kind, err)
return
}
switch kind {
case webrtc.RTPCodecTypeVideo:
select {
case <-videoDone:
default:
close(videoDone)
}
case webrtc.RTPCodecTypeAudio:
select {
case <-audioDone:
default:
close(audioDone)
}
}
}()
})
offer, err := pc.CreateOffer(nil)
if err != nil {
return fmt.Errorf("create offer: %w", err)
}
gather := webrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(offer); err != nil {
return fmt.Errorf("set local: %w", err)
}
select {
case <-gather:
case <-ctx.Done():
return fmt.Errorf("ice gather: %w", ctx.Err())
}
answerSDP, err := postOffer(ctx, whepURL, pc.LocalDescription().SDP)
if err != nil {
return err
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: answerSDP,
}); err != nil {
return fmt.Errorf("set remote: %w", err)
}
// Wait for one RTP packet on each track or ctx timeout.
select {
case <-videoDone:
case <-ctx.Done():
return fmt.Errorf("waiting for video: %w", ctx.Err())
}
select {
case <-audioDone:
case <-ctx.Done():
return fmt.Errorf("waiting for audio: %w", ctx.Err())
}
return nil
}
func postOffer(ctx context.Context, url, sdp string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url,
bytes.NewReader([]byte(sdp)))
if err != nil {
return "", fmt.Errorf("new request: %w", err)
}
req.Header.Set("Content-Type", "application/sdp")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("POST %s: %w", url, err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusCreated {
return "", fmt.Errorf("WHEP %s: %d %s", url, resp.StatusCode, string(body))
}
return string(body), nil
}

View file

@ -0,0 +1,110 @@
package main
import (
"context"
"net"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
coreweb "github.com/datarhei/core/v16/core/webrtc"
"github.com/pion/rtp"
)
// TestSubscribe_EndToEnd stands up an in-process webrtc-poc stack,
// injects synthetic H.264(PT=102) + Opus(PT=111) RTP into the Source's
// UDP port, and asserts Subscribe returns nil within the timeout.
//
// Network-heavy; skipped under -short.
func TestSubscribe_EndToEnd(t *testing.T) {
if testing.Short() {
t.Skip("skipping end-to-end subscribe test in short mode")
}
src, err := coreweb.NewSource("stream-e2e", 0)
if err != nil {
t.Fatalf("NewSource: %v", err)
}
src.Start()
defer src.Close()
reg := coreweb.NewRegistry()
if err := reg.Register("stream-e2e", src); err != nil {
t.Fatalf("Register: %v", err)
}
factory, err := coreweb.NewPeerFactory(coreweb.DefaultConfig())
if err != nil {
t.Fatalf("NewPeerFactory: %v", err)
}
handler := coreweb.NewWHEPHandler(reg, factory, coreweb.DefaultConfig())
ts := httptest.NewServer(http.StripPrefix("", handler))
defer ts.Close()
// Begin injecting RTP into the source.
rtpAddr := src.LocalAddr()
conn, err := net.DialUDP("udp", nil, rtpAddr)
if err != nil {
t.Fatalf("dial udp: %v", err)
}
defer conn.Close()
stop := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tick := time.NewTicker(20 * time.Millisecond)
defer tick.Stop()
var seq uint16
var ts uint32
for {
select {
case <-stop:
return
case <-tick.C:
// Video packet (PT=102).
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 102,
SequenceNumber: seq,
Timestamp: ts,
SSRC: 0x1234,
},
Payload: []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0x10},
}
if b, err := pkt.Marshal(); err == nil {
_, _ = conn.Write(b)
}
// Audio packet (PT=111).
pkt.PayloadType = 111
pkt.SSRC = 0x5678
pkt.SequenceNumber = seq
if b, err := pkt.Marshal(); err == nil {
_, _ = conn.Write(b)
}
seq++
ts += 3000
}
}
}()
defer func() {
close(stop)
wg.Wait()
}()
// We don't care whether the test client's Subscribe can actually
// decode H.264 — just that it observed *some* RTP on both tracks.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
whepURL := strings.TrimRight(ts.URL, "/") + "/whep/stream-e2e"
if err := Subscribe(ctx, whepURL); err != nil {
t.Fatalf("Subscribe: %v", err)
}
}