diff --git a/src/TeamsISO.Engine/Pipeline/NdiReceiver.cs b/src/TeamsISO.Engine/Pipeline/NdiReceiver.cs new file mode 100644 index 0000000..0dea0f2 --- /dev/null +++ b/src/TeamsISO.Engine/Pipeline/NdiReceiver.cs @@ -0,0 +1,69 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using TeamsISO.Engine.Interop; + +namespace TeamsISO.Engine.Pipeline; + +/// +/// Wraps an receiver handle and pushes captured frames into a +/// . The channel is expected to be configured drop-oldest +/// by the caller (the ); this receiver does not enforce backpressure itself. +/// +public sealed class NdiReceiver : IDisposable +{ + private readonly INdiInterop _interop; + private readonly string _sourceName; + private readonly ChannelWriter _output; + private readonly ILogger _logger; + private readonly NdiReceiverHandle _handle; + private long _framesCaptured; + + public NdiReceiver( + INdiInterop interop, + string sourceName, + ChannelWriter output, + ILogger logger) + { + _interop = interop; + _sourceName = sourceName; + _output = output; + _logger = logger; + _handle = interop.CreateReceiver(sourceName); + } + + public long FramesCaptured => Interlocked.Read(ref _framesCaptured); + + /// + /// Captures one frame (or returns on timeout). Test seam. + /// + public void CaptureOnce(int timeoutMs) + { + var frame = _interop.CaptureFrame(_handle, timeoutMs); + if (frame is null) return; + Interlocked.Increment(ref _framesCaptured); + _output.TryWrite(frame); + } + + /// + /// Long-running capture loop. Run on a dedicated thread (TaskCreationOptions.LongRunning). + /// + public Task RunAsync(CancellationToken cancellationToken) => + Task.Factory.StartNew(() => + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + CaptureOnce(timeoutMs: 100); + } + } + catch (OperationCanceledException) { } + catch (Exception ex) + { + _logger.LogError(ex, "NdiReceiver loop crashed for source {Source}.", _sourceName); + throw; + } + }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default); + + public void Dispose() => _handle.Dispose(); +} diff --git a/src/tests/TeamsISO.Engine.Tests/Pipeline/NdiReceiverTests.cs b/src/tests/TeamsISO.Engine.Tests/Pipeline/NdiReceiverTests.cs new file mode 100644 index 0000000..1d93422 --- /dev/null +++ b/src/tests/TeamsISO.Engine.Tests/Pipeline/NdiReceiverTests.cs @@ -0,0 +1,62 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging.Abstractions; +using TeamsISO.Engine.Pipeline; +using TeamsISO.Engine.Tests.Fakes; + +namespace TeamsISO.Engine.Tests.Pipeline; + +public class NdiReceiverTests +{ + private const string Source = "PC1 (Teams - Jane)"; + + private static RawFrame MakeFrame(long ts) => + new(640, 360, ts, new byte[640 * 360 * 4], PixelFormat.Bgra); + + [Fact] + public void CaptureOnce_FrameAvailable_WritesToOutputAndIncrementsCounter() + { + var interop = new FakeNdiInterop(); + interop.ReceiverFrames.GetOrAdd(Source, _ => new System.Collections.Concurrent.ConcurrentQueue()) + .Enqueue(MakeFrame(100)); + + var output = Channel.CreateUnbounded(); + var receiver = new NdiReceiver(interop, Source, output.Writer, NullLogger.Instance); + + receiver.CaptureOnce(timeoutMs: 100); + + receiver.FramesCaptured.Should().Be(1); + output.Reader.TryRead(out var captured).Should().BeTrue(); + captured!.TimestampTicks.Should().Be(100); + } + + [Fact] + public void CaptureOnce_NoFrameAvailable_DoesNothing() + { + var interop = new FakeNdiInterop(); + var output = Channel.CreateUnbounded(); + var receiver = new NdiReceiver(interop, Source, output.Writer, NullLogger.Instance); + + receiver.CaptureOnce(timeoutMs: 100); + + receiver.FramesCaptured.Should().Be(0); + output.Reader.TryRead(out _).Should().BeFalse(); + } + + [Fact] + public async Task RunAsync_HonorsCancellation_AndDisposesHandle() + { + var interop = new FakeNdiInterop(); + var output = Channel.CreateUnbounded(); + var receiver = new NdiReceiver(interop, Source, output.Writer, NullLogger.Instance); + + using var cts = new CancellationTokenSource(); + var runTask = receiver.RunAsync(cts.Token); + + await Task.Delay(50); + cts.Cancel(); + await runTask; + + // Receiver was created exactly once + interop.ReceiverCreatedCount[Source].Should().Be(1); + } +}