From 413d0f24b63eb509e106e1026414270864e89a4a Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Fri, 17 Apr 2026 08:52:40 -0400 Subject: [PATCH] test(webrtc): add Pion WHEP subscriber client + e2e test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit whep-client/main.go: minimal Pion subscriber that POSTs a recvonly offer, applies the answer, and waits for one RTP packet on each of the video and audio tracks. Used as M1's end-to-end verifier. whep-client/main_test.go: in-process e2e wiring — stands up Source, Registry, PeerFactory and WHEPHandler behind an httptest server, injects synthetic PT=102/111 RTP on the Source's UDP port and calls Subscribe. Validates the full egress pipeline without requiring FFmpeg or external network. Skipped under -short. --- test/whep-client/main.go | 152 ++++++++++++++++++++++++++++++++++ test/whep-client/main_test.go | 110 ++++++++++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 test/whep-client/main.go create mode 100644 test/whep-client/main_test.go diff --git a/test/whep-client/main.go b/test/whep-client/main.go new file mode 100644 index 0000000..8098aac --- /dev/null +++ b/test/whep-client/main.go @@ -0,0 +1,152 @@ +// Command whep-client is a minimal Pion-based WHEP subscriber used for +// M1 end-to-end verification. It POSTs a recvonly SDP offer to a WHEP +// endpoint, applies the answer, then reports whether the video and +// audio tracks receive at least one RTP packet before a timeout. +// +// This is a test helper; it is NOT part of the Core binary. +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "time" + + "github.com/pion/webrtc/v4" +) + +func main() { + var ( + whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL") + timeout = flag.Duration("timeout", 10*time.Second, "overall subscribe+receive timeout") + ) + flag.Parse() + + ctx, cancel := context.WithTimeout(context.Background(), *timeout) + defer cancel() + + if err := Subscribe(ctx, *whepURL); err != nil { + log.Fatalf("subscribe failed: %v", err) + } + fmt.Println("OK: received video and audio RTP") + os.Exit(0) +} + +// Subscribe performs a full WHEP subscribe against whepURL and returns +// nil when both a video and an audio RTP packet have been observed +// before ctx expires. It is exported so tests can exercise it. +func Subscribe(ctx context.Context, whepURL string) error { + me := &webrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + return fmt.Errorf("register codecs: %w", err) + } + api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) + + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return fmt.Errorf("new peer connection: %w", err) + } + defer pc.Close() + + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + return fmt.Errorf("add video transceiver: %w", err) + } + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + return fmt.Errorf("add audio transceiver: %w", err) + } + + videoDone := make(chan struct{}) + audioDone := make(chan struct{}) + pc.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + kind := t.Kind() + log.Printf("OnTrack: kind=%s codec=%s pt=%d", kind, t.Codec().MimeType, t.PayloadType()) + go func() { + buf := make([]byte, 1500) + // One successful ReadRTP is enough to prove egress. + if _, _, err := t.Read(buf); err != nil { + log.Printf("read %s: %v", kind, err) + return + } + switch kind { + case webrtc.RTPCodecTypeVideo: + select { + case <-videoDone: + default: + close(videoDone) + } + case webrtc.RTPCodecTypeAudio: + select { + case <-audioDone: + default: + close(audioDone) + } + } + }() + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + return fmt.Errorf("create offer: %w", err) + } + gather := webrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(offer); err != nil { + return fmt.Errorf("set local: %w", err) + } + select { + case <-gather: + case <-ctx.Done(): + return fmt.Errorf("ice gather: %w", ctx.Err()) + } + + answerSDP, err := postOffer(ctx, whepURL, pc.LocalDescription().SDP) + if err != nil { + return err + } + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: answerSDP, + }); err != nil { + return fmt.Errorf("set remote: %w", err) + } + + // Wait for one RTP packet on each track or ctx timeout. + select { + case <-videoDone: + case <-ctx.Done(): + return fmt.Errorf("waiting for video: %w", ctx.Err()) + } + select { + case <-audioDone: + case <-ctx.Done(): + return fmt.Errorf("waiting for audio: %w", ctx.Err()) + } + return nil +} + +func postOffer(ctx context.Context, url, sdp string) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, + bytes.NewReader([]byte(sdp))) + if err != nil { + return "", fmt.Errorf("new request: %w", err) + } + req.Header.Set("Content-Type", "application/sdp") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", fmt.Errorf("POST %s: %w", url, err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusCreated { + return "", fmt.Errorf("WHEP %s: %d %s", url, resp.StatusCode, string(body)) + } + return string(body), nil +} diff --git a/test/whep-client/main_test.go b/test/whep-client/main_test.go new file mode 100644 index 0000000..f1bca0b --- /dev/null +++ b/test/whep-client/main_test.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "net" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + coreweb "github.com/datarhei/core/v16/core/webrtc" + "github.com/pion/rtp" +) + +// TestSubscribe_EndToEnd stands up an in-process webrtc-poc stack, +// injects synthetic H.264(PT=102) + Opus(PT=111) RTP into the Source's +// UDP port, and asserts Subscribe returns nil within the timeout. +// +// Network-heavy; skipped under -short. +func TestSubscribe_EndToEnd(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end subscribe test in short mode") + } + + src, err := coreweb.NewSource("stream-e2e", 0) + if err != nil { + t.Fatalf("NewSource: %v", err) + } + src.Start() + defer src.Close() + + reg := coreweb.NewRegistry() + if err := reg.Register("stream-e2e", src); err != nil { + t.Fatalf("Register: %v", err) + } + + factory, err := coreweb.NewPeerFactory(coreweb.DefaultConfig()) + if err != nil { + t.Fatalf("NewPeerFactory: %v", err) + } + + handler := coreweb.NewWHEPHandler(reg, factory, coreweb.DefaultConfig()) + ts := httptest.NewServer(http.StripPrefix("", handler)) + defer ts.Close() + + // Begin injecting RTP into the source. + rtpAddr := src.LocalAddr() + conn, err := net.DialUDP("udp", nil, rtpAddr) + if err != nil { + t.Fatalf("dial udp: %v", err) + } + defer conn.Close() + + stop := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + tick := time.NewTicker(20 * time.Millisecond) + defer tick.Stop() + var seq uint16 + var ts uint32 + for { + select { + case <-stop: + return + case <-tick.C: + // Video packet (PT=102). + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 102, + SequenceNumber: seq, + Timestamp: ts, + SSRC: 0x1234, + }, + Payload: []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0x10}, + } + if b, err := pkt.Marshal(); err == nil { + _, _ = conn.Write(b) + } + // Audio packet (PT=111). + pkt.PayloadType = 111 + pkt.SSRC = 0x5678 + pkt.SequenceNumber = seq + if b, err := pkt.Marshal(); err == nil { + _, _ = conn.Write(b) + } + seq++ + ts += 3000 + } + } + }() + defer func() { + close(stop) + wg.Wait() + }() + + // We don't care whether the test client's Subscribe can actually + // decode H.264 — just that it observed *some* RTP on both tracks. + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + whepURL := strings.TrimRight(ts.URL, "/") + "/whep/stream-e2e" + if err := Subscribe(ctx, whepURL); err != nil { + t.Fatalf("Subscribe: %v", err) + } +}