From 1b280e3e7736113d32017a1db3412c99b78308dd Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Thu, 7 May 2026 15:14:15 +0000 Subject: [PATCH] feat(discovery): add NdiDiscoveryService with diff-based event emission --- .../Discovery/NdiDiscoveryService.cs | 79 +++++++++++++++++++ .../Discovery/NdiDiscoveryServiceTests.cs | 69 ++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs create mode 100644 src/tests/TeamsISO.Engine.Tests/Discovery/NdiDiscoveryServiceTests.cs diff --git a/src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs b/src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs new file mode 100644 index 0000000..9031ca6 --- /dev/null +++ b/src/TeamsISO.Engine/Discovery/NdiDiscoveryService.cs @@ -0,0 +1,79 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using TeamsISO.Engine.Interop; + +namespace TeamsISO.Engine.Discovery; + +/// +/// Polls at a fixed cadence, diffs the +/// resulting set against the previous poll, and emits s +/// on a channel for downstream consumers. +/// +public sealed class NdiDiscoveryService +{ + private readonly INdiInterop _interop; + private readonly ChannelWriter _writer; + private readonly ILogger _logger; + private readonly NdiFindHandle _finder; + private readonly HashSet _previous = new(); + + public NdiDiscoveryService( + INdiInterop interop, + ChannelWriter writer, + ILogger logger) + { + _interop = interop; + _writer = writer; + _logger = logger; + _finder = interop.CreateFinder(); + } + + /// + /// Runs a single poll cycle. Public for unit testing; production uses . + /// + public void PollOnce() + { + var current = _interop.GetCurrentSources(_finder); + var currentSet = new HashSet(current); + + foreach (var name in currentSet.Except(_previous)) + { + var parsed = NdiSourceParser.Parse(name); + if (parsed is null) + { + _logger.LogTrace("Ignoring unrecognized source: {Name}", name); + continue; + } + _writer.TryWrite(new DiscoveryEvent.Added(parsed)); + } + + foreach (var name in _previous.Except(currentSet)) + { + var parsed = NdiSourceParser.Parse(name); + if (parsed is null) continue; + _writer.TryWrite(new DiscoveryEvent.Removed(parsed)); + } + + _previous.Clear(); + foreach (var name in currentSet) _previous.Add(name); + } + + /// Long-running poll loop. Cancel the token to stop. + public async Task RunAsync(TimeSpan pollInterval, CancellationToken cancellationToken) + { + using var timer = new PeriodicTimer(pollInterval); + try + { + while (await timer.WaitForNextTickAsync(cancellationToken)) + { + try { PollOnce(); } + catch (Exception ex) { _logger.LogWarning(ex, "Discovery poll failed; will retry on next tick."); } + } + } + catch (OperationCanceledException) { /* expected */ } + finally + { + _finder.Dispose(); + } + } +} diff --git a/src/tests/TeamsISO.Engine.Tests/Discovery/NdiDiscoveryServiceTests.cs b/src/tests/TeamsISO.Engine.Tests/Discovery/NdiDiscoveryServiceTests.cs new file mode 100644 index 0000000..54ca337 --- /dev/null +++ b/src/tests/TeamsISO.Engine.Tests/Discovery/NdiDiscoveryServiceTests.cs @@ -0,0 +1,69 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging.Abstractions; +using TeamsISO.Engine.Discovery; +using TeamsISO.Engine.Tests.Fakes; + +namespace TeamsISO.Engine.Tests.Discovery; + +public class NdiDiscoveryServiceTests +{ + [Fact] + public void PollOnce_AddsNewParticipantSources_AndIgnoresMalformedStrings() + { + var interop = new FakeNdiInterop(); + interop.Sources.Add("PC1 (Teams - Jane)"); + interop.Sources.Add("PC1 (Teams)"); + interop.Sources.Add("Just A Camera"); + interop.Sources.Add("BAD (Teams - )"); + var channel = Channel.CreateUnbounded(); + + var svc = new NdiDiscoveryService(interop, channel.Writer, NullLogger.Instance); + + svc.PollOnce(); + + var emitted = DrainChannel(channel.Reader); + emitted.OfType().Select(a => a.Source.FullName) + .Should().BeEquivalentTo(new[] { "PC1 (Teams - Jane)", "PC1 (Teams)" }); + } + + [Fact] + public void PollOnce_EmitsRemoved_WhenSourceDisappears() + { + var interop = new FakeNdiInterop(); + interop.Sources.Add("PC1 (Teams - Jane)"); + var channel = Channel.CreateUnbounded(); + + var svc = new NdiDiscoveryService(interop, channel.Writer, NullLogger.Instance); + svc.PollOnce(); + DrainChannel(channel.Reader); + + interop.Sources.Clear(); + svc.PollOnce(); + + var emitted = DrainChannel(channel.Reader); + emitted.OfType().Select(r => r.Source.FullName) + .Should().BeEquivalentTo(new[] { "PC1 (Teams - Jane)" }); + } + + [Fact] + public void PollOnce_NoChange_EmitsNothing() + { + var interop = new FakeNdiInterop(); + interop.Sources.Add("PC1 (Teams - Jane)"); + var channel = Channel.CreateUnbounded(); + + var svc = new NdiDiscoveryService(interop, channel.Writer, NullLogger.Instance); + svc.PollOnce(); DrainChannel(channel.Reader); + + svc.PollOnce(); + + DrainChannel(channel.Reader).Should().BeEmpty(); + } + + private static List DrainChannel(ChannelReader reader) + { + var list = new List(); + while (reader.TryRead(out var ev)) list.Add(ev); + return list; + } +}