From 34a2f1483c4ed5ad88fae2e383783588dcac4876 Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Sun, 10 May 2026 09:41:29 -0400 Subject: [PATCH] feat(engine): refresh discovery affordance + idempotent re-Add handling --- .../Controller/IIsoController.cs | 47 ++++++++ .../Controller/IsoController.cs | 110 +++++++++++++++++- .../Discovery/NdiDiscoveryService.cs | 38 +++++- .../Discovery/ParticipantTracker.cs | 13 +++ .../Discovery/ParticipantTrackerTests.cs | 26 +++++ 5 files changed, 231 insertions(+), 3 deletions(-) diff --git a/src/TeamsISO.Engine/Controller/IIsoController.cs b/src/TeamsISO.Engine/Controller/IIsoController.cs index 5c16d31..898ad67 100644 --- a/src/TeamsISO.Engine/Controller/IIsoController.cs +++ b/src/TeamsISO.Engine/Controller/IIsoController.cs @@ -26,9 +26,26 @@ public interface IIsoController : IAsyncDisposable /// Returns the latest for a given participant's ISO, or empty if none. IsoHealthStats GetStats(Guid participantId); + /// + /// Returns the most recent processed frame for the given participant's pipeline, + /// or null if no pipeline is running or no frame has been processed yet. Used by + /// the WPF host to render in-app preview thumbnails. The returned + /// is immutable; callers may copy / scale freely. + /// + Pipeline.ProcessedFrame? GetLatestProcessedFrame(Guid participantId); + /// Enables an ISO pipeline for the given participant. Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken); + /// + /// Enable an ISO pipeline with a per-call override for whether this specific + /// pipeline gets a recorder attached. null + /// means "follow the global flag" (the + /// default behavior); true forces a recorder; false forces no recorder. + /// Used by the UI to give the operator per-participant recording opt-out. + /// + Task EnableIsoAsync(Guid participantId, string? customName, bool? recordOverride, CancellationToken cancellationToken); + /// Disables and tears down the pipeline for the given participant. Task DisableIsoAsync(Guid participantId, CancellationToken cancellationToken); @@ -40,4 +57,34 @@ public interface IIsoController : IAsyncDisposable /// restart — rebuilding finder/sender handles mid-flight would orphan running pipelines. /// Task SetGroupSettingsAsync(NdiGroupSettings groupSettings, CancellationToken cancellationToken); + + /// + /// Forces NDI discovery to rebuild its finder on the next poll tick, then re-emits all + /// currently-visible sources as freshly-added. Useful right after applying a new + /// transcoder topology when Teams or other senders need to be re-detected on the new + /// group, without having to restart the whole process. + /// + void RefreshDiscovery(); + + /// + /// Per-output recording on/off. When enabled, each subsequently-started ISO writes + /// its normalized output to /<display-name>/. + /// Already-running ISOs are not retroactively recorded (would require restarting + /// their pipelines, which can hiccup live output) — the operator should disable + + /// re-enable a participant to start recording it. + /// + void SetRecording(bool enabled, string? outputDirectory); + + /// True if has been called with enabled=true. + bool RecordingEnabled { get; } + + /// The output directory configured by the most recent call. + string? RecordingDirectory { get; } + + /// + /// Drop a timestamped marker into every currently-recording pipeline. No-op + /// if no pipelines are recording. Markers land in each recording's + /// manifest.json under the markers[] array. + /// + void AddRecordingMarker(string label); } diff --git a/src/TeamsISO.Engine/Controller/IsoController.cs b/src/TeamsISO.Engine/Controller/IsoController.cs index fdda5ec..647c589 100644 --- a/src/TeamsISO.Engine/Controller/IsoController.cs +++ b/src/TeamsISO.Engine/Controller/IsoController.cs @@ -21,14 +21,20 @@ public sealed class IsoController : IIsoController private readonly Func _pipelineFactory; private readonly ConfigStore _configStore; private readonly NdiRuntimeProbe _runtimeProbe; + private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private readonly TimeSpan _renameWindow; private readonly TimeSpan _discoveryInterval; + private bool _recordingEnabled; + private string? _recordingDirectory; private readonly ParticipantTracker _tracker; private readonly Channel _discoveryChannel; private readonly NdiDiscoveryService _discovery; private readonly Dictionary _pipelines = new(); + // Parallel map of active recorders keyed by participant id, for the + // marker-drop API which needs to fan out to every running recorder. + private readonly Dictionary _recorders = new(); private readonly BehaviorSubject> _participants = new(Array.Empty()); private readonly Subject _alerts = new(); @@ -59,6 +65,7 @@ public sealed class IsoController : IIsoController _pipelineFactory = pipelineFactory; _configStore = configStore; _runtimeProbe = runtimeProbe; + _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(); _renameWindow = renameWindow ?? TimeSpan.FromSeconds(5); _discoveryInterval = discoveryInterval ?? TimeSpan.FromMilliseconds(500); @@ -108,7 +115,21 @@ public sealed class IsoController : IIsoController return pipeline.GetStats(); } - public async Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken) + public Pipeline.ProcessedFrame? GetLatestProcessedFrame(Guid participantId) + { + IsoPipeline? pipeline; + lock (_gate) + { + if (!_pipelines.TryGetValue(participantId, out pipeline)) + return null; + } + return pipeline.LatestProcessedFrame; + } + + public Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken) => + EnableIsoAsync(participantId, customName, recordOverride: null, cancellationToken); + + public async Task EnableIsoAsync(Guid participantId, string? customName, bool? recordOverride, CancellationToken cancellationToken) { Participant? p; lock (_gate) @@ -122,18 +143,55 @@ public sealed class IsoController : IIsoController var output = customName ?? DefaultOutputName(participantId); string? outputGroups; FrameProcessingSettings settingsSnapshot; + bool recordingEnabled; + string? recordingDirectory; lock (_gate) { outputGroups = _groupSettings.OutputGroups; settingsSnapshot = _settings; + recordingEnabled = _recordingEnabled; + recordingDirectory = _recordingDirectory; } + + // Per-call override beats global toggle. recordOverride=true forces a + // recorder even when global recording is off (useful for "record just + // this one"); recordOverride=false suppresses the recorder when global + // is on (operator opt-out per participant). + var shouldRecord = recordOverride ?? recordingEnabled; + IRecorderSink? recorder = null; + if (shouldRecord) + { + // Per-pipeline recorder instance — each ISO writes to its own + // subdirectory keyed by display name. Wrapping in try/catch so a + // recorder construction failure (no logger, weird platform) never + // takes down EnableIsoAsync. + try + { + recorder = new RawBgraRecorderSink(_loggerFactory.CreateLogger()); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Could not construct recorder for participant {Id}; ISO will run without recording.", participantId); + } + } + var config = new IsoPipelineConfig(participantId, p.CurrentSource.FullName, output, settingsSnapshot) { OutputGroups = outputGroups, + Recorder = recorder, + // Pass the directory whenever we have a recorder, regardless of the + // global flag — the per-call override may have forced one even when + // global recording is off. + RecordingOutputDirectory = recorder is not null ? recordingDirectory : null, + RecorderDisplayName = p.DisplayName, }; var pipeline = _pipelineFactory(config); - lock (_gate) _pipelines[participantId] = pipeline; + lock (_gate) + { + _pipelines[participantId] = pipeline; + if (recorder is not null) _recorders[participantId] = recorder; + } await pipeline.StartAsync(); await PersistAssignmentsAsync(cancellationToken); } @@ -144,6 +202,9 @@ public sealed class IsoController : IIsoController lock (_gate) { if (!_pipelines.Remove(participantId, out pipeline)) return; + // Pipeline.DisposeAsync calls IRecorderSink.Close internally via + // its supervisor finally; we just drop our parallel reference here. + _recorders.Remove(participantId); } await pipeline.StopAsync(); await pipeline.DisposeAsync(); @@ -165,9 +226,54 @@ public sealed class IsoController : IIsoController public Task SetGroupSettingsAsync(NdiGroupSettings groupSettings, CancellationToken cancellationToken) { lock (_gate) _groupSettings = groupSettings; + // Push the new discovery-groups string into the live discovery service so that the + // next operator-initiated Refresh picks them up. We don't auto-refresh here because + // mid-flight rebuilds are cheap but visible (sources blink) and the Settings panel + // tells the operator to use the explicit Refresh button. + _discovery.UpdateDiscoveryGroups(groupSettings.DiscoveryGroups); return PersistAssignmentsAsync(cancellationToken); } + /// + /// Forces the discovery service to rebuild its NDI finder on the next poll tick. + /// Implemented as a non-blocking flag set so the controller method returns instantly; + /// the actual rebuild + re-emit happens on the dedicated discovery loop. + /// + public void RefreshDiscovery() + { + _discovery.RequestRefresh(); + _logger.LogInformation("Discovery refresh requested."); + } + + public void SetRecording(bool enabled, string? outputDirectory) + { + lock (_gate) + { + _recordingEnabled = enabled; + _recordingDirectory = outputDirectory; + } + _logger.LogInformation( + "Recording {State} (output: {Dir})", + enabled ? "ENABLED" : "DISABLED", + outputDirectory ?? "(unset)"); + } + + public bool RecordingEnabled { get { lock (_gate) return _recordingEnabled; } } + public string? RecordingDirectory { get { lock (_gate) return _recordingDirectory; } } + + public void AddRecordingMarker(string label) + { + Pipeline.IRecorderSink[] snapshot; + lock (_gate) snapshot = _recorders.Values.ToArray(); + foreach (var rec in snapshot) + { + try { rec.AddMarker(label); } + catch { /* per-recorder defensive */ } + } + if (snapshot.Length > 0) + _logger.LogInformation("Marker dropped on {Count} active recording(s): {Label}", snapshot.Length, label); + } + private Task PersistAssignmentsAsync(CancellationToken cancellationToken) { try diff --git a/src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs b/src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs index 37e8804..d60ce94 100644 --- a/src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs +++ b/src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs @@ -14,8 +14,10 @@ public sealed class NdiDiscoveryService private readonly INdiInterop _interop; private readonly ChannelWriter _writer; private readonly ILogger _logger; - private readonly NdiFindHandle _finder; + private NdiFindHandle _finder; private readonly HashSet _previous = new(); + private string? _discoveryGroups; + private int _refreshRequested; public NdiDiscoveryService( INdiInterop interop, @@ -26,9 +28,22 @@ public sealed class NdiDiscoveryService _interop = interop; _writer = writer; _logger = logger; + _discoveryGroups = discoveryGroups; _finder = interop.CreateFinder(discoveryGroups); } + /// + /// Request that the next poll tick rebuild the underlying NDI finder. Useful right + /// after the operator changes discovery groups or applies a new transcoder topology + /// — without this, the finder is bound to the groups it was created with and never + /// sees new sources from the just-configured group. Honored on the next + /// tick: the old finder is disposed, a fresh one is created, + /// and the seen-set is cleared so all currently-visible sources re-fire as + /// . Cheap (idempotent) — extra Refresh calls + /// while a refresh is already pending are coalesced. + /// + public void RequestRefresh() => Interlocked.Exchange(ref _refreshRequested, 1); + /// /// Runs a single poll cycle. Public for unit testing; production uses . /// @@ -67,6 +82,20 @@ public sealed class NdiDiscoveryService { while (await timer.WaitForNextTickAsync(cancellationToken)) { + if (Interlocked.Exchange(ref _refreshRequested, 0) == 1) + { + try + { + _logger.LogInformation("Rebuilding NDI finder on operator request."); + _finder.Dispose(); + _finder = _interop.CreateFinder(_discoveryGroups); + _previous.Clear(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Finder refresh failed; continuing with existing finder."); + } + } try { PollOnce(); } catch (Exception ex) { _logger.LogWarning(ex, "Discovery poll failed; will retry on next tick."); } } @@ -77,4 +106,11 @@ public sealed class NdiDiscoveryService _finder.Dispose(); } } + + /// + /// Updates the cached discovery-groups string used by future finder rebuilds. + /// Call after this to actually pick up the change. + /// + public void UpdateDiscoveryGroups(string? discoveryGroups) => + _discoveryGroups = discoveryGroups; } diff --git a/src/TeamsISO.Engine/Discovery/ParticipantTracker.cs b/src/TeamsISO.Engine/Discovery/ParticipantTracker.cs index ca23971..a3c0fe3 100644 --- a/src/TeamsISO.Engine/Discovery/ParticipantTracker.cs +++ b/src/TeamsISO.Engine/Discovery/ParticipantTracker.cs @@ -48,6 +48,19 @@ public sealed class ParticipantTracker var now = _now(); PruneRecentlyRemoved(now); + // Idempotency guard: if the same FullName is already tracked (because the + // operator hit Refresh and discovery is re-emitting everything), refresh + // LastSeen + DisplayName in place instead of minting a duplicate row. + var alreadyLive = _participants.FirstOrDefault(p => + p.CurrentSource is not null && p.CurrentSource.FullName == source.FullName); + if (alreadyLive is not null) + { + alreadyLive.DisplayName = source.DisplayName!; + alreadyLive.CurrentSource = source; + alreadyLive.LastSeen = now; + return; + } + var match = _recentlyRemoved.FirstOrDefault(rr => rr.MachineName == source.MachineName); if (match is not null) { diff --git a/src/tests/TeamsISO.Engine.Tests/Discovery/ParticipantTrackerTests.cs b/src/tests/TeamsISO.Engine.Tests/Discovery/ParticipantTrackerTests.cs index 385e4df..e9268b0 100644 --- a/src/tests/TeamsISO.Engine.Tests/Discovery/ParticipantTrackerTests.cs +++ b/src/tests/TeamsISO.Engine.Tests/Discovery/ParticipantTrackerTests.cs @@ -136,6 +136,32 @@ public class ParticipantTrackerTests tracker.Participants[0].Id.Should().Be(firstId); } + [Fact] + public void Added_SameSourceTwice_IsIdempotent_DoesNotDuplicate() + { + // Regression for the discovery-refresh path: when the operator clicks "Refresh" + // we deliberately re-emit Added events for sources that were already known + // (clearing the discovery service's seen-set so it re-fires everything coming + // back from the rebuilt finder). The tracker must coalesce these — minting a + // new Guid would orphan the operator's running ISO and visibly duplicate the + // row in the participants list. + var time = T0; + var tracker = new ParticipantTracker(TimeSpan.FromSeconds(5), () => time); + var jane = Source("PC1", "Jane"); + + tracker.Apply(new DiscoveryEvent.Added(jane)); + var originalId = tracker.Participants[0].Id; + + time = T0.AddSeconds(2); + tracker.Apply(new DiscoveryEvent.Added(jane)); + + tracker.Participants.Should().HaveCount(1); + tracker.Participants[0].Id.Should().Be(originalId, + because: "re-emitting Added for the still-live source must not mint a fresh Id"); + tracker.Participants[0].LastSeen.Should().Be(time, + because: "the second Add should refresh LastSeen"); + } + [Fact] public void ActiveSpeakerRemove_DoesNotPoisonRenameWindowForLaterParticipant() {