feat(webrtc): add app/webrtc subsystem + lifecycle hooks
Introduces the subsystem layer that sits alongside api.API and wires
the M1 core/webrtc primitives into the per-process restream lifecycle.
app/webrtc/subsystem.go:
- Subsystem struct holding the global WebRTC config, core PeerFactory,
per-process stream map, and logger
- New(config.DataWebRTC, logger) constructor
- Enabled(), Hooks(), Close(), lookup() methods
app/webrtc/lifecycle.go:
- onProcessStart: allocates an adjacent UDP port pair, binds two
Pion Sources (video on V, audio on V+1), registers them under the
process id, and returns the two RTP output legs to append to the
FFmpeg command.
- onProcessStop: tears down the pair.
- allocAdjacentPair: retries up to 10 times to find a free (V, V+1)
pair since the kernel's ephemeral picker can hand us an odd port.
- splitRTPLegs: converts BuildArgs' flat []string into two ConfigIO
entries by splitting on the second -map token.
core/webrtc/peer.go + forward.go:
- Adds PeerFactory.CreatePeerFromSources for the M2 two-source
forwarding mode (video and audio on separate UDP ports, no
payload-type sniffing). Leaves CreatePeer intact for the M1 PoC.
- Adds forwardRTPSplit companion goroutine.
config/data.go:
- Promote anonymous WebRTC struct to named type DataWebRTC so
app/webrtc can accept it by value.
This commit is contained in:
parent
46531bb479
commit
9d38e9ccdb
6 changed files with 523 additions and 18 deletions
191
app/webrtc/lifecycle.go
Normal file
191
app/webrtc/lifecycle.go
Normal file
|
|
@ -0,0 +1,191 @@
|
||||||
|
package webrtc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||||
|
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Default payload types. These match the values the M1 PoC / M2
|
||||||
|
// forwarder expects (H.264 = 102, Opus = 111). Operators can override
|
||||||
|
// per-process via the restream Config.
|
||||||
|
const (
|
||||||
|
defaultVideoPT = 102
|
||||||
|
defaultAudioPT = 111
|
||||||
|
)
|
||||||
|
|
||||||
|
// allocAttempts is the maximum number of times onProcessStart will
|
||||||
|
// retry port allocation to find two adjacent free loopback UDP ports.
|
||||||
|
// The kernel sometimes hands us an odd port for video, making V+1
|
||||||
|
// unavailable — in practice 2-3 retries is plenty.
|
||||||
|
const allocAttempts = 10
|
||||||
|
|
||||||
|
// onProcessStart is registered as the restream ProcessStartHook. It
|
||||||
|
// fires with the restream write lock held, just before FFmpeg Start.
|
||||||
|
//
|
||||||
|
// When the per-process WebRTC config is disabled, it returns (nil, nil)
|
||||||
|
// — FFmpeg starts normally without any extra output legs. When enabled
|
||||||
|
// it:
|
||||||
|
//
|
||||||
|
// 1. Allocates two adjacent loopback UDP ports (video on V, audio on V+1).
|
||||||
|
// 2. Binds Pion Sources on those ports and registers the pair under
|
||||||
|
// the process ID.
|
||||||
|
// 3. Builds the two RTP ConfigIO output legs via BuildArgs and returns
|
||||||
|
// them to the restream manager, which appends them to cfg.Output
|
||||||
|
// and rebuilds the FFmpeg command.
|
||||||
|
//
|
||||||
|
// Any error aborts the process start. On partial allocation failure,
|
||||||
|
// all allocated resources are cleaned up before returning.
|
||||||
|
func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.ConfigIO, error) {
|
||||||
|
if cfg == nil || !cfg.WebRTC.Enabled {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normalize PTs — zero values mean "use defaults".
|
||||||
|
wcfg := cfg.WebRTC
|
||||||
|
if wcfg.VideoPT == 0 {
|
||||||
|
wcfg.VideoPT = defaultVideoPT
|
||||||
|
}
|
||||||
|
if wcfg.AudioPT == 0 {
|
||||||
|
wcfg.AudioPT = defaultAudioPT
|
||||||
|
}
|
||||||
|
|
||||||
|
// Refuse to re-register — the restream manager should never
|
||||||
|
// double-start a process but defensive check avoids a silent
|
||||||
|
// Source leak if it does.
|
||||||
|
s.mu.Lock()
|
||||||
|
if _, exists := s.streams[id]; exists {
|
||||||
|
s.mu.Unlock()
|
||||||
|
return nil, fmt.Errorf("webrtc: process %q already has an active stream", id)
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
videoPort, videoSrc, audioSrc, err := s.allocAdjacentPair(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the UDP readers so they're draining packets the moment
|
||||||
|
// FFmpeg comes online.
|
||||||
|
videoSrc.Start()
|
||||||
|
audioSrc.Start()
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
s.streams[id] = &processStream{id: id, video: videoSrc, audio: audioSrc}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
s.logger.WithFields(map[string]interface{}{
|
||||||
|
"id": id,
|
||||||
|
"video_port": videoPort,
|
||||||
|
"audio_port": videoPort + 1,
|
||||||
|
"video_pt": wcfg.VideoPT,
|
||||||
|
"audio_pt": wcfg.AudioPT,
|
||||||
|
}).Info().Log("WebRTC egress registered for process")
|
||||||
|
|
||||||
|
args := BuildArgs(wcfg, videoPort)
|
||||||
|
return splitRTPLegs(args), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// onProcessStop is registered as the restream ProcessStopHook. It
|
||||||
|
// fires with the restream write lock held, just after FFmpeg has been
|
||||||
|
// stopped. It tears down the per-process Sources (which closes their
|
||||||
|
// sockets and hangs up any subscribed peers).
|
||||||
|
func (s *Subsystem) onProcessStop(id string) {
|
||||||
|
s.mu.Lock()
|
||||||
|
st, ok := s.streams[id]
|
||||||
|
if ok {
|
||||||
|
delete(s.streams, id)
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if st.video != nil {
|
||||||
|
_ = st.video.Close()
|
||||||
|
}
|
||||||
|
if st.audio != nil {
|
||||||
|
_ = st.audio.Close()
|
||||||
|
}
|
||||||
|
s.logger.WithField("id", id).Info().Log("WebRTC egress torn down for process")
|
||||||
|
}
|
||||||
|
|
||||||
|
// allocAdjacentPair finds a pair of free loopback UDP ports (V, V+1)
|
||||||
|
// and binds a Source to each. It retries up to allocAttempts times
|
||||||
|
// because the kernel's ephemeral picker may hand us a port whose +1
|
||||||
|
// neighbor is already taken. Caller owns the returned Sources; on
|
||||||
|
// error all partial allocations are cleaned up.
|
||||||
|
func (s *Subsystem) allocAdjacentPair(id string) (int, *corewebrtc.Source, *corewebrtc.Source, error) {
|
||||||
|
var lastErr error
|
||||||
|
for attempt := 0; attempt < allocAttempts; attempt++ {
|
||||||
|
port, err := Alloc()
|
||||||
|
if err != nil {
|
||||||
|
lastErr = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
videoSrc, err := corewebrtc.NewSourceOn(id, "127.0.0.1", port)
|
||||||
|
if err != nil {
|
||||||
|
lastErr = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
audioSrc, err := corewebrtc.NewSourceOn(id+":audio", "127.0.0.1", port+1)
|
||||||
|
if err != nil {
|
||||||
|
_ = videoSrc.Close()
|
||||||
|
lastErr = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return port, videoSrc, audioSrc, nil
|
||||||
|
}
|
||||||
|
if lastErr == nil {
|
||||||
|
lastErr = fmt.Errorf("unknown allocation failure")
|
||||||
|
}
|
||||||
|
return 0, nil, nil, fmt.Errorf("webrtc: allocate adjacent UDP port pair after %d attempts: %w", allocAttempts, lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitRTPLegs converts the flat BuildArgs output into two ConfigIO
|
||||||
|
// entries — one per RTP output leg. It splits on the second "-map"
|
||||||
|
// token, which marks the audio leg's start (see ffmpeg_args_test.go).
|
||||||
|
// The Address of each ConfigIO is the last argument (the udp:// URL);
|
||||||
|
// everything preceding it forms that output's Options.
|
||||||
|
func splitRTPLegs(args []string) []appcfg.ConfigIO {
|
||||||
|
// Find the two -map indices.
|
||||||
|
mapIdx := []int{}
|
||||||
|
for i, a := range args {
|
||||||
|
if a == "-map" {
|
||||||
|
mapIdx = append(mapIdx, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(mapIdx) != 2 {
|
||||||
|
// BuildArgs always emits exactly 2 -maps; a different count
|
||||||
|
// means an upstream bug. Return a single leg covering
|
||||||
|
// everything to avoid silent truncation.
|
||||||
|
return []appcfg.ConfigIO{toLeg(args)}
|
||||||
|
}
|
||||||
|
|
||||||
|
videoTokens := args[mapIdx[0]:mapIdx[1]]
|
||||||
|
audioTokens := args[mapIdx[1]:]
|
||||||
|
|
||||||
|
return []appcfg.ConfigIO{
|
||||||
|
toLeg(videoTokens),
|
||||||
|
toLeg(audioTokens),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// toLeg splits a contiguous RTP-output token slice into a ConfigIO:
|
||||||
|
// the trailing token is the udp:// Address; everything before is the
|
||||||
|
// Options slice.
|
||||||
|
func toLeg(tokens []string) appcfg.ConfigIO {
|
||||||
|
if len(tokens) == 0 {
|
||||||
|
return appcfg.ConfigIO{}
|
||||||
|
}
|
||||||
|
addr := tokens[len(tokens)-1]
|
||||||
|
opts := make([]string, len(tokens)-1)
|
||||||
|
copy(opts, tokens[:len(tokens)-1])
|
||||||
|
return appcfg.ConfigIO{
|
||||||
|
ID: "webrtc",
|
||||||
|
Address: addr,
|
||||||
|
Options: opts,
|
||||||
|
}
|
||||||
|
}
|
||||||
60
app/webrtc/lifecycle_test.go
Normal file
60
app/webrtc/lifecycle_test.go
Normal file
|
|
@ -0,0 +1,60 @@
|
||||||
|
package webrtc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
appcfg "github.com/datarhei/core/v16/restream/app"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestSplitRTPLegs_TwoLegs feeds the real BuildArgs output through
|
||||||
|
// the splitter and checks both legs come out with the correct shape.
|
||||||
|
func TestSplitRTPLegs_TwoLegs(t *testing.T) {
|
||||||
|
args := BuildArgs(appcfg.ConfigWebRTC{Enabled: true, VideoPT: 102, AudioPT: 111}, 49200)
|
||||||
|
|
||||||
|
legs := splitRTPLegs(args)
|
||||||
|
if len(legs) != 2 {
|
||||||
|
t.Fatalf("expected 2 legs, got %d: %+v", len(legs), legs)
|
||||||
|
}
|
||||||
|
|
||||||
|
video := legs[0]
|
||||||
|
audio := legs[1]
|
||||||
|
|
||||||
|
// Leg 0 is video: address ends with :49200
|
||||||
|
if !strings.HasSuffix(video.Address, ":49200?pkt_size=1316") {
|
||||||
|
t.Fatalf("video Address unexpected: %q", video.Address)
|
||||||
|
}
|
||||||
|
// Leg 1 is audio: address ends with :49201
|
||||||
|
if !strings.HasSuffix(audio.Address, ":49201?pkt_size=1316") {
|
||||||
|
t.Fatalf("audio Address unexpected: %q", audio.Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each leg's options start with -map, end with -f rtp.
|
||||||
|
if len(video.Options) < 2 || video.Options[0] != "-map" {
|
||||||
|
t.Fatalf("video leg should start with -map, got %v", video.Options)
|
||||||
|
}
|
||||||
|
if video.Options[len(video.Options)-2] != "-f" || video.Options[len(video.Options)-1] != "rtp" {
|
||||||
|
t.Fatalf("video leg should end with -f rtp, got %v", video.Options)
|
||||||
|
}
|
||||||
|
if len(audio.Options) < 2 || audio.Options[0] != "-map" {
|
||||||
|
t.Fatalf("audio leg should start with -map, got %v", audio.Options)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Neither leg's Options should contain the address itself.
|
||||||
|
for _, opt := range video.Options {
|
||||||
|
if strings.HasPrefix(opt, "udp://") {
|
||||||
|
t.Fatalf("video Options must not contain udp:// address: %v", video.Options)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSplitRTPLegs_FallbackOnUnexpectedShape ensures we don't panic
|
||||||
|
// or drop data if BuildArgs ever changes shape — the splitter returns
|
||||||
|
// a single leg wrapping everything.
|
||||||
|
func TestSplitRTPLegs_FallbackOnUnexpectedShape(t *testing.T) {
|
||||||
|
// Single -map: shouldn't happen, but don't panic.
|
||||||
|
legs := splitRTPLegs([]string{"-map", "0:v:0", "udp://1.2.3.4:5000"})
|
||||||
|
if len(legs) != 1 {
|
||||||
|
t.Fatalf("expected single fallback leg, got %d", len(legs))
|
||||||
|
}
|
||||||
|
}
|
||||||
120
app/webrtc/subsystem.go
Normal file
120
app/webrtc/subsystem.go
Normal file
|
|
@ -0,0 +1,120 @@
|
||||||
|
package webrtc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/datarhei/core/v16/config"
|
||||||
|
corewebrtc "github.com/datarhei/core/v16/core/webrtc"
|
||||||
|
"github.com/datarhei/core/v16/log"
|
||||||
|
"github.com/datarhei/core/v16/restream"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Subsystem is the app-level WebRTC egress manager. It sits alongside
|
||||||
|
// api.API as a sibling — both consume the Restream service, both wire
|
||||||
|
// themselves into the Echo HTTP router. The subsystem is responsible
|
||||||
|
// for:
|
||||||
|
//
|
||||||
|
// - Translating the global config.DataWebRTC into the core-level
|
||||||
|
// corewebrtc.Config used by the PeerFactory.
|
||||||
|
// - Installing ProcessHooks on Restreamer so that per-process start
|
||||||
|
// events allocate a pair of UDP ports, create Pion Sources, and
|
||||||
|
// inject RTP output legs into the FFmpeg command line.
|
||||||
|
// - Serving the WHEP Echo handler (see handler.go).
|
||||||
|
//
|
||||||
|
// The zero value is not usable; call New.
|
||||||
|
type Subsystem struct {
|
||||||
|
globalCfg config.DataWebRTC
|
||||||
|
coreCfg corewebrtc.Config
|
||||||
|
factory *corewebrtc.PeerFactory
|
||||||
|
logger log.Logger
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
streams map[string]*processStream // processID -> stream pair
|
||||||
|
}
|
||||||
|
|
||||||
|
// processStream captures the two Sources (video + audio) backing a
|
||||||
|
// single running process's WHEP egress.
|
||||||
|
type processStream struct {
|
||||||
|
id string
|
||||||
|
video *corewebrtc.Source
|
||||||
|
audio *corewebrtc.Source
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a Subsystem from the global WebRTC config section.
|
||||||
|
// The provided ffmpegUDPMax is advisory for logs only (M2 uses the
|
||||||
|
// OS's ephemeral range via Alloc). Returns an error if the PeerFactory
|
||||||
|
// cannot be built (e.g., bad NAT1To1 IPs).
|
||||||
|
func New(dataCfg config.DataWebRTC, logger log.Logger) (*Subsystem, error) {
|
||||||
|
if logger == nil {
|
||||||
|
logger = log.New("")
|
||||||
|
}
|
||||||
|
|
||||||
|
coreCfg := corewebrtc.DefaultConfig()
|
||||||
|
coreCfg.Enabled = dataCfg.Enable
|
||||||
|
coreCfg.PublicIP = dataCfg.PublicIP
|
||||||
|
|
||||||
|
// If the operator configured multiple NAT1To1 IPs (e.g., dual
|
||||||
|
// LAN/public), they take precedence over PublicIP. Wire them
|
||||||
|
// through via PublicIP as the first entry; core/webrtc currently
|
||||||
|
// reads a single PublicIP, so M2 joins the list with the first
|
||||||
|
// entry winning. (Multi-IP NAT1To1 is an M3 enhancement.)
|
||||||
|
if len(dataCfg.NAT1To1IPs) > 0 && coreCfg.PublicIP == "" {
|
||||||
|
coreCfg.PublicIP = dataCfg.NAT1To1IPs[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
factory, err := corewebrtc.NewPeerFactory(coreCfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("webrtc subsystem: build peer factory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Subsystem{
|
||||||
|
globalCfg: dataCfg,
|
||||||
|
coreCfg: coreCfg,
|
||||||
|
factory: factory,
|
||||||
|
logger: logger.WithComponent("WebRTC"),
|
||||||
|
streams: make(map[string]*processStream),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enabled reports whether the subsystem should register hooks and
|
||||||
|
// serve the WHEP endpoint. Called by the API wiring layer to decide
|
||||||
|
// whether to install anything.
|
||||||
|
func (s *Subsystem) Enabled() bool {
|
||||||
|
return s.globalCfg.Enable
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hooks returns the restream.ProcessHooks the subsystem expects to be
|
||||||
|
// installed via restream.Restreamer.SetHooks. Exactly one Subsystem
|
||||||
|
// instance should be installed per Restreamer.
|
||||||
|
func (s *Subsystem) Hooks() restream.ProcessHooks {
|
||||||
|
return restream.ProcessHooks{
|
||||||
|
OnStart: s.onProcessStart,
|
||||||
|
OnStop: s.onProcessStop,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close tears down every active per-process stream. It is safe to
|
||||||
|
// call during Core shutdown; subsequent WHEP requests will 404.
|
||||||
|
func (s *Subsystem) Close() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
for id, st := range s.streams {
|
||||||
|
if st.video != nil {
|
||||||
|
_ = st.video.Close()
|
||||||
|
}
|
||||||
|
if st.audio != nil {
|
||||||
|
_ = st.audio.Close()
|
||||||
|
}
|
||||||
|
delete(s.streams, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lookup returns the per-process stream pair for id, or nil, false.
|
||||||
|
// Used by the WHEP handler.
|
||||||
|
func (s *Subsystem) lookup(id string) (*processStream, bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
st, ok := s.streams[id]
|
||||||
|
return st, ok
|
||||||
|
}
|
||||||
|
|
@ -113,12 +113,7 @@ type Data struct {
|
||||||
Topics []string `json:"topics"`
|
Topics []string `json:"topics"`
|
||||||
} `json:"log"`
|
} `json:"log"`
|
||||||
} `json:"srt"`
|
} `json:"srt"`
|
||||||
WebRTC struct {
|
WebRTC DataWebRTC `json:"webrtc"`
|
||||||
Enable bool `json:"enable"`
|
|
||||||
PublicIP string `json:"public_ip"`
|
|
||||||
NAT1To1IPs []string `json:"nat_1_to_1_ips"`
|
|
||||||
UDPMuxPort int `json:"udp_mux_port" format:"int"`
|
|
||||||
} `json:"webrtc"`
|
|
||||||
FFmpeg struct {
|
FFmpeg struct {
|
||||||
Binary string `json:"binary"`
|
Binary string `json:"binary"`
|
||||||
MaxProcesses int64 `json:"max_processes" format:"int64"`
|
MaxProcesses int64 `json:"max_processes" format:"int64"`
|
||||||
|
|
@ -340,3 +335,12 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
||||||
|
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DataWebRTC is the global WebRTC egress configuration. Promoted to a
|
||||||
|
// named type so the app/webrtc subsystem can accept it by value.
|
||||||
|
type DataWebRTC struct {
|
||||||
|
Enable bool `json:"enable"`
|
||||||
|
PublicIP string `json:"public_ip"`
|
||||||
|
NAT1To1IPs []string `json:"nat_1_to_1_ips"`
|
||||||
|
UDPMuxPort int `json:"udp_mux_port" format:"int"`
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// forwardRTP reads packets from sub and writes them to the correct track
|
// forwardRTP reads packets from sub and writes them to the correct track
|
||||||
// based on payload type (H.264 → video, Opus → audio). Payload-type
|
// based on payload type (H.264 → video, Opus → audio). Used by the M1
|
||||||
// inspection is the simplest M1 approach; M2 will switch to per-track
|
// single-source PoC where FFmpeg emits both video and audio RTP to the
|
||||||
// source channels once the process resolver manages separate video/audio
|
// same UDP port.
|
||||||
// UDP ports.
|
|
||||||
func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
|
func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
|
||||||
video, audio *webrtc.TrackLocalStaticRTP) {
|
video, audio *webrtc.TrackLocalStaticRTP) {
|
||||||
for {
|
for {
|
||||||
|
|
@ -35,3 +34,29 @@ func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// forwardRTPSplit is the M2 variant: it reads from two independent
|
||||||
|
// per-track channels (one video, one audio) and writes each to its
|
||||||
|
// own Pion track. This is the mode used when the restream manager
|
||||||
|
// emits two FFmpeg RTP legs on separate UDP ports. Either channel
|
||||||
|
// closing or done firing terminates the loop.
|
||||||
|
func forwardRTPSplit(done <-chan struct{},
|
||||||
|
videoSub <-chan *rtp.Packet, audioSub <-chan *rtp.Packet,
|
||||||
|
video, audio *webrtc.TrackLocalStaticRTP) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
case pkt, ok := <-videoSub:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = video.WriteRTP(pkt)
|
||||||
|
case pkt, ok := <-audioSub:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = audio.WriteRTP(pkt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,15 +44,27 @@ func NewPeerFactory(c Config) (*PeerFactory, error) {
|
||||||
return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil
|
return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peer wraps a Pion PeerConnection bound to a Source's subscription.
|
// Peer wraps a Pion PeerConnection bound to either a single Source
|
||||||
|
// subscription (M1, payload-type split forwarding) or to a pair of
|
||||||
|
// video+audio Source subscriptions (M2, per-track forwarding).
|
||||||
type Peer struct {
|
type Peer struct {
|
||||||
resourceID string
|
resourceID string
|
||||||
pc *webrtc.PeerConnection
|
pc *webrtc.PeerConnection
|
||||||
answer webrtc.SessionDescription
|
answer webrtc.SessionDescription
|
||||||
source *Source
|
|
||||||
sub chan *rtp.Packet
|
// M1 single-source mode: source+sub are set, videoSource/audioSource are nil.
|
||||||
done chan struct{}
|
source *Source
|
||||||
once sync.Once
|
sub chan *rtp.Packet
|
||||||
|
|
||||||
|
// M2 two-source mode: videoSource/audioSource and their subs are set,
|
||||||
|
// source/sub are nil.
|
||||||
|
videoSource *Source
|
||||||
|
audioSource *Source
|
||||||
|
videoSub chan *rtp.Packet
|
||||||
|
audioSub chan *rtp.Packet
|
||||||
|
|
||||||
|
done chan struct{}
|
||||||
|
once sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreatePeer builds a PeerConnection, sets the remote offer, generates an
|
// CreatePeer builds a PeerConnection, sets the remote offer, generates an
|
||||||
|
|
@ -140,18 +152,111 @@ func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }
|
||||||
// ResourceID returns the stable resource id used in the WHEP Location header.
|
// ResourceID returns the stable resource id used in the WHEP Location header.
|
||||||
func (p *Peer) ResourceID() string { return p.resourceID }
|
func (p *Peer) ResourceID() string { return p.resourceID }
|
||||||
|
|
||||||
// Close tears down the peer connection and unsubscribes from the source.
|
// Close tears down the peer connection and unsubscribes from each
|
||||||
// Safe to call multiple times.
|
// source. Safe to call multiple times.
|
||||||
func (p *Peer) Close() error {
|
func (p *Peer) Close() error {
|
||||||
var err error
|
var err error
|
||||||
p.once.Do(func() {
|
p.once.Do(func() {
|
||||||
close(p.done)
|
close(p.done)
|
||||||
p.source.Unsubscribe(p.sub)
|
if p.source != nil && p.sub != nil {
|
||||||
|
p.source.Unsubscribe(p.sub)
|
||||||
|
}
|
||||||
|
if p.videoSource != nil && p.videoSub != nil {
|
||||||
|
p.videoSource.Unsubscribe(p.videoSub)
|
||||||
|
}
|
||||||
|
if p.audioSource != nil && p.audioSub != nil {
|
||||||
|
p.audioSource.Unsubscribe(p.audioSub)
|
||||||
|
}
|
||||||
err = p.pc.Close()
|
err = p.pc.Close()
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreatePeerFromSources is the M2 entry point: it builds a
|
||||||
|
// PeerConnection with video+audio tracks and subscribes each to its
|
||||||
|
// own dedicated Source. Used when the restream manager emits two
|
||||||
|
// FFmpeg RTP legs on separate UDP ports — there is no payload-type
|
||||||
|
// sniffing required, each Source feeds its matching track directly.
|
||||||
|
func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
|
||||||
|
videoSrc, audioSrc *Source, offer webrtc.SessionDescription) (*Peer, error) {
|
||||||
|
pc, err := f.api.NewPeerConnection(f.rtcConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("webrtc: new peer connection: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
videoTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||||
|
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
|
||||||
|
"video", "dragonfork")
|
||||||
|
if err != nil {
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, fmt.Errorf("webrtc: new video track: %w", err)
|
||||||
|
}
|
||||||
|
audioTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||||
|
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
|
||||||
|
"audio", "dragonfork")
|
||||||
|
if err != nil {
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, fmt.Errorf("webrtc: new audio track: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := pc.AddTrack(videoTrack); err != nil {
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, fmt.Errorf("webrtc: add video track: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := pc.AddTrack(audioTrack); err != nil {
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, fmt.Errorf("webrtc: add audio track: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pc.SetRemoteDescription(offer); err != nil {
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, fmt.Errorf("webrtc: set remote: %w", err)
|
||||||
|
}
|
||||||
|
answer, err := pc.CreateAnswer(nil)
|
||||||
|
if err != nil {
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, fmt.Errorf("webrtc: create answer: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gatherComplete := webrtc.GatheringCompletePromise(pc)
|
||||||
|
if err := pc.SetLocalDescription(answer); err != nil {
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, fmt.Errorf("webrtc: set local: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-gatherComplete:
|
||||||
|
case <-ctx.Done():
|
||||||
|
_ = pc.Close()
|
||||||
|
return nil, ErrICETimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
videoSub := videoSrc.Subscribe(64)
|
||||||
|
audioSub := audioSrc.Subscribe(64)
|
||||||
|
|
||||||
|
p := &Peer{
|
||||||
|
resourceID: newResourceID(),
|
||||||
|
pc: pc,
|
||||||
|
answer: *pc.LocalDescription(),
|
||||||
|
videoSource: videoSrc,
|
||||||
|
audioSource: audioSrc,
|
||||||
|
videoSub: videoSub,
|
||||||
|
audioSub: audioSub,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
|
||||||
|
if st == webrtc.PeerConnectionStateFailed ||
|
||||||
|
st == webrtc.PeerConnectionStateDisconnected ||
|
||||||
|
st == webrtc.PeerConnectionStateClosed {
|
||||||
|
_ = p.Close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
go forwardRTPSplit(p.done, videoSub, audioSub, videoTrack, audioTrack)
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
func newResourceID() string {
|
func newResourceID() string {
|
||||||
b := make([]byte, 8)
|
b := make([]byte, 8)
|
||||||
_, _ = rand.Read(b)
|
_, _ = rand.Read(b)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue