feat(pipeline): add NdiSender with channel-based input
Some checks failed
CI / build-and-test (push) Has been cancelled

This commit is contained in:
Zac Gaetano 2026-05-07 15:23:51 +00:00
parent ead5e79935
commit aecbda674d
2 changed files with 132 additions and 0 deletions

View file

@ -0,0 +1,70 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Interop;
namespace TeamsISO.Engine.Pipeline;
/// <summary>
/// Pulls processed frames from a channel and forwards them to <see cref="INdiInterop.SendFrame"/>.
/// </summary>
public sealed class NdiSender : IDisposable
{
private readonly INdiInterop _interop;
private readonly string _outputName;
private readonly ChannelReader<ProcessedFrame> _input;
private readonly ILogger<NdiSender> _logger;
private readonly NdiSenderHandle _handle;
private long _framesSent;
public NdiSender(
INdiInterop interop,
string outputName,
ChannelReader<ProcessedFrame> input,
ILogger<NdiSender> logger)
{
_interop = interop;
_outputName = outputName;
_input = input;
_logger = logger;
_handle = interop.CreateSender(outputName);
}
public long FramesSent => Interlocked.Read(ref _framesSent);
/// <summary>
/// Awaits one frame and forwards it. Returns false if the channel is completed.
/// Test seam.
/// </summary>
public async ValueTask<bool> SendNextAsync(CancellationToken cancellationToken)
{
if (!await _input.WaitToReadAsync(cancellationToken))
return false;
if (!_input.TryRead(out var frame))
return false;
_interop.SendFrame(_handle, frame);
Interlocked.Increment(ref _framesSent);
return true;
}
/// <summary>Long-running send loop. Run on a dedicated thread.</summary>
public Task RunAsync(CancellationToken cancellationToken) =>
Task.Factory.StartNew(async () =>
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
var more = await SendNextAsync(cancellationToken);
if (!more) break;
}
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
_logger.LogError(ex, "NdiSender loop crashed for output {Output}.", _outputName);
throw;
}
}, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
public void Dispose() => _handle.Dispose();
}

View file

@ -0,0 +1,62 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using TeamsISO.Engine.Pipeline;
using TeamsISO.Engine.Tests.Fakes;
namespace TeamsISO.Engine.Tests.Pipeline;
public class NdiSenderTests
{
private const string Output = "TEAMSISO_01";
private static ProcessedFrame MakeFrame(long ts) =>
new(1920, 1080, ts, new byte[1920 * 1080 * 4], PixelFormat.Bgra);
[Fact]
public async Task SendNextAsync_FrameAvailable_ForwardsToInterop()
{
var interop = new FakeNdiInterop();
var input = Channel.CreateUnbounded<ProcessedFrame>();
input.Writer.TryWrite(MakeFrame(100));
var sender = new NdiSender(interop, Output, input.Reader, NullLogger<NdiSender>.Instance);
var sent = await sender.SendNextAsync(CancellationToken.None);
sent.Should().BeTrue();
sender.FramesSent.Should().Be(1);
interop.SentFrames[Output].Should().HaveCount(1);
interop.SentFrames[Output][0].TimestampTicks.Should().Be(100);
}
[Fact]
public async Task SendNextAsync_ChannelCompleted_ReturnsFalse()
{
var interop = new FakeNdiInterop();
var input = Channel.CreateUnbounded<ProcessedFrame>();
input.Writer.TryComplete();
var sender = new NdiSender(interop, Output, input.Reader, NullLogger<NdiSender>.Instance);
var sent = await sender.SendNextAsync(CancellationToken.None);
sent.Should().BeFalse();
}
[Fact]
public async Task RunAsync_CompletesOnCancellation()
{
var interop = new FakeNdiInterop();
var input = Channel.CreateUnbounded<ProcessedFrame>();
var sender = new NdiSender(interop, Output, input.Reader, NullLogger<NdiSender>.Instance);
using var cts = new CancellationTokenSource();
var runTask = sender.RunAsync(cts.Token);
// Send one frame, then cancel
input.Writer.TryWrite(MakeFrame(1));
await Task.Delay(30);
cts.Cancel();
await runTask;
interop.SenderCreatedCount[Output].Should().Be(1);
}
}