package webrtc

import (
	"fmt"
	"net"
	"sync"

	"github.com/pion/rtp"
)

// Source reads RTP packets from a local UDP socket and fans them out to
// subscribed peers via per-subscriber buffered channels.
type Source struct {
	id   string
	conn *net.UDPConn

	mu          sync.Mutex // guards subscribers, started and closed
	subscribers map[chan *rtp.Packet]struct{}
	started     bool
	closed      bool

	done chan struct{} // closed by Close to tell the reader goroutine to stop
}

// NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS
// assign an ephemeral port (useful for tests).
//
// The returned Source owns the socket; call Close to release it. If the bind
// fails, the error wraps the underlying net error and the Source is nil.
func NewSource(streamID string, port int) (*Source, error) {
	addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}
	conn, err := net.ListenUDP("udp4", addr)
	if err != nil {
		return nil, fmt.Errorf("webrtc: listen udp: %w", err)
	}
	return &Source{
		id:          streamID,
		conn:        conn,
		subscribers: make(map[chan *rtp.Packet]struct{}),
		done:        make(chan struct{}),
	}, nil
}

// ID returns the registered stream identifier.
func (s *Source) ID() string {
	return s.id
}

// LocalAddr returns the UDP address the source is listening on.
func (s *Source) LocalAddr() *net.UDPAddr {
	// conn was created by net.ListenUDP, so its local address is a *net.UDPAddr.
	return s.conn.LocalAddr().(*net.UDPAddr)
}

// Subscribe returns a new buffered channel that receives every RTP packet
// read from the UDP socket. bufDepth is the channel buffer size; when full,
// packets are dropped (preventing a slow subscriber from back-pressuring
// the reader). A negative bufDepth is treated as 0.
//
// If the Source has already been closed, the returned channel is already
// closed, so a consumer ranging over it terminates immediately instead of
// blocking forever on a channel that nothing will ever close.
func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet {
	if bufDepth < 0 {
		// make(chan, n) panics for negative n; fall back to unbuffered.
		bufDepth = 0
	}
	ch := make(chan *rtp.Packet, bufDepth)

	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		// Not registered, so neither Close nor Unsubscribe will close it again.
		close(ch)
		return ch
	}
	s.subscribers[ch] = struct{}{}
	return ch
}

// Unsubscribe removes ch from the subscriber set and closes it. Channels that
// are not (or no longer) registered are ignored, so calling Unsubscribe twice
// for the same channel does not double-close it.
func (s *Source) Unsubscribe(ch chan *rtp.Packet) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.subscribers[ch]; ok {
		delete(s.subscribers, ch)
		close(ch)
	}
}

// Start begins the RTP reader goroutine. Only the first call has an effect;
// subsequent calls, and calls made after Close, are no-ops.
func (s *Source) Start() { s.mu.Lock() if s.started || s.closed { s.mu.Unlock() return } s.started = true s.mu.Unlock() go s.readLoop() } func (s *Source) readLoop() { buf := make([]byte, 1500) // MTU-sized; RTP over UDP should fit for { select { case <-s.done: return default: } n, _, err := s.conn.ReadFromUDP(buf) if err != nil { // Socket closed or error — exit the loop. return } pkt := &rtp.Packet{} if err := pkt.Unmarshal(buf[:n]); err != nil { // Malformed packet; skip without crashing. continue } s.mu.Lock() for ch := range s.subscribers { select { case ch <- pkt: default: // Subscriber full — drop to protect the reader. } } s.mu.Unlock() } } // Close stops the reader goroutine, closes the UDP socket, and closes every // subscriber channel. func (s *Source) Close() error { s.mu.Lock() if s.closed { s.mu.Unlock() return nil } s.closed = true close(s.done) for ch := range s.subscribers { delete(s.subscribers, ch) close(ch) } s.mu.Unlock() return s.conn.Close() }