feat(webrtc): add Connected() channel to Peer for ICE establishment timing
Some checks failed
ci / race tests (push) Blocked by required conditions
ci / WebRTC smoke (5-viewer fanout) (push) Blocked by required conditions
ci / WebRTC latency p95 gate (push) Blocked by required conditions
ci / vet + build (push) Has been cancelled

This commit is contained in:
Zac Gaetano 2026-05-06 15:55:42 -04:00
parent 2283a32f2a
commit 70324aad28

View file

@ -63,8 +63,10 @@ type Peer struct {
videoSub chan *rtp.Packet
audioSub chan *rtp.Packet
done chan struct{}
once sync.Once
done chan struct{}
once sync.Once
connected chan struct{}
connOnce sync.Once
}
// CreatePeer builds a PeerConnection, sets the remote offer, generates an
@ -131,9 +133,13 @@ func (f *PeerFactory) CreatePeer(ctx context.Context, src *Source, offer webrtc.
source: src,
sub: sub,
done: make(chan struct{}),
connected: make(chan struct{}),
}
pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
if st == webrtc.PeerConnectionStateConnected {
p.connOnce.Do(func() { close(p.connected) })
}
if st == webrtc.PeerConnectionStateFailed ||
st == webrtc.PeerConnectionStateDisconnected ||
st == webrtc.PeerConnectionStateClosed {
@ -158,6 +164,12 @@ func (p *Peer) ResourceID() string { return p.resourceID }
// index cleanup without polling.
func (p *Peer) Done() <-chan struct{} { return p.done }
// Connected returns a channel that is closed the first time Pion reports
// PeerConnectionStateConnected. Callers that need to measure ICE
// establishment duration select on Connected() vs Done() from the moment
// the peer is created.
func (p *Peer) Connected() <-chan struct{} { return p.connected }
// Close tears down the peer connection and unsubscribes from each
// source. Safe to call multiple times.
func (p *Peer) Close() error {
@ -248,9 +260,13 @@ func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
videoSub: videoSub,
audioSub: audioSub,
done: make(chan struct{}),
connected: make(chan struct{}),
}
pc.OnConnectionStateChange(func(st webrtc.PeerConnectionState) {
if st == webrtc.PeerConnectionStateConnected {
p.connOnce.Do(func() { close(p.connected) })
}
if st == webrtc.PeerConnectionStateFailed ||
st == webrtc.PeerConnectionStateDisconnected ||
st == webrtc.PeerConnectionStateClosed {
@ -263,7 +279,6 @@ func (f *PeerFactory) CreatePeerFromSources(ctx context.Context,
return p, nil
}
// AddICECandidate forwards a trickle-ICE candidate to the underlying
// PeerConnection. Returns the underlying error if the candidate is
// malformed or the connection has already been closed.