feat(core/webrtc): add IngestPeer for WHIP publish side (issue #16)
IngestPeer is the symmetric inverse of the WHEP Peer: - Creates a recvonly PeerConnection (Pion receives tracks from the publisher) - OnTrack -> reads RTP packets from the remote track and writes them to loopback UDP ports (videoPort, audioPort) that FFmpeg is listening on - Full lifecycle: Done(), Connected(), Close(), AddICECandidate() PeerFactory.CreateIngestPeer() follows the same ctx/offer/ICE-gather pattern as CreatePeerFromSources() so the app/webrtc handler layer can use a uniform error matrix.
This commit is contained in:
parent
5f9ba6f764
commit
01c456cd1a
1 changed files with 195 additions and 0 deletions
195
core/webrtc/whip.go
Normal file
195
core/webrtc/whip.go
Normal file
|
|
@ -0,0 +1,195 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// IngestPeer receives a WebRTC publish stream (WHIP protocol) and
|
||||
// forwards the received RTP tracks to loopback UDP ports for FFmpeg
|
||||
// consumption. It is the symmetric inverse of the egress Peer:
|
||||
//
|
||||
// Publisher (browser / OBS) -> WebRTC -> IngestPeer -> UDP -> FFmpeg input
|
||||
//
|
||||
// FFmpeg must already be bound on videoPort/audioPort (i.e., the process
|
||||
// has started with those ports as its RTP input legs) before the first
|
||||
// RTP packets arrive — the loopback UDP writes are fire-and-forget and
|
||||
// harmless if FFmpeg hasn't opened the socket yet.
|
||||
type IngestPeer struct {
|
||||
resourceID string
|
||||
pc *webrtc.PeerConnection
|
||||
answer webrtc.SessionDescription
|
||||
|
||||
// Destination UDP addresses — FFmpeg's bound RTP input sockets.
|
||||
videoAddr *net.UDPAddr
|
||||
audioAddr *net.UDPAddr
|
||||
|
||||
// Shared sender socket used for all forwarded packets.
|
||||
udpConn *net.UDPConn
|
||||
|
||||
done chan struct{}
|
||||
once sync.Once
|
||||
connected chan struct{}
|
||||
connOnce sync.Once
|
||||
}
|
||||
|
||||
// CreateIngestPeer builds a recvonly PeerConnection, sets the remote
|
||||
// offer (from the WHIP publisher), creates and gathers the answer, then
|
||||
// wires OnTrack to forward received video and audio RTP to videoPort and
|
||||
// audioPort on localhost respectively.
|
||||
//
|
||||
// videoPort and audioPort must be loopback UDP ports that FFmpeg (or any
|
||||
// other RTP consumer) is already listening on. The caller owns the returned
|
||||
// peer and must call Close() when done.
|
||||
func (f *PeerFactory) CreateIngestPeer(ctx context.Context,
|
||||
offer webrtc.SessionDescription,
|
||||
videoPort, audioPort int) (*IngestPeer, error) {
|
||||
|
||||
pc, err := f.api.NewPeerConnection(f.rtcConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("webrtc: whip: new peer connection: %w", err)
|
||||
}
|
||||
|
||||
// Add recvonly transceivers so the SDP negotiation offers to
|
||||
// receive both video and audio from the publisher.
|
||||
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
|
||||
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: whip: add video transceiver: %w", err)
|
||||
}
|
||||
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
|
||||
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: whip: add audio transceiver: %w", err)
|
||||
}
|
||||
|
||||
if err := pc.SetRemoteDescription(offer); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: whip: set remote: %w", err)
|
||||
}
|
||||
answer, err := pc.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: whip: create answer: %w", err)
|
||||
}
|
||||
|
||||
gatherComplete := webrtc.GatheringCompletePromise(pc)
|
||||
if err := pc.SetLocalDescription(answer); err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: whip: set local: %w", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-gatherComplete:
|
||||
case <-ctx.Done():
|
||||
_ = pc.Close()
|
||||
return nil, ErrICETimeout
|
||||
}
|
||||
|
||||
// Shared UDP sender socket for forwarding RTP to FFmpeg.
|
||||
udpConn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
|
||||
if err != nil {
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("webrtc: whip: bind sender socket: %w", err)
|
||||
}
|
||||
|
||||
p := &IngestPeer{
|
||||
resourceID: newResourceID(),
|
||||
pc: pc,
|
||||
answer: *pc.LocalDescription(),
|
||||
videoAddr: &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: videoPort},
|
||||
audioAddr: &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: audioPort},
|
||||
udpConn: udpConn,
|
||||
done: make(chan struct{}),
|
||||
connected: make(chan struct{}),
|
||||
}
|
||||
|
||||
pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
|
||||
if st == webrtc.PeerConnectionStateConnected {
|
||||
p.connOnce.Do(func() { close(p.connected) })
|
||||
}
|
||||
if st == webrtc.PeerConnectionStateFailed ||
|
||||
st == webrtc.PeerConnectionStateDisconnected ||
|
||||
st == webrtc.PeerConnectionStateClosed {
|
||||
_ = p.Close()
|
||||
}
|
||||
})
|
||||
|
||||
// Wire each incoming track to its UDP destination. OnTrack fires
|
||||
// once per negotiated media section; we expect at most one video
|
||||
// and one audio track.
|
||||
pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||||
var dst *net.UDPAddr
|
||||
switch track.Kind() {
|
||||
case webrtc.RTPCodecTypeVideo:
|
||||
dst = p.videoAddr
|
||||
case webrtc.RTPCodecTypeAudio:
|
||||
dst = p.audioAddr
|
||||
default:
|
||||
// Unknown media kind — ignore.
|
||||
return
|
||||
}
|
||||
go p.forwardTrack(track, dst)
|
||||
})
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// forwardTrack reads raw RTP packets from the remote track and writes
|
||||
// them verbatim to dst via the shared UDP sender socket. Exits when
|
||||
// p.done is closed or the track read errors (e.g., peer connection
|
||||
// closed by the remote).
|
||||
func (p *IngestPeer) forwardTrack(track *webrtc.TrackRemote, dst *net.UDPAddr) {
|
||||
buf := make([]byte, 1500) // MTU-sized; same as Source.readLoop
|
||||
for {
|
||||
select {
|
||||
case <-p.done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
n, _, err := track.Read(buf)
|
||||
if err != nil {
|
||||
// Track closed or peer gone — exit cleanly.
|
||||
return
|
||||
}
|
||||
// WriteToUDP is non-blocking from the caller's perspective:
|
||||
// if FFmpeg hasn't bound the port yet the OS will ICMP-reject
|
||||
// and we'll get a net.Error. We ignore write errors to avoid
|
||||
// thrashing on transient startup races.
|
||||
_, _ = p.udpConn.WriteToUDP(buf[:n], dst)
|
||||
}
|
||||
}
|
||||
|
||||
// Answer returns the locally-created SDP answer. Valid after CreateIngestPeer.
|
||||
func (p *IngestPeer) Answer() webrtc.SessionDescription { return p.answer }
|
||||
|
||||
// ResourceID returns the stable resource id used in the WHIP Location header.
|
||||
func (p *IngestPeer) ResourceID() string { return p.resourceID }
|
||||
|
||||
// Done returns a channel closed when the peer has been torn down.
|
||||
func (p *IngestPeer) Done() <-chan struct{} { return p.done }
|
||||
|
||||
// Connected returns a channel closed when ICE first reaches Connected state.
|
||||
func (p *IngestPeer) Connected() <-chan struct{} { return p.connected }
|
||||
|
||||
// AddICECandidate forwards a trickle-ICE candidate to the underlying
|
||||
// PeerConnection.
|
||||
func (p *IngestPeer) AddICECandidate(c webrtc.ICECandidateInit) error {
|
||||
return p.pc.AddICECandidate(c)
|
||||
}
|
||||
|
||||
// Close tears down the peer connection and stops all track forwarders.
|
||||
// Safe to call multiple times.
|
||||
func (p *IngestPeer) Close() error {
|
||||
var err error
|
||||
p.once.Do(func() {
|
||||
close(p.done)
|
||||
_ = p.udpConn.Close()
|
||||
err = p.pc.Close()
|
||||
})
|
||||
return err
|
||||
}
|
||||
Loading…
Reference in a new issue