package webrtc import ( "context" "crypto/rand" "encoding/hex" "fmt" "sync" "github.com/pion/rtp" "github.com/pion/webrtc/v4" ) // PeerFactory builds PeerConnections from a shared Pion API instance // configured from Config. type PeerFactory struct { api *webrtc.API rtcConfig webrtc.Configuration } // NewPeerFactory initializes a Pion API with the codec set we support // (H.264 + Opus) and applies the provided Config. func NewPeerFactory(c Config) (*PeerFactory, error) { if err := c.Validate(); err != nil { return nil, err } me := &webrtc.MediaEngine{} if err := me.RegisterDefaultCodecs(); err != nil { return nil, fmt.Errorf("webrtc: register default codecs: %w", err) } rtcConfig, se, err := BuildICEConfig(c) if err != nil { return nil, err } opts := []func(*webrtc.API){webrtc.WithMediaEngine(me)} if se != nil { opts = append(opts, webrtc.WithSettingEngine(*se)) } api := webrtc.NewAPI(opts...) return &PeerFactory{api: api, rtcConfig: rtcConfig}, nil } // Peer wraps a Pion PeerConnection bound to either a single Source // subscription (M1, payload-type split forwarding) or to a pair of // video+audio Source subscriptions (M2, per-track forwarding). type Peer struct { resourceID string pc *webrtc.PeerConnection answer webrtc.SessionDescription // M1 single-source mode: source+sub are set, videoSource/audioSource are nil. source *Source sub chan *rtp.Packet // M2 two-source mode: videoSource/audioSource and their subs are set, // source/sub are nil. videoSource *Source audioSource *Source videoSub chan *rtp.Packet audioSub chan *rtp.Packet done chan struct{} once sync.Once } // CreatePeer builds a PeerConnection, sets the remote offer, generates an // answer, attaches video+audio tracks fed from src, and blocks until ICE // gathering completes or ctx expires. 
func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.SessionDescription) (*Peer, error) { pc, err := f.api.NewPeerConnection(f.rtcConfig) if err != nil { return nil, fmt.Errorf("webrtc: new peer connection: %w", err) } videoTrack, err := webrtc.NewTrackLocalStaticRTP( webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "dragonfork") if err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: new video track: %w", err) } audioTrack, err := webrtc.NewTrackLocalStaticRTP( webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "dragonfork") if err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: new audio track: %w", err) } if _, err := pc.AddTrack(videoTrack); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: add video track: %w", err) } if _, err := pc.AddTrack(audioTrack); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: add audio track: %w", err) } if err := pc.SetRemoteDescription(offer); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: set remote: %w", err) } answer, err := pc.CreateAnswer(nil) if err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: create answer: %w", err) } gatherComplete := webrtc.GatheringCompletePromise(pc) if err := pc.SetLocalDescription(answer); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: set local: %w", err) } select { case <-gatherComplete: case <-ctx.Done(): _ = pc.Close() return nil, ErrICETimeout } sub := src.Subscribe(64) p := &Peer{ resourceID: newResourceID(), pc: pc, answer: *pc.LocalDescription(), source: src, sub: sub, done: make(chan struct{}), } pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) { if st == webrtc.PeerConnectionStateFailed || st == webrtc.PeerConnectionStateDisconnected || st == webrtc.PeerConnectionStateClosed { _ = p.Close() } }) go forwardRTP(p.done, sub, videoTrack, audioTrack) return p, nil } // Answer returns the locally-created SDP answer. Valid after CreatePeer. 
func (p *Peer) Answer() webrtc.SessionDescription {
	return p.answer
}

// ResourceID returns the resource id assigned to this Peer when it was
// created; it is the id used in the WHEP Location header.
func (p *Peer) ResourceID() string {
	return p.resourceID
}

// Done returns a channel that is closed once the Peer has been torn down,
// whether through an explicit Close or through the connection-state handler
// installed at creation (which calls Close on Failed, Disconnected, or
// Closed). Nothing is ever sent on the channel, so consumers should receive
// from it or select on it to drive index cleanup without polling.
func (p *Peer) Done() <-chan struct{} {
	return p.done
}

// Close tears the Peer down: it closes the Done channel, unsubscribes from
// every Source this Peer is attached to, and closes the PeerConnection.
//
// Close is safe to call multiple times. Only the call that actually performs
// the teardown returns the PeerConnection's close error; every other call
// returns nil.
func (p *Peer) Close() error {
	var closeErr error
	p.once.Do(func() {
		close(p.done)

		// A Peer is in either M1 or M2 mode, so some of these pairs are nil
		// and are skipped.
		feeds := [...]struct {
			src *Source
			ch  chan *rtp.Packet
		}{
			{p.source, p.sub},
			{p.videoSource, p.videoSub},
			{p.audioSource, p.audioSub},
		}
		for _, feed := range feeds {
			if feed.src == nil || feed.ch == nil {
				continue
			}
			feed.src.Unsubscribe(feed.ch)
		}

		closeErr = p.pc.Close()
	})
	return closeErr
}

// CreatePeerFromSources is the M2 entry point: it builds a
// PeerConnection with video+audio tracks and subscribes each to its
// own dedicated Source. Used when the restream manager emits two
// FFmpeg RTP legs on separate UDP ports — there is no payload-type
// sniffing required, each Source feeds its matching track directly.
func (f *PeerFactory) CreatePeerFromSources(ctx context.Context, videoSrc, audioSrc *Source, offer webrtc.SessionDescription) (*Peer, error) { pc, err := f.api.NewPeerConnection(f.rtcConfig) if err != nil { return nil, fmt.Errorf("webrtc: new peer connection: %w", err) } videoTrack, err := webrtc.NewTrackLocalStaticRTP( webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "dragonfork") if err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: new video track: %w", err) } audioTrack, err := webrtc.NewTrackLocalStaticRTP( webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "dragonfork") if err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: new audio track: %w", err) } if _, err := pc.AddTrack(videoTrack); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: add video track: %w", err) } if _, err := pc.AddTrack(audioTrack); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: add audio track: %w", err) } if err := pc.SetRemoteDescription(offer); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: set remote: %w", err) } answer, err := pc.CreateAnswer(nil) if err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: create answer: %w", err) } gatherComplete := webrtc.GatheringCompletePromise(pc) if err := pc.SetLocalDescription(answer); err != nil { _ = pc.Close() return nil, fmt.Errorf("webrtc: set local: %w", err) } select { case <-gatherComplete: case <-ctx.Done(): _ = pc.Close() return nil, ErrICETimeout } videoSub := videoSrc.Subscribe(64) audioSub := audioSrc.Subscribe(64) p := &Peer{ resourceID: newResourceID(), pc: pc, answer: *pc.LocalDescription(), videoSource: videoSrc, audioSource: audioSrc, videoSub: videoSub, audioSub: audioSub, done: make(chan struct{}), } pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) { if st == webrtc.PeerConnectionStateFailed || st == webrtc.PeerConnectionStateDisconnected || st == webrtc.PeerConnectionStateClosed { _ = p.Close() } }) go 
forwardRTPSplit(p.done, videoSub, audioSub, videoTrack, audioTrack) return p, nil } // AddICECandidate forwards a trickle-ICE candidate to the underlying // PeerConnection. Returns the underlying error if the candidate is // malformed or the connection has already been closed. func (p *Peer) AddICECandidate(c webrtc.ICECandidateInit) error { return p.pc.AddICECandidate(c) } func newResourceID() string { b := make([]byte, 8) _, _ = rand.Read(b) return hex.EncodeToString(b) }