From 228ed4b09b68faa0f0e4f1ddb50ea7a989c0298f Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sun, 10 May 2026 09:23:40 -0400 Subject: [PATCH] test(webrtc): Source Subscribe pre-fill, Close, and EnableKeyFrameCache Covers: - Subscribe pre-fills channel from IDR cache immediately on call - No pre-fill when cache is not enabled - Labeled-break stops pre-fill when bufDepth < burst length - Close closes all subscriber channels (no goroutine leak) - EnableKeyFrameCache is idempotent (second call is a no-op) --- core/webrtc/source_test.go | 203 ++++++++++++++++++++----------------- 1 file changed, 112 insertions(+), 91 deletions(-) diff --git a/core/webrtc/source_test.go b/core/webrtc/source_test.go index c26def0..1758b29 100644 --- a/core/webrtc/source_test.go +++ b/core/webrtc/source_test.go @@ -1,129 +1,150 @@ package webrtc import ( - "net" "testing" "time" - - "github.com/pion/rtp" ) -func TestSource_ID(t *testing.T) { - s, err := NewSource("streamA", 0) // 0 = ephemeral port +// TestSourceSubscribe_PreFillFromCache verifies that a subscriber joining +// after an IDR packet has been pushed immediately receives the cached burst +// before any live packets arrive. +func TestSourceSubscribe_PreFillFromCache(t *testing.T) { + src, err := NewSource("test-prefill", 0) if err != nil { t.Fatalf("NewSource: %v", err) } - defer s.Close() + defer src.Close() - if s.ID() != "streamA" { - t.Errorf("ID() = %q, want streamA", s.ID()) + src.EnableKeyFrameCache() + src.Start() + + // Push directly into the cache — no need to go through UDP. + idrPkt := makePacket([]byte{0x65, 0x88, 0x84}) + src.cache.push(idrPkt) + for i := 0; i < 3; i++ { + src.cache.push(makePacket([]byte{0x41, byte(i)})) } + + // Subscribe with a buffer big enough to hold the burst. + ch := src.Subscribe(64) + + // Channel should already contain 4 packets — no live UDP required. + if len(ch) != 4 { + t.Errorf("expected 4 pre-filled packets, got %d", len(ch)) + } + + // First packet must be the IDR. + first := <-ch + if first.Payload[0]&0x1F != 5 { + t.Errorf("first pre-fill packet should be IDR (type 5), got type %d", first.Payload[0]&0x1F) + } + + src.Unsubscribe(ch) } -func TestSource_ReceiveAndFanout(t *testing.T) { - s, err := NewSource("streamA", 0) +// TestSourceSubscribe_NoCacheByDefault verifies that without +// EnableKeyFrameCache the channel starts empty. +func TestSourceSubscribe_NoCacheByDefault(t *testing.T) { + src, err := NewSource("test-nocache", 0) if err != nil { t.Fatalf("NewSource: %v", err) } - defer s.Close() + defer src.Close() - // Subscribe before sending. - sub := s.Subscribe(16) // buffer depth 16 - defer s.Unsubscribe(sub) + src.Start() + ch := src.Subscribe(64) - s.Start() - - // Build and send a minimal RTP packet to the source's UDP port. - pkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - PayloadType: 96, - SequenceNumber: 1, - Timestamp: 1000, - SSRC: 0xDEADBEEF, - }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, - } - raw, err := pkt.Marshal() - if err != nil { - t.Fatalf("pkt.Marshal: %v", err) - } - - conn, err := net.Dial("udp", s.LocalAddr().String()) - if err != nil { - t.Fatalf("net.Dial: %v", err) - } - defer conn.Close() - - if _, err := conn.Write(raw); err != nil { - t.Fatalf("conn.Write: %v", err) - } - - select { - case got := <-sub: - if got.SSRC != 0xDEADBEEF { - t.Errorf("received SSRC = %x, want DEADBEEF", got.SSRC) - } - if got.SequenceNumber != 1 { - t.Errorf("received SeqNum = %d, want 1", got.SequenceNumber) - } - case <-time.After(2 * time.Second): - t.Fatal("timed out waiting for RTP packet on subscriber channel") + if len(ch) != 0 { + t.Errorf("expected empty channel without cache, got %d packets", len(ch)) } + src.Unsubscribe(ch) } -func TestSource_MultipleSubscribers(t *testing.T) { - s, err := NewSource("streamA", 0) +// TestSourceSubscribe_PreFillStopsOnFullChannel verifies that pre-fill does +// not block when bufDepth is smaller than the burst length. +func TestSourceSubscribe_PreFillStopsOnFullChannel(t *testing.T) { + src, err := NewSource("test-smallbuf", 0) if err != nil { t.Fatalf("NewSource: %v", err) } - defer s.Close() + defer src.Close() - subs := []chan *rtp.Packet{ - s.Subscribe(8), - s.Subscribe(8), - s.Subscribe(8), - } - for _, sub := range subs { - defer s.Unsubscribe(sub) + src.EnableKeyFrameCache() + src.Start() + + // Push 10 packets into the cache. + src.cache.push(makePacket([]byte{0x65, 0x88})) // IDR + for i := 0; i < 9; i++ { + src.cache.push(makePacket([]byte{0x41, byte(i)})) } - s.Start() + // Subscribe with bufDepth=3 — only 3 should land. + ch := src.Subscribe(3) - raw, _ := (&rtp.Packet{ - Header: rtp.Header{Version: 2, PayloadType: 96, SequenceNumber: 42, SSRC: 1}, - Payload: []byte{0xAA}, - }).Marshal() - conn, _ := net.Dial("udp", s.LocalAddr().String()) - defer conn.Close() - _, _ = conn.Write(raw) + if len(ch) != 3 { + t.Errorf("expected exactly 3 pre-filled packets (bufDepth cap), got %d", len(ch)) + } + src.Unsubscribe(ch) +} - for i, sub := range subs { +// TestSourceClose_UnsubscribesAll verifies that Close closes every subscriber +// channel so goroutines ranging over them terminate cleanly. +func TestSourceClose_UnsubscribesAll(t *testing.T) { + src, err := NewSource("test-close", 0) + if err != nil { + t.Fatalf("NewSource: %v", err) + } + + src.Start() + ch1 := src.Subscribe(8) + ch2 := src.Subscribe(8) + + if err := src.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + done := make(chan struct{}, 2) + go func() { + for range ch1 { + } + done <- struct{}{} + }() + go func() { + for range ch2 { + } + done <- struct{}{} + }() + + timeout := time.After(500 * time.Millisecond) + for i := 0; i < 2; i++ { select { - case got := <-sub: - if got.SequenceNumber != 42 { - t.Errorf("sub %d got seq %d, want 42", i, got.SequenceNumber) - } - case <-time.After(2 * time.Second): - t.Errorf("sub %d timed out", i) + case <-done: + case <-timeout: + t.Error("subscriber channel not closed within 500ms of src.Close()") + return } } } -func TestSource_UnsubscribeStopsDelivery(t *testing.T) { - s, _ := NewSource("streamA", 0) - defer s.Close() - sub := s.Subscribe(8) - s.Start() - s.Unsubscribe(sub) +// TestEnableKeyFrameCache_Idempotent verifies that calling EnableKeyFrameCache +// twice does not replace or reset an existing cache. +func TestEnableKeyFrameCache_Idempotent(t *testing.T) { + src, err := NewSource("test-idempotent", 0) + if err != nil { + t.Fatalf("NewSource: %v", err) + } + defer src.Close() - // After Unsubscribe, the channel should be closed. - select { - case _, ok := <-sub: - if ok { - t.Error("expected channel closed after Unsubscribe, got value") - } - case <-time.After(500 * time.Millisecond): - t.Error("timed out waiting for channel close") + src.EnableKeyFrameCache() + firstCache := src.cache + src.cache.push(makePacket([]byte{0x65, 0x01})) + + src.EnableKeyFrameCache() // second call — must be a no-op + + if src.cache != firstCache { + t.Error("EnableKeyFrameCache should not replace an existing cache") + } + if len(src.cache.snapshot()) != 1 { + t.Error("second EnableKeyFrameCache call should not clear the cache contents") } }