feat(engine): refresh discovery affordance + idempotent re-Add handling

This commit is contained in:
Zac Gaetano 2026-05-10 09:41:29 -04:00
parent 4be5b39022
commit 34a2f1483c
5 changed files with 231 additions and 3 deletions

View file

@ -26,9 +26,26 @@ public interface IIsoController : IAsyncDisposable
/// <summary>Returns the latest <see cref="IsoHealthStats"/> for a given participant's ISO, or empty if none.</summary>
IsoHealthStats GetStats(Guid participantId);
/// <summary>
/// Returns the most recent processed frame for the given participant's pipeline,
/// or null if no pipeline is running or no frame has been processed yet. Used by
/// the WPF host to render in-app preview thumbnails. The returned <see cref="Pipeline.ProcessedFrame"/>
/// is immutable; callers may copy / scale freely.
/// </summary>
Pipeline.ProcessedFrame? GetLatestProcessedFrame(Guid participantId);
/// <summary>Enables an ISO pipeline for the given participant.</summary>
Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken);
/// <summary>
/// Enable an ISO pipeline with a per-call override for whether this specific
/// pipeline gets a recorder attached. <paramref name="recordOverride"/> null
/// means "follow the global <see cref="RecordingEnabled"/> flag" (the
/// default behavior); true forces a recorder; false forces no recorder.
/// Used by the UI to give the operator per-participant recording opt-out.
/// </summary>
Task EnableIsoAsync(Guid participantId, string? customName, bool? recordOverride, CancellationToken cancellationToken);
/// <summary>Disables and tears down the pipeline for the given participant.</summary>
Task DisableIsoAsync(Guid participantId, CancellationToken cancellationToken);
@ -40,4 +57,34 @@ public interface IIsoController : IAsyncDisposable
/// restart — rebuilding finder/sender handles mid-flight would orphan running pipelines.
/// </summary>
Task SetGroupSettingsAsync(NdiGroupSettings groupSettings, CancellationToken cancellationToken);
/// <summary>
/// Forces NDI discovery to rebuild its finder on the next poll tick, then re-emits all
/// currently-visible sources as freshly-added. Useful right after applying a new
/// transcoder topology when Teams or other senders need to be re-detected on the new
/// group, without having to restart the whole process.
/// </summary>
void RefreshDiscovery();
/// <summary>
/// Per-output recording on/off. When enabled, each subsequently-started ISO writes
/// its normalized output to <paramref name="outputDirectory"/>/&lt;display-name&gt;/.
/// Already-running ISOs are not retroactively recorded (would require restarting
/// their pipelines, which can hiccup live output) — the operator should disable +
/// re-enable a participant to start recording it.
/// </summary>
void SetRecording(bool enabled, string? outputDirectory);
/// <summary>True if <see cref="SetRecording"/> has been called with enabled=true.</summary>
bool RecordingEnabled { get; }
/// <summary>The output directory configured by the most recent <see cref="SetRecording"/> call.</summary>
string? RecordingDirectory { get; }
/// <summary>
/// Drop a timestamped marker into every currently-recording pipeline. No-op
/// if no pipelines are recording. Markers land in each recording's
/// manifest.json under the <c>markers[]</c> array.
/// </summary>
void AddRecordingMarker(string label);
}

View file

@ -21,14 +21,20 @@ public sealed class IsoController : IIsoController
private readonly Func<IsoPipelineConfig, IsoPipeline> _pipelineFactory;
private readonly ConfigStore _configStore;
private readonly NdiRuntimeProbe _runtimeProbe;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<IsoController> _logger;
private readonly TimeSpan _renameWindow;
private readonly TimeSpan _discoveryInterval;
private bool _recordingEnabled;
private string? _recordingDirectory;
private readonly ParticipantTracker _tracker;
private readonly Channel<DiscoveryEvent> _discoveryChannel;
private readonly NdiDiscoveryService _discovery;
private readonly Dictionary<Guid, IsoPipeline> _pipelines = new();
// Parallel map of active recorders keyed by participant id, for the
// marker-drop API which needs to fan out to every running recorder.
private readonly Dictionary<Guid, Pipeline.IRecorderSink> _recorders = new();
private readonly BehaviorSubject<IReadOnlyList<Participant>> _participants =
new(Array.Empty<Participant>());
private readonly Subject<EngineAlert> _alerts = new();
@ -59,6 +65,7 @@ public sealed class IsoController : IIsoController
_pipelineFactory = pipelineFactory;
_configStore = configStore;
_runtimeProbe = runtimeProbe;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<IsoController>();
_renameWindow = renameWindow ?? TimeSpan.FromSeconds(5);
_discoveryInterval = discoveryInterval ?? TimeSpan.FromMilliseconds(500);
@ -108,7 +115,21 @@ public sealed class IsoController : IIsoController
return pipeline.GetStats();
}
public async Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken)
public Pipeline.ProcessedFrame? GetLatestProcessedFrame(Guid participantId)
{
IsoPipeline? pipeline;
lock (_gate)
{
if (!_pipelines.TryGetValue(participantId, out pipeline))
return null;
}
return pipeline.LatestProcessedFrame;
}
public Task EnableIsoAsync(Guid participantId, string? customName, CancellationToken cancellationToken) =>
EnableIsoAsync(participantId, customName, recordOverride: null, cancellationToken);
public async Task EnableIsoAsync(Guid participantId, string? customName, bool? recordOverride, CancellationToken cancellationToken)
{
Participant? p;
lock (_gate)
@ -122,18 +143,55 @@ public sealed class IsoController : IIsoController
var output = customName ?? DefaultOutputName(participantId);
string? outputGroups;
FrameProcessingSettings settingsSnapshot;
bool recordingEnabled;
string? recordingDirectory;
lock (_gate)
{
outputGroups = _groupSettings.OutputGroups;
settingsSnapshot = _settings;
recordingEnabled = _recordingEnabled;
recordingDirectory = _recordingDirectory;
}
// Per-call override beats global toggle. recordOverride=true forces a
// recorder even when global recording is off (useful for "record just
// this one"); recordOverride=false suppresses the recorder when global
// is on (operator opt-out per participant).
var shouldRecord = recordOverride ?? recordingEnabled;
IRecorderSink? recorder = null;
if (shouldRecord)
{
// Per-pipeline recorder instance — each ISO writes to its own
// subdirectory keyed by display name. Wrapping in try/catch so a
// recorder construction failure (no logger, weird platform) never
// takes down EnableIsoAsync.
try
{
recorder = new RawBgraRecorderSink(_loggerFactory.CreateLogger<RawBgraRecorderSink>());
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Could not construct recorder for participant {Id}; ISO will run without recording.", participantId);
}
}
var config = new IsoPipelineConfig(participantId, p.CurrentSource.FullName, output, settingsSnapshot)
{
OutputGroups = outputGroups,
Recorder = recorder,
// Pass the directory whenever we have a recorder, regardless of the
// global flag — the per-call override may have forced one even when
// global recording is off.
RecordingOutputDirectory = recorder is not null ? recordingDirectory : null,
RecorderDisplayName = p.DisplayName,
};
var pipeline = _pipelineFactory(config);
lock (_gate) _pipelines[participantId] = pipeline;
lock (_gate)
{
_pipelines[participantId] = pipeline;
if (recorder is not null) _recorders[participantId] = recorder;
}
await pipeline.StartAsync();
await PersistAssignmentsAsync(cancellationToken);
}
@ -144,6 +202,9 @@ public sealed class IsoController : IIsoController
lock (_gate)
{
if (!_pipelines.Remove(participantId, out pipeline)) return;
// Pipeline.DisposeAsync calls IRecorderSink.Close internally via
// its supervisor finally; we just drop our parallel reference here.
_recorders.Remove(participantId);
}
await pipeline.StopAsync();
await pipeline.DisposeAsync();
@ -165,9 +226,54 @@ public sealed class IsoController : IIsoController
public Task SetGroupSettingsAsync(NdiGroupSettings groupSettings, CancellationToken cancellationToken)
{
lock (_gate) _groupSettings = groupSettings;
// Push the new discovery-groups string into the live discovery service so that the
// next operator-initiated Refresh picks them up. We don't auto-refresh here because
// mid-flight rebuilds are cheap but visible (sources blink) and the Settings panel
// tells the operator to use the explicit Refresh button.
_discovery.UpdateDiscoveryGroups(groupSettings.DiscoveryGroups);
return PersistAssignmentsAsync(cancellationToken);
}
/// <summary>
/// Forces the discovery service to rebuild its NDI finder on the next poll tick.
/// Implemented as a non-blocking flag set so the controller method returns instantly;
/// the actual rebuild + re-emit happens on the dedicated discovery loop.
/// </summary>
public void RefreshDiscovery()
{
_discovery.RequestRefresh();
_logger.LogInformation("Discovery refresh requested.");
}
public void SetRecording(bool enabled, string? outputDirectory)
{
lock (_gate)
{
_recordingEnabled = enabled;
_recordingDirectory = outputDirectory;
}
_logger.LogInformation(
"Recording {State} (output: {Dir})",
enabled ? "ENABLED" : "DISABLED",
outputDirectory ?? "(unset)");
}
public bool RecordingEnabled { get { lock (_gate) return _recordingEnabled; } }
public string? RecordingDirectory { get { lock (_gate) return _recordingDirectory; } }
public void AddRecordingMarker(string label)
{
Pipeline.IRecorderSink[] snapshot;
lock (_gate) snapshot = _recorders.Values.ToArray();
foreach (var rec in snapshot)
{
try { rec.AddMarker(label); }
catch { /* per-recorder defensive */ }
}
if (snapshot.Length > 0)
_logger.LogInformation("Marker dropped on {Count} active recording(s): {Label}", snapshot.Length, label);
}
private Task PersistAssignmentsAsync(CancellationToken cancellationToken)
{
try

View file

@ -14,8 +14,10 @@ public sealed class NdiDiscoveryService
private readonly INdiInterop _interop;
private readonly ChannelWriter<DiscoveryEvent> _writer;
private readonly ILogger<NdiDiscoveryService> _logger;
private readonly NdiFindHandle _finder;
private NdiFindHandle _finder;
private readonly HashSet<string> _previous = new();
private string? _discoveryGroups;
private int _refreshRequested;
public NdiDiscoveryService(
INdiInterop interop,
@ -26,9 +28,22 @@ public sealed class NdiDiscoveryService
_interop = interop;
_writer = writer;
_logger = logger;
_discoveryGroups = discoveryGroups;
_finder = interop.CreateFinder(discoveryGroups);
}
/// <summary>
/// Request that the next poll tick rebuild the underlying NDI finder. Useful right
/// after the operator changes discovery groups or applies a new transcoder topology
/// — without this, the finder is bound to the groups it was created with and never
/// sees new sources from the just-configured group. Honored on the next
/// <see cref="RunAsync"/> tick: the old finder is disposed, a fresh one is created,
/// and the seen-set is cleared so all currently-visible sources re-fire as
/// <see cref="DiscoveryEvent.Added"/>. Cheap (idempotent) — extra Refresh calls
/// while a refresh is already pending are coalesced.
/// </summary>
public void RequestRefresh() => Interlocked.Exchange(ref _refreshRequested, 1);
/// <summary>
/// Runs a single poll cycle. Public for unit testing; production uses <see cref="RunAsync"/>.
/// </summary>
@ -67,6 +82,20 @@ public sealed class NdiDiscoveryService
{
while (await timer.WaitForNextTickAsync(cancellationToken))
{
if (Interlocked.Exchange(ref _refreshRequested, 0) == 1)
{
try
{
_logger.LogInformation("Rebuilding NDI finder on operator request.");
_finder.Dispose();
_finder = _interop.CreateFinder(_discoveryGroups);
_previous.Clear();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Finder refresh failed; continuing with existing finder.");
}
}
try { PollOnce(); }
catch (Exception ex) { _logger.LogWarning(ex, "Discovery poll failed; will retry on next tick."); }
}
@ -77,4 +106,11 @@ public sealed class NdiDiscoveryService
_finder.Dispose();
}
}
/// <summary>
/// Updates the cached discovery-groups string used by future finder rebuilds.
/// Call <see cref="RequestRefresh"/> after this to actually pick up the change.
/// </summary>
public void UpdateDiscoveryGroups(string? discoveryGroups) =>
_discoveryGroups = discoveryGroups;
}

View file

@ -48,6 +48,19 @@ public sealed class ParticipantTracker
var now = _now();
PruneRecentlyRemoved(now);
// Idempotency guard: if the same FullName is already tracked (because the
// operator hit Refresh and discovery is re-emitting everything), refresh
// LastSeen + DisplayName in place instead of minting a duplicate row.
var alreadyLive = _participants.FirstOrDefault(p =>
p.CurrentSource is not null && p.CurrentSource.FullName == source.FullName);
if (alreadyLive is not null)
{
alreadyLive.DisplayName = source.DisplayName!;
alreadyLive.CurrentSource = source;
alreadyLive.LastSeen = now;
return;
}
var match = _recentlyRemoved.FirstOrDefault(rr => rr.MachineName == source.MachineName);
if (match is not null)
{

View file

@ -136,6 +136,32 @@ public class ParticipantTrackerTests
tracker.Participants[0].Id.Should().Be(firstId);
}
[Fact]
public void Added_SameSourceTwice_IsIdempotent_DoesNotDuplicate()
{
// Regression for the discovery-refresh path: when the operator clicks "Refresh"
// we deliberately re-emit Added events for sources that were already known
// (clearing the discovery service's seen-set so it re-fires everything coming
// back from the rebuilt finder). The tracker must coalesce these — minting a
// new Guid would orphan the operator's running ISO and visibly duplicate the
// row in the participants list.
var time = T0;
var tracker = new ParticipantTracker(TimeSpan.FromSeconds(5), () => time);
var jane = Source("PC1", "Jane");
tracker.Apply(new DiscoveryEvent.Added(jane));
var originalId = tracker.Participants[0].Id;
time = T0.AddSeconds(2);
tracker.Apply(new DiscoveryEvent.Added(jane));
tracker.Participants.Should().HaveCount(1);
tracker.Participants[0].Id.Should().Be(originalId,
because: "re-emitting Added for the still-live source must not mint a fresh Id");
tracker.Participants[0].LastSeen.Should().Be(time,
because: "the second Add should refresh LastSeen");
}
[Fact]
public void ActiveSpeakerRemove_DoesNotPoisonRenameWindowForLaterParticipant()
{