feat(pipeline): add NdiReceiver with channel-based output
Some checks failed
CI / build-and-test (push) Has been cancelled

This commit is contained in:
Zac Gaetano 2026-05-07 15:23:26 +00:00
parent f1513ddaf5
commit ead5e79935
2 changed files with 131 additions and 0 deletions

View file

@ -0,0 +1,69 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Interop;
namespace TeamsISO.Engine.Pipeline;
/// <summary>
/// Wraps an <see cref="INdiInterop"/> receiver handle and pushes captured frames into a
/// <see cref="ChannelWriter{T}"/>. The channel is expected to be configured drop-oldest
/// by the caller (the <see cref="IsoPipeline"/>); this receiver does not enforce backpressure itself.
/// </summary>
public sealed class NdiReceiver : IDisposable
{
private readonly INdiInterop _interop;
private readonly string _sourceName;
private readonly ChannelWriter<RawFrame> _output;
private readonly ILogger<NdiReceiver> _logger;
private readonly NdiReceiverHandle _handle;
private long _framesCaptured;
public NdiReceiver(
INdiInterop interop,
string sourceName,
ChannelWriter<RawFrame> output,
ILogger<NdiReceiver> logger)
{
_interop = interop;
_sourceName = sourceName;
_output = output;
_logger = logger;
_handle = interop.CreateReceiver(sourceName);
}
public long FramesCaptured => Interlocked.Read(ref _framesCaptured);
/// <summary>
/// Captures one frame (or returns on timeout). Test seam.
/// </summary>
public void CaptureOnce(int timeoutMs)
{
var frame = _interop.CaptureFrame(_handle, timeoutMs);
if (frame is null) return;
Interlocked.Increment(ref _framesCaptured);
_output.TryWrite(frame);
}
/// <summary>
/// Long-running capture loop. Run on a dedicated thread (<c>TaskCreationOptions.LongRunning</c>).
/// </summary>
public Task RunAsync(CancellationToken cancellationToken) =>
Task.Factory.StartNew(() =>
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
CaptureOnce(timeoutMs: 100);
}
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
_logger.LogError(ex, "NdiReceiver loop crashed for source {Source}.", _sourceName);
throw;
}
}, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
public void Dispose() => _handle.Dispose();
}

View file

@ -0,0 +1,62 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using TeamsISO.Engine.Pipeline;
using TeamsISO.Engine.Tests.Fakes;
namespace TeamsISO.Engine.Tests.Pipeline;
public class NdiReceiverTests
{
private const string Source = "PC1 (Teams - Jane)";
private static RawFrame MakeFrame(long ts) =>
new(640, 360, ts, new byte[640 * 360 * 4], PixelFormat.Bgra);
[Fact]
public void CaptureOnce_FrameAvailable_WritesToOutputAndIncrementsCounter()
{
var interop = new FakeNdiInterop();
interop.ReceiverFrames.GetOrAdd(Source, _ => new System.Collections.Concurrent.ConcurrentQueue<RawFrame>())
.Enqueue(MakeFrame(100));
var output = Channel.CreateUnbounded<RawFrame>();
var receiver = new NdiReceiver(interop, Source, output.Writer, NullLogger<NdiReceiver>.Instance);
receiver.CaptureOnce(timeoutMs: 100);
receiver.FramesCaptured.Should().Be(1);
output.Reader.TryRead(out var captured).Should().BeTrue();
captured!.TimestampTicks.Should().Be(100);
}
[Fact]
public void CaptureOnce_NoFrameAvailable_DoesNothing()
{
var interop = new FakeNdiInterop();
var output = Channel.CreateUnbounded<RawFrame>();
var receiver = new NdiReceiver(interop, Source, output.Writer, NullLogger<NdiReceiver>.Instance);
receiver.CaptureOnce(timeoutMs: 100);
receiver.FramesCaptured.Should().Be(0);
output.Reader.TryRead(out _).Should().BeFalse();
}
[Fact]
public async Task RunAsync_HonorsCancellation_AndDisposesHandle()
{
var interop = new FakeNdiInterop();
var output = Channel.CreateUnbounded<RawFrame>();
var receiver = new NdiReceiver(interop, Source, output.Writer, NullLogger<NdiReceiver>.Instance);
using var cts = new CancellationTokenSource();
var runTask = receiver.RunAsync(cts.Token);
await Task.Delay(50);
cts.Cancel();
await runTask;
// Receiver was created exactly once
interop.ReceiverCreatedCount[Source].Should().Be(1);
}
}