datarhei-dragonfork-core/core/webrtc/source.go
Zac Gaetano 9e3f031f95
Some checks failed
tests / build (push) Failing after 3s
CodeQL / Analyze (pull_request) Failing after 3s
tests / build (pull_request) Failing after 3s
feat(webrtc): add -rtp-host flag + TrueNAS Docker deploy
- core/webrtc: NewSourceOn(streamID, host, port) allows binding the
  RTP UDP socket on something other than 127.0.0.1, required when the
  PoC runs in a container and must accept RTP from LAN publishers.
  NewSource(streamID, port) stays as a convenience wrapper on
  127.0.0.1 for existing tests and tight local tests.

- cmd/webrtc-poc: new -rtp-host flag (default 127.0.0.1 for safety).

- deploy/docker/Dockerfile: two-stage build, scratch runtime, ~14 MB.

- deploy/truenas/docker-compose.yml: host-networked stack template
  driven by a .env file. Host networking is required for WebRTC ICE
  to work without NAT rewriting per-candidate.

- deploy/truenas/README.md: operator runbook with port picking,
  bring-up, verification curls, and security notes.
2026-04-17 09:05:37 -04:00

149 lines
3.5 KiB
Go

package webrtc
import (
"fmt"
"net"
"sync"
"github.com/pion/rtp"
)
// Source owns one UDP socket carrying RTP and distributes every parsed
// packet to all currently registered subscribers. Each subscriber has its
// own buffered channel, so a slow consumer only loses its own packets.
type Source struct {
	// id is the stream identifier supplied at construction time.
	id string
	// conn is the bound UDP socket that RTP is read from.
	conn *net.UDPConn

	// mu guards subscribers, started and closed.
	mu sync.Mutex
	// subscribers is the set of live delivery channels.
	subscribers map[chan *rtp.Packet]struct{}
	// started is set once the reader goroutine has been launched.
	started bool
	// closed is set once Close has run.
	closed bool

	// done is closed by Close to tell the reader goroutine to stop.
	done chan struct{}
}
// NewSource creates a Source listening on the IPv4 loopback address at the
// given port. A port of 0 asks the OS for an ephemeral port, which is handy
// in tests. It is shorthand for NewSourceOn(streamID, "127.0.0.1", port).
func NewSource(streamID string, port int) (*Source, error) {
	const loopback = "127.0.0.1"
	return NewSourceOn(streamID, loopback, port)
}
// NewSourceOn binds a UDP socket on host:port and returns a Source that
// reads RTP from it. The Source is idle until Start is called.
//
// host must be an IP literal (hostnames are rejected). Use "0.0.0.0" to
// accept RTP from any LAN publisher — required when running in a container
// with host networking that needs to receive from other hosts. An IPv6
// literal such as "::1" or "::" binds an IPv6 socket. Empty host is treated
// as 127.0.0.1 for backward compatibility. Pass port=0 to let the OS assign
// an ephemeral port.
//
// It returns an error if host does not parse as an IP address or if the
// socket cannot be bound.
func NewSourceOn(streamID, host string, port int) (*Source, error) {
	if host == "" {
		host = "127.0.0.1"
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return nil, fmt.Errorf("webrtc: invalid host %q", host)
	}

	// Match the socket family to the address. Hard-coding "udp4" made every
	// IPv6 literal pass validation above and then fail at listen time.
	// IPv4 and IPv4-mapped addresses keep using "udp4" exactly as before.
	network := "udp4"
	if ip.To4() == nil {
		network = "udp6"
	}

	conn, err := net.ListenUDP(network, &net.UDPAddr{IP: ip, Port: port})
	if err != nil {
		return nil, fmt.Errorf("webrtc: listen udp: %w", err)
	}
	return &Source{
		id:          streamID,
		conn:        conn,
		subscribers: make(map[chan *rtp.Packet]struct{}),
		done:        make(chan struct{}),
	}, nil
}
// ID reports the stream identifier this Source was created with.
func (s *Source) ID() string {
	return s.id
}
// LocalAddr reports the UDP address the underlying socket is bound to. When
// the Source was created with port 0 this is where the OS-assigned port can
// be read back.
func (s *Source) LocalAddr() *net.UDPAddr {
	bound := s.conn.LocalAddr()
	return bound.(*net.UDPAddr)
}
// Subscribe registers and returns a new channel that receives every RTP
// packet read from the UDP socket from now on.
//
// bufDepth is the channel buffer size; when the buffer is full, packets for
// this subscriber are dropped (preventing a slow subscriber from
// back-pressuring the reader). A bufDepth of 0 yields an unbuffered channel,
// so a packet is delivered only if the subscriber is already waiting to
// receive. Negative values are treated as 0 rather than panicking.
//
// If the Source has already been closed, the returned channel is closed
// immediately so that a receive or range loop on it terminates instead of
// blocking forever. The channel is released with Unsubscribe or Close.
func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet {
	if bufDepth < 0 {
		// make(chan T, n) panics for n < 0.
		bufDepth = 0
	}
	ch := make(chan *rtp.Packet, bufDepth)

	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		// Close has already swept the subscriber set; registering now would
		// leave ch open with nothing left to feed or close it.
		close(ch)
		return ch
	}
	s.subscribers[ch] = struct{}{}
	return ch
}
// Unsubscribe detaches ch from the Source and closes it. Channels that are
// not currently registered (never subscribed, already unsubscribed, or
// already closed by Close) are ignored, so repeated calls are harmless.
func (s *Source) Unsubscribe(ch chan *rtp.Packet) {
	s.mu.Lock()
	defer s.mu.Unlock()

	_, registered := s.subscribers[ch]
	if !registered {
		return
	}
	delete(s.subscribers, ch)
	close(ch)
}
// Start launches the goroutine that reads RTP from the socket. Only the
// first call has any effect; later calls, and calls made after Close, do
// nothing.
func (s *Source) Start() {
	s.mu.Lock()
	launch := !s.started && !s.closed
	if launch {
		s.started = true
	}
	s.mu.Unlock()

	// Spawn outside the critical section; the flag above already guarantees
	// at most one reader.
	if launch {
		go s.readLoop()
	}
}
// readLoop is the body of the reader goroutine started by Start. It reads
// datagrams from the UDP socket, parses each as an RTP packet and offers it
// to every subscriber with a non-blocking send. It returns when done is
// closed or when the socket read fails (which includes the socket being
// closed).
//
// Every subscriber receives the same *rtp.Packet pointer, so subscribers
// must treat delivered packets as read-only.
func (s *Source) readLoop() {
	buf := make([]byte, 1500) // MTU-sized; RTP over UDP should fit
	for {
		select {
		case <-s.done:
			return
		default:
		}

		n, _, err := s.conn.ReadFromUDP(buf)
		if err != nil {
			// Socket closed or error — exit the loop.
			return
		}

		// rtp.Packet.Unmarshal keeps slices into the buffer it is given
		// (payload and header extensions) rather than copying. buf is
		// overwritten by the next read, so each packet must own its bytes;
		// otherwise packets still queued in subscriber channels would be
		// corrupted underneath their consumers.
		raw := make([]byte, n)
		copy(raw, buf[:n])

		pkt := &rtp.Packet{}
		if err := pkt.Unmarshal(raw); err != nil {
			// Malformed packet; skip without crashing.
			continue
		}

		// Sends happen under mu so Unsubscribe/Close cannot close a channel
		// while it is being sent on.
		s.mu.Lock()
		for ch := range s.subscribers {
			select {
			case ch <- pkt:
			default:
				// Subscriber full — drop to protect the reader.
			}
		}
		s.mu.Unlock()
	}
}
// Close shuts the Source down: it signals the reader goroutine through done,
// closes and forgets every subscriber channel, and finally closes the UDP
// socket, returning that close's error. Calling Close again is a no-op that
// returns nil. Close does not wait for the reader goroutine to exit.
func (s *Source) Close() error {
	s.mu.Lock()
	alreadyClosed := s.closed
	if !alreadyClosed {
		s.closed = true
		close(s.done)
		for sub := range s.subscribers {
			delete(s.subscribers, sub)
			close(sub)
		}
	}
	s.mu.Unlock()

	if alreadyClosed {
		return nil
	}
	// The socket is closed after releasing mu.
	return s.conn.Close()
}