Compare commits
10 commits
86bae816c1
...
d96aa70c27
| Author | SHA1 | Date | |
|---|---|---|---|
| d96aa70c27 | |||
| b030102611 | |||
| 83eaa28601 | |||
| f6d5b3378a | |||
| 9d38e9ccdb | |||
| 46531bb479 | |||
| 16ae17d2a1 | |||
| 80db028281 | |||
| eaeefee753 | |||
| c38036de94 |
22 changed files with 2438 additions and 12 deletions
|
|
@ -16,6 +16,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/app"
|
||||
appwebrtc "github.com/datarhei/core/v16/app/webrtc"
|
||||
"github.com/datarhei/core/v16/config"
|
||||
configstore "github.com/datarhei/core/v16/config/store"
|
||||
configvars "github.com/datarhei/core/v16/config/vars"
|
||||
|
|
@ -73,6 +74,8 @@ type api struct {
|
|||
s3fs map[string]fs.Filesystem
|
||||
rtmpserver rtmp.Server
|
||||
srtserver srt.Server
|
||||
webrtcsub *appwebrtc.Subsystem
|
||||
webrtchandler *appwebrtc.Handler
|
||||
metrics monitor.HistoryMonitor
|
||||
prom prometheus.Metrics
|
||||
service service.Service
|
||||
|
|
@ -617,6 +620,22 @@ func (a *api) start() error {
|
|||
|
||||
a.restream = restream
|
||||
|
||||
// Build the WebRTC egress subsystem if the operator enabled it.
|
||||
// Failure to construct the subsystem (e.g., invalid NAT1To1 IP)
|
||||
// is logged and the subsystem declines to install hooks — Core
|
||||
// starts normally without WebRTC support, consistent with how
|
||||
// disabling the subsystem at runtime is handled.
|
||||
if cfg.WebRTC.Enable {
|
||||
webrtcSub, werr := appwebrtc.New(cfg.WebRTC, a.log.logger.core)
|
||||
if werr != nil {
|
||||
a.log.logger.core.Warn().WithError(werr).Log("WebRTC subsystem disabled: construction failed")
|
||||
} else {
|
||||
a.restream.SetHooks(webrtcSub.Hooks())
|
||||
a.webrtcsub = webrtcSub
|
||||
a.webrtchandler = appwebrtc.NewHandler(webrtcSub, 0)
|
||||
}
|
||||
}
|
||||
|
||||
var httpjwt jwt.JWT
|
||||
|
||||
if cfg.API.Auth.Enable {
|
||||
|
|
@ -1014,6 +1033,7 @@ func (a *api) start() error {
|
|||
},
|
||||
RTMP: a.rtmpserver,
|
||||
SRT: a.srtserver,
|
||||
WebRTC: a.webrtchandler,
|
||||
JWT: a.httpjwt,
|
||||
Config: a.config.store,
|
||||
Sessions: a.sessions,
|
||||
|
|
@ -1354,6 +1374,17 @@ func (a *api) stop() {
|
|||
a.srtserver = nil
|
||||
}
|
||||
|
||||
// Tear down the WebRTC subsystem: close any active WHEP peers
|
||||
// first, then release all per-process UDP sockets.
|
||||
if a.webrtchandler != nil {
|
||||
a.webrtchandler.Close()
|
||||
a.webrtchandler = nil
|
||||
}
|
||||
if a.webrtcsub != nil {
|
||||
a.webrtcsub.Close()
|
||||
a.webrtcsub = nil
|
||||
}
|
||||
|
||||
// Stop the RTMP server
|
||||
if a.rtmpserver != nil {
|
||||
a.log.logger.rtmp.Info().Log("Stopping ...")
|
||||
|
|
|
|||
52
app/webrtc/ffmpeg_args.go
Normal file
52
app/webrtc/ffmpeg_args.go
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// BuildArgs emits the FFmpeg output-leg args for the WebRTC side of a
|
||||
// process. It produces two separate "outputs" — one for video on
|
||||
// videoPort, one for audio on videoPort+1. Each output ends with its
|
||||
// UDP address so the slice is structured for consumption by
|
||||
// restream.AppendOutput after splitting on the track boundary.
|
||||
//
|
||||
// Copy vs. re-encode: if ForceTranscode is false, we assume the upstream
|
||||
// source is already H.264 + Opus and pass them through (copy). When the
|
||||
// source doesn't match, FFmpeg will fail at runtime and the process will
|
||||
// restart — the user can flip ForceTranscode on to get a baseline-profile
|
||||
// H.264 + Opus re-encode.
|
||||
func BuildArgs(cfg appcfg.ConfigWebRTC, videoPort int) []string {
|
||||
vcopy := []string{"-c:v", "copy"}
|
||||
acopy := []string{"-c:a", "copy"}
|
||||
if cfg.ForceTranscode {
|
||||
vcopy = []string{
|
||||
"-c:v", "libx264",
|
||||
"-preset", "veryfast",
|
||||
"-profile:v", "baseline",
|
||||
"-pix_fmt", "yuv420p",
|
||||
"-tune", "zerolatency",
|
||||
"-g", "60",
|
||||
}
|
||||
acopy = []string{"-c:a", "libopus", "-b:a", "96k"}
|
||||
}
|
||||
|
||||
args := []string{"-map", "0:v:0"}
|
||||
args = append(args, vcopy...)
|
||||
args = append(args,
|
||||
"-payload_type", fmt.Sprint(cfg.VideoPT),
|
||||
"-f", "rtp",
|
||||
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort),
|
||||
)
|
||||
|
||||
args = append(args, "-map", "0:a:0")
|
||||
args = append(args, acopy...)
|
||||
args = append(args,
|
||||
"-payload_type", fmt.Sprint(cfg.AudioPT),
|
||||
"-f", "rtp",
|
||||
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort+1),
|
||||
)
|
||||
|
||||
return args
|
||||
}
|
||||
89
app/webrtc/ffmpeg_args_test.go
Normal file
89
app/webrtc/ffmpeg_args_test.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
func TestBuildArgs_CopyCodecs(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
|
||||
got := BuildArgs(cfg, 49200)
|
||||
|
||||
// Must contain -c:v copy and -c:a copy when ForceTranscode is false.
|
||||
if !contains(got, "-c:v", "copy") {
|
||||
t.Fatalf("expected -c:v copy, got %v", got)
|
||||
}
|
||||
if !contains(got, "-c:a", "copy") {
|
||||
t.Fatalf("expected -c:a copy, got %v", got)
|
||||
}
|
||||
|
||||
// Two UDP addresses, one per track, with port+1 for audio.
|
||||
if !any(got, "udp://127.0.0.1:49200?") {
|
||||
t.Fatalf("expected video udp on 49200, got %v", got)
|
||||
}
|
||||
if !any(got, "udp://127.0.0.1:49201?") {
|
||||
t.Fatalf("expected audio udp on 49201, got %v", got)
|
||||
}
|
||||
|
||||
// Payload types must be stringified.
|
||||
if !contains(got, "-payload_type", "102") {
|
||||
t.Fatalf("expected video PT 102, got %v", got)
|
||||
}
|
||||
if !contains(got, "-payload_type", "111") {
|
||||
t.Fatalf("expected audio PT 111, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildArgs_ForceTranscode(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111, ForceTranscode: true}
|
||||
got := BuildArgs(cfg, 49200)
|
||||
|
||||
if !contains(got, "-c:v", "libx264") {
|
||||
t.Fatalf("expected -c:v libx264, got %v", got)
|
||||
}
|
||||
if !contains(got, "-profile:v", "baseline") {
|
||||
t.Fatalf("expected baseline profile, got %v", got)
|
||||
}
|
||||
if !contains(got, "-c:a", "libopus") {
|
||||
t.Fatalf("expected -c:a libopus, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildArgs_TwoTrackBoundary(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
|
||||
got := BuildArgs(cfg, 49200)
|
||||
|
||||
// The second `-map` marks the start of the audio leg — the split
|
||||
// point restream.AppendOutput callers use.
|
||||
mapCount := 0
|
||||
for _, a := range got {
|
||||
if a == "-map" {
|
||||
mapCount++
|
||||
}
|
||||
}
|
||||
if mapCount != 2 {
|
||||
t.Fatalf("expected exactly 2 -map tokens, got %d in %v", mapCount, got)
|
||||
}
|
||||
}
|
||||
|
||||
// contains reports whether the two-token sequence appears consecutively.
|
||||
func contains(haystack []string, a, b string) bool {
|
||||
for i := 0; i+1 < len(haystack); i++ {
|
||||
if haystack[i] == a && haystack[i+1] == b {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// any reports whether any element of haystack starts with prefix.
|
||||
func any(haystack []string, prefix string) bool {
|
||||
for _, h := range haystack {
|
||||
if strings.HasPrefix(h, prefix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
134
app/webrtc/handler.go
Normal file
134
app/webrtc/handler.go
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/pion/webrtc/v4"
|
||||
|
||||
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||
)
|
||||
|
||||
// Handler exposes the subsystem's WHEP Echo handlers. Wire them into
|
||||
// the /api/v3 group (or a sibling group) via Handler.Register.
|
||||
type Handler struct {
|
||||
sub *Subsystem
|
||||
|
||||
peersMu sync.Mutex
|
||||
peers map[string]*corewebrtc.Peer // resourceID -> peer
|
||||
count int64 // atomic, for cap check without lock
|
||||
maxCap int64
|
||||
}
|
||||
|
||||
// NewHandler wraps the subsystem in an Echo-compatible HTTP handler.
|
||||
// The maxPeers argument caps concurrent subscribers; pass 0 to use a
|
||||
// generous default (matches corewebrtc.DefaultConfig).
|
||||
func NewHandler(s *Subsystem, maxPeers int) *Handler {
|
||||
cap := int64(maxPeers)
|
||||
if cap <= 0 {
|
||||
cap = int64(corewebrtc.DefaultConfig().MaxPeersTotal)
|
||||
}
|
||||
return &Handler{
|
||||
sub: s,
|
||||
peers: make(map[string]*corewebrtc.Peer),
|
||||
maxCap: cap,
|
||||
}
|
||||
}
|
||||
|
||||
// Register mounts the WHEP routes on the provided Echo group. WHEP
|
||||
// POST is /whep/:id, WHEP DELETE is /whep/:id/:resource.
|
||||
//
|
||||
// The routes are deliberately unauthenticated in M2 because WHEP
|
||||
// clients (browsers, OBS) don't carry the Core JWT. M3 will add
|
||||
// per-process signed-URL tokens; for M2 the deployment is expected
|
||||
// to put the endpoint behind an authenticated reverse-proxy or VPN.
|
||||
func (h *Handler) Register(g *echo.Group) {
|
||||
g.POST("/whep/:id", h.Subscribe)
|
||||
g.DELETE("/whep/:id/:resource", h.Unsubscribe)
|
||||
}
|
||||
|
||||
// Subscribe handles POST /whep/:id. Request body is an SDP offer,
|
||||
// response is an SDP answer with a Location header pointing at the
|
||||
// DELETE resource.
|
||||
func (h *Handler) Subscribe(c echo.Context) error {
|
||||
id := c.Param("id")
|
||||
if id == "" {
|
||||
return c.String(http.StatusBadRequest, "missing stream id")
|
||||
}
|
||||
|
||||
if atomic.LoadInt64(&h.count) >= h.maxCap {
|
||||
return c.String(http.StatusServiceUnavailable, corewebrtc.ErrPeerCapReached.Error())
|
||||
}
|
||||
|
||||
stream, ok := h.sub.lookup(id)
|
||||
if !ok {
|
||||
return c.String(http.StatusNotFound, corewebrtc.ErrStreamNotFound.Error())
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(c.Request().Body)
|
||||
if err != nil {
|
||||
return c.String(http.StatusBadRequest, "read body: "+err.Error())
|
||||
}
|
||||
if len(body) == 0 || !strings.HasPrefix(string(body), "v=") {
|
||||
return c.String(http.StatusBadRequest, corewebrtc.ErrInvalidSDP.Error())
|
||||
}
|
||||
|
||||
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
|
||||
peer, err := h.sub.factory.CreatePeerFromSources(c.Request().Context(), stream.video, stream.audio, offer)
|
||||
if err != nil {
|
||||
return c.String(http.StatusInternalServerError, "create peer: "+err.Error())
|
||||
}
|
||||
|
||||
h.peersMu.Lock()
|
||||
h.peers[peer.ResourceID()] = peer
|
||||
h.peersMu.Unlock()
|
||||
atomic.AddInt64(&h.count, 1)
|
||||
|
||||
c.Response().Header().Set("Content-Type", "application/sdp")
|
||||
c.Response().Header().Set("Location", "/whep/"+id+"/"+peer.ResourceID())
|
||||
return c.String(http.StatusCreated, peer.Answer().SDP)
|
||||
}
|
||||
|
||||
// Unsubscribe handles DELETE /whep/:id/:resource. The :id is part of
|
||||
// the path per WHEP spec but we only need :resource to locate the
|
||||
// peer; :id is accepted for route symmetry.
|
||||
func (h *Handler) Unsubscribe(c echo.Context) error {
|
||||
resource := c.Param("resource")
|
||||
if resource == "" {
|
||||
return c.String(http.StatusBadRequest, "missing resource id")
|
||||
}
|
||||
|
||||
h.peersMu.Lock()
|
||||
peer, ok := h.peers[resource]
|
||||
if ok {
|
||||
delete(h.peers, resource)
|
||||
}
|
||||
h.peersMu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return c.NoContent(http.StatusNotFound)
|
||||
}
|
||||
_ = peer.Close()
|
||||
atomic.AddInt64(&h.count, -1)
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Close tears down every active peer (e.g., during Core shutdown).
|
||||
func (h *Handler) Close() {
|
||||
h.peersMu.Lock()
|
||||
peers := make([]*corewebrtc.Peer, 0, len(h.peers))
|
||||
for _, p := range h.peers {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
h.peers = make(map[string]*corewebrtc.Peer)
|
||||
h.peersMu.Unlock()
|
||||
|
||||
for _, p := range peers {
|
||||
_ = p.Close()
|
||||
}
|
||||
atomic.StoreInt64(&h.count, 0)
|
||||
}
|
||||
89
app/webrtc/handler_test.go
Normal file
89
app/webrtc/handler_test.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
|
||||
"github.com/datarhei/core/v16/config"
|
||||
)
|
||||
|
||||
func newTestSubsystem(t *testing.T) *Subsystem {
|
||||
t.Helper()
|
||||
s, err := New(config.DataWebRTC{Enable: true}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// TestHandler_Subscribe_404WhenStreamMissing verifies the WHEP POST
|
||||
// returns 404 when no process has registered a stream for that id.
|
||||
func TestHandler_Subscribe_404WhenStreamMissing(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPost, "/whep/ghost", strings.NewReader("v=0\r\n"))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id")
|
||||
c.SetParamValues("ghost")
|
||||
|
||||
if err := h.Subscribe(c); err != nil {
|
||||
t.Fatalf("Subscribe returned error: %v", err)
|
||||
}
|
||||
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %d: %s", rec.Code, rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_Subscribe_400OnEmptyBody verifies invalid SDP offers
|
||||
// short-circuit before any peer is created. Requires a registered
|
||||
// stream so lookup doesn't 404 first.
|
||||
func TestHandler_Subscribe_400OnEmptyBody(t *testing.T) {
|
||||
sub := newTestSubsystem(t)
|
||||
// Register a dummy stream so the handler reaches body validation.
|
||||
sub.mu.Lock()
|
||||
sub.streams["probe"] = &processStream{id: "probe"} // video/audio nil is fine here — we never get past body parse
|
||||
sub.mu.Unlock()
|
||||
|
||||
h := NewHandler(sub, 0)
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPost, "/whep/probe", strings.NewReader(""))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id")
|
||||
c.SetParamValues("probe")
|
||||
|
||||
if err := h.Subscribe(c); err != nil {
|
||||
t.Fatalf("Subscribe returned error: %v", err)
|
||||
}
|
||||
if rec.Code != http.StatusBadRequest {
|
||||
t.Fatalf("expected 400, got %d: %s", rec.Code, rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandler_Unsubscribe_404WhenUnknown verifies a DELETE with an
|
||||
// unknown resource id returns 404 and no state mutation.
|
||||
func TestHandler_Unsubscribe_404WhenUnknown(t *testing.T) {
|
||||
h := NewHandler(newTestSubsystem(t), 0)
|
||||
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodDelete, "/whep/id/unknown", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id", "resource")
|
||||
c.SetParamValues("id", "unknown")
|
||||
|
||||
if err := h.Unsubscribe(c); err != nil {
|
||||
t.Fatalf("Unsubscribe returned error: %v", err)
|
||||
}
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
275
app/webrtc/integration_test.go
Normal file
275
app/webrtc/integration_test.go
Normal file
|
|
@ -0,0 +1,275 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/pion/rtp"
|
||||
pionwebrtc "github.com/pion/webrtc/v4"
|
||||
|
||||
"github.com/datarhei/core/v16/config"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// TestIntegration_SyntheticRTPToWHEP wires the full M2 subsystem end to
|
||||
// end using in-process UDP sockets and a Pion WHEP subscriber:
|
||||
//
|
||||
// 1. Build a Subsystem and Handler (no Core/HTTP server needed).
|
||||
// 2. Fire the OnStart hook directly — this allocates two adjacent
|
||||
// loopback UDP ports and registers a process stream.
|
||||
// 3. Extract the allocated video + audio ports from the returned
|
||||
// ConfigIO legs.
|
||||
// 4. Build a Pion PeerConnection (recvonly video + audio) and POST its
|
||||
// SDP offer through the Echo Handler.
|
||||
// 5. Plumb the returned answer into the PC.
|
||||
// 6. Spray synthetic RTP packets at both UDP ports.
|
||||
// 7. Assert that the PC sees OnTrack for both kinds and at least one
|
||||
// RTP packet arrives on each track inside the timeout budget.
|
||||
//
|
||||
// This is the single highest-leverage integration test for M2 — it
|
||||
// catches the whole stack: port allocation, hook contract, two-track
|
||||
// forwarding, WHEP handshake, and JWT-mounted routing doesn't interfere
|
||||
// with the handler's internal flow.
|
||||
func TestIntegration_SyntheticRTPToWHEP(t *testing.T) {
|
||||
// --- 1. Construct subsystem + handler. ---
|
||||
sub, err := New(config.DataWebRTC{Enable: true}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("subsystem New: %v", err)
|
||||
}
|
||||
defer sub.Close()
|
||||
|
||||
h := NewHandler(sub, 0)
|
||||
defer h.Close()
|
||||
|
||||
// --- 2. Fire OnStart directly to populate the stream registry
|
||||
// and allocate ports. We bypass the restream manager by
|
||||
// invoking the hook the subsystem would have registered.
|
||||
processID := "integration-probe"
|
||||
legs, err := sub.onProcessStart(processID, &appcfg.Config{
|
||||
ID: processID,
|
||||
WebRTC: appcfg.ConfigWebRTC{
|
||||
Enabled: true,
|
||||
VideoPT: 102,
|
||||
AudioPT: 111,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("onProcessStart: %v", err)
|
||||
}
|
||||
if len(legs) != 2 {
|
||||
t.Fatalf("expected 2 output legs, got %d", len(legs))
|
||||
}
|
||||
defer sub.onProcessStop(processID)
|
||||
|
||||
// --- 3. Extract UDP ports from leg addresses. ---
|
||||
videoPort, err := portFromLegAddress(legs[0].Address)
|
||||
if err != nil {
|
||||
t.Fatalf("video leg address %q: %v", legs[0].Address, err)
|
||||
}
|
||||
audioPort, err := portFromLegAddress(legs[1].Address)
|
||||
if err != nil {
|
||||
t.Fatalf("audio leg address %q: %v", legs[1].Address, err)
|
||||
}
|
||||
if audioPort != videoPort+1 {
|
||||
t.Fatalf("expected adjacent ports, got video=%d audio=%d", videoPort, audioPort)
|
||||
}
|
||||
|
||||
// --- 4. Mount the handler in an Echo server (httptest) so we
|
||||
// exercise the real route registration path. ---
|
||||
e := echo.New()
|
||||
g := e.Group("")
|
||||
h.Register(g)
|
||||
srv := httptest.NewServer(e)
|
||||
defer srv.Close()
|
||||
|
||||
// --- 5. Build the WHEP subscriber PeerConnection. ---
|
||||
me := &pionwebrtc.MediaEngine{}
|
||||
if err := me.RegisterDefaultCodecs(); err != nil {
|
||||
t.Fatalf("register default codecs: %v", err)
|
||||
}
|
||||
api := pionwebrtc.NewAPI(pionwebrtc.WithMediaEngine(me))
|
||||
pc, err := api.NewPeerConnection(pionwebrtc.Configuration{})
|
||||
if err != nil {
|
||||
t.Fatalf("new PC: %v", err)
|
||||
}
|
||||
defer pc.Close()
|
||||
|
||||
if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeVideo,
|
||||
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
t.Fatalf("add video transceiver: %v", err)
|
||||
}
|
||||
if _, err := pc.AddTransceiverFromKind(pionwebrtc.RTPCodecTypeAudio,
|
||||
pionwebrtc.RTPTransceiverInit{Direction: pionwebrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
t.Fatalf("add audio transceiver: %v", err)
|
||||
}
|
||||
|
||||
// Signal when each track has produced its first RTP packet.
|
||||
var videoGot, audioGot atomic.Bool
|
||||
videoCh := make(chan struct{}, 1)
|
||||
audioCh := make(chan struct{}, 1)
|
||||
|
||||
pc.OnTrack(func(tr *pionwebrtc.TrackRemote, _ *pionwebrtc.RTPReceiver) {
|
||||
// Read a single RTP packet and signal the appropriate channel.
|
||||
go func() {
|
||||
if _, _, readErr := tr.ReadRTP(); readErr != nil {
|
||||
return
|
||||
}
|
||||
switch tr.Kind() {
|
||||
case pionwebrtc.RTPCodecTypeVideo:
|
||||
if videoGot.CompareAndSwap(false, true) {
|
||||
videoCh <- struct{}{}
|
||||
}
|
||||
case pionwebrtc.RTPCodecTypeAudio:
|
||||
if audioGot.CompareAndSwap(false, true) {
|
||||
audioCh <- struct{}{}
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
offer, err := pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("create offer: %v", err)
|
||||
}
|
||||
gatherLocal := pionwebrtc.GatheringCompletePromise(pc)
|
||||
if err := pc.SetLocalDescription(offer); err != nil {
|
||||
t.Fatalf("set local: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-gatherLocal:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("local ICE gathering timeout")
|
||||
}
|
||||
offerSDP := pc.LocalDescription().SDP
|
||||
|
||||
// --- 6. POST the offer to the WHEP endpoint. ---
|
||||
resp, err := http.Post(srv.URL+"/whep/"+processID, "application/sdp",
|
||||
strings.NewReader(offerSDP))
|
||||
if err != nil {
|
||||
t.Fatalf("POST /whep: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
t.Fatalf("POST /whep status = %d, want 201", resp.StatusCode)
|
||||
}
|
||||
|
||||
answerBuf := make([]byte, 1<<15)
|
||||
n, _ := resp.Body.Read(answerBuf)
|
||||
answerSDP := string(answerBuf[:n])
|
||||
if !strings.Contains(answerSDP, "v=0") {
|
||||
t.Fatalf("answer SDP malformed: %q", answerSDP)
|
||||
}
|
||||
|
||||
loc := resp.Header.Get("Location")
|
||||
if loc == "" || !strings.HasPrefix(loc, "/whep/"+processID+"/") {
|
||||
t.Fatalf("Location header bad: %q", loc)
|
||||
}
|
||||
|
||||
if err := pc.SetRemoteDescription(pionwebrtc.SessionDescription{
|
||||
Type: pionwebrtc.SDPTypeAnswer,
|
||||
SDP: answerSDP,
|
||||
}); err != nil {
|
||||
t.Fatalf("set remote: %v", err)
|
||||
}
|
||||
|
||||
// --- 7. Spray synthetic RTP into both UDP ports. ---
|
||||
videoSender, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(videoPort))
|
||||
if err != nil {
|
||||
t.Fatalf("dial video: %v", err)
|
||||
}
|
||||
defer videoSender.Close()
|
||||
audioSender, err := net.Dial("udp", "127.0.0.1:"+strconv.Itoa(audioPort))
|
||||
if err != nil {
|
||||
t.Fatalf("dial audio: %v", err)
|
||||
}
|
||||
defer audioSender.Close()
|
||||
|
||||
stopSend := make(chan struct{})
|
||||
defer close(stopSend)
|
||||
go func() {
|
||||
ticker := time.NewTicker(20 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
var vseq, aseq uint16
|
||||
for {
|
||||
select {
|
||||
case <-stopSend:
|
||||
return
|
||||
case <-ticker.C:
|
||||
vseq++
|
||||
aseq++
|
||||
vpkt := synthRTPPacket(102, vseq, uint32(vseq)*3000, 0xcafe0000, []byte("vvvvvvvv"))
|
||||
_, _ = videoSender.Write(vpkt)
|
||||
apkt := synthRTPPacket(111, aseq, uint32(aseq)*960, 0xbeef0000, []byte("aaaaaaaa"))
|
||||
_, _ = audioSender.Write(apkt)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// --- 8. Wait for both tracks' first packet. ---
|
||||
waitFor := func(name string, ch chan struct{}) {
|
||||
select {
|
||||
case <-ch:
|
||||
// success
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("%s: no RTP received via WHEP within 10s", name)
|
||||
}
|
||||
}
|
||||
waitFor("video", videoCh)
|
||||
waitFor("audio", audioCh)
|
||||
|
||||
// Sanity: the Location path should DELETE cleanly.
|
||||
parsedLoc, err := url.Parse(loc)
|
||||
if err != nil {
|
||||
t.Fatalf("parse Location: %v", err)
|
||||
}
|
||||
deleteReq, _ := http.NewRequest(http.MethodDelete, srv.URL+parsedLoc.Path, nil)
|
||||
delResp, err := http.DefaultClient.Do(deleteReq)
|
||||
if err != nil {
|
||||
t.Fatalf("DELETE /whep/.../resource: %v", err)
|
||||
}
|
||||
_ = delResp.Body.Close()
|
||||
if delResp.StatusCode != http.StatusNoContent {
|
||||
t.Fatalf("DELETE status = %d, want 204", delResp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// portFromLegAddress pulls the UDP port out of a leg Address like
|
||||
// "udp://127.0.0.1:49200?pkt_size=1316".
|
||||
func portFromLegAddress(addr string) (int, error) {
|
||||
re := regexp.MustCompile(`udp://[^:]+:(\d+)`)
|
||||
m := re.FindStringSubmatch(addr)
|
||||
if len(m) != 2 {
|
||||
return 0, &portParseError{addr: addr}
|
||||
}
|
||||
return strconv.Atoi(m[1])
|
||||
}
|
||||
|
||||
type portParseError struct{ addr string }
|
||||
|
||||
func (e *portParseError) Error() string { return "cannot parse port from " + e.addr }
|
||||
|
||||
// synthRTPPacket builds a minimal valid RTP packet for injection testing.
|
||||
func synthRTPPacket(pt uint8, seq uint16, ts uint32, ssrc uint32, payload []byte) []byte {
|
||||
p := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
PayloadType: pt,
|
||||
SequenceNumber: seq,
|
||||
Timestamp: ts,
|
||||
SSRC: ssrc,
|
||||
Marker: false,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
b, _ := p.Marshal()
|
||||
return b
|
||||
}
|
||||
191
app/webrtc/lifecycle.go
Normal file
191
app/webrtc/lifecycle.go
Normal file
|
|
@ -0,0 +1,191 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// Default payload types. These match the values the M1 PoC / M2
|
||||
// forwarder expects (H.264 = 102, Opus = 111). Operators can override
|
||||
// per-process via the restream Config.
|
||||
const (
|
||||
defaultVideoPT = 102
|
||||
defaultAudioPT = 111
|
||||
)
|
||||
|
||||
// allocAttempts is the maximum number of times onProcessStart will
|
||||
// retry port allocation to find two adjacent free loopback UDP ports.
|
||||
// The kernel sometimes hands us an odd port for video, making V+1
|
||||
// unavailable — in practice 2-3 retries is plenty.
|
||||
const allocAttempts = 10
|
||||
|
||||
// onProcessStart is registered as the restream ProcessStartHook. It
|
||||
// fires with the restream write lock held, just before FFmpeg Start.
|
||||
//
|
||||
// When the per-process WebRTC config is disabled, it returns (nil, nil)
|
||||
// — FFmpeg starts normally without any extra output legs. When enabled
|
||||
// it:
|
||||
//
|
||||
// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1).
|
||||
// 2. Binds Pion Sources on those ports and registers the pair under
|
||||
// the process ID.
|
||||
// 3. Builds the two RTP ConfigIO output legs via BuildArgs and returns
|
||||
// them to the restream manager, which appends them to cfg.Output
|
||||
// and rebuilds the FFmpeg command.
|
||||
//
|
||||
// Any error aborts the process start. On partial allocation failure,
|
||||
// all allocated resources are cleaned up before returning.
|
||||
func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) {
|
||||
if cfg == nil || !cfg.WebRTC.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Normalize PTs — zero values mean "use defaults".
|
||||
wcfg := cfg.WebRTC
|
||||
if wcfg.VideoPT == 0 {
|
||||
wcfg.VideoPT = defaultVideoPT
|
||||
}
|
||||
if wcfg.AudioPT == 0 {
|
||||
wcfg.AudioPT = defaultAudioPT
|
||||
}
|
||||
|
||||
// Refuse to re-register — the restream manager should never
|
||||
// double-start a process but defensive check avoids a silent
|
||||
// Source leak if it does.
|
||||
s.mu.Lock()
|
||||
if _, exists := s.streams[id]; exists {
|
||||
s.mu.Unlock()
|
||||
return nil, fmt.Errorf("webrtc: process %q already has an active stream", id)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
videoPort, videoSrc, audioSrc, err := s.allocAdjacentPair(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start the UDP readers so they're draining packets the moment
|
||||
// FFmpeg comes online.
|
||||
videoSrc.Start()
|
||||
audioSrc.Start()
|
||||
|
||||
s.mu.Lock()
|
||||
s.streams[id] = &processStream{id: id, video: videoSrc, audio: audioSrc}
|
||||
s.mu.Unlock()
|
||||
|
||||
s.logger.WithFields(map[string]interface{}{
|
||||
"id": id,
|
||||
"video_port": videoPort,
|
||||
"audio_port": videoPort + 1,
|
||||
"video_pt": wcfg.VideoPT,
|
||||
"audio_pt": wcfg.AudioPT,
|
||||
}).Info().Log("WebRTC egress registered for process")
|
||||
|
||||
args := BuildArgs(wcfg, videoPort)
|
||||
return splitRTPLegs(args), nil
|
||||
}
|
||||
|
||||
// onProcessStop is registered as the restream ProcessStopHook. It
|
||||
// fires with the restream write lock held, just after FFmpeg has been
|
||||
// stopped. It tears down the per-process Sources (which closes their
|
||||
// sockets and hangs up any subscribed peers).
|
||||
func (s *Subsystem) onProcessStop(id string) {
|
||||
s.mu.Lock()
|
||||
st, ok := s.streams[id]
|
||||
if ok {
|
||||
delete(s.streams, id)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if st.video != nil {
|
||||
_ = st.video.Close()
|
||||
}
|
||||
if st.audio != nil {
|
||||
_ = st.audio.Close()
|
||||
}
|
||||
s.logger.WithField("id", id).Info().Log("WebRTC egress torn down for process")
|
||||
}
|
||||
|
||||
// allocAdjacentPair finds a pair of free loopback UDP ports (V, V+1)
|
||||
// and binds a Source to each. It retries up to allocAttempts times
|
||||
// because the kernel's ephemeral picker may hand us a port whose +1
|
||||
// neighbor is already taken. Caller owns the returned Sources; on
|
||||
// error all partial allocations are cleaned up.
|
||||
func (s *Subsystem) allocAdjacentPair(id string) (int, *corewebrtc.Source, *corewebrtc.Source, error) {
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < allocAttempts; attempt++ {
|
||||
port, err := Alloc()
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
videoSrc, err := corewebrtc.NewSourceOn(id, "127.0.0.1", port)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
audioSrc, err := corewebrtc.NewSourceOn(id+":audio", "127.0.0.1", port+1)
|
||||
if err != nil {
|
||||
_ = videoSrc.Close()
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
return port, videoSrc, audioSrc, nil
|
||||
}
|
||||
if lastErr == nil {
|
||||
lastErr = fmt.Errorf("unknown allocation failure")
|
||||
}
|
||||
return 0, nil, nil, fmt.Errorf("webrtc: allocate adjacent UDP port pair after %d attempts: %w", allocAttempts, lastErr)
|
||||
}
|
||||
|
||||
// splitRTPLegs converts the flat BuildArgs output into two ConfigIO
|
||||
// entries — one per RTP output leg. It splits on the second "-map"
|
||||
// token, which marks the audio leg's start (see ffmpeg_args_test.go).
|
||||
// The Address of each ConfigIO is the last argument (the udp:// URL);
|
||||
// everything preceding it forms that output's Options.
|
||||
func splitRTPLegs(args []string) []appcfg.ConfigIO {
|
||||
// Find the two -map indices.
|
||||
mapIdx := []int{}
|
||||
for i, a := range args {
|
||||
if a == "-map" {
|
||||
mapIdx = append(mapIdx, i)
|
||||
}
|
||||
}
|
||||
if len(mapIdx) != 2 {
|
||||
// BuildArgs always emits exactly 2 -maps; a different count
|
||||
// means an upstream bug. Return a single leg covering
|
||||
// everything to avoid silent truncation.
|
||||
return []appcfg.ConfigIO{toLeg(args)}
|
||||
}
|
||||
|
||||
videoTokens := args[mapIdx[0]:mapIdx[1]]
|
||||
audioTokens := args[mapIdx[1]:]
|
||||
|
||||
return []appcfg.ConfigIO{
|
||||
toLeg(videoTokens),
|
||||
toLeg(audioTokens),
|
||||
}
|
||||
}
|
||||
|
||||
// toLeg splits a contiguous RTP-output token slice into a ConfigIO:
|
||||
// the trailing token is the udp:// Address; everything before is the
|
||||
// Options slice.
|
||||
func toLeg(tokens []string) appcfg.ConfigIO {
|
||||
if len(tokens) == 0 {
|
||||
return appcfg.ConfigIO{}
|
||||
}
|
||||
addr := tokens[len(tokens)-1]
|
||||
opts := make([]string, len(tokens)-1)
|
||||
copy(opts, tokens[:len(tokens)-1])
|
||||
return appcfg.ConfigIO{
|
||||
ID: "webrtc",
|
||||
Address: addr,
|
||||
Options: opts,
|
||||
}
|
||||
}
|
||||
60
app/webrtc/lifecycle_test.go
Normal file
60
app/webrtc/lifecycle_test.go
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// TestSplitRTPLegs_TwoLegs feeds the real BuildArgs output through
|
||||
// the splitter and checks both legs come out with the correct shape.
|
||||
func TestSplitRTPLegs_TwoLegs(t *testing.T) {
|
||||
args := BuildArgs(appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}, 49200)
|
||||
|
||||
legs := splitRTPLegs(args)
|
||||
if len(legs) != 2 {
|
||||
t.Fatalf("expected 2 legs, got %d: %+v", len(legs), legs)
|
||||
}
|
||||
|
||||
video := legs[0]
|
||||
audio := legs[1]
|
||||
|
||||
// Leg 0 is video: address ends with :49200
|
||||
if !strings.HasSuffix(video.Address, ":49200?pkt_size=1316") {
|
||||
t.Fatalf("video Address unexpected: %q", video.Address)
|
||||
}
|
||||
// Leg 1 is audio: address ends with :49201
|
||||
if !strings.HasSuffix(audio.Address, ":49201?pkt_size=1316") {
|
||||
t.Fatalf("audio Address unexpected: %q", audio.Address)
|
||||
}
|
||||
|
||||
// Each leg's options start with -map, end with -f rtp.
|
||||
if len(video.Options) < 2 || video.Options[0] != "-map" {
|
||||
t.Fatalf("video leg should start with -map, got %v", video.Options)
|
||||
}
|
||||
if video.Options[len(video.Options)-2] != "-f" || video.Options[len(video.Options)-1] != "rtp" {
|
||||
t.Fatalf("video leg should end with -f rtp, got %v", video.Options)
|
||||
}
|
||||
if len(audio.Options) < 2 || audio.Options[0] != "-map" {
|
||||
t.Fatalf("audio leg should start with -map, got %v", audio.Options)
|
||||
}
|
||||
|
||||
// Neither leg's Options should contain the address itself.
|
||||
for _, opt := range video.Options {
|
||||
if strings.HasPrefix(opt, "udp://") {
|
||||
t.Fatalf("video Options must not contain udp:// address: %v", video.Options)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSplitRTPLegs_FallbackOnUnexpectedShape ensures we don't panic
|
||||
// or drop data if BuildArgs ever changes shape — the splitter returns
|
||||
// a single leg wrapping everything.
|
||||
func TestSplitRTPLegs_FallbackOnUnexpectedShape(t *testing.T) {
|
||||
// Single -map: shouldn't happen, but don't panic.
|
||||
legs := splitRTPLegs([]string{"-map", "0:v:0", "udp://1.2.3.4:5000"})
|
||||
if len(legs) != 1 {
|
||||
t.Fatalf("expected single fallback leg, got %d", len(legs))
|
||||
}
|
||||
}
|
||||
31
app/webrtc/portalloc.go
Normal file
31
app/webrtc/portalloc.go
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
// Package webrtc is the datarhei Core subsystem that turns WebRTC into
|
||||
// a first-class output alongside RTMP, SRT, and HLS. It owns the WHEP
|
||||
// HTTP handler, wires FFmpeg's RTP output into per-process Pion
|
||||
// Sources, and tracks active peer connections.
|
||||
//
|
||||
// See docs/design/2026-04-17-datarhei-dragon-fork-m2-webrtc-core-integration.md
|
||||
// for the full design.
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// Alloc binds :0 on loopback UDPv4, records the port the kernel assigned,
|
||||
// closes the socket, and returns the port number.
|
||||
//
|
||||
// The caller is expected to re-bind that exact port via
|
||||
// core/webrtc.NewSourceOn immediately. There is a microsecond-sized race
|
||||
// window where another process on the host could grab the port; if that
|
||||
// happens, the caller's rebind will fail and the error should be
|
||||
// propagated. In practice this is rare enough that a retry loop would be
|
||||
// unnecessary churn.
|
||||
func Alloc() (int, error) {
|
||||
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("webrtc: portalloc: %w", err)
|
||||
}
|
||||
defer c.Close()
|
||||
return c.LocalAddr().(*net.UDPAddr).Port, nil
|
||||
}
|
||||
43
app/webrtc/portalloc_test.go
Normal file
43
app/webrtc/portalloc_test.go
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestAlloc_ReturnsRebindablePort exercises the alloc/close/rebind
|
||||
// sequence 100 times. If a fast rebind race existed in normal
|
||||
// conditions, this would surface it.
|
||||
func TestAlloc_ReturnsRebindablePort(t *testing.T) {
|
||||
for i := 0; i < 100; i++ {
|
||||
p, err := Alloc()
|
||||
if err != nil {
|
||||
t.Fatalf("iter %d: Alloc: %v", i, err)
|
||||
}
|
||||
if p == 0 {
|
||||
t.Fatalf("iter %d: expected non-zero port", i)
|
||||
}
|
||||
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: p})
|
||||
if err != nil {
|
||||
t.Fatalf("iter %d: rebind port %d: %v", i, p, err)
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// TestAlloc_DistinctPorts confirms the OS doesn't hand us the same
|
||||
// ephemeral port twice in quick succession (it shouldn't — the socket
|
||||
// is briefly held in the bound state on close).
|
||||
func TestAlloc_DistinctPorts(t *testing.T) {
|
||||
seen := map[int]bool{}
|
||||
for i := 0; i < 10; i++ {
|
||||
p, err := Alloc()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if seen[p] {
|
||||
t.Fatalf("duplicate port %d", p)
|
||||
}
|
||||
seen[p] = true
|
||||
}
|
||||
}
|
||||
120
app/webrtc/subsystem.go
Normal file
120
app/webrtc/subsystem.go
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/datarhei/core/v16/config"
|
||||
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||
"github.com/datarhei/core/v16/log"
|
||||
"github.com/datarhei/core/v16/restream"
|
||||
)
|
||||
|
||||
// Subsystem is the app-level WebRTC egress manager. It sits alongside
|
||||
// api.API as a sibling — both consume the Restream service, both wire
|
||||
// themselves into the Echo HTTP router. The subsystem is responsible
|
||||
// for:
|
||||
//
|
||||
// - Translating the global config.DataWebRTC into the core-level
|
||||
// corewebrtc.Config used by the PeerFactory.
|
||||
// - Installing ProcessHooks on Restreamer so that per-process start
|
||||
// events allocate a pair of UDP ports, create Pion Sources, and
|
||||
// inject RTP output legs into the FFmpeg command line.
|
||||
// - Serving the WHEP Echo handler (see handler.go).
|
||||
//
|
||||
// The zero value is not usable; call New.
|
||||
type Subsystem struct {
|
||||
globalCfg config.DataWebRTC
|
||||
coreCfg corewebrtc.Config
|
||||
factory *corewebrtc.PeerFactory
|
||||
logger log.Logger
|
||||
|
||||
mu sync.Mutex
|
||||
streams map[string]*processStream // processID -> stream pair
|
||||
}
|
||||
|
||||
// processStream captures the two Sources (video + audio) backing a
|
||||
// single running process's WHEP egress.
|
||||
type processStream struct {
|
||||
id string
|
||||
video *corewebrtc.Source
|
||||
audio *corewebrtc.Source
|
||||
}
|
||||
|
||||
// New constructs a Subsystem from the global WebRTC config section.
|
||||
// The provided ffmpegUDPMax is advisory for logs only (M2 uses the
|
||||
// OS's ephemeral range via Alloc). Returns an error if the PeerFactory
|
||||
// cannot be built (e.g., bad NAT1To1 IPs).
|
||||
func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) {
|
||||
if logger == nil {
|
||||
logger = log.New("")
|
||||
}
|
||||
|
||||
coreCfg := corewebrtc.DefaultConfig()
|
||||
coreCfg.Enabled = dataCfg.Enable
|
||||
coreCfg.PublicIP = dataCfg.PublicIP
|
||||
|
||||
// If the operator configured multiple NAT1To1 IPs (e.g., dual
|
||||
// LAN/public), they take precedence over PublicIP. Wire them
|
||||
// through via PublicIP as the first entry; core/webrtc currently
|
||||
// reads a single PublicIP, so M2 joins the list with the first
|
||||
// entry winning. (Multi-IP NAT1To1 is an M3 enhancement.)
|
||||
if len(dataCfg.NAT1To1IPs) > 0 && coreCfg.PublicIP == "" {
|
||||
coreCfg.PublicIP = dataCfg.NAT1To1IPs[0]
|
||||
}
|
||||
|
||||
factory, err := corewebrtc.NewPeerFactory(coreCfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("webrtc subsystem: build peer factory: %w", err)
|
||||
}
|
||||
|
||||
return &Subsystem{
|
||||
globalCfg: dataCfg,
|
||||
coreCfg: coreCfg,
|
||||
factory: factory,
|
||||
logger: logger.WithComponent("WebRTC"),
|
||||
streams: make(map[string]*processStream),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Enabled reports whether the subsystem should register hooks and
|
||||
// serve the WHEP endpoint. Called by the API wiring layer to decide
|
||||
// whether to install anything.
|
||||
func (s *Subsystem) Enabled() bool {
|
||||
return s.globalCfg.Enable
|
||||
}
|
||||
|
||||
// Hooks returns the restream.ProcessHooks the subsystem expects to be
|
||||
// installed via restream.Restreamer.SetHooks. Exactly one Subsystem
|
||||
// instance should be installed per Restreamer.
|
||||
func (s *Subsystem) Hooks() restream.ProcessHooks {
|
||||
return restream.ProcessHooks{
|
||||
OnStart: s.onProcessStart,
|
||||
OnStop: s.onProcessStop,
|
||||
}
|
||||
}
|
||||
|
||||
// Close tears down every active per-process stream. It is safe to
|
||||
// call during Core shutdown; subsequent WHEP requests will 404.
|
||||
func (s *Subsystem) Close() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for id, st := range s.streams {
|
||||
if st.video != nil {
|
||||
_ = st.video.Close()
|
||||
}
|
||||
if st.audio != nil {
|
||||
_ = st.audio.Close()
|
||||
}
|
||||
delete(s.streams, id)
|
||||
}
|
||||
}
|
||||
|
||||
// lookup returns the per-process stream pair for id, or nil, false.
|
||||
// Used by the WHEP handler.
|
||||
func (s *Subsystem) lookup(id string) (*processStream, bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
st, ok := s.streams[id]
|
||||
return st, ok
|
||||
}
|
||||
|
|
@ -227,6 +227,12 @@ func (d *Config) init() {
|
|||
d.vars.Register(value.NewBool(&d.SRT.Log.Enable, false), "srt.log.enable", "CORE_SRT_LOG_ENABLE", nil, "Enable SRT server logging", false, false)
|
||||
d.vars.Register(value.NewStringList(&d.SRT.Log.Topics, []string{}, ","), "srt.log.topics", "CORE_SRT_LOG_TOPICS", nil, "List of topics to log", false, false)
|
||||
|
||||
// WebRTC (Dragon Fork M2)
|
||||
d.vars.Register(value.NewBool(&d.WebRTC.Enable, false), "webrtc.enable", "CORE_WEBRTC_ENABLE", nil, "Enable WebRTC egress subsystem", false, false)
|
||||
d.vars.Register(value.NewString(&d.WebRTC.PublicIP, ""), "webrtc.public_ip", "CORE_WEBRTC_PUBLIC_IP", nil, "ICE NAT1To1 host candidate IP (LAN or public)", false, false)
|
||||
d.vars.Register(value.NewStringList(&d.WebRTC.NAT1To1IPs, []string{}, " "), "webrtc.nat_1_to_1_ips", "CORE_WEBRTC_NAT_1_TO_1_IPS", nil, "Advanced: multiple NAT1To1 IPs", false, false)
|
||||
d.vars.Register(value.NewInt(&d.WebRTC.UDPMuxPort, 0), "webrtc.udp_mux_port", "CORE_WEBRTC_UDP_MUX_PORT", nil, "Single UDP port for all ICE traffic (0 = ephemeral)", false, false)
|
||||
|
||||
// FFmpeg
|
||||
d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg", d.fs), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false)
|
||||
d.vars.Register(value.NewInt64(&d.FFmpeg.MaxProcesses, 0), "ffmpeg.max_processes", "CORE_FFMPEG_MAXPROCESSES", nil, "Max. allowed simultaneously running ffmpeg instances, 0 for unlimited", false, false)
|
||||
|
|
|
|||
|
|
@ -113,6 +113,7 @@ type Data struct {
|
|||
Topics []string `json:"topics"`
|
||||
} `json:"log"`
|
||||
} `json:"srt"`
|
||||
WebRTC DataWebRTC `json:"webrtc"`
|
||||
FFmpeg struct {
|
||||
Binary string `json:"binary"`
|
||||
MaxProcesses int64 `json:"max_processes" format:"int64"`
|
||||
|
|
@ -334,3 +335,12 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
|||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// DataWebRTC is the global WebRTC egress configuration. Promoted to a
|
||||
// named type so the app/webrtc subsystem can accept it by value.
|
||||
type DataWebRTC struct {
|
||||
Enable bool `json:"enable"`
|
||||
PublicIP string `json:"public_ip"`
|
||||
NAT1To1IPs []string `json:"nat_1_to_1_ips"`
|
||||
UDPMuxPort int `json:"udp_mux_port" format:"int"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,10 +6,9 @@ import (
|
|||
)
|
||||
|
||||
// forwardRTP reads packets from sub and writes them to the correct track
|
||||
// based on payload type (H.264 → video, Opus → audio). Payload-type
|
||||
// inspection is the simplest M1 approach; M2 will switch to per-track
|
||||
// source channels once the process resolver manages separate video/audio
|
||||
// UDP ports.
|
||||
// based on payload type (H.264 → video, Opus → audio). Used by the M1
|
||||
// single-source PoC where FFmpeg emits both video and audio RTP to the
|
||||
// same UDP port.
|
||||
func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
|
||||
video, audio *webrtc.TrackLocalStaticRTP) {
|
||||
for {
|
||||
|
|
@ -35,3 +34,29 @@ func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// forwardRTPSplit is the M2 variant: it reads from two independent
|
||||
// per-track channels (one video, one audio) and writes each to its
|
||||
// own Pion track. This is the mode used when the restream manager
|
||||
// emits two FFmpeg RTP legs on separate UDP ports. Either channel
|
||||
// closing or done firing terminates the loop.
|
||||
func forwardRTPSplit(done <-chan struct{},
|
||||
videoSub <-chan *rtp.Packet, audioSub <-chan *rtp.Packet,
|
||||
video, audio *webrtc.TrackLocalStaticRTP) {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case pkt, ok := <-videoSub:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
_ = video.WriteRTP(pkt)
|
||||
case pkt, ok := <-audioSub:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
_ = audio.WriteRTP(pkt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,15 +44,27 @@ func NewPeerFactory(c Config) (*PeerFactory, error) {
|
|||
return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil
|
||||
}
|
||||
|
||||
// Peer wraps a Pion PeerConnection bound to a Source's subscription.
|
||||
// Peer wraps a Pion PeerConnection bound to either a single Source
|
||||
// subscription (M1, payload-type split forwarding) or to a pair of
|
||||
// video+audio Source subscriptions (M2, per-track forwarding).
|
||||
type Peer struct {
|
||||
resourceID string
|
||||
pc *webrtc.PeerConnection
|
||||
answer webrtc.SessionDescription
|
||||
source *Source
|
||||
sub chan *rtp.Packet
|
||||
done chan struct{}
|
||||
once sync.Once
|
||||
|
||||
// M1 single-source mode: source+sub are set, videoSource/audioSource are nil.
|
||||
source *Source
|
||||
sub chan *rtp.Packet
|
||||
|
||||
// M2 two-source mode: videoSource/audioSource and their subs are set,
|
||||
// source/sub are nil.
|
||||
videoSource *Source
|
||||
audioSource *Source
|
||||
videoSub chan *rtp.Packet
|
||||
audioSub chan *rtp.Packet
|
||||
|
||||
done chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// CreatePeer builds a PeerConnection, sets the remote offer, generates an
|
||||
|
|
@ -140,18 +152,111 @@ func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }
|
|||
// ResourceID returns the stable resource id used in the WHEP Location header.
|
||||
func (p *Peer) ResourceID() string { return p.resourceID }
|
||||
|
||||
// Close tears down the peer connection and unsubscribes from the source.
|
||||
// Safe to call multiple times.
|
||||
// Close tears down the peer connection and unsubscribes from each
|
||||
// source. Safe to call multiple times.
|
||||
func (p *Peer) Close() error {
|
||||
var err error
|
||||
p.once.Do(func() {
|
||||
close(p.done)
|
||||
p.source.Unsubscribe(p.sub)
|
||||
if p.source != nil && p.sub != nil {
|
||||
p.source.Unsubscribe(p.sub)
|
||||
}
|
||||
if p.videoSource != nil && p.videoSub != nil {
|
||||
p.videoSource.Unsubscribe(p.videoSub)
|
||||
}
|
||||
if p.audioSource != nil && p.audioSub != nil {
|
||||
p.audioSource.Unsubscribe(p.audioSub)
|
||||
}
|
||||
err = p.pc.Close()
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// CreatePeerFromSources is the M2 entry point: it builds a
|
||||
// PeerConnection with video+audio tracks and subscribes each to its
|
||||
// own dedicated Source. Used when the restream manager emits two
|
||||
// FFmpeg RTP legs on separate UDP ports — there is no payload-type
|
||||
// sniffing required, each Source feeds its matching track directly.
|
||||
func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
|
||||
videoSrc, audioSrc *Source, offer webrtc.SessionDescription) (*Peer, error) {
|
||||
pc, err := f.api.NewPeerConnection(f.rtcConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("webrtc: new peer connection: %w", err)
|
||||
}
|
||||
|
||||
videoTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
|
||||
"video", "dragonfork")
|
||||
if err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: new video track: %w", err)
|
||||
}
|
||||
audioTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
|
||||
"audio", "dragonfork")
|
||||
if err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: new audio track: %w", err)
|
||||
}
|
||||
if _, err := pc.AddTrack(videoTrack); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: add video track: %w", err)
|
||||
}
|
||||
if _, err := pc.AddTrack(audioTrack); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: add audio track: %w", err)
|
||||
}
|
||||
|
||||
if err := pc.SetRemoteDescription(offer); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: set remote: %w", err)
|
||||
}
|
||||
answer, err := pc.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: create answer: %w", err)
|
||||
}
|
||||
|
||||
gatherComplete := webrtc.GatheringCompletePromise(pc)
|
||||
if err := pc.SetLocalDescription(answer); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: set local: %w", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-gatherComplete:
|
||||
case <-ctx.Done():
|
||||
_ = pc.Close()
|
||||
return nil, ErrICETimeout
|
||||
}
|
||||
|
||||
videoSub := videoSrc.Subscribe(64)
|
||||
audioSub := audioSrc.Subscribe(64)
|
||||
|
||||
p := &Peer{
|
||||
resourceID: newResourceID(),
|
||||
pc: pc,
|
||||
answer: *pc.LocalDescription(),
|
||||
videoSource: videoSrc,
|
||||
audioSource: audioSrc,
|
||||
videoSub: videoSub,
|
||||
audioSub: audioSub,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
|
||||
if st == webrtc.PeerConnectionStateFailed ||
|
||||
st == webrtc.PeerConnectionStateDisconnected ||
|
||||
st == webrtc.PeerConnectionStateClosed {
|
||||
_ = p.Close()
|
||||
}
|
||||
})
|
||||
|
||||
go forwardRTPSplit(p.done, videoSub, audioSub, videoTrack, audioTrack)
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func newResourceID() string {
|
||||
b := make([]byte, 8)
|
||||
_, _ = rand.Read(b)
|
||||
|
|
|
|||
60
deploy/truenas/core/Dockerfile
Normal file
60
deploy/truenas/core/Dockerfile
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
# Dragon Fork datarhei Core image (M2 + WebRTC egress).
|
||||
#
|
||||
# Builds the real root Core binary — the one that replaces the M1 PoC
|
||||
# in production. FFmpeg is baked in so restream processes can run the
|
||||
# RTP output legs emitted by the WebRTC subsystem.
|
||||
#
|
||||
# Two-stage:
|
||||
# 1. builder: compile a static Go binary (CGO off — no dynamic libs)
|
||||
# 2. runtime: alpine with ffmpeg for the subprocess path
|
||||
#
|
||||
# Usage via compose:
|
||||
# docker compose -f deploy/truenas/core/docker-compose.yml up -d --build
|
||||
#
|
||||
# The compose file drives configuration via CORE_* env vars — see
|
||||
# README.md in this directory.
|
||||
|
||||
# ---- builder ----
|
||||
# go.mod requires go 1.24; pinning the image keeps Docker's toolchain
|
||||
# download off the hot path and makes the build reproducible.
|
||||
FROM golang:1.24-alpine3.20 AS builder
|
||||
|
||||
WORKDIR /src
|
||||
RUN apk add --no-cache git make
|
||||
|
||||
COPY . .
|
||||
|
||||
ENV CGO_ENABLED=0 GOOS=linux GOARCH=amd64
|
||||
RUN make release && make import && make ffmigrate
|
||||
|
||||
# ---- runtime ----
|
||||
# Alpine with ffmpeg (Core shells out to it for every restream process).
|
||||
# Scratch isn't an option here because the process manager needs ffmpeg
|
||||
# on PATH.
|
||||
FROM alpine:3.20 AS runtime
|
||||
|
||||
RUN apk add --no-cache ffmpeg tini ca-certificates
|
||||
|
||||
# make release's `-o core` lands the binary inside the core/ Go
|
||||
# package directory (Go cannot overwrite a directory with a file, so
|
||||
# it places the output file _inside_ it). The `import` and `ffmigrate`
|
||||
# Makefile targets cd into app/<name> and write the binary back up to
|
||||
# the repo root with a relative path, so those end up at /src/import
|
||||
# and /src/ffmigrate.
|
||||
COPY --from=builder /src/core/core /core/bin/core
|
||||
COPY --from=builder /src/import /core/bin/import
|
||||
COPY --from=builder /src/ffmigrate /core/bin/ffmigrate
|
||||
COPY --from=builder /src/mime.types /core/mime.types
|
||||
COPY --from=builder /src/run.sh /core/bin/run.sh
|
||||
|
||||
RUN mkdir -p /core/config /core/data
|
||||
|
||||
ENV CORE_CONFIGFILE=/core/config/config.json
|
||||
ENV CORE_STORAGE_DISK_DIR=/core/data
|
||||
ENV CORE_DB_DIR=/core/config
|
||||
|
||||
VOLUME ["/core/data", "/core/config"]
|
||||
EXPOSE 8080/tcp
|
||||
|
||||
ENTRYPOINT ["/sbin/tini", "--", "/core/bin/run.sh"]
|
||||
WORKDIR /core
|
||||
102
deploy/truenas/core/README.md
Normal file
102
deploy/truenas/core/README.md
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
# TrueNAS deploy — datarhei Core (M2, WebRTC-in-Core)
|
||||
|
||||
Host-networked Docker stack that runs the real root Core binary with
|
||||
the M2 WebRTC egress subsystem wired in. This replaces the M1
|
||||
`webrtc-poc` stack — WebRTC is now a first-class output alongside
|
||||
RTMP/SRT/HLS.
|
||||
|
||||
## What changed from M1
|
||||
|
||||
| M1 (webrtc-poc) | M2 (this stack) |
|
||||
| -------------------------------------- | -------------------------------------------- |
|
||||
| Standalone `cmd/webrtc-poc` binary | Full Core with restream, HTTP API, storage |
|
||||
| One hard-coded stream id | Every restream process can opt into WebRTC |
|
||||
| Single UDP ingest, PT-split forwarding | Two UDP ports per process, per-track |
|
||||
| Plain `/whep/:id` on a side port | `/api/v3/whep/:id` on the JWT-protected API |
|
||||
| No auth | JWT (same creds as the rest of Core) |
|
||||
|
||||
## Prereqs
|
||||
|
||||
- Docker on the TrueNAS host (TrueNAS SCALE includes it)
|
||||
- LAN or public IP that clients can reach (set in `.env` as `PUBLIC_IP`)
|
||||
- Admin credentials for Core's API
|
||||
- FFmpeg is bundled in the image — no host install required
|
||||
|
||||
## One-time setup
|
||||
|
||||
```
|
||||
sudo mkdir -p /mnt/NVME/Docker/dragonfork-core
|
||||
cd /mnt/NVME/Docker/dragonfork-core
|
||||
|
||||
# Pull the repo (or sync deploy files) onto the host. The compose
|
||||
# build `context:` points at the repo root.
|
||||
git clone https://forgejo.wilddragon.net/zgaetano/datarhei-dragonfork-core.git
|
||||
cd datarhei-dragonfork-core/deploy/truenas/core
|
||||
|
||||
cat > .env <<EOF
|
||||
PUBLIC_IP=10.0.0.25
|
||||
CORE_HTTP_PORT=8080
|
||||
API_AUTH_USERNAME=admin
|
||||
API_AUTH_PASSWORD=$(openssl rand -base64 24)
|
||||
API_AUTH_JWT_SECRET=$(openssl rand -base64 48)
|
||||
LOG_LEVEL=info
|
||||
EOF
|
||||
|
||||
mkdir -p config data
|
||||
```
|
||||
|
||||
## Run
|
||||
|
||||
```
|
||||
docker compose up -d --build
|
||||
docker compose logs -f
|
||||
```
|
||||
|
||||
You should see Core come up logging all configured listeners, including
|
||||
a line from the WebRTC component confirming the subsystem is enabled.
|
||||
|
||||
## Smoke-test via API
|
||||
|
||||
```
|
||||
# Issue a JWT against the admin creds from .env:
|
||||
TOKEN=$(curl -s -X POST -H 'Content-Type: application/json' \
|
||||
-d '{"username":"admin","password":"<from .env>"}' \
|
||||
http://10.0.0.25:8080/api/login | jq -r '.access_token')
|
||||
|
||||
# Probe the WHEP endpoint — should 404 for an unknown id.
|
||||
curl -i -H "Authorization: Bearer $TOKEN" \
|
||||
-X POST http://10.0.0.25:8080/api/v3/whep/nope
|
||||
# → HTTP/1.1 404 Not Found
|
||||
|
||||
# Create a process with WebRTC enabled, send RTMP to its input, then
|
||||
# subscribe the Pion whep-client to /api/v3/whep/<process-id>.
|
||||
```
|
||||
|
||||
## Cutting over from the M1 PoC
|
||||
|
||||
The M1 `webrtc-poc` stack is independent; it binds its own ports. You
|
||||
can run both side-by-side during the cutover:
|
||||
|
||||
```
|
||||
# Stop the M1 stack when you're ready to retire it:
|
||||
cd /mnt/NVME/Docker/dragonfork-webrtc-poc
|
||||
docker compose down
|
||||
```
|
||||
|
||||
## Teardown
|
||||
|
||||
```
|
||||
docker compose down
|
||||
```
|
||||
|
||||
## Security notes
|
||||
|
||||
- The WHEP endpoint is mounted under `/api/v3`, which is JWT-protected.
|
||||
That's the M2 posture — WHEP clients (browsers) need a token. M3
|
||||
adds per-process signed-URL tokens so embeds don't require admin
|
||||
credentials.
|
||||
- The binary runs as root inside the container; if you need an unpriv
|
||||
user, mount volumes owned by a fixed UID and add a `user:` directive.
|
||||
This matches how the upstream datarhei/core image ships.
|
||||
- Put Caddy or nginx in front for TLS. The media itself is
|
||||
DTLS-SRTP-encrypted regardless.
|
||||
56
deploy/truenas/core/docker-compose.yml
Normal file
56
deploy/truenas/core/docker-compose.yml
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
# Dragon Fork datarhei Core — M2 deployment with WebRTC egress.
|
||||
#
|
||||
# This replaces the M1 webrtc-poc stack. It runs the real root Core
|
||||
# binary with the WebRTC subsystem wired into the restream manager, so
|
||||
# every process whose config has `webrtc.enabled=true` will have its
|
||||
# output fanned out to WHEP subscribers automatically.
|
||||
#
|
||||
# Host networking is required for the same reason as M1: ICE encodes
|
||||
# host:port pairs into SDP candidates, and bridge-mode port mapping
|
||||
# breaks that.
|
||||
#
|
||||
# Copy this file to /mnt/NVME/Docker/dragonfork-core/ alongside a .env:
|
||||
#
|
||||
# PUBLIC_IP=10.0.0.25
|
||||
# API_AUTH_USERNAME=admin
|
||||
# API_AUTH_PASSWORD=change-me-please
|
||||
# API_AUTH_JWT_SECRET=<32+ random bytes, base64>
|
||||
#
|
||||
# Then:
|
||||
# docker compose up -d --build
|
||||
# docker compose logs -f
|
||||
|
||||
services:
|
||||
core:
|
||||
build:
|
||||
context: ../../.. # repo root (where go.mod lives)
|
||||
dockerfile: deploy/truenas/core/Dockerfile
|
||||
container_name: dragonfork-core
|
||||
restart: unless-stopped
|
||||
network_mode: host
|
||||
environment:
|
||||
# --- API ---
|
||||
CORE_ADDRESS: ":${CORE_HTTP_PORT:-8080}"
|
||||
CORE_API_AUTH_ENABLE: "true"
|
||||
CORE_API_AUTH_USERNAME: "${API_AUTH_USERNAME:?set in .env}"
|
||||
CORE_API_AUTH_PASSWORD: "${API_AUTH_PASSWORD:?set in .env}"
|
||||
CORE_API_AUTH_JWT_SECRET: "${API_AUTH_JWT_SECRET:?set in .env}"
|
||||
|
||||
# --- WebRTC egress ---
|
||||
CORE_WEBRTC_ENABLE: "true"
|
||||
CORE_WEBRTC_PUBLIC_IP: "${PUBLIC_IP:?set in .env}"
|
||||
# Leave NAT1To1_IPS empty unless you need multiple advertised IPs.
|
||||
# CORE_WEBRTC_NAT_1_TO_1_IPS: "10.0.0.25 203.0.113.10"
|
||||
|
||||
# --- Storage ---
|
||||
# Let the volumes below provide durable paths; defaults are fine.
|
||||
|
||||
# --- Logging ---
|
||||
CORE_LOG_LEVEL: "${LOG_LEVEL:-info}"
|
||||
|
||||
volumes:
|
||||
- ./config:/core/config
|
||||
- ./data:/core/data
|
||||
|
||||
# No ports: host networking exposes whatever the process binds.
|
||||
# The WHEP endpoint lives at /api/v3/whep/:id on CORE_HTTP_PORT.
|
||||
839
docs/superpowers/plans/2026-04-17-m2-webrtc-core-integration.md
Normal file
839
docs/superpowers/plans/2026-04-17-m2-webrtc-core-integration.md
Normal file
|
|
@ -0,0 +1,839 @@
|
|||
# M2 — WebRTC into datarhei Core proper — Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Wire the M1 `core/webrtc` package into the datarhei Core binary as a first-class output, served via WHEP under `/api/v3/process/{id}/whep`, with an eagerly bound `Source` per WebRTC-enabled process.
|
||||
|
||||
**Architecture:** New `app/webrtc` sibling subsystem that hooks into restream's process lifecycle. Two small additions to restream (`ProcessHooks` callbacks + `AppendOutput` method). Reuses the untouched M1 `core/webrtc` package. UI lives in a separate core-ui repo and is deferred to a sibling plan.
|
||||
|
||||
**Tech Stack:** Go 1.24, Pion WebRTC v4 (via `core/webrtc` from M1), Echo v4 HTTP router, existing datarhei Core subsystem pattern.
|
||||
|
||||
**Spec:** `docs/design/2026-04-17-datarhei-dragon-fork-m2-webrtc-core-integration.md`
|
||||
|
||||
**Branch:** `m2-webrtc-core-integration` (already created from `m1-webrtc-poc`).
|
||||
|
||||
---
|
||||
|
||||
## File Structure
|
||||
|
||||
**New files:**
|
||||
- `app/webrtc/portalloc.go` + `portalloc_test.go` — ephemeral UDP port allocation
|
||||
- `app/webrtc/ffmpeg_args.go` + `ffmpeg_args_test.go` — builds `-f rtp …` output fragments
|
||||
- `app/webrtc/lifecycle.go` + `lifecycle_test.go` — `OnStart`/`OnStop` hook bodies
|
||||
- `app/webrtc/subsystem.go` + `subsystem_test.go` — `WebRTC` struct; `Start`/`Stop`
|
||||
- `app/webrtc/handler.go` + `handler_test.go` — WHEP HTTP handler
|
||||
- `core/webrtc/registry.go` already exists — no changes.
|
||||
|
||||
**Modified files:**
|
||||
- `restream/app/process.go` — add `ConfigWebRTC` type and `WebRTC` field on `Config`. Update `Clone()` and `CreateCommand()`.
|
||||
- `restream/restream.go` — add `ProcessHooks` and `AppendOutput`.
|
||||
- `config/data.go` — add `WebRTC` block on `Data` struct.
|
||||
- `config/config.go` — `vars.Register` entries for WebRTC fields.
|
||||
- `app/api/api.go` — instantiate the WebRTC subsystem alongside restream.
|
||||
- `http/server.go` — mount `/whep` routes under existing `/api/v3` group.
|
||||
|
||||
---
|
||||
|
||||
## Task 1 — `ConfigWebRTC` on restream's `Config`
|
||||
|
||||
**Files:**
|
||||
- Modify: `restream/app/process.go`
|
||||
|
||||
- [ ] **Step 1.1 — Add `ConfigWebRTC` type + field**
|
||||
|
||||
Append after `ConfigIO` definition (~line 34), add field to `Config`:
|
||||
|
||||
```go
|
||||
type ConfigWebRTC struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
VideoPT uint8 `json:"video_pt"`
|
||||
AudioPT uint8 `json:"audio_pt"`
|
||||
ForceTranscode bool `json:"force_transcode"`
|
||||
}
|
||||
|
||||
func (w ConfigWebRTC) Clone() ConfigWebRTC { return w }
|
||||
```
|
||||
|
||||
Add to `Config` struct:
|
||||
```go
|
||||
WebRTC ConfigWebRTC `json:"webrtc"`
|
||||
```
|
||||
|
||||
- [ ] **Step 1.2 — Update `Config.Clone()` to carry WebRTC**
|
||||
|
||||
```go
|
||||
clone.WebRTC = config.WebRTC.Clone()
|
||||
```
|
||||
|
||||
- [ ] **Step 1.3 — Verify build**
|
||||
|
||||
Run: `go build ./restream/...`
|
||||
Expected: no errors.
|
||||
|
||||
- [ ] **Step 1.4 — Commit**
|
||||
|
||||
```bash
|
||||
git add restream/app/process.go
|
||||
git commit -m "feat(restream): add ConfigWebRTC per-process field"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2 — `DataWebRTC` on global config
|
||||
|
||||
**Files:**
|
||||
- Modify: `config/data.go`
|
||||
- Modify: `config/config.go`
|
||||
|
||||
- [ ] **Step 2.1 — Add `WebRTC` block to `Data`**
|
||||
|
||||
In `config/data.go`, following the pattern of `SRT`/`FFmpeg` blocks, add near the similar service blocks:
|
||||
|
||||
```go
|
||||
WebRTC struct {
|
||||
Enable bool `json:"enable"`
|
||||
PublicIP string `json:"public_ip"`
|
||||
NAT1To1IPs []string `json:"nat_1_to_1_ips"`
|
||||
UDPMuxPort int `json:"udp_mux_port"`
|
||||
} `json:"webrtc"`
|
||||
```
|
||||
|
||||
- [ ] **Step 2.2 — Register vars**
|
||||
|
||||
In `config/config.go`, at the end of the `vars.Register` block, add:
|
||||
|
||||
```go
|
||||
d.vars.Register(value.NewBool(&d.WebRTC.Enable, false), "webrtc.enable", "CORE_WEBRTC_ENABLE", nil, "Enable WebRTC egress subsystem", false, false)
|
||||
d.vars.Register(value.NewString(&d.WebRTC.PublicIP, ""), "webrtc.public_ip", "CORE_WEBRTC_PUBLIC_IP", nil, "ICE NAT1To1 host candidate IP", false, false)
|
||||
d.vars.Register(value.NewStringList(&d.WebRTC.NAT1To1IPs, []string{}, " "), "webrtc.nat_1_to_1_ips", "CORE_WEBRTC_NAT_1_TO_1_IPS", nil, "Advanced: multiple NAT1To1 IPs", false, false)
|
||||
d.vars.Register(value.NewInt(&d.WebRTC.UDPMuxPort, 0), "webrtc.udp_mux_port", "CORE_WEBRTC_UDP_MUX_PORT", nil, "Single UDP port for all ICE traffic (0 = ephemeral)", false, false)
|
||||
```
|
||||
|
||||
(If the project uses a different `vars.Register` signature, match the neighbors.)
|
||||
|
||||
- [ ] **Step 2.3 — Verify build and commit**
|
||||
|
||||
```bash
|
||||
go build ./config/...
|
||||
git add config/data.go config/config.go
|
||||
git commit -m "feat(config): add webrtc global config block"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3 — `ProcessHooks` + `AppendOutput` on restream
|
||||
|
||||
**Files:**
|
||||
- Modify: `restream/restream.go`
|
||||
|
||||
- [ ] **Step 3.1 — Add `ProcessHook`, `ProcessHooks` types and field on restream struct**
|
||||
|
||||
Near the top (after imports, in the types region):
|
||||
|
||||
```go
|
||||
// ProcessHook is called at well-defined points in a process's lifecycle.
|
||||
// Return a non-nil error to abort the start (OnStart only; OnStop errors
|
||||
// are logged and otherwise ignored).
|
||||
type ProcessHook func(id string, cfg *app.Config) error
|
||||
|
||||
// ProcessHooks carries optional lifecycle callbacks for restream to invoke.
|
||||
// A nil hook is a no-op.
|
||||
type ProcessHooks struct {
|
||||
OnStart ProcessHook // fires after args are assembled, before exec
|
||||
OnStop ProcessHook // fires after wait() returns
|
||||
}
|
||||
```
|
||||
|
||||
Add a field to the `restream` struct:
|
||||
```go
|
||||
hooks ProcessHooks
|
||||
```
|
||||
|
||||
Add a `SetHooks` method:
|
||||
```go
|
||||
func (r *restream) SetHooks(h ProcessHooks) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.hooks = h
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3.2 — Wire OnStart / OnStop into the task lifecycle**
|
||||
|
||||
Find the `startProcess` / `ffmpeg.Start()` call site (~line 1065 per survey). Before the `Start()` call, insert:
|
||||
|
||||
```go
|
||||
if r.hooks.OnStart != nil {
|
||||
if err := r.hooks.OnStart(task.id, task.config); err != nil {
|
||||
r.logger.WithField("id", task.id).WithError(err).Error().Log("OnStart hook aborted process start")
|
||||
return err
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Find `stopProcess` / `ffmpeg.Stop()` (~line 1094). After the stop completes, add:
|
||||
|
||||
```go
|
||||
if r.hooks.OnStop != nil {
|
||||
if err := r.hooks.OnStop(task.id, task.config); err != nil {
|
||||
r.logger.WithField("id", task.id).WithError(err).Warn().Log("OnStop hook returned error")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3.3 — `AppendOutput`**
|
||||
|
||||
Add:
|
||||
|
||||
```go
|
||||
// AppendOutput appends extra FFmpeg args to a process's pending command.
|
||||
// Only valid during OnStart (between hook fire and exec). Returns an
|
||||
// error otherwise.
|
||||
func (r *restream) AppendOutput(id string, extra []string) error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
t, ok := r.tasks[id]
|
||||
if !ok {
|
||||
return fmt.Errorf("restream: no such process %q", id)
|
||||
}
|
||||
if t.config == nil {
|
||||
return fmt.Errorf("restream: process %q has no config", id)
|
||||
}
|
||||
// Append to the free-form Options slice on a synthetic ConfigIO so
|
||||
// CreateCommand picks it up. We model this as an extra Output with
|
||||
// empty Address — address is carried inside extra itself.
|
||||
t.config.Output = append(t.config.Output, app.ConfigIO{
|
||||
ID: "webrtc",
|
||||
Options: extra,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
Note: callers build `extra` so that the last element is the UDP address; the appended `ConfigIO` has empty `Address` so `CreateCommand` won't double-append. Instead, fix `CreateCommand` to support this — or (cleaner) pass the address as the last entry of `Options` and set the inserted `ConfigIO.Address` to that last entry, dropping it from `Options`. Concretely:
|
||||
|
||||
```go
|
||||
func (r *restream) AppendOutput(id string, extra []string) error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
t, ok := r.tasks[id]
|
||||
if !ok {
|
||||
return fmt.Errorf("restream: no such process %q", id)
|
||||
}
|
||||
if t.config == nil || len(extra) == 0 {
|
||||
return fmt.Errorf("restream: append-output invalid args")
|
||||
}
|
||||
opts, addr := extra[:len(extra)-1], extra[len(extra)-1]
|
||||
t.config.Output = append(t.config.Output, app.ConfigIO{
|
||||
ID: "webrtc",
|
||||
Address: addr,
|
||||
Options: append([]string{}, opts...),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3.4 — Verify build and commit**
|
||||
|
||||
```bash
|
||||
go build ./restream/...
|
||||
git add restream/restream.go
|
||||
git commit -m "feat(restream): add ProcessHooks and AppendOutput"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4 — `app/webrtc/portalloc.go` (TDD)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/portalloc.go`
|
||||
- Create: `app/webrtc/portalloc_test.go`
|
||||
|
||||
- [ ] **Step 4.1 — Write failing test**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAlloc_ReturnsPortBindable(t *testing.T) {
|
||||
for i := 0; i < 100; i++ {
|
||||
p, err := Alloc()
|
||||
if err != nil {
|
||||
t.Fatalf("Alloc: %v", err)
|
||||
}
|
||||
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127,0,0,1), Port: p})
|
||||
if err != nil {
|
||||
t.Fatalf("iter %d: rebind %d: %v", i, p, err)
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlloc_Nonzero(t *testing.T) {
|
||||
p, err := Alloc()
|
||||
if err != nil { t.Fatal(err) }
|
||||
if p == 0 { t.Fatal("expected non-zero port") }
|
||||
fmt.Sprintf("%d", p)
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4.2 — Run test (should fail to compile)**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -run TestAlloc -race
|
||||
```
|
||||
|
||||
- [ ] **Step 4.3 — Implement**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// Alloc binds :0 on loopback UDPv4, records the assigned port, closes the
|
||||
// socket, and returns the port. Callers must re-bind immediately; if the
|
||||
// port is taken in the gap (rare), the rebind will fail and the caller
|
||||
// should propagate that error.
|
||||
func Alloc() (int, error) {
|
||||
c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127,0,0,1), Port: 0})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("webrtc portalloc: %w", err)
|
||||
}
|
||||
defer c.Close()
|
||||
return c.LocalAddr().(*net.UDPAddr).Port, nil
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4.4 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/portalloc.go app/webrtc/portalloc_test.go
|
||||
git commit -m "feat(app/webrtc): ephemeral loopback UDP port allocator"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5 — `app/webrtc/ffmpeg_args.go` (TDD)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/ffmpeg_args.go`
|
||||
- Create: `app/webrtc/ffmpeg_args_test.go`
|
||||
|
||||
- [ ] **Step 5.1 — Write failing test**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
func TestBuildArgs_CopyCodecs(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}
|
||||
got := BuildArgs(cfg, 49200)
|
||||
want := []string{
|
||||
"-map", "0:v:0", "-c:v", "copy", "-payload_type", "102", "-f", "rtp",
|
||||
"udp://127.0.0.1:49200?pkt_size=1316",
|
||||
"-map", "0:a:0", "-c:a", "copy", "-payload_type", "111", "-f", "rtp",
|
||||
"udp://127.0.0.1:49201?pkt_size=1316",
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("BuildArgs mismatch\ngot: %v\nwant: %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildArgs_ForceTranscode(t *testing.T) {
|
||||
cfg := appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111, ForceTranscode: true}
|
||||
got := BuildArgs(cfg, 49200)
|
||||
// video leg should include -c:v libx264 / profile=baseline
|
||||
if !containsSeq(got, []string{"-c:v", "libx264"}) {
|
||||
t.Fatalf("expected -c:v libx264, got %v", got)
|
||||
}
|
||||
if !containsSeq(got, []string{"-c:a", "libopus"}) {
|
||||
t.Fatalf("expected -c:a libopus, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func containsSeq(haystack, needle []string) bool {
|
||||
for i := 0; i+len(needle) <= len(haystack); i++ {
|
||||
match := true
|
||||
for j := range needle {
|
||||
if haystack[i+j] != needle[j] { match = false; break }
|
||||
}
|
||||
if match { return true }
|
||||
}
|
||||
return false
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5.2 — Implement**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
// BuildArgs returns the FFmpeg output-leg args for a WebRTC-enabled
|
||||
// process. The caller passes a video RTP port; audio uses port+1.
|
||||
// The returned slice is designed for restream.AppendOutput — the final
|
||||
// element is the UDP address, the rest are options.
|
||||
//
|
||||
// We emit two separate outputs (one per track) so that -payload_type
|
||||
// applies correctly to each. This produces *two* calls' worth of args
|
||||
// but AppendOutput currently handles one output at a time. Callers
|
||||
// should split on the boundary (the second `-map` token).
|
||||
func BuildArgs(cfg appcfg.ConfigWebRTC, videoPort int) []string {
|
||||
vcopy := []string{"-c:v", "copy"}
|
||||
acopy := []string{"-c:a", "copy"}
|
||||
if cfg.ForceTranscode {
|
||||
vcopy = []string{
|
||||
"-c:v", "libx264",
|
||||
"-preset", "veryfast",
|
||||
"-profile:v", "baseline",
|
||||
"-pix_fmt", "yuv420p",
|
||||
"-tune", "zerolatency",
|
||||
"-g", "60",
|
||||
}
|
||||
acopy = []string{"-c:a", "libopus", "-b:a", "96k"}
|
||||
}
|
||||
|
||||
args := []string{"-map", "0:v:0"}
|
||||
args = append(args, vcopy...)
|
||||
args = append(args, "-payload_type", fmt.Sprint(cfg.VideoPT), "-f", "rtp",
|
||||
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort))
|
||||
|
||||
args = append(args, "-map", "0:a:0")
|
||||
args = append(args, acopy...)
|
||||
args = append(args, "-payload_type", fmt.Sprint(cfg.AudioPT), "-f", "rtp",
|
||||
fmt.Sprintf("udp://127.0.0.1:%d?pkt_size=1316", videoPort+1))
|
||||
|
||||
return args
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5.3 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/ffmpeg_args.go app/webrtc/ffmpeg_args_test.go
|
||||
git commit -m "feat(app/webrtc): FFmpeg RTP output arg builder"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 6 — `app/webrtc/subsystem.go` + `lifecycle.go` (TDD)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/subsystem.go`, `subsystem_test.go`
|
||||
- Create: `app/webrtc/lifecycle.go`, `lifecycle_test.go`
|
||||
|
||||
- [ ] **Step 6.1 — Subsystem skeleton with dependency interface**
|
||||
|
||||
Because restream is a large package, define the dependency as an interface the subsystem needs:
|
||||
|
||||
```go
|
||||
// app/webrtc/subsystem.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
core "github.com/datarhei/core/v16/core/webrtc"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
type Restreamer interface {
|
||||
SetHooks(ProcessHooks)
|
||||
AppendOutput(id string, extra []string) error
|
||||
}
|
||||
|
||||
type ProcessHook func(id string, cfg *appcfg.Config) error
|
||||
|
||||
type ProcessHooks struct {
|
||||
OnStart ProcessHook
|
||||
OnStop ProcessHook
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
PublicIP string
|
||||
NAT1To1IPs []string
|
||||
}
|
||||
|
||||
type Subsystem struct {
|
||||
cfg Config
|
||||
restream Restreamer
|
||||
registry *core.Registry
|
||||
factory *core.PeerFactory
|
||||
|
||||
mu sync.Mutex
|
||||
peers map[string]map[string]*core.Peer // processID -> peerID -> peer
|
||||
started bool
|
||||
}
|
||||
|
||||
func New(cfg Config, r Restreamer) (*Subsystem, error) {
|
||||
ccfg := core.DefaultConfig()
|
||||
ccfg.PublicIP = cfg.PublicIP
|
||||
ccfg.NAT1To1IPs = cfg.NAT1To1IPs
|
||||
f, err := core.NewPeerFactory(ccfg)
|
||||
if err != nil { return nil, err }
|
||||
return &Subsystem{
|
||||
cfg: cfg,
|
||||
restream: r,
|
||||
registry: core.NewRegistry(),
|
||||
factory: f,
|
||||
peers: make(map[string]map[string]*core.Peer),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) Start() error {
|
||||
s.mu.Lock()
|
||||
if s.started { s.mu.Unlock(); return nil }
|
||||
s.started = true
|
||||
s.mu.Unlock()
|
||||
s.restream.SetHooks(ProcessHooks{
|
||||
OnStart: s.onProcessStart,
|
||||
OnStop: s.onProcessStop,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) Stop() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.started = false
|
||||
s.restream.SetHooks(ProcessHooks{}) // clear
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
**Note:** There's a type mismatch: `restream.ProcessHooks` is in package `restream`, this subsystem has its own `webrtc.ProcessHooks`. In the wiring task we either (a) import `restream.ProcessHooks` in the subsystem, or (b) define an adapter. Cleanest: the subsystem imports `restream` and uses `restream.ProcessHooks`. Let me rewrite using the real type — replace the local `ProcessHook`/`ProcessHooks` with `restream.ProcessHooks`. Do that in the actual implementation; the plan keeps the outline for readability.
|
||||
|
||||
- [ ] **Step 6.2 — Lifecycle (onProcessStart / onProcessStop)**
|
||||
|
||||
```go
|
||||
// app/webrtc/lifecycle.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
core "github.com/datarhei/core/v16/core/webrtc"
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) error {
|
||||
if cfg == nil || !cfg.WebRTC.Enabled { return nil }
|
||||
|
||||
port, err := Alloc()
|
||||
if err != nil { return fmt.Errorf("webrtc: alloc port: %w", err) }
|
||||
|
||||
args := BuildArgs(cfg.WebRTC, port)
|
||||
if err := s.restream.AppendOutput(id, args); err != nil {
|
||||
return fmt.Errorf("webrtc: append output: %w", err)
|
||||
}
|
||||
|
||||
src, err := core.NewSourceOn(id, "127.0.0.1", port)
|
||||
if err != nil { return fmt.Errorf("webrtc: bind source: %w", err) }
|
||||
src.Start()
|
||||
if err := s.registry.Register(id, src); err != nil {
|
||||
src.Close()
|
||||
return fmt.Errorf("webrtc: register source: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Subsystem) onProcessStop(id string, _ *appcfg.Config) error {
|
||||
s.mu.Lock()
|
||||
peers := s.peers[id]
|
||||
delete(s.peers, id)
|
||||
s.mu.Unlock()
|
||||
for _, p := range peers { _ = p.Close() }
|
||||
if src, ok := s.registry.Get(id); ok {
|
||||
s.registry.Remove(id)
|
||||
_ = src.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6.3 — Lifecycle test**
|
||||
|
||||
```go
|
||||
// lifecycle_test.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
type fakeRestream struct {
|
||||
appended map[string][]string
|
||||
}
|
||||
|
||||
func (f *fakeRestream) SetHooks(ProcessHooks) {}
|
||||
func (f *fakeRestream) AppendOutput(id string, extra []string) error {
|
||||
if f.appended == nil { f.appended = map[string][]string{} }
|
||||
f.appended[id] = extra
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestLifecycle_DisabledIsNoop(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, err := New(Config{}, f)
|
||||
if err != nil { t.Fatal(err) }
|
||||
cfg := &appcfg.Config{ID: "p1", WebRTC: appcfg.ConfigWebRTC{Enabled: false}}
|
||||
if err := s.onProcessStart("p1", cfg); err != nil { t.Fatal(err) }
|
||||
if _, ok := f.appended["p1"]; ok { t.Fatal("expected no append for disabled") }
|
||||
}
|
||||
|
||||
func TestLifecycle_EnabledAppendsAndRegisters(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, err := New(Config{}, f)
|
||||
if err != nil { t.Fatal(err) }
|
||||
cfg := &appcfg.Config{ID: "p2", WebRTC: appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}}
|
||||
if err := s.onProcessStart("p2", cfg); err != nil { t.Fatal(err) }
|
||||
if len(f.appended["p2"]) == 0 { t.Fatal("expected append") }
|
||||
if _, ok := s.registry.Get("p2"); !ok { t.Fatal("expected registered source") }
|
||||
// teardown
|
||||
if err := s.onProcessStop("p2", cfg); err != nil { t.Fatal(err) }
|
||||
if _, ok := s.registry.Get("p2"); ok { t.Fatal("expected removed") }
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6.4 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/subsystem.go app/webrtc/subsystem_test.go app/webrtc/lifecycle.go app/webrtc/lifecycle_test.go
|
||||
git commit -m "feat(app/webrtc): subsystem skeleton + process lifecycle hooks"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 7 — `app/webrtc/handler.go` (WHEP HTTP)
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/handler.go`, `handler_test.go`
|
||||
|
||||
- [ ] **Step 7.1 — Handler: delegate to M1's WHEP handler with process-ID lookup**
|
||||
|
||||
```go
|
||||
// handler.go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
core "github.com/datarhei/core/v16/core/webrtc"
|
||||
"github.com/labstack/echo/v4"
|
||||
)
|
||||
|
||||
// Subscribe handles POST /api/v3/process/:id/whep — look up the Source
|
||||
// for the given process, run a WHEP offer/answer cycle, and forward
|
||||
// RTP to the new peer.
|
||||
func (s *Subsystem) Subscribe(c echo.Context) error {
|
||||
id := c.Param("id")
|
||||
src, ok := s.registry.Get(id)
|
||||
if !ok {
|
||||
return echo.NewHTTPError(http.StatusNotFound, "stream not found")
|
||||
}
|
||||
// Delegate to the M1 WHEP handler — but we already have the source
|
||||
// so we call the lower-level path.
|
||||
offer, err := readBody(c)
|
||||
if err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) }
|
||||
|
||||
peer, answer, err := s.factory.NewPeerFromOffer(src, offer)
|
||||
if err != nil {
|
||||
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
peerID := peer.ID()
|
||||
s.mu.Lock()
|
||||
if s.peers[id] == nil { s.peers[id] = map[string]*core.Peer{} }
|
||||
s.peers[id][peerID] = peer
|
||||
s.mu.Unlock()
|
||||
|
||||
c.Response().Header().Set("Location",
|
||||
"/api/v3/process/"+id+"/whep/"+peerID)
|
||||
return c.Blob(http.StatusCreated, "application/sdp", []byte(answer))
|
||||
}
|
||||
|
||||
// Unsubscribe handles DELETE /api/v3/process/:id/whep/:peerid.
|
||||
func (s *Subsystem) Unsubscribe(c echo.Context) error {
|
||||
id, peerID := c.Param("id"), c.Param("peerid")
|
||||
s.mu.Lock()
|
||||
peer := s.peers[id][peerID]
|
||||
delete(s.peers[id], peerID)
|
||||
s.mu.Unlock()
|
||||
if peer != nil { _ = peer.Close() }
|
||||
return c.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func readBody(c echo.Context) (string, error) {
|
||||
buf := make([]byte, 0, 8192)
|
||||
for {
|
||||
tmp := make([]byte, 4096)
|
||||
n, err := c.Request().Body.Read(tmp)
|
||||
if n > 0 { buf = append(buf, tmp[:n]...) }
|
||||
if err != nil { break }
|
||||
}
|
||||
return string(buf), nil
|
||||
}
|
||||
```
|
||||
|
||||
**Note:** If `core/webrtc.PeerFactory` doesn't expose `NewPeerFromOffer`, swap in whatever API M1 provided (`factory.NewPeer(...)` taking source+offer). If the M1 handler is higher-level, wrap it instead of reimplementing.
|
||||
|
||||
- [ ] **Step 7.2 — Handler test: 404 on unknown id**
|
||||
|
||||
```go
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
)
|
||||
|
||||
func TestSubscribe_404OnUnknown(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, _ := New(Config{}, f)
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(""))
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id"); c.SetParamValues("missing")
|
||||
err := s.Subscribe(c)
|
||||
if he, ok := err.(*echo.HTTPError); !ok || he.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribe_IdempotentNoContent(t *testing.T) {
|
||||
f := &fakeRestream{}
|
||||
s, _ := New(Config{}, f)
|
||||
e := echo.New()
|
||||
req := httptest.NewRequest(http.MethodDelete, "/", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
c := e.NewContext(req, rec)
|
||||
c.SetParamNames("id", "peerid"); c.SetParamValues("p", "nope")
|
||||
if err := s.Unsubscribe(c); err != nil { t.Fatal(err) }
|
||||
if rec.Code != http.StatusNoContent {
|
||||
t.Fatalf("expected 204, got %d", rec.Code)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 7.3 — Run, commit**
|
||||
|
||||
```bash
|
||||
go test ./app/webrtc/ -race
|
||||
git add app/webrtc/handler.go app/webrtc/handler_test.go
|
||||
git commit -m "feat(app/webrtc): WHEP HTTP handler"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 8 — Wire subsystem into app/api/api.go + http/server.go
|
||||
|
||||
**Files:**
|
||||
- Modify: `app/api/api.go`
|
||||
- Modify: `http/server.go`
|
||||
|
||||
- [ ] **Step 8.1 — Instantiate subsystem in api.New**
|
||||
|
||||
In `app/api/api.go`, after `restream := restream.New(...)`, when `cfg.WebRTC.Enable` is true, create the subsystem:
|
||||
|
||||
```go
|
||||
if cfg.WebRTC.Enable {
|
||||
webrtcSub, err := webrtcapp.New(webrtcapp.Config{
|
||||
PublicIP: cfg.WebRTC.PublicIP,
|
||||
NAT1To1IPs: cfg.WebRTC.NAT1To1IPs,
|
||||
}, restream)
|
||||
if err != nil {
|
||||
a.log.logger.core.Warn().WithError(err).Log("webrtc subsystem disabled")
|
||||
} else {
|
||||
_ = webrtcSub.Start()
|
||||
a.webrtc = webrtcSub
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Store on the api struct: `webrtc *webrtcapp.Subsystem`.
|
||||
|
||||
- [ ] **Step 8.2 — Mount HTTP routes**
|
||||
|
||||
In `http/server.go` near line 568 (where `v3.POST("/process", ...)` lives):
|
||||
|
||||
```go
|
||||
if s.webrtc != nil {
|
||||
v3.POST("/process/:id/whep", s.webrtc.Subscribe)
|
||||
v3.DELETE("/process/:id/whep/:peerid", s.webrtc.Unsubscribe)
|
||||
}
|
||||
```
|
||||
|
||||
Plumb `s.webrtc` from api → http/server constructor.
|
||||
|
||||
- [ ] **Step 8.3 — Verify build**
|
||||
|
||||
```bash
|
||||
go build ./...
|
||||
```
|
||||
|
||||
- [ ] **Step 8.4 — Commit**
|
||||
|
||||
```bash
|
||||
git add app/api/api.go http/server.go
|
||||
git commit -m "feat(core): wire webrtc subsystem + WHEP routes"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 9 — Integration smoke test
|
||||
|
||||
**Files:**
|
||||
- Create: `app/webrtc/integration_test.go`
|
||||
|
||||
- [ ] **Step 9.1 — Synthetic RTP → WHEP end-to-end**
|
||||
|
||||
Import M1's `test/whep-client` as a library. Boot a Subsystem, inject synthetic RTP on the allocated port (mimic Task 6's lifecycle), POST a WHEP offer, assert both tracks arrive. See M1's `test/whep-client/main_test.go` for reference.
|
||||
|
||||
- [ ] **Step 9.2 — Run with -race and commit**
|
||||
|
||||
---
|
||||
|
||||
## Task 10 — TrueNAS redeploy
|
||||
|
||||
- [ ] **Step 10.1 — Rebuild Core image (Dockerfile currently targets `cmd/webrtc-poc`; add a second target or switch to the root `./` build for Core proper).**
|
||||
- [ ] **Step 10.2 — Redeploy via docker compose on TrueNAS; verify WHEP endpoint returns 404 before any process exists, 201 after enabling WebRTC on a process.**
|
||||
|
||||
---
|
||||
|
||||
## Out of scope for this plan
|
||||
|
||||
- `core-ui/src/views/Edit/LiveTab.jsx` — core-ui is a separate repo and requires its own plan. Track as M2.5 once core-ui is cloned into the workspace.
|
||||
|
||||
## Self-review notes
|
||||
|
||||
- Task 7 depends on `core/webrtc.PeerFactory.NewPeerFromOffer` signature from M1; if it's named differently, adjust the call site (don't rewrite the handler).
|
||||
- Task 3 Step 3.3 assumes `restream.tasks` is a map keyed by id with a `*task` value that carries `config`. Confirm by reading around line 90 before implementing; the exact struct name may differ.
|
||||
- Task 2 `vars.NewStringList` / `vars.NewInt` signatures need confirming against the real `config/vars/value` package.
|
||||
|
|
@ -33,6 +33,7 @@ import (
|
|||
"net/http"
|
||||
"strings"
|
||||
|
||||
appwebrtc "github.com/datarhei/core/v16/app/webrtc"
|
||||
cfgstore "github.com/datarhei/core/v16/config/store"
|
||||
"github.com/datarhei/core/v16/http/cache"
|
||||
"github.com/datarhei/core/v16/http/errorhandler"
|
||||
|
|
@ -86,6 +87,7 @@ type Config struct {
|
|||
Cors CorsConfig
|
||||
RTMP rtmp.Server
|
||||
SRT srt.Server
|
||||
WebRTC *appwebrtc.Handler
|
||||
JWT jwt.JWT
|
||||
Config cfgstore.Store
|
||||
Cache cache.Cacher
|
||||
|
|
@ -124,6 +126,7 @@ type server struct {
|
|||
session *api.SessionHandler
|
||||
widget *api.WidgetHandler
|
||||
resources *api.MetricsHandler
|
||||
webrtc *appwebrtc.Handler
|
||||
}
|
||||
|
||||
middleware struct {
|
||||
|
|
@ -238,6 +241,10 @@ func NewServer(config Config) (Server, error) {
|
|||
)
|
||||
}
|
||||
|
||||
if config.WebRTC != nil {
|
||||
s.v3handler.webrtc = config.WebRTC
|
||||
}
|
||||
|
||||
if config.Prometheus != nil {
|
||||
s.handler.prometheus = handler.NewPrometheus(
|
||||
config.Prometheus.HTTPHandler(),
|
||||
|
|
@ -545,6 +552,12 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
|
|||
s.router.GET("/api/v3/widget/process/:id", s.v3handler.widget.Get)
|
||||
}
|
||||
|
||||
// v3 WebRTC (WHEP egress). Mounted on the v3 group so JWT auth
|
||||
// covers it in M2; public embed tokens will ship in M3.
|
||||
if s.v3handler.webrtc != nil {
|
||||
s.v3handler.webrtc.Register(v3)
|
||||
}
|
||||
|
||||
// v3 Restreamer
|
||||
if s.v3handler.restream != nil {
|
||||
v3.GET("/skills", s.v3handler.restream.Skills)
|
||||
|
|
|
|||
|
|
@ -18,6 +18,23 @@ type ConfigIO struct {
|
|||
Cleanup []ConfigIOCleanup `json:"cleanup"`
|
||||
}
|
||||
|
||||
// ConfigWebRTC carries per-process WebRTC egress settings.
|
||||
//
|
||||
// When Enabled is true the restream manager will (via the app/webrtc
|
||||
// subsystem) append an additional FFmpeg output leg that emits H.264/Opus
|
||||
// RTP to a loopback UDP port the subsystem allocates. The subsystem reads
|
||||
// that RTP and fans it out to WHEP subscribers.
|
||||
type ConfigWebRTC struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
VideoPT uint8 `json:"video_pt"`
|
||||
AudioPT uint8 `json:"audio_pt"`
|
||||
ForceTranscode bool `json:"force_transcode"`
|
||||
}
|
||||
|
||||
// Clone returns a deep copy of the WebRTC config (currently a value copy;
|
||||
// provided for symmetry with other Clone methods and future-proofing).
|
||||
func (w ConfigWebRTC) Clone() ConfigWebRTC { return w }
|
||||
|
||||
func (io ConfigIO) Clone() ConfigIO {
|
||||
clone := ConfigIO{
|
||||
ID: io.ID,
|
||||
|
|
@ -47,6 +64,7 @@ type Config struct {
|
|||
LimitCPU float64 `json:"limit_cpu_usage"` // percent
|
||||
LimitMemory uint64 `json:"limit_memory_bytes"` // bytes
|
||||
LimitWaitFor uint64 `json:"limit_waitfor_seconds"` // seconds
|
||||
WebRTC ConfigWebRTC `json:"webrtc"`
|
||||
}
|
||||
|
||||
func (config *Config) Clone() *Config {
|
||||
|
|
@ -61,6 +79,7 @@ func (config *Config) Clone() *Config {
|
|||
LimitCPU: config.LimitCPU,
|
||||
LimitMemory: config.LimitMemory,
|
||||
LimitWaitFor: config.LimitWaitFor,
|
||||
WebRTC: config.WebRTC.Clone(),
|
||||
}
|
||||
|
||||
clone.Input = make([]ConfigIO, len(config.Input))
|
||||
|
|
|
|||
|
|
@ -55,6 +55,30 @@ type Restreamer interface {
|
|||
GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process
|
||||
SetMetadata(key string, data interface{}) error // Set general metadata
|
||||
GetMetadata(key string) (interface{}, error) // Get previously set general metadata
|
||||
SetHooks(hooks ProcessHooks) // Install per-process lifecycle hooks (e.g., WebRTC subsystem)
|
||||
}
|
||||
|
||||
// ProcessStartHook is invoked synchronously inside startProcess just
|
||||
// before FFmpeg is started. It receives a pointer to the task config;
|
||||
// returning a non-empty slice of ConfigIO appends those output legs to
|
||||
// cfg.Output and causes the FFmpeg command to be rebuilt before
|
||||
// Start(). Returning a non-nil error aborts the start.
|
||||
//
|
||||
// Hooks run with the restream write lock held, so they must not call
|
||||
// back into the Restreamer interface (it would deadlock). They can,
|
||||
// however, mutate cfg.WebRTC metadata or read cfg fields freely.
|
||||
type ProcessStartHook func(id string, cfg *app.Config) ([]app.ConfigIO, error)
|
||||
|
||||
// ProcessStopHook is invoked synchronously inside stopProcess just
|
||||
// after FFmpeg has been stopped. It is a notification for subsystems
|
||||
// to tear down any per-process state they attached at start.
|
||||
type ProcessStopHook func(id string)
|
||||
|
||||
// ProcessHooks bundles the lifecycle callbacks a sibling subsystem
|
||||
// (currently: app/webrtc) installs via SetHooks.
|
||||
type ProcessHooks struct {
|
||||
OnStart ProcessStartHook
|
||||
OnStop ProcessStopHook
|
||||
}
|
||||
|
||||
// Config is the required configuration for a new restreamer instance.
|
||||
|
|
@ -102,12 +126,24 @@ type restream struct {
|
|||
logger log.Logger
|
||||
metadata map[string]interface{}
|
||||
|
||||
hooks ProcessHooks
|
||||
|
||||
lock sync.RWMutex
|
||||
|
||||
startOnce sync.Once
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
// SetHooks installs the process lifecycle hooks. The caller is
|
||||
// responsible for installing hooks before Start() is invoked; calling
|
||||
// SetHooks on a running instance is safe but only affects subsequent
|
||||
// start/stop transitions (not the one currently in flight).
|
||||
func (r *restream) SetHooks(hooks ProcessHooks) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.hooks = hooks
|
||||
}
|
||||
|
||||
// New returns a new instance that implements the Restreamer interface
|
||||
func New(config Config) (Restreamer, error) {
|
||||
r := &restream{
|
||||
|
|
@ -1062,6 +1098,39 @@ func (r *restream) startProcess(id string) error {
|
|||
|
||||
task.process.Order = "start"
|
||||
|
||||
// Invoke the per-process start hook (used by app/webrtc to append
|
||||
// RTP output legs). If it returns ConfigIO entries, append them to
|
||||
// the output list and rebuild the FFmpeg process with the new
|
||||
// command before we start it.
|
||||
if r.hooks.OnStart != nil {
|
||||
extras, err := r.hooks.OnStart(task.id, task.config)
|
||||
if err != nil {
|
||||
r.logger.WithField("id", task.id).WithError(err).Error().Log("Start hook aborted process start")
|
||||
return err
|
||||
}
|
||||
if len(extras) > 0 {
|
||||
task.config.Output = append(task.config.Output, extras...)
|
||||
task.command = task.config.CreateCommand()
|
||||
|
||||
newFFmpeg, ferr := r.ffmpeg.New(ffmpeg.ProcessConfig{
|
||||
Reconnect: task.config.Reconnect,
|
||||
ReconnectDelay: time.Duration(task.config.ReconnectDelay) * time.Second,
|
||||
StaleTimeout: time.Duration(task.config.StaleTimeout) * time.Second,
|
||||
LimitCPU: task.config.LimitCPU,
|
||||
LimitMemory: task.config.LimitMemory,
|
||||
LimitDuration: time.Duration(task.config.LimitWaitFor) * time.Second,
|
||||
Command: task.command,
|
||||
Parser: task.parser,
|
||||
Logger: task.logger,
|
||||
})
|
||||
if ferr != nil {
|
||||
r.logger.WithField("id", task.id).WithError(ferr).Error().Log("Failed to rebuild FFmpeg after start hook")
|
||||
return ferr
|
||||
}
|
||||
task.ffmpeg = newFFmpeg
|
||||
}
|
||||
}
|
||||
|
||||
task.ffmpeg.Start()
|
||||
|
||||
r.nProc++
|
||||
|
|
@ -1105,6 +1174,13 @@ func (r *restream) stopProcess(id string) error {
|
|||
|
||||
r.nProc--
|
||||
|
||||
// Notify subsystems (app/webrtc) that this process has been
|
||||
// stopped so they can tear down any per-process state. Hook is
|
||||
// best-effort: errors are the hook's problem to log.
|
||||
if r.hooks.OnStop != nil {
|
||||
r.hooks.OnStop(task.id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue