diff --git a/NOTES.md b/NOTES.md new file mode 100644 index 0000000..b115634 --- /dev/null +++ b/NOTES.md @@ -0,0 +1,26 @@ +# Datarhei - Dragon Fork — Implementation Notes + +This file tracks observations, gotchas, and decisions made during the Dragon +Fork WebRTC egress implementation. Keep entries chronological; each milestone +adds a new section. + +## Baseline (M1, 2026-04-17) + +- Forked from upstream `datarhei/core` commit `0de97f4` ("Add linux/arm/v8 build"). +- Upstream module path: `github.com/datarhei/core/v16`. The Dragon Fork keeps + this module path so internal imports don't churn; the fork is distinguished + by its repo location (`forge.wilddragon.net/zgaetano/datarhei-dragonfork-core`) + and branch history, not its Go module identity. +- Toolchain: Go 1.22.8, FFmpeg 4.4.2 in the sandbox. FFmpeg 6.x recommended + for publishers in Task 10; 4.4.2 is sufficient for the PoC (libx264 + + libopus + RTP muxer all present). +- `go build ./...` on the clean fork: succeeds. +- `go test -short ./...` on the clean fork: all packages pass. No upstream + flakes observed. + +### Pre-existing state of note +- None flagged. + +--- + + diff --git a/docs/design/2026-04-16-datarhei-dragon-fork-m1-webrtc-poc.md b/docs/design/2026-04-16-datarhei-dragon-fork-m1-webrtc-poc.md new file mode 100755 index 0000000..e0776a0 --- /dev/null +++ b/docs/design/2026-04-16-datarhei-dragon-fork-m1-webrtc-poc.md @@ -0,0 +1,2081 @@ +# Datarhei - Dragon Fork M1: Media-Path PoC Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Prove a working end-to-end WebRTC egress path: an FFmpeg publisher pushes RTP into a new Go package, which serves it to a Pion-based WHEP client that successfully decodes video frames. + +**Architecture:** New standalone Go package `core/webrtc` inside the datarhei Core fork. FFmpeg produces RTP on a local UDP socket → package reads RTP → WHEP HTTP endpoint serves it via Pion `PeerConnection` → test client subscribes and decodes. No datarhei process-model integration yet (that's M2). This milestone answers: *does the integration pattern actually work, and what are the gotchas?* + +**Tech Stack:** Go 1.22+, [Pion WebRTC v4](https://github.com/pion/webrtc) (`github.com/pion/webrtc/v4`), [Pion RTP](https://github.com/pion/rtp), FFmpeg 6.x (publisher + test pattern), standard library `net/http`. + +**Out of scope for this plan:** datarhei process-model integration (M2), multi-viewer fan-out polish (M3), CI test harness (M4), branding (M5). Separate plans will be written for each after M1 completes. + +--- + +## Prerequisites + +- Go 1.22+ installed locally +- `git` configured +- FFmpeg 6.x on PATH (`ffmpeg -version` reports 6.0 or newer) +- A Git host account (GitHub recommended for initial fork) +- Linux or macOS development machine (Windows works but UDP port behavior differs; document what you're on when filing any issues) + +--- + +## File Structure + +Files created in this milestone: + +``` +core/webrtc/ + config.go # Config struct + defaults + Validate() + registry.go # stream_id → *Source map, thread-safe + registry_test.go + source.go # RTP UDP reader + fan-out ring buffer + source_test.go + peer.go # PeerConnection factory, track attachment + peer_test.go + whep.go # HTTP handler: POST /whep/{stream_id} + whep_test.go + ice.go # SettingEngine builder (NAT1To1, ICE servers) + ice_test.go + errors.go # Typed error values (ErrStreamNotFound, etc.) + +cmd/webrtc-poc/ + main.go # Standalone PoC binary (NOT datarhei Core yet) + +test/ + publish.sh # FFmpeg publisher script (testsrc2 → local RTP) + whep-client/ + main.go # Pion-based test WHEP subscriber + main_test.go + +docs/design/ + (copy of the approved spec from brainstorming) +``` + +Total new Go files: 11 source + 5 test = 16 files. Total lines: ~1200-1500 including tests and comments. + +--- + +## Task 0: Fork the repo and set up the workspace + +**Files:** +- Create: new fork of `datarhei/core` + +**Rationale:** Everything else depends on having a fork to commit into. No code yet — just repo setup. + +- [ ] **Step 1: Fork datarhei/core on your Git host** + +On GitHub: navigate to https://github.com/datarhei/core, click Fork, name the fork `datarhei-dragonfork-core` under your `wilddragon` org (or personal account). + +- [ ] **Step 2: Clone the fork locally** + +Run: +```bash +git clone git@github.com:wilddragon/datarhei-dragonfork-core.git +cd datarhei-dragonfork-core +``` + +Expected: repo cloned, `git status` clean on `main` branch. + +- [ ] **Step 3: Create the M1 working branch** + +Run: +```bash +git checkout -b m1-webrtc-poc +``` + +Expected: branch created and checked out. + +- [ ] **Step 4: Copy the approved design spec into the repo** + +```bash +mkdir -p docs/design +cp /path/to/2026-04-16-datarhei-dragon-fork-webrtc-design.md docs/design/ +``` + +(Path will be wherever you saved the spec from the brainstorming session.) + +- [ ] **Step 5: Verify the repo builds unchanged** + +Run: +```bash +go build ./... +``` + +Expected: build succeeds. If it fails, the fork is broken before you started — stop and fix upstream issues first. + +- [ ] **Step 6: Run upstream tests** + +Run: +```bash +go test ./... +``` + +Expected: all tests pass (or at least match what upstream CI shows green). Document any pre-existing flakes in a NOTES.md file so you don't later blame your changes for them. + +- [ ] **Step 7: Commit the spec and a NOTES.md baseline** + +```bash +git add docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md NOTES.md +git commit -m "docs: add Dragon Fork WebRTC egress design spec" +``` + +--- + +## Task 1: Add Pion WebRTC dependency + +**Files:** +- Modify: `go.mod` +- Modify: `go.sum` + +- [ ] **Step 1: Add Pion dependencies** + +Run from repo root: +```bash +go get github.com/pion/webrtc/v4@latest +go get github.com/pion/rtp@latest +go get github.com/pion/rtcp@latest +``` + +Expected: `go.mod` updated with new `require` entries; `go.sum` updated. + +- [ ] **Step 2: Tidy dependencies** + +Run: +```bash +go mod tidy +``` + +Expected: no errors. `go.mod` stable. + +- [ ] **Step 3: Sanity-check that Pion loads** + +Create a throwaway file `/tmp/pion_smoke.go`: +```go +package main + +import ( + "fmt" + + "github.com/pion/webrtc/v4" +) + +func main() { + api := webrtc.NewAPI() + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + panic(err) + } + fmt.Println("Pion OK, state:", pc.ConnectionState()) + _ = pc.Close() +} +``` + +Run: +```bash +go run /tmp/pion_smoke.go +``` + +Expected output: `Pion OK, state: new` (or similar). Delete the file after. + +- [ ] **Step 4: Commit** + +```bash +git add go.mod go.sum +git commit -m "build: add Pion WebRTC v4 dependency" +``` + +--- + +## Task 2: Create the core/webrtc package skeleton + typed errors + +**Files:** +- Create: `core/webrtc/errors.go` +- Create: `core/webrtc/doc.go` + +- [ ] **Step 1: Create `core/webrtc/doc.go`** + +```go +// Package webrtc implements the Dragon Fork WebRTC egress module. +// +// It exposes a WHEP (WebRTC-HTTP Egress Protocol) HTTP endpoint and serves +// live RTP produced by an FFmpeg process on a local UDP socket to one or +// more WebRTC peer connections built with Pion. +// +// This package is additive: it does not modify existing datarhei ingest, +// transcode, or non-WebRTC output code paths. The only contact with +// existing code is a new URL scheme ("webrtc://") registered with the +// output resolver (done in milestone M2, not here). +package webrtc +``` + +- [ ] **Step 2: Create `core/webrtc/errors.go`** + +```go +package webrtc + +import "errors" + +// Sentinel errors returned by package functions. +var ( + // ErrStreamNotFound indicates a WHEP subscribe referenced a stream_id + // that has no registered source. Maps to HTTP 404. + ErrStreamNotFound = errors.New("webrtc: stream not found") + + // ErrPeerCapReached indicates max_peers_total has been exceeded. + // Maps to HTTP 503. + ErrPeerCapReached = errors.New("webrtc: peer capacity reached") + + // ErrCodecMismatch indicates the viewer's SDP offer does not include + // a codec the source can serve (expected H.264 + Opus). Maps to HTTP 406. + ErrCodecMismatch = errors.New("webrtc: codec mismatch") + + // ErrInvalidSDP indicates the request body was not a parseable SDP offer. + // Maps to HTTP 400. + ErrInvalidSDP = errors.New("webrtc: invalid SDP") + + // ErrICETimeout indicates ICE gathering did not complete within the + // configured timeout. Maps to HTTP 500. + ErrICETimeout = errors.New("webrtc: ICE gathering timeout") +) +``` + +- [ ] **Step 3: Verify the package compiles** + +Run: +```bash +go build ./core/webrtc/... +``` + +Expected: no output (successful build). + +- [ ] **Step 4: Commit** + +```bash +git add core/webrtc/doc.go core/webrtc/errors.go +git commit -m "feat(webrtc): add package skeleton and typed errors" +``` + +--- + +## Task 3: Config struct + +**Files:** +- Create: `core/webrtc/config.go` +- Create: `core/webrtc/config_test.go` + +- [ ] **Step 1: Write the failing test `core/webrtc/config_test.go`** + +```go +package webrtc + +import ( + "testing" +) + +func TestConfig_Defaults(t *testing.T) { + c := DefaultConfig() + if !c.Enabled { + t.Error("default Enabled should be true") + } + if c.WHEPListen != ":8787" { + t.Errorf("default WHEPListen = %q, want :8787", c.WHEPListen) + } + if c.UDPPortRange.Low != 10000 || c.UDPPortRange.High != 10100 { + t.Errorf("default UDPPortRange = %v, want 10000-10100", c.UDPPortRange) + } + if c.MaxPeersTotal != 32 { + t.Errorf("default MaxPeersTotal = %d, want 32", c.MaxPeersTotal) + } + if len(c.ICEServers) == 0 { + t.Error("default ICEServers should have at least one STUN entry") + } +} + +func TestConfig_Validate(t *testing.T) { + tests := []struct { + name string + mutate func(*Config) + wantErr bool + }{ + {"defaults are valid", func(c *Config) {}, false}, + {"empty listen", func(c *Config) { c.WHEPListen = "" }, true}, + {"inverted port range", func(c *Config) { c.UDPPortRange.Low = 20000; c.UDPPortRange.High = 10000 }, true}, + {"zero max peers", func(c *Config) { c.MaxPeersTotal = 0 }, true}, + {"negative max peers", func(c *Config) { c.MaxPeersTotal = -1 }, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := DefaultConfig() + tt.mutate(&c) + err := c.Validate() + if (err != nil) != tt.wantErr { + t.Errorf("Validate() err = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} +``` + +- [ ] **Step 2: Run the test and verify it fails** + +Run: +```bash +go test ./core/webrtc/ -run TestConfig -v +``` + +Expected: FAIL with "undefined: Config" / "undefined: DefaultConfig". + +- [ ] **Step 3: Write the minimal implementation `core/webrtc/config.go`** + +```go +package webrtc + +import "fmt" + +// PortRange represents an inclusive UDP port range. +type PortRange struct { + Low, High int +} + +// Config controls the WebRTC egress module. +type Config struct { + // Enabled toggles the entire module. When false, no endpoints are served. + Enabled bool + + // WHEPListen is the address the WHEP HTTP endpoint binds to (e.g. ":8787"). + WHEPListen string + + // PublicIP is the server's externally-reachable IP, advertised in ICE + // candidates via NAT1To1. Empty means rely on STUN discovery. + PublicIP string + + // UDPPortRange bounds the local UDP ports allocated for FFmpeg→Pion RTP. + UDPPortRange PortRange + + // ICEServers is the list of STUN/TURN URIs given to each PeerConnection. + ICEServers []string + + // MaxPeersTotal is a hard safety cap on concurrent subscribers. + MaxPeersTotal int +} + +// DefaultConfig returns production-reasonable defaults. +func DefaultConfig() Config { + return Config{ + Enabled: true, + WHEPListen: ":8787", + PublicIP: "", + UDPPortRange: PortRange{Low: 10000, High: 10100}, + ICEServers: []string{"stun:stun.cloudflare.com:3478", "stun:stun.l.google.com:19302"}, + MaxPeersTotal: 32, + } +} + +// Validate returns an error if the config is internally inconsistent. +func (c Config) Validate() error { + if c.WHEPListen == "" { + return fmt.Errorf("webrtc: WHEPListen must not be empty") + } + if c.UDPPortRange.Low <= 0 || c.UDPPortRange.High <= 0 { + return fmt.Errorf("webrtc: UDPPortRange must have positive bounds, got %v", c.UDPPortRange) + } + if c.UDPPortRange.Low > c.UDPPortRange.High { + return fmt.Errorf("webrtc: UDPPortRange.Low > High (%d > %d)", c.UDPPortRange.Low, c.UDPPortRange.High) + } + if c.MaxPeersTotal <= 0 { + return fmt.Errorf("webrtc: MaxPeersTotal must be positive, got %d", c.MaxPeersTotal) + } + return nil +} +``` + +- [ ] **Step 4: Run the tests and verify they pass** + +Run: +```bash +go test ./core/webrtc/ -run TestConfig -v +``` + +Expected: PASS for both `TestConfig_Defaults` and `TestConfig_Validate` (all subtests). + +- [ ] **Step 5: Commit** + +```bash +git add core/webrtc/config.go core/webrtc/config_test.go +git commit -m "feat(webrtc): add Config with defaults and validation" +``` + +--- + +## Task 4: Registry — stream_id → Source mapping + +**Files:** +- Create: `core/webrtc/registry.go` +- Create: `core/webrtc/registry_test.go` + +A **Source** (defined in the next task) represents a live RTP stream that peers can subscribe to. The **Registry** is the thread-safe map that maps stream IDs to active sources. Writing this first because Source depends on it less than it does on Source. + +- [ ] **Step 1: Write the failing test `core/webrtc/registry_test.go`** + +```go +package webrtc + +import ( + "sync" + "testing" +) + +// mockSource implements the minimum Source-like shape needed by the registry. +// The real Source type is defined in Task 5; the registry only needs a +// stable type to store and retrieve. +type mockSource struct { + id string +} + +func (m *mockSource) ID() string { return m.id } + +func TestRegistry_RegisterAndLookup(t *testing.T) { + r := NewRegistry() + src := &mockSource{id: "streamA"} + + if err := r.Register("streamA", src); err != nil { + t.Fatalf("Register returned error: %v", err) + } + + got, ok := r.Lookup("streamA") + if !ok { + t.Fatal("Lookup(streamA) returned ok=false, want true") + } + if got != src { + t.Errorf("Lookup returned %v, want %v", got, src) + } +} + +func TestRegistry_LookupMissing(t *testing.T) { + r := NewRegistry() + _, ok := r.Lookup("nope") + if ok { + t.Error("Lookup on empty registry returned ok=true, want false") + } +} + +func TestRegistry_DuplicateRegister(t *testing.T) { + r := NewRegistry() + _ = r.Register("streamA", &mockSource{id: "streamA"}) + + if err := r.Register("streamA", &mockSource{id: "streamA"}); err == nil { + t.Error("duplicate Register should return error, got nil") + } +} + +func TestRegistry_Deregister(t *testing.T) { + r := NewRegistry() + _ = r.Register("streamA", &mockSource{id: "streamA"}) + r.Deregister("streamA") + + if _, ok := r.Lookup("streamA"); ok { + t.Error("after Deregister, Lookup should return ok=false") + } +} + +func TestRegistry_ConcurrentAccess(t *testing.T) { + r := NewRegistry() + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(3) + id := string(rune('a' + (i % 26))) + go func() { defer wg.Done(); _ = r.Register(id, &mockSource{id: id}) }() + go func() { defer wg.Done(); _, _ = r.Lookup(id) }() + go func() { defer wg.Done(); r.Deregister(id) }() + } + wg.Wait() + // No assertion — test passes if -race doesn't flag anything. +} +``` + +- [ ] **Step 2: Run the test and verify it fails** + +Run: +```bash +go test ./core/webrtc/ -run TestRegistry -v +``` + +Expected: FAIL with "undefined: NewRegistry". + +- [ ] **Step 3: Write the minimal implementation `core/webrtc/registry.go`** + +```go +package webrtc + +import ( + "fmt" + "sync" +) + +// SourceHandle is the minimal interface the Registry stores per stream_id. +// The concrete type is *Source, defined in source.go. +type SourceHandle interface { + ID() string +} + +// Registry is a thread-safe map from stream_id to active SourceHandle. +type Registry struct { + mu sync.RWMutex + streams map[string]SourceHandle +} + +// NewRegistry returns an empty Registry. +func NewRegistry() *Registry { + return &Registry{streams: make(map[string]SourceHandle)} +} + +// Register associates src with streamID. Returns an error if streamID is +// already registered. +func (r *Registry) Register(streamID string, src SourceHandle) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, exists := r.streams[streamID]; exists { + return fmt.Errorf("webrtc: stream %q already registered", streamID) + } + r.streams[streamID] = src + return nil +} + +// Lookup returns the handle for streamID. The second return value is false +// if no source is registered. +func (r *Registry) Lookup(streamID string) (SourceHandle, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + src, ok := r.streams[streamID] + return src, ok +} + +// Deregister removes streamID. No-op if not present. +func (r *Registry) Deregister(streamID string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.streams, streamID) +} +``` + +- [ ] **Step 4: Run the tests and verify they pass** + +Run: +```bash +go test ./core/webrtc/ -run TestRegistry -v -race +``` + +Expected: PASS for all TestRegistry subtests, no data races. + +- [ ] **Step 5: Commit** + +```bash +git add core/webrtc/registry.go core/webrtc/registry_test.go +git commit -m "feat(webrtc): add thread-safe Registry for stream_id → SourceHandle" +``` + +--- + +## Task 5: Source — RTP UDP reader + subscriber fan-out + +**Files:** +- Create: `core/webrtc/source.go` +- Create: `core/webrtc/source_test.go` + +A Source owns a UDP socket bound to a local port, reads RTP packets, and forwards them to every subscribed peer's video/audio track. For M1, we deliberately keep the fan-out simple (per-subscriber goroutine writing to a buffered channel) because at 1–5 viewers the naive model is entirely sufficient. + +- [ ] **Step 1: Write the failing test `core/webrtc/source_test.go`** + +```go +package webrtc + +import ( + "net" + "testing" + "time" + + "github.com/pion/rtp" +) + +func TestSource_ID(t *testing.T) { + s, err := NewSource("streamA", 0) // 0 = ephemeral port + if err != nil { + t.Fatalf("NewSource: %v", err) + } + defer s.Close() + + if s.ID() != "streamA" { + t.Errorf("ID() = %q, want streamA", s.ID()) + } +} + +func TestSource_ReceiveAndFanout(t *testing.T) { + s, err := NewSource("streamA", 0) + if err != nil { + t.Fatalf("NewSource: %v", err) + } + defer s.Close() + + // Subscribe before sending. + sub := s.Subscribe(16) // buffer depth 16 + defer s.Unsubscribe(sub) + + s.Start() + + // Build and send a minimal RTP packet to the source's UDP port. + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SequenceNumber: 1, + Timestamp: 1000, + SSRC: 0xDEADBEEF, + }, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + } + raw, err := pkt.Marshal() + if err != nil { + t.Fatalf("pkt.Marshal: %v", err) + } + + conn, err := net.Dial("udp", s.LocalAddr().String()) + if err != nil { + t.Fatalf("net.Dial: %v", err) + } + defer conn.Close() + + if _, err := conn.Write(raw); err != nil { + t.Fatalf("conn.Write: %v", err) + } + + select { + case got := <-sub: + if got.SSRC != 0xDEADBEEF { + t.Errorf("received SSRC = %x, want DEADBEEF", got.SSRC) + } + if got.SequenceNumber != 1 { + t.Errorf("received SeqNum = %d, want 1", got.SequenceNumber) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for RTP packet on subscriber channel") + } +} + +func TestSource_MultipleSubscribers(t *testing.T) { + s, err := NewSource("streamA", 0) + if err != nil { + t.Fatalf("NewSource: %v", err) + } + defer s.Close() + + subs := []chan *rtp.Packet{ + s.Subscribe(8), + s.Subscribe(8), + s.Subscribe(8), + } + for _, sub := range subs { + defer s.Unsubscribe(sub) + } + + s.Start() + + raw, _ := (&rtp.Packet{ + Header: rtp.Header{Version: 2, PayloadType: 96, SequenceNumber: 42, SSRC: 1}, + Payload: []byte{0xAA}, + }).Marshal() + conn, _ := net.Dial("udp", s.LocalAddr().String()) + defer conn.Close() + _, _ = conn.Write(raw) + + for i, sub := range subs { + select { + case got := <-sub: + if got.SequenceNumber != 42 { + t.Errorf("sub %d got seq %d, want 42", i, got.SequenceNumber) + } + case <-time.After(2 * time.Second): + t.Errorf("sub %d timed out", i) + } + } +} + +func TestSource_UnsubscribeStopsDelivery(t *testing.T) { + s, _ := NewSource("streamA", 0) + defer s.Close() + sub := s.Subscribe(8) + s.Start() + s.Unsubscribe(sub) + + // After Unsubscribe, the channel should be closed. + select { + case _, ok := <-sub: + if ok { + t.Error("expected channel closed after Unsubscribe, got value") + } + case <-time.After(500 * time.Millisecond): + t.Error("timed out waiting for channel close") + } +} +``` + +- [ ] **Step 2: Run the test and verify it fails** + +Run: +```bash +go test ./core/webrtc/ -run TestSource -v +``` + +Expected: FAIL with "undefined: NewSource". + +- [ ] **Step 3: Write the minimal implementation `core/webrtc/source.go`** + +```go +package webrtc + +import ( + "fmt" + "net" + "sync" + + "github.com/pion/rtp" +) + +// Source reads RTP packets from a local UDP socket and fans them out to +// subscribed peers via per-subscriber buffered channels. +type Source struct { + id string + conn *net.UDPConn + + mu sync.Mutex + subscribers map[chan *rtp.Packet]struct{} + started bool + closed bool + done chan struct{} +} + +// NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS +// assign an ephemeral port (useful for tests). +func NewSource(streamID string, port int) (*Source, error) { + addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port} + conn, err := net.ListenUDP("udp4", addr) + if err != nil { + return nil, fmt.Errorf("webrtc: listen udp: %w", err) + } + return &Source{ + id: streamID, + conn: conn, + subscribers: make(map[chan *rtp.Packet]struct{}), + done: make(chan struct{}), + }, nil +} + +// ID returns the registered stream identifier. +func (s *Source) ID() string { return s.id } + +// LocalAddr returns the UDP address the source is listening on. +func (s *Source) LocalAddr() *net.UDPAddr { + return s.conn.LocalAddr().(*net.UDPAddr) +} + +// Subscribe returns a new buffered channel that receives every RTP packet +// read from the UDP socket. bufDepth is the channel buffer size; when full, +// packets are dropped (preventing a slow subscriber from back-pressuring +// the reader). +func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet { + ch := make(chan *rtp.Packet, bufDepth) + s.mu.Lock() + s.subscribers[ch] = struct{}{} + s.mu.Unlock() + return ch +} + +// Unsubscribe removes ch from the subscriber set and closes it. +func (s *Source) Unsubscribe(ch chan *rtp.Packet) { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.subscribers[ch]; ok { + delete(s.subscribers, ch) + close(ch) + } +} + +// Start begins the RTP reader goroutine. Safe to call once; subsequent calls +// are no-ops. +func (s *Source) Start() { + s.mu.Lock() + if s.started || s.closed { + s.mu.Unlock() + return + } + s.started = true + s.mu.Unlock() + + go s.readLoop() +} + +func (s *Source) readLoop() { + buf := make([]byte, 1500) // MTU-sized; RTP over UDP should fit + for { + select { + case <-s.done: + return + default: + } + + n, _, err := s.conn.ReadFromUDP(buf) + if err != nil { + // Socket closed or error — exit the loop. + return + } + + pkt := &rtp.Packet{} + if err := pkt.Unmarshal(buf[:n]); err != nil { + // Malformed packet; skip without crashing. + continue + } + + s.mu.Lock() + for ch := range s.subscribers { + select { + case ch <- pkt: + default: + // Subscriber full — drop to protect the reader. + } + } + s.mu.Unlock() + } +} + +// Close stops the reader goroutine, closes the UDP socket, and closes every +// subscriber channel. +func (s *Source) Close() error { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return nil + } + s.closed = true + close(s.done) + for ch := range s.subscribers { + delete(s.subscribers, ch) + close(ch) + } + s.mu.Unlock() + return s.conn.Close() +} +``` + +- [ ] **Step 4: Run the tests and verify they pass** + +Run: +```bash +go test ./core/webrtc/ -run TestSource -v -race +``` + +Expected: PASS for all TestSource subtests, no data races. + +- [ ] **Step 5: Commit** + +```bash +git add core/webrtc/source.go core/webrtc/source_test.go +git commit -m "feat(webrtc): add Source with UDP RTP reader and subscriber fan-out" +``` + +--- + +## Task 6: ICE config helper (SettingEngine builder) + +**Files:** +- Create: `core/webrtc/ice.go` +- Create: `core/webrtc/ice_test.go` + +Isolated helper that translates our `Config` into Pion's `SettingEngine` + `Configuration` pair. Keeping it separate makes peer.go simpler and the ICE config trivially testable. + +- [ ] **Step 1: Write the failing test `core/webrtc/ice_test.go`** + +```go +package webrtc + +import ( + "testing" + + "github.com/pion/webrtc/v4" +) + +func TestBuildICEConfig_Defaults(t *testing.T) { + c := DefaultConfig() + rtcConfig, _, err := BuildICEConfig(c) + if err != nil { + t.Fatalf("BuildICEConfig: %v", err) + } + if len(rtcConfig.ICEServers) == 0 { + t.Error("ICEServers should not be empty") + } + // First default is Cloudflare STUN. + if rtcConfig.ICEServers[0].URLs[0] != "stun:stun.cloudflare.com:3478" { + t.Errorf("first ICE server = %q, want stun:stun.cloudflare.com:3478", + rtcConfig.ICEServers[0].URLs[0]) + } +} + +func TestBuildICEConfig_PublicIP(t *testing.T) { + c := DefaultConfig() + c.PublicIP = "203.0.113.10" + _, se, err := BuildICEConfig(c) + if err != nil { + t.Fatalf("BuildICEConfig: %v", err) + } + if se == nil { + t.Fatal("SettingEngine should not be nil when PublicIP is set") + } + // We can't introspect NAT1To1IPs directly from Pion's public API; the + // smoke test is that building an API from this engine works. + api := webrtc.NewAPI(webrtc.WithSettingEngine(*se)) + if api == nil { + t.Fatal("NewAPI returned nil") + } +} + +func TestBuildICEConfig_InvalidConfig(t *testing.T) { + c := DefaultConfig() + c.WHEPListen = "" + _, _, err := BuildICEConfig(c) + if err == nil { + t.Error("BuildICEConfig should reject invalid config") + } +} +``` + +- [ ] **Step 2: Run the test and verify it fails** + +Run: +```bash +go test ./core/webrtc/ -run TestBuildICEConfig -v +``` + +Expected: FAIL with "undefined: BuildICEConfig". + +- [ ] **Step 3: Write the minimal implementation `core/webrtc/ice.go`** + +```go +package webrtc + +import ( + "github.com/pion/webrtc/v4" +) + +// BuildICEConfig translates a Config into the two Pion config pieces every +// PeerConnection needs: a webrtc.Configuration (with ICE servers) and a +// SettingEngine (with NAT1To1 and port range tuning). +// +// The returned *SettingEngine may be nil if no engine-level tuning is +// required (i.e. PublicIP unset and UDPPortRange at defaults). Callers +// should only pass it to webrtc.NewAPI when non-nil. +func BuildICEConfig(c Config) (webrtc.Configuration, *webrtc.SettingEngine, error) { + if err := c.Validate(); err != nil { + return webrtc.Configuration{}, nil, err + } + + rtcConfig := webrtc.Configuration{ + ICEServers: make([]webrtc.ICEServer, 0, len(c.ICEServers)), + } + for _, uri := range c.ICEServers { + rtcConfig.ICEServers = append(rtcConfig.ICEServers, webrtc.ICEServer{ + URLs: []string{uri}, + }) + } + + var se *webrtc.SettingEngine + if c.PublicIP != "" || c.UDPPortRange.Low > 0 { + engine := webrtc.SettingEngine{} + if c.PublicIP != "" { + engine.SetNAT1To1IPs([]string{c.PublicIP}, webrtc.ICECandidateTypeHost) + } + // Constrain the ephemeral UDP range Pion allocates for ICE candidates. + // Note: this is a separate concern from our FFmpeg→Source UDP ports; + // Pion uses its own port pool for the WebRTC media path. + if c.UDPPortRange.Low > 0 && c.UDPPortRange.High >= c.UDPPortRange.Low { + if err := engine.SetEphemeralUDPPortRange( + uint16(c.UDPPortRange.Low), uint16(c.UDPPortRange.High)); err != nil { + return webrtc.Configuration{}, nil, err + } + } + se = &engine + } + + return rtcConfig, se, nil +} +``` + +- [ ] **Step 4: Run the tests and verify they pass** + +Run: +```bash +go test ./core/webrtc/ -run TestBuildICEConfig -v +``` + +Expected: PASS for all TestBuildICEConfig subtests. + +- [ ] **Step 5: Commit** + +```bash +git add core/webrtc/ice.go core/webrtc/ice_test.go +git commit -m "feat(webrtc): add ICE config helper (Configuration + SettingEngine)" +``` + +--- + +## Task 7: Peer — PeerConnection factory + track attachment + +**Files:** +- Create: `core/webrtc/peer.go` +- Create: `core/webrtc/peer_test.go` + +Creates a Pion `PeerConnection`, adds video (H.264) and audio (Opus) `TrackLocalStaticRTP`, negotiates SDP, and starts a goroutine that forwards packets from a Source subscription into those tracks. + +- [ ] **Step 1: Write the failing test `core/webrtc/peer_test.go`** + +```go +package webrtc + +import ( + "context" + "testing" + "time" + + "github.com/pion/webrtc/v4" +) + +// minimalOfferSDP returns an SDP offer that advertises H.264 (video) and +// Opus (audio) as recvonly — the minimum a WHEP client sends. +func minimalOfferSDP(t *testing.T) webrtc.SessionDescription { + t.Helper() + // Create a throwaway PC to generate a valid offer. + me := &webrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + t.Fatalf("RegisterDefaultCodecs: %v", err) + } + api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + t.Fatalf("NewPeerConnection: %v", err) + } + defer pc.Close() + + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("AddTransceiver video: %v", err) + } + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("AddTransceiver audio: %v", err) + } + offer, err := pc.CreateOffer(nil) + if err != nil { + t.Fatalf("CreateOffer: %v", err) + } + return offer +} + +func TestPeerFactory_CreateAnswer(t *testing.T) { + src, err := NewSource("streamA", 0) + if err != nil { + t.Fatalf("NewSource: %v", err) + } + defer src.Close() + src.Start() + + cfg := DefaultConfig() + factory, err := NewPeerFactory(cfg) + if err != nil { + t.Fatalf("NewPeerFactory: %v", err) + } + + offer := minimalOfferSDP(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + peer, err := factory.CreatePeer(ctx, src, offer) + if err != nil { + t.Fatalf("CreatePeer: %v", err) + } + defer peer.Close() + + if peer.Answer().Type != webrtc.SDPTypeAnswer { + t.Errorf("Answer().Type = %v, want answer", peer.Answer().Type) + } + if peer.ResourceID() == "" { + t.Error("ResourceID should be non-empty") + } +} + +func TestPeerFactory_ClosesCleanly(t *testing.T) { + src, _ := NewSource("streamA", 0) + defer src.Close() + src.Start() + + factory, _ := NewPeerFactory(DefaultConfig()) + offer := minimalOfferSDP(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + peer, err := factory.CreatePeer(ctx, src, offer) + if err != nil { + t.Fatalf("CreatePeer: %v", err) + } + if err := peer.Close(); err != nil { + t.Errorf("Close: %v", err) + } + // Second close should be a no-op, not panic. + if err := peer.Close(); err != nil { + t.Errorf("second Close: %v", err) + } +} +``` + +- [ ] **Step 2: Run the test and verify it fails** + +Run: +```bash +go test ./core/webrtc/ -run TestPeerFactory -v +``` + +Expected: FAIL with "undefined: NewPeerFactory". + +- [ ] **Step 3: Write the minimal implementation `core/webrtc/peer.go`** + +```go +package webrtc + +import ( + "context" + "crypto/rand" + "encoding/hex" + "fmt" + "sync" + + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" +) + +// PeerFactory builds PeerConnections from a shared Pion API instance +// configured from Config. +type PeerFactory struct { + api *webrtc.API + rtcConfig webrtc.Configuration +} + +// NewPeerFactory initializes a Pion API with the codec set we support +// (H.264 + Opus) and applies the provided Config. +func NewPeerFactory(c Config) (*PeerFactory, error) { + if err := c.Validate(); err != nil { + return nil, err + } + + me := &webrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + return nil, fmt.Errorf("webrtc: register default codecs: %w", err) + } + + rtcConfig, se, err := BuildICEConfig(c) + if err != nil { + return nil, err + } + + opts := []func(*webrtc.API){webrtc.WithMediaEngine(me)} + if se != nil { + opts = append(opts, webrtc.WithSettingEngine(*se)) + } + api := webrtc.NewAPI(opts...) + + return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil +} + +// Peer wraps a Pion PeerConnection bound to a Source's subscription. +type Peer struct { + resourceID string + pc *webrtc.PeerConnection + answer webrtc.SessionDescription + source *Source + sub chan *rtp.Packet + done chan struct{} + once sync.Once +} + +// CreatePeer builds a PeerConnection, sets the remote offer, generates an +// answer, attaches video+audio tracks fed from src, and blocks until ICE +// gathering completes or ctx expires. +func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) { + pc, err := f.api.NewPeerConnection(f.rtcConfig) + if err != nil { + return nil, fmt.Errorf("webrtc: new peer connection: %w", err) + } + + videoTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, + "video", "dragonfork") + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: new video track: %w", err) + } + audioTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, + "audio", "dragonfork") + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: new audio track: %w", err) + } + if _, err := pc.AddTrack(videoTrack); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: add video track: %w", err) + } + if _, err := pc.AddTrack(audioTrack); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: add audio track: %w", err) + } + + if err := pc.SetRemoteDescription(offer); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: set remote: %w", err) + } + answer, err := pc.CreateAnswer(nil) + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: create answer: %w", err) + } + + gatherComplete := webrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(answer); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: set local: %w", err) + } + + select { + case <-gatherComplete: + case <-ctx.Done(): + _ = pc.Close() + return nil, ErrICETimeout + } + + sub := src.Subscribe(64) + + p := &Peer{ + resourceID: newResourceID(), + pc: pc, + answer: *pc.LocalDescription(), + source: src, + sub: sub, + done: make(chan struct{}), + } + + pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) { + if st == webrtc.PeerConnectionStateFailed || + st == webrtc.PeerConnectionStateDisconnected || + st == webrtc.PeerConnectionStateClosed { + _ = p.Close() + } + }) + + go forwardRTP(p.done, sub, videoTrack, audioTrack) + + return p, nil +} + +// Answer returns the locally-created SDP answer. Valid after CreatePeer. +func (p *Peer) Answer() webrtc.SessionDescription { return p.answer } + +// ResourceID returns the stable resource id used in the WHEP Location header. +func (p *Peer) ResourceID() string { return p.resourceID } + +// Close tears down the peer connection and unsubscribes from the source. +// Safe to call multiple times. +func (p *Peer) Close() error { + var err error + p.once.Do(func() { + close(p.done) + p.source.Unsubscribe(p.sub) + err = p.pc.Close() + }) + return err +} + +func newResourceID() string { + b := make([]byte, 8) + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} +``` + +- [ ] **Step 4: Create `core/webrtc/forward.go` with the RTP forwarder** + +```go +package webrtc + +import ( + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" +) + +// forwardRTP reads packets from sub and writes them to the correct track +// based on payload type (H.264 → video, Opus → audio). Payload-type +// inspection is the simplest M1 approach; M2 will switch to per-track +// source channels once the process resolver manages separate video/audio +// UDP ports. +func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet, + video, audio *webrtc.TrackLocalStaticRTP) { + for { + select { + case <-done: + return + case pkt, ok := <-sub: + if !ok { + return + } + // Pion default H.264 PT = 102, Opus PT = 111. If the publisher + // uses different PTs we'll revisit in M2 — for M1 PoC we + // configure FFmpeg to these values explicitly in the publisher + // script. + switch pkt.PayloadType { + case 102: + _ = video.WriteRTP(pkt) + case 111: + _ = audio.WriteRTP(pkt) + default: + // Unknown PT — drop. Log in M3. + } + } + } +} +``` + +- [ ] **Step 5: Run the tests and verify they pass** + +Run: +```bash +go test ./core/webrtc/ -run TestPeerFactory -v +``` + +Expected: PASS for `TestPeerFactory_CreateAnswer` and `TestPeerFactory_ClosesCleanly`. + +- [ ] **Step 6: Commit** + +```bash +git add core/webrtc/peer.go core/webrtc/peer_test.go core/webrtc/forward.go +git commit -m "feat(webrtc): add PeerFactory, Peer, and RTP forwarder" +``` + +--- + +## Task 8: WHEP HTTP handler (happy path only) + +**Files:** +- Create: `core/webrtc/whep.go` +- Create: `core/webrtc/whep_test.go` + +For M1, the WHEP handler supports only `POST /whep/{stream_id}` happy path. Error paths (404/406/503) and DELETE/PATCH come in M3. + +- [ ] **Step 1: Write the failing test `core/webrtc/whep_test.go`** + +```go +package webrtc + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/pion/webrtc/v4" +) + +func TestWHEP_POSTReturns201WithSDP(t *testing.T) { + // Set up a Source and register it. + src, _ := NewSource("streamA", 0) + defer src.Close() + src.Start() + + reg := NewRegistry() + _ = reg.Register("streamA", src) + + factory, _ := NewPeerFactory(DefaultConfig()) + + handler := NewWHEPHandler(reg, factory, DefaultConfig()) + + // Build an offer using a throwaway PC. + me := &webrtc.MediaEngine{} + _ = me.RegisterDefaultCodecs() + api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) + pc, _ := api.NewPeerConnection(webrtc.Configuration{}) + defer pc.Close() + _, _ = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}) + _, _ = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}) + offer, _ := pc.CreateOffer(nil) + + req := httptest.NewRequest(http.MethodPost, "/whep/streamA", + strings.NewReader(offer.SDP)) + req.Header.Set("Content-Type", "application/sdp") + // Give the handler generous ICE gathering time in tests. + ctx, cancel := context.WithTimeout(req.Context(), 10*time.Second) + defer cancel() + req = req.WithContext(ctx) + + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + if rr.Code != http.StatusCreated { + body, _ := io.ReadAll(rr.Result().Body) + t.Fatalf("status = %d, want 201. body=%s", rr.Code, string(body)) + } + if ct := rr.Header().Get("Content-Type"); ct != "application/sdp" { + t.Errorf("Content-Type = %q, want application/sdp", ct) + } + if loc := rr.Header().Get("Location"); !strings.HasPrefix(loc, "/whep/streamA/") { + t.Errorf("Location = %q, want /whep/streamA/", loc) + } + if !strings.Contains(rr.Body.String(), "v=0") { + t.Errorf("body does not look like SDP: %s", rr.Body.String()) + } +} +``` + +- [ ] **Step 2: Run the test and verify it fails** + +Run: +```bash +go test ./core/webrtc/ -run TestWHEP -v +``` + +Expected: FAIL with "undefined: NewWHEPHandler". + +- [ ] **Step 3: Write the minimal implementation `core/webrtc/whep.go`** + +```go +package webrtc + +import ( + "io" + "net/http" + "strings" + "sync" + "sync/atomic" + + "github.com/pion/webrtc/v4" +) + +// WHEPHandler serves the WebRTC-HTTP Egress Protocol POST. +type WHEPHandler struct { + registry *Registry + factory *PeerFactory + config Config + + mu sync.Mutex + peers map[string]*Peer // resourceID → Peer + peersCount int64 // atomic, for cap check without lock +} + +// NewWHEPHandler constructs a handler with the given dependencies. +func NewWHEPHandler(r *Registry, f *PeerFactory, c Config) *WHEPHandler { + return &WHEPHandler{ + registry: r, + factory: f, + config: c, + peers: make(map[string]*Peer), + } +} + +// ServeHTTP handles POST /whep/{stream_id}. Other methods and paths return 405. +func (h *WHEPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.Header().Set("Allow", "POST") + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + // Extract stream_id from path: /whep/{stream_id} + streamID := strings.TrimPrefix(r.URL.Path, "/whep/") + if streamID == "" || strings.Contains(streamID, "/") { + http.Error(w, "invalid stream id", http.StatusBadRequest) + return + } + + // Peer cap enforcement (happy path still respects the cap). + if atomic.LoadInt64(&h.peersCount) >= int64(h.config.MaxPeersTotal) { + http.Error(w, ErrPeerCapReached.Error(), http.StatusServiceUnavailable) + return + } + + handle, ok := h.registry.Lookup(streamID) + if !ok { + http.Error(w, ErrStreamNotFound.Error(), http.StatusNotFound) + return + } + src, ok := handle.(*Source) + if !ok { + http.Error(w, "registered source is not a *Source", http.StatusInternalServerError) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "read body: "+err.Error(), http.StatusBadRequest) + return + } + if len(body) == 0 { + http.Error(w, ErrInvalidSDP.Error(), http.StatusBadRequest) + return + } + + offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} + + peer, err := h.factory.CreatePeer(r.Context(), src, offer) + if err != nil { + http.Error(w, "create peer: "+err.Error(), http.StatusInternalServerError) + return + } + + h.mu.Lock() + h.peers[peer.ResourceID()] = peer + h.mu.Unlock() + atomic.AddInt64(&h.peersCount, 1) + + w.Header().Set("Content-Type", "application/sdp") + w.Header().Set("Location", "/whep/"+streamID+"/"+peer.ResourceID()) + w.WriteHeader(http.StatusCreated) + _, _ = io.WriteString(w, peer.Answer().SDP) +} +``` + +- [ ] **Step 4: Run the tests and verify they pass** + +Run: +```bash +go test ./core/webrtc/ -run TestWHEP -v +``` + +Expected: PASS for `TestWHEP_POSTReturns201WithSDP`. + +- [ ] **Step 5: Run the full package test suite** + +Run: +```bash +go test ./core/webrtc/... -v -race +``` + +Expected: ALL PASS. No race warnings. + +- [ ] **Step 6: Commit** + +```bash +git add core/webrtc/whep.go core/webrtc/whep_test.go +git commit -m "feat(webrtc): add WHEP POST handler (happy path)" +``` + +--- + +## Task 9: Standalone PoC binary + +**Files:** +- Create: `cmd/webrtc-poc/main.go` + +The PoC binary wires everything together: creates a Source, registers it, starts the WHEP handler, and blocks. M2 replaces this with integration into datarhei Core's normal startup. + +- [ ] **Step 1: Write `cmd/webrtc-poc/main.go`** + +```go +// Command webrtc-poc runs a minimal Dragon Fork WebRTC egress server for +// manual end-to-end testing. It listens for RTP on 127.0.0.1:10000 as +// stream "test" and serves WHEP at :8787. +// +// This is NOT part of the datarhei Core binary. It will be removed or +// demoted to an internal test helper once milestone M2 lands. +package main + +import ( + "flag" + "log" + "net/http" + + "/core/webrtc" +) + +func main() { + var ( + streamID = flag.String("stream", "test", "stream id to serve") + rtpPort = flag.Int("rtp-port", 10000, "UDP port to receive RTP on") + listen = flag.String("listen", ":8787", "WHEP HTTP listen address") + publicIP = flag.String("public-ip", "", "server public IP for NAT1To1 (optional)") + ) + flag.Parse() + + cfg := webrtc.DefaultConfig() + cfg.WHEPListen = *listen + cfg.PublicIP = *publicIP + + src, err := webrtc.NewSource(*streamID, *rtpPort) + if err != nil { + log.Fatalf("NewSource: %v", err) + } + src.Start() + defer src.Close() + log.Printf("listening for RTP on %s", src.LocalAddr()) + + reg := webrtc.NewRegistry() + if err := reg.Register(*streamID, src); err != nil { + log.Fatalf("Register: %v", err) + } + + factory, err := webrtc.NewPeerFactory(cfg) + if err != nil { + log.Fatalf("NewPeerFactory: %v", err) + } + + handler := webrtc.NewWHEPHandler(reg, factory, cfg) + + mux := http.NewServeMux() + mux.Handle("/whep/", handler) + + log.Printf("WHEP listening on %s — POST /whep/%s to subscribe", *listen, *streamID) + log.Fatal(http.ListenAndServe(*listen, mux)) +} +``` + +- [ ] **Step 2: Substitute the real module path** + +Open the file and replace `` with the actual module path from your fork's `go.mod` first line. For example, if `go.mod` starts with `module github.com/wilddragon/datarhei-dragonfork-core`, the import becomes: + +```go +import "github.com/wilddragon/datarhei-dragonfork-core/core/webrtc" +``` + +Do this once by editing the file — do not commit the placeholder string. + +- [ ] **Step 3: Build the binary** + +Run: +```bash +go build -o /tmp/webrtc-poc ./cmd/webrtc-poc +``` + +Expected: binary produced at `/tmp/webrtc-poc`, no build errors. + +- [ ] **Step 4: Smoke-run it** + +Run: +```bash +/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787 & +sleep 1 +curl -sS -X POST -H 'Content-Type: application/sdp' \ + --data 'v=0' http://127.0.0.1:8787/whep/test +``` + +Expected: curl returns a response (will likely be 400 or 500 because the body isn't a real offer — that's fine; the important thing is the server is up and routing the request). The server log shows the POST arrived. + +Kill the server: +```bash +kill %1 +wait 2>/dev/null +``` + +- [ ] **Step 5: Commit** + +```bash +git add cmd/webrtc-poc/main.go +git commit -m "feat(webrtc): add standalone webrtc-poc binary for M1 testing" +``` + +--- + +## Task 10: FFmpeg publisher script + +**Files:** +- Create: `test/publish.sh` + +A shell script that runs FFmpeg to generate a test pattern and push it as RTP to the PoC binary's port. Uses `testsrc2` with a burned-in timecode (useful later for latency measurement). + +- [ ] **Step 1: Create `test/publish.sh`** + +```bash +#!/usr/bin/env bash +# test/publish.sh — Dragon Fork M1 publisher +# +# Pushes an FFmpeg testsrc2 pattern as H.264 + Opus RTP to the webrtc-poc +# binary's local UDP port(s). Requires FFmpeg 6.x. +set -euo pipefail + +HOST="${HOST:-127.0.0.1}" +VIDEO_PORT="${VIDEO_PORT:-10000}" +AUDIO_PORT="${AUDIO_PORT:-10002}" +FPS="${FPS:-30}" +SIZE="${SIZE:-640x360}" + +echo "publishing testsrc2 → $HOST:$VIDEO_PORT (video, PT=102)" +echo " $HOST:$AUDIO_PORT (audio, PT=111)" + +exec ffmpeg -hide_banner -re \ + -f lavfi -i "testsrc2=size=${SIZE}:rate=${FPS}" \ + -f lavfi -i "sine=frequency=440:sample_rate=48000" \ + -vf "drawtext=text='%{localtime\\:%H\\\\\\:%M\\\\\\:%S.%3N}':x=10:y=10:fontsize=32:fontcolor=white:box=1:boxcolor=black@0.8" \ + -c:v libx264 -preset ultrafast -tune zerolatency \ + -profile:v baseline -pix_fmt yuv420p \ + -b:v 1500k -maxrate 1500k -bufsize 500k \ + -g 60 -keyint_min 60 -x264-params "repeat-headers=1" \ + -payload_type 102 -f rtp "rtp://${HOST}:${VIDEO_PORT}?pkt_size=1200" \ + -c:a libopus -b:a 128k \ + -payload_type 111 -f rtp "rtp://${HOST}:${AUDIO_PORT}?pkt_size=1200" +``` + +- [ ] **Step 2: Make it executable** + +Run: +```bash +chmod +x test/publish.sh +``` + +- [ ] **Step 3: Smoke-test it against the PoC binary** + +In one terminal: +```bash +/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787 +``` + +In another (will fail until we bind both ports — M1 uses a single source port; the script pushes audio to 10002 but the PoC only listens on 10000 — that's fine for M1 since we're primarily testing video): +```bash +./test/publish.sh +``` + +Expected: FFmpeg runs and prints RTP packet stats. The PoC binary logs nothing specifically (it's silently reading RTP — we haven't added verbose logging in M1). Let it run 3 seconds, then Ctrl+C both. + +(In M2 we'll separate video and audio into distinct sources; for M1 the single-port video-only path is enough to prove the pipeline.) + +- [ ] **Step 4: Commit** + +```bash +git add test/publish.sh +git commit -m "test: add FFmpeg publisher script for M1 PoC" +``` + +--- + +## Task 11: Pion-based test WHEP client + +**Files:** +- Create: `test/whep-client/main.go` +- Create: `test/whep-client/main_test.go` + +A Go binary that acts as a headless WHEP client: POSTs an SDP offer, parses the SDP answer, establishes a `PeerConnection`, and verifies that RTP packets arrive. Essential for automated end-to-end verification without a browser in the loop. + +- [ ] **Step 1: Create `test/whep-client/main.go`** + +```go +// Command whep-client subscribes to a Dragon Fork WHEP endpoint and logs +// the first N received RTP packets, then exits. Used for M1 end-to-end +// verification. +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "sync/atomic" + "time" + + "github.com/pion/webrtc/v4" +) + +func main() { + var ( + whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL") + wantPkt = flag.Int("pkts", 30, "exit after receiving this many video RTP packets") + timeout = flag.Duration("timeout", 15*time.Second, "overall timeout") + ) + flag.Parse() + + if err := run(*whepURL, *wantPkt, *timeout); err != nil { + log.Fatalf("whep-client: %v", err) + } +} + +func run(whepURL string, wantPkt int, timeout time.Duration) error { + me := &webrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + return fmt.Errorf("register codecs: %w", err) + } + api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return fmt.Errorf("new pc: %w", err) + } + defer pc.Close() + + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + return err + } + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + return err + } + + var videoCount int64 + pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + kind := track.Kind().String() + log.Printf("OnTrack: kind=%s codec=%s", kind, track.Codec().MimeType) + go func() { + buf := make([]byte, 1500) + for { + n, _, err := track.Read(buf) + if err != nil { + log.Printf("track.Read (%s): %v", kind, err) + return + } + if kind == "video" { + atomic.AddInt64(&videoCount, 1) + } + _ = n + } + }() + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + return err + } + gatherComplete := webrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(offer); err != nil { + return err + } + <-gatherComplete + offer = *pc.LocalDescription() + + answerSDP, err := httpPostSDP(whepURL, offer.SDP) + if err != nil { + return fmt.Errorf("WHEP POST: %w", err) + } + + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answerSDP, + }); err != nil { + return fmt.Errorf("set remote: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + deadline := time.After(timeout) + tick := time.NewTicker(250 * time.Millisecond) + defer tick.Stop() + for { + select { + case <-tick.C: + if atomic.LoadInt64(&videoCount) >= int64(wantPkt) { + log.Printf("OK: received %d video packets", videoCount) + fmt.Fprintln(os.Stdout, "PASS") + return nil + } + case <-deadline: + return fmt.Errorf("timeout after %s: only %d video packets received", + timeout, atomic.LoadInt64(&videoCount)) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func httpPostSDP(url, sdp string) (string, error) { + req, err := http.NewRequest(http.MethodPost, url, bytes.NewBufferString(sdp)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/sdp") + req.Header.Set("Accept", "application/sdp") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + if resp.StatusCode != http.StatusCreated { + return "", fmt.Errorf("status %d: %s", resp.StatusCode, string(body)) + } + return string(body), nil +} +``` + +- [ ] **Step 2: Create `test/whep-client/main_test.go`** + +```go +package main + +import ( + "strings" + "testing" +) + +// Placeholder unit test. The real validation happens end-to-end in Task 12. +// This at least keeps `go test ./test/whep-client/...` happy in CI later. +func TestHTTPPostSDP_RejectsNon2xx(t *testing.T) { + _, err := httpPostSDP("http://127.0.0.1:1/whep/none", "v=0\n") + if err == nil { + t.Fatal("expected error from unreachable endpoint") + } + if !strings.Contains(err.Error(), "dial") && !strings.Contains(err.Error(), "connect") { + t.Errorf("expected dial/connect error, got: %v", err) + } +} +``` + +- [ ] **Step 3: Build and test** + +Run: +```bash +go build -o /tmp/whep-client ./test/whep-client +go test ./test/whep-client/... -v +``` + +Expected: binary builds; `TestHTTPPostSDP_RejectsNon2xx` passes. + +- [ ] **Step 4: Commit** + +```bash +git add test/whep-client/ +git commit -m "test: add Pion-based WHEP client for end-to-end M1 verification" +``` + +--- + +## Task 12: End-to-end PoC verification + +**Files:** None (runtime verification only) + +This is the moment of truth — FFmpeg → webrtc-poc → whep-client, all the way through, with the whep-client confirming it decoded video. + +- [ ] **Step 1: Open three terminals** + +Terminal A — the PoC server. +Terminal B — the FFmpeg publisher. +Terminal C — the test WHEP client. + +- [ ] **Step 2: Start the PoC server (Terminal A)** + +Run: +```bash +/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787 +``` + +Expected: server logs `listening for RTP on 127.0.0.1:10000` and `WHEP listening on :8787 — POST /whep/test to subscribe`. Leave running. + +- [ ] **Step 3: Start the FFmpeg publisher (Terminal B)** + +Run: +```bash +./test/publish.sh +``` + +Expected: FFmpeg begins producing frames and reporting throughput (something like `frame= 30 fps=30 q=...`). Leave running. + +- [ ] **Step 4: Run the WHEP test client (Terminal C)** + +Run: +```bash +/tmp/whep-client -url http://127.0.0.1:8787/whep/test -pkts 30 -timeout 15s +``` + +Expected output within a few seconds: + +``` +OnTrack: kind=video codec=video/H264 +OK: received 30 video packets +PASS +``` + +The client exits 0. **This is M1's success criterion.** + +- [ ] **Step 5: If it fails, systematic debug** + +If the client times out: + +1. Confirm the FFmpeg publisher is actually producing RTP: run `tcpdump -i lo -n udp port 10000` in a fourth terminal — you should see packets. If not, the publisher is broken; re-check payload types and destination port in `publish.sh`. +2. Confirm the PoC server is receiving RTP: temporarily add a log line in `source.go` readLoop (`log.Printf("rtp seq=%d", pkt.SequenceNumber)`) and rebuild. If nothing logs, the UDP bind is wrong. +3. Confirm the PeerConnection is establishing: check the WHEP POST response was 201; check the client's `pc.ConnectionState()` transitions through `connecting` to `connected`. +4. Confirm codec payload types match: the publish script uses PT 102 for H.264 and 111 for Opus; Pion's default H.264 PT is 102 and Opus is 111. If you see RTP arriving in the reader but `forwardRTP` drops them, the PTs don't match — add a log line in `forward.go` to confirm. + +Log each issue and fix in `NOTES.md` as you go — those are exactly the gotchas we want recorded for M2+. + +- [ ] **Step 6: Tear down** + +Ctrl+C in all three terminals. + +- [ ] **Step 7: Document M1 success in NOTES.md** + +Append to `NOTES.md`: + +```markdown +## M1 PoC verified — + +FFmpeg (testsrc2 + drawtext timecode, H.264 baseline, Opus) → webrtc-poc → Pion WHEP client. + +- Video PT: 102 / Audio PT: 111 (matched between publisher and client) +- ICE gathering completed within ~s +- First video packet received ~s after POST (bounded by 2s GOP once we add forced keyframes in M2) +- + +Ready for M2 (datarhei process-model integration). +``` + +Run: +```bash +git add NOTES.md +git commit -m "docs: record M1 PoC success and observations" +``` + +- [ ] **Step 8: Push the branch** + +Run: +```bash +git push -u origin m1-webrtc-poc +``` + +Expected: branch pushed to your fork's remote. M1 complete. + +--- + +## Exit Criteria + +M1 is done when **all of the following are true**: + +1. `go build ./...` succeeds on the fork. +2. `go test ./core/webrtc/... -race` passes — all unit tests green, no race warnings. +3. `test/whep-client/main.go` prints `PASS` against a running `webrtc-poc` + `test/publish.sh` pair within 15 seconds. +4. `NOTES.md` records the verification run and any gotchas encountered. +5. Branch `m1-webrtc-poc` is pushed to the fork remote. + +--- + +## What comes next + +- **M2 plan** will be written after M1 is verified. It covers: adding the `webrtc://` URL scheme to datarhei Core's output resolver, wiring the Registry and WHEPHandler into Core's normal startup, separating video/audio onto distinct UDP ports, and hooking into the process lifecycle so sources register/deregister automatically with the existing FFmpeg processes datarhei already manages. +- **M3 plan** will cover multi-viewer robustness: DELETE/PATCH WHEP methods, error-path HTTP codes (404/406/503), the admin API endpoints, and PLI absorption + RTCP BYE on teardown. +- **M4 plan** will cover CI integration: running the end-to-end harness from Task 12 in CI, the pixel-sampling latency measurement, and the p95 gates. +- **M5 plan** will cover branding and release: logo swap in the Restreamer Vue UI, README rewrite with upstream attribution, `NOTICE`/`CREDITS`, Docker image publishing, and tagging `v0.1.0-dragonfork`. + +Each follow-on plan is small enough (~3–7 days of work) that writing it after M1 lets us incorporate lessons learned without re-planning from the top. diff --git a/docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md b/docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md new file mode 100755 index 0000000..c1614e8 --- /dev/null +++ b/docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md @@ -0,0 +1,282 @@ +# Datarhei - Dragon Fork: Low-Latency WebRTC Output + +**Status:** Draft for review +**Author:** Zac (Wild Dragon) +**Date:** 2026-04-16 +**Upstream:** [datarhei/core](https://github.com/datarhei/core), [datarhei/restreamer](https://github.com/datarhei/restreamer) + +--- + +## Summary + +Fork datarhei Core and add a native WebRTC egress module ("Dragon Fork") that delivers sub-second live video to a small audience (1–5 viewers) via the WHEP protocol. All existing datarhei ingest paths (RTMP, SRT, RTSP) and outputs (HLS, DASH, SRT, etc.) remain untouched. The new module taps the existing FFmpeg pipeline via local RTP and fans packets to browser clients using [Pion](https://github.com/pion/webrtc). + +The fork is branded **"Datarhei - Dragon Fork"** — preserving upstream attribution (Apache 2.0 / MIT) while marking it as a Wild Dragon-branded distribution. + +## Goals + +- Sub-second end-to-end latency for a 1-to-few live broadcast (target: glass-to-glass p95 < 300ms on RTMP ingest, < 200ms on SRT ingest). +- Zero changes to existing datarhei ingest, transcoding, or non-WebRTC outputs. +- Viewer connects with plain WHEP (HTTP POST with SDP offer, receives SDP answer). +- Additive package — reverting the fork's WebRTC work is a `git revert` away. +- Practical deployment: single binary, single Docker image, no new infrastructure dependencies beyond optional TURN. + +## Non-Goals (v1) + +- SFU clustering or cascading (irrelevant at 1–5 viewers). +- Simulcast, SVC, or adaptive bitrate on the WebRTC path. +- LL-HLS / LL-DASH outputs. +- WHIP *ingest* (accepting WebRTC as input). Tracked as a candidate for v2 — it is the only out-of-scope feature that would meaningfully tighten the latency budget further. +- In-memory keyframe cache for faster first-frame rendering (v2 optimization). +- DVR / recording tied to the WebRTC output. +- Bundled TURN server — users run `coturn` themselves if required. +- Any Ant Media Server or Millicast feature beyond WHEP egress (conference rooms, analytics, geo-routing, multi-view, token-gated playback, etc.). + +## Context & Constraints + +- **Scale:** 1–5 concurrent viewers per stream, typically 1. Single-node SFU is more than enough. +- **Ingest:** RTMP and SRT (both already supported by datarhei). +- **Publisher control:** Publisher codec settings are controllable. Expected feed: H.264 baseline/constrained-baseline + AAC (OBS default) or Opus where possible. +- **Latency budget:** + - RTMP ingest path: ~100–300ms publisher buffering + ~30ms server hop + ~50–150ms network + ~30ms decode ⇒ realistic p95 **250–500ms**. + - SRT (low-latency mode) ingest path: ~20–120ms publisher buffering + same server/network/decode ⇒ realistic p95 **150–300ms**. +- **Existing datarhei:** Already deployed and trusted. The fork builds on that trust, it does not replace it. + +## Architecture + +### Data flow + +``` +Publisher (OBS / encoder) + │ RTMP or SRT (H.264 + AAC/Opus) + ▼ +datarhei ingest [existing] + │ + ▼ +FFmpeg process [existing, orchestrated by datarhei Core] + │ -c:v copy (H.264 passthrough, no re-encode) + │ -c:a libopus (AAC → Opus, ~5–15ms) + │ -force_key_frames (2s GOP on the webrtc output) + │ -f rtp rtp://127.0.0.1: + │ -f rtp rtp://127.0.0.1: + ▼ +Local UDP sockets (RTP) + │ + ▼ +┌──────────────────────────────────────┐ +│ NEW: core/webrtc module (Pion) │ +│ • RTP reader per stream │ +│ • Registry: stream_id → source │ +│ • WHEP HTTP endpoint │ +│ • PeerConnection fan-out │ +└──────────────────────────────────────┘ + │ + ▼ +WebRTC peers (browsers, 1–5) +``` + +### Why this shape + +- **FFmpeg → local RTP → Pion** is the standard integration pattern for attaching WebRTC to a non-WebRTC media server. It reuses datarhei's existing FFmpeg supervision, keeps the new code strictly egress-side, and avoids writing RTP packetization in Go. +- **H.264 passthrough + Opus-only transcode** means no GPU dependency, minimal server CPU, and the smallest achievable added latency on the egress hop. +- **WHEP** (a simple HTTP request/response) sidesteps the complexity of custom WebSocket signaling. It is the protocol Ant Media Server and Millicast both standardized on, and is supported by modern players and browser libraries. +- **Purely additive:** existing ingest, transcode, and non-WebRTC output code paths are unchanged. The only contact with existing code is registering a new URL scheme (`webrtc://`) with the output resolver — a new handler, not a modification of existing handlers. Isolated blast radius. + +## Module Design + +### Package layout + +``` +core/webrtc/ + config.go # configuration struct + validation + registry.go # stream_id → Source mapping (thread-safe) + source.go # RTP reader from local UDP, fan-out to subscribers + peer.go # PeerConnection lifecycle + track attachment + whep.go # HTTP handlers for POST/DELETE/PATCH /whep/{stream} + ice.go # ICE server + NAT1To1 config + keyframe.go # GOP enforcement helpers +``` + +### Peer connection lifecycle (WHEP) + +1. Viewer sends `POST /whep/{stream_id}` with SDP offer (`Content-Type: application/sdp`). +2. Handler looks up `stream_id` in `Registry`. If missing, return `404 Not Found`. +3. If codec negotiation would fail (viewer does not offer H.264 or Opus), return `406 Not Acceptable` with a body describing the mismatch. +4. If `max_peers_total` would be exceeded, return `503 Service Unavailable`. +5. Create a Pion `PeerConnection`, add two `TrackLocalStaticRTP` tracks (video H.264, audio Opus) with SSRCs matching the source. +6. Set remote description, create answer, set local description, wait for ICE gathering (with a 5s timeout and trickle-ICE support via `PATCH`). +7. Return `201 Created`, `Location: /whep/{stream_id}/{resource_id}`, SDP answer in body. +8. A source goroutine now forwards RTP packets to this peer's tracks. +9. Teardown on either `DELETE /whep/{stream_id}/{resource_id}` or ICE state `disconnected`/`failed`. + +### Source fan-out + +One goroutine per active stream reads RTP packets from its local UDP socket and writes into an in-memory ring buffer. Each subscribed peer has a goroutine that reads from the ring and writes to its `TrackLocalStaticRTP`. At 1–5 viewers, overhead is negligible. + +### Keyframe strategy + +RTP from FFmpeg is one-way, so viewer-originated PLI/FIR cannot be propagated back to the encoder. We enforce a **2-second forced keyframe interval on the WebRTC output** via `-force_key_frames "expr:gte(t,n_forced*2)"`. Worst-case first-frame latency on join is ~2s. + +RTCP PLI from viewers is absorbed and logged. Pion's built-in NACK/retransmission handles typical packet-loss recovery transparently. + +### ICE / NAT / TURN + +- Default STUN servers: `stun:stun.cloudflare.com:3478`, `stun:stun.l.google.com:19302` (overridable). +- Optional TURN: config field accepts one or more TURN URIs with credentials. Not required at target scale but wired through for flexibility. +- Public IP advertised via Pion `SettingEngine.SetNAT1To1IPs` — the operator provides the server's public IP once in config; Pion inserts it into candidates. Avoids requiring a STUN round-trip from the server itself. + +## Datarhei Integration + +### New output type: `webrtc://` + +A new URL scheme recognized by the datarhei Core output resolver. Example process configuration: + +```json +{ + "id": "myStream", + "input": [{ "address": "{rtmp,name=myStream.stream}", "options": [] }], + "output": [ + { "address": "...existing HLS output..." }, + { + "address": "webrtc://internal/myStream", + "options": ["-c:v", "copy", "-an"] + }, + { + "address": "webrtc://internal/myStream?track=audio", + "options": ["-c:a", "libopus", "-b:a", "128k", "-vn"] + } + ] +} +``` + +### Resolver behavior + +On process start, each `webrtc://` output triggers the resolver to: + +1. Allocate a local UDP port from the configured `udp_port_range`. +2. Register `(stream_id, track, ssrc, port)` in `webrtc.Registry`. +3. Rewrite the FFmpeg output from `webrtc://internal/{stream_id}` to `rtp://127.0.0.1:?pkt_size=1200`, and (for video tracks only) prepend `-force_key_frames "expr:gte(t,n_forced*2)"` to the options list. Both transformations are done by the resolver — the user's process JSON never contains these details. + +On process stop (clean exit, crash, or user stop): + +1. Tear down all peer connections subscribed to this stream (RTCP BYE + `PeerConnection.Close()`). +2. Deregister from the registry. +3. Release UDP ports to the pool. + +Hooked into datarhei's existing process lifecycle events — no new supervision logic required. + +### API endpoints + +| Method | Path | Purpose | Auth | +|---|---|---|---| +| `POST` | `/whep/{stream_id}` | Subscribe (SDP offer in, SDP answer out) | Public or token-gated (see Open Questions) | +| `DELETE` | `/whep/{stream_id}/{resource_id}` | Unsubscribe | — | +| `PATCH` | `/whep/{stream_id}/{resource_id}` | Trickle ICE | — | +| `GET` | `/api/v3/webrtc/streams` | List active streams + subscriber counts | Admin | +| `GET` | `/api/v3/webrtc/streams/{id}/peers` | Per-stream peer stats | Admin | + +### Configuration + +Added to datarhei Core's config (HCL/JSON; example in HCL): + +```hcl +webrtc { + enabled = true + whep_listen = ":8787" + public_ip = "203.0.113.10" + udp_port_range = "10000-10100" + ice_servers = ["stun:stun.cloudflare.com:3478"] + max_peers_total = 32 +} +``` + +### UI + +**Out of scope for v1.** API-only first. The Restreamer Vue UI gets a minor addition in a later release: a "WebRTC" checkbox on each stream, the WHEP URL, and a live viewer count. UI work is decoupled and non-blocking. + +## Error Handling & Edge Cases + +| Scenario | Behavior | +|---|---| +| Publisher disconnects / FFmpeg exits | Registry emits "source removed"; all peers for that stream torn down with RTCP BYE; WHEP returns 404 until stream restarts. | +| Viewer disconnects (tab close, network) | Pion `OnConnectionStateChange` → cleanup; peer unsubscribed; no server-side retry. | +| First-frame on join | Up to ~2s (forced-GOP interval). Acceptable for broadcast. v2 optimization: in-memory keyframe cache. | +| Viewer codec mismatch | `406 Not Acceptable` with body describing mismatch. In practice never hit — every modern browser supports H.264 baseline + Opus via WebRTC. | +| UDP port exhaustion | Process start fails with clear error. At target scale (≤5 streams) irrelevant. | +| Peer cap reached | `503 Service Unavailable` on new WHEP POSTs. Hard safety rail. | +| ICE gathering timeout | 5s limit; return `500` with diagnostic error message. | +| TURN credential failure | Logged; surfaced in `/api/v3/webrtc/streams` so admins see it without tailing logs. | +| FFmpeg-to-UDP push failure (port conflict, etc.) | Piggybacks on existing datarhei FFmpeg supervision (restart with backoff). No new logic. | + +## Testing + +### Unit tests (`core/webrtc`) + +- `registry`: register/deregister, concurrent access, not-found paths. +- `source`: RTP reading, fan-out to N subscribers, subscriber cleanup on close. +- `whep`: handlers with mock peer-connection factory; verify `201`/`404`/`406`/`503`; SDP parse happy path + malformed input. +- `ice`: config → Pion `SettingEngine` translation. + +Coverage target: ~70% on this package. Not chasing 100% — some Pion paths are impractical to mock meaningfully. + +### Integration tests (end-to-end, in CI) + +1. Start forked datarhei Core in-process. +2. Launch an FFmpeg publisher sending a deterministic test pattern (`testsrc2` with burned-in frame counter + timecode) over RTMP. +3. Configure a process with `webrtc://` outputs. +4. Use a Pion-based test WHEP client (headless — no browser) to subscribe. +5. Assert: connection establishes, RTP arrives, keyframe seen within 3s of subscribe. + +### Latency measurement (CI pass/fail) + +- Publisher embeds a frame counter via `drawtext` in `testsrc2`. +- Test client decodes and extracts the frame counter (simple pixel sampling against a known bounding box — lighter than full OCR, no new dependency). +- Latency per frame = wall-clock at decode − publisher wall-clock at encode. +- 60-second run; record p50/p95/p99. +- CI gate: + - RTMP ingest path: p95 < 300ms. + - SRT ingest path: p95 < 200ms. + +### Browser smoke test (manual) + +A `test/whep-player.html` — plain HTML + `RTCPeerConnection` + a WHEP URL input. Used for real-browser / real-network human verification. Documented in `TESTING.md`, not automated. + +### Load test (one-shot, not CI) + +Script opens 5 concurrent WHEP peers against one stream, holds 10 minutes, reports CPU/memory/packet-loss/jitter. Run once before cutting v1. + +## Milestones + +| # | Scope | Duration | Exit criteria | +|---|---|---|---| +| M1 | Media-path PoC (hardcoded stream, manual FFmpeg, test WHEP client, no datarhei integration) | 1–2 weeks | 1 publisher → 1 viewer, decoded video | +| M2 | Process integration (`webrtc://` resolver, config, WHEP served from Core, lifecycle hooks) | 1 week | Standard datarhei process JSON with `webrtc://` output works end-to-end | +| M3 | Robustness + multi-viewer (fan-out, teardown paths, keyframe enforcement, error codes, admin API) | 1 week | 5 concurrent viewers, all error paths correct, clean teardown | +| M4 | Tests & CI (unit, integration, latency p95 gate, browser smoke, `TESTING.md`) | 3–5 days | CI green, latency targets met | +| M5 | Dragon Fork branding & release (UI logo swap, README, `NOTICE`/`CREDITS`, Docker image, tag `v0.1.0-dragonfork`) | 1–2 days | Publishable release | + +**Total realistic scope: ~4–5 weeks of focused work.** + +## Branding + +- **Project name:** Datarhei - Dragon Fork +- **Go module path:** `github.com/wilddragon/datarhei-dragonfork-core` (placeholder — confirm at M5) +- **Docker images:** `wilddragon/datarhei-dragonfork-core`, `wilddragon/datarhei-dragonfork-restreamer` +- **Logo asset:** Wild Dragon mark, used as Restreamer UI logo, README header, and any shipped WHEP viewer page +- **Upstream attribution:** `NOTICE` / `CREDITS` file referencing datarhei Core (Apache 2.0) and Restreamer (MIT); README header clearly labels the project as a fork. + +## Open Questions (to resolve during M1–M2) + +1. **WHEP auth model.** Public endpoint vs. simple bearer token vs. time-limited signed URL. Not decided; for an invite-only audience of 1–5 viewers, a shared bearer token is probably fine. Can revisit once M1 is working. +2. **Exact Go module path.** Depends on repo location. +3. **Restreamer UI version target.** Confirm which UI repo/branch to rebrand at M5. + +## References + +- [datarhei/core](https://github.com/datarhei/core) (Apache 2.0) +- [datarhei/restreamer](https://github.com/datarhei/restreamer) (MIT) +- [Pion WebRTC](https://github.com/pion/webrtc) (MIT) +- [WHEP draft spec (IETF)](https://datatracker.ietf.org/doc/draft-murillo-whep/) +- [WHIP draft spec (IETF)](https://datatracker.ietf.org/doc/draft-ietf-wish-whip/) — referenced for the future v2 ingest path +- [Ant Media Server Community](https://github.com/ant-media/Ant-Media-Server) — prior-art reference for WHEP/WHIP in a Java SFU +- [OvenMediaEngine](https://github.com/AirenSoft/OvenMediaEngine) — prior-art reference for sub-second WebRTC broadcast