datarhei-dragonfork-core/test/whep-client/main.go

157 lines
4.3 KiB
Go
Raw Permalink Normal View History

// Command whep-client is a minimal Pion-based WHEP subscriber used for
// M1 end-to-end verification. It POSTs a recvonly SDP offer to a WHEP
// endpoint, applies the answer, then reports whether the video and
// audio tracks receive at least one RTP packet before a timeout.
//
// This is a test helper; it is NOT part of the Core binary.
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
"github.com/pion/webrtc/v4"
)
func main() {
var (
whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL")
token = flag.String("token", "", "Authorization: Bearer <token>; empty means no auth header")
timeout = flag.Duration("timeout", 10*time.Second, "overall subscribe+receive timeout")
)
flag.Parse()
ctx, cancel := context.WithTimeout(context.Background(), *timeout)
defer cancel()
if err := Subscribe(ctx, *whepURL, *token); err != nil {
log.Fatalf("subscribe failed: %v", err)
}
fmt.Println("OK: received video and audio RTP")
os.Exit(0)
}
// Subscribe performs a full WHEP subscribe against whepURL and returns
// nil when both a video and an audio RTP packet have been observed
// before ctx expires. It is exported so tests can exercise it.
func Subscribe(ctx context.Context, whepURL, token string) error {
me := &webrtc.MediaEngine{}
if err := me.RegisterDefaultCodecs(); err != nil {
return fmt.Errorf("register codecs: %w", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return fmt.Errorf("new peer connection: %w", err)
}
defer pc.Close()
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
return fmt.Errorf("add video transceiver: %w", err)
}
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
return fmt.Errorf("add audio transceiver: %w", err)
}
videoDone := make(chan struct{})
audioDone := make(chan struct{})
pc.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
kind := t.Kind()
log.Printf("OnTrack: kind=%s codec=%s pt=%d", kind, t.Codec().MimeType, t.PayloadType())
go func() {
buf := make([]byte, 1500)
// One successful ReadRTP is enough to prove egress.
if _, _, err := t.Read(buf); err != nil {
log.Printf("read %s: %v", kind, err)
return
}
switch kind {
case webrtc.RTPCodecTypeVideo:
select {
case <-videoDone:
default:
close(videoDone)
}
case webrtc.RTPCodecTypeAudio:
select {
case <-audioDone:
default:
close(audioDone)
}
}
}()
})
offer, err := pc.CreateOffer(nil)
if err != nil {
return fmt.Errorf("create offer: %w", err)
}
gather := webrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(offer); err != nil {
return fmt.Errorf("set local: %w", err)
}
select {
case <-gather:
case <-ctx.Done():
return fmt.Errorf("ice gather: %w", ctx.Err())
}
answerSDP, err := postOffer(ctx, whepURL, token, pc.LocalDescription().SDP)
if err != nil {
return err
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: answerSDP,
}); err != nil {
return fmt.Errorf("set remote: %w", err)
}
// Wait for one RTP packet on each track or ctx timeout.
select {
case <-videoDone:
case <-ctx.Done():
return fmt.Errorf("waiting for video: %w", ctx.Err())
}
select {
case <-audioDone:
case <-ctx.Done():
return fmt.Errorf("waiting for audio: %w", ctx.Err())
}
return nil
}
func postOffer(ctx context.Context, url, token, sdp string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url,
bytes.NewReader([]byte(sdp)))
if err != nil {
return "", fmt.Errorf("new request: %w", err)
}
req.Header.Set("Content-Type", "application/sdp")
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("POST %s: %w", url, err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusCreated {
return "", fmt.Errorf("WHEP %s: %d %s", url, resp.StatusCode, string(body))
}
return string(body), nil
}