datarhei-dragonfork-core/core/webrtc/peer.go
Zac Gaetano 9d38e9ccdb feat(webrtc): add app/webrtc subsystem + lifecycle hooks
Introduces the subsystem layer that sits alongside api.API and wires
the M1 core/webrtc primitives into the per-process restream lifecycle.

app/webrtc/subsystem.go:
  - Subsystem struct holding the global WebRTC config, core PeerFactory,
    per-process stream map, and logger
  - New(config.DataWebRTC, logger) constructor
  - Enabled(), Hooks(), Close(), lookup() methods

app/webrtc/lifecycle.go:
  - onProcessStart: allocates an adjacent UDP port pair, binds two
    Pion Sources (video on V, audio on V+1), registers them under the
    process id, and returns the two RTP output legs to append to the
    FFmpeg command.
  - onProcessStop: tears down the pair.
  - allocAdjacentPair: retries up to 10 times to find a free (V, V+1)
    pair since the kernel's ephemeral picker can hand us an odd port.
  - splitRTPLegs: converts BuildArgs' flat []string into two ConfigIO
    entries by splitting on the second -map token.

core/webrtc/peer.go + forward.go:
  - Adds PeerFactory.CreatePeerFromSources for the M2 two-source
    forwarding mode (video and audio on separate UDP ports, no
    payload-type sniffing). Leaves CreatePeer intact for the M1 PoC.
  - Adds forwardRTPSplit companion goroutine.

config/data.go:
  - Promote anonymous WebRTC struct to named type DataWebRTC so
    app/webrtc can accept it by value.
2026-04-17 10:02:00 -04:00

264 lines
7.2 KiB
Go

package webrtc
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"sync"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
// PeerFactory builds PeerConnections from a shared Pion API instance
// configured from Config. Both fields are populated once by
// NewPeerFactory and reused for every peer the factory creates.
type PeerFactory struct {
// api is the Pion API object used to create each PeerConnection.
api *webrtc.API
// rtcConfig is the configuration passed to every NewPeerConnection call.
rtcConfig webrtc.Configuration
}
// NewPeerFactory validates c and returns a PeerFactory backed by a Pion API
// built from it.
//
// The media engine is loaded with Pion's default codec registrations (the
// tracks created by this package use H.264 and Opus). The ICE configuration
// and optional SettingEngine both come from BuildICEConfig; the
// SettingEngine is applied only when BuildICEConfig returns a non-nil one.
//
// Errors from c.Validate and BuildICEConfig are returned as-is; a codec
// registration failure is wrapped.
//
// NOTE(review): no interceptor registry is passed to webrtc.NewAPI here.
// Confirm whether Pion's default interceptors (NACK, RTCP reports) are
// intentionally left out.
func NewPeerFactory(c Config) (*PeerFactory, error) {
	if err := c.Validate(); err != nil {
		return nil, err
	}

	engine := new(webrtc.MediaEngine)
	if err := engine.RegisterDefaultCodecs(); err != nil {
		return nil, fmt.Errorf("webrtc: register default codecs: %w", err)
	}

	iceConfig, settings, err := BuildICEConfig(c)
	if err != nil {
		return nil, err
	}

	// At most two options: the media engine, plus the setting engine when
	// BuildICEConfig produced one.
	apiOptions := make([]func(*webrtc.API), 0, 2)
	apiOptions = append(apiOptions, webrtc.WithMediaEngine(engine))
	if settings != nil {
		apiOptions = append(apiOptions, webrtc.WithSettingEngine(*settings))
	}

	factory := &PeerFactory{
		api:       webrtc.NewAPI(apiOptions...),
		rtcConfig: iceConfig,
	}
	return factory, nil
}
// Peer wraps a Pion PeerConnection bound to either a single Source
// subscription (M1, payload-type split forwarding) or to a pair of
// video+audio Source subscriptions (M2, per-track forwarding).
// Exactly one of the two groups of source fields is populated, depending
// on which factory method created the Peer.
type Peer struct {
// resourceID is the random hex id exposed through ResourceID.
resourceID string
// pc is the underlying PeerConnection; Close closes it.
pc *webrtc.PeerConnection
// answer is a copy of the local description taken after ICE gathering
// finished; it is what Answer returns.
answer webrtc.SessionDescription
// M1 single-source mode: source+sub are set, videoSource/audioSource are nil.
source *Source
sub chan *rtp.Packet
// M2 two-source mode: videoSource/audioSource and their subs are set,
// source/sub are nil.
videoSource *Source
audioSource *Source
videoSub chan *rtp.Packet
audioSub chan *rtp.Packet
// done is closed by Close. It is also handed to the forwarding goroutine,
// presumably as its stop signal (the forwarders live in another file).
done chan struct{}
// once makes the teardown inside Close run a single time.
once sync.Once
}
// CreatePeer is the M1 single-source entry point. It negotiates a
// PeerConnection against offer (H.264 video track + Opus audio track),
// waits for ICE gathering to finish, subscribes to src, and starts
// forwardRTP to feed both tracks from that one subscription.
//
// It returns ErrICETimeout when ctx is done before gathering completes, or
// a wrapped Pion error when a negotiation step fails. In both cases the
// PeerConnection has already been closed and nothing was subscribed.
func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) {
	pc, videoTrack, audioTrack, err := f.negotiateAnswer(ctx, offer)
	if err != nil {
		return nil, err
	}

	// Subscribe only once negotiation has succeeded, so no failure path
	// has a subscription to undo.
	sub := src.Subscribe(64)
	p := &Peer{
		resourceID: newResourceID(),
		pc:         pc,
		answer:     *pc.LocalDescription(),
		source:     src,
		sub:        sub,
		done:       make(chan struct{}),
	}
	p.closeOnConnectionLoss()

	go forwardRTP(p.done, sub, videoTrack, audioTrack)
	return p, nil
}

// Answer returns the locally-created SDP answer. Valid on any Peer returned
// by CreatePeer or CreatePeerFromSources.
func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }

// ResourceID returns the stable resource id used in the WHEP Location header.
func (p *Peer) ResourceID() string { return p.resourceID }

// Close tears down the peer: it closes the done channel, unsubscribes from
// whichever sources this Peer was bound to (M1 and/or M2 fields), and
// closes the PeerConnection.
//
// Safe to call multiple times. Only the first call performs the teardown
// and reports the PeerConnection's close error; later calls return nil.
func (p *Peer) Close() error {
	var err error
	p.once.Do(func() {
		close(p.done)
		if p.source != nil && p.sub != nil {
			p.source.Unsubscribe(p.sub)
		}
		if p.videoSource != nil && p.videoSub != nil {
			p.videoSource.Unsubscribe(p.videoSub)
		}
		if p.audioSource != nil && p.audioSub != nil {
			p.audioSource.Unsubscribe(p.audioSub)
		}
		err = p.pc.Close()
	})
	return err
}

// CreatePeerFromSources is the M2 entry point: it negotiates a
// PeerConnection with video+audio tracks and subscribes each track to its
// own dedicated Source. Used when the restream manager emits two FFmpeg
// RTP legs on separate UDP ports, so no payload-type sniffing is needed:
// forwardRTPSplit receives one subscription per track.
//
// Error behavior matches CreatePeer: ErrICETimeout when ctx is done before
// ICE gathering completes, otherwise a wrapped Pion error. The
// PeerConnection is closed on every failure path.
func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
	videoSrc, audioSrc *Source, offer webrtc.SessionDescription) (*Peer, error) {
	pc, videoTrack, audioTrack, err := f.negotiateAnswer(ctx, offer)
	if err != nil {
		return nil, err
	}

	videoSub := videoSrc.Subscribe(64)
	audioSub := audioSrc.Subscribe(64)
	p := &Peer{
		resourceID:  newResourceID(),
		pc:          pc,
		answer:      *pc.LocalDescription(),
		videoSource: videoSrc,
		audioSource: audioSrc,
		videoSub:    videoSub,
		audioSub:    audioSub,
		done:        make(chan struct{}),
	}
	p.closeOnConnectionLoss()

	go forwardRTPSplit(p.done, videoSub, audioSub, videoTrack, audioTrack)
	return p, nil
}

// negotiateAnswer performs the offer/answer exchange shared by CreatePeer
// and CreatePeerFromSources. It creates a PeerConnection, attaches an H.264
// video track and an Opus audio track, applies offer, sets the generated
// answer as the local description, and blocks until ICE gathering
// completes or ctx is done.
//
// On success the caller owns the returned PeerConnection. On any failure
// the PeerConnection is closed here and the returned pointers are nil.
func (f *PeerFactory) negotiateAnswer(ctx context.Context, offer webrtc.SessionDescription) (
	*webrtc.PeerConnection, *webrtc.TrackLocalStaticRTP, *webrtc.TrackLocalStaticRTP, error) {
	pc, err := f.api.NewPeerConnection(f.rtcConfig)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: new peer connection: %w", err)
	}

	// One cleanup for every failure path below. A flag is used instead of
	// named results so the closure always sees the live pc, not the nil
	// being returned.
	negotiated := false
	defer func() {
		if !negotiated {
			_ = pc.Close() // best-effort: the negotiation error is what the caller needs
		}
	}()

	videoTrack, err := webrtc.NewTrackLocalStaticRTP(
		webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
		"video", "dragonfork")
	if err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: new video track: %w", err)
	}
	audioTrack, err := webrtc.NewTrackLocalStaticRTP(
		webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
		"audio", "dragonfork")
	if err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: new audio track: %w", err)
	}

	if _, err := pc.AddTrack(videoTrack); err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: add video track: %w", err)
	}
	if _, err := pc.AddTrack(audioTrack); err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: add audio track: %w", err)
	}

	if err := pc.SetRemoteDescription(offer); err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: set remote: %w", err)
	}
	answer, err := pc.CreateAnswer(nil)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: create answer: %w", err)
	}

	// The promise is taken before SetLocalDescription so the wait below
	// cannot miss the completion signal.
	gatherComplete := webrtc.GatheringCompletePromise(pc)
	if err := pc.SetLocalDescription(answer); err != nil {
		return nil, nil, nil, fmt.Errorf("webrtc: set local: %w", err)
	}

	select {
	case <-gatherComplete:
	case <-ctx.Done():
		return nil, nil, nil, ErrICETimeout
	}

	negotiated = true
	return pc, videoTrack, audioTrack, nil
}

// closeOnConnectionLoss registers a connection-state handler that closes
// the Peer when the PeerConnection reports Failed, Disconnected, or Closed.
//
// NOTE(review): Disconnected can be a transient ICE state; confirm that
// tearing the peer down on it (rather than waiting for Failed) is intended.
func (p *Peer) closeOnConnectionLoss() {
	p.pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
		switch st {
		case webrtc.PeerConnectionStateFailed,
			webrtc.PeerConnectionStateDisconnected,
			webrtc.PeerConnectionStateClosed:
			_ = p.Close()
		}
	})
}
// newResourceID returns a fresh resource id: 8 bytes read from crypto/rand,
// rendered as a 16-character lowercase hex string.
//
// It panics if the system random source fails. Discarding that error would
// hand out the all-zero id to every peer, making ids predictable and
// colliding. The signature has no error return, and an unusable CSPRNG is
// not something a caller can recover from.
func newResourceID() string {
	var raw [8]byte
	if _, err := rand.Read(raw[:]); err != nil {
		panic(fmt.Errorf("webrtc: read random bytes for resource id: %w", err))
	}
	return hex.EncodeToString(raw[:])
}