feat(webrtc): add PeerFactory, Peer, and RTP forwarder

This commit is contained in:
Zac Gaetano 2026-04-17 08:47:27 -04:00
parent 917c353e03
commit b2a691186c
3 changed files with 292 additions and 0 deletions

37
core/webrtc/forward.go Normal file
View file

@ -0,0 +1,37 @@
package webrtc
import (
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
// forwardRTP reads packets from sub and writes them to the correct track
// based on payload type (H.264 → video, Opus → audio). Payload-type
// inspection is the simplest M1 approach; M2 will switch to per-track
// source channels once the process resolver manages separate video/audio
// UDP ports.
func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet,
video, audio *webrtc.TrackLocalStaticRTP) {
for {
select {
case <-done:
return
case pkt, ok := <-sub:
if !ok {
return
}
// Pion default H.264 PT = 102, Opus PT = 111. If the publisher
// uses different PTs we'll revisit in M2 — for M1 PoC we
// configure FFmpeg to these values explicitly in the publisher
// script.
switch pkt.PayloadType {
case 102:
_ = video.WriteRTP(pkt)
case 111:
_ = audio.WriteRTP(pkt)
default:
// Unknown PT — drop. Log in M3.
}
}
}
}

159
core/webrtc/peer.go Normal file
View file

@ -0,0 +1,159 @@
package webrtc
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"sync"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
// PeerFactory builds PeerConnections from a shared Pion API instance
// configured from Config.
type PeerFactory struct {
api *webrtc.API
rtcConfig webrtc.Configuration
}
// NewPeerFactory initializes a Pion API with the codec set we support
// (H.264 + Opus) and applies the provided Config.
func NewPeerFactory(c Config) (*PeerFactory, error) {
if err := c.Validate(); err != nil {
return nil, err
}
me := &webrtc.MediaEngine{}
if err := me.RegisterDefaultCodecs(); err != nil {
return nil, fmt.Errorf("webrtc: register default codecs: %w", err)
}
rtcConfig, se, err := BuildICEConfig(c)
if err != nil {
return nil, err
}
opts := []func(*webrtc.API){webrtc.WithMediaEngine(me)}
if se != nil {
opts = append(opts, webrtc.WithSettingEngine(*se))
}
api := webrtc.NewAPI(opts...)
return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil
}
// Peer wraps a Pion PeerConnection bound to a Source's subscription.
type Peer struct {
resourceID string
pc *webrtc.PeerConnection
answer webrtc.SessionDescription
source *Source
sub chan *rtp.Packet
done chan struct{}
once sync.Once
}
// CreatePeer builds a PeerConnection, sets the remote offer, generates an
// answer, attaches video+audio tracks fed from src, and blocks until ICE
// gathering completes or ctx expires.
func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) {
pc, err := f.api.NewPeerConnection(f.rtcConfig)
if err != nil {
return nil, fmt.Errorf("webrtc: new peer connection: %w", err)
}
videoTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
"video", "dragonfork")
if err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: new video track: %w", err)
}
audioTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
"audio", "dragonfork")
if err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: new audio track: %w", err)
}
if _, err := pc.AddTrack(videoTrack); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: add video track: %w", err)
}
if _, err := pc.AddTrack(audioTrack); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: add audio track: %w", err)
}
if err := pc.SetRemoteDescription(offer); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: set remote: %w", err)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: create answer: %w", err)
}
gatherComplete := webrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(answer); err != nil {
_ = pc.Close()
return nil, fmt.Errorf("webrtc: set local: %w", err)
}
select {
case <-gatherComplete:
case <-ctx.Done():
_ = pc.Close()
return nil, ErrICETimeout
}
sub := src.Subscribe(64)
p := &Peer{
resourceID: newResourceID(),
pc: pc,
answer: *pc.LocalDescription(),
source: src,
sub: sub,
done: make(chan struct{}),
}
pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
if st == webrtc.PeerConnectionStateFailed ||
st == webrtc.PeerConnectionStateDisconnected ||
st == webrtc.PeerConnectionStateClosed {
_ = p.Close()
}
})
go forwardRTP(p.done, sub, videoTrack, audioTrack)
return p, nil
}
// Answer returns the locally-created SDP answer. Valid after CreatePeer.
func (p *Peer) Answer() webrtc.SessionDescription { return p.answer }
// ResourceID returns the stable resource id used in the WHEP Location header.
func (p *Peer) ResourceID() string { return p.resourceID }
// Close tears down the peer connection and unsubscribes from the source.
// Safe to call multiple times.
func (p *Peer) Close() error {
var err error
p.once.Do(func() {
close(p.done)
p.source.Unsubscribe(p.sub)
err = p.pc.Close()
})
return err
}
func newResourceID() string {
b := make([]byte, 8)
_, _ = rand.Read(b)
return hex.EncodeToString(b)
}

96
core/webrtc/peer_test.go Normal file
View file

@ -0,0 +1,96 @@
package webrtc
import (
"context"
"testing"
"time"
"github.com/pion/webrtc/v4"
)
// minimalOfferSDP returns an SDP offer that advertises H.264 (video) and
// Opus (audio) as recvonly — the minimum a WHEP client sends.
func minimalOfferSDP(t *testing.T) webrtc.SessionDescription {
t.Helper()
// Create a throwaway PC to generate a valid offer.
me := &webrtc.MediaEngine{}
if err := me.RegisterDefaultCodecs(); err != nil {
t.Fatalf("RegisterDefaultCodecs: %v", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
t.Fatalf("NewPeerConnection: %v", err)
}
defer pc.Close()
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
t.Fatalf("AddTransceiver video: %v", err)
}
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
t.Fatalf("AddTransceiver audio: %v", err)
}
offer, err := pc.CreateOffer(nil)
if err != nil {
t.Fatalf("CreateOffer: %v", err)
}
return offer
}
func TestPeerFactory_CreateAnswer(t *testing.T) {
src, err := NewSource("streamA", 0)
if err != nil {
t.Fatalf("NewSource: %v", err)
}
defer src.Close()
src.Start()
cfg := DefaultConfig()
factory, err := NewPeerFactory(cfg)
if err != nil {
t.Fatalf("NewPeerFactory: %v", err)
}
offer := minimalOfferSDP(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
peer, err := factory.CreatePeer(ctx, src, offer)
if err != nil {
t.Fatalf("CreatePeer: %v", err)
}
defer peer.Close()
if peer.Answer().Type != webrtc.SDPTypeAnswer {
t.Errorf("Answer().Type = %v, want answer", peer.Answer().Type)
}
if peer.ResourceID() == "" {
t.Error("ResourceID should be non-empty")
}
}
func TestPeerFactory_ClosesCleanly(t *testing.T) {
src, _ := NewSource("streamA", 0)
defer src.Close()
src.Start()
factory, _ := NewPeerFactory(DefaultConfig())
offer := minimalOfferSDP(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
peer, err := factory.CreatePeer(ctx, src, offer)
if err != nil {
t.Fatalf("CreatePeer: %v", err)
}
if err := peer.Close(); err != nil {
t.Errorf("Close: %v", err)
}
// Second close should be a no-op, not panic.
if err := peer.Close(); err != nil {
t.Errorf("second Close: %v", err)
}
}