# Datarhei - Dragon Fork M1: Media-Path PoC Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Prove a working end-to-end WebRTC egress path: an FFmpeg publisher pushes RTP into a new Go package, which serves it to a Pion-based WHEP client that successfully decodes video frames.

**Architecture:** New standalone Go package `core/webrtc` inside the datarhei Core fork. FFmpeg produces RTP on a local UDP socket → package reads RTP → WHEP HTTP endpoint serves it via Pion `PeerConnection` → test client subscribes and decodes. No datarhei process-model integration yet (that's M2). This milestone answers: *does the integration pattern actually work, and what are the gotchas?*

**Tech Stack:** Go 1.22+, [Pion WebRTC v4](https://github.com/pion/webrtc) (`github.com/pion/webrtc/v4`), [Pion RTP](https://github.com/pion/rtp), FFmpeg 6.x (publisher + test pattern), standard library `net/http`.

**Out of scope for this plan:** datarhei process-model integration (M2), multi-viewer fan-out polish (M3), CI test harness (M4), branding (M5). Separate plans will be written for each after M1 completes.

---
## Prerequisites
- Go 1.22+ installed locally
- `git` configured
- FFmpeg 6.x on PATH (`ffmpeg -version` reports 6.0 or newer)
- A Git host account (GitHub recommended for initial fork)
- Linux or macOS development machine (Windows works but UDP port behavior differs; document what you're on when filing any issues)
---
## File Structure
Files created in this milestone:
```
core/webrtc/
  doc.go              # Package documentation
  config.go           # Config struct + defaults + Validate()
  config_test.go
  registry.go         # stream_id → SourceHandle map, thread-safe
  registry_test.go
  source.go           # RTP UDP reader + per-subscriber fan-out
  source_test.go
  peer.go             # PeerConnection factory, track attachment
  peer_test.go
  forward.go          # RTP forwarder: subscription → video/audio tracks
  whep.go             # HTTP handler: POST /whep/{stream_id}
  whep_test.go
  ice.go              # SettingEngine builder (NAT1To1, ICE servers)
  ice_test.go
  errors.go           # Typed error values (ErrStreamNotFound, etc.)
cmd/webrtc-poc/
  main.go             # Standalone PoC binary (NOT datarhei Core yet)
test/
  publish.sh          # FFmpeg publisher script (testsrc2 → local RTP)
  whep-client/
    main.go           # Pion-based test WHEP subscriber
    main_test.go
docs/design/
  (copy of the approved spec from brainstorming)
```

Total new Go files: 11 source + 7 test = 18 files. Total lines: ~1200-1500 including tests and comments.

---
## Task 0: Fork the repo and set up the workspace
**Files:**
- Create: new fork of `datarhei/core`
**Rationale:** Everything else depends on having a fork to commit into. No code yet — just repo setup.
- [ ] **Step 1: Fork datarhei/core on your Git host**
On GitHub: navigate to https://github.com/datarhei/core, click Fork, name the fork `datarhei-dragonfork-core` under your `wilddragon` org (or personal account).
- [ ] **Step 2: Clone the fork locally**
Run:
```bash
git clone git@github.com:wilddragon/datarhei-dragonfork-core.git
cd datarhei-dragonfork-core
```
Expected: repo cloned, `git status` clean on `main` branch.
- [ ] **Step 3: Create the M1 working branch**
Run:
```bash
git checkout -b m1-webrtc-poc
```
Expected: branch created and checked out.
- [ ] **Step 4: Copy the approved design spec into the repo**
```bash
mkdir -p docs/design
cp /path/to/2026-04-16-datarhei-dragon-fork-webrtc-design.md docs/design/
```
(Path will be wherever you saved the spec from the brainstorming session.)
- [ ] **Step 5: Verify the repo builds unchanged**
Run:
```bash
go build ./...
```
Expected: build succeeds. If it fails, the fork is broken before you started — stop and fix upstream issues first.
- [ ] **Step 6: Run upstream tests**
Run:
```bash
go test ./...
```
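Expected: all tests pass (or at least match what upstream CI shows green). Create a `NOTES.md` file and record any pre-existing flaky or failing tests in it (write "none" if everything passes) so you don't later blame your changes for them. Step 7 commits this file, so it must exist.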
- [ ] **Step 7: Commit the spec and a NOTES.md baseline**
```bash
git add docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md NOTES.md
git commit -m "docs: add Dragon Fork WebRTC egress design spec"
```
---
## Task 1: Add Pion WebRTC dependency
**Files:**
- Modify: `go.mod`
- Modify: `go.sum`
- [ ] **Step 1: Add Pion dependencies**
Run from repo root:
```bash
go get github.com/pion/webrtc/v4@latest
go get github.com/pion/rtp@latest
go get github.com/pion/rtcp@latest
```
Expected: `go.mod` updated with new `require` entries; `go.sum` updated.
- [ ] **Step 2: Tidy dependencies**
Run:
```bash
go mod tidy
```
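Expected: no errors. `go.mod` stable. Note that `go mod tidy` removes requirements that no package in the module imports yet. If it drops the Pion entries added in Step 1, re-run the `go get` commands, or defer tidying until Tasks 5 and 6 add the first Pion imports.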
- [ ] **Step 3: Sanity-check that Pion loads**
Create a throwaway file `/tmp/pion_smoke.go`:
```go
package main
import (
"fmt"
"github.com/pion/webrtc/v4"
)
func main() {
api := webrtc.NewAPI()
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
panic(err)
}
fmt.Println("Pion OK, state:", pc.ConnectionState())
_ = pc.Close()
}
```
Run:
```bash
go run /tmp/pion_smoke.go
```
Expected output: `Pion OK, state: new` (or similar). Delete the file after.
- [ ] **Step 4: Commit**
```bash
git add go.mod go.sum
git commit -m "build: add Pion WebRTC v4 dependency"
```
---
## Task 2: Create the core/webrtc package skeleton + typed errors
**Files:**
- Create: `core/webrtc/errors.go`
- Create: `core/webrtc/doc.go`
- [ ] **Step 1: Create `core/webrtc/doc.go`**
```go
// Package webrtc implements the Dragon Fork WebRTC egress module.
//
// It exposes a WHEP (WebRTC-HTTP Egress Protocol) HTTP endpoint and serves
// live RTP produced by an FFmpeg process on a local UDP socket to one or
// more WebRTC peer connections built with Pion.
//
// This package is additive: it does not modify existing datarhei ingest,
// transcode, or non-WebRTC output code paths. The only contact with
// existing code is a new URL scheme ("webrtc://") registered with the
// output resolver (done in milestone M2, not here).
package webrtc
```
- [ ] **Step 2: Create `core/webrtc/errors.go`**
```go
package webrtc
import "errors"
// Sentinel errors returned by package functions.
var (
// ErrStreamNotFound indicates a WHEP subscribe referenced a stream_id
// that has no registered source. Maps to HTTP 404.
ErrStreamNotFound = errors.New("webrtc: stream not found")
// ErrPeerCapReached indicates max_peers_total has been exceeded.
// Maps to HTTP 503.
ErrPeerCapReached = errors.New("webrtc: peer capacity reached")
// ErrCodecMismatch indicates the viewer's SDP offer does not include
// a codec the source can serve (expected H.264 + Opus). Maps to HTTP 406.
ErrCodecMismatch = errors.New("webrtc: codec mismatch")
// ErrInvalidSDP indicates the request body was not a parseable SDP offer.
// Maps to HTTP 400.
ErrInvalidSDP = errors.New("webrtc: invalid SDP")
// ErrICETimeout indicates ICE gathering did not complete within the
// configured timeout. Maps to HTTP 500.
ErrICETimeout = errors.New("webrtc: ICE gathering timeout")
)
```
- [ ] **Step 3: Verify the package compiles**
Run:
```bash
go build ./core/webrtc/...
```
Expected: no output (successful build).
- [ ] **Step 4: Commit**
```bash
git add core/webrtc/doc.go core/webrtc/errors.go
git commit -m "feat(webrtc): add package skeleton and typed errors"
```
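
The doc comments above pair each sentinel with an HTTP status. A minimal sketch of how a handler might apply that mapping is shown below. `statusForError` is a hypothetical helper that is not part of this plan, the sentinels are copied locally so the snippet compiles on its own, and the `nil` case assumes the WHEP convention of answering a successful POST with 201 Created. Using `errors.Is` means wrapped errors still resolve to the right status.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local copies of the sentinels from errors.go so the sketch is self-contained.
var (
	ErrStreamNotFound = errors.New("webrtc: stream not found")
	ErrPeerCapReached = errors.New("webrtc: peer capacity reached")
	ErrCodecMismatch  = errors.New("webrtc: codec mismatch")
	ErrInvalidSDP     = errors.New("webrtc: invalid SDP")
	ErrICETimeout     = errors.New("webrtc: ICE gathering timeout")
)

// statusForError is a hypothetical helper: it translates a package error
// into the HTTP status named in that error's doc comment.
func statusForError(err error) int {
	switch {
	case err == nil:
		return http.StatusCreated // assumption: WHEP success is 201 Created
	case errors.Is(err, ErrStreamNotFound):
		return http.StatusNotFound
	case errors.Is(err, ErrPeerCapReached):
		return http.StatusServiceUnavailable
	case errors.Is(err, ErrCodecMismatch):
		return http.StatusNotAcceptable
	case errors.Is(err, ErrInvalidSDP):
		return http.StatusBadRequest
	default:
		// ErrICETimeout and anything unrecognised fall through to 500.
		return http.StatusInternalServerError
	}
}

func main() {
	// A wrapped sentinel is still matched by errors.Is.
	wrapped := fmt.Errorf("lookup %q: %w", "streamA", ErrStreamNotFound)
	fmt.Println(statusForError(wrapped))
	fmt.Println(statusForError(ErrICETimeout))
}
```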
---
## Task 3: Config struct
**Files:**
- Create: `core/webrtc/config.go`
- Create: `core/webrtc/config_test.go`
- [ ] **Step 1: Write the failing test `core/webrtc/config_test.go`**
```go
package webrtc
import (
"testing"
)
func TestConfig_Defaults(t *testing.T) {
c := DefaultConfig()
if !c.Enabled {
t.Error("default Enabled should be true")
}
if c.WHEPListen != ":8787" {
t.Errorf("default WHEPListen = %q, want :8787", c.WHEPListen)
}
if c.UDPPortRange.Low != 10000 || c.UDPPortRange.High != 10100 {
t.Errorf("default UDPPortRange = %v, want 10000-10100", c.UDPPortRange)
}
if c.MaxPeersTotal != 32 {
t.Errorf("default MaxPeersTotal = %d, want 32", c.MaxPeersTotal)
}
if len(c.ICEServers) == 0 {
t.Error("default ICEServers should have at least one STUN entry")
}
}
func TestConfig_Validate(t *testing.T) {
tests := []struct {
name string
mutate func(*Config)
wantErr bool
}{
{"defaults are valid", func(c *Config) {}, false},
{"empty listen", func(c *Config) { c.WHEPListen = "" }, true},
{"inverted port range", func(c *Config) { c.UDPPortRange.Low = 20000; c.UDPPortRange.High = 10000 }, true},
{"zero max peers", func(c *Config) { c.MaxPeersTotal = 0 }, true},
{"negative max peers", func(c *Config) { c.MaxPeersTotal = -1 }, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := DefaultConfig()
tt.mutate(&c)
err := c.Validate()
if (err != nil) != tt.wantErr {
t.Errorf("Validate() err = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
```
- [ ] **Step 2: Run the test and verify it fails**
Run:
```bash
go test ./core/webrtc/ -run TestConfig -v
```
Expected: FAIL with "undefined: Config" / "undefined: DefaultConfig".
- [ ] **Step 3: Write the minimal implementation `core/webrtc/config.go`**
```go
package webrtc
import "fmt"
// PortRange represents an inclusive UDP port range.
type PortRange struct {
Low, High int
}
// Config controls the WebRTC egress module.
type Config struct {
// Enabled toggles the entire module. When false, no endpoints are served.
Enabled bool
// WHEPListen is the address the WHEP HTTP endpoint binds to (e.g. ":8787").
WHEPListen string
// PublicIP is the server's externally-reachable IP, advertised in ICE
// candidates via NAT1To1. Empty means rely on STUN discovery.
PublicIP string
// UDPPortRange bounds the local UDP ports allocated for FFmpeg→Pion RTP.
UDPPortRange PortRange
// ICEServers is the list of STUN/TURN URIs given to each PeerConnection.
ICEServers []string
// MaxPeersTotal is a hard safety cap on concurrent subscribers.
MaxPeersTotal int
}
// DefaultConfig returns production-reasonable defaults.
func DefaultConfig() Config {
return Config{
Enabled: true,
WHEPListen: ":8787",
PublicIP: "",
UDPPortRange: PortRange{Low: 10000, High: 10100},
ICEServers: []string{"stun:stun.cloudflare.com:3478", "stun:stun.l.google.com:19302"},
MaxPeersTotal: 32,
}
}
// Validate returns an error if the config is internally inconsistent.
func (c Config) Validate() error {
if c.WHEPListen == "" {
return fmt.Errorf("webrtc: WHEPListen must not be empty")
}
if c.UDPPortRange.Low <= 0 || c.UDPPortRange.High <= 0 {
return fmt.Errorf("webrtc: UDPPortRange must have positive bounds, got %v", c.UDPPortRange)
}
if c.UDPPortRange.Low > c.UDPPortRange.High {
return fmt.Errorf("webrtc: UDPPortRange.Low > High (%d > %d)", c.UDPPortRange.Low, c.UDPPortRange.High)
}
if c.MaxPeersTotal <= 0 {
return fmt.Errorf("webrtc: MaxPeersTotal must be positive, got %d", c.MaxPeersTotal)
}
return nil
}
```
- [ ] **Step 4: Run the tests and verify they pass**
Run:
```bash
go test ./core/webrtc/ -run TestConfig -v
```
Expected: PASS for both `TestConfig_Defaults` and `TestConfig_Validate` (all subtests).
- [ ] **Step 5: Commit**
```bash
git add core/webrtc/config.go core/webrtc/config_test.go
git commit -m "feat(webrtc): add Config with defaults and validation"
```
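
`Validate` above checks that the port bounds are positive and ordered, but not that they fit in a UDP port number. Task 6 later converts both bounds to `uint16`, where an out-of-range value such as 70000 would silently wrap to 4464. The sketch below shows one possible stricter check. `checkPortRange`, `maxUDPPort`, and `size` are hypothetical names that do not appear in the plan, and `PortRange` is copied locally so the snippet compiles alone.

```go
package main

import (
	"errors"
	"fmt"
)

// PortRange mirrors the type from config.go.
type PortRange struct {
	Low, High int
}

// maxUDPPort is the largest valid UDP port number.
const maxUDPPort = 65535

// checkPortRange is a hypothetical stricter variant of the range checks in
// Config.Validate: it also rejects bounds that do not fit in a uint16.
func checkPortRange(r PortRange) error {
	if r.Low < 1 || r.High < 1 {
		return errors.New("webrtc: port bounds must be positive")
	}
	if r.High > maxUDPPort {
		return fmt.Errorf("webrtc: High %d exceeds %d", r.High, maxUDPPort)
	}
	if r.Low > r.High {
		return fmt.Errorf("webrtc: Low %d > High %d", r.Low, r.High)
	}
	return nil
}

// size reports how many ports the inclusive range contains.
func (r PortRange) size() int { return r.High - r.Low + 1 }

func main() {
	def := PortRange{Low: 10000, High: 10100}
	fmt.Println(checkPortRange(def), def.size())
	fmt.Println(checkPortRange(PortRange{Low: 10000, High: 70000}))
}
```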
---
## Task 4: Registry — stream_id → Source mapping
**Files:**
- Create: `core/webrtc/registry.go`
- Create: `core/webrtc/registry_test.go`
A **Source** (defined in the next task) represents a live RTP stream that peers can subscribe to. The **Registry** is the thread-safe map from stream IDs to active sources. We write the Registry first because it depends on Source only through a minimal interface, so it can be built and tested before Source exists.
- [ ] **Step 1: Write the failing test `core/webrtc/registry_test.go`**
```go
package webrtc
import (
"sync"
"testing"
)
// mockSource implements the minimum Source-like shape needed by the registry.
// The real Source type is defined in Task 5; the registry only needs a
// stable type to store and retrieve.
type mockSource struct {
id string
}
func (m *mockSource) ID() string { return m.id }
func TestRegistry_RegisterAndLookup(t *testing.T) {
r := NewRegistry()
src := &mockSource{id: "streamA"}
if err := r.Register("streamA", src); err != nil {
t.Fatalf("Register returned error: %v", err)
}
got, ok := r.Lookup("streamA")
if !ok {
t.Fatal("Lookup(streamA) returned ok=false, want true")
}
if got != src {
t.Errorf("Lookup returned %v, want %v", got, src)
}
}
func TestRegistry_LookupMissing(t *testing.T) {
r := NewRegistry()
_, ok := r.Lookup("nope")
if ok {
t.Error("Lookup on empty registry returned ok=true, want false")
}
}
func TestRegistry_DuplicateRegister(t *testing.T) {
r := NewRegistry()
_ = r.Register("streamA", &mockSource{id: "streamA"})
if err := r.Register("streamA", &mockSource{id: "streamA"}); err == nil {
t.Error("duplicate Register should return error, got nil")
}
}
func TestRegistry_Deregister(t *testing.T) {
r := NewRegistry()
_ = r.Register("streamA", &mockSource{id: "streamA"})
r.Deregister("streamA")
if _, ok := r.Lookup("streamA"); ok {
t.Error("after Deregister, Lookup should return ok=false")
}
}
func TestRegistry_ConcurrentAccess(t *testing.T) {
r := NewRegistry()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(3)
id := string(rune('a' + (i % 26)))
go func() { defer wg.Done(); _ = r.Register(id, &mockSource{id: id}) }()
go func() { defer wg.Done(); _, _ = r.Lookup(id) }()
go func() { defer wg.Done(); r.Deregister(id) }()
}
wg.Wait()
// No assertion — test passes if -race doesn't flag anything.
}
```
- [ ] **Step 2: Run the test and verify it fails**
Run:
```bash
go test ./core/webrtc/ -run TestRegistry -v
```
Expected: FAIL with "undefined: NewRegistry".
- [ ] **Step 3: Write the minimal implementation `core/webrtc/registry.go`**
```go
package webrtc
import (
"fmt"
"sync"
)
// SourceHandle is the minimal interface the Registry stores per stream_id.
// The concrete type is *Source, defined in source.go.
type SourceHandle interface {
ID() string
}
// Registry is a thread-safe map from stream_id to active SourceHandle.
type Registry struct {
mu sync.RWMutex
streams map[string]SourceHandle
}
// NewRegistry returns an empty Registry.
func NewRegistry() *Registry {
return &Registry{streams: make(map[string]SourceHandle)}
}
// Register associates src with streamID. Returns an error if streamID is
// already registered.
func (r *Registry) Register(streamID string, src SourceHandle) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.streams[streamID]; exists {
return fmt.Errorf("webrtc: stream %q already registered", streamID)
}
r.streams[streamID] = src
return nil
}
// Lookup returns the handle for streamID. The second return value is false
// if no source is registered.
func (r *Registry) Lookup(streamID string) (SourceHandle, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
src, ok := r.streams[streamID]
return src, ok
}
// Deregister removes streamID. No-op if not present.
func (r *Registry) Deregister(streamID string) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.streams, streamID)
}
```
- [ ] **Step 4: Run the tests and verify they pass**
Run:
```bash
go test ./core/webrtc/ -run TestRegistry -v -race
```
Expected: PASS for all TestRegistry subtests, no data races.
- [ ] **Step 5: Commit**
```bash
git add core/webrtc/registry.go core/webrtc/registry_test.go
git commit -m "feat(webrtc): add thread-safe Registry for stream_id → SourceHandle"
```
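
Because `Lookup` returns a `SourceHandle` interface and a boolean, a caller such as the WHEP handler still has to turn a miss into `ErrStreamNotFound` and narrow the handle to the concrete source type. The sketch below shows one way this could look. `resolve`, `isNotFound`, and `fakeSource` are hypothetical names used only for illustration, and the `Registry` here is a trimmed local copy of the one above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrStreamNotFound mirrors the sentinel from errors.go.
var ErrStreamNotFound = errors.New("webrtc: stream not found")

// SourceHandle and Registry are trimmed copies of the types in registry.go.
type SourceHandle interface{ ID() string }

type Registry struct {
	mu      sync.RWMutex
	streams map[string]SourceHandle
}

func NewRegistry() *Registry {
	return &Registry{streams: make(map[string]SourceHandle)}
}

func (r *Registry) Register(id string, src SourceHandle) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.streams[id]; exists {
		return fmt.Errorf("webrtc: stream %q already registered", id)
	}
	r.streams[id] = src
	return nil
}

func (r *Registry) Lookup(id string) (SourceHandle, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	src, ok := r.streams[id]
	return src, ok
}

// fakeSource is a hypothetical stand-in for *Source (defined in Task 5).
type fakeSource struct{ id string }

func (f *fakeSource) ID() string { return f.id }

// resolve is a hypothetical helper: it converts a registry miss into
// ErrStreamNotFound and narrows the handle to the concrete source type.
func resolve(r *Registry, streamID string) (*fakeSource, error) {
	h, ok := r.Lookup(streamID)
	if !ok {
		return nil, fmt.Errorf("resolve %q: %w", streamID, ErrStreamNotFound)
	}
	src, ok := h.(*fakeSource)
	if !ok {
		return nil, fmt.Errorf("resolve %q: unexpected handle type %T", streamID, h)
	}
	return src, nil
}

// isNotFound reports whether err wraps ErrStreamNotFound.
func isNotFound(err error) bool { return errors.Is(err, ErrStreamNotFound) }

func main() {
	r := NewRegistry()
	_ = r.Register("streamA", &fakeSource{id: "streamA"})
	src, err := resolve(r, "streamA")
	fmt.Println(src.ID(), err)
	_, err = resolve(r, "missing")
	fmt.Println(err, isNotFound(err))
}
```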
---
## Task 5: Source — RTP UDP reader + subscriber fan-out
**Files:**
- Create: `core/webrtc/source.go`
- Create: `core/webrtc/source_test.go`
A Source owns a UDP socket bound to a local port, reads RTP packets, and forwards them to every subscribed peer's video/audio track. For M1, we deliberately keep the fan-out simple: the reader goroutine does a non-blocking send into one buffered channel per subscriber, and each peer drains its own channel. At 15 viewers this naive model is entirely sufficient.
- [ ] **Step 1: Write the failing test `core/webrtc/source_test.go`**
```go
package webrtc
import (
"net"
"testing"
"time"
"github.com/pion/rtp"
)
func TestSource_ID(t *testing.T) {
s, err := NewSource("streamA", 0) // 0 = ephemeral port
if err != nil {
t.Fatalf("NewSource: %v", err)
}
defer s.Close()
if s.ID() != "streamA" {
t.Errorf("ID() = %q, want streamA", s.ID())
}
}
func TestSource_ReceiveAndFanout(t *testing.T) {
s, err := NewSource("streamA", 0)
if err != nil {
t.Fatalf("NewSource: %v", err)
}
defer s.Close()
// Subscribe before sending.
sub := s.Subscribe(16) // buffer depth 16
defer s.Unsubscribe(sub)
s.Start()
// Build and send a minimal RTP packet to the source's UDP port.
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: 1,
Timestamp: 1000,
SSRC: 0xDEADBEEF,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
raw, err := pkt.Marshal()
if err != nil {
t.Fatalf("pkt.Marshal: %v", err)
}
conn, err := net.Dial("udp", s.LocalAddr().String())
if err != nil {
t.Fatalf("net.Dial: %v", err)
}
defer conn.Close()
if _, err := conn.Write(raw); err != nil {
t.Fatalf("conn.Write: %v", err)
}
select {
case got := <-sub:
if got.SSRC != 0xDEADBEEF {
t.Errorf("received SSRC = %x, want DEADBEEF", got.SSRC)
}
if got.SequenceNumber != 1 {
t.Errorf("received SeqNum = %d, want 1", got.SequenceNumber)
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for RTP packet on subscriber channel")
}
}
func TestSource_MultipleSubscribers(t *testing.T) {
s, err := NewSource("streamA", 0)
if err != nil {
t.Fatalf("NewSource: %v", err)
}
defer s.Close()
subs := []chan *rtp.Packet{
s.Subscribe(8),
s.Subscribe(8),
s.Subscribe(8),
}
for _, sub := range subs {
defer s.Unsubscribe(sub)
}
s.Start()
raw, _ := (&rtp.Packet{
Header: rtp.Header{Version: 2, PayloadType: 96, SequenceNumber: 42, SSRC: 1},
Payload: []byte{0xAA},
}).Marshal()
conn, _ := net.Dial("udp", s.LocalAddr().String())
defer conn.Close()
_, _ = conn.Write(raw)
for i, sub := range subs {
select {
case got := <-sub:
if got.SequenceNumber != 42 {
t.Errorf("sub %d got seq %d, want 42", i, got.SequenceNumber)
}
case <-time.After(2 * time.Second):
t.Errorf("sub %d timed out", i)
}
}
}
func TestSource_UnsubscribeStopsDelivery(t *testing.T) {
s, _ := NewSource("streamA", 0)
defer s.Close()
sub := s.Subscribe(8)
s.Start()
s.Unsubscribe(sub)
// After Unsubscribe, the channel should be closed.
select {
case _, ok := <-sub:
if ok {
t.Error("expected channel closed after Unsubscribe, got value")
}
case <-time.After(500 * time.Millisecond):
t.Error("timed out waiting for channel close")
}
}
```
- [ ] **Step 2: Run the test and verify it fails**
Run:
```bash
go test ./core/webrtc/ -run TestSource -v
```
Expected: FAIL with "undefined: NewSource".
- [ ] **Step 3: Write the minimal implementation `core/webrtc/source.go`**
```go
package webrtc
import (
"fmt"
"net"
"sync"
"github.com/pion/rtp"
)
// Source reads RTP packets from a local UDP socket and fans them out to
// subscribed peers via per-subscriber buffered channels.
type Source struct {
id string
conn *net.UDPConn
mu sync.Mutex
subscribers map[chan *rtp.Packet]struct{}
started bool
closed bool
done chan struct{}
}
// NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS
// assign an ephemeral port (useful for tests).
func NewSource(streamID string, port int) (*Source, error) {
addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}
conn, err := net.ListenUDP("udp4", addr)
if err != nil {
return nil, fmt.Errorf("webrtc: listen udp: %w", err)
}
return &Source{
id: streamID,
conn: conn,
subscribers: make(map[chan *rtp.Packet]struct{}),
done: make(chan struct{}),
}, nil
}
// ID returns the registered stream identifier.
func (s *Source) ID() string { return s.id }
// LocalAddr returns the UDP address the source is listening on.
func (s *Source) LocalAddr() *net.UDPAddr {
return s.conn.LocalAddr().(*net.UDPAddr)
}
// Subscribe returns a new buffered channel that receives every RTP packet
// read from the UDP socket. bufDepth is the channel buffer size; when full,
// packets are dropped (preventing a slow subscriber from back-pressuring
// the reader).
func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet {
ch := make(chan *rtp.Packet, bufDepth)
s.mu.Lock()
s.subscribers[ch] = struct{}{}
s.mu.Unlock()
return ch
}
// Unsubscribe removes ch from the subscriber set and closes it.
func (s *Source) Unsubscribe(ch chan *rtp.Packet) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.subscribers[ch]; ok {
delete(s.subscribers, ch)
close(ch)
}
}
// Start begins the RTP reader goroutine. Safe to call once; subsequent calls
// are no-ops.
func (s *Source) Start() {
s.mu.Lock()
if s.started || s.closed {
s.mu.Unlock()
return
}
s.started = true
s.mu.Unlock()
go s.readLoop()
}
func (s *Source) readLoop() {
buf := make([]byte, 1500) // MTU-sized; RTP over UDP should fit
for {
select {
case <-s.done:
return
default:
}
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
// Socket closed or error — exit the loop.
return
}
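// Copy the datagram before parsing: rtp.Packet.Unmarshal keeps a
// reference to its input slice for the payload, and buf is overwritten
// by the next read while this packet may still sit in subscriber channels.
raw := make([]byte, n)
copy(raw, buf[:n])
pkt := &rtp.Packet{}
if err := pkt.Unmarshal(raw); err != nil {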
// Malformed packet; skip without crashing.
continue
}
s.mu.Lock()
for ch := range s.subscribers {
select {
case ch <- pkt:
default:
// Subscriber full — drop to protect the reader.
}
}
s.mu.Unlock()
}
}
// Close stops the reader goroutine, closes the UDP socket, and closes every
// subscriber channel.
func (s *Source) Close() error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return nil
}
s.closed = true
close(s.done)
for ch := range s.subscribers {
delete(s.subscribers, ch)
close(ch)
}
s.mu.Unlock()
return s.conn.Close()
}
```
- [ ] **Step 4: Run the tests and verify they pass**
Run:
```bash
go test ./core/webrtc/ -run TestSource -v -race
```
Expected: PASS for all TestSource subtests, no data races.
- [ ] **Step 5: Commit**
```bash
git add core/webrtc/source.go core/webrtc/source_test.go
git commit -m "feat(webrtc): add Source with UDP RTP reader and subscriber fan-out"
```
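
The drop-on-full behaviour inside `readLoop` is the part of this task that is easiest to get wrong, so it helps to see it in isolation. The sketch below is a simplified illustration, not the plan's code: `fanOut` is a hypothetical function, it uses plain `[]byte` packets and a slice of channels instead of `*rtp.Packet` and the subscriber map, and it counts deliveries and drops so the effect of a slow subscriber is visible.

```go
package main

import "fmt"

// fanOut offers pkt to every subscriber with a non-blocking send and reports
// how many subscribers accepted it and how many were full (packet dropped).
func fanOut(subs []chan []byte, pkt []byte) (delivered, dropped int) {
	for _, ch := range subs {
		select {
		case ch <- pkt:
			delivered++
		default:
			// Channel buffer is full: drop for this subscriber only, so a
			// slow reader never stalls the others or the UDP reader.
			dropped++
		}
	}
	return delivered, dropped
}

func main() {
	fast := make(chan []byte, 4) // deep buffer, never fills in this demo
	slow := make(chan []byte, 1) // fills after one packet because nobody drains it
	subs := []chan []byte{fast, slow}

	for seq := byte(0); seq < 3; seq++ {
		d, x := fanOut(subs, []byte{seq})
		fmt.Printf("seq=%d delivered=%d dropped=%d\n", seq, d, x)
	}
}
```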
---
## Task 6: ICE config helper (SettingEngine builder)
**Files:**
- Create: `core/webrtc/ice.go`
- Create: `core/webrtc/ice_test.go`
Isolated helper that translates our `Config` into Pion's `SettingEngine` + `Configuration` pair. Keeping it separate makes peer.go simpler and the ICE config trivially testable.
- [ ] **Step 1: Write the failing test `core/webrtc/ice_test.go`**
```go
package webrtc
import (
"testing"
"github.com/pion/webrtc/v4"
)
func TestBuildICEConfig_Defaults(t *testing.T) {
c := DefaultConfig()
rtcConfig, _, err := BuildICEConfig(c)
if err != nil {
t.Fatalf("BuildICEConfig: %v", err)
}
if len(rtcConfig.ICEServers) == 0 {
t.Error("ICEServers should not be empty")
}
// First default is Cloudflare STUN.
if rtcConfig.ICEServers[0].URLs[0] != "stun:stun.cloudflare.com:3478" {
t.Errorf("first ICE server = %q, want stun:stun.cloudflare.com:3478",
rtcConfig.ICEServers[0].URLs[0])
}
}
func TestBuildICEConfig_PublicIP(t *testing.T) {
c := DefaultConfig()
c.PublicIP = "203.0.113.10"
_, se, err := BuildICEConfig(c)
if err != nil {
t.Fatalf("BuildICEConfig: %v", err)
}
if se == nil {
t.Fatal("SettingEngine should not be nil when PublicIP is set")
}
// We can't introspect NAT1To1IPs directly from Pion's public API; the
// smoke test is that building an API from this engine works.
api := webrtc.NewAPI(webrtc.WithSettingEngine(*se))
if api == nil {
t.Fatal("NewAPI returned nil")
}
}
func TestBuildICEConfig_InvalidConfig(t *testing.T) {
c := DefaultConfig()
c.WHEPListen = ""
_, _, err := BuildICEConfig(c)
if err == nil {
t.Error("BuildICEConfig should reject invalid config")
}
}
```
- [ ] **Step 2: Run the test and verify it fails**
Run:
```bash
go test ./core/webrtc/ -run TestBuildICEConfig -v
```
Expected: FAIL with "undefined: BuildICEConfig".
- [ ] **Step 3: Write the minimal implementation `core/webrtc/ice.go`**
```go
package webrtc
import (
"github.com/pion/webrtc/v4"
)
// BuildICEConfig translates a Config into the two Pion config pieces every
// PeerConnection needs: a webrtc.Configuration (with ICE servers) and a
// SettingEngine (with NAT1To1 and port range tuning).
//
// The returned *SettingEngine is nil only when no engine-level tuning is
// needed. Because Validate requires a positive UDPPortRange, a valid Config
// currently always yields a non-nil engine; callers should still nil-check
// it before passing it to webrtc.NewAPI.
func BuildICEConfig(c Config) (webrtc.Configuration, *webrtc.SettingEngine, error) {
if err := c.Validate(); err != nil {
return webrtc.Configuration{}, nil, err
}
rtcConfig := webrtc.Configuration{
ICEServers: make([]webrtc.ICEServer, 0, len(c.ICEServers)),
}
for _, uri := range c.ICEServers {
rtcConfig.ICEServers = append(rtcConfig.ICEServers, webrtc.ICEServer{
URLs: []string{uri},
})
}
var se *webrtc.SettingEngine
if c.PublicIP != "" || c.UDPPortRange.Low > 0 {
engine := webrtc.SettingEngine{}
if c.PublicIP != "" {
engine.SetNAT1To1IPs([]string{c.PublicIP}, webrtc.ICECandidateTypeHost)
}
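// Constrain the ephemeral UDP range Pion allocates for ICE candidates.
// Note: Pion uses this pool for the WebRTC media path, which is a separate
// concern from the FFmpeg→Source UDP ports. Config.UDPPortRange currently
// feeds both uses, so choose FFmpeg→Source ports outside this range (or
// split the field) to avoid collisions.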
if c.UDPPortRange.Low > 0 && c.UDPPortRange.High >= c.UDPPortRange.Low {
if err := engine.SetEphemeralUDPPortRange(
uint16(c.UDPPortRange.Low), uint16(c.UDPPortRange.High)); err != nil {
return webrtc.Configuration{}, nil, err
}
}
se = &engine
}
return rtcConfig, se, nil
}
```
- [ ] **Step 4: Run the tests and verify they pass**
Run:
```bash
go test ./core/webrtc/ -run TestBuildICEConfig -v
```
Expected: PASS for all TestBuildICEConfig subtests.
- [ ] **Step 5: Commit**
```bash
git add core/webrtc/ice.go core/webrtc/ice_test.go
git commit -m "feat(webrtc): add ICE config helper (Configuration + SettingEngine)"
```
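
`BuildICEConfig` passes `PublicIP` and the ICE server URIs to Pion as plain strings, so a typo only surfaces when a peer connection is created. The sketch below shows one way to reject obviously malformed values earlier, using only the standard library. `checkPublicIP` and `checkICEServerURI` are hypothetical helpers that are not part of the plan, and the scheme check is deliberately shallow (prefix only), not a full STUN/TURN URI parser.

```go
package main

import (
	"fmt"
	"net/netip"
	"strings"
)

// checkPublicIP is a hypothetical pre-flight check for Config.PublicIP.
// An empty string is allowed because it means "rely on STUN discovery".
func checkPublicIP(s string) error {
	if s == "" {
		return nil
	}
	addr, err := netip.ParseAddr(s)
	if err != nil {
		return fmt.Errorf("webrtc: PublicIP %q is not an IP address: %w", s, err)
	}
	if addr.IsUnspecified() {
		return fmt.Errorf("webrtc: PublicIP %q is the unspecified address", s)
	}
	return nil
}

// checkICEServerURI is a hypothetical, prefix-only sanity check for one
// entry of Config.ICEServers.
func checkICEServerURI(uri string) error {
	for _, scheme := range []string{"stun:", "stuns:", "turn:", "turns:"} {
		if strings.HasPrefix(uri, scheme) && len(uri) > len(scheme) {
			return nil
		}
	}
	return fmt.Errorf("webrtc: ICE server %q must start with stun:, stuns:, turn:, or turns:", uri)
}

func main() {
	fmt.Println(checkPublicIP("203.0.113.10"))
	fmt.Println(checkPublicIP("not-an-ip"))
	fmt.Println(checkICEServerURI("stun:stun.cloudflare.com:3478"))
	fmt.Println(checkICEServerURI("http://example.com"))
}
```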
---
## Task 7: Peer — PeerConnection factory + track attachment
**Files:**
- Create: `core/webrtc/peer.go`
- Create: `core/webrtc/peer_test.go`
- Create: `core/webrtc/forward.go`
Creates a Pion `PeerConnection`, adds video (H.264) and audio (Opus) `TrackLocalStaticRTP` tracks, negotiates SDP, and starts a goroutine that forwards packets from a Source subscription into those tracks.
- [ ] **Step 1: Write the failing test `core/webrtc/peer_test.go`**
```go
package webrtc
import (
"context"
"testing"
"time"
"github.com/pion/webrtc/v4"
)
// minimalOfferSDP returns an SDP offer that advertises H.264 (video) and
// Opus (audio) as recvonly — the minimum a WHEP client sends.
func minimalOfferSDP(t *testing.T) webrtc.SessionDescription {
t.Helper()
// Create a throwaway PC to generate a valid offer.
me := &webrtc.MediaEngine{}
if err := me.RegisterDefaultCodecs(); err != nil {
t.Fatalf("RegisterDefaultCodecs: %v", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
t.Fatalf("NewPeerConnection: %v", err)
}
defer pc.Close()
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
t.Fatalf("AddTransceiver video: %v", err)
}
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
t.Fatalf("AddTransceiver audio: %v", err)
}
offer, err := pc.CreateOffer(nil)
if err != nil {
t.Fatalf("CreateOffer: %v", err)
}
return offer
}
func TestPeerFactory_CreateAnswer(t *testing.T) {
src, err := NewSource("streamA", 0)
if err != nil {
t.Fatalf("NewSource: %v", err)
}
defer src.Close()
src.Start()
cfg := DefaultConfig()
factory, err := NewPeerFactory(cfg)
if err != nil {
t.Fatalf("NewPeerFactory: %v", err)
}
offer := minimalOfferSDP(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
peer, err := factory.CreatePeer(ctx, src, offer)
if err != nil {
t.Fatalf("CreatePeer: %v", err)
}
defer peer.Close()
if peer.Answer().Type != webrtc.SDPTypeAnswer {
t.Errorf("Answer().Type = %v, want answer", peer.Answer().Type)
}
if peer.ResourceID() == "" {
t.Error("ResourceID should be non-empty")
}
}
func TestPeerFactory_ClosesCleanly(t *testing.T) {
src, _ := NewSource("streamA", 0)
defer src.Close()
src.Start()
factory, _ := NewPeerFactory(DefaultConfig())
offer := minimalOfferSDP(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
peer, err := factory.CreatePeer(ctx, src, offer)
if err != nil {
t.Fatalf("CreatePeer: %v", err)
}
if err := peer.Close(); err != nil {
t.Errorf("Close: %v", err)
}
// Second close should be a no-op, not panic.
if err := peer.Close(); err != nil {
t.Errorf("second Close: %v", err)
}
}
```
- [ ] **Step 2: Run the test and verify it fails**
Run:
```bash
go test ./core/webrtc/ -run TestPeerFactory -v
```
Expected: FAIL with "undefined: NewPeerFactory".
- [ ] **Step 3: Write the minimal implementation `core/webrtc/peer.go`**
```go
package webrtc
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"sync"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
// PeerFactory builds PeerConnections from a shared Pion API instance
// configured from Config.
type PeerFactory struct {
api *webrtc.API
rtcConfig webrtc.Configuration
}
// NewPeerFactory initializes a Pion API with the codec set we support
// (H.264 + Opus) and applies the provided Config.
func NewPeerFactory(c Config) (*PeerFactory, error) {
if err := c.Validate(); err != nil {
return nil, err
}
me := &webrtc.MediaEngine{}
if err := me.RegisterDefaultCodecs(); err != nil {
return nil, fmt.Errorf("webrtc: register default codecs: %w", err)
}
rtcConfig, se, err := BuildICEConfig(c)
if err != nil {
return nil, err
}
opts := []func(*webrtc.API){webrtc.WithMediaEngine(me)}
if se != nil {
opts = append(opts, webrtc.WithSettingEngine(*se))
}
api := webrtc.NewAPI(opts...)
return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil
}
// Peer wraps a Pion PeerConnection bound to a Source's subscription.
type Peer struct {
resourceID string
pc *webrtc.PeerConnection
answer webrtc.SessionDescription
source *Source
sub chan *rtp.Packet
done chan struct{}
once sync.Once
}
// CreatePeer builds a PeerConnection, sets the remote offer, generates an
// answer, attaches video+audio tracks fed from src, and blocks until ICE
// gathering completes or ctx expires.
func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) {
pc, err := f.api.NewPeerConnection(f.rtcConfig)
if err != nil {
return nil, fmt.Errorf("webrtc: new peer connection: %w", err)
}
videoTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
"video", "dragonfork")
if err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: new video track: %w", err)
}
audioTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
"audio", "dragonfork")
if err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: new audio track: %w", err)
}
if _, err := pc.AddTrack(videoTrack); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: add video track: %w", err)
}
if _, err := pc.AddTrack(audioTrack); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: add audio track: %w", err)
}
if err := pc.SetRemoteDescription(offer); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: set remote: %w", err)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: create answer: %w", err)
}
gatherComplete := webrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(answer); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: set local: %w", err)
}
select {
case <-gatherComplete:
case <-ctx.Done():
_ = pc.Close()
return nil, ErrICETimeout
}
sub := src.Subscribe(64)
p := &Peer{
resourceID: newResourceID(),
pc: pc,
answer: *pc.LocalDescription(),
source: src,
sub: sub,
done: make(chan struct{}),
}
pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
if st == webrtc.PeerConnectionStateFailed ||
st == webrtc.PeerConnectionStateDisconnected ||
st == webrtc.PeerConnectionStateClosed {
_ = p.Close()
}
})
go forwardRTP(p.done, sub, videoTrack, audioTrack)
return p, nil
}
// Answer returns the locally-created SDP answer. Valid after CreatePeer.
func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }
// ResourceID returns the stable resource id used in the WHEP Location header.
func (p *Peer) ResourceID() string { return p.resourceID }
// Close tears down the peer connection and unsubscribes from the source.
// Safe to call multiple times.
func (p *Peer) Close() error {
var err error
p.once.Do(func() {
close(p.done)
p.source.Unsubscribe(p.sub)
err = p.pc.Close()
})
return err
}
func newResourceID() string {
b := make([]byte, 8)
_, _ = rand.Read(b)
return hex.EncodeToString(b)
}
```
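The `sync.Once` guard in `Close` matters because closing an already-closed channel panics in Go, and the connection-state callback can call `Close` at the same time as other code. The sketch below shows the same pattern using only the standard library. `demoPeer` and its `teardowns` counter are hypothetical stand-ins for illustration, not types from this package.

```go
package main

import (
	"fmt"
	"sync"
)

// demoPeer is a hypothetical stand-in for Peer that keeps only teardown state.
type demoPeer struct {
	once      sync.Once
	done      chan struct{}
	teardowns int // how many times the teardown body actually ran
}

func newDemoPeer() *demoPeer {
	return &demoPeer{done: make(chan struct{})}
}

// Close runs the teardown body at most once. Later calls return nil
// because err is only assigned inside the once.Do callback.
func (p *demoPeer) Close() error {
	var err error
	p.once.Do(func() {
		close(p.done) // would panic if this line ran twice
		p.teardowns++
	})
	return err
}

func main() {
	p := newDemoPeer()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = p.Close()
		}()
	}
	wg.Wait()
	fmt.Println("teardowns:", p.teardowns) // prints "teardowns: 1"
}
```

One consequence of this shape is that only the first caller can observe an error from the underlying close; every later caller gets `nil`.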
- [ ] **Step 4: Create `core/webrtc/forward.go` with the RTP forwarder**
```go
package webrtc
import (
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
// forwardRTP reads packets from sub and writes them to the correct track
// based on payload type (H.264 → video, Opus → audio). Payload-type
// inspection is the simplest M1 approach; M2 will switch to per-track
// source channels once the process resolver manages separate video/audio
// UDP ports.
func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
video, audio *webrtc.TrackLocalStaticRTP) {
for {
select {
case <-done:
return
case pkt, ok := <-sub:
if !ok {
return
}
// Pion default H.264 PT = 102, Opus PT = 111. If the publisher
// uses different PTs we'll revisit in M2 — for M1 PoC we
// configure FFmpeg to these values explicitly in the publisher
// script.
switch pkt.PayloadType {
case 102:
_ = video.WriteRTP(pkt)
case 111:
_ = audio.WriteRTP(pkt)
default:
// Unknown PT — drop. Log in M3.
}
}
}
}
```
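To make the payload-type routing concrete without pulling in Pion, here is a minimal sketch that reads the payload type straight from the RTP fixed header (RFC 3550) and applies the same 102/111 mapping. The names `payloadType` and `trackFor` are hypothetical helpers for illustration; the real forwarder works on parsed `*rtp.Packet` values.

```go
package main

import (
	"errors"
	"fmt"
)

// Payload types assumed by the M1 publisher script.
const (
	ptH264 = 102
	ptOpus = 111
)

// payloadType returns the 7-bit payload type from a raw RTP packet.
// The fixed header is 12 bytes; the top two bits of byte 0 hold the
// version and the low seven bits of byte 1 hold the payload type.
func payloadType(pkt []byte) (uint8, error) {
	if len(pkt) < 12 {
		return 0, errors.New("rtp: packet shorter than fixed header")
	}
	if pkt[0]>>6 != 2 {
		return 0, errors.New("rtp: unsupported version")
	}
	return pkt[1] & 0x7F, nil
}

// trackFor mirrors the switch in forwardRTP but returns a label instead
// of writing to a track. Unknown or malformed packets map to "drop".
func trackFor(pkt []byte) string {
	pt, err := payloadType(pkt)
	if err != nil {
		return "drop"
	}
	switch pt {
	case ptH264:
		return "video"
	case ptOpus:
		return "audio"
	default:
		return "drop"
	}
}

func main() {
	video := []byte{0x80, 0xE6, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1} // marker bit set, PT 102
	audio := []byte{0x80, 0x6F, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2} // PT 111
	other := []byte{0x80, 0x60, 0, 1, 0, 0, 0, 0, 0, 0, 0, 3} // PT 96
	fmt.Println(trackFor(video), trackFor(audio), trackFor(other)) // prints "video audio drop"
}
```

Note that the marker bit shares byte 1 with the payload type, which is why the mask `0x7F` is needed before comparing against 102 or 111.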
- [ ] **Step 5: Run the tests and verify they pass**
Run:
```bash
go test ./core/webrtc/ -run TestPeerFactory -v
```
Expected: PASS for `TestPeerFactory_CreateAnswer` and `TestPeerFactory_ClosesCleanly`.
- [ ] **Step 6: Commit**
```bash
git add core/webrtc/peer.go core/webrtc/peer_test.go core/webrtc/forward.go
git commit -m "feat(webrtc): add PeerFactory, Peer, and RTP forwarder"
```
---
## Task 8: WHEP HTTP handler (happy path only)
**Files:**
- Create: `core/webrtc/whep.go`
- Create: `core/webrtc/whep_test.go`
For M1, the WHEP handler supports only the `POST /whep/{stream_id}` happy path. It returns bare-bones 400/404/503 responses where that costs nothing, but tested error paths (404/406/503) and DELETE/PATCH come in M3.
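As a sketch of what "`POST /whep/{stream_id}`" means for path handling, the helper below extracts the stream id and rejects anything with extra segments. `streamIDFromPath` is a hypothetical name used only for this illustration; the handler in Step 3 inlines equivalent logic.

```go
package main

import (
	"fmt"
	"strings"
)

// streamIDFromPath extracts {stream_id} from "/whep/{stream_id}".
// It rejects a missing prefix, an empty id, and ids with extra segments.
func streamIDFromPath(path string) (string, bool) {
	const prefix = "/whep/"
	if !strings.HasPrefix(path, prefix) {
		return "", false
	}
	id := strings.TrimPrefix(path, prefix)
	if id == "" || strings.Contains(id, "/") {
		return "", false
	}
	return id, true
}

func main() {
	for _, p := range []string{"/whep/test", "/whep/", "/whep/test/abc123", "/other"} {
		id, ok := streamIDFromPath(p)
		fmt.Printf("%-22q id=%q ok=%v\n", p, id, ok)
	}
}
```

Under this rule a resource URL such as `/whep/test/abc123` (the shape used in the `Location` header) is not a valid POST target, which leaves that path free for the DELETE/PATCH handling planned for M3.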
- [ ] **Step 1: Write the failing test `core/webrtc/whep_test.go`**
```go
package webrtc
import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/pion/webrtc/v4"
)
func TestWHEP_POSTReturns201WithSDP(t *testing.T) {
// Set up a Source and register it.
src, _ := NewSource("streamA", 0)
defer src.Close()
src.Start()
reg := NewRegistry()
_ = reg.Register("streamA", src)
factory, _ := NewPeerFactory(DefaultConfig())
handler := NewWHEPHandler(reg, factory, DefaultConfig())
// Build an offer using a throwaway PC.
me := &webrtc.MediaEngine{}
_ = me.RegisterDefaultCodecs()
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
pc, _ := api.NewPeerConnection(webrtc.Configuration{})
defer pc.Close()
_, _ = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly})
_, _ = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly})
offer, _ := pc.CreateOffer(nil)
req := httptest.NewRequest(http.MethodPost, "/whep/streamA",
strings.NewReader(offer.SDP))
req.Header.Set("Content-Type", "application/sdp")
// Give the handler generous ICE gathering time in tests.
ctx, cancel := context.WithTimeout(req.Context(), 10*time.Second)
defer cancel()
req = req.WithContext(ctx)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
if rr.Code != http.StatusCreated {
body, _ := io.ReadAll(rr.Result().Body)
t.Fatalf("status = %d, want 201. body=%s", rr.Code, string(body))
}
if ct := rr.Header().Get("Content-Type"); ct != "application/sdp" {
t.Errorf("Content-Type = %q, want application/sdp", ct)
}
if loc := rr.Header().Get("Location"); !strings.HasPrefix(loc, "/whep/streamA/") {
t.Errorf("Location = %q, want /whep/streamA/<id>", loc)
}
if !strings.Contains(rr.Body.String(), "v=0") {
t.Errorf("body does not look like SDP: %s", rr.Body.String())
}
}
```
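The test bounds ICE gathering through the request context, and `CreatePeer` selects between the gathering-complete channel and `ctx.Done()`. The sketch below isolates that wait using only the standard library. A plain channel stands in for the one returned by Pion's `GatheringCompletePromise`, and `errGatherTimeout`, `waitGathered`, `demoFast`, and `demoSlow` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errGatherTimeout is a hypothetical stand-in for ErrICETimeout.
var errGatherTimeout = errors.New("ice gathering timed out")

// waitGathered blocks until gathered is closed or ctx ends, whichever
// comes first.
func waitGathered(ctx context.Context, gathered <-chan struct{}) error {
	select {
	case <-gathered:
		return nil
	case <-ctx.Done():
		return errGatherTimeout
	}
}

// demoFast simulates gathering that finishes well inside the deadline.
func demoFast() error {
	gathered := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(gathered)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return waitGathered(ctx, gathered)
}

// demoSlow simulates gathering that never finishes, so the deadline wins.
func demoSlow() error {
	never := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	return waitGathered(ctx, never)
}

func main() {
	fmt.Println("fast:", demoFast())
	fmt.Println("slow:", demoSlow())
}
```

Because the wait keys off the request context, a client that disconnects mid-gathering is reported the same way as a genuine timeout; that may be worth distinguishing in a later milestone.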
- [ ] **Step 2: Run the test and verify it fails**
Run:
```bash
go test ./core/webrtc/ -run TestWHEP -v
```
Expected: FAIL with "undefined: NewWHEPHandler".
- [ ] **Step 3: Write the minimal implementation `core/webrtc/whep.go`**
```go
package webrtc
import (
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"github.com/pion/webrtc/v4"
)
// WHEPHandler serves the WebRTC-HTTP Egress Protocol POST.
type WHEPHandler struct {
registry *Registry
factory *PeerFactory
config Config
mu sync.Mutex
peers map[string]*Peer // resourceID → Peer
peersCount int64 // atomic, for cap check without lock
}
// NewWHEPHandler constructs a handler with the given dependencies.
func NewWHEPHandler(r *Registry, f *PeerFactory, c Config) *WHEPHandler {
return &WHEPHandler{
registry: r,
factory: f,
config: c,
peers: make(map[string]*Peer),
}
}
// ServeHTTP handles POST /whep/{stream_id}. Other methods return 405; a
// malformed or empty stream id in the path returns 400.
func (h *WHEPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", "POST")
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
// Extract stream_id from path: /whep/{stream_id}
streamID := strings.TrimPrefix(r.URL.Path, "/whep/")
if streamID == "" || strings.Contains(streamID, "/") {
http.Error(w, "invalid stream id", http.StatusBadRequest)
return
}
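// Peer cap enforcement (happy path still respects the cap). The load here
// and the increment after CreatePeer are separate steps, so concurrent
// POSTs can briefly overshoot the cap; acceptable for the M1 PoC.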
if atomic.LoadInt64(&h.peersCount) >= int64(h.config.MaxPeersTotal) {
http.Error(w, ErrPeerCapReached.Error(), http.StatusServiceUnavailable)
return
}
handle, ok := h.registry.Lookup(streamID)
if !ok {
http.Error(w, ErrStreamNotFound.Error(), http.StatusNotFound)
return
}
src, ok := handle.(*Source)
if !ok {
http.Error(w, "registered source is not a *Source", http.StatusInternalServerError)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
return
}
if len(body) == 0 {
http.Error(w, ErrInvalidSDP.Error(), http.StatusBadRequest)
return
}
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
peer, err := h.factory.CreatePeer(r.Context(), src, offer)
if err != nil {
http.Error(w, "create peer: "+err.Error(), http.StatusInternalServerError)
return
}
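// M1 limitation: nothing removes map entries or decrements peersCount
// when a peer closes, so the cap counts peers created over the process
// lifetime until DELETE handling lands in M3.
h.mu.Lock()
h.peers[peer.ResourceID()] = peer
h.mu.Unlock()
atomic.AddInt64(&h.peersCount, 1)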
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Location", "/whep/"+streamID+"/"+peer.ResourceID())
w.WriteHeader(http.StatusCreated)
_, _ = io.WriteString(w, peer.Answer().SDP)
}
```
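The handler's cap check loads the counter early and increments it only after the peer exists. If a strict cap is ever needed, one possible approach (a sketch, not part of the M1 plan) is to reserve the slot first and roll back on failure. `peerSlots`, `acquire`, and `release` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// peerSlots is a hypothetical counter that enforces a hard cap.
type peerSlots struct {
	used int64
	max  int64
}

// acquire increments first and rolls back on overshoot. Because the
// increment is atomic, two callers can never both be granted the last slot.
func (s *peerSlots) acquire() bool {
	if atomic.AddInt64(&s.used, 1) > s.max {
		atomic.AddInt64(&s.used, -1)
		return false
	}
	return true
}

// release returns a slot; call it when peer creation fails or a peer closes.
func (s *peerSlots) release() { atomic.AddInt64(&s.used, -1) }

func main() {
	s := &peerSlots{max: 10}
	var admitted int64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.acquire() {
				atomic.AddInt64(&admitted, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("admitted:", atomic.LoadInt64(&admitted))
}
```

In a handler, `acquire` would run before `CreatePeer` and `release` would run on every error path and on peer close, so the count tracks live peers rather than peers ever created.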
- [ ] **Step 4: Run the tests and verify they pass**
Run:
```bash
go test ./core/webrtc/ -run TestWHEP -v
```
Expected: PASS for `TestWHEP_POSTReturns201WithSDP`.
- [ ] **Step 5: Run the full package test suite**
Run:
```bash
go test ./core/webrtc/... -v -race
```
Expected: ALL PASS. No race warnings.
- [ ] **Step 6: Commit**
```bash
git add core/webrtc/whep.go core/webrtc/whep_test.go
git commit -m "feat(webrtc): add WHEP POST handler (happy path)"
```
---
## Task 9: Standalone PoC binary
**Files:**
- Create: `cmd/webrtc-poc/main.go`
The PoC binary wires everything together: creates a Source, registers it, starts the WHEP handler, and blocks. M2 replaces this with integration into datarhei Core's normal startup.
- [ ] **Step 1: Write `cmd/webrtc-poc/main.go`**
```go
// Command webrtc-poc runs a minimal Dragon Fork WebRTC egress server for
// manual end-to-end testing. It listens for RTP on 127.0.0.1:10000 as
// stream "test" and serves WHEP at :8787.
//
// This is NOT part of the datarhei Core binary. It will be removed or
// demoted to an internal test helper once milestone M2 lands.
package main
import (
"flag"
"log"
"net/http"
"<FORK_MODULE_PATH>/core/webrtc"
)
func main() {
var (
streamID = flag.String("stream", "test", "stream id to serve")
rtpPort = flag.Int("rtp-port", 10000, "UDP port to receive RTP on")
listen = flag.String("listen", ":8787", "WHEP HTTP listen address")
publicIP = flag.String("public-ip", "", "server public IP for NAT1To1 (optional)")
)
flag.Parse()
cfg := webrtc.DefaultConfig()
cfg.WHEPListen = *listen
cfg.PublicIP = *publicIP
src, err := webrtc.NewSource(*streamID, *rtpPort)
if err != nil {
log.Fatalf("NewSource: %v", err)
}
src.Start()
defer src.Close()
log.Printf("listening for RTP on %s", src.LocalAddr())
reg := webrtc.NewRegistry()
if err := reg.Register(*streamID, src); err != nil {
log.Fatalf("Register: %v", err)
}
factory, err := webrtc.NewPeerFactory(cfg)
if err != nil {
log.Fatalf("NewPeerFactory: %v", err)
}
handler := webrtc.NewWHEPHandler(reg, factory, cfg)
mux := http.NewServeMux()
mux.Handle("/whep/", handler)
log.Printf("WHEP listening on %s — POST /whep/%s to subscribe", *listen, *streamID)
log.Fatal(http.ListenAndServe(*listen, mux))
}
```
- [ ] **Step 2: Substitute the real module path**
Open the file and replace `<FORK_MODULE_PATH>` with the actual module path from your fork's `go.mod` first line. For example, if `go.mod` starts with `module github.com/wilddragon/datarhei-dragonfork-core`, the import becomes:
```go
import "github.com/wilddragon/datarhei-dragonfork-core/core/webrtc"
```
Do this once by editing the file — do not commit the placeholder string.
- [ ] **Step 3: Build the binary**
Run:
```bash
go build -o /tmp/webrtc-poc ./cmd/webrtc-poc
```
Expected: binary produced at `/tmp/webrtc-poc`, no build errors.
- [ ] **Step 4: Smoke-run it**
Run:
```bash
/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787 &
sleep 1
curl -sS -X POST -H 'Content-Type: application/sdp' \
--data 'v=0' http://127.0.0.1:8787/whep/test
```
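Expected: curl prints an error body, most likely a 500 with a `create peer: ...` message, because `v=0` is not a real offer. That's fine; the important thing is that the server is up and routing the request. The PoC does not log requests, so the server log stays quiet; add `-i` to the curl command if you want to see the status line.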
Kill the server:
```bash
kill %1
wait 2>/dev/null
```
- [ ] **Step 5: Commit**
```bash
git add cmd/webrtc-poc/main.go
git commit -m "feat(webrtc): add standalone webrtc-poc binary for M1 testing"
```
---
## Task 10: FFmpeg publisher script
**Files:**
- Create: `test/publish.sh`
A shell script that runs FFmpeg to generate a test pattern and push it as RTP to the PoC binary's port. Uses `testsrc2` with a burned-in timecode (useful later for latency measurement).
- [ ] **Step 1: Create `test/publish.sh`**
```bash
#!/usr/bin/env bash
# test/publish.sh — Dragon Fork M1 publisher
#
# Pushes an FFmpeg testsrc2 pattern as H.264 + Opus RTP to the webrtc-poc
# binary's local UDP port(s). Requires FFmpeg 6.x.
set -euo pipefail
HOST="${HOST:-127.0.0.1}"
VIDEO_PORT="${VIDEO_PORT:-10000}"
AUDIO_PORT="${AUDIO_PORT:-10002}"
FPS="${FPS:-30}"
SIZE="${SIZE:-640x360}"
echo "publishing testsrc2 → $HOST:$VIDEO_PORT (video, PT=102)"
echo " $HOST:$AUDIO_PORT (audio, PT=111)"
# FFmpeg's RTP muxer accepts exactly one stream per output, so map video
# and audio explicitly instead of relying on automatic stream selection.
exec ffmpeg -hide_banner -re \
-f lavfi -i "testsrc2=size=${SIZE}:rate=${FPS}" \
-f lavfi -i "sine=frequency=440:sample_rate=48000" \
-vf "drawtext=text='%{localtime\\:%H\\\\\\:%M\\\\\\:%S.%3N}':x=10:y=10:fontsize=32:fontcolor=white:box=1:boxcolor=black@0.8" \
-map 0:v -c:v libx264 -preset ultrafast -tune zerolatency \
-profile:v baseline -pix_fmt yuv420p \
-b:v 1500k -maxrate 1500k -bufsize 500k \
-g 60 -keyint_min 60 -x264-params "repeat-headers=1" \
-payload_type 102 -f rtp "rtp://${HOST}:${VIDEO_PORT}?pkt_size=1200" \
-map 1:a -c:a libopus -b:a 128k \
-payload_type 111 -f rtp "rtp://${HOST}:${AUDIO_PORT}?pkt_size=1200"
```
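Two numbers fall out of the script's defaults and are useful when reading later results: the keyframe interval implied by `-g 60` at 30 fps, and a rough video packet rate implied by `-b:v 1500k` with `pkt_size=1200`. The sketch below works them out. The helper names are hypothetical, and the packet-rate figure is only an estimate that assumes full-size packets and ignores header overhead.

```go
package main

import "fmt"

// keyframeIntervalSeconds is the worst-case wait for the next keyframe
// given a fixed GOP length in frames and a constant frame rate.
func keyframeIntervalSeconds(gopFrames, fps int) float64 {
	return float64(gopFrames) / float64(fps)
}

// approxPacketsPerSecond estimates the RTP packet rate from the target
// bitrate and the maximum packet size. It assumes every packet is full
// and ignores header overhead, so treat the result as a rough figure.
func approxPacketsPerSecond(bitsPerSecond, pktSizeBytes int) float64 {
	return float64(bitsPerSecond) / float64(pktSizeBytes*8)
}

func main() {
	fmt.Printf("keyframe interval: %.1f s\n", keyframeIntervalSeconds(60, 30))          // 2.0 s
	fmt.Printf("approx packets/s:  %.2f\n", approxPacketsPerSecond(1_500_000, 1200)) // 156.25
}
```

At roughly 150 packets per second, a client waiting for 30 video packets should see them within a fraction of a second once media flows, while a viewer joining mid-GOP may wait up to 2 seconds for a decodable frame.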
- [ ] **Step 2: Make it executable**
Run:
```bash
chmod +x test/publish.sh
```
- [ ] **Step 3: Smoke-test it against the PoC binary**
In one terminal:
```bash
/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787
```
In another terminal, start the publisher. The script also pushes audio to port 10002, but the M1 PoC listens only on port 10000, so the audio packets go unreceived. That is fine for M1, which primarily tests video:
```bash
./test/publish.sh
```
Expected: FFmpeg runs and prints encoding progress lines (something like `frame= 30 fps=30 q=...`). The PoC binary logs nothing further, because it reads RTP silently and M1 adds no verbose logging. Let it run for 3 seconds, then Ctrl+C both.
(In M2 we'll separate video and audio into distinct sources; for M1 the single-port video-only path is enough to prove the pipeline.)
- [ ] **Step 4: Commit**
```bash
git add test/publish.sh
git commit -m "test: add FFmpeg publisher script for M1 PoC"
```
---
## Task 11: Pion-based test WHEP client
**Files:**
- Create: `test/whep-client/main.go`
- Create: `test/whep-client/main_test.go`
A Go binary that acts as a headless WHEP client: POSTs an SDP offer, parses the SDP answer, establishes a `PeerConnection`, and verifies that RTP packets arrive. Essential for automated end-to-end verification without a browser in the loop.
- [ ] **Step 1: Create `test/whep-client/main.go`**
```go
// Command whep-client subscribes to a Dragon Fork WHEP endpoint and logs
// the first N received RTP packets, then exits. Used for M1 end-to-end
// verification.
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"sync/atomic"
"time"
"github.com/pion/webrtc/v4"
)
func main() {
var (
whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL")
wantPkt = flag.Int("pkts", 30, "exit after receiving this many video RTP packets")
timeout = flag.Duration("timeout", 15*time.Second, "overall timeout")
)
flag.Parse()
if err := run(*whepURL, *wantPkt, *timeout); err != nil {
log.Fatalf("whep-client: %v", err)
}
}
func run(whepURL string, wantPkt int, timeout time.Duration) error {
me := &webrtc.MediaEngine{}
if err := me.RegisterDefaultCodecs(); err != nil {
return fmt.Errorf("register codecs: %w", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return fmt.Errorf("new pc: %w", err)
}
defer pc.Close()
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
return err
}
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
return err
}
var videoCount int64
pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
kind := track.Kind().String()
log.Printf("OnTrack: kind=%s codec=%s", kind, track.Codec().MimeType)
go func() {
buf := make([]byte, 1500)
for {
n, _, err := track.Read(buf)
if err != nil {
log.Printf("track.Read (%s): %v", kind, err)
return
}
if kind == "video" {
atomic.AddInt64(&videoCount, 1)
}
_ = n
}
}()
})
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
gatherComplete := webrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(offer); err != nil {
return err
}
<-gatherComplete
offer = *pc.LocalDescription()
answerSDP, err := httpPostSDP(whepURL, offer.SDP)
if err != nil {
return fmt.Errorf("WHEP POST: %w", err)
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answerSDP,
}); err != nil {
return fmt.Errorf("set remote: %w", err)
}
// A single context deadline bounds the wait; the counter is always read
// atomically because the track reader goroutine updates it concurrently.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
tick := time.NewTicker(250 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-tick.C:
if n := atomic.LoadInt64(&videoCount); n >= int64(wantPkt) {
log.Printf("OK: received %d video packets", n)
fmt.Fprintln(os.Stdout, "PASS")
return nil
}
case <-ctx.Done():
return fmt.Errorf("timeout after %s: only %d video packets received",
timeout, atomic.LoadInt64(&videoCount))
}
}
}
func httpPostSDP(url, sdp string) (string, error) {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBufferString(sdp))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/sdp")
req.Header.Set("Accept", "application/sdp")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusCreated {
return "", fmt.Errorf("status %d: %s", resp.StatusCode, string(body))
}
return string(body), nil
}
```
- [ ] **Step 2: Create `test/whep-client/main_test.go`**
```go
package main
import (
"strings"
"testing"
)
// Placeholder unit test. Despite its name, it only covers the
// unreachable-endpoint path; the real validation happens end-to-end in
// Task 12. It also gives `go test ./test/whep-client/...` something to
// run in CI later.
func TestHTTPPostSDP_RejectsNon2xx(t *testing.T) {
_, err := httpPostSDP("http://127.0.0.1:1/whep/none", "v=0\n")
if err == nil {
t.Fatal("expected error from unreachable endpoint")
}
if !strings.Contains(err.Error(), "dial") && !strings.Contains(err.Error(), "connect") {
t.Errorf("expected dial/connect error, got: %v", err)
}
}
```
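The placeholder test only exercises an unreachable endpoint. If status-code coverage is wanted before M3, a stub server from `net/http/httptest` can drive both the 201 and the non-201 branches. The sketch below is self-contained, so it carries its own copy of the helper under the hypothetical name `postSDP`; `stubWHEP` and `demo` are also illustrative names.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// postSDP is a local copy of the client's helper shape; it returns the
// response body only when the server answers 201 Created.
func postSDP(url, sdp string) (string, error) {
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(sdp))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/sdp")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusCreated {
		return "", fmt.Errorf("status %d: %s", resp.StatusCode, string(body))
	}
	return string(body), nil
}

// stubWHEP starts a throwaway server that always answers with the given
// status and body. The caller must Close it.
func stubWHEP(status int, body string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/sdp")
		w.WriteHeader(status)
		_, _ = io.WriteString(w, body)
	}))
}

// demo returns the body from a 201 stub and the error text from a 404 stub.
func demo() (string, string) {
	ok := stubWHEP(http.StatusCreated, "v=0\r\n")
	defer ok.Close()
	missing := stubWHEP(http.StatusNotFound, "stream not found")
	defer missing.Close()

	body, err := postSDP(ok.URL+"/whep/test", "v=0\r\n")
	if err != nil {
		return "", err.Error()
	}
	if _, err = postSDP(missing.URL+"/whep/none", "v=0\r\n"); err != nil {
		return body, err.Error()
	}
	return body, ""
}

func main() {
	body, errText := demo()
	fmt.Printf("201 body=%q\nnon-201 error=%q\n", body, errText)
}
```

In a real `_test.go` file the same stubs would sit inside test functions and call `httpPostSDP` directly instead of a copy.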
- [ ] **Step 3: Build and test**
Run:
```bash
go build -o /tmp/whep-client ./test/whep-client
go test ./test/whep-client/... -v
```
Expected: binary builds; `TestHTTPPostSDP_RejectsNon2xx` passes.
- [ ] **Step 4: Commit**
```bash
git add test/whep-client/
git commit -m "test: add Pion-based WHEP client for end-to-end M1 verification"
```
---
## Task 12: End-to-end PoC verification
**Files:** None (runtime verification only)
This is the moment of truth: FFmpeg → webrtc-poc → whep-client, all the way through, with the whep-client confirming that video RTP packets arrive. Note that the client counts received packets; it does not decode frames.
- [ ] **Step 1: Open three terminals**
Terminal A — the PoC server.
Terminal B — the FFmpeg publisher.
Terminal C — the test WHEP client.
- [ ] **Step 2: Start the PoC server (Terminal A)**
Run:
```bash
/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787
```
Expected: server logs `listening for RTP on 127.0.0.1:10000` and `WHEP listening on :8787 — POST /whep/test to subscribe`. Leave running.
- [ ] **Step 3: Start the FFmpeg publisher (Terminal B)**
Run:
```bash
./test/publish.sh
```
Expected: FFmpeg begins producing frames and reporting throughput (something like `frame= 30 fps=30 q=...`). Leave running.
- [ ] **Step 4: Run the WHEP test client (Terminal C)**
Run:
```bash
/tmp/whep-client -url http://127.0.0.1:8787/whep/test -pkts 30 -timeout 15s
```
Expected output within a few seconds (log timestamps omitted):
```
OnTrack: kind=video codec=video/H264
OK: received 30 video packets
PASS
```
The client exits 0. **This is M1's success criterion.**
- [ ] **Step 5: If it fails, systematic debug**
If the client times out:
1. Confirm the FFmpeg publisher is actually producing RTP: run `tcpdump -i lo -n udp port 10000` (use `-i lo0` on macOS) in a fourth terminal; you should see packets. If not, the publisher is broken; re-check payload types and destination port in `publish.sh`.
2. Confirm the PoC server is receiving RTP: temporarily add a log line in `source.go` readLoop (`log.Printf("rtp seq=%d", pkt.SequenceNumber)`) and rebuild. If nothing logs, the UDP bind is wrong.
3. Confirm the PeerConnection is establishing: check the WHEP POST response was 201; check the client's `pc.ConnectionState()` transitions through `connecting` to `connected`.
4. Confirm codec payload types match: the publish script uses PT 102 for H.264 and 111 for Opus; Pion's default H.264 PT is 102 and Opus is 111. If you see RTP arriving in the reader but `forwardRTP` drops them, the PTs don't match — add a log line in `forward.go` to confirm.
Log each issue and fix in `NOTES.md` as you go — those are exactly the gotchas we want recorded for M2+.
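If `tcpdump` is unavailable, a small stand-alone probe can answer the first debugging question (is RTP arriving, and with which payload types?). The sketch below is one way to do it with the standard library. `countByPayloadType` and `selfTest` are hypothetical names, and `selfTest` uses an ephemeral loopback port with fake packets so the example runs on its own. To probe the real publisher, bind to port 10000 instead, with the PoC server stopped so the port is free.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// countByPayloadType reads datagrams from conn until no packet arrives
// for the idle duration, then returns how many packets were seen per RTP
// payload type. Datagrams shorter than the 12-byte header are ignored.
func countByPayloadType(conn *net.UDPConn, idle time.Duration) (map[uint8]int, error) {
	counts := make(map[uint8]int)
	buf := make([]byte, 1500)
	for {
		if err := conn.SetReadDeadline(time.Now().Add(idle)); err != nil {
			return counts, err
		}
		n, _, err := conn.ReadFromUDP(buf)
		if err != nil {
			if ne, ok := err.(net.Error); ok && ne.Timeout() {
				return counts, nil // idle: stop and report
			}
			return counts, err
		}
		if n >= 12 {
			counts[buf[1]&0x7F]++
		}
	}
}

// selfTest sends three fake packets to an ephemeral loopback port and
// returns the resulting histogram.
func selfTest() (map[uint8]int, error) {
	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
	if err != nil {
		return nil, err
	}
	defer conn.Close()
	sender, err := net.DialUDP("udp", nil, conn.LocalAddr().(*net.UDPAddr))
	if err != nil {
		return nil, err
	}
	defer sender.Close()
	for _, pt := range []byte{102, 102, 111} {
		pkt := make([]byte, 12)
		pkt[0], pkt[1] = 0x80, pt // version 2, given payload type
		if _, err := sender.Write(pkt); err != nil {
			return nil, err
		}
	}
	return countByPayloadType(conn, 200*time.Millisecond)
}

func main() {
	counts, err := selfTest()
	if err != nil {
		fmt.Println("probe error:", err)
		return
	}
	fmt.Printf("PT 102: %d, PT 111: %d\n", counts[102], counts[111])
}
```

A histogram showing only unexpected payload types points at the publisher's `-payload_type` flags; an empty histogram points at the destination address or port.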
- [ ] **Step 6: Tear down**
Ctrl+C in all three terminals.
- [ ] **Step 7: Document M1 success in NOTES.md**
Append to `NOTES.md`:
```markdown
## M1 PoC verified — <DATE>
FFmpeg (testsrc2 + drawtext timecode, H.264 baseline, Opus) → webrtc-poc → Pion WHEP client.
- Video PT: 102 / Audio PT: 111 (matched between publisher and client)
- ICE gathering completed within ~<N>s
- First video packet received ~<N>s after POST (bounded by 2s GOP once we add forced keyframes in M2)
- <Any gotchas encountered, with fixes>
Ready for M2 (datarhei process-model integration).
```
Run:
```bash
git add NOTES.md
git commit -m "docs: record M1 PoC success and observations"
```
- [ ] **Step 8: Push the branch**
Run:
```bash
git push -u origin m1-webrtc-poc
```
Expected: branch pushed to your fork's remote. M1 complete.
---
## Exit Criteria
M1 is done when **all of the following are true**:
1. `go build ./...` succeeds on the fork.
2. `go test ./core/webrtc/... -race` passes — all unit tests green, no race warnings.
3. The `whep-client` binary (built from `test/whep-client/main.go`) prints `PASS` against a running `webrtc-poc` + `test/publish.sh` pair within 15 seconds.
4. `NOTES.md` records the verification run and any gotchas encountered.
5. Branch `m1-webrtc-poc` is pushed to the fork remote.
---
## What comes next
- **M2 plan** will be written after M1 is verified. It covers: adding the `webrtc://` URL scheme to datarhei Core's output resolver, wiring the Registry and WHEPHandler into Core's normal startup, separating video/audio onto distinct UDP ports, and hooking into the process lifecycle so sources register/deregister automatically with the existing FFmpeg processes datarhei already manages.
- **M3 plan** will cover multi-viewer robustness: DELETE/PATCH WHEP methods, error-path HTTP codes (404/406/503), the admin API endpoints, and PLI absorption + RTCP BYE on teardown.
- **M4 plan** will cover CI integration: running the end-to-end harness from Task 12 in CI, the pixel-sampling latency measurement, and the p95 gates.
- **M5 plan** will cover branding and release: logo swap in the Restreamer Vue UI, README rewrite with upstream attribution, `NOTICE`/`CREDITS`, Docker image publishing, and tagging `v0.1.0-dragonfork`.
Each follow-on plan is small enough (~3 to 7 days of work) that writing it after M1 lets us incorporate lessons learned without re-planning from the top.