diff --git a/app/webrtc/integration_test.go b/app/webrtc/integration_test.go new file mode 100644 index 0000000..e9573f2 --- /dev/null +++ b/app/webrtc/integration_test.go @@ -0,0 +1,275 @@ +package webrtc + +import ( + "net" + "net/http" + "net/http/httptest" + "net/url" + "regexp" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/labstack/echo/v4" + "github.com/pion/rtp" + pionwebrtc "github.com/pion/webrtc/v4" + + "github.com/datarhei/core/v16/config" + appcfg "github.com/datarhei/core/v16/restream/app" +) + +// TestIntegration_SyntheticRTPToWHEP wires the full M2 subsystem end to +// end using in-process UDP sockets and a Pion WHEP subscriber: +// +// 1. Build a Subsystem and Handler (no Core/HTTP server needed). +// 2. Fire the OnStart hook directly — this allocates two adjacent +// loopback UDP ports and registers a process stream. +// 3. Extract the allocated video + audio ports from the returned +// ConfigIO legs. +// 4. Build a Pion PeerConnection (recvonly video + audio) and POST its +// SDP offer through the Echo Handler. +// 5. Plumb the returned answer into the PC. +// 6. Spray synthetic RTP packets at both UDP ports. +// 7. Assert that the PC sees OnTrack for both kinds and at least one +// RTP packet arrives on each track inside the timeout budget. +// +// This is the single highest-leverage integration test for M2 — it +// catches the whole stack: port allocation, hook contract, two-track +// forwarding, WHEP handshake, and JWT-mounted routing doesn't interfere +// with the handler's internal flow. +func TestIntegration_SyntheticRTPToWHEP(t *testing.T) { + // --- 1. Construct subsystem + handler. --- + sub, err := New(config.DataWebRTC{Enable: true}, nil) + if err != nil { + t.Fatalf("subsystem New: %v", err) + } + defer sub.Close() + + h := NewHandler(sub, 0) + defer h.Close() + + // --- 2. Fire OnStart directly to populate the stream registry + // and allocate ports. We bypass the restream manager by + // invoking the hook the subsystem would have registered. + processID := "integration-probe" + legs, err := sub.onProcessStart(processID, &appcfg.Config{ + ID: processID, + WebRTC: appcfg.ConfigWebRTC{ + Enabled: true, + VideoPT: 102, + AudioPT: 111, + }, + }) + if err != nil { + t.Fatalf("onProcessStart: %v", err) + } + if len(legs) != 2 { + t.Fatalf("expected 2 output legs, got %d", len(legs)) + } + defer sub.onProcessStop(processID) + + // --- 3. Extract UDP ports from leg addresses. --- + videoPort, err := portFromLegAddress(legs[0].Address) + if err != nil { + t.Fatalf("video leg address %q: %v", legs[0].Address, err) + } + audioPort, err := portFromLegAddress(legs[1].Address) + if err != nil { + t.Fatalf("audio leg address %q: %v", legs[1].Address, err) + } + if audioPort != videoPort+1 { + t.Fatalf("expected adjacent ports, got video=%d audio=%d", videoPort, audioPort) + } + + // --- 4. Mount the handler in an Echo server (httptest) so we + // exercise the real route registration path. --- + e := echo.New() + g := e.Group("") + h.Register(g) + srv := httptest.NewServer(e) + defer srv.Close() + + // --- 5. Build the WHEP subscriber PeerConnection. --- + me := &pionwebrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + t.Fatalf("register default codecs: %v", err) + } + api := pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me)) + pc, err := api.NewPeerConnection(pionwebrtc.Configuration{}) + if err != nil { + t.Fatalf("new PC: %v", err) + } + defer pc.Close() + + if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo, + pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("add video transceiver: %v", err) + } + if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio, + pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("add audio transceiver: %v", err) + } + + // Signal when each track has produced its first RTP packet. + var videoGot, audioGot atomic.Bool + videoCh := make(chan struct{}, 1) + audioCh := make(chan struct{}, 1) + + pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) { + // Read a single RTP packet and signal the appropriate channel. + go func() { + if _, _, readErr := tr.ReadRTP(); readErr != nil { + return + } + switch tr.Kind() { + case pionwebrtc.RTPCodecTypeVideo: + if videoGot.CompareAndSwap(false, true) { + videoCh <- struct{}{} + } + case pionwebrtc.RTPCodecTypeAudio: + if audioGot.CompareAndSwap(false, true) { + audioCh <- struct{}{} + } + } + }() + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + t.Fatalf("create offer: %v", err) + } + gatherLocal := pionwebrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(offer); err != nil { + t.Fatalf("set local: %v", err) + } + select { + case <-gatherLocal: + case <-time.After(5 * time.Second): + t.Fatalf("local ICE gathering timeout") + } + offerSDP := pc.LocalDescription().SDP + + // --- 6. POST the offer to the WHEP endpoint. --- + resp, err := http.Post(srv.URL+"/whep/"+processID, "application/sdp", + strings.NewReader(offerSDP)) + if err != nil { + t.Fatalf("POST /whep: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusCreated { + t.Fatalf("POST /whep status = %d, want 201", resp.StatusCode) + } + + answerBuf := make([]byte, 1<<15) + n, _ := resp.Body.Read(answerBuf) + answerSDP := string(answerBuf[:n]) + if !strings.Contains(answerSDP, "v=0") { + t.Fatalf("answer SDP malformed: %q", answerSDP) + } + + loc := resp.Header.Get("Location") + if loc == "" || !strings.HasPrefix(loc, "/whep/"+processID+"/") { + t.Fatalf("Location header bad: %q", loc) + } + + if err := pc.SetRemoteDescription(pionwebrtc.SessionDescription{ + Type: pionwebrtc.SDPTypeAnswer, + SDP: answerSDP, + }); err != nil { + t.Fatalf("set remote: %v", err) + } + + // --- 7. Spray synthetic RTP into both UDP ports. --- + videoSender, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort)) + if err != nil { + t.Fatalf("dial video: %v", err) + } + defer videoSender.Close() + audioSender, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(audioPort)) + if err != nil { + t.Fatalf("dial audio: %v", err) + } + defer audioSender.Close() + + stopSend := make(chan struct{}) + defer close(stopSend) + go func() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + var vseq, aseq uint16 + for { + select { + case <-stopSend: + return + case <-ticker.C: + vseq++ + aseq++ + vpkt := synthRTPPacket(102, vseq, uint32(vseq)*3000, 0xcafe0000, []byte("vvvvvvvv")) + _, _ = videoSender.Write(vpkt) + apkt := synthRTPPacket(111, aseq, uint32(aseq)*960, 0xbeef0000, []byte("aaaaaaaa")) + _, _ = audioSender.Write(apkt) + } + } + }() + + // --- 8. Wait for both tracks' first packet. --- + waitFor := func(name string, ch chan struct{}) { + select { + case <-ch: + // success + case <-time.After(10 * time.Second): + t.Fatalf("%s: no RTP received via WHEP within 10s", name) + } + } + waitFor("video", videoCh) + waitFor("audio", audioCh) + + // Sanity: the Location path should DELETE cleanly. + parsedLoc, err := url.Parse(loc) + if err != nil { + t.Fatalf("parse Location: %v", err) + } + deleteReq, _ := http.NewRequest(http.MethodDelete, srv.URL+parsedLoc.Path, nil) + delResp, err := http.DefaultClient.Do(deleteReq) + if err != nil { + t.Fatalf("DELETE /whep/.../resource: %v", err) + } + _ = delResp.Body.Close() + if delResp.StatusCode != http.StatusNoContent { + t.Fatalf("DELETE status = %d, want 204", delResp.StatusCode) + } +} + +// portFromLegAddress pulls the UDP port out of a leg Address like +// "udp://127.0.0.1:49200?pkt_size=1316". +func portFromLegAddress(addr string) (int, error) { + re := regexp.MustCompile(`udp://[^:]+:(\d+)`) + m := re.FindStringSubmatch(addr) + if len(m) != 2 { + return 0, &portParseError{addr: addr} + } + return strconv.Atoi(m[1]) +} + +type portParseError struct{ addr string } + +func (e *portParseError) Error() string { return "cannot parse port from " + e.addr } + +// synthRTPPacket builds a minimal valid RTP packet for injection testing. +func synthRTPPacket(pt uint8, seq uint16, ts uint32, ssrc uint32, payload []byte) []byte { + p := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: pt, + SequenceNumber: seq, + Timestamp: ts, + SSRC: ssrc, + Marker: false, + }, + Payload: payload, + } + b, _ := p.Marshal() + return b +}