feat(discovery): add NdiDiscoveryService with diff-based event emission
Some checks failed
CI / build-and-test (push) Failing after 22s

This commit is contained in:
Zac Gaetano 2026-05-07 15:14:15 +00:00
parent cef9018b6d
commit 1b280e3e77
2 changed files with 148 additions and 0 deletions

View file

@ -0,0 +1,79 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Interop;
namespace TeamsISO.Engine.Discovery;
/// <summary>
/// Polls <see cref="INdiInterop.GetCurrentSources"/> at a fixed cadence, diffs the
/// resulting set against the previous poll, and emits <see cref="DiscoveryEvent"/>s
/// on a channel for downstream consumers.
/// </summary>
public sealed class NdiDiscoveryService
{
private readonly INdiInterop _interop;
private readonly ChannelWriter<DiscoveryEvent> _writer;
private readonly ILogger<NdiDiscoveryService> _logger;
private readonly NdiFindHandle _finder;
private readonly HashSet<string> _previous = new();
public NdiDiscoveryService(
INdiInterop interop,
ChannelWriter<DiscoveryEvent> writer,
ILogger<NdiDiscoveryService> logger)
{
_interop = interop;
_writer = writer;
_logger = logger;
_finder = interop.CreateFinder();
}
/// <summary>
/// Runs a single poll cycle. Public for unit testing; production uses <see cref="RunAsync"/>.
/// </summary>
public void PollOnce()
{
var current = _interop.GetCurrentSources(_finder);
var currentSet = new HashSet<string>(current);
foreach (var name in currentSet.Except(_previous))
{
var parsed = NdiSourceParser.Parse(name);
if (parsed is null)
{
_logger.LogTrace("Ignoring unrecognized source: {Name}", name);
continue;
}
_writer.TryWrite(new DiscoveryEvent.Added(parsed));
}
foreach (var name in _previous.Except(currentSet))
{
var parsed = NdiSourceParser.Parse(name);
if (parsed is null) continue;
_writer.TryWrite(new DiscoveryEvent.Removed(parsed));
}
_previous.Clear();
foreach (var name in currentSet) _previous.Add(name);
}
/// <summary>Long-running poll loop. Cancel the token to stop.</summary>
public async Task RunAsync(TimeSpan pollInterval, CancellationToken cancellationToken)
{
using var timer = new PeriodicTimer(pollInterval);
try
{
while (await timer.WaitForNextTickAsync(cancellationToken))
{
try { PollOnce(); }
catch (Exception ex) { _logger.LogWarning(ex, "Discovery poll failed; will retry on next tick."); }
}
}
catch (OperationCanceledException) { /* expected */ }
finally
{
_finder.Dispose();
}
}
}

View file

@ -0,0 +1,69 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using TeamsISO.Engine.Discovery;
using TeamsISO.Engine.Tests.Fakes;
namespace TeamsISO.Engine.Tests.Discovery;
public class NdiDiscoveryServiceTests
{
[Fact]
public void PollOnce_AddsNewParticipantSources_AndIgnoresMalformedStrings()
{
var interop = new FakeNdiInterop();
interop.Sources.Add("PC1 (Teams - Jane)");
interop.Sources.Add("PC1 (Teams)");
interop.Sources.Add("Just A Camera");
interop.Sources.Add("BAD (Teams - )");
var channel = Channel.CreateUnbounded<DiscoveryEvent>();
var svc = new NdiDiscoveryService(interop, channel.Writer, NullLogger<NdiDiscoveryService>.Instance);
svc.PollOnce();
var emitted = DrainChannel(channel.Reader);
emitted.OfType<DiscoveryEvent.Added>().Select(a => a.Source.FullName)
.Should().BeEquivalentTo(new[] { "PC1 (Teams - Jane)", "PC1 (Teams)" });
}
[Fact]
public void PollOnce_EmitsRemoved_WhenSourceDisappears()
{
var interop = new FakeNdiInterop();
interop.Sources.Add("PC1 (Teams - Jane)");
var channel = Channel.CreateUnbounded<DiscoveryEvent>();
var svc = new NdiDiscoveryService(interop, channel.Writer, NullLogger<NdiDiscoveryService>.Instance);
svc.PollOnce();
DrainChannel(channel.Reader);
interop.Sources.Clear();
svc.PollOnce();
var emitted = DrainChannel(channel.Reader);
emitted.OfType<DiscoveryEvent.Removed>().Select(r => r.Source.FullName)
.Should().BeEquivalentTo(new[] { "PC1 (Teams - Jane)" });
}
[Fact]
public void PollOnce_NoChange_EmitsNothing()
{
var interop = new FakeNdiInterop();
interop.Sources.Add("PC1 (Teams - Jane)");
var channel = Channel.CreateUnbounded<DiscoveryEvent>();
var svc = new NdiDiscoveryService(interop, channel.Writer, NullLogger<NdiDiscoveryService>.Instance);
svc.PollOnce(); DrainChannel(channel.Reader);
svc.PollOnce();
DrainChannel(channel.Reader).Should().BeEmpty();
}
private static List<DiscoveryEvent> DrainChannel(ChannelReader<DiscoveryEvent> reader)
{
var list = new List<DiscoveryEvent>();
while (reader.TryRead(out var ev)) list.Add(ev);
return list;
}
}