datarhei-dragonfork-core/core/webrtc/keyframecache.go
ZGaetano 8266ca72e6
fix(webrtc): detect STAP-A IDR start in keyframe cache (issue #18)
Extend isH264IDRStart to handle STAP-A aggregates (NAL type 24, RFC 6184
§5.7.1). The first NAL in the aggregate starts at byte 3 (after the
2-byte size field); if its type is 5 (IDR slice) the packet is treated
as an IDR start and the burst cache is reset.

This closes the gap noted in NOTES.md: a publisher using STAP-A for IDR
(e.g. a custom GStreamer pipeline or hardware encoder) will now correctly
reset the burst rather than accumulating packets until hitting the 512-
packet / 2 MiB capacity cap.
2026-05-10 13:19:56 -04:00
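The IDR-start test described in the commit message can be tried without pion by applying the same byte checks to a raw RTP payload. What follows is a hedged, dependency-free sketch that mirrors isH264IDRStart. The function name `idrStartFromPayload` and the sample payloads are hypothetical, chosen only to exercise each packetisation mode.

```go
package main

import "fmt"

// idrStartFromPayload is a hypothetical, dependency-free mirror of
// isH264IDRStart: it inspects a raw RTP payload instead of *rtp.Packet.
func idrStartFromPayload(p []byte) bool {
	if len(p) == 0 {
		return false
	}
	switch p[0] & 0x1F {
	case 5: // single NAL unit carrying an IDR slice
		return true
	case 24: // STAP-A: p[1:3] = first NAL size, p[3] = first NAL header
		return len(p) >= 4 && p[3]&0x1F == 5
	case 28: // FU-A: p[1] = FU header (S bit 0x80, inner type in bits 4-0)
		return len(p) >= 2 && p[1]&0x80 != 0 && p[1]&0x1F == 5
	}
	return false
}

func main() {
	// Sample payloads (assumed values): 0x65 = IDR header, 0x41 = non-IDR
	// slice, 0x67 = SPS, 0x78 = STAP-A indicator, 0x7C = FU-A indicator.
	cases := []struct {
		name    string
		payload []byte
	}{
		{"single IDR", []byte{0x65, 0x88}},
		{"single non-IDR", []byte{0x41, 0x9A}},
		{"STAP-A, IDR first", []byte{0x78, 0x00, 0x02, 0x65, 0x88}},
		{"STAP-A, SPS first", []byte{0x78, 0x00, 0x02, 0x67, 0x42}},
		{"FU-A IDR start", []byte{0x7C, 0x85, 0x88}},
		{"FU-A IDR middle", []byte{0x7C, 0x05, 0x88}},
	}
	for _, c := range cases {
		fmt.Printf("%-20s %v\n", c.name, idrStartFromPayload(c.payload))
	}
}
```

With these inputs the "STAP-A, SPS first" case prints false: only the NAL header at byte 3 is examined, so an IDR placed later in the same aggregate would not be reported. Walking every 2-byte size field would catch that case; whether it is needed depends on how a given publisher packetises keyframes.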

package webrtc

import (
	"sync"

	"github.com/pion/rtp"
)

// keyFrameCache retains the most recent H.264 keyframe burst so that
// new WHEP subscribers can receive it immediately on Subscribe(),
// cutting first-frame latency from up to one IDR interval (typically
// 2 s at a 0.5 Hz keyframe rate) to nearly zero.
//
// A "burst" spans all RTP packets from the first fragment of an IDR NAL
// up to (but not including) the next IDR NAL. The cache is bounded by
// maxPackets and maxBytes to cap per-stream memory usage.
//
// Thread safety: all methods are safe for concurrent use. push is
// intended to be called only from the single-goroutine readLoop; the
// lock it takes is held only briefly.
type keyFrameCache struct {
	mu         sync.Mutex
	packets    []*rtp.Packet
	byteLen    int // sum of len(Payload) over packets
	maxPackets int
	maxBytes   int
}

// newKeyFrameCache returns a cache bounded to 512 packets / 2 MiB.
// At typical H.264 streaming bitrates (1-4 Mbps), an IDR frame plus a
// handful of subsequent P-frames fits comfortably within these limits.
func newKeyFrameCache() *keyFrameCache {
	return &keyFrameCache{
		packets:    make([]*rtp.Packet, 0, 64),
		maxPackets: 512,
		maxBytes:   2 << 20, // 2 MiB
	}
}

// isH264IDRStart reports whether pkt begins an H.264 IDR (keyframe)
// NAL. It recognises three RFC 6184 packetisation modes:
//
//   - Single NAL unit (type 5): the entire payload is one IDR slice.
//   - FU-A fragment (type 28): the FU header byte has the start bit set
//     (0x80) and the inner NAL type is 5.
//   - STAP-A aggregate (type 24): the first NAL in the aggregate is an
//     IDR slice. STAP-A layout: byte 0 = NAL header (type 24), bytes
//     1-2 = first NAL size (big-endian uint16), byte 3 = first NAL
//     header. Minimum valid payload: 4 bytes. Only the first
//     aggregated NAL is inspected.
func isH264IDRStart(pkt *rtp.Packet) bool {
	p := pkt.Payload
	if len(p) == 0 {
		return false
	}
	nalType := p[0] & 0x1F
	switch nalType {
	case 5: // Single NAL unit, IDR slice
		return true
	case 24: // STAP-A: bytes 1-2 are the first NAL's size; byte 3 is its header
		return len(p) >= 4 && p[3]&0x1F == 5
	case 28: // FU-A: byte 1 is the FU header (bit 7 = start, bits 4-0 = inner type)
		return len(p) >= 2 && p[1]&0x80 != 0 && p[1]&0x1F == 5
	}
	return false
}

// push appends pkt to the cache. If pkt is the start of an H.264 IDR
// NAL, the existing burst is cleared first, so that once an IDR has
// been seen the cache holds only the most recent keyframe burst.
// Packets beyond the capacity limits are silently dropped, which
// truncates any burst larger than maxPackets or maxBytes.
//
// push is called exclusively from readLoop (a single goroutine).
// isH264IDRStart reads only pkt, never the cache's fields, so calling
// it outside the lock is safe.
func (c *keyFrameCache) push(pkt *rtp.Packet) {
	isIDR := isH264IDRStart(pkt)
	payloadLen := len(pkt.Payload)

	c.mu.Lock()
	if isIDR {
		c.packets = c.packets[:0]
		c.byteLen = 0
	}
	if len(c.packets) < c.maxPackets && c.byteLen+payloadLen <= c.maxBytes {
		c.packets = append(c.packets, pkt)
		c.byteLen += payloadLen
	}
	c.mu.Unlock()
}

// snapshot returns a shallow copy of the current burst. The returned
// slice is safe to iterate without holding any lock; the *rtp.Packet
// values are never mutated after being placed in the cache.
// Returns nil when the cache is empty.
func (c *keyFrameCache) snapshot() []*rtp.Packet {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.packets) == 0 {
		return nil
	}
	snap := make([]*rtp.Packet, len(c.packets))
	copy(snap, c.packets)
	return snap
}
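The reset-and-cap policy in push and the defensive copy in snapshot can be observed in isolation with a small stand-alone model. This is a sketch under stated assumptions: `packet` stands in for *rtp.Packet, `burstCache` and `isIDR` are hypothetical names, the IDR check recognises only single-NAL IDR slices, and the limits are shrunk to 3 packets / 1 KiB so the cap is easy to hit.

```go
package main

import (
	"fmt"
	"sync"
)

// packet is a hypothetical stand-in for *rtp.Packet; only Payload matters here.
type packet struct{ Payload []byte }

// burstCache models keyFrameCache's policy with assumed, smaller limits:
// reset on an IDR start, append while under both caps, copy on snapshot.
type burstCache struct {
	mu         sync.Mutex
	packets    []*packet
	byteLen    int
	maxPackets int
	maxBytes   int
}

// isIDR is a simplified check (single-NAL IDR slices only) for this sketch.
func isIDR(p *packet) bool { return len(p.Payload) > 0 && p.Payload[0]&0x1F == 5 }

func (c *burstCache) push(p *packet) {
	idr := isIDR(p)
	c.mu.Lock()
	defer c.mu.Unlock()
	if idr {
		c.packets = c.packets[:0] // reuse the backing array, as push does
		c.byteLen = 0
	}
	if len(c.packets) < c.maxPackets && c.byteLen+len(p.Payload) <= c.maxBytes {
		c.packets = append(c.packets, p)
		c.byteLen += len(p.Payload)
	}
}

func (c *burstCache) snapshot() []*packet {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.packets) == 0 {
		return nil
	}
	return append([]*packet(nil), c.packets...) // copy, never the internal slice
}

func main() {
	c := &burstCache{maxPackets: 3, maxBytes: 1 << 10}
	// Byte 0 is the NAL header; byte 1 is a sequence tag for printing.
	c.push(&packet{Payload: []byte{0x41, 1}}) // P-slice: cached until the IDR below
	c.push(&packet{Payload: []byte{0x65, 2}}) // IDR: resets the burst
	c.push(&packet{Payload: []byte{0x41, 3}})
	c.push(&packet{Payload: []byte{0x41, 4}})
	c.push(&packet{Payload: []byte{0x41, 5}}) // dropped: packet cap of 3 reached
	ids := []int{}
	for _, p := range c.snapshot() {
		ids = append(ids, int(p.Payload[1]))
	}
	fmt.Println(ids) // prints [2 3 4]
}
```

The pre-IDR packet is discarded by the reset and packet 5 is dropped at the packet cap. Copying in snapshot matters because push reuses the backing array via `c.packets[:0]`: handing out the internal slice would let a later append overwrite entries that a subscriber is still iterating.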