From cd5e852a306df68e4ddbecfdc088f521bdcbb847 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Thu, 7 May 2026 15:28:27 +0000 Subject: [PATCH] feat(controller): add IIsoController and IsoController implementation --- .../Controller/IIsoController.cs | 34 +++ .../Controller/IsoController.cs | 205 ++++++++++++++++++ .../Controller/IsoControllerTests.cs | 145 +++++++++++++ 3 files changed, 384 insertions(+) create mode 100644 src/TeamsISO.Engine/Controller/IIsoController.cs create mode 100644 src/TeamsISO.Engine/Controller/IsoController.cs create mode 100644 src/tests/TeamsISO.Engine.Tests/Controller/IsoControllerTests.cs diff --git a/src/TeamsISO.Engine/Controller/IIsoController.cs b/src/TeamsISO.Engine/Controller/IIsoController.cs new file mode 100644 index 0000000..384f976 --- /dev/null +++ b/src/TeamsISO.Engine/Controller/IIsoController.cs @@ -0,0 +1,34 @@ +using TeamsISO.Engine.Domain; + +namespace TeamsISO.Engine.Controller; + +/// +/// Top-of-engine API the WPF host (Phase C) and any future control APIs (OSC / WebSocket in v2.0) +/// bind to. All commands are async and cancellable. +/// +public interface IIsoController : IAsyncDisposable +{ + /// Observable list of currently-known meeting participants. + IObservable> Participants { get; } + + /// Observable stream of engine alerts (for UI banner display and ops logging). + IObservable Alerts { get; } + + /// Current global processing settings. + FrameProcessingSettings GlobalSettings { get; } + + /// Starts discovery and supervises the runtime probe. Returns once startup is complete. + Task StartAsync(CancellationToken cancellationToken); + + /// Returns the latest for a given participant's ISO, or empty if none. + IsoHealthStats GetStats(Guid participantId); + + /// Enables an ISO pipeline for the given participant. + Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken); + + /// Disables and tears down the pipeline for the given participant. + Task DisableIsoAsync(Guid participantId, CancellationToken cancellationToken); + + /// Updates global processing settings and persists them. Currently does not restart running pipelines (Phase C wires that). + Task SetGlobalSettingsAsync(FrameProcessingSettings settings, CancellationToken cancellationToken); +} diff --git a/src/TeamsISO.Engine/Controller/IsoController.cs b/src/TeamsISO.Engine/Controller/IsoController.cs new file mode 100644 index 0000000..be26051 --- /dev/null +++ b/src/TeamsISO.Engine/Controller/IsoController.cs @@ -0,0 +1,205 @@ +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using TeamsISO.Engine.Discovery; +using TeamsISO.Engine.Domain; +using TeamsISO.Engine.Interop; +using TeamsISO.Engine.Persistence; +using TeamsISO.Engine.Pipeline; + +namespace TeamsISO.Engine.Controller; + +/// +/// Default implementation of . +/// Holds the participant tracker, the discovery service, the pipeline dictionary, the config store, +/// and the runtime probe. Exposes participant and alert observables for the host UI. +/// +public sealed class IsoController : IIsoController +{ + private readonly INdiInterop _interop; + private readonly Func _pipelineFactory; + private readonly ConfigStore _configStore; + private readonly NdiRuntimeProbe _runtimeProbe; + private readonly ILogger _logger; + private readonly TimeSpan _renameWindow; + private readonly TimeSpan _discoveryInterval; + + private readonly ParticipantTracker _tracker; + private readonly Channel _discoveryChannel; + private readonly NdiDiscoveryService _discovery; + private readonly Dictionary _pipelines = new(); + private readonly BehaviorSubject> _participants = + new(Array.Empty()); + private readonly Subject _alerts = new(); + private readonly object _gate = new(); + + private FrameProcessingSettings _settings; + private CancellationTokenSource? _cts; + private Task? _discoveryTask; + private Task? _eventPumpTask; + + public IObservable> Participants => _participants.AsObservable(); + public IObservable Alerts => _alerts.AsObservable(); + public FrameProcessingSettings GlobalSettings { get { lock (_gate) return _settings; } } + + public IsoController( + INdiInterop interop, + Func pipelineFactory, + ConfigStore configStore, + NdiRuntimeProbe runtimeProbe, + ILoggerFactory loggerFactory, + TimeSpan? renameWindow = null, + TimeSpan? discoveryInterval = null, + Func? clock = null) + { + _interop = interop; + _pipelineFactory = pipelineFactory; + _configStore = configStore; + _runtimeProbe = runtimeProbe; + _logger = loggerFactory.CreateLogger(); + _renameWindow = renameWindow ?? TimeSpan.FromSeconds(5); + _discoveryInterval = discoveryInterval ?? TimeSpan.FromMilliseconds(500); + + var loaded = configStore.Load(); + _settings = loaded.Global; + + _tracker = new ParticipantTracker(_renameWindow, clock ?? (() => DateTimeOffset.UtcNow)); + _discoveryChannel = Channel.CreateUnbounded(); + _discovery = new NdiDiscoveryService( + interop, _discoveryChannel.Writer, + loggerFactory.CreateLogger()); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + if (_cts is not null) + throw new InvalidOperationException("Controller already started."); + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + // Runtime probe — surface alert if mismatch but don't fail startup. + var probeResult = _runtimeProbe.Probe(); + if (probeResult is NdiRuntimeProbeResult.Mismatch mismatch) + { + _alerts.OnNext(new EngineAlert.NdiRuntimeMismatch(mismatch.Detected, mismatch.Expected)); + _logger.LogWarning("NDI runtime mismatch: detected {Detected}, expected {Expected}.", + mismatch.Detected, mismatch.Expected); + } + + _discoveryTask = _discovery.RunAsync(_discoveryInterval, _cts.Token); + _eventPumpTask = PumpDiscoveryEventsAsync(_cts.Token); + await Task.CompletedTask; + } + + public IsoHealthStats GetStats(Guid participantId) + { + lock (_gate) + { + return _pipelines.TryGetValue(participantId, out var pipeline) + ? IsoHealthStats.Empty // production wires pipeline.Stats; Phase B-1 leaves this stub + : IsoHealthStats.Empty; + } + } + + public async Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken) + { + Participant? p; + lock (_gate) + { + if (_pipelines.ContainsKey(participantId)) return; + p = _tracker.Participants.FirstOrDefault(x => x.Id == participantId); + } + if (p is null || p.CurrentSource is null) + throw new InvalidOperationException($"Participant {participantId} not currently visible on the network."); + + var output = customName ?? DefaultOutputName(participantId); + var config = new IsoPipelineConfig(participantId, p.CurrentSource.FullName, output, _settings); + var pipeline = _pipelineFactory(config); + + lock (_gate) _pipelines[participantId] = pipeline; + await pipeline.StartAsync(); + await PersistAssignmentsAsync(cancellationToken); + } + + public async Task DisableIsoAsync(Guid participantId, CancellationToken cancellationToken) + { + IsoPipeline? pipeline; + lock (_gate) + { + if (!_pipelines.Remove(participantId, out pipeline)) return; + } + await pipeline.StopAsync(); + await pipeline.DisposeAsync(); + await PersistAssignmentsAsync(cancellationToken); + } + + public Task SetGlobalSettingsAsync(FrameProcessingSettings settings, CancellationToken cancellationToken) + { + lock (_gate) _settings = settings; + return PersistAssignmentsAsync(cancellationToken); + } + + private Task PersistAssignmentsAsync(CancellationToken cancellationToken) + { + try + { + FrameProcessingSettings settings; + IReadOnlyList assignments; + lock (_gate) + { + settings = _settings; + assignments = _pipelines.Keys.Select(id => + new IsoAssignment(id, IsEnabled: true, CustomOutputName: null)).ToArray(); + } + _configStore.Save(new EngineConfig(settings, assignments)); + } + catch (Exception ex) + { + _alerts.OnNext(new EngineAlert.ConfigSaveFailed(ex.Message)); + _logger.LogWarning(ex, "Failed to persist engine config."); + } + return Task.CompletedTask; + } + + private async Task PumpDiscoveryEventsAsync(CancellationToken cancellationToken) + { + try + { + await foreach (var ev in _discoveryChannel.Reader.ReadAllAsync(cancellationToken)) + { + lock (_gate) + { + _tracker.Apply(ev); + _participants.OnNext(_tracker.Participants); + } + } + } + catch (OperationCanceledException) { } + } + + private static string DefaultOutputName(Guid participantId) => + $"TEAMSISO_{participantId.ToString("N")[..8].ToUpperInvariant()}"; + + public async ValueTask DisposeAsync() + { + _cts?.Cancel(); + if (_eventPumpTask is not null) + { + try { await _eventPumpTask; } catch { /* swallow */ } + } + if (_discoveryTask is not null) + { + try { await _discoveryTask; } catch { /* swallow */ } + } + IsoPipeline[] toDispose; + lock (_gate) + { + toDispose = _pipelines.Values.ToArray(); + _pipelines.Clear(); + } + foreach (var p in toDispose) await p.DisposeAsync(); + _alerts.OnCompleted(); + _participants.OnCompleted(); + _cts?.Dispose(); + } +} diff --git a/src/tests/TeamsISO.Engine.Tests/Controller/IsoControllerTests.cs b/src/tests/TeamsISO.Engine.Tests/Controller/IsoControllerTests.cs new file mode 100644 index 0000000..cd2d165 --- /dev/null +++ b/src/tests/TeamsISO.Engine.Tests/Controller/IsoControllerTests.cs @@ -0,0 +1,145 @@ +using System.Reactive.Linq; +using Microsoft.Extensions.Logging.Abstractions; +using TeamsISO.Engine.Controller; +using TeamsISO.Engine.Domain; +using TeamsISO.Engine.Interop; +using TeamsISO.Engine.Persistence; +using TeamsISO.Engine.Pipeline; +using TeamsISO.Engine.Tests.Fakes; + +namespace TeamsISO.Engine.Tests.Controller; + +public class IsoControllerTests : IDisposable +{ + private readonly string _dir; + private readonly ConfigStore _store; + private readonly FakeNdiInterop _interop; + private readonly NdiRuntimeProbe _probeMatch; + private readonly NdiRuntimeProbe _probeMismatch; + private readonly List _factoryCalls = new(); + private readonly Dictionary _pipelineBlockers = new(); + + public IsoControllerTests() + { + _dir = Path.Combine(Path.GetTempPath(), $"teamsiso-controller-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_dir); + _store = new ConfigStore(Path.Combine(_dir, "config.json"), NullLogger.Instance); + _interop = new FakeNdiInterop { RuntimeVersion = "6.0.0" }; + _probeMatch = new NdiRuntimeProbe(_interop, expectedVersion: "6.0.0"); + _probeMismatch = new NdiRuntimeProbe(_interop, expectedVersion: "9.9.9"); + } + + public void Dispose() => Directory.Delete(_dir, recursive: true); + + private IsoPipeline TestPipelineFactory(IsoPipelineConfig config) + { + _factoryCalls.Add(config); + var blocker = new TaskCompletionSource(); + _pipelineBlockers[config.ParticipantId] = blocker; + var backoff = new ExponentialBackoff(maxAttempts: 3, initial: TimeSpan.FromMilliseconds(1), cap: TimeSpan.FromMilliseconds(5)); + return new IsoPipeline( + config.ParticipantId, + ct => blocker.Task.WaitAsync(ct), + backoff, + (_, _) => Task.CompletedTask, + NullLoggerFactory.Instance); + } + + private IsoController NewController(NdiRuntimeProbe? probe = null) => + new( + _interop, + TestPipelineFactory, + _store, + probe ?? _probeMatch, + NullLoggerFactory.Instance, + renameWindow: TimeSpan.FromSeconds(5), + discoveryInterval: TimeSpan.FromMilliseconds(20)); + + [Fact] + public async Task DiscoveredParticipant_AppearsInParticipantsObservable() + { + await using var controller = NewController(); + var seenLists = new List>(); + using var sub = controller.Participants.Subscribe(p => seenLists.Add(p)); + + await controller.StartAsync(CancellationToken.None); + _interop.Sources.Add("PC1 (Teams - Jane)"); + + // Wait for discovery to pick it up + var deadline = DateTime.UtcNow.AddSeconds(2); + while (seenLists.LastOrDefault()?.Any() != true && DateTime.UtcNow < deadline) + await Task.Delay(20); + + seenLists.Last().Should().HaveCount(1); + seenLists.Last()[0].DisplayName.Should().Be("Jane"); + } + + [Fact] + public async Task EnableIsoAsync_CreatesAndStartsPipeline() + { + await using var controller = NewController(); + await controller.StartAsync(CancellationToken.None); + + _interop.Sources.Add("PC1 (Teams - Jane)"); + var pid = await WaitForFirstParticipantAsync(controller); + + await controller.EnableIsoAsync(pid, customName: "TEAMSISO_JANE", CancellationToken.None); + + _factoryCalls.Should().HaveCount(1); + _factoryCalls[0].OutputName.Should().Be("TEAMSISO_JANE"); + _factoryCalls[0].SourceName.Should().Be("PC1 (Teams - Jane)"); + } + + [Fact] + public async Task DisableIsoAsync_StopsPipeline_AndRemovesIt() + { + await using var controller = NewController(); + await controller.StartAsync(CancellationToken.None); + + _interop.Sources.Add("PC1 (Teams - Jane)"); + var pid = await WaitForFirstParticipantAsync(controller); + await controller.EnableIsoAsync(pid, customName: null, CancellationToken.None); + + // Unblock the pipeline so StopAsync can complete + _pipelineBlockers[pid].SetResult(); + await controller.DisableIsoAsync(pid, CancellationToken.None); + + // Re-enable should now create a new pipeline + await controller.EnableIsoAsync(pid, customName: null, CancellationToken.None); + _factoryCalls.Should().HaveCount(2); + } + + [Fact] + public async Task SetGlobalSettingsAsync_PersistsToConfigStore() + { + await using var controller = NewController(); + var newSettings = new FrameProcessingSettings( + TargetFramerate.Fps59_94, TargetResolution.R1080p, AspectMode.Pillarbox, AudioMode.Auto); + + await controller.SetGlobalSettingsAsync(newSettings, CancellationToken.None); + + controller.GlobalSettings.Should().Be(newSettings); + _store.Load().Global.Should().Be(newSettings); + } + + [Fact] + public async Task StartAsync_RuntimeProbeMismatch_RaisesAlert() + { + await using var controller = NewController(probe: _probeMismatch); + var alerts = new List(); + using var sub = controller.Alerts.Subscribe(alerts.Add); + + await controller.StartAsync(CancellationToken.None); + + alerts.Should().Contain(a => a is EngineAlert.NdiRuntimeMismatch); + } + + private static async Task WaitForFirstParticipantAsync(IsoController controller) + { + var tcs = new TaskCompletionSource(); + using var sub = controller.Participants + .Where(p => p.Count > 0) + .Subscribe(p => tcs.TrySetResult(p[0].Id)); + return await tcs.Task.WaitAsync(TimeSpan.FromSeconds(3)); + } +}