using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Discovery;
using TeamsISO.Engine.Domain;
using TeamsISO.Engine.Interop;
using TeamsISO.Engine.Persistence;
using TeamsISO.Engine.Pipeline;

namespace TeamsISO.Engine.Controller;

// NOTE(review): the generic type arguments in this file were reconstructed from usage.
// Two of them are assumptions that must be confirmed against the project:
//   * the discovery channel element type is written as DiscoveryEvent (name assumed);
//   * the participants collection is written as IReadOnlyList<Participant> (shape assumed;
//     it has to match IIsoController.Participants and ParticipantTracker.Participants).

/// <summary>
/// Default implementation of <see cref="IIsoController"/>.
/// Holds the participant tracker, the discovery service, the pipeline dictionary, the config store,
/// and the runtime probe. Exposes participant and alert observables for the host UI.
/// </summary>
/// <remarks>
/// The pipeline dictionary, the global settings and every access to the participant tracker are
/// guarded by a private gate object. Observers are notified outside of that gate.
/// </remarks>
public sealed class IsoController : IIsoController
{
    private readonly INdiInterop _interop;
    private readonly Func<IsoPipelineConfig, IsoPipeline> _pipelineFactory;
    private readonly ConfigStore _configStore;
    private readonly NdiRuntimeProbe _runtimeProbe;
    private readonly ILogger _logger;
    private readonly TimeSpan _renameWindow;
    private readonly TimeSpan _discoveryInterval;
    private readonly ParticipantTracker _tracker;
    private readonly Channel<DiscoveryEvent> _discoveryChannel;
    private readonly NdiDiscoveryService _discovery;
    private readonly Dictionary<Guid, IsoPipeline> _pipelines = new();
    private readonly BehaviorSubject<IReadOnlyList<Participant>> _participants = new(Array.Empty<Participant>());
    private readonly Subject<EngineAlert> _alerts = new();
    private readonly object _gate = new();

    private FrameProcessingSettings _settings;
    private CancellationTokenSource? _cts;
    private Task? _discoveryTask;
    private Task? _eventPumpTask;

    // 0 = live, 1 = DisposeAsync has run (or is running); makes disposal idempotent.
    private int _disposed;

    /// <summary>Stream of the tracker's participant list, pushed after every applied discovery event.</summary>
    public IObservable<IReadOnlyList<Participant>> Participants => _participants.AsObservable();

    /// <summary>Stream of engine alerts (NDI runtime mismatch, config save failures).</summary>
    public IObservable<EngineAlert> Alerts => _alerts.AsObservable();

    /// <summary>The frame-processing settings handed to pipelines created from now on.</summary>
    public FrameProcessingSettings GlobalSettings
    {
        get
        {
            lock (_gate)
            {
                return _settings;
            }
        }
    }

    /// <summary>
    /// Creates the controller, loads the persisted global settings from <paramref name="configStore"/>
    /// and wires the discovery service to an unbounded in-process channel.
    /// </summary>
    /// <param name="interop">NDI interop layer passed to the discovery service.</param>
    /// <param name="pipelineFactory">Creates a pipeline for a given pipeline configuration.</param>
    /// <param name="configStore">Store the engine configuration is loaded from and saved to.</param>
    /// <param name="runtimeProbe">Probe run once in <see cref="StartAsync"/>.</param>
    /// <param name="loggerFactory">Factory for the controller and discovery-service loggers.</param>
    /// <param name="renameWindow">Rename window passed to the participant tracker; defaults to 5 seconds.</param>
    /// <param name="discoveryInterval">Interval passed to the discovery loop; defaults to 500 milliseconds.</param>
    /// <param name="clock">Time source for the participant tracker; defaults to <see cref="DateTimeOffset.UtcNow"/>.</param>
    /// <exception cref="ArgumentNullException">A required dependency is <see langword="null"/>.</exception>
    public IsoController(
        INdiInterop interop,
        Func<IsoPipelineConfig, IsoPipeline> pipelineFactory,
        ConfigStore configStore,
        NdiRuntimeProbe runtimeProbe,
        ILoggerFactory loggerFactory,
        TimeSpan? renameWindow = null,
        TimeSpan? discoveryInterval = null,
        Func<DateTimeOffset>? clock = null)
    {
        // Fail at construction time instead of with a NullReferenceException on first use.
        ArgumentNullException.ThrowIfNull(interop);
        ArgumentNullException.ThrowIfNull(pipelineFactory);
        ArgumentNullException.ThrowIfNull(configStore);
        ArgumentNullException.ThrowIfNull(runtimeProbe);
        ArgumentNullException.ThrowIfNull(loggerFactory);

        _interop = interop;
        _pipelineFactory = pipelineFactory;
        _configStore = configStore;
        _runtimeProbe = runtimeProbe;
        _logger = loggerFactory.CreateLogger<IsoController>();
        _renameWindow = renameWindow ?? TimeSpan.FromSeconds(5);
        _discoveryInterval = discoveryInterval ?? TimeSpan.FromMilliseconds(500);

        var loaded = configStore.Load();
        _settings = loaded.Global;

        _tracker = new ParticipantTracker(_renameWindow, clock ?? (() => DateTimeOffset.UtcNow));
        _discoveryChannel = Channel.CreateUnbounded<DiscoveryEvent>();
        _discovery = new NdiDiscoveryService(
            interop,
            _discoveryChannel.Writer,
            loggerFactory.CreateLogger<NdiDiscoveryService>());
    }

    /// <summary>
    /// Runs the NDI runtime probe, then starts the discovery loop and the discovery event pump.
    /// </summary>
    /// <param name="cancellationToken">Linked into the token that stops both background tasks.</param>
    /// <exception cref="InvalidOperationException">The controller has already been started.</exception>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        if (_cts is not null)
        {
            throw new InvalidOperationException("Controller already started.");
        }

        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // Runtime probe — surface alert if mismatch but don't fail startup.
        var probeResult = _runtimeProbe.Probe();
        if (probeResult is NdiRuntimeProbeResult.Mismatch mismatch)
        {
            _alerts.OnNext(new EngineAlert.NdiRuntimeMismatch(mismatch.Detected, mismatch.Expected));
            _logger.LogWarning(
                "NDI runtime mismatch: detected {Detected}, expected {Expected}.",
                mismatch.Detected,
                mismatch.Expected);
        }

        _discoveryTask = _discovery.RunAsync(_discoveryInterval, _cts.Token);
        _eventPumpTask = PumpDiscoveryEventsAsync(_cts.Token);

        // Kept async so the "already started" failure is reported through the returned task.
        await Task.CompletedTask;
    }

    /// <summary>Returns health statistics for a participant's pipeline.</summary>
    /// <param name="participantId">Participant whose pipeline statistics are requested.</param>
    /// <returns>Currently always <c>IsoHealthStats.Empty</c>.</returns>
    public IsoHealthStats GetStats(Guid participantId)
    {
        // Production wires pipeline.Stats; Phase B-1 leaves this stub, so the result is the same
        // whether or not a pipeline exists for the participant.
        return IsoHealthStats.Empty;
    }

    /// <summary>
    /// Creates, registers and starts a pipeline for a participant, then persists the assignments.
    /// Does nothing if a pipeline is already registered for the participant.
    /// </summary>
    /// <param name="participantId">Participant to enable.</param>
    /// <param name="customName">
    /// Output name for the pipeline; when <see langword="null"/> a name derived from the id is used.
    /// </param>
    /// <param name="cancellationToken">Forwarded to persistence only.</param>
    /// <exception cref="InvalidOperationException">
    /// The participant is unknown to the tracker or has no current source.
    /// </exception>
    public async Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken)
    {
        Participant? participant;
        FrameProcessingSettings settings;
        lock (_gate)
        {
            if (_pipelines.ContainsKey(participantId))
            {
                return;
            }

            participant = _tracker.Participants.FirstOrDefault(x => x.Id == participantId);

            // Read under the gate, like every other access to _settings.
            settings = _settings;
        }

        if (participant is null || participant.CurrentSource is null)
        {
            throw new InvalidOperationException($"Participant {participantId} not currently visible on the network.");
        }

        // NOTE(review): customName only names this pipeline's output. It is not stored anywhere,
        // and PersistAssignmentsAsync writes CustomOutputName: null for every assignment.
        var output = customName ?? DefaultOutputName(participantId);
        var config = new IsoPipelineConfig(participantId, participant.CurrentSource.FullName, output, settings);
        var pipeline = _pipelineFactory(config);

        bool registered;
        lock (_gate)
        {
            // TryAdd instead of the indexer: a concurrent call for the same participant must not
            // overwrite (and thereby orphan) a pipeline that is already registered.
            registered = _pipelines.TryAdd(participantId, pipeline);
        }

        if (!registered)
        {
            // Another caller registered a pipeline first; release the one built here.
            await pipeline.DisposeAsync();
            return;
        }

        try
        {
            await pipeline.StartAsync();
        }
        catch
        {
            // Roll back so the participant can be enabled again and is not persisted as enabled.
            bool removed = false;
            lock (_gate)
            {
                if (_pipelines.TryGetValue(participantId, out var current) && ReferenceEquals(current, pipeline))
                {
                    removed = _pipelines.Remove(participantId);
                }
            }

            // If the entry is gone, whoever removed it owns the disposal.
            if (removed)
            {
                await pipeline.DisposeAsync();
            }

            throw;
        }

        await PersistAssignmentsAsync(cancellationToken);
    }

    /// <summary>
    /// Unregisters, stops and disposes a participant's pipeline, then persists the assignments.
    /// Does nothing if no pipeline is registered for the participant.
    /// </summary>
    /// <param name="participantId">Participant to disable.</param>
    /// <param name="cancellationToken">Forwarded to persistence only.</param>
    public async Task DisableIsoAsync(Guid participantId, CancellationToken cancellationToken)
    {
        IsoPipeline? pipeline;
        lock (_gate)
        {
            if (!_pipelines.Remove(participantId, out pipeline))
            {
                return;
            }
        }

        // The pipeline is already unregistered at this point, so it must be disposed and the new
        // assignment list persisted even when stopping it fails.
        try
        {
            try
            {
                await pipeline.StopAsync();
            }
            finally
            {
                await pipeline.DisposeAsync();
            }
        }
        finally
        {
            await PersistAssignmentsAsync(cancellationToken);
        }
    }

    /// <summary>Replaces the global settings and persists the configuration.</summary>
    /// <param name="settings">New global frame-processing settings.</param>
    /// <param name="cancellationToken">Forwarded to persistence only.</param>
    public Task SetGlobalSettingsAsync(FrameProcessingSettings settings, CancellationToken cancellationToken)
    {
        lock (_gate)
        {
            _settings = settings;
        }

        return PersistAssignmentsAsync(cancellationToken);
    }

    /// <summary>
    /// Saves the current global settings plus one enabled assignment per registered pipeline.
    /// Never throws: a failure is reported as a <c>ConfigSaveFailed</c> alert and a warning log.
    /// </summary>
    /// <param name="cancellationToken">Currently unused; the save runs synchronously.</param>
    private Task PersistAssignmentsAsync(CancellationToken cancellationToken)
    {
        try
        {
            FrameProcessingSettings settings;
            IReadOnlyList<IsoAssignment> assignments;
            lock (_gate)
            {
                settings = _settings;
                assignments = _pipelines.Keys
                    .Select(id => new IsoAssignment(id, IsEnabled: true, CustomOutputName: null))
                    .ToArray();
            }

            // Save outside the gate so store I/O does not block other controller calls.
            _configStore.Save(new EngineConfig(settings, assignments));
        }
        catch (Exception ex)
        {
            _alerts.OnNext(new EngineAlert.ConfigSaveFailed(ex.Message));
            _logger.LogWarning(ex, "Failed to persist engine config.");
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Reads discovery events until cancelled, applies each to the tracker and publishes the
    /// tracker's participant list.
    /// </summary>
    /// <param name="cancellationToken">Stops the pump; cancellation ends the task normally.</param>
    private async Task PumpDiscoveryEventsAsync(CancellationToken cancellationToken)
    {
        try
        {
            await foreach (var ev in _discoveryChannel.Reader.ReadAllAsync(cancellationToken))
            {
                IReadOnlyList<Participant> current;
                lock (_gate)
                {
                    _tracker.Apply(ev);
                    current = _tracker.Participants;
                }

                // Notify outside the gate: observer callbacks are arbitrary code and must not run
                // while the controller lock is held. This loop is the only publisher, so ordering
                // is unaffected.
                _participants.OnNext(current);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
        catch (Exception ex)
        {
            // Without this the pump would stop silently and the failure would only surface
            // (and be discarded) in DisposeAsync.
            _logger.LogError(ex, "Discovery event pump terminated unexpectedly.");
            throw;
        }
    }

    /// <summary>Builds the default output name: <c>TEAMSISO_</c> plus the first 8 hex digits of the id, upper-cased.</summary>
    private static string DefaultOutputName(Guid participantId) =>
        $"TEAMSISO_{participantId.ToString("N")[..8].ToUpperInvariant()}";

    /// <summary>
    /// Cancels and awaits the background tasks, disposes every registered pipeline and completes
    /// both observables. Safe to call more than once; later calls do nothing.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // A second pass would call Cancel() on an already disposed CancellationTokenSource.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cts?.Cancel();

        await AwaitBackgroundTaskAsync(_eventPumpTask, "event pump");
        await AwaitBackgroundTaskAsync(_discoveryTask, "discovery");

        IsoPipeline[] toDispose;
        lock (_gate)
        {
            toDispose = _pipelines.Values.ToArray();
            _pipelines.Clear();
        }

        foreach (var pipeline in toDispose)
        {
            try
            {
                await pipeline.DisposeAsync();
            }
            catch (Exception ex)
            {
                // One failing pipeline must not prevent the remaining teardown.
                _logger.LogWarning(ex, "Failed to dispose an ISO pipeline during shutdown.");
            }
        }

        _alerts.OnCompleted();
        _participants.OnCompleted();
        _cts?.Dispose();
    }

    /// <summary>Awaits a background task during shutdown, logging instead of propagating its failure.</summary>
    /// <param name="task">Task to await; <see langword="null"/> when the controller was never started.</param>
    /// <param name="name">Short task name used in the log message.</param>
    private async Task AwaitBackgroundTaskAsync(Task? task, string name)
    {
        if (task is null)
        {
            return;
        }

        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
            // Expected: the task was cancelled by DisposeAsync.
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Background task '{TaskName}' ended with an error during shutdown.", name);
        }
    }
}