diff --git a/app/webrtc/lifecycle.go b/app/webrtc/lifecycle.go index 61e676a..2583d09 100644 --- a/app/webrtc/lifecycle.go +++ b/app/webrtc/lifecycle.go @@ -94,6 +94,7 @@ func (s *Subsystem) onProcessStart(id string, cfg *appcfg.Config) ([]appcfg.Conf func (s *Subsystem) onProcessStop(id string) { s.mu.Lock() st, ok := s.streams[id] + teardown := s.teardown if ok { delete(s.streams, id) } @@ -102,6 +103,16 @@ func (s *Subsystem) onProcessStop(id string) { if !ok { return } + + // Broadcast first, so any subscribed peers get torn down while + // the streamID is still meaningful. The handler's tearDownStreamPeers + // drives each Peer.Close() which in turn unsubscribes from the + // Sources we're about to shut down — preventing a "subscribers fan + // out into a closed channel" race. + if teardown != nil { + teardown(id) + } + if st.video != nil { _ = st.video.Close() } diff --git a/app/webrtc/subsystem.go b/app/webrtc/subsystem.go index d0d5d09..a9257d4 100644 --- a/app/webrtc/subsystem.go +++ b/app/webrtc/subsystem.go @@ -31,6 +31,12 @@ type Subsystem struct { mu sync.Mutex streams map[string]*processStream // processID -> stream pair + + // teardown is set by the Handler (or any other consumer) so the + // Subsystem can broadcast process-stop events. Called *before* + // the per-stream Sources are closed, so consumers can yank their + // own indexes while the stream id is still valid. + teardown func(streamID string) } // processStream captures the two Sources (video + audio) backing a @@ -110,6 +116,19 @@ func (s *Subsystem) Close() { } } +// SetTeardownHook registers a callback invoked just before a stream's +// Sources are closed in onProcessStop. The callback is expected to +// tear down any external resources keyed by streamID — most importantly +// the WHEP Handler's per-stream peer index. +// +// Calling SetTeardownHook again replaces the previous callback; pass +// nil to detach. Only one consumer is supported by design. +func (s *Subsystem) SetTeardownHook(fn func(streamID string)) { + s.mu.Lock() + defer s.mu.Unlock() + s.teardown = fn +} + // lookup returns the per-process stream pair for id, or nil, false. // Used by the WHEP handler. func (s *Subsystem) lookup(id string) (*processStream, bool) {