2026-04-17 08:45:48 -04:00
|
|
|
package webrtc
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"net"
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
"github.com/pion/rtp"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Source reads RTP packets from a local UDP socket and fans them out to
|
|
|
|
|
// subscribed peers via per-subscriber buffered channels.
|
|
|
|
|
type Source struct {
|
|
|
|
|
id string
|
|
|
|
|
conn *net.UDPConn
|
|
|
|
|
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
subscribers map[chan *rtp.Packet]struct{}
|
|
|
|
|
started bool
|
|
|
|
|
closed bool
|
|
|
|
|
done chan struct{}
|
2026-05-09 19:04:17 -04:00
|
|
|
|
|
|
|
|
// cache is non-nil only for video sources that have had
|
|
|
|
|
// EnableKeyFrameCache() called. It holds the most recent H.264 IDR
|
|
|
|
|
// burst so new subscribers can receive a keyframe immediately.
|
|
|
|
|
cache *keyFrameCache
|
2026-04-17 08:45:48 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS
|
2026-04-17 09:05:37 -04:00
|
|
|
// assign an ephemeral port (useful for tests). Equivalent to
|
|
|
|
|
// NewSourceOn(streamID, "127.0.0.1", port).
|
2026-04-17 08:45:48 -04:00
|
|
|
func NewSource(streamID string, port int) (*Source, error) {
|
2026-04-17 09:05:37 -04:00
|
|
|
return NewSourceOn(streamID, "127.0.0.1", port)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewSourceOn binds a UDP socket on host:port. Use "0.0.0.0" to accept
|
|
|
|
|
// RTP from any LAN publisher — required when running in a container
|
|
|
|
|
// with host networking that needs to receive from other hosts. Empty
|
|
|
|
|
// host is treated as 127.0.0.1 for backward compatibility.
|
|
|
|
|
func NewSourceOn(streamID, host string, port int) (*Source, error) {
|
|
|
|
|
if host == "" {
|
|
|
|
|
host = "127.0.0.1"
|
|
|
|
|
}
|
|
|
|
|
ip := net.ParseIP(host)
|
|
|
|
|
if ip == nil {
|
|
|
|
|
return nil, fmt.Errorf("webrtc: invalid host %q", host)
|
|
|
|
|
}
|
|
|
|
|
addr := &net.UDPAddr{IP: ip, Port: port}
|
2026-04-17 08:45:48 -04:00
|
|
|
conn, err := net.ListenUDP("udp4", addr)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("webrtc: listen udp: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return &Source{
|
|
|
|
|
id: streamID,
|
|
|
|
|
conn: conn,
|
|
|
|
|
subscribers: make(map[chan *rtp.Packet]struct{}),
|
|
|
|
|
done: make(chan struct{}),
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ID returns the registered stream identifier.
|
|
|
|
|
func (s *Source) ID() string { return s.id }
|
|
|
|
|
|
|
|
|
|
// LocalAddr returns the UDP address the source is listening on.
|
|
|
|
|
func (s *Source) LocalAddr() *net.UDPAddr {
|
|
|
|
|
return s.conn.LocalAddr().(*net.UDPAddr)
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-09 19:04:17 -04:00
|
|
|
// EnableKeyFrameCache activates H.264 IDR keyframe burst caching for
|
|
|
|
|
// this source. Once enabled, new calls to Subscribe() will pre-fill the
|
|
|
|
|
// returned channel with the most recent IDR burst before registering it
|
|
|
|
|
// in the live fanout, cutting first-frame latency for late-joining peers
|
|
|
|
|
// from up to one keyframe interval to nearly zero.
|
|
|
|
|
//
|
|
|
|
|
// Call this on video sources only; calling it on audio sources is
|
|
|
|
|
// harmless but wastes memory accumulating non-IDR packets that will
|
|
|
|
|
// never trigger a cache reset.
|
|
|
|
|
//
|
|
|
|
|
// Must be called before Start(). Subsequent calls are no-ops.
|
|
|
|
|
func (s *Source) EnableKeyFrameCache() {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
if s.cache == nil {
|
|
|
|
|
s.cache = newKeyFrameCache()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-17 08:45:48 -04:00
|
|
|
// Subscribe returns a new buffered channel that receives every RTP packet
|
|
|
|
|
// read from the UDP socket. bufDepth is the channel buffer size; when full,
|
|
|
|
|
// packets are dropped (preventing a slow subscriber from back-pressuring
|
|
|
|
|
// the reader).
|
2026-05-09 19:04:17 -04:00
|
|
|
//
|
|
|
|
|
// If a keyframe cache is active (EnableKeyFrameCache was called), the
|
|
|
|
|
// channel is pre-filled with the most recent IDR burst before being
|
|
|
|
|
// registered in the live fanout, so the subscriber receives a complete
|
|
|
|
|
// reference frame immediately rather than waiting for the next keyframe.
|
2026-04-17 08:45:48 -04:00
|
|
|
func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet {
|
|
|
|
|
ch := make(chan *rtp.Packet, bufDepth)
|
2026-05-09 19:04:17 -04:00
|
|
|
|
|
|
|
|
// Snapshot outside s.mu to avoid any cross-lock ordering issue:
|
|
|
|
|
// readLoop acquires cache.mu (in push) then s.mu (in fanout), so
|
|
|
|
|
// we must not hold s.mu while calling snapshot (which acquires
|
|
|
|
|
// cache.mu). s.cache itself is immutable after EnableKeyFrameCache.
|
|
|
|
|
var burst []*rtp.Packet
|
|
|
|
|
if s.cache != nil {
|
|
|
|
|
burst = s.cache.snapshot()
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-17 08:45:48 -04:00
|
|
|
s.mu.Lock()
|
2026-05-09 19:04:17 -04:00
|
|
|
// Pre-fill with the IDR burst. Use a labeled break so that a full
|
|
|
|
|
// channel (bufDepth smaller than burst length) stops pre-filling
|
|
|
|
|
// gracefully — the subscriber will catch the next live keyframe.
|
|
|
|
|
prefill:
|
|
|
|
|
for _, pkt := range burst {
|
|
|
|
|
select {
|
|
|
|
|
case ch <- pkt:
|
|
|
|
|
default:
|
|
|
|
|
break prefill
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-17 08:45:48 -04:00
|
|
|
s.subscribers[ch] = struct{}{}
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
return ch
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unsubscribe removes ch from the subscriber set and closes it.
|
|
|
|
|
func (s *Source) Unsubscribe(ch chan *rtp.Packet) {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
if _, ok := s.subscribers[ch]; ok {
|
|
|
|
|
delete(s.subscribers, ch)
|
|
|
|
|
close(ch)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start begins the RTP reader goroutine. Safe to call once; subsequent calls
|
|
|
|
|
// are no-ops.
|
|
|
|
|
func (s *Source) Start() {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
if s.started || s.closed {
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
s.started = true
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
go s.readLoop()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Source) readLoop() {
|
|
|
|
|
buf := make([]byte, 1500) // MTU-sized; RTP over UDP should fit
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.done:
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
n, _, err := s.conn.ReadFromUDP(buf)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Socket closed or error — exit the loop.
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pkt := &rtp.Packet{}
|
|
|
|
|
if err := pkt.Unmarshal(buf[:n]); err != nil {
|
|
|
|
|
// Malformed packet; skip without crashing.
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-09 19:04:17 -04:00
|
|
|
// Update the keyframe cache (video sources only; push is a
|
|
|
|
|
// no-op on audio sources because isH264IDRStart returns false
|
|
|
|
|
// for Opus payload types). Called before the fanout so that a
|
|
|
|
|
// subscriber joining concurrently gets a snapshot that includes
|
|
|
|
|
// this packet if it is an IDR start.
|
|
|
|
|
if s.cache != nil {
|
|
|
|
|
s.cache.push(pkt)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-17 08:45:48 -04:00
|
|
|
s.mu.Lock()
|
|
|
|
|
for ch := range s.subscribers {
|
|
|
|
|
select {
|
|
|
|
|
case ch <- pkt:
|
|
|
|
|
default:
|
|
|
|
|
// Subscriber full — drop to protect the reader.
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close stops the reader goroutine, closes the UDP socket, and closes every
|
|
|
|
|
// subscriber channel.
|
|
|
|
|
func (s *Source) Close() error {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
if s.closed {
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
s.closed = true
|
|
|
|
|
close(s.done)
|
|
|
|
|
for ch := range s.subscribers {
|
|
|
|
|
delete(s.subscribers, ch)
|
|
|
|
|
close(ch)
|
|
|
|
|
}
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
return s.conn.Close()
|
|
|
|
|
}
|