package main import ( "context" "net" "net/http" "net/http/httptest" "strings" "sync" "testing" "time" coreweb "github.com/datarhei/core/v16/core/webrtc" "github.com/pion/rtp" ) // TestSubscribe_EndToEnd stands up an in-process webrtc-poc stack, // injects synthetic H.264(PT=102) + Opus(PT=111) RTP into the Source's // UDP port, and asserts Subscribe returns nil within the timeout. // // Network-heavy; skipped under -short. func TestSubscribe_EndToEnd(t *testing.T) { if testing.Short() { t.Skip("skipping end-to-end subscribe test in short mode") } src, err := coreweb.NewSource("stream-e2e", 0) if err != nil { t.Fatalf("NewSource: %v", err) } src.Start() defer src.Close() reg := coreweb.NewRegistry() if err := reg.Register("stream-e2e", src); err != nil { t.Fatalf("Register: %v", err) } factory, err := coreweb.NewPeerFactory(coreweb.DefaultConfig()) if err != nil { t.Fatalf("NewPeerFactory: %v", err) } handler := coreweb.NewWHEPHandler(reg, factory, coreweb.DefaultConfig()) ts := httptest.NewServer(http.StripPrefix("", handler)) defer ts.Close() // Begin injecting RTP into the source. rtpAddr := src.LocalAddr() conn, err := net.DialUDP("udp", nil, rtpAddr) if err != nil { t.Fatalf("dial udp: %v", err) } defer conn.Close() stop := make(chan struct{}) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() tick := time.NewTicker(20 * time.Millisecond) defer tick.Stop() var seq uint16 var ts uint32 for { select { case <-stop: return case <-tick.C: // Video packet (PT=102). pkt := &rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 102, SequenceNumber: seq, Timestamp: ts, SSRC: 0x1234, }, Payload: []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0x10}, } if b, err := pkt.Marshal(); err == nil { _, _ = conn.Write(b) } // Audio packet (PT=111). pkt.PayloadType = 111 pkt.SSRC = 0x5678 pkt.SequenceNumber = seq if b, err := pkt.Marshal(); err == nil { _, _ = conn.Write(b) } seq++ ts += 3000 } } }() defer func() { close(stop) wg.Wait() }() // We don't care whether the test client's Subscribe can actually // decode H.264 — just that it observed *some* RTP on both tracks. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() whepURL := strings.TrimRight(ts.URL, "/") + "/whep/stream-e2e" if err := Subscribe(ctx, whepURL, ""); err != nil { t.Fatalf("Subscribe: %v", err) } }