package webrtc import ( "net" "testing" "time" "github.com/pion/rtp" ) func TestSource_ID(t *testing.T) { s, err := NewSource("streamA", 0) // 0 = ephemeral port if err != nil { t.Fatalf("NewSource: %v", err) } defer s.Close() if s.ID() != "streamA" { t.Errorf("ID() = %q, want streamA", s.ID()) } } func TestSource_ReceiveAndFanout(t *testing.T) { s, err := NewSource("streamA", 0) if err != nil { t.Fatalf("NewSource: %v", err) } defer s.Close() // Subscribe before sending. sub := s.Subscribe(16) // buffer depth 16 defer s.Unsubscribe(sub) s.Start() // Build and send a minimal RTP packet to the source's UDP port. pkt := &rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 96, SequenceNumber: 1, Timestamp: 1000, SSRC: 0xDEADBEEF, }, Payload: []byte{0x01, 0x02, 0x03, 0x04}, } raw, err := pkt.Marshal() if err != nil { t.Fatalf("pkt.Marshal: %v", err) } conn, err := net.Dial("udp", s.LocalAddr().String()) if err != nil { t.Fatalf("net.Dial: %v", err) } defer conn.Close() if _, err := conn.Write(raw); err != nil { t.Fatalf("conn.Write: %v", err) } select { case got := <-sub: if got.SSRC != 0xDEADBEEF { t.Errorf("received SSRC = %x, want DEADBEEF", got.SSRC) } if got.SequenceNumber != 1 { t.Errorf("received SeqNum = %d, want 1", got.SequenceNumber) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for RTP packet on subscriber channel") } } func TestSource_MultipleSubscribers(t *testing.T) { s, err := NewSource("streamA", 0) if err != nil { t.Fatalf("NewSource: %v", err) } defer s.Close() subs := []chan *rtp.Packet{ s.Subscribe(8), s.Subscribe(8), s.Subscribe(8), } for _, sub := range subs { defer s.Unsubscribe(sub) } s.Start() raw, _ := (&rtp.Packet{ Header: rtp.Header{Version: 2, PayloadType: 96, SequenceNumber: 42, SSRC: 1}, Payload: []byte{0xAA}, }).Marshal() conn, _ := net.Dial("udp", s.LocalAddr().String()) defer conn.Close() _, _ = conn.Write(raw) for i, sub := range subs { select { case got := <-sub: if got.SequenceNumber != 42 { t.Errorf("sub %d got seq %d, want 42", i, got.SequenceNumber) } case <-time.After(2 * time.Second): t.Errorf("sub %d timed out", i) } } } func TestSource_UnsubscribeStopsDelivery(t *testing.T) { s, _ := NewSource("streamA", 0) defer s.Close() sub := s.Subscribe(8) s.Start() s.Unsubscribe(sub) // After Unsubscribe, the channel should be closed. select { case _, ok := <-sub: if ok { t.Error("expected channel closed after Unsubscribe, got value") } case <-time.After(500 * time.Millisecond): t.Error("timed out waiting for channel close") } }