datarhei-dragonfork-core/docs/design/2026-04-16-datarhei-dragon-fork-m1-webrtc-poc.md

Datarhei - Dragon Fork M1: Media-Path PoC Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Prove a working end-to-end WebRTC egress path: an FFmpeg publisher pushes RTP into a new Go package, which serves it to a Pion-based WHEP client that successfully decodes video frames.

Architecture: New standalone Go package core/webrtc inside the datarhei Core fork. FFmpeg produces RTP on a local UDP socket → package reads RTP → WHEP HTTP endpoint serves it via Pion PeerConnection → test client subscribes and decodes. No datarhei process-model integration yet (that's M2). This milestone answers: does the integration pattern actually work, and what are the gotchas?

Tech Stack: Go 1.22+, Pion WebRTC v4 (github.com/pion/webrtc/v4), Pion RTP and RTCP (github.com/pion/rtp, github.com/pion/rtcp), FFmpeg 6.x (publisher + test pattern), standard library net/http.

Out of scope for this plan: datarhei process-model integration (M2), multi-viewer fan-out polish (M3), CI test harness (M4), branding (M5). Separate plans will be written for each after M1 completes.


Prerequisites

  • Go 1.22+ installed locally
  • git configured
  • FFmpeg 6.x on PATH (ffmpeg -version reports 6.0 or newer)
  • A Git host account (GitHub recommended for initial fork)
  • Linux or macOS development machine (Windows works but UDP port behavior differs; document what you're on when filing any issues)

File Structure

Files created in this milestone:

core/webrtc/
  doc.go               # Package documentation
  config.go            # Config struct + defaults + Validate()
  config_test.go
  registry.go          # stream_id → SourceHandle (*Source) map, thread-safe
  registry_test.go
  source.go            # RTP UDP reader + per-subscriber channel fan-out
  source_test.go
  peer.go              # PeerConnection factory, track attachment
  peer_test.go
  forward.go           # RTP forwarder: subscriber channel → video/audio tracks
  whep.go              # HTTP handler: POST /whep/{stream_id}
  whep_test.go
  ice.go               # SettingEngine builder (NAT1To1, ICE servers)
  ice_test.go
  errors.go            # Typed error values (ErrStreamNotFound, etc.)

cmd/webrtc-poc/
  main.go              # Standalone PoC binary (NOT datarhei Core yet)

test/
  publish.sh           # FFmpeg publisher script (testsrc2 → local RTP)
  whep-client/
    main.go            # Pion-based test WHEP subscriber
    main_test.go

docs/design/
  (copy of the approved spec from brainstorming)

Total new Go files: 11 source + 7 test = 18 files. Total lines: ~1200-1500 including tests and comments.


Task 0: Fork the repo and set up the workspace

Files:

  • Create: new fork of datarhei/core

Rationale: Everything else depends on having a fork to commit into. No code yet — just repo setup.

  • Step 1: Fork datarhei/core on your Git host

On GitHub: navigate to https://github.com/datarhei/core, click Fork, name the fork datarhei-dragonfork-core under your wilddragon org (or personal account).

  • Step 2: Clone the fork locally

Run:

git clone git@github.com:wilddragon/datarhei-dragonfork-core.git
cd datarhei-dragonfork-core

Expected: repo cloned, git status clean on main branch.

  • Step 3: Create the M1 working branch

Run:

git checkout -b m1-webrtc-poc

Expected: branch created and checked out.

  • Step 4: Copy the approved design spec into the repo

mkdir -p docs/design
cp /path/to/2026-04-16-datarhei-dragon-fork-webrtc-design.md docs/design/

(Path will be wherever you saved the spec from the brainstorming session.)

  • Step 5: Verify the repo builds unchanged

Run:

go build ./...

Expected: build succeeds. If it fails, the fork is broken before you started — stop and fix upstream issues first.

  • Step 6: Run upstream tests

Run:

go test ./...

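Expected: all tests pass (or at least match what upstream CI shows green). Record the result in a NOTES.md file at the repo root. List any pre-existing failures or flakes, or note that there are none, so you don't later blame your changes for them. Step 7 commits this file, so create it even if the list is empty.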

  • Step 7: Commit the spec and a NOTES.md baseline

git add docs/design/2026-04-16-datarhei-dragon-fork-webrtc-design.md NOTES.md
git commit -m "docs: add Dragon Fork WebRTC egress design spec and test baseline"

Task 1: Add Pion WebRTC dependency

Files:

  • Modify: go.mod

  • Modify: go.sum

  • Step 1: Add Pion dependencies

Run from repo root:

go get github.com/pion/webrtc/v4@latest
go get github.com/pion/rtp@latest
go get github.com/pion/rtcp@latest

Expected: go.mod updated with new require entries; go.sum updated.

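  • Step 2: Defer go mod tidy

Do not run go mod tidy yet. Nothing in the module imports Pion at this point, so tidy would delete the requirements that go get just added (go get records them as // indirect until code imports them).

Run go mod tidy after Task 6, once the package imports both github.com/pion/rtp and github.com/pion/webrtc/v4. Commit the resulting go.mod/go.sum changes separately.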

  • Step 3: Sanity-check that Pion loads

Create a throwaway file /tmp/pion_smoke.go:

package main

import (
	"fmt"

	"github.com/pion/webrtc/v4"
)

func main() {
	api := webrtc.NewAPI()
	pc, err := api.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		panic(err)
	}
	fmt.Println("Pion OK, state:", pc.ConnectionState())
	_ = pc.Close()
}

Run from the repo root, so the import resolves against the fork's go.mod:

go run /tmp/pion_smoke.go

Expected output: Pion OK, state: new (or similar). Delete the file after.

  • Step 4: Commit

git add go.mod go.sum
git commit -m "build: add Pion WebRTC v4 dependency"

Task 2: Create the core/webrtc package skeleton + typed errors

Files:

  • Create: core/webrtc/errors.go

  • Create: core/webrtc/doc.go

  • Step 1: Create core/webrtc/doc.go

// Package webrtc implements the Dragon Fork WebRTC egress module.
//
// It exposes a WHEP (WebRTC-HTTP Egress Protocol) HTTP endpoint and serves
// live RTP produced by an FFmpeg process on a local UDP socket to one or
// more WebRTC peer connections built with Pion.
//
// This package is additive: it does not modify existing datarhei ingest,
// transcode, or non-WebRTC output code paths. The only contact with
// existing code is a new URL scheme ("webrtc://") registered with the
// output resolver (done in milestone M2, not here).
package webrtc

  • Step 2: Create core/webrtc/errors.go

package webrtc

import "errors"

// Sentinel errors returned by package functions.
var (
	// ErrStreamNotFound indicates a WHEP subscribe referenced a stream_id
	// that has no registered source. Maps to HTTP 404.
	ErrStreamNotFound = errors.New("webrtc: stream not found")

	// ErrPeerCapReached indicates max_peers_total has been exceeded.
	// Maps to HTTP 503.
	ErrPeerCapReached = errors.New("webrtc: peer capacity reached")

	// ErrCodecMismatch indicates the viewer's SDP offer does not include
	// a codec the source can serve (expected H.264 + Opus). Maps to HTTP 406.
	ErrCodecMismatch = errors.New("webrtc: codec mismatch")

	// ErrInvalidSDP indicates the request body was not a parseable SDP offer.
	// Maps to HTTP 400.
	ErrInvalidSDP = errors.New("webrtc: invalid SDP")

	// ErrICETimeout indicates ICE gathering did not complete within the
	// configured timeout. Maps to HTTP 500.
	ErrICETimeout = errors.New("webrtc: ICE gathering timeout")
)
  • Step 3: Verify the package compiles

Run:

go build ./core/webrtc/...

Expected: no output (successful build).

  • Step 4: Commit

git add core/webrtc/doc.go core/webrtc/errors.go
git commit -m "feat(webrtc): add package skeleton and typed errors"

Task 3: Config struct

Files:

  • Create: core/webrtc/config.go

  • Create: core/webrtc/config_test.go

  • Step 1: Write the failing test core/webrtc/config_test.go

package webrtc

import (
	"testing"
)

func TestConfig_Defaults(t *testing.T) {
	c := DefaultConfig()
	if !c.Enabled {
		t.Error("default Enabled should be true")
	}
	if c.WHEPListen != ":8787" {
		t.Errorf("default WHEPListen = %q, want :8787", c.WHEPListen)
	}
	if c.UDPPortRange.Low != 10000 || c.UDPPortRange.High != 10100 {
		t.Errorf("default UDPPortRange = %v, want 10000-10100", c.UDPPortRange)
	}
	if c.MaxPeersTotal != 32 {
		t.Errorf("default MaxPeersTotal = %d, want 32", c.MaxPeersTotal)
	}
	if len(c.ICEServers) == 0 {
		t.Error("default ICEServers should have at least one STUN entry")
	}
}

func TestConfig_Validate(t *testing.T) {
	tests := []struct {
		name    string
		mutate  func(*Config)
		wantErr bool
	}{
		{"defaults are valid", func(c *Config) {}, false},
		{"empty listen", func(c *Config) { c.WHEPListen = "" }, true},
		{"inverted port range", func(c *Config) { c.UDPPortRange.Low = 20000; c.UDPPortRange.High = 10000 }, true},
		{"zero max peers", func(c *Config) { c.MaxPeersTotal = 0 }, true},
		{"negative max peers", func(c *Config) { c.MaxPeersTotal = -1 }, true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			c := DefaultConfig()
			tt.mutate(&c)
			err := c.Validate()
			if (err != nil) != tt.wantErr {
				t.Errorf("Validate() err = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}

  • Step 2: Run the test and verify it fails

Run:

go test ./core/webrtc/ -run TestConfig -v

Expected: FAIL with "undefined: Config" / "undefined: DefaultConfig".

  • Step 3: Write the minimal implementation core/webrtc/config.go

package webrtc

import "fmt"

// PortRange represents an inclusive UDP port range.
type PortRange struct {
	Low, High int
}

// Config controls the WebRTC egress module.
type Config struct {
	// Enabled toggles the entire module. When false, no endpoints are served.
	Enabled bool

	// WHEPListen is the address the WHEP HTTP endpoint binds to (e.g. ":8787").
	WHEPListen string

	// PublicIP is the server's externally-reachable IP, advertised in ICE
	// candidates via NAT1To1. Empty means rely on STUN discovery.
	PublicIP string

	// UDPPortRange bounds the local UDP ports allocated for FFmpeg→Pion RTP.
	UDPPortRange PortRange

	// ICEServers is the list of STUN/TURN URIs given to each PeerConnection.
	ICEServers []string

	// MaxPeersTotal is a hard safety cap on concurrent subscribers.
	MaxPeersTotal int
}

// DefaultConfig returns production-reasonable defaults.
func DefaultConfig() Config {
	return Config{
		Enabled:       true,
		WHEPListen:    ":8787",
		PublicIP:      "",
		UDPPortRange:  PortRange{Low: 10000, High: 10100},
		ICEServers:    []string{"stun:stun.cloudflare.com:3478", "stun:stun.l.google.com:19302"},
		MaxPeersTotal: 32,
	}
}

// Validate returns an error if the config is internally inconsistent.
func (c Config) Validate() error {
	if c.WHEPListen == "" {
		return fmt.Errorf("webrtc: WHEPListen must not be empty")
	}
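	if c.UDPPortRange.Low <= 0 || c.UDPPortRange.High <= 0 {
		return fmt.Errorf("webrtc: UDPPortRange must have positive bounds, got %v", c.UDPPortRange)
	}
	if c.UDPPortRange.High > 65535 {
		// Ports are 16-bit; larger values would silently wrap when ice.go
		// converts them to uint16 for Pion's SettingEngine.
		return fmt.Errorf("webrtc: UDPPortRange.High must be <= 65535, got %d", c.UDPPortRange.High)
	}
	if c.UDPPortRange.Low > c.UDPPortRange.High {
		return fmt.Errorf("webrtc: UDPPortRange.Low > High (%d > %d)", c.UDPPortRange.Low, c.UDPPortRange.High)
	}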
	if c.MaxPeersTotal <= 0 {
		return fmt.Errorf("webrtc: MaxPeersTotal must be positive, got %d", c.MaxPeersTotal)
	}
	return nil
}
  • Step 4: Run the tests and verify they pass

Run:

go test ./core/webrtc/ -run TestConfig -v

Expected: PASS for both TestConfig_Defaults and TestConfig_Validate (all subtests).

  • Step 5: Commit

git add core/webrtc/config.go core/webrtc/config_test.go
git commit -m "feat(webrtc): add Config with defaults and validation"

Task 4: Registry — stream_id → Source mapping

Files:

  • Create: core/webrtc/registry.go
  • Create: core/webrtc/registry_test.go

A Source (defined in the next task) represents a live RTP stream that peers can subscribe to. The Registry is the thread-safe map from stream IDs to active sources. We write it first because it does not depend on Source at all. It stores anything that satisfies the small SourceHandle interface, so it can be built and tested with a mock before Source exists.

  • Step 1: Write the failing test core/webrtc/registry_test.go

package webrtc

import (
	"sync"
	"testing"
)

// mockSource implements the minimum Source-like shape needed by the registry.
// The real Source type is defined in Task 5; the registry only needs a
// stable type to store and retrieve.
type mockSource struct {
	id string
}

func (m *mockSource) ID() string { return m.id }

func TestRegistry_RegisterAndLookup(t *testing.T) {
	r := NewRegistry()
	src := &mockSource{id: "streamA"}

	if err := r.Register("streamA", src); err != nil {
		t.Fatalf("Register returned error: %v", err)
	}

	got, ok := r.Lookup("streamA")
	if !ok {
		t.Fatal("Lookup(streamA) returned ok=false, want true")
	}
	if got != src {
		t.Errorf("Lookup returned %v, want %v", got, src)
	}
}

func TestRegistry_LookupMissing(t *testing.T) {
	r := NewRegistry()
	_, ok := r.Lookup("nope")
	if ok {
		t.Error("Lookup on empty registry returned ok=true, want false")
	}
}

func TestRegistry_DuplicateRegister(t *testing.T) {
	r := NewRegistry()
	_ = r.Register("streamA", &mockSource{id: "streamA"})

	if err := r.Register("streamA", &mockSource{id: "streamA"}); err == nil {
		t.Error("duplicate Register should return error, got nil")
	}
}

func TestRegistry_Deregister(t *testing.T) {
	r := NewRegistry()
	_ = r.Register("streamA", &mockSource{id: "streamA"})
	r.Deregister("streamA")

	if _, ok := r.Lookup("streamA"); ok {
		t.Error("after Deregister, Lookup should return ok=false")
	}
}

func TestRegistry_ConcurrentAccess(t *testing.T) {
	r := NewRegistry()
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ {
		wg.Add(3)
		id := string(rune('a' + (i % 26)))
		go func() { defer wg.Done(); _ = r.Register(id, &mockSource{id: id}) }()
		go func() { defer wg.Done(); _, _ = r.Lookup(id) }()
		go func() { defer wg.Done(); r.Deregister(id) }()
	}
	wg.Wait()
	// No assertion — test passes if -race doesn't flag anything.
}

  • Step 2: Run the test and verify it fails

Run:

go test ./core/webrtc/ -run TestRegistry -v

Expected: FAIL with "undefined: NewRegistry".

  • Step 3: Write the minimal implementation core/webrtc/registry.go

package webrtc

import (
	"fmt"
	"sync"
)

// SourceHandle is the minimal interface the Registry stores per stream_id.
// The concrete type is *Source, defined in source.go.
type SourceHandle interface {
	ID() string
}

// Registry is a thread-safe map from stream_id to active SourceHandle.
type Registry struct {
	mu      sync.RWMutex
	streams map[string]SourceHandle
}

// NewRegistry returns an empty Registry.
func NewRegistry() *Registry {
	return &Registry{streams: make(map[string]SourceHandle)}
}

// Register associates src with streamID. Returns an error if streamID is
// already registered.
func (r *Registry) Register(streamID string, src SourceHandle) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.streams[streamID]; exists {
		return fmt.Errorf("webrtc: stream %q already registered", streamID)
	}
	r.streams[streamID] = src
	return nil
}

// Lookup returns the handle for streamID. The second return value is false
// if no source is registered.
func (r *Registry) Lookup(streamID string) (SourceHandle, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	src, ok := r.streams[streamID]
	return src, ok
}

// Deregister removes streamID. No-op if not present.
func (r *Registry) Deregister(streamID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.streams, streamID)
}
  • Step 4: Run the tests and verify they pass

Run:

go test ./core/webrtc/ -run TestRegistry -v -race

Expected: PASS for all TestRegistry tests, no data races.

  • Step 5: Commit

git add core/webrtc/registry.go core/webrtc/registry_test.go
git commit -m "feat(webrtc): add thread-safe Registry for stream_id → SourceHandle"

Task 5: Source — RTP UDP reader + subscriber fan-out

Files:

  • Create: core/webrtc/source.go
  • Create: core/webrtc/source_test.go

A Source owns a UDP socket bound to a local port, reads RTP packets, and fans them out to every subscribed peer. For M1 we deliberately keep the fan-out simple. A single reader goroutine does a non-blocking send into each subscriber's buffered channel, and it drops the packet for any subscriber whose buffer is full. At 15 viewers this naive model is entirely sufficient.

  • Step 1: Write the failing test core/webrtc/source_test.go

package webrtc

import (
	"net"
	"testing"
	"time"

	"github.com/pion/rtp"
)

func TestSource_ID(t *testing.T) {
	s, err := NewSource("streamA", 0) // 0 = ephemeral port
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer s.Close()

	if s.ID() != "streamA" {
		t.Errorf("ID() = %q, want streamA", s.ID())
	}
}

func TestSource_ReceiveAndFanout(t *testing.T) {
	s, err := NewSource("streamA", 0)
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer s.Close()

	// Subscribe before sending.
	sub := s.Subscribe(16) // buffer depth 16
	defer s.Unsubscribe(sub)

	s.Start()

	// Build and send a minimal RTP packet to the source's UDP port.
	pkt := &rtp.Packet{
		Header: rtp.Header{
			Version:        2,
			PayloadType:    96,
			SequenceNumber: 1,
			Timestamp:      1000,
			SSRC:           0xDEADBEEF,
		},
		Payload: []byte{0x01, 0x02, 0x03, 0x04},
	}
	raw, err := pkt.Marshal()
	if err != nil {
		t.Fatalf("pkt.Marshal: %v", err)
	}

	conn, err := net.Dial("udp", s.LocalAddr().String())
	if err != nil {
		t.Fatalf("net.Dial: %v", err)
	}
	defer conn.Close()

	if _, err := conn.Write(raw); err != nil {
		t.Fatalf("conn.Write: %v", err)
	}

	select {
	case got := <-sub:
		if got.SSRC != 0xDEADBEEF {
			t.Errorf("received SSRC = %x, want DEADBEEF", got.SSRC)
		}
		if got.SequenceNumber != 1 {
			t.Errorf("received SeqNum = %d, want 1", got.SequenceNumber)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for RTP packet on subscriber channel")
	}
}

func TestSource_MultipleSubscribers(t *testing.T) {
	s, err := NewSource("streamA", 0)
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer s.Close()

	subs := []chan *rtp.Packet{
		s.Subscribe(8),
		s.Subscribe(8),
		s.Subscribe(8),
	}
	for _, sub := range subs {
		defer s.Unsubscribe(sub)
	}

	s.Start()

	raw, _ := (&rtp.Packet{
		Header:  rtp.Header{Version: 2, PayloadType: 96, SequenceNumber: 42, SSRC: 1},
		Payload: []byte{0xAA},
	}).Marshal()
	conn, _ := net.Dial("udp", s.LocalAddr().String())
	defer conn.Close()
	_, _ = conn.Write(raw)

	for i, sub := range subs {
		select {
		case got := <-sub:
			if got.SequenceNumber != 42 {
				t.Errorf("sub %d got seq %d, want 42", i, got.SequenceNumber)
			}
		case <-time.After(2 * time.Second):
			t.Errorf("sub %d timed out", i)
		}
	}
}

func TestSource_UnsubscribeStopsDelivery(t *testing.T) {
	s, _ := NewSource("streamA", 0)
	defer s.Close()
	sub := s.Subscribe(8)
	s.Start()
	s.Unsubscribe(sub)

	// After Unsubscribe, the channel should be closed.
	select {
	case _, ok := <-sub:
		if ok {
			t.Error("expected channel closed after Unsubscribe, got value")
		}
	case <-time.After(500 * time.Millisecond):
		t.Error("timed out waiting for channel close")
	}
}

  • Step 2: Run the test and verify it fails

Run:

go test ./core/webrtc/ -run TestSource -v

Expected: FAIL with "undefined: NewSource".

  • Step 3: Write the minimal implementation core/webrtc/source.go

package webrtc

import (
	"fmt"
	"net"
	"sync"

	"github.com/pion/rtp"
)

// Source reads RTP packets from a local UDP socket and fans them out to
// subscribed peers via per-subscriber buffered channels.
type Source struct {
	id   string
	conn *net.UDPConn

	mu          sync.Mutex
	subscribers map[chan *rtp.Packet]struct{}
	started     bool
	closed      bool
	done        chan struct{}
}

// NewSource binds a UDP socket on 127.0.0.1:port. Pass port=0 to let the OS
// assign an ephemeral port (useful for tests).
func NewSource(streamID string, port int) (*Source, error) {
	addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: port}
	conn, err := net.ListenUDP("udp4", addr)
	if err != nil {
		return nil, fmt.Errorf("webrtc: listen udp: %w", err)
	}
	return &Source{
		id:          streamID,
		conn:        conn,
		subscribers: make(map[chan *rtp.Packet]struct{}),
		done:        make(chan struct{}),
	}, nil
}

// ID returns the registered stream identifier.
func (s *Source) ID() string { return s.id }

// LocalAddr returns the UDP address the source is listening on.
func (s *Source) LocalAddr() *net.UDPAddr {
	return s.conn.LocalAddr().(*net.UDPAddr)
}

// Subscribe returns a new buffered channel that receives every RTP packet
// read from the UDP socket. bufDepth is the channel buffer size; when full,
// packets are dropped (preventing a slow subscriber from back-pressuring
// the reader).
func (s *Source) Subscribe(bufDepth int) chan *rtp.Packet {
	ch := make(chan *rtp.Packet, bufDepth)
	s.mu.Lock()
	s.subscribers[ch] = struct{}{}
	s.mu.Unlock()
	return ch
}

// Unsubscribe removes ch from the subscriber set and closes it.
func (s *Source) Unsubscribe(ch chan *rtp.Packet) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.subscribers[ch]; ok {
		delete(s.subscribers, ch)
		close(ch)
	}
}

// Start begins the RTP reader goroutine. Safe to call once; subsequent calls
// are no-ops.
func (s *Source) Start() {
	s.mu.Lock()
	if s.started || s.closed {
		s.mu.Unlock()
		return
	}
	s.started = true
	s.mu.Unlock()

	go s.readLoop()
}

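func (s *Source) readLoop() {
	buf := make([]byte, 1500) // MTU-sized read buffer; RTP over UDP should fit
	for {
		select {
		case <-s.done:
			return
		default:
		}

		n, _, err := s.conn.ReadFromUDP(buf)
		if err != nil {
			// Socket closed or error: exit the loop.
			return
		}

		// pkt.Unmarshal does not copy: Payload (and any header extensions)
		// point into the slice it is given. Copy the datagram first so the
		// next ReadFromUDP cannot overwrite packets still queued for
		// subscribers.
		data := make([]byte, n)
		copy(data, buf[:n])

		pkt := &rtp.Packet{}
		if err := pkt.Unmarshal(data); err != nil {
			// Malformed packet; skip without crashing.
			continue
		}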

		s.mu.Lock()
		for ch := range s.subscribers {
			select {
			case ch <- pkt:
			default:
				// Subscriber full — drop to protect the reader.
			}
		}
		s.mu.Unlock()
	}
}

// Close stops the reader goroutine, closes the UDP socket, and closes every
// subscriber channel.
func (s *Source) Close() error {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil
	}
	s.closed = true
	close(s.done)
	for ch := range s.subscribers {
		delete(s.subscribers, ch)
		close(ch)
	}
	s.mu.Unlock()
	return s.conn.Close()
}
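Source.readLoop silently drops a packet for any subscriber whose channel is full, and forwardRTP notes that logging is deferred to M3. The sketch below applies the same non-blocking fan-out with per-subscriber drop counters, which is roughly what that logging would need. fanout and subscriber are hypothetical names, and []byte stands in for *rtp.Packet so the example needs only the standard library.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// subscriber pairs a buffered channel with a count of packets dropped because
// the channel was full when the reader tried to deliver.
type subscriber struct {
	ch      chan []byte
	dropped atomic.Uint64 // atomic so metrics can read it without the lock
}

// fanout mirrors Source.readLoop's delivery step: a non-blocking send to every
// subscriber, so one slow viewer can never stall the UDP reader.
type fanout struct {
	mu   sync.Mutex
	subs map[*subscriber]struct{}
}

func (f *fanout) subscribe(depth int) *subscriber {
	s := &subscriber{ch: make(chan []byte, depth)}
	f.mu.Lock()
	f.subs[s] = struct{}{}
	f.mu.Unlock()
	return s
}

func (f *fanout) deliver(pkt []byte) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for s := range f.subs {
		select {
		case s.ch <- pkt:
		default:
			s.dropped.Add(1) // full: drop and count instead of blocking
		}
	}
}

func main() {
	f := &fanout{subs: make(map[*subscriber]struct{})}
	fast := f.subscribe(8)
	slow := f.subscribe(2) // never drained in this demo
	for i := 0; i < 5; i++ {
		f.deliver([]byte{byte(i)})
	}
	fmt.Println(len(fast.ch), fast.dropped.Load()) // 5 0
	fmt.Println(len(slow.ch), slow.dropped.Load()) // 2 3
}
```

Dropping the newest packet keeps delivery O(1) per subscriber. For video, though, any drop can leave that viewer with a corrupted picture until the next keyframe arrives.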
  • Step 4: Run the tests and verify they pass

Run:

go test ./core/webrtc/ -run TestSource -v -race

Expected: PASS for all TestSource tests, no data races.

  • Step 5: Commit

git add core/webrtc/source.go core/webrtc/source_test.go
git commit -m "feat(webrtc): add Source with UDP RTP reader and subscriber fan-out"

Task 6: ICE config helper (SettingEngine builder)

Files:

  • Create: core/webrtc/ice.go
  • Create: core/webrtc/ice_test.go

Isolated helper that translates our Config into Pion's SettingEngine + Configuration pair. Keeping it separate makes peer.go simpler and the ICE config trivially testable.

  • Step 1: Write the failing test core/webrtc/ice_test.go

package webrtc

import (
	"testing"

	"github.com/pion/webrtc/v4"
)

func TestBuildICEConfig_Defaults(t *testing.T) {
	c := DefaultConfig()
	rtcConfig, _, err := BuildICEConfig(c)
	if err != nil {
		t.Fatalf("BuildICEConfig: %v", err)
	}
	if len(rtcConfig.ICEServers) == 0 {
		t.Error("ICEServers should not be empty")
	}
	// First default is Cloudflare STUN.
	if rtcConfig.ICEServers[0].URLs[0] != "stun:stun.cloudflare.com:3478" {
		t.Errorf("first ICE server = %q, want stun:stun.cloudflare.com:3478",
			rtcConfig.ICEServers[0].URLs[0])
	}
}

func TestBuildICEConfig_PublicIP(t *testing.T) {
	c := DefaultConfig()
	c.PublicIP = "203.0.113.10"
	_, se, err := BuildICEConfig(c)
	if err != nil {
		t.Fatalf("BuildICEConfig: %v", err)
	}
	if se == nil {
		t.Fatal("SettingEngine should not be nil when PublicIP is set")
	}
	// We can't introspect NAT1To1IPs directly from Pion's public API; the
	// smoke test is that building an API from this engine works.
	api := webrtc.NewAPI(webrtc.WithSettingEngine(*se))
	if api == nil {
		t.Fatal("NewAPI returned nil")
	}
}

func TestBuildICEConfig_InvalidConfig(t *testing.T) {
	c := DefaultConfig()
	c.WHEPListen = ""
	_, _, err := BuildICEConfig(c)
	if err == nil {
		t.Error("BuildICEConfig should reject invalid config")
	}
}

  • Step 2: Run the test and verify it fails

Run:

go test ./core/webrtc/ -run TestBuildICEConfig -v

Expected: FAIL with "undefined: BuildICEConfig".

  • Step 3: Write the minimal implementation core/webrtc/ice.go

package webrtc

import (
	"github.com/pion/webrtc/v4"
)

// BuildICEConfig translates a Config into the two Pion config pieces every
// PeerConnection needs: a webrtc.Configuration (with ICE servers) and a
// SettingEngine (with NAT1To1 and port range tuning).
//
// The returned *SettingEngine is nil only when no engine-level tuning is
// required. Because Validate requires a positive UDPPortRange, it is
// currently always non-nil after a successful call; callers should still
// check for nil before passing it to webrtc.WithSettingEngine.
func BuildICEConfig(c Config) (webrtc.Configuration, *webrtc.SettingEngine, error) {
	if err := c.Validate(); err != nil {
		return webrtc.Configuration{}, nil, err
	}

	rtcConfig := webrtc.Configuration{
		ICEServers: make([]webrtc.ICEServer, 0, len(c.ICEServers)),
	}
	for _, uri := range c.ICEServers {
		rtcConfig.ICEServers = append(rtcConfig.ICEServers, webrtc.ICEServer{
			URLs: []string{uri},
		})
	}

	var se *webrtc.SettingEngine
	if c.PublicIP != "" || c.UDPPortRange.Low > 0 {
		engine := webrtc.SettingEngine{}
		if c.PublicIP != "" {
			engine.SetNAT1To1IPs([]string{c.PublicIP}, webrtc.ICECandidateTypeHost)
		}
		// Constrain the ephemeral UDP range Pion allocates for ICE candidates.
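		// Caution: M1 reuses UDPPortRange here, although Config documents it
		// as the FFmpeg→Source range. Pion binds its ICE sockets from the same
		// ports, so a publisher pointed into this range can collide with a
		// viewer's ICE socket. Split these into separate fields before both
		// uses are live at once.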
		if c.UDPPortRange.Low > 0 && c.UDPPortRange.High >= c.UDPPortRange.Low {
			if err := engine.SetEphemeralUDPPortRange(
				uint16(c.UDPPortRange.Low), uint16(c.UDPPortRange.High)); err != nil {
				return webrtc.Configuration{}, nil, err
			}
		}
		se = &engine
	}

	return rtcConfig, se, nil
}
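BuildICEConfig passes PublicIP and each ICEServers entry through to Pion unchecked, and Validate does not look at either field. The sketch below shows extra checks that could run in Validate. It assumes that only the STUN/TURN URI schemes from RFC 7064 and RFC 7065 (stun, stuns, turn, turns) should be accepted. validateICEFields is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// validateICEFields (hypothetical) rejects a PublicIP that is not a literal IP
// address, and ICE server URIs with an unknown scheme or an empty host part.
func validateICEFields(publicIP string, iceServers []string) error {
	if publicIP != "" && net.ParseIP(publicIP) == nil {
		return fmt.Errorf("webrtc: PublicIP %q is not an IP address", publicIP)
	}
	for _, uri := range iceServers {
		scheme, rest, ok := strings.Cut(uri, ":")
		if !ok || rest == "" {
			return fmt.Errorf("webrtc: ICE server %q has no host part", uri)
		}
		switch scheme {
		case "stun", "stuns", "turn", "turns":
			// accepted
		default:
			return fmt.Errorf("webrtc: ICE server %q has unsupported scheme %q", uri, scheme)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateICEFields("203.0.113.10",
		[]string{"stun:stun.cloudflare.com:3478", "turn:turn.example.com:3478?transport=udp"}))
	fmt.Println(validateICEFields("not-an-ip", nil))
	fmt.Println(validateICEFields("", []string{"http://example.com"}))
}
```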
  • Step 4: Run the tests and verify they pass

Run:

go test ./core/webrtc/ -run TestBuildICEConfig -v

Expected: PASS for all three TestBuildICEConfig tests.

  • Step 5: Commit

git add core/webrtc/ice.go core/webrtc/ice_test.go
git commit -m "feat(webrtc): add ICE config helper (Configuration + SettingEngine)"

Task 7: Peer — PeerConnection factory + track attachment

Files:

  • Create: core/webrtc/peer.go
  • Create: core/webrtc/peer_test.go
  • Create: core/webrtc/forward.go

Creates a Pion PeerConnection, adds video (H.264) and audio (Opus) TrackLocalStaticRTP, negotiates SDP, and starts a goroutine that forwards packets from a Source subscription into those tracks.

  • Step 1: Write the failing test core/webrtc/peer_test.go

package webrtc

import (
	"context"
	"testing"
	"time"

	"github.com/pion/webrtc/v4"
)

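// minimalOfferSDP returns an SDP offer with one recvonly video and one
// recvonly audio transceiver, the minimum a WHEP client sends. Because it
// uses RegisterDefaultCodecs, the offer lists Pion's whole default codec set,
// which includes H.264 and Opus.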
func minimalOfferSDP(t *testing.T) webrtc.SessionDescription {
	t.Helper()
	// Create a throwaway PC to generate a valid offer.
	me := &webrtc.MediaEngine{}
	if err := me.RegisterDefaultCodecs(); err != nil {
		t.Fatalf("RegisterDefaultCodecs: %v", err)
	}
	api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
	pc, err := api.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		t.Fatalf("NewPeerConnection: %v", err)
	}
	defer pc.Close()

	if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
		webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
		t.Fatalf("AddTransceiver video: %v", err)
	}
	if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
		webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
		t.Fatalf("AddTransceiver audio: %v", err)
	}
	offer, err := pc.CreateOffer(nil)
	if err != nil {
		t.Fatalf("CreateOffer: %v", err)
	}
	return offer
}

func TestPeerFactory_CreateAnswer(t *testing.T) {
	src, err := NewSource("streamA", 0)
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer src.Close()
	src.Start()

	cfg := DefaultConfig()
	factory, err := NewPeerFactory(cfg)
	if err != nil {
		t.Fatalf("NewPeerFactory: %v", err)
	}

	offer := minimalOfferSDP(t)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	peer, err := factory.CreatePeer(ctx, src, offer)
	if err != nil {
		t.Fatalf("CreatePeer: %v", err)
	}
	defer peer.Close()

	if peer.Answer().Type != webrtc.SDPTypeAnswer {
		t.Errorf("Answer().Type = %v, want answer", peer.Answer().Type)
	}
	if peer.ResourceID() == "" {
		t.Error("ResourceID should be non-empty")
	}
}

func TestPeerFactory_ClosesCleanly(t *testing.T) {
	src, _ := NewSource("streamA", 0)
	defer src.Close()
	src.Start()

	factory, _ := NewPeerFactory(DefaultConfig())
	offer := minimalOfferSDP(t)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	peer, err := factory.CreatePeer(ctx, src, offer)
	if err != nil {
		t.Fatalf("CreatePeer: %v", err)
	}
	if err := peer.Close(); err != nil {
		t.Errorf("Close: %v", err)
	}
	// Second close should be a no-op, not panic.
	if err := peer.Close(); err != nil {
		t.Errorf("second Close: %v", err)
	}
}

  • Step 2: Run the test and verify it fails

Run:

go test ./core/webrtc/ -run TestPeerFactory -v

Expected: FAIL with "undefined: NewPeerFactory".

  • Step 3: Write the minimal implementation core/webrtc/peer.go

package webrtc

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"sync"

	"github.com/pion/rtp"
	"github.com/pion/webrtc/v4"
)

// PeerFactory builds PeerConnections from a shared Pion API instance
// configured from Config.
type PeerFactory struct {
	api       *webrtc.API
	rtcConfig webrtc.Configuration
}

// NewPeerFactory initializes a Pion API with the codec set we support
// (H.264 + Opus) and applies the provided Config.
func NewPeerFactory(c Config) (*PeerFactory, error) {
	if err := c.Validate(); err != nil {
		return nil, err
	}

	me := &webrtc.MediaEngine{}
	if err := me.RegisterDefaultCodecs(); err != nil {
		return nil, fmt.Errorf("webrtc: register default codecs: %w", err)
	}

	rtcConfig, se, err := BuildICEConfig(c)
	if err != nil {
		return nil, err
	}

	opts := []func(*webrtc.API){webrtc.WithMediaEngine(me)}
	if se != nil {
		opts = append(opts, webrtc.WithSettingEngine(*se))
	}
	api := webrtc.NewAPI(opts...)

	return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil
}

// Peer wraps a Pion PeerConnection bound to a Source's subscription.
type Peer struct {
	resourceID string
	pc         *webrtc.PeerConnection
	answer     webrtc.SessionDescription
	source     *Source
	sub        chan *rtp.Packet
	done       chan struct{}
	once       sync.Once
}

// CreatePeer builds a PeerConnection, sets the remote offer, generates an
// answer, attaches video+audio tracks fed from src, and blocks until ICE
// gathering completes or ctx expires.
func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) {
	pc, err := f.api.NewPeerConnection(f.rtcConfig)
	if err != nil {
		return nil, fmt.Errorf("webrtc: new peer connection: %w", err)
	}

	videoTrack, err := webrtc.NewTrackLocalStaticRTP(
		webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
		"video", "dragonfork")
	if err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: new video track: %w", err)
	}
	audioTrack, err := webrtc.NewTrackLocalStaticRTP(
		webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
		"audio", "dragonfork")
	if err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: new audio track: %w", err)
	}
	if _, err := pc.AddTrack(videoTrack); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: add video track: %w", err)
	}
	if _, err := pc.AddTrack(audioTrack); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: add audio track: %w", err)
	}

	if err := pc.SetRemoteDescription(offer); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: set remote: %w", err)
	}
	answer, err := pc.CreateAnswer(nil)
	if err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: create answer: %w", err)
	}

	gatherComplete := webrtc.GatheringCompletePromise(pc)
	if err := pc.SetLocalDescription(answer); err != nil {
		_ = pc.Close()
		return nil, fmt.Errorf("webrtc: set local: %w", err)
	}

	select {
	case <-gatherComplete:
	case <-ctx.Done():
		_ = pc.Close()
		return nil, ErrICETimeout
	}

	sub := src.Subscribe(64)

	p := &Peer{
		resourceID: newResourceID(),
		pc:         pc,
		answer:     *pc.LocalDescription(),
		source:     src,
		sub:        sub,
		done:       make(chan struct{}),
	}

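	// Failed, Disconnected and Closed all end the session. Disconnected can
	// recover on its own, but M1 treats it as terminal to keep teardown simple.
	pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {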
		if st == webrtc.PeerConnectionStateFailed ||
			st == webrtc.PeerConnectionStateDisconnected ||
			st == webrtc.PeerConnectionStateClosed {
			_ = p.Close()
		}
	})

	go forwardRTP(p.done, sub, videoTrack, audioTrack)

	return p, nil
}

// Answer returns the locally-created SDP answer. Valid after CreatePeer.
func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }

// ResourceID returns the stable resource id used in the WHEP Location header.
func (p *Peer) ResourceID() string { return p.resourceID }

// Close tears down the peer connection and unsubscribes from the source.
// Safe to call multiple times.
func (p *Peer) Close() error {
	var err error
	p.once.Do(func() {
		close(p.done)
		p.source.Unsubscribe(p.sub)
		err = p.pc.Close()
	})
	return err
}

func newResourceID() string {
	b := make([]byte, 8)
	_, _ = rand.Read(b)
	return hex.EncodeToString(b)
}
  • Step 4: Create core/webrtc/forward.go with the RTP forwarder
package webrtc

import (
	"github.com/pion/rtp"
	"github.com/pion/webrtc/v4"
)

// forwardRTP reads packets from sub and writes them to the correct track
// based on payload type (H.264 → video, Opus → audio). Payload-type
// inspection is the simplest M1 approach; M2 will switch to per-track
// source channels once the process resolver manages separate video/audio
// UDP ports.
func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
	video, audio *webrtc.TrackLocalStaticRTP) {
	for {
		select {
		case <-done:
			return
		case pkt, ok := <-sub:
			if !ok {
				return
			}
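			// Demux on the publisher's payload types: test/publish.sh sends
			// H.264 as PT 102 and Opus as PT 111, which also match Pion's
			// default codec table. Only this switch depends on them, because
			// TrackLocalStaticRTP.WriteRTP rewrites PT and SSRC to the values
			// negotiated with each viewer. If the publisher's PTs change,
			// update this switch (revisit in M2).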
			switch pkt.PayloadType {
			case 102:
				_ = video.WriteRTP(pkt)
			case 111:
				_ = audio.WriteRTP(pkt)
			default:
				// Unknown PT — drop. Log in M3.
			}
		}
	}
}
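By the time packets reach forwardRTP they are already decoded *rtp.Packet values (pion/rtp's Packet.Unmarshal does the full parse). To make it concrete where PayloadType lives on the wire, here is a stdlib-only sketch of the 12-byte fixed RTP header from RFC 3550 plus the same 102/111 routing. parseRTPHeader and route are hypothetical names, and CSRC lists and header extensions are deliberately ignored:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// rtpHeader holds the fixed-header fields relevant to demuxing.
type rtpHeader struct {
	Version     uint8
	PayloadType uint8
	Seq         uint16
	Timestamp   uint32
	SSRC        uint32
}

// parseRTPHeader decodes the 12-byte fixed RTP header (RFC 3550, 5.1).
func parseRTPHeader(b []byte) (rtpHeader, error) {
	if len(b) < 12 {
		return rtpHeader{}, errors.New("rtp: packet shorter than 12-byte header")
	}
	h := rtpHeader{
		Version:     b[0] >> 6,
		PayloadType: b[1] & 0x7f, // low 7 bits; the top bit is the marker
		Seq:         binary.BigEndian.Uint16(b[2:4]),
		Timestamp:   binary.BigEndian.Uint32(b[4:8]),
		SSRC:        binary.BigEndian.Uint32(b[8:12]),
	}
	if h.Version != 2 {
		return rtpHeader{}, fmt.Errorf("rtp: unexpected version %d", h.Version)
	}
	return h, nil
}

// route mirrors the M1 switch in forwardRTP.
func route(pt uint8) string {
	switch pt {
	case 102:
		return "video"
	case 111:
		return "audio"
	default:
		return "drop"
	}
}

func main() {
	// V=2, marker set, PT=102, seq=7, timestamp=90000, SSRC=0xCAFEBABE.
	pkt := []byte{0x80, 0x80 | 102, 0x00, 0x07, 0x00, 0x01, 0x5f, 0x90, 0xca, 0xfe, 0xba, 0xbe}
	h, err := parseRTPHeader(pkt)
	if err != nil {
		panic(err)
	}
	fmt.Println(h.PayloadType, h.Seq, route(h.PayloadType)) // prints: 102 7 video
}
```

Note that the marker bit shares a byte with the payload type, which is why the mask matters: an H.264 packet ending a frame arrives with 0xE6 in byte 1, not 102.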
  • Step 5: Run the tests and verify they pass

Run:

go test ./core/webrtc/ -run TestPeerFactory -v

Expected: PASS for TestPeerFactory_CreateAnswer and TestPeerFactory_ClosesCleanly.

  • Step 6: Commit
git add core/webrtc/peer.go core/webrtc/peer_test.go core/webrtc/forward.go
git commit -m "feat(webrtc): add PeerFactory, Peer, and RTP forwarder"

Task 8: WHEP HTTP handler (happy path only)

Files:

  • Create: core/webrtc/whep.go
  • Create: core/webrtc/whep_test.go

For M1, the WHEP handler targets only the happy path of POST /whep/{stream_id}. It already returns basic 404 and 503 errors; the full error-path work (404/406/503) and DELETE/PATCH come in M3.

  • Step 1: Write the failing test core/webrtc/whep_test.go
package webrtc

import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/pion/webrtc/v4"
)

func TestWHEP_POSTReturns201WithSDP(t *testing.T) {
	// Set up a Source on an ephemeral port and register it.
	src, err := NewSource("streamA", 0)
	if err != nil {
		t.Fatalf("NewSource: %v", err)
	}
	defer src.Close()
	src.Start()

	reg := NewRegistry()
	if err := reg.Register("streamA", src); err != nil {
		t.Fatalf("Register: %v", err)
	}

	factory, err := NewPeerFactory(DefaultConfig())
	if err != nil {
		t.Fatalf("NewPeerFactory: %v", err)
	}
	handler := NewWHEPHandler(reg, factory, DefaultConfig())

	// Build an offer using a throwaway receive-only PC.
	me := &webrtc.MediaEngine{}
	if err := me.RegisterDefaultCodecs(); err != nil {
		t.Fatalf("RegisterDefaultCodecs: %v", err)
	}
	api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
	pc, err := api.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		t.Fatalf("NewPeerConnection: %v", err)
	}
	defer pc.Close()
	for _, kind := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
		if _, err := pc.AddTransceiverFromKind(kind,
			webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
			t.Fatalf("AddTransceiverFromKind(%s): %v", kind, err)
		}
	}
	offer, err := pc.CreateOffer(nil)
	if err != nil {
		t.Fatalf("CreateOffer: %v", err)
	}

	req := httptest.NewRequest(http.MethodPost, "/whep/streamA",
		strings.NewReader(offer.SDP))
	req.Header.Set("Content-Type", "application/sdp")
	// Give the handler generous ICE gathering time in tests.
	ctx, cancel := context.WithTimeout(req.Context(), 10*time.Second)
	defer cancel()
	req = req.WithContext(ctx)

	rr := httptest.NewRecorder()
	handler.ServeHTTP(rr, req)

	if rr.Code != http.StatusCreated {
		body, _ := io.ReadAll(rr.Result().Body)
		t.Fatalf("status = %d, want 201. body=%s", rr.Code, string(body))
	}
	if ct := rr.Header().Get("Content-Type"); ct != "application/sdp" {
		t.Errorf("Content-Type = %q, want application/sdp", ct)
	}
	if loc := rr.Header().Get("Location"); !strings.HasPrefix(loc, "/whep/streamA/") {
		t.Errorf("Location = %q, want /whep/streamA/<id>", loc)
	}
	if !strings.Contains(rr.Body.String(), "v=0") {
		t.Errorf("body does not look like SDP: %s", rr.Body.String())
	}
}
  • Step 2: Run the test and verify it fails

Run:

go test ./core/webrtc/ -run TestWHEP -v

Expected: FAIL with "undefined: NewWHEPHandler".

  • Step 3: Write the minimal implementation core/webrtc/whep.go
package webrtc

import (
	"io"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/pion/webrtc/v4"
)

// WHEPHandler serves the WebRTC-HTTP Egress Protocol (WHEP) POST.
type WHEPHandler struct {
	registry *Registry
	factory  *PeerFactory
	config   Config

	mu         sync.Mutex
	peers      map[string]*Peer // resourceID → Peer
	peersCount atomic.Int64     // live peers; lets the cap check skip h.mu
}

// NewWHEPHandler constructs a handler with the given dependencies.
func NewWHEPHandler(r *Registry, f *PeerFactory, c Config) *WHEPHandler {
	return &WHEPHandler{
		registry: r,
		factory:  f,
		config:   c,
		peers:    make(map[string]*Peer),
	}
}

// ServeHTTP handles POST /whep/{stream_id}. Other methods return 405;
// malformed paths return 400.
func (h *WHEPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", "POST")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Extract stream_id from path: /whep/{stream_id}
	streamID := strings.TrimPrefix(r.URL.Path, "/whep/")
	if streamID == "" || strings.Contains(streamID, "/") {
		http.Error(w, "invalid stream id", http.StatusBadRequest)
		return
	}

	// Peer cap enforcement (the happy path still respects the cap). Reserve
	// a slot first and roll back if over the cap: a separate load-then-add
	// would let concurrent POSTs overshoot MaxPeersTotal.
	if h.peersCount.Add(1) > int64(h.config.MaxPeersTotal) {
		h.peersCount.Add(-1)
		http.Error(w, ErrPeerCapReached.Error(), http.StatusServiceUnavailable)
		return
	}
	reserved := true
	defer func() {
		if reserved { // request failed before a Peer took over the slot
			h.peersCount.Add(-1)
		}
	}()

	handle, ok := h.registry.Lookup(streamID)
	if !ok {
		http.Error(w, ErrStreamNotFound.Error(), http.StatusNotFound)
		return
	}
	src, ok := handle.(*Source)
	if !ok {
		http.Error(w, "registered source is not a *Source", http.StatusInternalServerError)
		return
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
		return
	}
	if len(body) == 0 {
		http.Error(w, ErrInvalidSDP.Error(), http.StatusBadRequest)
		return
	}

	offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}

	peer, err := h.factory.CreatePeer(r.Context(), src, offer)
	if err != nil {
		http.Error(w, "create peer: "+err.Error(), http.StatusInternalServerError)
		return
	}

	h.mu.Lock()
	h.peers[peer.ResourceID()] = peer
	h.mu.Unlock()
	reserved = false // the slot now belongs to peer

	// Free the slot when the peer closes (ICE failure, disconnect, or an
	// explicit Close). Without this the count only grows, and the server
	// answers 503 forever once MaxPeersTotal sessions have ever connected.
	go func() {
		<-peer.done
		h.mu.Lock()
		delete(h.peers, peer.ResourceID())
		h.mu.Unlock()
		h.peersCount.Add(-1)
	}()

	w.Header().Set("Content-Type", "application/sdp")
	w.Header().Set("Location", "/whep/"+streamID+"/"+peer.ResourceID())
	w.WriteHeader(http.StatusCreated)
	_, _ = io.WriteString(w, peer.Answer().SDP)
}
  • Step 4: Run the tests and verify they pass

Run:

go test ./core/webrtc/ -run TestWHEP -v

Expected: PASS for TestWHEP_POSTReturns201WithSDP.

  • Step 5: Run the full package test suite

Run:

go test ./core/webrtc/... -v -race

Expected: ALL PASS. No race warnings.

  • Step 6: Commit
git add core/webrtc/whep.go core/webrtc/whep_test.go
git commit -m "feat(webrtc): add WHEP POST handler (happy path)"

Task 9: Standalone PoC binary

Files:

  • Create: cmd/webrtc-poc/main.go

The PoC binary wires everything together: creates a Source, registers it, starts the WHEP handler, and blocks. M2 replaces this with integration into datarhei Core's normal startup.

  • Step 1: Write cmd/webrtc-poc/main.go
// Command webrtc-poc runs a minimal Dragon Fork WebRTC egress server for
// manual end-to-end testing. It listens for RTP on 127.0.0.1:10000 as
// stream "test" and serves WHEP at :8787.
//
// This is NOT part of the datarhei Core binary. It will be removed or
// demoted to an internal test helper once milestone M2 lands.
package main

import (
	"flag"
	"log"
	"net/http"

	"<FORK_MODULE_PATH>/core/webrtc"
)

func main() {
	var (
		streamID = flag.String("stream", "test", "stream id to serve")
		rtpPort  = flag.Int("rtp-port", 10000, "UDP port to receive RTP on")
		listen   = flag.String("listen", ":8787", "WHEP HTTP listen address")
		publicIP = flag.String("public-ip", "", "server public IP for NAT1To1 (optional)")
	)
	flag.Parse()

	cfg := webrtc.DefaultConfig()
	cfg.WHEPListen = *listen
	cfg.PublicIP = *publicIP

	src, err := webrtc.NewSource(*streamID, *rtpPort)
	if err != nil {
		log.Fatalf("NewSource: %v", err)
	}
	src.Start()
	defer src.Close()
	log.Printf("listening for RTP on %s", src.LocalAddr())

	reg := webrtc.NewRegistry()
	if err := reg.Register(*streamID, src); err != nil {
		log.Fatalf("Register: %v", err)
	}

	factory, err := webrtc.NewPeerFactory(cfg)
	if err != nil {
		log.Fatalf("NewPeerFactory: %v", err)
	}

	handler := webrtc.NewWHEPHandler(reg, factory, cfg)

	mux := http.NewServeMux()
	mux.Handle("/whep/", handler)

	log.Printf("WHEP listening on %s — POST /whep/%s to subscribe", *listen, *streamID)
	log.Fatal(http.ListenAndServe(*listen, mux))
}
  • Step 2: Substitute the real module path

Open the file and replace <FORK_MODULE_PATH> with the actual module path from the first line of your fork's go.mod (running go list -m from the repository root prints it). For example, if go.mod starts with module github.com/wilddragon/datarhei-dragonfork-core, the import becomes:

import "github.com/wilddragon/datarhei-dragonfork-core/core/webrtc"

Make this edit once, by hand; do not commit the placeholder string.

  • Step 3: Build the binary

Run:

go build -o /tmp/webrtc-poc ./cmd/webrtc-poc

Expected: binary produced at /tmp/webrtc-poc, no build errors.

  • Step 4: Smoke-run it

Run:

/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787 &
sleep 1
curl -sS -X POST -H 'Content-Type: application/sdp' \
    --data 'v=0' http://127.0.0.1:8787/whep/test

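Expected: curl prints an error body, most likely a 500 starting with create peer:, because v=0 is not a complete SDP offer (add -i to the curl command to see the status line). That is fine; what matters is that the server is up and routes /whep/test to the handler. M1 does not log individual requests, so the curl response is the only evidence.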

Kill the server:

kill %1
wait 2>/dev/null
  • Step 5: Commit
git add cmd/webrtc-poc/main.go
git commit -m "feat(webrtc): add standalone webrtc-poc binary for M1 testing"

Task 10: FFmpeg publisher script

Files:

  • Create: test/publish.sh

A shell script that runs FFmpeg to generate a test pattern and push it as RTP to the PoC binary's port. Uses testsrc2 with a burned-in timecode (useful later for latency measurement).

  • Step 1: Create test/publish.sh
#!/usr/bin/env bash
# test/publish.sh — Dragon Fork M1 publisher
#
# Pushes an FFmpeg testsrc2 pattern as H.264 + Opus RTP to the webrtc-poc
# binary's local UDP port(s). Requires FFmpeg 6.x.
set -euo pipefail

HOST="${HOST:-127.0.0.1}"
VIDEO_PORT="${VIDEO_PORT:-10000}"
AUDIO_PORT="${AUDIO_PORT:-10002}"
FPS="${FPS:-30}"
SIZE="${SIZE:-640x360}"

echo "publishing testsrc2 → $HOST:$VIDEO_PORT (video, PT=102)"
echo "                     $HOST:$AUDIO_PORT (audio, PT=111)"

# The RTP muxer carries exactly one stream per output. Without -map, FFmpeg's
# default stream selection would give each output both a video and an audio
# stream and the RTP muxer would refuse to start, so each output maps its
# stream explicitly.
exec ffmpeg -hide_banner \
    -re -f lavfi -i "testsrc2=size=${SIZE}:rate=${FPS}" \
    -re -f lavfi -i "sine=frequency=440:sample_rate=48000" \
    -map 0:v:0 \
    -vf "drawtext=text='%{localtime\\:%H\\\\\\:%M\\\\\\:%S.%3N}':x=10:y=10:fontsize=32:fontcolor=white:box=1:boxcolor=black@0.8" \
    -c:v libx264 -preset ultrafast -tune zerolatency \
    -profile:v baseline -pix_fmt yuv420p \
    -b:v 1500k -maxrate 1500k -bufsize 500k \
    -g 60 -keyint_min 60 -x264-params "repeat-headers=1" \
    -payload_type 102 -f rtp "rtp://${HOST}:${VIDEO_PORT}?pkt_size=1200" \
    -map 1:a:0 \
    -c:a libopus -b:a 128k \
    -payload_type 111 -f rtp "rtp://${HOST}:${AUDIO_PORT}?pkt_size=1200"
  • Step 2: Make it executable

Run:

chmod +x test/publish.sh
  • Step 3: Smoke-test it against the PoC binary

In one terminal:

/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787

In another terminal, start the publisher. The script also sends Opus to port 10002, but in M1 the PoC binds only one RTP port (10000), so those audio packets are simply dropped. That is fine: this step only exercises the video path.

./test/publish.sh

Expected: FFmpeg prints the SDP for its two RTP outputs, then its usual progress line (frame=, fps=, bitrate=). The PoC binary prints nothing, because it reads RTP silently (M1 adds no verbose logging). Let it run for 3 seconds, then press Ctrl+C in both terminals.

(In M2 we'll separate video and audio into distinct sources; for M1 the single-port video-only path is enough to prove the pipeline.)

  • Step 4: Commit
git add test/publish.sh
git commit -m "test: add FFmpeg publisher script for M1 PoC"

Task 11: Pion-based test WHEP client

Files:

  • Create: test/whep-client/main.go
  • Create: test/whep-client/main_test.go

A Go binary that acts as a headless WHEP client: POSTs an SDP offer, parses the SDP answer, establishes a PeerConnection, and verifies that RTP packets arrive. Essential for automated end-to-end verification without a browser in the loop.

  • Step 1: Create test/whep-client/main.go
// Command whep-client subscribes to a Dragon Fork WHEP endpoint and logs
// the first N received RTP packets, then exits. Used for M1 end-to-end
// verification.
package main

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"sync/atomic"
	"time"

	"github.com/pion/webrtc/v4"
)

func main() {
	var (
		whepURL = flag.String("url", "http://127.0.0.1:8787/whep/test", "WHEP endpoint URL")
		wantPkt = flag.Int("pkts", 30, "exit after receiving this many video RTP packets")
		timeout = flag.Duration("timeout", 15*time.Second, "overall timeout")
	)
	flag.Parse()

	if err := run(*whepURL, *wantPkt, *timeout); err != nil {
		log.Fatalf("whep-client: %v", err)
	}
}

func run(whepURL string, wantPkt int, timeout time.Duration) error {
	// One deadline bounds the whole run: ICE gathering, the WHEP POST and
	// waiting for media.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	me := &webrtc.MediaEngine{}
	if err := me.RegisterDefaultCodecs(); err != nil {
		return fmt.Errorf("register codecs: %w", err)
	}
	api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
	pc, err := api.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		return fmt.Errorf("new pc: %w", err)
	}
	defer pc.Close()

	// Receive-only transceivers: a WHEP client never sends media.
	for _, kind := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
		if _, err := pc.AddTransceiverFromKind(kind,
			webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
			return fmt.Errorf("add %s transceiver: %w", kind, err)
		}
	}

	pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
		log.Printf("connection state: %s", s)
	})

	// videoCount is written by the track-reading goroutine and read by the
	// polling loop below, so every access goes through atomic operations.
	var videoCount atomic.Int64
	pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		kind := track.Kind()
		log.Printf("OnTrack: kind=%s codec=%s", kind, track.Codec().MimeType)
		go func() {
			buf := make([]byte, 1500)
			for {
				if _, _, err := track.Read(buf); err != nil {
					log.Printf("track.Read (%s): %v", kind, err)
					return
				}
				if kind == webrtc.RTPCodecTypeVideo {
					videoCount.Add(1)
				}
			}
		}()
	})

	offer, err := pc.CreateOffer(nil)
	if err != nil {
		return fmt.Errorf("create offer: %w", err)
	}
	// Non-trickle ICE: wait for all candidates so the single WHEP POST
	// carries a complete offer.
	gatherComplete := webrtc.GatheringCompletePromise(pc)
	if err := pc.SetLocalDescription(offer); err != nil {
		return fmt.Errorf("set local: %w", err)
	}
	select {
	case <-gatherComplete:
	case <-ctx.Done():
		return fmt.Errorf("ICE gathering: %w", ctx.Err())
	}

	answerSDP, err := httpPostSDP(whepURL, pc.LocalDescription().SDP)
	if err != nil {
		return fmt.Errorf("WHEP POST: %w", err)
	}

	if err := pc.SetRemoteDescription(webrtc.SessionDescription{
		Type: webrtc.SDPTypeAnswer, SDP: answerSDP,
	}); err != nil {
		return fmt.Errorf("set remote: %w", err)
	}

	tick := time.NewTicker(250 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			if n := videoCount.Load(); n >= int64(wantPkt) {
				log.Printf("OK: received %d video packets", n)
				fmt.Fprintln(os.Stdout, "PASS")
				return nil
			}
		case <-ctx.Done():
			return fmt.Errorf("timeout after %s: only %d video packets received",
				timeout, videoCount.Load())
		}
	}
}

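// httpPostSDP POSTs an SDP offer to a WHEP endpoint and returns the SDP
// answer. Any status other than 201 Created is returned as an error.
func httpPostSDP(url, sdp string) (string, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBufferString(sdp))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/sdp")
	req.Header.Set("Accept", "application/sdp")

	// The server blocks until its own ICE gathering finishes, so allow a few
	// seconds, but never hang forever on a stuck endpoint.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusCreated {
		return "", fmt.Errorf("status %d: %s", resp.StatusCode, string(body))
	}
	return string(body), nil
}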
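whep-client proves that video RTP arrives, not that it is decodable. A cheap intermediate check, short of a real decoder, is to look for the start of an IDR (keyframe) slice in the H.264 payloads. The sketch below is a hypothetical stdlib-only helper based on the RTP payload format for H.264 (RFC 6184): single NAL units, STAP-A aggregates, and FU-A fragments. To use it, the OnTrack loop in run could call track.ReadRTP() instead of track.Read and pass pkt.Payload in. With -g 60 at 30 fps in publish.sh, expect roughly one keyframe every 2 seconds.

```go
package main

import "fmt"

// H.264 NAL unit types (ITU-T H.264 Table 7-1) and RFC 6184 payload
// structures relevant to keyframe detection.
const (
	nalIDR   = 5  // coded slice of an IDR picture
	nalSTAPA = 24 // single-time aggregation packet
	nalFUA   = 28 // fragmentation unit
)

// containsIDRStart reports whether an H.264 RTP payload carries the start
// of an IDR slice: a single IDR NAL, an STAP-A containing one, or the first
// FU-A fragment of one. Later FU-A fragments return false so that each
// keyframe is counted once.
func containsIDRStart(payload []byte) bool {
	if len(payload) < 1 {
		return false
	}
	switch nalType := payload[0] & 0x1f; nalType {
	case nalSTAPA:
		// Body: repeated [2-byte big-endian size][NAL unit].
		for i := 1; i+2 <= len(payload); {
			size := int(payload[i])<<8 | int(payload[i+1])
			i += 2
			if size == 0 || i+size > len(payload) {
				return false // malformed aggregate
			}
			if payload[i]&0x1f == nalIDR {
				return true
			}
			i += size
		}
		return false
	case nalFUA:
		// FU header: S(1) E(1) R(1) Type(5).
		if len(payload) < 2 {
			return false
		}
		start := payload[1]&0x80 != 0
		return start && payload[1]&0x1f == nalIDR
	default:
		return nalType == nalIDR
	}
}

func main() {
	sps := []byte{0x67, 0x42, 0xc0, 0x1e} // NAL type 7
	idr := []byte{0x65, 0x88}             // NAL type 5
	stap := append([]byte{0x78, 0x00, byte(len(sps))}, sps...)
	stap = append(stap, 0x00, byte(len(idr)))
	stap = append(stap, idr...)
	fuStart := []byte{0x7c, 0x85, 0xaa}  // FU indicator, then S=1 Type=5
	fuMiddle := []byte{0x7c, 0x05, 0xbb} // S=0
	fmt.Println(containsIDRStart(idr), containsIDRStart(stap),
		containsIDRStart(fuStart), containsIDRStart(fuMiddle), containsIDRStart(sps))
	// prints: true true true false false
}
```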
  • Step 2: Create test/whep-client/main_test.go
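package main

import (
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
)

// Placeholder unit tests. The real validation happens end-to-end in Task 12;
// these keep `go test ./test/whep-client/...` meaningful in CI later.

// TestHTTPPostSDP_RejectsNon2xx checks that a status other than 201 is
// surfaced as an error that includes the status code.
func TestHTTPPostSDP_RejectsNon2xx(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "stream not found", http.StatusNotFound)
	}))
	defer srv.Close()

	_, err := httpPostSDP(srv.URL+"/whep/none", "v=0\n")
	if err == nil {
		t.Fatal("expected error for 404 response")
	}
	if !strings.Contains(err.Error(), "status 404") {
		t.Errorf("expected status 404 in error, got: %v", err)
	}
}

// TestHTTPPostSDP_Unreachable checks that a dial failure is reported.
func TestHTTPPostSDP_Unreachable(t *testing.T) {
	if _, err := httpPostSDP("http://127.0.0.1:1/whep/none", "v=0\n"); err == nil {
		t.Fatal("expected error from unreachable endpoint")
	}
}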
  • Step 3: Build and test

Run:

go build -o /tmp/whep-client ./test/whep-client
go test ./test/whep-client/... -v

Expected: binary builds; TestHTTPPostSDP_RejectsNon2xx passes.

  • Step 4: Commit
git add test/whep-client/
git commit -m "test: add Pion-based WHEP client for end-to-end M1 verification"

Task 12: End-to-end PoC verification

Files: None (runtime verification only)

This is the moment of truth: FFmpeg → webrtc-poc → whep-client, all the way through, with whep-client confirming that H.264 video RTP arrives over the negotiated PeerConnection. (The client counts packets; it does not decode frames.)

  • Step 1: Open three terminals

Terminal A — the PoC server. Terminal B — the FFmpeg publisher. Terminal C — the test WHEP client.

  • Step 2: Start the PoC server (Terminal A)

Run:

/tmp/webrtc-poc -stream test -rtp-port 10000 -listen :8787

Expected: server logs listening for RTP on 127.0.0.1:10000 and WHEP listening on :8787 — POST /whep/test to subscribe. Leave running.

  • Step 3: Start the FFmpeg publisher (Terminal B)

Run:

./test/publish.sh

Expected: FFmpeg begins producing frames and reporting throughput (something like frame= 30 fps=30 q=...). Leave running.

  • Step 4: Run the WHEP test client (Terminal C)

Run:

/tmp/whep-client -url http://127.0.0.1:8787/whep/test -pkts 30 -timeout 15s

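Expected output within a few seconds includes the lines below. The log package adds a date/time prefix, omitted here, and <N> is at least 30 and often higher, because the client checks the count only every 250 ms.

OnTrack: kind=video codec=video/H264
OK: received <N> video packets
PASS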

The client exits 0. This is M1's success criterion.

  • Step 5: If it fails, systematic debug

If the client times out:

  1. Confirm the FFmpeg publisher is actually producing RTP: run tcpdump -i lo -n udp port 10000 (use -i lo0 on macOS) in a fourth terminal; you should see packets. If not, the publisher is broken; re-check payload types and destination port in publish.sh.
  2. Confirm the PoC server is receiving RTP: temporarily add a log line in source.go readLoop (log.Printf("rtp seq=%d", pkt.SequenceNumber)) and rebuild. If nothing logs, the UDP bind is wrong.
  3. Confirm the PeerConnection is establishing: check that the WHEP POST returned 201, and check the client's connection state transitions (a pc.OnConnectionStateChange handler that logs each state shows them); they should move from connecting to connected.
  4. Confirm codec payload types match: publish.sh sends H.264 as PT 102 and Opus as PT 111, and forwardRTP forwards only those two values (they also match Pion's default H.264 and Opus PTs). If RTP reaches the reader but forwardRTP drops it, the PTs disagree; add a log line in forward.go's default branch to confirm.

Log each issue and fix in NOTES.md as you go — those are exactly the gotchas we want recorded for M2+.
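For debug step 2, logging every sequence number gets noisy: at 1500k with 1200-byte packets that is roughly 150 lines per second. An alternative is to feed each pkt.SequenceNumber into a small gap counter and log only losses. Below is a hypothetical stdlib-only sketch, seqTracker. It handles 16-bit wraparound and ignores duplicates and late packets, so a reordered packet shows up once as a gap and is then ignored when it finally arrives.

```go
package main

import "fmt"

// seqTracker counts RTP packets missing between consecutive sequence
// numbers. uint16 arithmetic makes the 65535 → 0 wraparound work naturally.
type seqTracker struct {
	started bool
	last    uint16 // newest sequence number seen
	lost    int    // running total of missing packets
}

// Observe records seq and returns how many packets appear to be missing
// immediately before it.
func (t *seqTracker) Observe(seq uint16) int {
	if !t.started {
		t.started, t.last = true, seq
		return 0
	}
	delta := seq - t.last // wraps: 0 - 65534 == 2
	if delta == 0 || delta >= 0x8000 {
		return 0 // duplicate or late (reordered) packet; keep the newest seq
	}
	t.last = seq
	missing := int(delta) - 1
	t.lost += missing
	return missing
}

func main() {
	var tr seqTracker
	for _, seq := range []uint16{65533, 65534, 0, 1, 4} {
		if n := tr.Observe(seq); n > 0 {
			fmt.Printf("gap before seq=%d: %d missing\n", seq, n)
		}
	}
	fmt.Println("total lost:", tr.lost)
	// prints:
	// gap before seq=0: 1 missing
	// gap before seq=4: 2 missing
	// total lost: 3
}
```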

  • Step 6: Tear down

Ctrl+C in all three terminals.

  • Step 7: Document M1 success in NOTES.md

Append to NOTES.md:

## M1 PoC verified — <DATE>

FFmpeg (testsrc2 + drawtext timecode, H.264 baseline, Opus) → webrtc-poc → Pion WHEP client.

- Video PT: 102 / Audio PT: 111 (matched between publisher and client)
- ICE gathering completed within ~<N>s
- First video packet received ~<N>s after POST; the first keyframe can lag by up to one GOP (2s at -g 60 and 30 fps) until forced keyframes land in M2
- <Any gotchas encountered, with fixes>

Ready for M2 (datarhei process-model integration).

Run:

git add NOTES.md
git commit -m "docs: record M1 PoC success and observations"
  • Step 8: Push the branch

Run:

git push -u origin m1-webrtc-poc

Expected: branch pushed to your fork's remote. M1 complete.


Exit Criteria

M1 is done when all of the following are true:

  1. go build ./... succeeds on the fork.
  2. go test ./core/webrtc/... -race passes — all unit tests green, no race warnings.
  3. test/whep-client/main.go prints PASS against a running webrtc-poc + test/publish.sh pair within 15 seconds.
  4. NOTES.md records the verification run and any gotchas encountered.
  5. Branch m1-webrtc-poc is pushed to the fork remote.

What comes next

  • M2 plan will be written after M1 is verified. It covers: adding the webrtc:// URL scheme to datarhei Core's output resolver, wiring the Registry and WHEPHandler into Core's normal startup, separating video/audio onto distinct UDP ports, and hooking into the process lifecycle so sources register/deregister automatically with the existing FFmpeg processes datarhei already manages.
  • M3 plan will cover multi-viewer robustness: DELETE/PATCH WHEP methods, error-path HTTP codes (404/406/503), the admin API endpoints, and PLI absorption + RTCP BYE on teardown.
  • M4 plan will cover CI integration: running the end-to-end harness from Task 12 in CI, the pixel-sampling latency measurement, and the p95 gates.
  • M5 plan will cover branding and release: logo swap in the Restreamer Vue UI, README rewrite with upstream attribution, NOTICE/CREDITS, Docker image publishing, and tagging v0.1.0-dragonfork.

Each follow-on plan is small enough (~3 to 7 days of work) that writing it after M1 lets us incorporate lessons learned without re-planning from the top.