package webrtc import ( "net" "net/http" "net/http/httptest" "net/url" "regexp" "strconv" "strings" "sync/atomic" "testing" "time" "github.com/labstack/echo/v4" "github.com/pion/rtp" pionwebrtc "github.com/pion/webrtc/v4" "github.com/datarhei/core/v16/config" appcfg "github.com/datarhei/core/v16/restream/app" ) // TestIntegration_SyntheticRTPToWHEP wires the full M2 subsystem end to // end using in-process UDP sockets and a Pion WHEP subscriber: // // 1. Build a Subsystem and Handler (no Core/HTTP server needed). // 2. Fire the OnStart hook directly — this allocates two adjacent // loopback UDP ports and registers a process stream. // 3. Extract the allocated video + audio ports from the returned // ConfigIO legs. // 4. Build a Pion PeerConnection (recvonly video + audio) and POST its // SDP offer through the Echo Handler. // 5. Plumb the returned answer into the PC. // 6. Spray synthetic RTP packets at both UDP ports. // 7. Assert that the PC sees OnTrack for both kinds and at least one // RTP packet arrives on each track inside the timeout budget. // // This is the single highest-leverage integration test for M2 — it // catches the whole stack: port allocation, hook contract, two-track // forwarding, WHEP handshake, and JWT-mounted routing doesn't interfere // with the handler's internal flow. func TestIntegration_SyntheticRTPToWHEP(t *testing.T) { // --- 1. Construct subsystem + handler. --- sub, err := New(config.DataWebRTC{Enable: true}, nil) if err != nil { t.Fatalf("subsystem New: %v", err) } defer sub.Close() h := NewHandler(sub, 0) defer h.Close() // --- 2. Fire OnStart directly to populate the stream registry // and allocate ports. We bypass the restream manager by // invoking the hook the subsystem would have registered. processID := "integration-probe" legs, err := sub.onProcessStart(processID, &appcfg.Config{ ID: processID, WebRTC: appcfg.ConfigWebRTC{ Enabled: true, VideoPT: 102, AudioPT: 111, }, }) if err != nil { t.Fatalf("onProcessStart: %v", err) } if len(legs) != 2 { t.Fatalf("expected 2 output legs, got %d", len(legs)) } defer sub.onProcessStop(processID) // --- 3. Extract UDP ports from leg addresses. --- videoPort, err := portFromLegAddress(legs[0].Address) if err != nil { t.Fatalf("video leg address %q: %v", legs[0].Address, err) } audioPort, err := portFromLegAddress(legs[1].Address) if err != nil { t.Fatalf("audio leg address %q: %v", legs[1].Address, err) } if audioPort != videoPort+1 { t.Fatalf("expected adjacent ports, got video=%d audio=%d", videoPort, audioPort) } // --- 4. Mount the handler in an Echo server (httptest) so we // exercise the real route registration path. --- e := echo.New() g := e.Group("") h.Register(g) srv := httptest.NewServer(e) defer srv.Close() // --- 5. Build the WHEP subscriber PeerConnection. --- me := &pionwebrtc.MediaEngine{} if err := me.RegisterDefaultCodecs(); err != nil { t.Fatalf("register default codecs: %v", err) } api := pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me)) pc, err := api.NewPeerConnection(pionwebrtc.Configuration{}) if err != nil { t.Fatalf("new PC: %v", err) } defer pc.Close() if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo, pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { t.Fatalf("add video transceiver: %v", err) } if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio, pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { t.Fatalf("add audio transceiver: %v", err) } // Signal when each track has produced its first RTP packet. var videoGot, audioGot atomic.Bool videoCh := make(chan struct{}, 1) audioCh := make(chan struct{}, 1) pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) { // Read a single RTP packet and signal the appropriate channel. go func() { if _, _, readErr := tr.ReadRTP(); readErr != nil { return } switch tr.Kind() { case pionwebrtc.RTPCodecTypeVideo: if videoGot.CompareAndSwap(false, true) { videoCh <- struct{}{} } case pionwebrtc.RTPCodecTypeAudio: if audioGot.CompareAndSwap(false, true) { audioCh <- struct{}{} } } }() }) offer, err := pc.CreateOffer(nil) if err != nil { t.Fatalf("create offer: %v", err) } gatherLocal := pionwebrtc.GatheringCompletePromise(pc) if err := pc.SetLocalDescription(offer); err != nil { t.Fatalf("set local: %v", err) } select { case <-gatherLocal: case <-time.After(5 * time.Second): t.Fatalf("local ICE gathering timeout") } offerSDP := pc.LocalDescription().SDP // --- 6. POST the offer to the WHEP endpoint. --- resp, err := http.Post(srv.URL+"/whep/"+processID, "application/sdp", strings.NewReader(offerSDP)) if err != nil { t.Fatalf("POST /whep: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusCreated { t.Fatalf("POST /whep status = %d, want 201", resp.StatusCode) } answerBuf := make([]byte, 1<<15) n, _ := resp.Body.Read(answerBuf) answerSDP := string(answerBuf[:n]) if !strings.Contains(answerSDP, "v=0") { t.Fatalf("answer SDP malformed: %q", answerSDP) } loc := resp.Header.Get("Location") if loc == "" || !strings.HasPrefix(loc, "/whep/"+processID+"/") { t.Fatalf("Location header bad: %q", loc) } if err := pc.SetRemoteDescription(pionwebrtc.SessionDescription{ Type: pionwebrtc.SDPTypeAnswer, SDP: answerSDP, }); err != nil { t.Fatalf("set remote: %v", err) } // --- 7. Spray synthetic RTP into both UDP ports. --- videoSender, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort)) if err != nil { t.Fatalf("dial video: %v", err) } defer videoSender.Close() audioSender, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(audioPort)) if err != nil { t.Fatalf("dial audio: %v", err) } defer audioSender.Close() stopSend := make(chan struct{}) defer close(stopSend) go func() { ticker := time.NewTicker(20 * time.Millisecond) defer ticker.Stop() var vseq, aseq uint16 for { select { case <-stopSend: return case <-ticker.C: vseq++ aseq++ vpkt := synthRTPPacket(102, vseq, uint32(vseq)*3000, 0xcafe0000, []byte("vvvvvvvv")) _, _ = videoSender.Write(vpkt) apkt := synthRTPPacket(111, aseq, uint32(aseq)*960, 0xbeef0000, []byte("aaaaaaaa")) _, _ = audioSender.Write(apkt) } } }() // --- 8. Wait for both tracks' first packet. --- waitFor := func(name string, ch chan struct{}) { select { case <-ch: // success case <-time.After(10 * time.Second): t.Fatalf("%s: no RTP received via WHEP within 10s", name) } } waitFor("video", videoCh) waitFor("audio", audioCh) // Sanity: the Location path should DELETE cleanly. parsedLoc, err := url.Parse(loc) if err != nil { t.Fatalf("parse Location: %v", err) } deleteReq, _ := http.NewRequest(http.MethodDelete, srv.URL+parsedLoc.Path, nil) delResp, err := http.DefaultClient.Do(deleteReq) if err != nil { t.Fatalf("DELETE /whep/.../resource: %v", err) } _ = delResp.Body.Close() if delResp.StatusCode != http.StatusNoContent { t.Fatalf("DELETE status = %d, want 204", delResp.StatusCode) } } // portFromLegAddress pulls the UDP port out of a leg Address like // "udp://127.0.0.1:49200?pkt_size=1316". func portFromLegAddress(addr string) (int, error) { re := regexp.MustCompile(`udp://[^:]+:(\d+)`) m := re.FindStringSubmatch(addr) if len(m) != 2 { return 0, &portParseError{addr: addr} } return strconv.Atoi(m[1]) } type portParseError struct{ addr string } func (e *portParseError) Error() string { return "cannot parse port from " + e.addr } // synthRTPPacket builds a minimal valid RTP packet for injection testing. func synthRTPPacket(pt uint8, seq uint16, ts uint32, ssrc uint32, payload []byte) []byte { p := &rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: pt, SequenceNumber: seq, Timestamp: ts, SSRC: ssrc, Marker: false, }, Payload: payload, } b, _ := p.Marshal() return b }