diff --git a/src/TeamsISO.Engine/Pipeline/NdiSender.cs b/src/TeamsISO.Engine/Pipeline/NdiSender.cs new file mode 100644 index 0000000..f828f94 --- /dev/null +++ b/src/TeamsISO.Engine/Pipeline/NdiSender.cs @@ -0,0 +1,70 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using TeamsISO.Engine.Interop; + +namespace TeamsISO.Engine.Pipeline; + +/// +/// Pulls processed frames from a channel and forwards them to . +/// +public sealed class NdiSender : IDisposable +{ + private readonly INdiInterop _interop; + private readonly string _outputName; + private readonly ChannelReader _input; + private readonly ILogger _logger; + private readonly NdiSenderHandle _handle; + private long _framesSent; + + public NdiSender( + INdiInterop interop, + string outputName, + ChannelReader input, + ILogger logger) + { + _interop = interop; + _outputName = outputName; + _input = input; + _logger = logger; + _handle = interop.CreateSender(outputName); + } + + public long FramesSent => Interlocked.Read(ref _framesSent); + + /// + /// Awaits one frame and forwards it. Returns false if the channel is completed. + /// Test seam. + /// + public async ValueTask SendNextAsync(CancellationToken cancellationToken) + { + if (!await _input.WaitToReadAsync(cancellationToken)) + return false; + if (!_input.TryRead(out var frame)) + return false; + _interop.SendFrame(_handle, frame); + Interlocked.Increment(ref _framesSent); + return true; + } + + /// Long-running send loop. Run on a dedicated thread. + public Task RunAsync(CancellationToken cancellationToken) => + Task.Factory.StartNew(async () => + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + var more = await SendNextAsync(cancellationToken); + if (!more) break; + } + } + catch (OperationCanceledException) { } + catch (Exception ex) + { + _logger.LogError(ex, "NdiSender loop crashed for output {Output}.", _outputName); + throw; + } + }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); + + public void Dispose() => _handle.Dispose(); +} diff --git a/src/tests/TeamsISO.Engine.Tests/Pipeline/NdiSenderTests.cs b/src/tests/TeamsISO.Engine.Tests/Pipeline/NdiSenderTests.cs new file mode 100644 index 0000000..db6aceb --- /dev/null +++ b/src/tests/TeamsISO.Engine.Tests/Pipeline/NdiSenderTests.cs @@ -0,0 +1,62 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging.Abstractions; +using TeamsISO.Engine.Pipeline; +using TeamsISO.Engine.Tests.Fakes; + +namespace TeamsISO.Engine.Tests.Pipeline; + +public class NdiSenderTests +{ + private const string Output = "TEAMSISO_01"; + + private static ProcessedFrame MakeFrame(long ts) => + new(1920, 1080, ts, new byte[1920 * 1080 * 4], PixelFormat.Bgra); + + [Fact] + public async Task SendNextAsync_FrameAvailable_ForwardsToInterop() + { + var interop = new FakeNdiInterop(); + var input = Channel.CreateUnbounded(); + input.Writer.TryWrite(MakeFrame(100)); + var sender = new NdiSender(interop, Output, input.Reader, NullLogger.Instance); + + var sent = await sender.SendNextAsync(CancellationToken.None); + + sent.Should().BeTrue(); + sender.FramesSent.Should().Be(1); + interop.SentFrames[Output].Should().HaveCount(1); + interop.SentFrames[Output][0].TimestampTicks.Should().Be(100); + } + + [Fact] + public async Task SendNextAsync_ChannelCompleted_ReturnsFalse() + { + var interop = new FakeNdiInterop(); + var input = Channel.CreateUnbounded(); + input.Writer.TryComplete(); + var sender = new NdiSender(interop, Output, input.Reader, NullLogger.Instance); + + var sent = await sender.SendNextAsync(CancellationToken.None); + + sent.Should().BeFalse(); + } + + [Fact] + public async Task RunAsync_CompletesOnCancellation() + { + var interop = new FakeNdiInterop(); + var input = Channel.CreateUnbounded(); + var sender = new NdiSender(interop, Output, input.Reader, NullLogger.Instance); + + using var cts = new CancellationTokenSource(); + var runTask = sender.RunAsync(cts.Token); + + // Send one frame, then cancel + input.Writer.TryWrite(MakeFrame(1)); + await Task.Delay(30); + cts.Cancel(); + await runTask; + + interop.SenderCreatedCount[Output].Should().Be(1); + } +}