diff --git a/core/webrtc/source.go b/core/webrtc/source.go index 521ec7f..7c3d4e8 100644 --- a/core/webrtc/source.go +++ b/core/webrtc/source.go @@ -19,6 +19,11 @@ type Source struct { started bool closed bool done chan struct{} + + // cache is non-nil only for video sources that have had + // EnableKeyFrameCache() called. It holds the most recent H.264 IDR + // burst so new subscribers can receive a keyframe immediately. + cache *keyFrameCache } // NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS @@ -61,13 +66,58 @@ func (s *Source) LocalAddr() *net.UDPAddr { return s.conn.LocalAddr().(*net.UDPAddr) } +// EnableKeyFrameCache activates H.264 IDR keyframe burst caching for +// this source. Once enabled, new calls to Subscribe() will pre-fill the +// returned channel with the most recent IDR burst before registering it +// in the live fanout, cutting first-frame latency for late-joining peers +// from up to one keyframe interval to nearly zero. +// +// Call this on video sources only; calling it on audio sources is +// harmless but wastes memory accumulating non-IDR packets that will +// never trigger a cache reset. +// +// Must be called before Start(). Subsequent calls are no-ops. +func (s *Source) EnableKeyFrameCache() { + s.mu.Lock() + defer s.mu.Unlock() + if s.cache == nil { + s.cache = newKeyFrameCache() + } +} + // Subscribe returns a new buffered channel that receives every RTP packet // read from the UDP socket. bufDepth is the channel buffer size; when full, // packets are dropped (preventing a slow subscriber from back-pressuring // the reader). +// +// If a keyframe cache is active (EnableKeyFrameCache was called), the +// channel is pre-filled with the most recent IDR burst before being +// registered in the live fanout, so the subscriber receives a complete +// reference frame immediately rather than waiting for the next keyframe. func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet { ch := make(chan *rtp.Packet, bufDepth) + + // Snapshot outside s.mu to avoid any cross-lock ordering issue: + // readLoop acquires cache.mu (in push) then s.mu (in fanout), so + // we must not hold s.mu while calling snapshot (which acquires + // cache.mu). s.cache itself is immutable after EnableKeyFrameCache. + var burst []*rtp.Packet + if s.cache != nil { + burst = s.cache.snapshot() + } + s.mu.Lock() + // Pre-fill with the IDR burst. Use a labeled break so that a full + // channel (bufDepth smaller than burst length) stops pre-filling + // gracefully — the subscriber will catch the next live keyframe. +prefill: + for _, pkt := range burst { + select { + case ch <- pkt: + default: + break prefill + } + } s.subscribers[ch] = struct{}{} s.mu.Unlock() return ch @@ -118,6 +168,15 @@ func (s *Source) readLoop() { continue } + // Update the keyframe cache (video sources only; push is a + // no-op on audio sources because isH264IDRStart returns false + // for Opus payload types). Called before the fanout so that a + // subscriber joining concurrently gets a snapshot that includes + // this packet if it is an IDR start. + if s.cache != nil { + s.cache.push(pkt) + } + s.mu.Lock() for ch := range s.subscribers { select {