From 020a1800ce19f297a61d2dd2629ce830e0cd1fd8 Mon Sep 17 00:00:00 2001 From: ZGaetano Date: Sat, 9 May 2026 19:04:17 -0400 Subject: [PATCH] feat(webrtc): wire keyframe cache into Source (issue #17) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add cache *keyFrameCache field to Source (nil by default). - Add EnableKeyFrameCache() — call before Start() on video sources to activate IDR burst caching. - readLoop: call cache.push(pkt) after each successful unmarshal, before the subscriber fanout. No lock held at push time — push acquires its own mutex internally. - Subscribe: snapshot the cache outside s.mu to avoid any cross-lock complexity, then pre-fill the new channel with the burst before registering it in the subscriber set. Uses a labeled break to stop pre-filling if the channel is full (bufDepth too small for the burst; the subscriber will wait for the next live keyframe instead). --- core/webrtc/source.go | 59 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/core/webrtc/source.go b/core/webrtc/source.go index 521ec7f..7c3d4e8 100644 --- a/core/webrtc/source.go +++ b/core/webrtc/source.go @@ -19,6 +19,11 @@ type Source struct { started bool closed bool done chan struct{} + + // cache is non-nil only for video sources that have had + // EnableKeyFrameCache() called. It holds the most recent H.264 IDR + // burst so new subscribers can receive a keyframe immediately. + cache *keyFrameCache } // NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS @@ -61,13 +66,58 @@ func (s *Source) LocalAddr() *net.UDPAddr { return s.conn.LocalAddr().(*net.UDPAddr) } +// EnableKeyFrameCache activates H.264 IDR keyframe burst caching for +// this source. Once enabled, new calls to Subscribe() will pre-fill the +// returned channel with the most recent IDR burst before registering it +// in the live fanout, cutting first-frame latency for late-joining peers +// from up to one keyframe interval to nearly zero. +// +// Call this on video sources only; calling it on audio sources is +// harmless but wastes memory accumulating non-IDR packets that will +// never trigger a cache reset. +// +// Must be called before Start(). Subsequent calls are no-ops. +func (s *Source) EnableKeyFrameCache() { + s.mu.Lock() + defer s.mu.Unlock() + if s.cache == nil { + s.cache = newKeyFrameCache() + } +} + // Subscribe returns a new buffered channel that receives every RTP packet // read from the UDP socket. bufDepth is the channel buffer size; when full, // packets are dropped (preventing a slow subscriber from back-pressuring // the reader). +// +// If a keyframe cache is active (EnableKeyFrameCache was called), the +// channel is pre-filled with the most recent IDR burst before being +// registered in the live fanout, so the subscriber receives a complete +// reference frame immediately rather than waiting for the next keyframe. func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet { ch := make(chan *rtp.Packet, bufDepth) + + // Snapshot outside s.mu to avoid any cross-lock ordering issue: + // readLoop acquires cache.mu (in push) then s.mu (in fanout), so + // we must not hold s.mu while calling snapshot (which acquires + // cache.mu). s.cache itself is immutable after EnableKeyFrameCache. + var burst []*rtp.Packet + if s.cache != nil { + burst = s.cache.snapshot() + } + s.mu.Lock() + // Pre-fill with the IDR burst. Use a labeled break so that a full + // channel (bufDepth smaller than burst length) stops pre-filling + // gracefully — the subscriber will catch the next live keyframe. +prefill: + for _, pkt := range burst { + select { + case ch <- pkt: + default: + break prefill + } + } s.subscribers[ch] = struct{}{} s.mu.Unlock() return ch @@ -118,6 +168,15 @@ func (s *Source) readLoop() { continue } + // Update the keyframe cache (video sources only; push is a + // no-op on audio sources because isH264IDRStart returns false + // for Opus payload types). Called before the fanout so that a + // subscriber joining concurrently gets a snapshot that includes + // this packet if it is an IDR start. + if s.cache != nil { + s.cache.push(pkt) + } + s.mu.Lock() for ch := range s.subscribers { select {