# Datarhei - Dragon Fork M1: Media-Path PoC Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Prove a working end-to-end WebRTC egress path: an FFmpeg publisher pushes RTP into a new Go package, which serves it to a Pion-based WHEP client that successfully decodes video frames.

**Architecture:** New standalone Go package `core/webrtc` inside the datarhei Core fork. FFmpeg produces RTP on a local UDP socket → package reads RTP → WHEP HTTP endpoint serves it via Pion `PeerConnection` → test client subscribes and decodes. No datarhei process-model integration yet (that's M2). This milestone answers: *does the integration pattern actually work, and what are the gotchas?*

**Tech Stack:** Go 1.22+, [Pion WebRTC v4](https://github.com/pion/webrtc) (`github.com/pion/webrtc/v4`), [Pion RTP](https://github.com/pion/rtp), FFmpeg 6.x (publisher + test pattern), standard library `net/http`.

**Out of scope for this plan:** datarhei process-model integration (M2), multi-viewer fan-out polish (M3), CI test harness (M4), branding (M5). Separate plans will be written for each after M1 completes.

---

## Prerequisites

- Go 1.22+ installed locally
- `git` configured
- FFmpeg 6.x on PATH (`ffmpeg -version` reports 6.0 or newer)
- A Git host account (GitHub recommended for initial fork)
- Linux or macOS development machine (Windows works but UDP port behavior differs; document what you're on when filing any issues)

---

## File Structure

Files created in this milestone:

```
core/webrtc/
  doc.go              # Package documentation
  config.go           # Config struct + defaults + Validate()
  config_test.go
  registry.go         # stream_id → *Source map, thread-safe
  registry_test.go
  source.go           # RTP UDP reader + subscriber fan-out
  source_test.go
  peer.go             # PeerConnection factory, track attachment
  peer_test.go
  forward.go          # RTP forwarder (payload type → track)
  whep.go             # HTTP handler: POST /whep/{stream_id}
  whep_test.go
  ice.go              # SettingEngine builder (NAT1To1, ICE servers)
  ice_test.go
  errors.go           # Typed error values (ErrStreamNotFound, etc.)
cmd/webrtc-poc/
  main.go             # Standalone PoC binary (NOT datarhei Core yet)
test/
  publish.sh          # FFmpeg publisher script (testsrc2 → local RTP)
  whep-client/
    main.go           # Pion-based test WHEP subscriber
    main_test.go
docs/design/
  (copy of the approved spec from brainstorming)
```

Total new Go files: 11 source + 7 test = 18 files. Total lines: ~1200-1500 including tests and comments.

---

## Task 0: Fork the repo and set up the workspace

**Files:**
- Create: new fork of `datarhei/core`

**Rationale:** Everything else depends on having a fork to commit into. No code yet, just repo setup.

- [ ] **Step 1: Fork datarhei/core on your Git host**

On GitHub: navigate to https://github.com/datarhei/core, click Fork, name the fork `datarhei-dragonfork-core` under your `wilddragon` org (or personal account).

- [ ] **Step 2: Clone the fork locally**

Run:
```bash
git clone git@github.com:wilddragon/datarhei-dragonfork-core.git
cd datarhei-dragonfork-core
```
Expected: repo cloned, `git status` clean on `main` branch.

- [ ] **Step 3: Create the M1 working branch**

Run:
```bash
git checkout -b m1-webrtc-poc
```
Expected: branch created and checked out.

- [ ] **Step 4: Copy the approved design spec into the repo**

```bash
mkdir -p docs/design
cp /path/to/2026-04-16-datarhei-dragon-fork-webrtc-design.md docs/design/
```
(Path will be wherever you saved the spec from the brainstorming session.)

- [ ] **Step 5: Verify the repo builds unchanged**

Run:
```bash
go build ./...
```
Expected: build succeeds. If it fails, the fork is broken before you started: stop and fix upstream issues first.

- [ ] **Step 6: Run upstream tests**

Run:
```bash
go test ./...
```
Expected: all tests pass (or at least match what upstream CI shows green). Document any pre-existing flakes in a NOTES.md file so you don't later blame your changes for them. Create NOTES.md even if the baseline is clean (record that fact), because Step 7 commits it.

- [ ] **Step 7: Commit the spec and a NOTES.md baseline**

```bash
git add docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md NOTES.md
git commit -m "docs: add Dragon Fork WebRTC egress design spec"
```

---

## Task 1: Add Pion WebRTC dependency

**Files:**
- Modify: `go.mod`
- Modify: `go.sum`

- [ ] **Step 1: Add Pion dependencies**

Run from repo root:
```bash
go get github.com/pion/webrtc/v4@latest
go get github.com/pion/rtp@latest
go get github.com/pion/rtcp@latest
```
Expected: `go.mod` updated with new `require` entries; `go.sum` updated.

- [ ] **Step 2: Tidy dependencies**

Run:
```bash
go mod tidy
```
Expected: no errors. Caveat: `go mod tidy` removes requirements that no package in the module imports, and nothing in the repo imports Pion until Task 5. If tidy strips the new entries, re-run the `go get` commands from Step 1 and defer tidying until the first Pion import lands.

- [ ] **Step 3: Sanity-check that Pion loads**

Create a throwaway file `/tmp/pion_smoke.go`:
```go
package main

import (
	"fmt"

	"github.com/pion/webrtc/v4"
)

func main() {
	api := webrtc.NewAPI()
	pc, err := api.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		panic(err)
	}
	fmt.Println("Pion OK, state:", pc.ConnectionState())
	_ = pc.Close()
}
```

Run:
```bash
go run /tmp/pion_smoke.go
```
Expected output: `Pion OK, state: new` (or similar). Delete the file after.

- [ ] **Step 4: Commit**

```bash
git add go.mod go.sum
git commit -m "build: add Pion WebRTC v4 dependency"
```

---

## Task 2: Create the core/webrtc package skeleton + typed errors

**Files:**
- Create: `core/webrtc/errors.go`
- Create: `core/webrtc/doc.go`

- [ ] **Step 1: Create `core/webrtc/doc.go`**

```go
// Package webrtc implements the Dragon Fork WebRTC egress module.
//
// It exposes a WHEP (WebRTC-HTTP Egress Protocol) HTTP endpoint and serves
// live RTP produced by an FFmpeg process on a local UDP socket to one or
// more WebRTC peer connections built with Pion.
//
// This package is additive: it does not modify existing datarhei ingest,
// transcode, or non-WebRTC output code paths. The only contact with
// existing code is a new URL scheme ("webrtc://") registered with the
// output resolver (done in milestone M2, not here).
package webrtc
```

- [ ] **Step 2: Create `core/webrtc/errors.go`**

```go
package webrtc

import "errors"

// Sentinel errors returned by package functions.
var (
	// ErrStreamNotFound indicates a WHEP subscribe referenced a stream_id
	// that has no registered source. Maps to HTTP 404.
	ErrStreamNotFound = errors.New("webrtc: stream not found")

	// ErrPeerCapReached indicates max_peers_total has been exceeded.
	// Maps to HTTP 503.
	ErrPeerCapReached = errors.New("webrtc: peer capacity reached")

	// ErrCodecMismatch indicates the viewer's SDP offer does not include
	// a codec the source can serve (expected H.264 + Opus). Maps to HTTP 406.
	ErrCodecMismatch = errors.New("webrtc: codec mismatch")

	// ErrInvalidSDP indicates the request body was not a parseable SDP offer.
	// Maps to HTTP 400.
	ErrInvalidSDP = errors.New("webrtc: invalid SDP")

	// ErrICETimeout indicates ICE gathering did not complete within the
	// configured timeout. Maps to HTTP 500.
	ErrICETimeout = errors.New("webrtc: ICE gathering timeout")
)
```

- [ ] **Step 3: Verify the package compiles**

Run:
```bash
go build ./core/webrtc/...
```
Expected: no output (successful build).

- [ ] **Step 4: Commit**

```bash
git add core/webrtc/doc.go core/webrtc/errors.go
git commit -m "feat(webrtc): add package skeleton and typed errors"
```

---

## Task 3: Config struct

**Files:**
- Create: `core/webrtc/config.go`
- Create: `core/webrtc/config_test.go`

- [ ] **Step 1: Write the failing test `core/webrtc/config_test.go`**

```go
package webrtc

import (
	"testing"
)

func TestConfig_Defaults(t *testing.T) {
	c := DefaultConfig()
	if !c.Enabled {
		t.Error("default Enabled should be true")
	}
	if c.WHEPListen != ":8787" {
		t.Errorf("default WHEPListen = %q, want :8787", c.WHEPListen)
	}
	if c.UDPPortRange.Low != 10000 || c.UDPPortRange.High != 10100 {
		t.Errorf("default UDPPortRange = %v, want 10000-10100", c.UDPPortRange)
	}
	if c.MaxPeersTotal != 32 {
		t.Errorf("default MaxPeersTotal = %d, want 32", c.MaxPeersTotal)
	}
	if len(c.ICEServers) == 0 {
		t.Error("default ICEServers should have at least one STUN entry")
	}
}

func TestConfig_Validate(t *testing.T) {
	tests := []struct {
		name    string
		mutate  func(*Config)
		wantErr bool
	}{
		{"defaults are valid", func(c *Config) {}, false},
		{"empty listen", func(c *Config) { c.WHEPListen = "" }, true},
		{"inverted port range", func(c *Config) { c.UDPPortRange.Low = 20000; c.UDPPortRange.High = 10000 }, true},
		{"zero max peers", func(c *Config) { c.MaxPeersTotal = 0 }, true},
		{"negative max peers", func(c *Config) { c.MaxPeersTotal = -1 }, true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			c := DefaultConfig()
			tt.mutate(&c)
			err := c.Validate()
			if (err != nil) != tt.wantErr {
				t.Errorf("Validate() err = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}
```

- [ ] **Step 2: Run the test and verify it fails**

Run:
```bash
go test ./core/webrtc/ -run TestConfig -v
```
Expected: FAIL with "undefined: Config" / "undefined: DefaultConfig".

- [ ] **Step 3: Write the minimal implementation `core/webrtc/config.go`**

```go
package webrtc

import "fmt"

// PortRange represents an inclusive UDP port range.
type PortRange struct {
	Low, High int
}

// Config controls the WebRTC egress module.
type Config struct {
	// Enabled toggles the entire module. When false, no endpoints are served.
	Enabled bool

	// WHEPListen is the address the WHEP HTTP endpoint binds to (e.g. ":8787").
	WHEPListen string

	// PublicIP is the server's externally-reachable IP, advertised in ICE
	// candidates via NAT1To1. Empty means rely on STUN discovery.
	PublicIP string

	// UDPPortRange bounds the local UDP ports allocated for FFmpeg→Pion RTP.
	UDPPortRange PortRange

	// ICEServers is the list of STUN/TURN URIs given to each PeerConnection.
	ICEServers []string

	// MaxPeersTotal is a hard safety cap on concurrent subscribers.
	MaxPeersTotal int
}

// DefaultConfig returns production-reasonable defaults.
func DefaultConfig() Config {
	return Config{
		Enabled:       true,
		WHEPListen:    ":8787",
		PublicIP:      "",
		UDPPortRange:  PortRange{Low: 10000, High: 10100},
		ICEServers:    []string{"stun:stun.cloudflare.com:3478", "stun:stun.l.google.com:19302"},
		MaxPeersTotal: 32,
	}
}

// Validate returns an error if the config is internally inconsistent.
func (c Config) Validate() error {
	if c.WHEPListen == "" {
		return fmt.Errorf("webrtc: WHEPListen must not be empty")
	}
	if c.UDPPortRange.Low <= 0 || c.UDPPortRange.High <= 0 {
		return fmt.Errorf("webrtc: UDPPortRange must have positive bounds, got %v", c.UDPPortRange)
	}
	if c.UDPPortRange.Low > c.UDPPortRange.High {
		return fmt.Errorf("webrtc: UDPPortRange.Low > High (%d > %d)",
			c.UDPPortRange.Low, c.UDPPortRange.High)
	}
	if c.MaxPeersTotal <= 0 {
		return fmt.Errorf("webrtc: MaxPeersTotal must be positive, got %d", c.MaxPeersTotal)
	}
	return nil
}
```

- [ ] **Step 4: Run the tests and verify they pass**

Run:
```bash
go test ./core/webrtc/ -run TestConfig -v
```
Expected: PASS for both `TestConfig_Defaults` and `TestConfig_Validate` (all subtests).
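
`Validate` accepts any positive port bounds, while the ICE helper in Task 6 converts them to `uint16`, which tops out at 65535. A stricter range check might look like the sketch below. `checkPortRange` is a hypothetical helper, not part of the plan's `Config` API, and `PortRange` is redeclared so the snippet stands alone.

```go
package main

import "fmt"

// PortRange mirrors the type from config.go so this sketch is standalone.
type PortRange struct {
	Low, High int
}

// checkPortRange is a hypothetical stricter validator. In addition to the
// checks in Config.Validate it rejects bounds above 65535, which would
// otherwise wrap around silently when converted to uint16.
func checkPortRange(r PortRange) error {
	const maxPort = 65535
	if r.Low < 1 || r.High < 1 {
		return fmt.Errorf("port range must have positive bounds, got %d-%d", r.Low, r.High)
	}
	if r.High > maxPort {
		return fmt.Errorf("port range upper bound %d exceeds %d", r.High, maxPort)
	}
	if r.Low > r.High {
		return fmt.Errorf("port range is inverted (%d > %d)", r.Low, r.High)
	}
	return nil
}

func main() {
	fmt.Println(checkPortRange(PortRange{Low: 10000, High: 10100}))
	fmt.Println(checkPortRange(PortRange{Low: 10000, High: 70000}))
}
```

If a check like this is adopted, a matching row in the `TestConfig_Validate` table would keep the test-first flow of this task intact.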

- [ ] **Step 5: Commit**

```bash
git add core/webrtc/config.go core/webrtc/config_test.go
git commit -m "feat(webrtc): add Config with defaults and validation"
```

---

## Task 4: Registry: stream_id → Source mapping

**Files:**
- Create: `core/webrtc/registry.go`
- Create: `core/webrtc/registry_test.go`

A **Source** (defined in the next task) represents a live RTP stream that peers can subscribe to. The **Registry** is the thread-safe map from stream IDs to active sources. We write the Registry first because it needs only a minimal interface from Source (an `ID()` method), so it can be built and tested before Source exists.

- [ ] **Step 1: Write the failing test `core/webrtc/registry_test.go`**

```go
package webrtc

import (
	"sync"
	"testing"
)

// mockSource implements the minimum Source-like shape needed by the registry.
// The real Source type is defined in Task 5; the registry only needs a
// stable type to store and retrieve.
type mockSource struct {
	id string
}

func (m *mockSource) ID() string { return m.id }

func TestRegistry_RegisterAndLookup(t *testing.T) {
	r := NewRegistry()
	src := &mockSource{id: "streamA"}
	if err := r.Register("streamA", src); err != nil {
		t.Fatalf("Register returned error: %v", err)
	}
	got, ok := r.Lookup("streamA")
	if !ok {
		t.Fatal("Lookup(streamA) returned ok=false, want true")
	}
	if got != src {
		t.Errorf("Lookup returned %v, want %v", got, src)
	}
}

func TestRegistry_LookupMissing(t *testing.T) {
	r := NewRegistry()
	_, ok := r.Lookup("nope")
	if ok {
		t.Error("Lookup on empty registry returned ok=true, want false")
	}
}

func TestRegistry_DuplicateRegister(t *testing.T) {
	r := NewRegistry()
	_ = r.Register("streamA", &mockSource{id: "streamA"})
	if err := r.Register("streamA", &mockSource{id: "streamA"}); err == nil {
		t.Error("duplicate Register should return error, got nil")
	}
}

func TestRegistry_Deregister(t *testing.T) {
	r := NewRegistry()
	_ = r.Register("streamA", &mockSource{id: "streamA"})
	r.Deregister("streamA")
	if _, ok := r.Lookup("streamA"); ok {
		t.Error("after Deregister, Lookup should return ok=false")
	}
}

func TestRegistry_ConcurrentAccess(t *testing.T) {
	r := NewRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(3)
		id := string(rune('a' + (i % 26)))
		go func() { defer wg.Done(); _ = r.Register(id, &mockSource{id: id}) }()
		go func() { defer wg.Done(); _, _ = r.Lookup(id) }()
		go func() { defer wg.Done(); r.Deregister(id) }()
	}
	wg.Wait()
	// No assertion: the test passes if -race doesn't flag anything.
}
```

- [ ] **Step 2: Run the test and verify it fails**

Run:
```bash
go test ./core/webrtc/ -run TestRegistry -v
```
Expected: FAIL with "undefined: NewRegistry".

- [ ] **Step 3: Write the minimal implementation `core/webrtc/registry.go`**

```go
package webrtc

import (
	"fmt"
	"sync"
)

// SourceHandle is the minimal interface the Registry stores per stream_id.
// The concrete type is *Source, defined in source.go.
type SourceHandle interface {
	ID() string
}

// Registry is a thread-safe map from stream_id to active SourceHandle.
type Registry struct {
	mu      sync.RWMutex
	streams map[string]SourceHandle
}

// NewRegistry returns an empty Registry.
func NewRegistry() *Registry {
	return &Registry{streams: make(map[string]SourceHandle)}
}

// Register associates src with streamID. Returns an error if streamID is
// already registered.
func (r *Registry) Register(streamID string, src SourceHandle) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.streams[streamID]; exists {
		return fmt.Errorf("webrtc: stream %q already registered", streamID)
	}
	r.streams[streamID] = src
	return nil
}

// Lookup returns the handle for streamID. The second return value is false
// if no source is registered.
func (r *Registry) Lookup(streamID string) (SourceHandle, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	src, ok := r.streams[streamID]
	return src, ok
}

// Deregister removes streamID. No-op if not present.
func (r *Registry) Deregister(streamID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.streams, streamID)
}
```

- [ ] **Step 4: Run the tests and verify they pass**

Run:
```bash
go test ./core/webrtc/ -run TestRegistry -v -race
```
Expected: PASS for all TestRegistry tests, no data races.

- [ ] **Step 5: Commit**

```bash
git add core/webrtc/registry.go core/webrtc/registry_test.go
git commit -m "feat(webrtc): add thread-safe Registry for stream_id → SourceHandle"
```

---

## Task 5: Source: RTP UDP reader + subscriber fan-out

**Files:**
- Create: `core/webrtc/source.go`
- Create: `core/webrtc/source_test.go`

A Source owns a UDP socket bound to a local port, reads RTP packets, and forwards them to every subscriber; each peer then writes them to its video/audio tracks. For M1, we deliberately keep the fan-out simple (one reader goroutine doing non-blocking sends into per-subscriber buffered channels) because at 1–5 viewers the naive model is entirely sufficient.

- [ ] **Step 1: Write the failing test `core/webrtc/source_test.go`**

```go
package webrtc

import (
	"net"
	"testing"
	"time"

	"github.com/pion/rtp"
)

func TestSource_ID(t *testing.T) {
	s, err := NewSource("streamA", 0) // 0 = ephemeral port
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer s.Close()
	if s.ID() != "streamA" {
		t.Errorf("ID() = %q, want streamA", s.ID())
	}
}

func TestSource_ReceiveAndFanout(t *testing.T) {
	s, err := NewSource("streamA", 0)
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer s.Close()

	// Subscribe before sending.
	sub := s.Subscribe(16) // buffer depth 16
	defer s.Unsubscribe(sub)

	s.Start()

	// Build and send a minimal RTP packet to the source's UDP port.
	pkt := &rtp.Packet{
		Header: rtp.Header{
			Version:        2,
			PayloadType:    96,
			SequenceNumber: 1,
			Timestamp:      1000,
			SSRC:           0xDEADBEEF,
		},
		Payload: []byte{0x01, 0x02, 0x03, 0x04},
	}
	raw, err := pkt.Marshal()
	if err != nil {
		t.Fatalf("pkt.Marshal: %v", err)
	}

	conn, err := net.Dial("udp", s.LocalAddr().String())
	if err != nil {
		t.Fatalf("net.Dial: %v", err)
	}
	defer conn.Close()
	if _, err := conn.Write(raw); err != nil {
		t.Fatalf("conn.Write: %v", err)
	}

	select {
	case got := <-sub:
		if got.SSRC != 0xDEADBEEF {
			t.Errorf("received SSRC = %x, want DEADBEEF", got.SSRC)
		}
		if got.SequenceNumber != 1 {
			t.Errorf("received SeqNum = %d, want 1", got.SequenceNumber)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for RTP packet on subscriber channel")
	}
}

func TestSource_MultipleSubscribers(t *testing.T) {
	s, err := NewSource("streamA", 0)
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer s.Close()

	subs := []chan *rtp.Packet{
		s.Subscribe(8),
		s.Subscribe(8),
		s.Subscribe(8),
	}
	for _, sub := range subs {
		defer s.Unsubscribe(sub)
	}
	s.Start()

	raw, _ := (&rtp.Packet{
		Header:  rtp.Header{Version: 2, PayloadType: 96, SequenceNumber: 42, SSRC: 1},
		Payload: []byte{0xAA},
	}).Marshal()
	conn, _ := net.Dial("udp", s.LocalAddr().String())
	defer conn.Close()
	_, _ = conn.Write(raw)

	for i, sub := range subs {
		select {
		case got := <-sub:
			if got.SequenceNumber != 42 {
				t.Errorf("sub %d got seq %d, want 42", i, got.SequenceNumber)
			}
		case <-time.After(2 * time.Second):
			t.Errorf("sub %d timed out", i)
		}
	}
}

func TestSource_UnsubscribeStopsDelivery(t *testing.T) {
	s, _ := NewSource("streamA", 0)
	defer s.Close()
	sub := s.Subscribe(8)
	s.Start()
	s.Unsubscribe(sub)

	// After Unsubscribe, the channel should be closed.
	select {
	case _, ok := <-sub:
		if ok {
			t.Error("expected channel closed after Unsubscribe, got value")
		}
	case <-time.After(500 * time.Millisecond):
		t.Error("timed out waiting for channel close")
	}
}
```

- [ ] **Step 2: Run the test and verify it fails**

Run:
```bash
go test ./core/webrtc/ -run TestSource -v
```
Expected: FAIL with "undefined: NewSource".

- [ ] **Step 3: Write the minimal implementation `core/webrtc/source.go`**

```go
package webrtc

import (
	"fmt"
	"net"
	"sync"

	"github.com/pion/rtp"
)

// Source reads RTP packets from a local UDP socket and fans them out to
// subscribed peers via per-subscriber buffered channels.
type Source struct {
	id   string
	conn *net.UDPConn

	mu          sync.Mutex
	subscribers map[chan *rtp.Packet]struct{}
	started     bool
	closed      bool
	done        chan struct{}
}

// NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS
// assign an ephemeral port (useful for tests).
func NewSource(streamID string, port int) (*Source, error) {
	addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}
	conn, err := net.ListenUDP("udp4", addr)
	if err != nil {
		return nil, fmt.Errorf("webrtc: listen udp: %w", err)
	}
	return &Source{
		id:          streamID,
		conn:        conn,
		subscribers: make(map[chan *rtp.Packet]struct{}),
		done:        make(chan struct{}),
	}, nil
}

// ID returns the registered stream identifier.
func (s *Source) ID() string { return s.id }

// LocalAddr returns the UDP address the source is listening on.
func (s *Source) LocalAddr() *net.UDPAddr {
	return s.conn.LocalAddr().(*net.UDPAddr)
}

// Subscribe returns a new buffered channel that receives every RTP packet
// read from the UDP socket. bufDepth is the channel buffer size; when full,
// packets are dropped (preventing a slow subscriber from back-pressuring
// the reader).
func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet {
	ch := make(chan *rtp.Packet, bufDepth)
	s.mu.Lock()
	s.subscribers[ch] = struct{}{}
	s.mu.Unlock()
	return ch
}

// Unsubscribe removes ch from the subscriber set and closes it.
func (s *Source) Unsubscribe(ch chan *rtp.Packet) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.subscribers[ch]; ok {
		delete(s.subscribers, ch)
		close(ch)
	}
}

// Start begins the RTP reader goroutine. Safe to call once; subsequent calls
// are no-ops.
func (s *Source) Start() {
	s.mu.Lock()
	if s.started || s.closed {
		s.mu.Unlock()
		return
	}
	s.started = true
	s.mu.Unlock()
	go s.readLoop()
}

func (s *Source) readLoop() {
	buf := make([]byte, 1500) // MTU-sized; RTP over UDP should fit
	for {
		select {
		case <-s.done:
			return
		default:
		}
		n, _, err := s.conn.ReadFromUDP(buf)
		if err != nil {
			// Socket closed or error: exit the loop.
			return
		}
		// Copy before parsing. rtp.Packet.Unmarshal keeps the payload as a
		// slice of its input, and buf is overwritten by the next read while
		// subscribers may still hold the packet.
		raw := make([]byte, n)
		copy(raw, buf[:n])
		pkt := &rtp.Packet{}
		if err := pkt.Unmarshal(raw); err != nil {
			// Malformed packet; skip without crashing.
			continue
		}
		s.mu.Lock()
		for ch := range s.subscribers {
			select {
			case ch <- pkt:
			default:
				// Subscriber full: drop to protect the reader.
			}
		}
		s.mu.Unlock()
	}
}

// Close stops the reader goroutine, closes the UDP socket, and closes every
// subscriber channel.
func (s *Source) Close() error {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil
	}
	s.closed = true
	close(s.done)
	for ch := range s.subscribers {
		delete(s.subscribers, ch)
		close(ch)
	}
	s.mu.Unlock()
	return s.conn.Close()
}
```

- [ ] **Step 4: Run the tests and verify they pass**

Run:
```bash
go test ./core/webrtc/ -run TestSource -v -race
```
Expected: PASS for all TestSource tests, no data races.

- [ ] **Step 5: Commit**

```bash
git add core/webrtc/source.go core/webrtc/source_test.go
git commit -m "feat(webrtc): add Source with UDP RTP reader and subscriber fan-out"
```

---

## Task 6: ICE config helper (SettingEngine builder)

**Files:**
- Create: `core/webrtc/ice.go`
- Create: `core/webrtc/ice_test.go`

An isolated helper that translates our `Config` into Pion's `SettingEngine` + `Configuration` pair. Keeping it separate makes `peer.go` simpler and the ICE config trivially testable.
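
Because `Config` carries `PublicIP` and `ICEServers` as plain strings, a typo would otherwise surface only when Pion rejects the value. One option is a cheap pre-check before translating, sketched below with the standard library only. `checkICEInputs` is a hypothetical helper that this plan does not define; the accepted schemes are the standard STUN/TURN URI schemes.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// checkICEInputs is a hypothetical pre-check (not part of the plan). It
// verifies that publicIP, when set, parses as an IP address and that every
// ICE server URI starts with a STUN or TURN scheme followed by a host part.
func checkICEInputs(publicIP string, iceServers []string) error {
	if publicIP != "" && net.ParseIP(publicIP) == nil {
		return fmt.Errorf("public IP %q is not a valid IP address", publicIP)
	}
	for _, uri := range iceServers {
		scheme, rest, found := strings.Cut(uri, ":")
		if !found || rest == "" {
			return fmt.Errorf("ICE server %q is missing a scheme or host", uri)
		}
		switch scheme {
		case "stun", "stuns", "turn", "turns":
			// Recognized scheme; host and port are left for Pion to parse.
		default:
			return fmt.Errorf("ICE server %q has unsupported scheme %q", uri, scheme)
		}
	}
	return nil
}

func main() {
	servers := []string{"stun:stun.cloudflare.com:3478"}
	fmt.Println(checkICEInputs("203.0.113.10", servers))
	fmt.Println(checkICEInputs("203.0.113", servers))
	fmt.Println(checkICEInputs("", []string{"http://example.invalid"}))
}
```

The check is deliberately shallow: it catches obvious mistakes early and leaves full URI parsing to Pion.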

- [ ] **Step 1: Write the failing test `core/webrtc/ice_test.go`**

```go
package webrtc

import (
	"testing"

	"github.com/pion/webrtc/v4"
)

func TestBuildICEConfig_Defaults(t *testing.T) {
	c := DefaultConfig()
	rtcConfig, _, err := BuildICEConfig(c)
	if err != nil {
		t.Fatalf("BuildICEConfig: %v", err)
	}
	if len(rtcConfig.ICEServers) == 0 {
		t.Error("ICEServers should not be empty")
	}
	// First default is Cloudflare STUN.
	if rtcConfig.ICEServers[0].URLs[0] != "stun:stun.cloudflare.com:3478" {
		t.Errorf("first ICE server = %q, want stun:stun.cloudflare.com:3478",
			rtcConfig.ICEServers[0].URLs[0])
	}
}

func TestBuildICEConfig_PublicIP(t *testing.T) {
	c := DefaultConfig()
	c.PublicIP = "203.0.113.10"
	_, se, err := BuildICEConfig(c)
	if err != nil {
		t.Fatalf("BuildICEConfig: %v", err)
	}
	if se == nil {
		t.Fatal("SettingEngine should not be nil when PublicIP is set")
	}
	// We can't introspect NAT1To1IPs directly from Pion's public API; the
	// smoke test is that building an API from this engine works.
	api := webrtc.NewAPI(webrtc.WithSettingEngine(*se))
	if api == nil {
		t.Fatal("NewAPI returned nil")
	}
}

func TestBuildICEConfig_InvalidConfig(t *testing.T) {
	c := DefaultConfig()
	c.WHEPListen = ""
	_, _, err := BuildICEConfig(c)
	if err == nil {
		t.Error("BuildICEConfig should reject invalid config")
	}
}
```

- [ ] **Step 2: Run the test and verify it fails**

Run:
```bash
go test ./core/webrtc/ -run TestBuildICEConfig -v
```
Expected: FAIL with "undefined: BuildICEConfig".

- [ ] **Step 3: Write the minimal implementation `core/webrtc/ice.go`**

```go
package webrtc

import (
	"github.com/pion/webrtc/v4"
)

// BuildICEConfig translates a Config into the two Pion config pieces every
// PeerConnection needs: a webrtc.Configuration (with ICE servers) and a
// SettingEngine (with NAT1To1 and port range tuning).
//
// The returned *SettingEngine may be nil if no engine-level tuning is
// required (i.e. PublicIP unset and UDPPortRange at defaults).
// Callers should only pass it to webrtc.NewAPI when non-nil.
func BuildICEConfig(c Config) (webrtc.Configuration, *webrtc.SettingEngine, error) {
	if err := c.Validate(); err != nil {
		return webrtc.Configuration{}, nil, err
	}

	rtcConfig := webrtc.Configuration{
		ICEServers: make([]webrtc.ICEServer, 0, len(c.ICEServers)),
	}
	for _, uri := range c.ICEServers {
		rtcConfig.ICEServers = append(rtcConfig.ICEServers, webrtc.ICEServer{
			URLs: []string{uri},
		})
	}

	var se *webrtc.SettingEngine
	// Validate already requires UDPPortRange.Low > 0, so for any Config that
	// reaches this point an engine is always built; the nil case is defensive.
	if c.PublicIP != "" || c.UDPPortRange.Low > 0 {
		engine := webrtc.SettingEngine{}
		if c.PublicIP != "" {
			engine.SetNAT1To1IPs([]string{c.PublicIP}, webrtc.ICECandidateTypeHost)
		}
		// Constrain the ephemeral UDP range Pion allocates for ICE candidates.
		// Note: this is a separate concern from our FFmpeg→Source UDP ports;
		// Pion uses its own port pool for the WebRTC media path.
		if c.UDPPortRange.Low > 0 && c.UDPPortRange.High >= c.UDPPortRange.Low {
			if err := engine.SetEphemeralUDPPortRange(
				uint16(c.UDPPortRange.Low), uint16(c.UDPPortRange.High)); err != nil {
				return webrtc.Configuration{}, nil, err
			}
		}
		se = &engine
	}

	return rtcConfig, se, nil
}
```

- [ ] **Step 4: Run the tests and verify they pass**

Run:
```bash
go test ./core/webrtc/ -run TestBuildICEConfig -v
```
Expected: PASS for all TestBuildICEConfig tests.

- [ ] **Step 5: Commit**

```bash
git add core/webrtc/ice.go core/webrtc/ice_test.go
git commit -m "feat(webrtc): add ICE config helper (Configuration + SettingEngine)"
```

---

## Task 7: Peer: PeerConnection factory + track attachment

**Files:**
- Create: `core/webrtc/peer.go`
- Create: `core/webrtc/peer_test.go`
- Create: `core/webrtc/forward.go`

This task creates a Pion `PeerConnection`, adds video (H.264) and audio (Opus) `TrackLocalStaticRTP` tracks, negotiates SDP, and starts a goroutine that forwards packets from a Source subscription into those tracks.
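
In M1 both media types arrive on a single Source subscription, so the forwarder needs a discriminator to choose a track, and this plan uses the RTP payload type. The sketch below shows where that value lives in the raw packet, using only the standard library. `payloadType` and `trackFor` are hypothetical names for illustration; 102 and 111 are the payload types this plan configures for H.264 and Opus.

```go
package main

import (
	"errors"
	"fmt"
)

// payloadType extracts the 7-bit payload type from a raw RTP packet.
// The fixed RTP header is 12 bytes: the top two bits of byte 0 carry the
// version, and byte 1 carries the marker bit followed by the payload type.
func payloadType(raw []byte) (uint8, error) {
	if len(raw) < 12 {
		return 0, errors.New("rtp: packet shorter than fixed header")
	}
	if raw[0]>>6 != 2 {
		return 0, errors.New("rtp: unsupported version")
	}
	return raw[1] & 0x7F, nil
}

// trackFor is a hypothetical demux rule matching the payload types this
// plan assumes for the FFmpeg publisher. It returns "" for unknown types.
func trackFor(pt uint8) string {
	switch pt {
	case 102:
		return "video"
	case 111:
		return "audio"
	default:
		return ""
	}
}

func main() {
	// Version 2, no padding/extension/CSRC; marker bit set, payload type 102.
	pkt := []byte{0x80, 0x80 | 102, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1}
	pt, err := payloadType(pkt)
	if err != nil {
		panic(err)
	}
	fmt.Println(pt, trackFor(pt)) // prints: 102 video
}
```

Masking with `0x7F` matters because the marker bit shares byte 1 with the payload type; without the mask, marked packets would be misrouted.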

- [ ] **Step 1: Write the failing test `core/webrtc/peer_test.go`**

```go
package webrtc

import (
	"context"
	"testing"
	"time"

	"github.com/pion/webrtc/v4"
)

// minimalOfferSDP returns an SDP offer that advertises H.264 (video) and
// Opus (audio) as recvonly, the minimum a WHEP client sends.
func minimalOfferSDP(t *testing.T) webrtc.SessionDescription {
	t.Helper()
	// Create a throwaway PC to generate a valid offer.
	me := &webrtc.MediaEngine{}
	if err := me.RegisterDefaultCodecs(); err != nil {
		t.Fatalf("RegisterDefaultCodecs: %v", err)
	}
	api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
	pc, err := api.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		t.Fatalf("NewPeerConnection: %v", err)
	}
	defer pc.Close()

	if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
		webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
		t.Fatalf("AddTransceiver video: %v", err)
	}
	if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
		webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
		t.Fatalf("AddTransceiver audio: %v", err)
	}
	offer, err := pc.CreateOffer(nil)
	if err != nil {
		t.Fatalf("CreateOffer: %v", err)
	}
	return offer
}

func TestPeerFactory_CreateAnswer(t *testing.T) {
	src, err := NewSource("streamA", 0)
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer src.Close()
	src.Start()

	cfg := DefaultConfig()
	factory, err := NewPeerFactory(cfg)
	if err != nil {
		t.Fatalf("NewPeerFactory: %v", err)
	}

	offer := minimalOfferSDP(t)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	peer, err := factory.CreatePeer(ctx, src, offer)
	if err != nil {
		t.Fatalf("CreatePeer: %v", err)
	}
	defer peer.Close()

	if peer.Answer().Type != webrtc.SDPTypeAnswer {
		t.Errorf("Answer().Type = %v, want answer", peer.Answer().Type)
	}
	if peer.ResourceID() == "" {
		t.Error("ResourceID should be non-empty")
	}
}

func TestPeerFactory_ClosesCleanly(t *testing.T) {
	src, _ :=
		NewSource("streamA", 0)
	defer src.Close()
	src.Start()

	factory, _ := NewPeerFactory(DefaultConfig())
	offer := minimalOfferSDP(t)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	peer, err := factory.CreatePeer(ctx, src, offer)
	if err != nil {
		t.Fatalf("CreatePeer: %v", err)
	}
	if err := peer.Close(); err != nil {
		t.Errorf("Close: %v", err)
	}
	// Second close should be a no-op, not panic.
	if err := peer.Close(); err != nil {
		t.Errorf("second Close: %v", err)
	}
}
```

- [ ] **Step 2: Run the test and verify it fails**

Run:
```bash
go test ./core/webrtc/ -run TestPeerFactory -v
```
Expected: FAIL with "undefined: NewPeerFactory".

- [ ] **Step 3: Write the minimal implementation `core/webrtc/peer.go`**

```go
package webrtc

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"sync"

	"github.com/pion/rtp"
	"github.com/pion/webrtc/v4"
)

// PeerFactory builds PeerConnections from a shared Pion API instance
// configured from Config.
type PeerFactory struct {
	api       *webrtc.API
	rtcConfig webrtc.Configuration
}

// NewPeerFactory initializes a Pion API with the codec set we support
// (H.264 + Opus) and applies the provided Config.
func NewPeerFactory(c Config) (*PeerFactory, error) {
	if err := c.Validate(); err != nil {
		return nil, err
	}
	me := &webrtc.MediaEngine{}
	if err := me.RegisterDefaultCodecs(); err != nil {
		return nil, fmt.Errorf("webrtc: register default codecs: %w", err)
	}
	rtcConfig, se, err := BuildICEConfig(c)
	if err != nil {
		return nil, err
	}
	opts := []func(*webrtc.API){webrtc.WithMediaEngine(me)}
	if se != nil {
		opts = append(opts, webrtc.WithSettingEngine(*se))
	}
	api := webrtc.NewAPI(opts...)
	return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil
}

// Peer wraps a Pion PeerConnection bound to a Source's subscription.
type Peer struct {
	resourceID string
	pc         *webrtc.PeerConnection
	answer     webrtc.SessionDescription
	source     *Source
	sub        chan *rtp.Packet
	done       chan struct{}
	once       sync.Once
}

// CreatePeer builds a PeerConnection, sets the remote offer, generates an
// answer, attaches video+audio tracks fed from src, and blocks until ICE
// gathering completes or ctx expires.
func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) {
	pc, err := f.api.NewPeerConnection(f.rtcConfig)
	if err != nil {
		return nil, fmt.Errorf("webrtc: new peer connection: %w", err)
	}

	videoTrack, err := webrtc.NewTrackLocalStaticRTP(
		webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
		"video", "dragonfork")
	if err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: new video track: %w", err)
	}
	audioTrack, err := webrtc.NewTrackLocalStaticRTP(
		webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
		"audio", "dragonfork")
	if err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: new audio track: %w", err)
	}

	if _, err := pc.AddTrack(videoTrack); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: add video track: %w", err)
	}
	if _, err := pc.AddTrack(audioTrack); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: add audio track: %w", err)
	}

	if err := pc.SetRemoteDescription(offer); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: set remote: %w", err)
	}
	answer, err := pc.CreateAnswer(nil)
	if err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: create answer: %w", err)
	}

	gatherComplete := webrtc.GatheringCompletePromise(pc)
	if err := pc.SetLocalDescription(answer); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: set local: %w", err)
	}

	select {
	case <-gatherComplete:
	case <-ctx.Done():
		_ = pc.Close()
		return nil, ErrICETimeout
	}

	sub := src.Subscribe(64)
	p := &Peer{
		resourceID: newResourceID(),
		pc:         pc,
		answer:     *pc.LocalDescription(),
		source:     src,
		sub:        sub,
		done:       make(chan struct{}),
	}

	pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
		if st == webrtc.PeerConnectionStateFailed ||
			st == webrtc.PeerConnectionStateDisconnected ||
			st == webrtc.PeerConnectionStateClosed {
			_ = p.Close()
		}
	})

	go forwardRTP(p.done, sub, videoTrack, audioTrack)
	return p, nil
}

// Answer returns the locally-created SDP answer. Valid after CreatePeer.
func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }

// ResourceID returns the stable resource id used in the WHEP Location header.
func (p *Peer) ResourceID() string { return p.resourceID }

// Close tears down the peer connection and unsubscribes from the source.
// Safe to call multiple times.
func (p *Peer) Close() error {
	var err error
	p.once.Do(func() {
		close(p.done)
		p.source.Unsubscribe(p.sub)
		err = p.pc.Close()
	})
	return err
}

func newResourceID() string {
	b := make([]byte, 8)
	_, _ = rand.Read(b)
	return hex.EncodeToString(b)
}
```

- [ ] **Step 4: Create `core/webrtc/forward.go` with the RTP forwarder**

```go
package webrtc

import (
	"github.com/pion/rtp"
	"github.com/pion/webrtc/v4"
)

// forwardRTP reads packets from sub and writes them to the correct track
// based on payload type (H.264 → video, Opus → audio). Payload-type
// inspection is the simplest M1 approach; M2 will switch to per-track
// source channels once the process resolver manages separate video/audio
// UDP ports.
func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet, video, audio *webrtc.TrackLocalStaticRTP) {
	for {
		select {
		case <-done:
			return
		case pkt, ok := <-sub:
			if !ok {
				return
			}
			// Pion default H.264 PT = 102, Opus PT = 111. If the publisher
			// uses different PTs we'll revisit in M2; for the M1 PoC we
			// configure FFmpeg to these values explicitly in the publisher
			// script.
			switch pkt.PayloadType {
			case 102:
				_ = video.WriteRTP(pkt)
			case 111:
				_ = audio.WriteRTP(pkt)
			default:
				// Unknown PT: drop. Log in M3.
} } } } ``` - [ ] **Step 5: Run the tests and verify they pass** Run: ```bash go test ./core/webrtc/ -run TestPeerFactory -v ``` Expected: PASS for `TestPeerFactory_CreateAnswer` and `TestPeerFactory_ClosesCleanly`. - [ ] **Step 6: Commit** ```bash git add core/webrtc/peer.go core/webrtc/peer_test.go core/webrtc/forward.go git commit -m "feat(webrtc): add PeerFactory, Peer, and RTP forwarder" ``` --- ## Task 8: WHEP HTTP handler (happy path only) **Files:** - Create: `core/webrtc/whep.go` - Create: `core/webrtc/whep_test.go` For M1, the WHEP handler supports only `POST /whep/{stream_id}` happy path. Error paths (404/406/503) and DELETE/PATCH come in M3. - [ ] **Step 1: Write the failing test `core/webrtc/whep_test.go`** ```go package webrtc import ( "context" "io" "net/http" "net/http/httptest" "strings" "testing" "time" "github.com/pion/webrtc/v4" ) func TestWHEP_POSTReturns201WithSDP(t *testing.T) { // Set up a Source and register it. src, _ := NewSource("streamA", 0) defer src.Close() src.Start() reg := NewRegistry() _ = reg.Register("streamA", src) factory, _ := NewPeerFactory(DefaultConfig()) handler := NewWHEPHandler(reg, factory, DefaultConfig()) // Build an offer using a throwaway PC. me := &webrtc.MediaEngine{} _ = me.RegisterDefaultCodecs() api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) pc, _ := api.NewPeerConnection(webrtc.Configuration{}) defer pc.Close() _, _ = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}) _, _ = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}) offer, _ := pc.CreateOffer(nil) req := httptest.NewRequest(http.MethodPost, "/whep/streamA", strings.NewReader(offer.SDP)) req.Header.Set("Content-Type", "application/sdp") // Give the handler generous ICE gathering time in tests. 
ctx, cancel := context.WithTimeout(req.Context(), 10*time.Second) defer cancel() req = req.WithContext(ctx) rr := httptest.NewRecorder() handler.ServeHTTP(rr, req) if rr.Code != http.StatusCreated { body, _ := io.ReadAll(rr.Result().Body) t.Fatalf("status = %d, want 201. body=%s", rr.Code, string(body)) } if ct := rr.Header().Get("Content-Type"); ct != "application/sdp" { t.Errorf("Content-Type = %q, want application/sdp", ct) } if loc := rr.Header().Get("Location"); !strings.HasPrefix(loc, "/whep/streamA/") { t.Errorf("Location = %q, want /whep/streamA/", loc) } if !strings.Contains(rr.Body.String(), "v=0") { t.Errorf("body does not look like SDP: %s", rr.Body.String()) } } ``` - [ ] **Step 2: Run the test and verify it fails** Run: ```bash go test ./core/webrtc/ -run TestWHEP -v ``` Expected: FAIL with "undefined: NewWHEPHandler". - [ ] **Step 3: Write the minimal implementation `core/webrtc/whep.go`** ```go package webrtc import ( "io" "net/http" "strings" "sync" "sync/atomic" "github.com/pion/webrtc/v4" ) // WHEPHandler serves the WebRTC-HTTP Egress Protocol POST. type WHEPHandler struct { registry *Registry factory *PeerFactory config Config mu sync.Mutex peers map[string]*Peer // resourceID → Peer peersCount int64 // atomic, for cap check without lock } // NewWHEPHandler constructs a handler with the given dependencies. func NewWHEPHandler(r *Registry, f *PeerFactory, c Config) *WHEPHandler { return &WHEPHandler{ registry: r, factory: f, config: c, peers: make(map[string]*Peer), } } // ServeHTTP handles POST /whep/{stream_id}. Other methods and paths return 405. 
func (h *WHEPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.Header().Set("Allow", "POST") http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } // Extract stream_id from path: /whep/{stream_id} streamID := strings.TrimPrefix(r.URL.Path, "/whep/") if streamID == "" || strings.Contains(streamID, "/") { http.Error(w, "invalid stream id", http.StatusBadRequest) return } // Peer cap enforcement (happy path still respects the cap). if atomic.LoadInt64(&h.peersCount) >= int64(h.config.MaxPeersTotal) { http.Error(w, ErrPeerCapReached.Error(), http.StatusServiceUnavailable) return } handle, ok := h.registry.Lookup(streamID) if !ok { http.Error(w, ErrStreamNotFound.Error(), http.StatusNotFound) return } src, ok := handle.(*Source) if !ok { http.Error(w, "registered source is not a *Source", http.StatusInternalServerError) return } body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "read body: "+err.Error(), http.StatusBadRequest) return } if len(body) == 0 { http.Error(w, ErrInvalidSDP.Error(), http.StatusBadRequest) return } offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} peer, err := h.factory.CreatePeer(r.Context(), src, offer) if err != nil { http.Error(w, "create peer: "+err.Error(), http.StatusInternalServerError) return } h.mu.Lock() h.peers[peer.ResourceID()] = peer h.mu.Unlock() atomic.AddInt64(&h.peersCount, 1) w.Header().Set("Content-Type", "application/sdp") w.Header().Set("Location", "/whep/"+streamID+"/"+peer.ResourceID()) w.WriteHeader(http.StatusCreated) _, _ = io.WriteString(w, peer.Answer().SDP) } ``` - [ ] **Step 4: Run the tests and verify they pass** Run: ```bash go test ./core/webrtc/ -run TestWHEP -v ``` Expected: PASS for `TestWHEP_POSTReturns201WithSDP`. - [ ] **Step 5: Run the full package test suite** Run: ```bash go test ./core/webrtc/... -v -race ``` Expected: ALL PASS. No race warnings. 
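The cap check in `ServeHTTP` reads `peersCount` up front but increments it only after `CreatePeer` succeeds, so a burst of concurrent POSTs could all pass the check and overshoot `MaxPeersTotal`. If that matters before M3, one possible tightening is to reserve the slot with a compare-and-swap loop and release it on failure. The sketch below is a standalone illustration under that assumption; `peerSlots`, `tryAcquire`, and `release` are hypothetical names, not part of the plan's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// peerSlots is a hypothetical helper (not part of core/webrtc) that reserves
// peer slots atomically so concurrent requests cannot overshoot max.
type peerSlots struct {
	count int64
	max   int64
}

// tryAcquire claims one slot, or reports false when the cap is reached.
func (s *peerSlots) tryAcquire() bool {
	for {
		cur := atomic.LoadInt64(&s.count)
		if cur >= s.max {
			return false
		}
		if atomic.CompareAndSwapInt64(&s.count, cur, cur+1) {
			return true
		}
		// Another goroutine changed count between the load and the swap; retry.
	}
}

// release returns a slot, e.g. when peer creation fails or a peer closes.
func (s *peerSlots) release() { atomic.AddInt64(&s.count, -1) }

func main() {
	slots := &peerSlots{max: 3}
	var granted int64
	var wg sync.WaitGroup
	// 50 concurrent "requests" compete for 3 slots; none are released.
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if slots.tryAcquire() {
				atomic.AddInt64(&granted, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("granted:", atomic.LoadInt64(&granted)) // prints "granted: 3"
}
```

In the handler, this would mean calling `tryAcquire` before `CreatePeer` and `release` on every error path and whenever a peer closes.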
- [ ] **Step 6: Commit** ```bash git add core/webrtc/whep.go core/webrtc/whep_test.go git commit -m "feat(webrtc): add WHEP POST handler (happy path)" ``` --- ## Task 9: Standalone PoC binary **Files:** - Create: `cmd/webrtc-poc/main.go` The PoC binary wires everything together: creates a Source, registers it, starts the WHEP handler, and blocks. M2 replaces this with integration into datarhei Core's normal startup. - [ ] **Step 1: Write `cmd/webrtc-poc/main.go`** ```go // Command webrtc-poc runs a minimal Dragon Fork WebRTC egress server for // manual end-to-end testing. It listens for RTP on 127.0.0.1:10000 as // stream "test" and serves WHEP at :8787. // // This is NOT part of the datarhei Core binary. It will be removed or // demoted to an internal test helper once milestone M2 lands. package main import ( "flag" "log" "net/http" "/core/webrtc" ) func main() { var ( streamID = flag.String("stream", "test", "stream id to serve") rtpPort = flag.Int("rtp-port", 10000, "UDP port to receive RTP on") listen = flag.String("listen", ":8787", "WHEP HTTP listen address") publicIP = flag.String("public-ip", "", "server public IP for NAT1To1 (optional)") ) flag.Parse() cfg := webrtc.DefaultConfig() cfg.WHEPListen = *listen cfg.PublicIP = *publicIP src, err := webrtc.NewSource(*streamID, *rtpPort) if err != nil { log.Fatalf("NewSource: %v", err) } src.Start() defer src.Close() log.Printf("listening for RTP on %s", src.LocalAddr()) reg := webrtc.NewRegistry() if err := reg.Register(*streamID, src); err != nil { log.Fatalf("Register: %v", err) } factory, err := webrtc.NewPeerFactory(cfg) if err != nil { log.Fatalf("NewPeerFactory: %v", err) } handler := webrtc.NewWHEPHandler(reg, factory, cfg) mux := http.NewServeMux() mux.Handle("/whep/", handler) log.Printf("WHEP listening on %s — POST /whep/%s to subscribe", *listen, *streamID) log.Fatal(http.ListenAndServe(*listen, mux)) } ``` - [ ] **Step 2: Substitute the real module path** Open the file and replace `` with the actual 
module path from your fork's `go.mod` first line. For example, if `go.mod` starts with `module github.com/wilddragon/datarhei-dragonfork-core`, the import becomes: ```go import "github.com/wilddragon/datarhei-dragonfork-core/core/webrtc" ``` Do this once by editing the file — do not commit the placeholder string. - [ ] **Step 3: Build the binary** Run: ```bash go build -o /tmp/webrtc-poc ./cmd/webrtc-poc ``` Expected: binary produced at `/tmp/webrtc-poc`, no build errors. - [ ] **Step 4: Smoke-run it** Run: ```bash /tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787 & sleep 1 curl -sS -X POST -H 'Content-Type: application/sdp' \ --data 'v=0' http://127.0.0.1:8787/whep/test ``` Expected: curl returns a response (will likely be 400 or 500 because the body isn't a real offer — that's fine; the important thing is the server is up and routing the request). The server log shows the POST arrived. Kill the server: ```bash kill %1 wait 2>/dev/null ``` - [ ] **Step 5: Commit** ```bash git add cmd/webrtc-poc/main.go git commit -m "feat(webrtc): add standalone webrtc-poc binary for M1 testing" ``` --- ## Task 10: FFmpeg publisher script **Files:** - Create: `test/publish.sh` A shell script that runs FFmpeg to generate a test pattern and push it as RTP to the PoC binary's port. Uses `testsrc2` with a burned-in timecode (useful later for latency measurement). - [ ] **Step 1: Create `test/publish.sh`** ```bash #!/usr/bin/env bash # test/publish.sh — Dragon Fork M1 publisher # # Pushes an FFmpeg testsrc2 pattern as H.264 + Opus RTP to the webrtc-poc # binary's local UDP port(s). Requires FFmpeg 6.x. 
set -euo pipefail

HOST="${HOST:-127.0.0.1}"
VIDEO_PORT="${VIDEO_PORT:-10000}"
AUDIO_PORT="${AUDIO_PORT:-10002}"
FPS="${FPS:-30}"
SIZE="${SIZE:-640x360}"

echo "publishing testsrc2 → $HOST:$VIDEO_PORT (video, PT=102)"
echo "                      $HOST:$AUDIO_PORT (audio, PT=111)"

# The RTP muxer accepts exactly one stream per output, so each output maps
# its stream explicitly: video from input 0, audio from input 1.
exec ffmpeg -hide_banner -re \
  -f lavfi -i "testsrc2=size=${SIZE}:rate=${FPS}" \
  -f lavfi -i "sine=frequency=440:sample_rate=48000" \
  -map 0:v:0 \
  -vf "drawtext=text='%{localtime\\:%H\\\\\\:%M\\\\\\:%S.%3N}':x=10:y=10:fontsize=32:fontcolor=white:box=1:boxcolor=black@0.8" \
  -c:v libx264 -preset ultrafast -tune zerolatency \
  -profile:v baseline -pix_fmt yuv420p \
  -b:v 1500k -maxrate 1500k -bufsize 500k \
  -g 60 -keyint_min 60 -x264-params "repeat-headers=1" \
  -payload_type 102 -f rtp "rtp://${HOST}:${VIDEO_PORT}?pkt_size=1200" \
  -map 1:a:0 \
  -c:a libopus -b:a 128k \
  -payload_type 111 -f rtp "rtp://${HOST}:${AUDIO_PORT}?pkt_size=1200"
```

- [ ] **Step 2: Make it executable**

Run:

```bash
chmod +x test/publish.sh
```

- [ ] **Step 3: Smoke-test it against the PoC binary**

In one terminal:

```bash
/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787
```

In another terminal, run the publisher. M1 uses a single source port: the script also pushes audio to 10002, but the PoC only listens on 10000, so audio will not reach the PoC until both ports are bound. That's fine for M1, since we're primarily testing video.

```bash
./test/publish.sh
```

Expected: FFmpeg runs and prints its periodic progress stats (frame count, fps, bitrate). The PoC binary logs nothing specific, because it reads RTP silently and M1 adds no verbose logging. Let it run 3 seconds, then Ctrl+C both.

(In M2 we'll separate video and audio into distinct sources; for M1 the single-port video-only path is enough to prove the pipeline.)

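Because the PoC binary is silent in M1, it can help to confirm what the publisher is actually sending. The sketch below decodes the 12-byte fixed RTP header (RFC 3550) from a raw datagram using only the standard library, so the payload type can be compared against the 102/111 values the script sets. `rtpInfo` and `peekRTP` are illustrative names, not part of `core/webrtc`; inside the package itself, the `rtp.Packet` type from Pion already exposes these fields.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// rtpInfo holds the fixed-header fields that matter for the M1 smoke test.
// The type and function names are illustrative, not part of the plan.
type rtpInfo struct {
	Version     uint8
	Marker      bool
	PayloadType uint8
	Sequence    uint16
	Timestamp   uint32
	SSRC        uint32
}

var errShortPacket = errors.New("rtp: packet shorter than 12-byte fixed header")

// peekRTP decodes the fixed RTP header without touching CSRCs, extensions,
// or the payload.
func peekRTP(b []byte) (rtpInfo, error) {
	if len(b) < 12 {
		return rtpInfo{}, errShortPacket
	}
	return rtpInfo{
		Version:     b[0] >> 6,           // top two bits of byte 0
		Marker:      b[1]&0x80 != 0,      // top bit of byte 1
		PayloadType: b[1] & 0x7f,         // low seven bits of byte 1
		Sequence:    binary.BigEndian.Uint16(b[2:4]),
		Timestamp:   binary.BigEndian.Uint32(b[4:8]),
		SSRC:        binary.BigEndian.Uint32(b[8:12]),
	}, nil
}

func main() {
	// Synthetic packet: V=2, marker set, PT=102, seq=7, ts=90000.
	pkt := []byte{0x80, 0x80 | 102, 0x00, 0x07, 0x00, 0x01, 0x5f, 0x90, 0x11, 0x22, 0x33, 0x44}
	info, err := peekRTP(pkt)
	if err != nil {
		panic(err)
	}
	// prints "v=2 pt=102 seq=7 ts=90000"
	fmt.Printf("v=%d pt=%d seq=%d ts=%d\n", info.Version, info.PayloadType, info.Sequence, info.Timestamp)
}
```

A temporary log line built on a helper like this makes a payload-type mismatch visible immediately, before any WebRTC negotiation is involved.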
- [ ] **Step 4: Commit** ```bash git add test/publish.sh git commit -m "test: add FFmpeg publisher script for M1 PoC" ``` --- ## Task 11: Pion-based test WHEP client **Files:** - Create: `test/whep-client/main.go` - Create: `test/whep-client/main_test.go` A Go binary that acts as a headless WHEP client: POSTs an SDP offer, parses the SDP answer, establishes a `PeerConnection`, and verifies that RTP packets arrive. Essential for automated end-to-end verification without a browser in the loop. - [ ] **Step 1: Create `test/whep-client/main.go`** ```go // Command whep-client subscribes to a Dragon Fork WHEP endpoint and logs // the first N received RTP packets, then exits. Used for M1 end-to-end // verification. package main import ( "bytes" "context" "flag" "fmt" "io" "log" "net/http" "os" "sync/atomic" "time" "github.com/pion/webrtc/v4" ) func main() { var ( whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL") wantPkt = flag.Int("pkts", 30, "exit after receiving this many video RTP packets") timeout = flag.Duration("timeout", 15*time.Second, "overall timeout") ) flag.Parse() if err := run(*whepURL, *wantPkt, *timeout); err != nil { log.Fatalf("whep-client: %v", err) } } func run(whepURL string, wantPkt int, timeout time.Duration) error { me := &webrtc.MediaEngine{} if err := me.RegisterDefaultCodecs(); err != nil { return fmt.Errorf("register codecs: %w", err) } api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) pc, err := api.NewPeerConnection(webrtc.Configuration{}) if err != nil { return fmt.Errorf("new pc: %w", err) } defer pc.Close() if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { return err } if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { return err } var videoCount int64 pc.OnTrack(func(track *webrtc.TrackRemote, _ 
*webrtc.RTPReceiver) {
		kind := track.Kind().String()
		log.Printf("OnTrack: kind=%s codec=%s", kind, track.Codec().MimeType)
		go func() {
			buf := make([]byte, 1500)
			for {
				// Drain the track; only the packet count matters here.
				if _, _, err := track.Read(buf); err != nil {
					log.Printf("track.Read (%s): %v", kind, err)
					return
				}
				if kind == "video" {
					atomic.AddInt64(&videoCount, 1)
				}
			}
		}()
	})

	offer, err := pc.CreateOffer(nil)
	if err != nil {
		return err
	}
	gatherComplete := webrtc.GatheringCompletePromise(pc)
	if err := pc.SetLocalDescription(offer); err != nil {
		return err
	}
	<-gatherComplete
	offer = *pc.LocalDescription()

	answerSDP, err := httpPostSDP(whepURL, offer.SDP)
	if err != nil {
		return fmt.Errorf("WHEP POST: %w", err)
	}
	if err := pc.SetRemoteDescription(webrtc.SessionDescription{
		Type: webrtc.SDPTypeAnswer,
		SDP:  answerSDP,
	}); err != nil {
		return fmt.Errorf("set remote: %w", err)
	}

	// A single context deadline bounds the wait for packets.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	tick := time.NewTicker(250 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			// Load atomically: the OnTrack goroutine updates the counter concurrently.
			if got := atomic.LoadInt64(&videoCount); got >= int64(wantPkt) {
				log.Printf("OK: received %d video packets", got)
				fmt.Fprintln(os.Stdout, "PASS")
				return nil
			}
		case <-ctx.Done():
			return fmt.Errorf("timeout after %s: only %d video packets received",
				timeout, atomic.LoadInt64(&videoCount))
		}
	}
}

// httpPostSDP POSTs an SDP offer and returns the SDP answer body.
// Any status other than 201 Created is treated as an error.
func httpPostSDP(url, sdp string) (string, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBufferString(sdp))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/sdp")
	req.Header.Set("Accept", "application/sdp")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusCreated {
		return "", fmt.Errorf("status %d: %s", resp.StatusCode, string(body))
	}
	return string(body), nil
}
```

- [ ] **Step 2: Create `test/whep-client/main_test.go`**

```go
package main

import (
	"strings"
	"testing"
)

// Placeholder unit test. The real validation happens end-to-end in Task 12.
// This at least keeps `go test ./test/whep-client/...` happy in CI later.
//
// Despite the name, this only exercises the transport-error path: the test
// assumes nothing listens on port 1, so the POST fails before any HTTP
// status is returned.
func TestHTTPPostSDP_RejectsNon2xx(t *testing.T) {
	_, err := httpPostSDP("http://127.0.0.1:1/whep/none", "v=0\n")
	if err == nil {
		t.Fatal("expected error from unreachable endpoint")
	}
	if !strings.Contains(err.Error(), "dial") && !strings.Contains(err.Error(), "connect") {
		t.Errorf("expected dial/connect error, got: %v", err)
	}
}
```

- [ ] **Step 3: Build and test**

Run:

```bash
go build -o /tmp/whep-client ./test/whep-client
go test ./test/whep-client/... -v
```

Expected: binary builds; `TestHTTPPostSDP_RejectsNon2xx` passes.

- [ ] **Step 4: Commit**

```bash
git add test/whep-client/
git commit -m "test: add Pion-based WHEP client for end-to-end M1 verification"
```

---

## Task 12: End-to-end PoC verification

**Files:** None (runtime verification only)

This is the moment of truth: FFmpeg → webrtc-poc → whep-client, all the way through, with the whep-client confirming that video RTP packets arrive. The client counts packets; it does not decode frames.

- [ ] **Step 1: Open three terminals**

Terminal A: the PoC server. Terminal B: the FFmpeg publisher. Terminal C: the test WHEP client.

- [ ] **Step 2: Start the PoC server (Terminal A)**

Run:

```bash
/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787
```

Expected: server logs `listening for RTP on 127.0.0.1:10000` and `WHEP listening on :8787 — POST /whep/test to subscribe`. Leave running.

- [ ] **Step 3: Start the FFmpeg publisher (Terminal B)**

Run:

```bash
./test/publish.sh
```

Expected: FFmpeg begins producing frames and reporting throughput (something like `frame= 30 fps=30 q=...`). Leave running.

- [ ] **Step 4: Run the WHEP test client (Terminal C)**

Run:

```bash
/tmp/whep-client -url http://127.0.0.1:8787/whep/test -pkts 30 -timeout 15s
```

Expected output within a few seconds:

```
OnTrack: kind=video codec=video/H264
OK: received 30 video packets
PASS
```

The reported count may be somewhat higher than 30, because the client polls its counter every 250 ms. The client exits 0. **This is M1's success criterion.**

- [ ] **Step 5: If it fails, debug systematically**

If the client times out:

1. Confirm the FFmpeg publisher is actually producing RTP: run `tcpdump -i lo -n udp port 10000` in a fourth terminal (the loopback interface is `lo0` on macOS). You should see packets. If not, the publisher is broken; re-check payload types and destination port in `publish.sh`.
2. Confirm the PoC server is receiving RTP: temporarily add a log line in `source.go` readLoop (`log.Printf("rtp seq=%d", pkt.SequenceNumber)`) and rebuild. If nothing logs, the UDP bind is wrong.
3. Confirm the PeerConnection is establishing: check that the WHEP POST response was 201, and that the client's `pc.ConnectionState()` transitions through `connecting` to `connected`.
4. Confirm codec payload types match: the publish script uses PT 102 for H.264 and 111 for Opus; Pion's default H.264 PT is 102 and Opus is 111. If you see RTP arriving in the reader but `forwardRTP` drops the packets, the PTs don't match. Add a log line in `forward.go` to confirm.

Log each issue and its fix in `NOTES.md` as you go; those are exactly the gotchas we want recorded for M2+.

- [ ] **Step 6: Tear down**

Ctrl+C in all three terminals.

- [ ] **Step 7: Document M1 success in NOTES.md**

Append to `NOTES.md`:

```markdown
## M1 PoC verified — FFmpeg (testsrc2 + drawtext timecode, H.264 baseline, Opus) → webrtc-poc → Pion WHEP client.

- Video PT: 102 / Audio PT: 111 (matched between publisher and client)
- ICE gathering completed within ~s
- First video packet received ~s after POST (bounded by 2s GOP once we add forced keyframes in M2)
- Ready for M2 (datarhei process-model integration).
```

Run:

```bash
git add NOTES.md
git commit -m "docs: record M1 PoC success and observations"
```

- [ ] **Step 8: Push the branch**

Run:

```bash
git push -u origin m1-webrtc-poc
```

Expected: branch pushed to your fork's remote. M1 complete.

---

## Exit Criteria

M1 is done when **all of the following are true**:

1. `go build ./...` succeeds on the fork.
2. `go test ./core/webrtc/... -race` passes: all unit tests green, no race warnings.
3. `test/whep-client/main.go` prints `PASS` against a running `webrtc-poc` + `test/publish.sh` pair within 15 seconds.
4. `NOTES.md` records the verification run and any gotchas encountered.
5. Branch `m1-webrtc-poc` is pushed to the fork remote.

---

## What comes next

- **M2 plan** will be written after M1 is verified. It covers: adding the `webrtc://` URL scheme to datarhei Core's output resolver, wiring the Registry and WHEPHandler into Core's normal startup, separating video/audio onto distinct UDP ports, and hooking into the process lifecycle so sources register/deregister automatically with the existing FFmpeg processes datarhei already manages.
- **M3 plan** will cover multi-viewer robustness: DELETE/PATCH WHEP methods, error-path HTTP codes (404/406/503), the admin API endpoints, and PLI absorption + RTCP BYE on teardown.
- **M4 plan** will cover CI integration: running the end-to-end harness from Task 12 in CI, the pixel-sampling latency measurement, and the p95 gates.
- **M5 plan** will cover branding and release: logo swap in the Restreamer Vue UI, README rewrite with upstream attribution, `NOTICE`/`CREDITS`, Docker image publishing, and tagging `v0.1.0-dragonfork`.

Each follow-on plan is small enough (~3–7 days of work) that writing it after M1 lets us incorporate lessons learned without re-planning from the top.