docs(m2): implementation plan
This commit is contained in:
parent
86bae816c1
commit
c38036de94
1 changed files with 839 additions and 0 deletions
839
docs/superpowers/plans/2026-04-17-m2-webrtc-core-integration.md
Normal file
839
docs/superpowers/plans/2026-04-17-m2-webrtc-core-integration.md
Normal file
|
|
@ -0,0 +1,839 @@
|
|||
# M2 — WebRTC into datarhei Core proper — Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Wire the M1 `core/webrtc` package into the datarhei Core binary as a first-class output, served via WHEP under `/api/v3/process/{id}/whep`, with an eagerly bound `Source` per WebRTC-enabled process.
|
||||
|
||||
**Architecture:** New `app/webrtc` sibling subsystem that hooks into restream's process lifecycle. Two small additions to restream (`ProcessHooks` callbacks + `AppendOutput` method). Reuses the untouched M1 `core/webrtc` package. UI lives in a separate core-ui repo and is deferred to a sibling plan.
|
||||
|
||||
**Tech Stack:** Go 1.24, Pion WebRTC v4 (via `core/webrtc` from M1), Echo v4 HTTP router, existing datarhei Core subsystem pattern.
|
||||
|
||||
**Spec:** `docs/design/2026-04-17-datarhei-dragon-fork-m2-webrtc-core-integration.md`
|
||||
|
||||
**Branch:** `m2-webrtc-core-integration` (already created from `m1-webrtc-poc`).
|
||||
|
||||
---
|
||||
|
||||
## File Structure
|
||||
|
||||
**New files:**
|
||||
- `app/webrtc/portalloc.go` + `portalloc_test.go` — ephemeral UDP port allocation
|
||||
- `app/webrtc/ffmpeg_args.go` + `ffmpeg_args_test.go` — builds `-f rtp …` output fragments
|
||||
- `app/webrtc/lifecycle.go` + `lifecycle_test.go` — `OnStart`/`OnStop` hook bodies
|
||||
- `app/webrtc/subsystem.go` + `subsystem_test.go` — `WebRTC` struct; `Start`/`Stop`
|
||||
- `app/webrtc/handler.go` + `handler_test.go` — WHEP HTTP handler
|
||||
- `core/webrtc/registry.go` already exists — no changes.
|
||||
|
||||
**Modified files:**
|
||||
- `restream/app/process.go` — add `ConfigWebRTC` type and `WebRTC` field on `Config`. Update `Clone()` and `CreateCommand()`.
|
||||
- `restream/restream.go` — add `ProcessHooks` and `AppendOutput`.
|
||||
- `config/data.go` — add `WebRTC` block on `Data` struct.
|
||||
- `config/config.go` — `vars.Register` entries for WebRTC fields.
|
||||
- `app/api/api.go` — instantiate the WebRTC subsystem alongside restream.
|
||||
- `http/server.go` — mount `/whep` routes under existing `/api/v3` group.
|
||||
|
||||
---
|
||||
|
||||
## Task 1 — `ConfigWebRTC` on restream's `Config`
|
||||
|
||||
**Files:**
|
||||
- Modify: `restream/app/process.go`
|
||||
|
||||
- [ ] **Step 1.1 — Add `ConfigWebRTC` type + field**
|
||||
|
||||
Append after `ConfigIO` definition (~line 34), add field to `Config`:
|
||||
|
||||
```go
|
||||
type ConfigWebRTC struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
VideoPT uint8 `json:"video_pt"`
|
||||
AudioPT uint8 `json:"audio_pt"`
|
||||
ForceTranscode bool `json:"force_transcode"`
|
||||
}
|
||||
|
||||
func (w ConfigWebRTC) Clone() ConfigWebRTC { return w }
|
||||
```
|
||||
|
||||
Add to `Config` struct:
|
||||
```go
|
||||
WebRTC ConfigWebRTC `json:"webrtc"`
|
||||
```
|
||||
|
||||
- [ ] **Step 1.2 — Update `Config.Clone()` to carry WebRTC**
|
||||
|
||||
```go
|
||||
clone.WebRTC = config.WebRTC.Clone()
|
||||
```
|
||||
|
||||
- [ ] **Step 1.3 — Verify build**
|
||||
|
||||
Run: `go build ./restream/...`
|
||||
Expected: no errors.
|
||||
|
||||
- [ ] **Step 1.4 — Commit**
|
||||
|
||||
```bash
|
||||
git add restream/app/process.go
|
||||
git commit -m "feat(restream): add ConfigWebRTC per-process field"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2 — `DataWebRTC` on global config
|
||||
|
||||
**Files:**
|
||||
- Modify: `config/data.go`
|
||||
- Modify: `config/config.go`
|
||||
|
||||
- [ ] **Step 2.1 — Add `WebRTC` block to `Data`**
|
||||
|
||||
In `config/data.go`, following the pattern of `SRT`/`FFmpeg` blocks, add near the similar service blocks:
|
||||
|
||||
```go
|
||||
WebRTC struct {
|
||||
Enable bool `json:"enable"`
|
||||
PublicIP string `json:"public_ip"`
|
||||
NAT1To1IPs []string `json:"nat_1_to_1_ips"`
|
||||
UDPMuxPort int `json:"udp_mux_port"`
|
||||
} `json:"webrtc"`
|
||||
```
|
||||
|
||||
- [ ] **Step 2.2 — Register vars**
|
||||
|
||||
In `config/config.go`, at the end of the `vars.Register` block, add:
|
||||
|
||||
```go
|
||||
d.vars.Register(value.NewBool(&d.WebRTC.Enable, false), "webrtc.enable", "CORE_WEBRTC_ENABLE", nil, "Enable WebRTC egress subsystem", false, false)
|
||||
d.vars.Register(value.NewString(&d.WebRTC.PublicIP, ""), "webrtc.public_ip", "CORE_WEBRTC_PUBLIC_IP", nil, "ICE NAT1To1 host candidate IP", false, false)
|
||||
d.vars.Register(value.NewStringList(&d.WebRTC.NAT1To1IPs, []string{}, " "), "webrtc.nat_1_to_1_ips", "CORE_WEBRTC_NAT_1_TO_1_IPS", nil, "Advanced: multiple NAT1To1 IPs", false, false)
|
||||
d.vars.Register(value.NewInt(&d.WebRTC.UDPMuxPort, 0), "webrtc.udp_mux_port", "CORE_WEBRTC_UDP_MUX_PORT", nil, "Single UDP port for all ICE traffic (0 = ephemeral)", false, false)
|
||||
```
|
||||
|
||||
(If the project uses a different `vars.Register` signature, match the neighbors.)
|
||||
|
||||
- [ ] **Step 2.3 — Verify build and commit**
|
||||
|
||||
```bash
|
||||
go build ./config/...
|
||||
git add config/data.go config/config.go
|
||||
git commit -m "feat(config): add webrtc global config block"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3 — `ProcessHooks` + `AppendOutput` on restream
|
||||
|
||||
**Files:**
|
||||
- Modify: `restream/restream.go`
|
||||
|
||||
- [ ] **Step 3.1 — Add `ProcessHook`, `ProcessHooks` types and field on restream struct**
|
||||
|
||||
Near the top (after imports, in the types region):
|
||||
|
||||
```go
|
||||
// ProcessHook is called at well-defined points in a process's lifecycle.
|
||||
// Return a non-nil error to abort the start (OnStart only; OnStop errors
|
||||
// are logged and otherwise ignored).
|
||||
type ProcessHook func(id string, cfg *app.Config) error
|
||||
|
||||
// ProcessHooks carries optional lifecycle callbacks for restream to invoke.
|
||||
// A nil hook is a no-op.
|
||||
type ProcessHooks struct {
|
||||
OnStart ProcessHook // fires after args are assembled, before exec
|
||||
OnStop ProcessHook // fires after wait() returns
|
||||
}
|
||||
```
|
||||
|
||||
Add a field to the `restream` struct:
|
||||
```go
|
||||
hooks ProcessHooks
|
||||
```
|
||||
|
||||
Add a `SetHooks` method:
|
||||
```go
|
||||
func (r *restream) SetHooks(h ProcessHooks) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.hooks = h
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3.2 — Wire OnStart / OnStop into the task lifecycle**
|
||||
|
||||
Find the `startProcess` / `ffmpeg.Start()` call site (~line 1065 per survey). Before the `Start()` call, insert:
|
||||
|
||||
```go
|
||||
if r.hooks.OnStart != nil {
|
||||
if err := r.hooks.OnStart(task.id, task.config); err != nil {
|
||||
r.logger.WithField("id", task.id).WithError(err).Error().Log("OnStart hook aborted process start")
|
||||
return err
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Find `stopProcess` / `ffmpeg.Stop()` (~line 1094). After the stop completes, add:
|
||||
|
||||
```go
|
||||
if r.hooks.OnStop != nil {
|
||||
if err := r.hooks.OnStop(task.id, task.config); err != nil {
|
||||
r.logger.WithField("id", task.id).WithError(err).Warn().Log("OnStop hook returned error")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3.3 — `AppendOutput`**
|
||||
|
||||
Add:
|
||||
|
||||
```go
|
||||
// AppendOutput appends extra FFmpeg args to a process's pending command.
|
||||
// Only valid during OnStart (between hook fire and exec). Returns an
|
||||
// error otherwise.
|
||||
func (r *restream) AppendOutput(id string, extra []string) error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
t, ok := r.tasks[id]
|
||||
if !ok {
|
||||
return fmt.Errorf("restream: no such process %q", id)
|
||||
}
|
||||
if t.config == nil {
|
||||
return fmt.Errorf("restream: process %q has no config", id)
|
||||
}
|
||||
// Append to the free-form Options slice on a synthetic ConfigIO so
|
||||
// CreateCommand picks it up. We model this as an extra Output with
|
||||
// empty Address — address is carried inside extra itself.
|
||||
t.config.Output = append(t.config.Output, app.ConfigIO{
|
||||
ID: "webrtc",
|
||||
Options: extra,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
Note: callers build `extra` so that the last element is the UDP address; the appended `ConfigIO` has empty `Address` so `CreateCommand` won't double-append. Instead, fix `CreateCommand` to support this — or (cleaner) pass the address as the last entry of `Options` and set the inserted `ConfigIO.Address` to that last entry, dropping it from `Options`. Concretely:
|
||||
|
||||
```go
|
||||
func (r *restream) AppendOutput(id string, extra []string) error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
t, ok := r.tasks[id]
|
||||
if !ok {
|
||||
return fmt.Errorf("restream: no such process %q", id)
|
||||
}
|
||||
if t.config == nil || len(extra) == 0 {
|
||||
return fmt.Errorf("restream: append-output invalid args")
|
||||
}
|
||||
opts, addr := extra[:len(extra)-1], extra[len(extra)-1]
|
||||
t.config.Output = append(t.config.Output, app.ConfigIO{
|
||||
ID: "webrtc",
|
||||
Address: addr,
|
||||
Options: append([]string{}, opts...),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3.4 — Verify build and commit**
|
||||
|
||||
```bash
|
||||
go build ./restream/...
|
||||
git add restream/restream.go
|
||||
git commit -m "feat(restream): add ProcessHooks and AppendOutput"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4 — `app/webrtc/portalloc.go` (TDD)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/portalloc.go`
|
||||
- Create: `app/webrtc/portalloc_test.go`
|
||||
|
||||
- [ ] **Step 4.1 — Write failing test**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAlloc_ReturnsPortBindable(t *testing.T) {
|
||||
for i := 0; i < 100; i++ {
|
||||
p, err := Alloc()
|
||||
if err != nil {
|
||||
t.Fatalf("Alloc: %v", err)
|
||||
}
|
||||
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127,0,0,1), Port: p})
|
||||
if err != nil {
|
||||
t.Fatalf("iter %d: rebind %d: %v", i, p, err)
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlloc_Nonzero(t *testing.T) {
|
||||
p, err := Alloc()
|
||||
if err != nil { t.Fatal(err) }
|
||||
if p == 0 { t.Fatal("expected non-zero port") }
|
||||
fmt.Sprintf("%d", p)
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4.2 — Run test (should fail to compile)**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -run TestAlloc -race
|
||||
```
|
||||
|
||||
- [ ] **Step 4.3 — Implement**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// Alloc binds :0 on loopback UDPv4, records the assigned port, closes the
|
||||
// socket, and returns the port. Callers must re-bind immediately; if the
|
||||
// port is taken in the gap (rare), the rebind will fail and the caller
|
||||
// should propagate that error.
|
||||
func Alloc() (int, error) {
|
||||
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127,0,0,1), Port: 0})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("webrtc portalloc: %w", err)
|
||||
}
|
||||
defer c.Close()
|
||||
return c.LocalAddr().(*net.UDPAddr).Port, nil
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4.4 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/portalloc.go app/webrtc/portalloc_test.go
|
||||
git commit -m "feat(app/webrtc): ephemeral loopback UDP port allocator"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5 — `app/webrtc/ffmpeg_args.go` (TDD)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/ffmpeg_args.go`
|
||||
- Create: `app/webrtc/ffmpeg_args_test.go`
|
||||
|
||||
- [ ] **Step 5.1 — Write failing test**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
func TestBuildArgs_CopyCodecs(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
|
||||
got := BuildArgs(cfg, 49200)
|
||||
want := []string{
|
||||
"-map", "0:v:0", "-c:v", "copy", "-payload_type", "102", "-f", "rtp",
|
||||
"udp://127.0.0.1:49200?pkt_size=1316",
|
||||
"-map", "0:a:0", "-c:a", "copy", "-payload_type", "111", "-f", "rtp",
|
||||
"udp://127.0.0.1:49201?pkt_size=1316",
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("BuildArgs mismatch\ngot: %v\nwant: %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildArgs_ForceTranscode(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111, ForceTranscode: true}
|
||||
got := BuildArgs(cfg, 49200)
|
||||
// video leg should include -c:v libx264 / profile=baseline
|
||||
if !containsSeq(got, []string{"-c:v", "libx264"}) {
|
||||
t.Fatalf("expected -c:v libx264, got %v", got)
|
||||
}
|
||||
if !containsSeq(got, []string{"-c:a", "libopus"}) {
|
||||
t.Fatalf("expected -c:a libopus, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func containsSeq(haystack, needle []string) bool {
|
||||
for i := 0; i+len(needle) <= len(haystack); i++ {
|
||||
match := true
|
||||
for j := range needle {
|
||||
if haystack[i+j] != needle[j] { match = false; break }
|
||||
}
|
||||
if match { return true }
|
||||
}
|
||||
return false
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5.2 — Implement**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// BuildArgs returns the FFmpeg output-leg args for a WebRTC-enabled
|
||||
// process. The caller passes a video RTP port; audio uses port+1.
|
||||
// The returned slice is designed for restream.AppendOutput — the final
|
||||
// element is the UDP address, the rest are options.
|
||||
//
|
||||
// We emit two separate outputs (one per track) so that -payload_type
|
||||
// applies correctly to each. This produces *two* calls' worth of args
|
||||
// but AppendOutput currently handles one output at a time. Callers
|
||||
// should split on the boundary (the second `-map` token).
|
||||
func BuildArgs(cfg appcfg.ConfigWebRTC, videoPort int) []string {
|
||||
vcopy := []string{"-c:v", "copy"}
|
||||
acopy := []string{"-c:a", "copy"}
|
||||
if cfg.ForceTranscode {
|
||||
vcopy = []string{
|
||||
"-c:v", "libx264",
|
||||
"-preset", "veryfast",
|
||||
"-profile:v", "baseline",
|
||||
"-pix_fmt", "yuv420p",
|
||||
"-tune", "zerolatency",
|
||||
"-g", "60",
|
||||
}
|
||||
acopy = []string{"-c:a", "libopus", "-b:a", "96k"}
|
||||
}
|
||||
|
||||
args := []string{"-map", "0:v:0"}
|
||||
args = append(args, vcopy...)
|
||||
args = append(args, "-payload_type", fmt.Sprint(cfg.VideoPT), "-f", "rtp",
|
||||
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort))
|
||||
|
||||
args = append(args, "-map", "0:a:0")
|
||||
args = append(args, acopy...)
|
||||
args = append(args, "-payload_type", fmt.Sprint(cfg.AudioPT), "-f", "rtp",
|
||||
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort+1))
|
||||
|
||||
return args
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5.3 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/ffmpeg_args.go app/webrtc/ffmpeg_args_test.go
|
||||
git commit -m "feat(app/webrtc): FFmpeg RTP output arg builder"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 6 — `app/webrtc/subsystem.go` + `lifecycle.go` (TDD)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/subsystem.go`, `subsystem_test.go`
|
||||
- Create: `app/webrtc/lifecycle.go`, `lifecycle_test.go`
|
||||
|
||||
- [ ] **Step 6.1 — Subsystem skeleton with dependency interface**
|
||||
|
||||
Because restream is a large package, define the dependency as an interface the subsystem needs:
|
||||
|
||||
```go
|
||||
// app/webrtc/subsystem.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
core "github.com/datarhei/core/v16/core/webrtc"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
type Restreamer interface {
|
||||
SetHooks(ProcessHooks)
|
||||
AppendOutput(id string, extra []string) error
|
||||
}
|
||||
|
||||
type ProcessHook func(id string, cfg *appcfg.Config) error
|
||||
|
||||
type ProcessHooks struct {
|
||||
OnStart ProcessHook
|
||||
OnStop ProcessHook
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
PublicIP string
|
||||
NAT1To1IPs []string
|
||||
}
|
||||
|
||||
type Subsystem struct {
|
||||
cfg Config
|
||||
restream Restreamer
|
||||
registry *core.Registry
|
||||
factory *core.PeerFactory
|
||||
|
||||
mu sync.Mutex
|
||||
peers map[string]map[string]*core.Peer // processID -> peerID -> peer
|
||||
started bool
|
||||
}
|
||||
|
||||
func New(cfg Config, r Restreamer) (*Subsystem, error) {
|
||||
ccfg := core.DefaultConfig()
|
||||
ccfg.PublicIP = cfg.PublicIP
|
||||
ccfg.NAT1To1IPs = cfg.NAT1To1IPs
|
||||
f, err := core.NewPeerFactory(ccfg)
|
||||
if err != nil { return nil, err }
|
||||
return &Subsystem{
|
||||
cfg: cfg,
|
||||
restream: r,
|
||||
registry: core.NewRegistry(),
|
||||
factory: f,
|
||||
peers: make(map[string]map[string]*core.Peer),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) Start() error {
|
||||
s.mu.Lock()
|
||||
if s.started { s.mu.Unlock(); return nil }
|
||||
s.started = true
|
||||
s.mu.Unlock()
|
||||
s.restream.SetHooks(ProcessHooks{
|
||||
OnStart: s.onProcessStart,
|
||||
OnStop: s.onProcessStop,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) Stop() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.started = false
|
||||
s.restream.SetHooks(ProcessHooks{}) // clear
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
**Note:** There's a type mismatch: `restream.ProcessHooks` is in package `restream`, this subsystem has its own `webrtc.ProcessHooks`. In the wiring task we either (a) import `restream.ProcessHooks` in the subsystem, or (b) define an adapter. Cleanest: the subsystem imports `restream` and uses `restream.ProcessHooks`. Let me rewrite using the real type — replace the local `ProcessHook`/`ProcessHooks` with `restream.ProcessHooks`. Do that in the actual implementation; the plan keeps the outline for readability.
|
||||
|
||||
- [ ] **Step 6.2 — Lifecycle (onProcessStart / onProcessStop)**
|
||||
|
||||
```go
|
||||
// app/webrtc/lifecycle.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
core "github.com/datarhei/core/v16/core/webrtc"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) error {
|
||||
if cfg == nil || !cfg.WebRTC.Enabled { return nil }
|
||||
|
||||
port, err := Alloc()
|
||||
if err != nil { return fmt.Errorf("webrtc: alloc port: %w", err) }
|
||||
|
||||
args := BuildArgs(cfg.WebRTC, port)
|
||||
if err := s.restream.AppendOutput(id, args); err != nil {
|
||||
return fmt.Errorf("webrtc: append output: %w", err)
|
||||
}
|
||||
|
||||
src, err := core.NewSourceOn(id, "127.0.0.1", port)
|
||||
if err != nil { return fmt.Errorf("webrtc: bind source: %w", err) }
|
||||
src.Start()
|
||||
if err := s.registry.Register(id, src); err != nil {
|
||||
src.Close()
|
||||
return fmt.Errorf("webrtc: register source: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) onProcessStop(id string, _ *appcfg.Config) error {
|
||||
s.mu.Lock()
|
||||
peers := s.peers[id]
|
||||
delete(s.peers, id)
|
||||
s.mu.Unlock()
|
||||
for _, p := range peers { _ = p.Close() }
|
||||
if src, ok := s.registry.Get(id); ok {
|
||||
s.registry.Remove(id)
|
||||
_ = src.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6.3 — Lifecycle test**
|
||||
|
||||
```go
|
||||
// lifecycle_test.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
type fakeRestream struct {
|
||||
appended map[string][]string
|
||||
}
|
||||
|
||||
func (f *fakeRestream) SetHooks(ProcessHooks) {}
|
||||
func (f *fakeRestream) AppendOutput(id string, extra []string) error {
|
||||
if f.appended == nil { f.appended = map[string][]string{} }
|
||||
f.appended[id] = extra
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestLifecycle_DisabledIsNoop(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, err := New(Config{}, f)
|
||||
if err != nil { t.Fatal(err) }
|
||||
cfg := &appcfg.Config{ID: "p1", WebRTC: appcfg.ConfigWebRTC{Enabled: false}}
|
||||
if err := s.onProcessStart("p1", cfg); err != nil { t.Fatal(err) }
|
||||
if _, ok := f.appended["p1"]; ok { t.Fatal("expected no append for disabled") }
|
||||
}
|
||||
|
||||
func TestLifecycle_EnabledAppendsAndRegisters(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, err := New(Config{}, f)
|
||||
if err != nil { t.Fatal(err) }
|
||||
cfg := &appcfg.Config{ID: "p2", WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}}
|
||||
if err := s.onProcessStart("p2", cfg); err != nil { t.Fatal(err) }
|
||||
if len(f.appended["p2"]) == 0 { t.Fatal("expected append") }
|
||||
if _, ok := s.registry.Get("p2"); !ok { t.Fatal("expected registered source") }
|
||||
// teardown
|
||||
if err := s.onProcessStop("p2", cfg); err != nil { t.Fatal(err) }
|
||||
if _, ok := s.registry.Get("p2"); ok { t.Fatal("expected removed") }
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6.4 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/subsystem.go app/webrtc/subsystem_test.go app/webrtc/lifecycle.go app/webrtc/lifecycle_test.go
|
||||
git commit -m "feat(app/webrtc): subsystem skeleton + process lifecycle hooks"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 7 — `app/webrtc/handler.go` (WHEP HTTP)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/handler.go`, `handler_test.go`
|
||||
|
||||
- [ ] **Step 7.1 — Handler: delegate to M1's WHEP handler with process-ID lookup**
|
||||
|
||||
```go
|
||||
// handler.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
core "github.com/datarhei/core/v16/core/webrtc"
|
||||
"github.com/labstack/echo/v4"
|
||||
)
|
||||
|
||||
// Subscribe handles POST /api/v3/process/:id/whep — look up the Source
|
||||
// for the given process, run a WHEP offer/answer cycle, and forward
|
||||
// RTP to the new peer.
|
||||
func (s *Subsystem) Subscribe(c echo.Context) error {
|
||||
id := c.Param("id")
|
||||
src, ok := s.registry.Get(id)
|
||||
if !ok {
|
||||
return echo.NewHTTPError(http.StatusNotFound, "stream not found")
|
||||
}
|
||||
// Delegate to the M1 WHEP handler — but we already have the source
|
||||
// so we call the lower-level path.
|
||||
offer, err := readBody(c)
|
||||
if err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) }
|
||||
|
||||
peer, answer, err := s.factory.NewPeerFromOffer(src, offer)
|
||||
if err != nil {
|
||||
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
peerID := peer.ID()
|
||||
s.mu.Lock()
|
||||
if s.peers[id] == nil { s.peers[id] = map[string]*core.Peer{} }
|
||||
s.peers[id][peerID] = peer
|
||||
s.mu.Unlock()
|
||||
|
||||
c.Response().Header().Set("Location",
|
||||
"/api/v3/process/"+id+"/whep/"+peerID)
|
||||
return c.Blob(http.StatusCreated, "application/sdp", []byte(answer))
|
||||
}
|
||||
|
||||
// Unsubscribe handles DELETE /api/v3/process/:id/whep/:peerid.
|
||||
func (s *Subsystem) Unsubscribe(c echo.Context) error {
|
||||
id, peerID := c.Param("id"), c.Param("peerid")
|
||||
s.mu.Lock()
|
||||
peer := s.peers[id][peerID]
|
||||
delete(s.peers[id], peerID)
|
||||
s.mu.Unlock()
|
||||
if peer != nil { _ = peer.Close() }
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func readBody(c echo.Context) (string, error) {
|
||||
buf := make([]byte, 0, 8192)
|
||||
for {
|
||||
tmp := make([]byte, 4096)
|
||||
n, err := c.Request().Body.Read(tmp)
|
||||
if n > 0 { buf = append(buf, tmp[:n]...) }
|
||||
if err != nil { break }
|
||||
}
|
||||
return string(buf), nil
|
||||
}
|
||||
```
|
||||
|
||||
**Note:** If `core/webrtc.PeerFactory` doesn't expose `NewPeerFromOffer`, swap in whatever API M1 provided (`factory.NewPeer(...)` taking source+offer). If the M1 handler is higher-level, wrap it instead of reimplementing.
|
||||
|
||||
- [ ] **Step 7.2 — Handler test: 404 on unknown id**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
)
|
||||
|
||||
func TestSubscribe_404OnUnknown(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, _ := New(Config{}, f)
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(""))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id"); c.SetParamValues("missing")
|
||||
err := s.Subscribe(c)
|
||||
if he, ok := err.(*echo.HTTPError); !ok || he.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribe_IdempotentNoContent(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, _ := New(Config{}, f)
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodDelete, "/", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id", "peerid"); c.SetParamValues("p", "nope")
|
||||
if err := s.Unsubscribe(c); err != nil { t.Fatal(err) }
|
||||
if rec.Code != http.StatusNoContent {
|
||||
t.Fatalf("expected 204, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 7.3 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/handler.go app/webrtc/handler_test.go
|
||||
git commit -m "feat(app/webrtc): WHEP HTTP handler"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 8 — Wire subsystem into app/api/api.go + http/server.go
|
||||
|
||||
**Files:**
|
||||
- Modify: `app/api/api.go`
|
||||
- Modify: `http/server.go`
|
||||
|
||||
- [ ] **Step 8.1 — Instantiate subsystem in api.New**
|
||||
|
||||
In `app/api/api.go`, after `restream := restream.New(...)`, when `cfg.WebRTC.Enable` is true, create the subsystem:
|
||||
|
||||
```go
|
||||
if cfg.WebRTC.Enable {
|
||||
webrtcSub, err := webrtcapp.New(webrtcapp.Config{
|
||||
PublicIP: cfg.WebRTC.PublicIP,
|
||||
NAT1To1IPs: cfg.WebRTC.NAT1To1IPs,
|
||||
}, restream)
|
||||
if err != nil {
|
||||
a.log.logger.core.Warn().WithError(err).Log("webrtc subsystem disabled")
|
||||
} else {
|
||||
_ = webrtcSub.Start()
|
||||
a.webrtc = webrtcSub
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Store on the api struct: `webrtc *webrtcapp.Subsystem`.
|
||||
|
||||
- [ ] **Step 8.2 — Mount HTTP routes**
|
||||
|
||||
In `http/server.go` near line 568 (where `v3.POST("/process", ...)` lives):
|
||||
|
||||
```go
|
||||
if s.webrtc != nil {
|
||||
v3.POST("/process/:id/whep", s.webrtc.Subscribe)
|
||||
v3.DELETE("/process/:id/whep/:peerid", s.webrtc.Unsubscribe)
|
||||
}
|
||||
```
|
||||
|
||||
Plumb `s.webrtc` from api → http/server constructor.
|
||||
|
||||
- [ ] **Step 8.3 — Verify build**
|
||||
|
||||
```bash
|
||||
go build ./...
|
||||
```
|
||||
|
||||
- [ ] **Step 8.4 — Commit**
|
||||
|
||||
```bash
|
||||
git add app/api/api.go http/server.go
|
||||
git commit -m "feat(core): wire webrtc subsystem + WHEP routes"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 9 — Integration smoke test
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/integration_test.go`
|
||||
|
||||
- [ ] **Step 9.1 — Synthetic RTP → WHEP end-to-end**
|
||||
|
||||
Import M1's `test/whep-client` as a library. Boot a Subsystem, inject synthetic RTP on the allocated port (mimic Task 6's lifecycle), POST a WHEP offer, assert both tracks arrive. See M1's `test/whep-client/main_test.go` for reference.
|
||||
|
||||
- [ ] **Step 9.2 — Run with -race and commit**
|
||||
|
||||
---
|
||||
|
||||
## Task 10 — TrueNAS redeploy
|
||||
|
||||
- [ ] **Step 10.1 — Rebuild Core image (Dockerfile currently targets `cmd/webrtc-poc`; add a second target or switch to the root `./` build for Core proper).**
|
||||
- [ ] **Step 10.2 — Redeploy via docker compose on TrueNAS; verify WHEP endpoint returns 404 before any process exists, 201 after enabling WebRTC on a process.**
|
||||
|
||||
---
|
||||
|
||||
## Out of scope for this plan
|
||||
|
||||
- `core-ui/src/views/Edit/LiveTab.jsx` — core-ui is a separate repo and requires its own plan. Track as M2.5 once core-ui is cloned into the workspace.
|
||||
|
||||
## Self-review notes
|
||||
|
||||
- Task 7 depends on `core/webrtc.PeerFactory.NewPeerFromOffer` signature from M1; if it's named differently, adjust the call site (don't rewrite the handler).
|
||||
- Task 3 Step 3.3 assumes `restream.tasks` is a map keyed by id with a `*task` value that carries `config`. Confirm by reading around line 90 before implementing; the exact struct name may differ.
|
||||
- Task 2 `vars.NewStringList` / `vars.NewInt` signatures need confirming against the real `config/vars/value` package.
|
||||
Loading…
Reference in a new issue