dragon-iso/src/TeamsISO.Engine/Controller/IsoController.cs
Zac Gaetano 909237f454 feat(ndi): plumb NDI groups (discovery + output) through the engine
Adds an NdiGroupSettings record carrying optional comma-separated NDI group lists for the finder and the senders. Extends INdiInterop.CreateFinder / CreateSender with optional groups arguments and populates NDIlib_find_create_t.p_groups and NDIlib_send_create_t.p_groups via P/Invoke. IsoController reads the settings on construction, threads DiscoveryGroups into NdiDiscoveryService and OutputGroups into IsoPipelineConfig, and exposes SetGroupSettingsAsync for runtime updates (group changes apply on next process restart so live pipelines aren't orphaned).

This unblocks the 'transcoder' topology where Teams broadcasts NDI on a private group (e.g. teamsiso-input) and TeamsISO re-emits clean normalized streams on Public — keeping raw, wrong-framerate Teams sources off the production network.

EngineConfig schema is JSON-back-compat: existing config.json files (no NdiGroups field) deserialize with NdiGroups=null and load as NdiGroupSettings.Default. UI surface for these settings comes in a follow-up.

Tests: 72/72 passing (was 69) — added IsoController coverage that group settings are read from ConfigStore on startup, passed to the finder, threaded into per-pipeline config, and round-trip through SetGroupSettingsAsync/Save/Load.
2026-05-07 23:48:49 -04:00

233 lines
8.8 KiB
C#

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Discovery;
using TeamsISO.Engine.Domain;
using TeamsISO.Engine.Interop;
using TeamsISO.Engine.Persistence;
using TeamsISO.Engine.Pipeline;
namespace TeamsISO.Engine.Controller;
/// <summary>
/// Default implementation of <see cref="IIsoController"/>.
/// Holds the participant tracker, the discovery service, the pipeline dictionary, the config store,
/// and the runtime probe. Exposes participant and alert observables for the host UI.
/// </summary>
public sealed class IsoController : IIsoController
{
// NDI interop layer supplied by the host; the constructor also hands it to the discovery service.
private readonly INdiInterop _interop;
// Builds an IsoPipeline from a per-participant config; invoked by EnableIsoAsync.
private readonly Func<IsoPipelineConfig, IsoPipeline> _pipelineFactory;
// Persistence for EngineConfig: loaded once in the constructor, saved by PersistAssignmentsAsync.
private readonly ConfigStore _configStore;
// Probed once in StartAsync; a mismatch result is surfaced as an alert.
private readonly NdiRuntimeProbe _runtimeProbe;
private readonly ILogger<IsoController> _logger;
// Passed to the ParticipantTracker; defaults to 5 seconds when the constructor argument is omitted.
private readonly TimeSpan _renameWindow;
// Passed to the discovery service's RunAsync; defaults to 500 ms when the constructor argument is omitted.
private readonly TimeSpan _discoveryInterval;
// Applies discovery events and holds the current participant list; accessed under _gate.
private readonly ParticipantTracker _tracker;
// Unbounded channel: the discovery service receives the writer, PumpDiscoveryEventsAsync drains the reader.
private readonly Channel<DiscoveryEvent> _discoveryChannel;
private readonly NdiDiscoveryService _discovery;
// Pipelines currently registered, keyed by participant id; guarded by _gate.
private readonly Dictionary<Guid, IsoPipeline> _pipelines = new();
// Backs the Participants observable; starts with an empty list.
private readonly BehaviorSubject<IReadOnlyList<Participant>> _participants =
new(Array.Empty<Participant>());
// Backs the Alerts observable.
private readonly Subject<EngineAlert> _alerts = new();
// Lock object protecting _settings, _groupSettings, _pipelines and tracker access.
private readonly object _gate = new();
// Current settings snapshots; initialised from the loaded config, replaced by the Set*Async methods.
private FrameProcessingSettings _settings;
private NdiGroupSettings _groupSettings;
// Null until StartAsync creates it (linked to the caller's token); cancelled and disposed in DisposeAsync.
private CancellationTokenSource? _cts;
// Background tasks started by StartAsync and awaited in DisposeAsync.
private Task? _discoveryTask;
private Task? _eventPumpTask;
/// <summary>Observable view over the participant-list subject; hides the subject itself from subscribers.</summary>
public IObservable<IReadOnlyList<Participant>> Participants
{
    get { return _participants.AsObservable(); }
}

/// <summary>Observable view over the engine-alert subject.</summary>
public IObservable<EngineAlert> Alerts
{
    get { return _alerts.AsObservable(); }
}

/// <summary>Current frame-processing settings, read under the controller lock.</summary>
public FrameProcessingSettings GlobalSettings
{
    get
    {
        lock (_gate)
        {
            return _settings;
        }
    }
}

/// <summary>Current NDI group settings, read under the controller lock.</summary>
public NdiGroupSettings GroupSettings
{
    get
    {
        lock (_gate)
        {
            return _groupSettings;
        }
    }
}
/// <summary>
/// Creates the controller, loads the persisted configuration from <paramref name="configStore"/>
/// and wires the discovery service to an internal unbounded event channel.
/// Nothing is started until <see cref="StartAsync"/> is called.
/// </summary>
/// <param name="interop">NDI interop layer handed to the discovery service.</param>
/// <param name="pipelineFactory">Factory that builds a pipeline from a per-participant config.</param>
/// <param name="configStore">Store the engine config is loaded from here and saved to later.</param>
/// <param name="runtimeProbe">Probe consulted when the controller starts.</param>
/// <param name="loggerFactory">Source of the loggers for the controller and the discovery service.</param>
/// <param name="renameWindow">Passed to the participant tracker; defaults to 5 seconds.</param>
/// <param name="discoveryInterval">Interval passed to the discovery loop; defaults to 500 ms.</param>
/// <param name="clock">Time source for the participant tracker; defaults to <see cref="DateTimeOffset.UtcNow"/>.</param>
/// <exception cref="ArgumentNullException">A required dependency is <c>null</c>.</exception>
public IsoController(
    INdiInterop interop,
    Func<IsoPipelineConfig, IsoPipeline> pipelineFactory,
    ConfigStore configStore,
    NdiRuntimeProbe runtimeProbe,
    ILoggerFactory loggerFactory,
    TimeSpan? renameWindow = null,
    TimeSpan? discoveryInterval = null,
    Func<DateTimeOffset>? clock = null)
{
    // Fail at construction rather than much later (e.g. a null factory would only
    // surface on the first EnableIsoAsync call, a null probe only on StartAsync).
    ArgumentNullException.ThrowIfNull(interop);
    ArgumentNullException.ThrowIfNull(pipelineFactory);
    ArgumentNullException.ThrowIfNull(configStore);
    ArgumentNullException.ThrowIfNull(runtimeProbe);
    ArgumentNullException.ThrowIfNull(loggerFactory);

    _interop = interop;
    _pipelineFactory = pipelineFactory;
    _configStore = configStore;
    _runtimeProbe = runtimeProbe;
    _logger = loggerFactory.CreateLogger<IsoController>();
    _renameWindow = renameWindow ?? TimeSpan.FromSeconds(5);
    _discoveryInterval = discoveryInterval ?? TimeSpan.FromMilliseconds(500);

    var loaded = configStore.Load();
    _settings = loaded.Global;
    _groupSettings = loaded.GroupsOrDefault;

    _tracker = new ParticipantTracker(_renameWindow, clock ?? (() => DateTimeOffset.UtcNow));
    _discoveryChannel = Channel.CreateUnbounded<DiscoveryEvent>();

    // The discovery groups are fixed here for the lifetime of the discovery service;
    // later SetGroupSettingsAsync calls do not rebuild it.
    _discovery = new NdiDiscoveryService(
        interop, _discoveryChannel.Writer,
        loggerFactory.CreateLogger<NdiDiscoveryService>(),
        _groupSettings.DiscoveryGroups);
}
/// <summary>
/// Probes the NDI runtime, then starts the discovery loop and the discovery-event pump.
/// A runtime mismatch is reported as an alert and a warning but does not fail startup.
/// </summary>
/// <param name="cancellationToken">Linked into the controller's own cancellation source; cancelling it stops the background tasks.</param>
/// <exception cref="InvalidOperationException">The controller has already been started.</exception>
public async Task StartAsync(CancellationToken cancellationToken)
{
    if (_cts is not null)
    {
        throw new InvalidOperationException("Controller already started.");
    }

    // Run the probe BEFORE recording any "started" state. If the probe (or an alert
    // subscriber) throws, _cts stays null, so the caller can retry StartAsync instead
    // of being stuck with "Controller already started." and no background tasks.
    var probeResult = _runtimeProbe.Probe();
    if (probeResult is NdiRuntimeProbeResult.Mismatch mismatch)
    {
        _alerts.OnNext(new EngineAlert.NdiRuntimeMismatch(mismatch.Detected, mismatch.Expected));
        _logger.LogWarning("NDI runtime mismatch: detected {Detected}, expected {Expected}.",
            mismatch.Detected, mismatch.Expected);
    }

    _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    _discoveryTask = _discovery.RunAsync(_discoveryInterval, _cts.Token);
    _eventPumpTask = PumpDiscoveryEventsAsync(_cts.Token);

    // Nothing to await yet; kept so failures above are reported through the returned Task.
    await Task.CompletedTask;
}
/// <summary>
/// Returns health statistics for a participant's pipeline.
/// Currently a stub: <see cref="IsoHealthStats.Empty"/> is returned whether or not
/// a pipeline is registered for <paramref name="participantId"/>.
/// </summary>
public IsoHealthStats GetStats(Guid participantId)
{
    lock (_gate)
    {
        if (_pipelines.TryGetValue(participantId, out _))
        {
            // production wires pipeline.Stats; Phase B-1 leaves this stub
            return IsoHealthStats.Empty;
        }

        return IsoHealthStats.Empty;
    }
}
/// <summary>
/// Creates, registers and starts an ISO pipeline for a participant, then persists the
/// updated assignment list. Does nothing if the participant already has a pipeline.
/// </summary>
/// <param name="participantId">Tracker id of the participant to isolate.</param>
/// <param name="customName">Output name to use; when <c>null</c> a default derived from the id is used.</param>
/// <param name="cancellationToken">Forwarded to the persistence step.</param>
/// <exception cref="InvalidOperationException">
/// The participant is not known to the tracker or has no current source.
/// </exception>
public async Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken)
{
    Participant? p;
    string? outputGroups;
    FrameProcessingSettings settingsSnapshot;

    // One lock region so the participant lookup and both settings snapshots are consistent.
    lock (_gate)
    {
        if (_pipelines.ContainsKey(participantId)) return;
        p = _tracker.Participants.FirstOrDefault(x => x.Id == participantId);
        outputGroups = _groupSettings.OutputGroups;
        settingsSnapshot = _settings;
    }

    if (p is null || p.CurrentSource is null)
        throw new InvalidOperationException($"Participant {participantId} not currently visible on the network.");

    var output = customName ?? DefaultOutputName(participantId);
    var config = new IsoPipelineConfig(participantId, p.CurrentSource.FullName, output, settingsSnapshot)
    {
        OutputGroups = outputGroups,
    };
    var pipeline = _pipelineFactory(config);

    // The lock was released while the pipeline was being built, so another caller may
    // have registered one for the same participant in the meantime. TryAdd keeps the
    // first registration instead of overwriting (and thereby leaking) it.
    bool registered;
    lock (_gate) registered = _pipelines.TryAdd(participantId, pipeline);
    if (!registered)
    {
        await pipeline.DisposeAsync();
        return;
    }

    try
    {
        await pipeline.StartAsync();
    }
    catch
    {
        // Roll back the registration so a failed start does not leave a dead pipeline
        // behind that blocks every later EnableIsoAsync for this participant.
        lock (_gate)
        {
            if (_pipelines.TryGetValue(participantId, out var current) && ReferenceEquals(current, pipeline))
            {
                _pipelines.Remove(participantId);
            }
        }

        try
        {
            await pipeline.DisposeAsync();
        }
        catch (Exception disposeEx)
        {
            // Keep the original start failure as the exception the caller sees.
            _logger.LogWarning(disposeEx, "Failed to dispose ISO pipeline after start failure.");
        }

        throw;
    }

    await PersistAssignmentsAsync(cancellationToken);
}
/// <summary>
/// Unregisters the participant's pipeline, stops and disposes it, then persists the
/// updated assignment list. Does nothing if no pipeline is registered for the participant.
/// </summary>
/// <param name="participantId">Id of the participant whose pipeline should be torn down.</param>
/// <param name="cancellationToken">Forwarded to the persistence step.</param>
public async Task DisableIsoAsync(Guid participantId, CancellationToken cancellationToken)
{
    IsoPipeline? pipeline;
    lock (_gate)
    {
        if (!_pipelines.Remove(participantId, out pipeline)) return;
    }

    try
    {
        await pipeline.StopAsync();
    }
    finally
    {
        // The pipeline is already out of the dictionary, so nothing else will ever
        // dispose it; release it even when StopAsync throws.
        await pipeline.DisposeAsync();
    }

    await PersistAssignmentsAsync(cancellationToken);
}
/// <summary>
/// Replaces the global frame-processing settings and persists the engine config.
/// Only the stored snapshot changes here; pipelines read it when they are created.
/// </summary>
/// <param name="settings">New settings snapshot.</param>
/// <param name="cancellationToken">Forwarded to the persistence step.</param>
public Task SetGlobalSettingsAsync(FrameProcessingSettings settings, CancellationToken cancellationToken)
{
    lock (_gate)
    {
        _settings = settings;
    }

    var persisted = PersistAssignmentsAsync(cancellationToken);
    return persisted;
}
/// <summary>
/// Updates the NDI group configuration and persists it. Note: existing finder/sender
/// handles aren't rebuilt — group changes take effect on the next process restart, since
/// rebuilding the live finder mid-flight would orphan in-flight participants. The settings
/// panel surfaces this caveat to the user.
/// </summary>
/// <param name="groupSettings">New group settings; must not be <c>null</c>.</param>
/// <param name="cancellationToken">Forwarded to the persistence step.</param>
/// <exception cref="ArgumentNullException"><paramref name="groupSettings"/> is <c>null</c>.</exception>
public Task SetGroupSettingsAsync(NdiGroupSettings groupSettings, CancellationToken cancellationToken)
{
    // A null here would be stored and only blow up later, when EnableIsoAsync
    // dereferences the stored settings to read OutputGroups.
    ArgumentNullException.ThrowIfNull(groupSettings);

    lock (_gate) _groupSettings = groupSettings;
    return PersistAssignmentsAsync(cancellationToken);
}
/// <summary>
/// Snapshots the current settings, group settings and enabled pipelines under the lock and
/// saves them through the config store. The save runs synchronously; a failure is reported
/// as a <c>ConfigSaveFailed</c> alert plus a warning and is not rethrown.
/// </summary>
/// <remarks>
/// Every assignment is written with <c>IsEnabled: true</c> and <c>CustomOutputName: null</c>.
/// The <paramref name="cancellationToken"/> is accepted but not consulted.
/// </remarks>
private Task PersistAssignmentsAsync(CancellationToken cancellationToken)
{
    try
    {
        FrameProcessingSettings currentSettings;
        NdiGroupSettings currentGroups;
        IReadOnlyList<IsoAssignment> enabledAssignments;

        lock (_gate)
        {
            currentSettings = _settings;
            currentGroups = _groupSettings;

            // One assignment row per registered pipeline, in dictionary key order.
            var rows = new IsoAssignment[_pipelines.Count];
            var next = 0;
            foreach (var id in _pipelines.Keys)
            {
                rows[next++] = new IsoAssignment(id, IsEnabled: true, CustomOutputName: null);
            }

            enabledAssignments = rows;
        }

        // Build and write the config outside the lock so file I/O never holds it.
        var snapshot = new EngineConfig(currentSettings, enabledAssignments, currentGroups);
        _configStore.Save(snapshot);
    }
    catch (Exception saveError)
    {
        _alerts.OnNext(new EngineAlert.ConfigSaveFailed(saveError.Message));
        _logger.LogWarning(saveError, "Failed to persist engine config.");
    }

    return Task.CompletedTask;
}
/// <summary>
/// Drains the discovery channel until it completes or the token is cancelled. Each event is
/// applied to the tracker and the tracker's participant list is then published, both under the lock.
/// </summary>
/// <param name="cancellationToken">Stops the pump; the resulting cancellation exception is absorbed here.</param>
private async Task PumpDiscoveryEventsAsync(CancellationToken cancellationToken)
{
    try
    {
        var reader = _discoveryChannel.Reader;
        await foreach (var incoming in reader.ReadAllAsync(cancellationToken))
        {
            ApplyAndPublish(incoming);
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation ends the pump; nothing to report.
    }

    // Tracker update and publication happen as one step under the lock.
    void ApplyAndPublish(DiscoveryEvent discoveryEvent)
    {
        lock (_gate)
        {
            _tracker.Apply(discoveryEvent);
            _participants.OnNext(_tracker.Participants);
        }
    }
}
/// <summary>
/// Builds the fallback output name: <c>TEAMSISO_</c> followed by the first eight
/// hex digits of the participant id, upper-cased.
/// </summary>
private static string DefaultOutputName(Guid participantId)
{
    // "N" formats the Guid as 32 hex digits with no dashes or braces.
    var hexDigits = participantId.ToString("N");
    var shortId = hexDigits.Substring(0, 8).ToUpperInvariant();
    return "TEAMSISO_" + shortId;
}
/// <summary>
/// Cancels the background tasks, waits for them to finish, disposes every registered
/// pipeline, completes both observables and releases the cancellation source.
/// Safe to call more than once; only the first call does any work.
/// </summary>
public async ValueTask DisposeAsync()
{
    // Without this guard a second call would reach Cancel() on an already-disposed
    // CancellationTokenSource, which throws ObjectDisposedException.
    if (Interlocked.Exchange(ref _disposeState, 1) != 0)
    {
        return;
    }

    _cts?.Cancel();

    await ObserveAsync(_eventPumpTask, "event pump");
    await ObserveAsync(_discoveryTask, "discovery");

    IsoPipeline[] toDispose;
    lock (_gate)
    {
        toDispose = _pipelines.Values.ToArray();
        _pipelines.Clear();
    }

    foreach (var pipeline in toDispose)
    {
        try
        {
            await pipeline.DisposeAsync();
        }
        catch (Exception ex)
        {
            // Keep going: one failing pipeline must not stop the remaining pipelines,
            // the observables and the cancellation source from being released.
            _logger.LogWarning(ex, "Failed to dispose an ISO pipeline during shutdown.");
        }
    }

    _alerts.OnCompleted();
    _participants.OnCompleted();
    _cts?.Dispose();

    // Waits for a background task without letting its failure abort disposal.
    // Cancellation is the expected outcome; any other fault is logged instead of vanishing.
    async Task ObserveAsync(Task? task, string taskName)
    {
        if (task is null)
        {
            return;
        }

        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Background {TaskName} task had faulted before shutdown.", taskName);
        }
    }
}

// 0 while the controller is live; set to 1 by the first DisposeAsync call.
private int _disposeState;
}