feat(webrtc): add thread-safe Registry for stream_id -> SourceHandle

This commit is contained in:
Zac Gaetano 2026-04-17 08:44:59 -04:00
parent 2250cb0a8f
commit 3a17e543c5
2 changed files with 125 additions and 0 deletions

51
core/webrtc/registry.go Normal file
View file

@ -0,0 +1,51 @@
package webrtc
import (
"fmt"
"sync"
)
// SourceHandle is the minimal interface the Registry stores per stream_id.
// The concrete type is *Source, defined in source.go.
type SourceHandle interface {
ID() string
}
// Registry is a thread-safe map from stream_id to active SourceHandle.
type Registry struct {
mu sync.RWMutex
streams map[string]SourceHandle
}
// NewRegistry returns an empty Registry.
func NewRegistry() *Registry {
return &Registry{streams: make(map[string]SourceHandle)}
}
// Register associates src with streamID. Returns an error if streamID is
// already registered.
func (r *Registry) Register(streamID string, src SourceHandle) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.streams[streamID]; exists {
return fmt.Errorf("webrtc: stream %q already registered", streamID)
}
r.streams[streamID] = src
return nil
}
// Lookup returns the handle for streamID. The second return value is false
// if no source is registered.
func (r *Registry) Lookup(streamID string) (SourceHandle, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
src, ok := r.streams[streamID]
return src, ok
}
// Deregister removes streamID. No-op if not present.
func (r *Registry) Deregister(streamID string) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.streams, streamID)
}

View file

@ -0,0 +1,74 @@
package webrtc
import (
"sync"
"testing"
)
// mockSource implements the minimum Source-like shape needed by the registry.
// The real Source type is defined in Task 5; the registry only needs a
// stable type to store and retrieve.
type mockSource struct {
id string
}
func (m *mockSource) ID() string { return m.id }
func TestRegistry_RegisterAndLookup(t *testing.T) {
r := NewRegistry()
src := &mockSource{id: "streamA"}
if err := r.Register("streamA", src); err != nil {
t.Fatalf("Register returned error: %v", err)
}
got, ok := r.Lookup("streamA")
if !ok {
t.Fatal("Lookup(streamA) returned ok=false, want true")
}
if got != src {
t.Errorf("Lookup returned %v, want %v", got, src)
}
}
func TestRegistry_LookupMissing(t *testing.T) {
r := NewRegistry()
_, ok := r.Lookup("nope")
if ok {
t.Error("Lookup on empty registry returned ok=true, want false")
}
}
func TestRegistry_DuplicateRegister(t *testing.T) {
r := NewRegistry()
_ = r.Register("streamA", &mockSource{id: "streamA"})
if err := r.Register("streamA", &mockSource{id: "streamA"}); err == nil {
t.Error("duplicate Register should return error, got nil")
}
}
func TestRegistry_Deregister(t *testing.T) {
r := NewRegistry()
_ = r.Register("streamA", &mockSource{id: "streamA"})
r.Deregister("streamA")
if _, ok := r.Lookup("streamA"); ok {
t.Error("after Deregister, Lookup should return ok=false")
}
}
func TestRegistry_ConcurrentAccess(t *testing.T) {
r := NewRegistry()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(3)
id := string(rune('a' + (i % 26)))
go func() { defer wg.Done(); _ = r.Register(id, &mockSource{id: id}) }()
go func() { defer wg.Done(); _, _ = r.Lookup(id) }()
go func() { defer wg.Done(); r.Deregister(id) }()
}
wg.Wait()
// No assertion — test passes if -race doesn't flag anything.
}