test(webrtc): Source Subscribe pre-fill, Close, and EnableKeyFrameCache
Covers: - Subscribe pre-fills channel from IDR cache immediately on call - No pre-fill when cache is not enabled - Labeled-break stops pre-fill when bufDepth < burst length - Close closes all subscriber channels (no goroutine leak) - EnableKeyFrameCache is idempotent (second call is a no-op)
This commit is contained in:
parent
293536563f
commit
228ed4b09b
1 changed files with 112 additions and 91 deletions
|
|
@ -1,129 +1,150 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
func TestSource_ID(t *testing.T) {
|
||||
s, err := NewSource("streamA", 0) // 0 = ephemeral port
|
||||
// TestSourceSubscribe_PreFillFromCache verifies that a subscriber joining
|
||||
// after an IDR packet has been pushed immediately receives the cached burst
|
||||
// before any live packets arrive.
|
||||
func TestSourceSubscribe_PreFillFromCache(t *testing.T) {
|
||||
src, err := NewSource("test-prefill", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSource: %v", err)
|
||||
}
|
||||
defer s.Close()
|
||||
defer src.Close()
|
||||
|
||||
if s.ID() != "streamA" {
|
||||
t.Errorf("ID() = %q, want streamA", s.ID())
|
||||
}
|
||||
src.EnableKeyFrameCache()
|
||||
src.Start()
|
||||
|
||||
// Push directly into the cache — no need to go through UDP.
|
||||
idrPkt := makePacket([]byte{0x65, 0x88, 0x84})
|
||||
src.cache.push(idrPkt)
|
||||
for i := 0; i < 3; i++ {
|
||||
src.cache.push(makePacket([]byte{0x41, byte(i)}))
|
||||
}
|
||||
|
||||
func TestSource_ReceiveAndFanout(t *testing.T) {
|
||||
s, err := NewSource("streamA", 0)
|
||||
// Subscribe with a buffer big enough to hold the burst.
|
||||
ch := src.Subscribe(64)
|
||||
|
||||
// Channel should already contain 4 packets — no live UDP required.
|
||||
if len(ch) != 4 {
|
||||
t.Errorf("expected 4 pre-filled packets, got %d", len(ch))
|
||||
}
|
||||
|
||||
// First packet must be the IDR.
|
||||
first := <-ch
|
||||
if first.Payload[0]&0x1F != 5 {
|
||||
t.Errorf("first pre-fill packet should be IDR (type 5), got type %d", first.Payload[0]&0x1F)
|
||||
}
|
||||
|
||||
src.Unsubscribe(ch)
|
||||
}
|
||||
|
||||
// TestSourceSubscribe_NoCacheByDefault verifies that without
|
||||
// EnableKeyFrameCache the channel starts empty.
|
||||
func TestSourceSubscribe_NoCacheByDefault(t *testing.T) {
|
||||
src, err := NewSource("test-nocache", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSource: %v", err)
|
||||
}
|
||||
defer s.Close()
|
||||
defer src.Close()
|
||||
|
||||
// Subscribe before sending.
|
||||
sub := s.Subscribe(16) // buffer depth 16
|
||||
defer s.Unsubscribe(sub)
|
||||
src.Start()
|
||||
ch := src.Subscribe(64)
|
||||
|
||||
s.Start()
|
||||
|
||||
// Build and send a minimal RTP packet to the source's UDP port.
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 1,
|
||||
Timestamp: 1000,
|
||||
SSRC: 0xDEADBEEF,
|
||||
},
|
||||
Payload: []byte{0x01, 0x02, 0x03, 0x04},
|
||||
if len(ch) != 0 {
|
||||
t.Errorf("expected empty channel without cache, got %d packets", len(ch))
|
||||
}
|
||||
raw, err := pkt.Marshal()
|
||||
if err != nil {
|
||||
t.Fatalf("pkt.Marshal: %v", err)
|
||||
src.Unsubscribe(ch)
|
||||
}
|
||||
|
||||
conn, err := net.Dial("udp", s.LocalAddr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("net.Dial: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
if _, err := conn.Write(raw); err != nil {
|
||||
t.Fatalf("conn.Write: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case got := <-sub:
|
||||
if got.SSRC != 0xDEADBEEF {
|
||||
t.Errorf("received SSRC = %x, want DEADBEEF", got.SSRC)
|
||||
}
|
||||
if got.SequenceNumber != 1 {
|
||||
t.Errorf("received SeqNum = %d, want 1", got.SequenceNumber)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for RTP packet on subscriber channel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSource_MultipleSubscribers(t *testing.T) {
|
||||
s, err := NewSource("streamA", 0)
|
||||
// TestSourceSubscribe_PreFillStopsOnFullChannel verifies that pre-fill does
|
||||
// not block when bufDepth is smaller than the burst length.
|
||||
func TestSourceSubscribe_PreFillStopsOnFullChannel(t *testing.T) {
|
||||
src, err := NewSource("test-smallbuf", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSource: %v", err)
|
||||
}
|
||||
defer s.Close()
|
||||
defer src.Close()
|
||||
|
||||
subs := []chan *rtp.Packet{
|
||||
s.Subscribe(8),
|
||||
s.Subscribe(8),
|
||||
s.Subscribe(8),
|
||||
}
|
||||
for _, sub := range subs {
|
||||
defer s.Unsubscribe(sub)
|
||||
src.EnableKeyFrameCache()
|
||||
src.Start()
|
||||
|
||||
// Push 10 packets into the cache.
|
||||
src.cache.push(makePacket([]byte{0x65, 0x88})) // IDR
|
||||
for i := 0; i < 9; i++ {
|
||||
src.cache.push(makePacket([]byte{0x41, byte(i)}))
|
||||
}
|
||||
|
||||
s.Start()
|
||||
// Subscribe with bufDepth=3 — only 3 should land.
|
||||
ch := src.Subscribe(3)
|
||||
|
||||
raw, _ := (&rtp.Packet{
|
||||
Header: rtp.Header{Version: 2, PayloadType: 96, SequenceNumber: 42, SSRC: 1},
|
||||
Payload: []byte{0xAA},
|
||||
}).Marshal()
|
||||
conn, _ := net.Dial("udp", s.LocalAddr().String())
|
||||
defer conn.Close()
|
||||
_, _ = conn.Write(raw)
|
||||
if len(ch) != 3 {
|
||||
t.Errorf("expected exactly 3 pre-filled packets (bufDepth cap), got %d", len(ch))
|
||||
}
|
||||
src.Unsubscribe(ch)
|
||||
}
|
||||
|
||||
for i, sub := range subs {
|
||||
// TestSourceClose_UnsubscribesAll verifies that Close closes every subscriber
|
||||
// channel so goroutines ranging over them terminate cleanly.
|
||||
func TestSourceClose_UnsubscribesAll(t *testing.T) {
|
||||
src, err := NewSource("test-close", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSource: %v", err)
|
||||
}
|
||||
|
||||
src.Start()
|
||||
ch1 := src.Subscribe(8)
|
||||
ch2 := src.Subscribe(8)
|
||||
|
||||
if err := src.Close(); err != nil {
|
||||
t.Fatalf("Close: %v", err)
|
||||
}
|
||||
|
||||
done := make(chan struct{}, 2)
|
||||
go func() {
|
||||
for range ch1 {
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
go func() {
|
||||
for range ch2 {
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
timeout := time.After(500 * time.Millisecond)
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case got := <-sub:
|
||||
if got.SequenceNumber != 42 {
|
||||
t.Errorf("sub %d got seq %d, want 42", i, got.SequenceNumber)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Errorf("sub %d timed out", i)
|
||||
case <-done:
|
||||
case <-timeout:
|
||||
t.Error("subscriber channel not closed within 500ms of src.Close()")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSource_UnsubscribeStopsDelivery(t *testing.T) {
|
||||
s, _ := NewSource("streamA", 0)
|
||||
defer s.Close()
|
||||
sub := s.Subscribe(8)
|
||||
s.Start()
|
||||
s.Unsubscribe(sub)
|
||||
// TestEnableKeyFrameCache_Idempotent verifies that calling EnableKeyFrameCache
|
||||
// twice does not replace or reset an existing cache.
|
||||
func TestEnableKeyFrameCache_Idempotent(t *testing.T) {
|
||||
src, err := NewSource("test-idempotent", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSource: %v", err)
|
||||
}
|
||||
defer src.Close()
|
||||
|
||||
// After Unsubscribe, the channel should be closed.
|
||||
select {
|
||||
case _, ok := <-sub:
|
||||
if ok {
|
||||
t.Error("expected channel closed after Unsubscribe, got value")
|
||||
src.EnableKeyFrameCache()
|
||||
firstCache := src.cache
|
||||
src.cache.push(makePacket([]byte{0x65, 0x01}))
|
||||
|
||||
src.EnableKeyFrameCache() // second call — must be a no-op
|
||||
|
||||
if src.cache != firstCache {
|
||||
t.Error("EnableKeyFrameCache should not replace an existing cache")
|
||||
}
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Error("timed out waiting for channel close")
|
||||
if len(src.cache.snapshot()) != 1 {
|
||||
t.Error("second EnableKeyFrameCache call should not clear the cache contents")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue