From 3a17e543c5ed7f4b3d10c41c77909126a860f1ee Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Fri, 17 Apr 2026 08:44:59 -0400 Subject: [PATCH] feat(webrtc): add thread-safe Registry for stream_id -> SourceHandle --- core/webrtc/registry.go | 51 +++++++++++++++++++++++++ core/webrtc/registry_test.go | 74 ++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 core/webrtc/registry.go create mode 100644 core/webrtc/registry_test.go diff --git a/core/webrtc/registry.go b/core/webrtc/registry.go new file mode 100644 index 0000000..fd924b2 --- /dev/null +++ b/core/webrtc/registry.go @@ -0,0 +1,51 @@ +package webrtc + +import ( + "fmt" + "sync" +) + +// SourceHandle is the minimal interface the Registry stores per stream_id. +// The concrete type is *Source, defined in source.go. +type SourceHandle interface { + ID() string +} + +// Registry is a thread-safe map from stream_id to active SourceHandle. +type Registry struct { + mu sync.RWMutex + streams map[string]SourceHandle +} + +// NewRegistry returns an empty Registry. +func NewRegistry() *Registry { + return &Registry{streams: make(map[string]SourceHandle)} +} + +// Register associates src with streamID. Returns an error if streamID is +// already registered. +func (r *Registry) Register(streamID string, src SourceHandle) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, exists := r.streams[streamID]; exists { + return fmt.Errorf("webrtc: stream %q already registered", streamID) + } + r.streams[streamID] = src + return nil +} + +// Lookup returns the handle for streamID. The second return value is false +// if no source is registered. +func (r *Registry) Lookup(streamID string) (SourceHandle, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + src, ok := r.streams[streamID] + return src, ok +} + +// Deregister removes streamID. No-op if not present. +func (r *Registry) Deregister(streamID string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.streams, streamID) +} diff --git a/core/webrtc/registry_test.go b/core/webrtc/registry_test.go new file mode 100644 index 0000000..543777e --- /dev/null +++ b/core/webrtc/registry_test.go @@ -0,0 +1,74 @@ +package webrtc + +import ( + "sync" + "testing" +) + +// mockSource implements the minimum Source-like shape needed by the registry. +// The real Source type is defined in Task 5; the registry only needs a +// stable type to store and retrieve. +type mockSource struct { + id string +} + +func (m *mockSource) ID() string { return m.id } + +func TestRegistry_RegisterAndLookup(t *testing.T) { + r := NewRegistry() + src := &mockSource{id: "streamA"} + + if err := r.Register("streamA", src); err != nil { + t.Fatalf("Register returned error: %v", err) + } + + got, ok := r.Lookup("streamA") + if !ok { + t.Fatal("Lookup(streamA) returned ok=false, want true") + } + if got != src { + t.Errorf("Lookup returned %v, want %v", got, src) + } +} + +func TestRegistry_LookupMissing(t *testing.T) { + r := NewRegistry() + _, ok := r.Lookup("nope") + if ok { + t.Error("Lookup on empty registry returned ok=true, want false") + } +} + +func TestRegistry_DuplicateRegister(t *testing.T) { + r := NewRegistry() + _ = r.Register("streamA", &mockSource{id: "streamA"}) + + if err := r.Register("streamA", &mockSource{id: "streamA"}); err == nil { + t.Error("duplicate Register should return error, got nil") + } +} + +func TestRegistry_Deregister(t *testing.T) { + r := NewRegistry() + _ = r.Register("streamA", &mockSource{id: "streamA"}) + r.Deregister("streamA") + + if _, ok := r.Lookup("streamA"); ok { + t.Error("after Deregister, Lookup should return ok=false") + } +} + +func TestRegistry_ConcurrentAccess(t *testing.T) { + r := NewRegistry() + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(3) + id := string(rune('a' + (i % 26))) + go func() { defer wg.Done(); _ = r.Register(id, &mockSource{id: id}) }() + go func() { defer wg.Done(); _, _ = r.Lookup(id) }() + go func() { defer wg.Done(); r.Deregister(id) }() + } + wg.Wait() + // No assertion — test passes if -race doesn't flag anything. +}