2026-04-17 08:52:40 -04:00
|
|
|
// Command whep-client is a minimal Pion-based WHEP subscriber used for
|
|
|
|
|
// M1 end-to-end verification. It POSTs a recvonly SDP offer to a WHEP
|
|
|
|
|
// endpoint, applies the answer, then reports whether the video and
|
|
|
|
|
// audio tracks receive at least one RTP packet before a timeout.
|
|
|
|
|
//
|
|
|
|
|
// This is a test helper; it is NOT part of the Core binary.
|
|
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"context"
|
|
|
|
|
"flag"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"log"
|
|
|
|
|
"net/http"
|
|
|
|
|
"os"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/pion/webrtc/v4"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
var (
|
|
|
|
|
whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL")
|
2026-05-03 00:59:08 -04:00
|
|
|
token = flag.String("token", "", "Authorization: Bearer <token>; empty means no auth header")
|
2026-04-17 08:52:40 -04:00
|
|
|
timeout = flag.Duration("timeout", 10*time.Second, "overall subscribe+receive timeout")
|
|
|
|
|
)
|
|
|
|
|
flag.Parse()
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), *timeout)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
2026-05-03 00:59:08 -04:00
|
|
|
if err := Subscribe(ctx, *whepURL, *token); err != nil {
|
2026-04-17 08:52:40 -04:00
|
|
|
log.Fatalf("subscribe failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
fmt.Println("OK: received video and audio RTP")
|
|
|
|
|
os.Exit(0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Subscribe performs a full WHEP subscribe against whepURL and returns
|
|
|
|
|
// nil when both a video and an audio RTP packet have been observed
|
|
|
|
|
// before ctx expires. It is exported so tests can exercise it.
|
2026-05-03 00:59:08 -04:00
|
|
|
func Subscribe(ctx context.Context, whepURL, token string) error {
|
2026-04-17 08:52:40 -04:00
|
|
|
me := &webrtc.MediaEngine{}
|
|
|
|
|
if err := me.RegisterDefaultCodecs(); err != nil {
|
|
|
|
|
return fmt.Errorf("register codecs: %w", err)
|
|
|
|
|
}
|
|
|
|
|
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
|
|
|
|
|
|
|
|
|
|
pc, err := api.NewPeerConnection(webrtc.Configuration{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("new peer connection: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer pc.Close()
|
|
|
|
|
|
|
|
|
|
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
|
|
|
|
|
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
|
|
|
|
return fmt.Errorf("add video transceiver: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
|
|
|
|
|
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
|
|
|
|
return fmt.Errorf("add audio transceiver: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
videoDone := make(chan struct{})
|
|
|
|
|
audioDone := make(chan struct{})
|
|
|
|
|
pc.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
|
|
|
|
kind := t.Kind()
|
|
|
|
|
log.Printf("OnTrack: kind=%s codec=%s pt=%d", kind, t.Codec().MimeType, t.PayloadType())
|
|
|
|
|
go func() {
|
|
|
|
|
buf := make([]byte, 1500)
|
|
|
|
|
// One successful ReadRTP is enough to prove egress.
|
|
|
|
|
if _, _, err := t.Read(buf); err != nil {
|
|
|
|
|
log.Printf("read %s: %v", kind, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
switch kind {
|
|
|
|
|
case webrtc.RTPCodecTypeVideo:
|
|
|
|
|
select {
|
|
|
|
|
case <-videoDone:
|
|
|
|
|
default:
|
|
|
|
|
close(videoDone)
|
|
|
|
|
}
|
|
|
|
|
case webrtc.RTPCodecTypeAudio:
|
|
|
|
|
select {
|
|
|
|
|
case <-audioDone:
|
|
|
|
|
default:
|
|
|
|
|
close(audioDone)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
offer, err := pc.CreateOffer(nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("create offer: %w", err)
|
|
|
|
|
}
|
|
|
|
|
gather := webrtc.GatheringCompletePromise(pc)
|
|
|
|
|
if err := pc.SetLocalDescription(offer); err != nil {
|
|
|
|
|
return fmt.Errorf("set local: %w", err)
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-gather:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return fmt.Errorf("ice gather: %w", ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-03 00:59:08 -04:00
|
|
|
answerSDP, err := postOffer(ctx, whepURL, token, pc.LocalDescription().SDP)
|
2026-04-17 08:52:40 -04:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
|
|
|
|
|
Type: webrtc.SDPTypeAnswer,
|
|
|
|
|
SDP: answerSDP,
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return fmt.Errorf("set remote: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait for one RTP packet on each track or ctx timeout.
|
|
|
|
|
select {
|
|
|
|
|
case <-videoDone:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return fmt.Errorf("waiting for video: %w", ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-audioDone:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return fmt.Errorf("waiting for audio: %w", ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-03 00:59:08 -04:00
|
|
|
func postOffer(ctx context.Context, url, token, sdp string) (string, error) {
|
2026-04-17 08:52:40 -04:00
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url,
|
|
|
|
|
bytes.NewReader([]byte(sdp)))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", fmt.Errorf("new request: %w", err)
|
|
|
|
|
}
|
|
|
|
|
req.Header.Set("Content-Type", "application/sdp")
|
2026-05-03 00:59:08 -04:00
|
|
|
if token != "" {
|
|
|
|
|
req.Header.Set("Authorization", "Bearer "+token)
|
|
|
|
|
}
|
2026-04-17 08:52:40 -04:00
|
|
|
|
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", fmt.Errorf("POST %s: %w", url, err)
|
|
|
|
|
}
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
|
|
body, _ := io.ReadAll(resp.Body)
|
|
|
|
|
if resp.StatusCode != http.StatusCreated {
|
|
|
|
|
return "", fmt.Errorf("WHEP %s: %d %s", url, resp.StatusCode, string(body))
|
|
|
|
|
}
|
|
|
|
|
return string(body), nil
|
|
|
|
|
}
|