feat(webrtc): wire keyframe cache into Source (issue #17)
Some checks failed
ci / race tests (push) Blocked by required conditions
ci / WebRTC smoke (5-viewer fanout) (push) Blocked by required conditions
ci / WebRTC latency p95 gate (push) Blocked by required conditions
ci / vet + build (push) Has been cancelled

- Add cache *keyFrameCache field to Source (nil by default).
- Add EnableKeyFrameCache() — call before Start() on video sources to
  activate IDR burst caching.
- readLoop: call cache.push(pkt) after each successful unmarshal, before
  the subscriber fanout. No lock held at push time — push acquires its
  own mutex internally.
- Subscribe: snapshot the cache outside s.mu to avoid any cross-lock
  complexity, then pre-fill the new channel with the burst before
  registering it in the subscriber set. Uses a labeled break to stop
  pre-filling if the channel is full (bufDepth too small for the burst;
  the subscriber will wait for the next live keyframe instead).
This commit is contained in:
Zac Gaetano 2026-05-09 19:04:17 -04:00
parent a2e0a8c083
commit 020a1800ce

View file

@ -19,6 +19,11 @@ type Source struct {
started bool
closed bool
done chan struct{}
// cache is non-nil only for video sources that have had
// EnableKeyFrameCache() called. It holds the most recent H.264 IDR
// burst so new subscribers can receive a keyframe immediately.
cache *keyFrameCache
}
// NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS
@ -61,13 +66,58 @@ func (s *Source) LocalAddr() *net.UDPAddr {
return s.conn.LocalAddr().(*net.UDPAddr)
}
// EnableKeyFrameCache activates H.264 IDR keyframe burst caching for
// this source. Once enabled, new calls to Subscribe() will pre-fill the
// returned channel with the most recent IDR burst before registering it
// in the live fanout, cutting first-frame latency for late-joining peers
// from up to one keyframe interval to nearly zero.
//
// Call this on video sources only; calling it on audio sources is
// harmless but wastes memory accumulating non-IDR packets that will
// never trigger a cache reset.
//
// Must be called before Start(). Subsequent calls are no-ops.
func (s *Source) EnableKeyFrameCache() {
s.mu.Lock()
defer s.mu.Unlock()
if s.cache == nil {
s.cache = newKeyFrameCache()
}
}
// Subscribe returns a new buffered channel that receives every RTP packet
// read from the UDP socket. bufDepth is the channel buffer size; when full,
// packets are dropped (preventing a slow subscriber from back-pressuring
// the reader).
//
// If a keyframe cache is active (EnableKeyFrameCache was called), the
// channel is pre-filled with the most recent IDR burst before being
// registered in the live fanout, so the subscriber receives a complete
// reference frame immediately rather than waiting for the next keyframe.
func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet {
ch := make(chan *rtp.Packet, bufDepth)
// Snapshot outside s.mu to avoid any cross-lock ordering issue:
// readLoop acquires cache.mu (in push) then s.mu (in fanout), so
// we must not hold s.mu while calling snapshot (which acquires
// cache.mu). s.cache itself is immutable after EnableKeyFrameCache.
var burst []*rtp.Packet
if s.cache != nil {
burst = s.cache.snapshot()
}
s.mu.Lock()
// Pre-fill with the IDR burst. Use a labeled break so that a full
// channel (bufDepth smaller than burst length) stops pre-filling
// gracefully — the subscriber will catch the next live keyframe.
prefill:
for _, pkt := range burst {
select {
case ch <- pkt:
default:
break prefill
}
}
s.subscribers[ch] = struct{}{}
s.mu.Unlock()
return ch
@ -118,6 +168,15 @@ func (s *Source) readLoop() {
continue
}
// Update the keyframe cache (video sources only; push is a
// no-op on audio sources because isH264IDRStart returns false
// for Opus payload types). Called before the fanout so that a
// subscriber joining concurrently gets a snapshot that includes
// this packet if it is an IDR start.
if s.cache != nil {
s.cache.push(pkt)
}
s.mu.Lock()
for ch := range s.subscribers {
select {