From 01c456cd1a16b2b4295bf810896a93e449627c0a Mon Sep 17 00:00:00 2001
From: ZGaetano
Date: Sat, 9 May 2026 16:20:09 -0400
Subject: [PATCH] feat(core/webrtc): add IngestPeer for WHIP publish side (issue #16)

IngestPeer is the symmetric inverse of the WHEP Peer:
- Creates a recvonly PeerConnection (Pion receives tracks from the publisher)
- OnTrack -> reads RTP packets from the remote track and writes them to
  loopback UDP ports (videoPort, audioPort) that FFmpeg is listening on
- Full lifecycle: Done(), Connected(), Close(), AddICECandidate()

PeerFactory.CreateIngestPeer() follows the same ctx/offer/ICE-gather pattern
as CreatePeerFromSources() so the app/webrtc handler layer can use a uniform
error matrix.
---
 core/webrtc/whip.go | 195 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 195 insertions(+)
 create mode 100644 core/webrtc/whip.go

diff --git a/core/webrtc/whip.go b/core/webrtc/whip.go
new file mode 100644
index 0000000..827d199
--- /dev/null
+++ b/core/webrtc/whip.go
@@ -0,0 +1,195 @@
+package webrtc
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"sync"
+
+	"github.com/pion/webrtc/v4"
+)
+
+// IngestPeer receives a WebRTC publish stream (WHIP protocol) and
+// forwards the received RTP tracks to loopback UDP ports for FFmpeg
+// consumption. It is the symmetric inverse of the egress Peer:
+//
+//	Publisher (browser / OBS) -> WebRTC -> IngestPeer -> UDP -> FFmpeg input
+//
+// FFmpeg should already be bound on videoPort/audioPort before the first
+// RTP packets arrive. Forwarding is fire-and-forget: write errors are
+// ignored, so packets sent before FFmpeg opens its sockets are lost
+// rather than reported, and the peer keeps running.
+type IngestPeer struct {
+	resourceID string
+	pc         *webrtc.PeerConnection
+	answer     webrtc.SessionDescription
+
+	// Destination UDP addresses — FFmpeg's bound RTP input sockets.
+	videoAddr *net.UDPAddr
+	audioAddr *net.UDPAddr
+
+	// Shared sender socket used for all forwarded packets.
+	udpConn *net.UDPConn
+
+	done      chan struct{} // closed exactly once by Close
+	once      sync.Once     // guards the teardown in Close
+	connected chan struct{} // closed on the first Connected state
+	connOnce  sync.Once     // guards close(connected)
+}
+
+// CreateIngestPeer builds a recvonly PeerConnection, applies the remote
+// offer from the WHIP publisher, creates the answer and waits for ICE
+// gathering to finish. Received video and audio RTP is forwarded to
+// 127.0.0.1:videoPort and 127.0.0.1:audioPort respectively.
+//
+// Both ports must be in 1..65535; other values are rejected up front,
+// because forwarding ignores write errors and a bad destination would
+// drop all media silently. ErrICETimeout is returned when ctx ends before
+// gathering completes. The caller owns the peer and must call Close.
+func (f *PeerFactory) CreateIngestPeer(ctx context.Context,
+	offer webrtc.SessionDescription,
+	videoPort, audioPort int) (*IngestPeer, error) {
+	for _, port := range []int{videoPort, audioPort} {
+		if port < 1 || port > 65535 {
+			return nil, fmt.Errorf("webrtc: whip: invalid RTP port %d", port)
+		}
+	}
+
+	pc, err := f.api.NewPeerConnection(f.rtcConfig)
+	if err != nil {
+		return nil, fmt.Errorf("webrtc: whip: new peer connection: %w", err)
+	}
+
+	// Shared UDP sender socket for forwarding RTP to FFmpeg.
+	loopback := net.ParseIP("127.0.0.1")
+	udpConn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: loopback, Port: 0})
+	if err != nil {
+		_ = pc.Close()
+		return nil, fmt.Errorf("webrtc: whip: bind sender socket: %w", err)
+	}
+
+	p := &IngestPeer{
+		resourceID: newResourceID(),
+		pc:         pc,
+		videoAddr:  &net.UDPAddr{IP: loopback, Port: videoPort},
+		audioAddr:  &net.UDPAddr{IP: loopback, Port: audioPort},
+		udpConn:    udpConn,
+		done:       make(chan struct{}),
+		connected:  make(chan struct{}),
+	}
+	// p now owns pc and udpConn: any failure below releases both via Close.
+	ready := false
+	defer func() {
+		if !ready {
+			_ = p.Close()
+		}
+	}()
+
+	// Install handlers before negotiation so no state change or track can
+	// arrive while no callback is registered.
+	pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
+		switch st {
+		case webrtc.PeerConnectionStateConnected:
+			p.connOnce.Do(func() { close(p.connected) })
+		case webrtc.PeerConnectionStateFailed,
+			webrtc.PeerConnectionStateDisconnected, // NOTE(review): treated as terminal; ICE may recover — confirm
+			webrtc.PeerConnectionStateClosed:
+			_ = p.Close()
+		}
+	})
+	pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
+		var dst *net.UDPAddr
+		switch track.Kind() {
+		case webrtc.RTPCodecTypeVideo:
+			dst = p.videoAddr
+		case webrtc.RTPCodecTypeAudio:
+			dst = p.audioAddr
+		default:
+			return // unknown media kind: nothing to forward to
+		}
+		go p.forwardTrack(track, dst)
+	})
+
+	// Recvonly transceivers: accept one video and one audio stream.
+	recvOnly := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
+	if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, recvOnly); err != nil {
+		return nil, fmt.Errorf("webrtc: whip: add video transceiver: %w", err)
+	}
+	if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, recvOnly); err != nil {
+		return nil, fmt.Errorf("webrtc: whip: add audio transceiver: %w", err)
+	}
+	if err := pc.SetRemoteDescription(offer); err != nil {
+		return nil, fmt.Errorf("webrtc: whip: set remote: %w", err)
+	}
+	answer, err := pc.CreateAnswer(nil)
+	if err != nil {
+		return nil, fmt.Errorf("webrtc: whip: create answer: %w", err)
+	}
+
+	gatherComplete := webrtc.GatheringCompletePromise(pc)
+	if err := pc.SetLocalDescription(answer); err != nil {
+		return nil, fmt.Errorf("webrtc: whip: set local: %w", err)
+	}
+	select {
+	case <-gatherComplete:
+	case <-ctx.Done():
+		return nil, ErrICETimeout
+	}
+	p.answer = *pc.LocalDescription()
+	ready = true
+	return p, nil
+}
+
+// forwardTrack reads raw RTP packets from the remote track and writes them
+// verbatim to dst via the shared UDP sender socket. It exits when p.done is
+// closed or the track read errors (e.g., the peer connection was closed).
+func (p *IngestPeer) forwardTrack(track *webrtc.TrackRemote, dst *net.UDPAddr) {
+	pkt := make([]byte, 1500) // MTU-sized read buffer, reused for every packet
+	for {
+		select {
+		case <-p.done:
+			return
+		default:
+			size, _, readErr := track.Read(pkt)
+			if readErr != nil {
+				// Track closed or peer gone — exit cleanly.
+				return
+			}
+			// Best effort: the write error is deliberately discarded so
+			// that a consumer that is not listening yet (or any other
+			// send failure) never stops the forwarder.
+			_, _ = p.udpConn.WriteToUDP(pkt[:size], dst)
+		}
+	}
+}
+
+// Answer returns the local SDP answer stored by CreateIngestPeer.
+func (p *IngestPeer) Answer() webrtc.SessionDescription { return p.answer }
+
+// ResourceID returns the id assigned at creation, meant for the WHIP Location header.
+func (p *IngestPeer) ResourceID() string { return p.resourceID }
+
+// Done returns a channel that is closed once Close has run.
+func (p *IngestPeer) Done() <-chan struct{} { return p.done }
+
+// Connected returns a channel closed the first time the PeerConnection
+// reports the Connected state.
+func (p *IngestPeer) Connected() <-chan struct{} { return p.connected }
+
+// AddICECandidate hands a trickle-ICE candidate from the publisher to the
+// underlying PeerConnection and returns its error, if any.
+func (p *IngestPeer) AddICECandidate(c webrtc.ICECandidateInit) error {
+	return p.pc.AddICECandidate(c)
+}
+
+// Close stops the forwarders and releases the socket and PeerConnection.
+// Safe to call repeatedly; only the first call can return an error.
+func (p *IngestPeer) Close() error {
+	var closeErr error
+	p.once.Do(func() {
+		close(p.done)
+		_ = p.udpConn.Close()
+		closeErr = p.pc.Close()
+	})
+	return closeErr
+}