diff --git a/core/webrtc/forward.go b/core/webrtc/forward.go new file mode 100644 index 0000000..1b74fdd --- /dev/null +++ b/core/webrtc/forward.go @@ -0,0 +1,37 @@ +package webrtc + +import ( + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" +) + +// forwardRTP reads packets from sub and writes them to the correct track +// based on payload type (H.264 → video, Opus → audio). Payload-type +// inspection is the simplest M1 approach; M2 will switch to per-track +// source channels once the process resolver manages separate video/audio +// UDP ports. +func forwardRTP(done <-chan struct{}, sub <-chan *rtp.Packet, + video, audio *webrtc.TrackLocalStaticRTP) { + for { + select { + case <-done: + return + case pkt, ok := <-sub: + if !ok { + return + } + // Pion default H.264 PT = 102, Opus PT = 111. If the publisher + // uses different PTs we'll revisit in M2 — for M1 PoC we + // configure FFmpeg to these values explicitly in the publisher + // script. + switch pkt.PayloadType { + case 102: + _ = video.WriteRTP(pkt) + case 111: + _ = audio.WriteRTP(pkt) + default: + // Unknown PT — drop. Log in M3. + } + } + } +} diff --git a/core/webrtc/peer.go b/core/webrtc/peer.go new file mode 100644 index 0000000..ffa0af3 --- /dev/null +++ b/core/webrtc/peer.go @@ -0,0 +1,159 @@ +package webrtc + +import ( + "context" + "crypto/rand" + "encoding/hex" + "fmt" + "sync" + + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" +) + +// PeerFactory builds PeerConnections from a shared Pion API instance +// configured from Config. +type PeerFactory struct { + api *webrtc.API + rtcConfig webrtc.Configuration +} + +// NewPeerFactory initializes a Pion API with the codec set we support +// (H.264 + Opus) and applies the provided Config. +func NewPeerFactory(c Config) (*PeerFactory, error) { + if err := c.Validate(); err != nil { + return nil, err + } + + me := &webrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + return nil, fmt.Errorf("webrtc: register default codecs: %w", err) + } + + rtcConfig, se, err := BuildICEConfig(c) + if err != nil { + return nil, err + } + + opts := []func(*webrtc.API){webrtc.WithMediaEngine(me)} + if se != nil { + opts = append(opts, webrtc.WithSettingEngine(*se)) + } + api := webrtc.NewAPI(opts...) + + return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil +} + +// Peer wraps a Pion PeerConnection bound to a Source's subscription. +type Peer struct { + resourceID string + pc *webrtc.PeerConnection + answer webrtc.SessionDescription + source *Source + sub chan *rtp.Packet + done chan struct{} + once sync.Once +} + +// CreatePeer builds a PeerConnection, sets the remote offer, generates an +// answer, attaches video+audio tracks fed from src, and blocks until ICE +// gathering completes or ctx expires. +func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) { + pc, err := f.api.NewPeerConnection(f.rtcConfig) + if err != nil { + return nil, fmt.Errorf("webrtc: new peer connection: %w", err) + } + + videoTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, + "video", "dragonfork") + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: new video track: %w", err) + } + audioTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, + "audio", "dragonfork") + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: new audio track: %w", err) + } + if _, err := pc.AddTrack(videoTrack); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: add video track: %w", err) + } + if _, err := pc.AddTrack(audioTrack); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: add audio track: %w", err) + } + + if err := pc.SetRemoteDescription(offer); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: set remote: %w", err) + } + answer, err := pc.CreateAnswer(nil) + if err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: create answer: %w", err) + } + + gatherComplete := webrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(answer); err != nil { + _ = pc.Close() + return nil, fmt.Errorf("webrtc: set local: %w", err) + } + + select { + case <-gatherComplete: + case <-ctx.Done(): + _ = pc.Close() + return nil, ErrICETimeout + } + + sub := src.Subscribe(64) + + p := &Peer{ + resourceID: newResourceID(), + pc: pc, + answer: *pc.LocalDescription(), + source: src, + sub: sub, + done: make(chan struct{}), + } + + pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) { + if st == webrtc.PeerConnectionStateFailed || + st == webrtc.PeerConnectionStateDisconnected || + st == webrtc.PeerConnectionStateClosed { + _ = p.Close() + } + }) + + go forwardRTP(p.done, sub, videoTrack, audioTrack) + + return p, nil +} + +// Answer returns the locally-created SDP answer. Valid after CreatePeer. +func (p *Peer) Answer() webrtc.SessionDescription { return p.answer } + +// ResourceID returns the stable resource id used in the WHEP Location header. +func (p *Peer) ResourceID() string { return p.resourceID } + +// Close tears down the peer connection and unsubscribes from the source. +// Safe to call multiple times. +func (p *Peer) Close() error { + var err error + p.once.Do(func() { + close(p.done) + p.source.Unsubscribe(p.sub) + err = p.pc.Close() + }) + return err +} + +func newResourceID() string { + b := make([]byte, 8) + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} diff --git a/core/webrtc/peer_test.go b/core/webrtc/peer_test.go new file mode 100644 index 0000000..46e615a --- /dev/null +++ b/core/webrtc/peer_test.go @@ -0,0 +1,96 @@ +package webrtc + +import ( + "context" + "testing" + "time" + + "github.com/pion/webrtc/v4" +) + +// minimalOfferSDP returns an SDP offer that advertises H.264 (video) and +// Opus (audio) as recvonly — the minimum a WHEP client sends. +func minimalOfferSDP(t *testing.T) webrtc.SessionDescription { + t.Helper() + // Create a throwaway PC to generate a valid offer. + me := &webrtc.MediaEngine{} + if err := me.RegisterDefaultCodecs(); err != nil { + t.Fatalf("RegisterDefaultCodecs: %v", err) + } + api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + t.Fatalf("NewPeerConnection: %v", err) + } + defer pc.Close() + + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("AddTransceiver video: %v", err) + } + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + t.Fatalf("AddTransceiver audio: %v", err) + } + offer, err := pc.CreateOffer(nil) + if err != nil { + t.Fatalf("CreateOffer: %v", err) + } + return offer +} + +func TestPeerFactory_CreateAnswer(t *testing.T) { + src, err := NewSource("streamA", 0) + if err != nil { + t.Fatalf("NewSource: %v", err) + } + defer src.Close() + src.Start() + + cfg := DefaultConfig() + factory, err := NewPeerFactory(cfg) + if err != nil { + t.Fatalf("NewPeerFactory: %v", err) + } + + offer := minimalOfferSDP(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + peer, err := factory.CreatePeer(ctx, src, offer) + if err != nil { + t.Fatalf("CreatePeer: %v", err) + } + defer peer.Close() + + if peer.Answer().Type != webrtc.SDPTypeAnswer { + t.Errorf("Answer().Type = %v, want answer", peer.Answer().Type) + } + if peer.ResourceID() == "" { + t.Error("ResourceID should be non-empty") + } +} + +func TestPeerFactory_ClosesCleanly(t *testing.T) { + src, _ := NewSource("streamA", 0) + defer src.Close() + src.Start() + + factory, _ := NewPeerFactory(DefaultConfig()) + offer := minimalOfferSDP(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + peer, err := factory.CreatePeer(ctx, src, offer) + if err != nil { + t.Fatalf("CreatePeer: %v", err) + } + if err := peer.Close(); err != nil { + t.Errorf("Close: %v", err) + } + // Second close should be a no-op, not panic. + if err := peer.Close(); err != nil { + t.Errorf("second Close: %v", err) + } +}