From 3abd4d8fd18697ccd81749a2defa68dbbd1f6bcd Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Sun, 3 May 2026 11:23:55 +0000 Subject: [PATCH] feat(app/webrtc): broadcast process-stop via SetTeardownHook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subsystem.SetTeardownHook installs a callback the subsystem invokes just before closing per-stream Sources in onProcessStop. Used by the WHEP Handler in M3 to drain its per-stream peer index before the underlying Sources go away โ€” closes the 'subscribers fan out into a closed channel' race the design's ยง6 error matrix calls out as 'Publisher disconnects / FFmpeg exits'. Single consumer by design (one subsystem, one handler). Calling SetTeardownHook again replaces the previous callback; nil detaches. Co-Authored-By: Claude Opus 4.7 --- app/webrtc/lifecycle.go | 11 +++++++++++ app/webrtc/subsystem.go | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/app/webrtc/lifecycle.go b/app/webrtc/lifecycle.go index 61e676a..2583d09 100644 --- a/app/webrtc/lifecycle.go +++ b/app/webrtc/lifecycle.go @@ -94,6 +94,7 @@ func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.Conf func (s *Subsystem) onProcessStop(id string) { s.mu.Lock() st, ok := s.streams[id] + teardown := s.teardown if ok { delete(s.streams, id) } @@ -102,6 +103,16 @@ func (s *Subsystem) onProcessStop(id string) { if !ok { return } + + // Broadcast first, so any subscribed peers get torn down while + // the streamID is still meaningful. The handler's tearDownStreamPeers + // drives each Peer.Close() which in turn unsubscribes from the + // Sources we're about to shut down โ€” preventing a "subscribers fan + // out into a closed channel" race. + if teardown != nil { + teardown(id) + } + if st.video != nil { _ = st.video.Close() } diff --git a/app/webrtc/subsystem.go b/app/webrtc/subsystem.go index d0d5d09..a9257d4 100644 --- a/app/webrtc/subsystem.go +++ b/app/webrtc/subsystem.go @@ -31,6 +31,12 @@ type Subsystem struct { mu sync.Mutex streams map[string]*processStream // processID -> stream pair + + // teardown is set by the Handler (or any other consumer) so the + // Subsystem can broadcast process-stop events. Called *before* + // the per-stream Sources are closed, so consumers can yank their + // own indexes while the stream id is still valid. + teardown func(streamID string) } // processStream captures the two Sources (video + audio) backing a @@ -110,6 +116,19 @@ func (s *Subsystem) Close() { } } +// SetTeardownHook registers a callback invoked just before a stream's +// Sources are closed in onProcessStop. The callback is expected to +// tear down any external resources keyed by streamID โ€” most importantly +// the WHEP Handler's per-stream peer index. +// +// Calling SetTeardownHook again replaces the previous callback; pass +// nil to detach. Only one consumer is supported by design. +func (s *Subsystem) SetTeardownHook(fn func(streamID string)) { + s.mu.Lock() + defer s.mu.Unlock() + s.teardown = fn +} + // lookup returns the per-process stream pair for id, or nil, false. // Used by the WHEP handler. func (s *Subsystem) lookup(id string) (*processStream, bool) {