diff --git a/src/TeamsISO.Engine/Pipeline/FrameProcessor.cs b/src/TeamsISO.Engine/Pipeline/FrameProcessor.cs new file mode 100644 index 0000000..7da6f9d --- /dev/null +++ b/src/TeamsISO.Engine/Pipeline/FrameProcessor.cs @@ -0,0 +1,100 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using TeamsISO.Engine.Domain; + +namespace TeamsISO.Engine.Pipeline; + +/// +/// Per-ISO frame timing engine. Implements closest-frame strategy: at each tick, +/// pick the newest available raw frame (dropping older queued frames), scale and emit it. +/// If no new frame is available, re-emit the last frame. If no frame has arrived for +/// , emit a no-signal slate instead. +/// +public sealed class FrameProcessor +{ + private readonly FrameProcessingSettings _settings; + private readonly IFrameScaler _scaler; + private readonly SolidFrameRenderer _slateRenderer; + private readonly IFrameClock _clock; + private readonly ChannelReader _input; + private readonly ChannelWriter _output; + private readonly TimeSpan _slateThreshold; + private readonly ILogger _logger; + + private RawFrame? _lastRawFrame; + private long _lastFrameTickTicks; + private long _framesIn; + private long _framesOut; + private long _framesDropped; + private long _framesDuplicated; + private long _framesSlated; + + public FrameProcessor( + FrameProcessingSettings settings, + IFrameScaler scaler, + SolidFrameRenderer slateRenderer, + IFrameClock clock, + ChannelReader input, + ChannelWriter output, + TimeSpan slateThreshold, + ILogger logger) + { + _settings = settings; + _scaler = scaler; + _slateRenderer = slateRenderer; + _clock = clock; + _input = input; + _output = output; + _slateThreshold = slateThreshold; + _logger = logger; + } + + public IsoHealthStats Stats => + new( + FramesIn: Interlocked.Read(ref _framesIn), + FramesOut: Interlocked.Read(ref _framesOut), + FramesDropped: Interlocked.Read(ref _framesDropped), + FramesDuplicated: Interlocked.Read(ref _framesDuplicated), + LastFrameAt: _lastFrameTickTicks == 0 ? null : new DateTimeOffset(_lastFrameTickTicks, TimeSpan.Zero), + IncomingFps: 0, + IncomingWidth: _lastRawFrame?.Width ?? 0, + IncomingHeight: _lastRawFrame?.Height ?? 0); + + public Task ProcessOnceAsync(CancellationToken cancellationToken) + { + // Drain the input channel non-blockingly, keeping only the newest frame. + RawFrame? newest = null; + while (_input.TryRead(out var frame)) + { + if (newest is not null) + Interlocked.Increment(ref _framesDropped); + newest = frame; + Interlocked.Increment(ref _framesIn); + } + + var (targetW, targetH) = _settings.ResolutionSize; + var nowTicks = _clock.NowTicks; + + ProcessedFrame toEmit; + if (newest is not null) + { + _lastRawFrame = newest; + _lastFrameTickTicks = nowTicks; + toEmit = _scaler.Scale(newest, targetW, targetH, _settings.Aspect, nowTicks); + } + else if (_lastRawFrame is not null && (nowTicks - _lastFrameTickTicks) <= _slateThreshold.Ticks) + { + Interlocked.Increment(ref _framesDuplicated); + toEmit = _scaler.Scale(_lastRawFrame, targetW, targetH, _settings.Aspect, nowTicks); + } + else + { + Interlocked.Increment(ref _framesSlated); + toEmit = _slateRenderer.Render(targetW, targetH, b: 0x80, g: 0x80, r: 0x80, a: 0xFF, nowTicks); + } + + Interlocked.Increment(ref _framesOut); + _output.TryWrite(toEmit); + return Task.CompletedTask; + } +} diff --git a/src/tests/TeamsISO.Engine.Tests/Pipeline/FrameProcessorTests.cs b/src/tests/TeamsISO.Engine.Tests/Pipeline/FrameProcessorTests.cs new file mode 100644 index 0000000..bd423c9 --- /dev/null +++ b/src/tests/TeamsISO.Engine.Tests/Pipeline/FrameProcessorTests.cs @@ -0,0 +1,109 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging.Abstractions; +using TeamsISO.Engine.Domain; +using TeamsISO.Engine.Pipeline; +using TeamsISO.Engine.Tests.Fakes; + +namespace TeamsISO.Engine.Tests.Pipeline; + +public class FrameProcessorTests +{ + private static readonly FrameProcessingSettings Settings1080p30 = + new(TargetFramerate.Fps30, TargetResolution.R1080p, AspectMode.Pillarbox, AudioMode.Auto); + + private static RawFrame MakeFrame(int width, int height, long ts) => + new(width, height, ts, new byte[width * height * 4], PixelFormat.Bgra); + + private static FrameProcessor NewProcessor( + FakeFrameClock clock, + Channel input, + Channel output, + FrameProcessingSettings? settings = null) + => new( + settings: settings ?? Settings1080p30, + scaler: new PassthroughFrameScaler(), + slateRenderer: new SolidFrameRenderer(), + clock: clock, + input: input.Reader, + output: output.Writer, + slateThreshold: TimeSpan.FromSeconds(2.5), + logger: NullLogger.Instance); + + [Fact] + public async Task ProcessOnce_NewFrameAvailable_EmitsScaledFrame() + { + var clock = new FakeFrameClock(); + var input = Channel.CreateBounded(4); + var output = Channel.CreateUnbounded(); + var proc = NewProcessor(clock, input, output); + + input.Writer.TryWrite(MakeFrame(640, 360, ts: 100)); + clock.Advance(TimeSpan.FromMilliseconds(34)); + await proc.ProcessOnceAsync(CancellationToken.None); + + output.Reader.TryRead(out var frame).Should().BeTrue(); + frame!.Width.Should().Be(1920); + frame.Height.Should().Be(1080); + } + + [Fact] + public async Task ProcessOnce_NoNewFrame_ReEmitsLastFrame() + { + var clock = new FakeFrameClock(); + var input = Channel.CreateBounded(4); + var output = Channel.CreateUnbounded(); + var proc = NewProcessor(clock, input, output); + + input.Writer.TryWrite(MakeFrame(640, 360, ts: 100)); + clock.Advance(TimeSpan.FromMilliseconds(34)); + await proc.ProcessOnceAsync(CancellationToken.None); + output.Reader.TryRead(out _).Should().BeTrue(); + + clock.Advance(TimeSpan.FromMilliseconds(34)); + await proc.ProcessOnceAsync(CancellationToken.None); + + output.Reader.TryRead(out var second).Should().BeTrue(); + second!.Width.Should().Be(1920); + proc.Stats.FramesDuplicated.Should().Be(1); + } + + [Fact] + public async Task ProcessOnce_NoFrameForLongerThanSlateThreshold_EmitsSlate() + { + var clock = new FakeFrameClock(); + var input = Channel.CreateBounded(4); + var output = Channel.CreateUnbounded(); + var proc = NewProcessor(clock, input, output); + + input.Writer.TryWrite(MakeFrame(640, 360, ts: 100)); + clock.Advance(TimeSpan.FromMilliseconds(34)); + await proc.ProcessOnceAsync(CancellationToken.None); + output.Reader.TryRead(out _); + + clock.Advance(TimeSpan.FromSeconds(3)); + await proc.ProcessOnceAsync(CancellationToken.None); + + output.Reader.TryRead(out var slate).Should().BeTrue(); + slate!.Width.Should().Be(1920); + slate.Height.Should().Be(1080); + slate.Pixels.Span[0].Should().Be(0x80); + } + + [Fact] + public async Task ProcessOnce_PicksNewestFrame_DropsOlder() + { + var clock = new FakeFrameClock(); + var input = Channel.CreateBounded(4); + var output = Channel.CreateUnbounded(); + var proc = NewProcessor(clock, input, output); + + input.Writer.TryWrite(MakeFrame(640, 360, ts: 100)); + input.Writer.TryWrite(MakeFrame(640, 360, ts: 200)); + input.Writer.TryWrite(MakeFrame(640, 360, ts: 300)); + clock.Advance(TimeSpan.FromMilliseconds(34)); + await proc.ProcessOnceAsync(CancellationToken.None); + + proc.Stats.FramesIn.Should().Be(3); + proc.Stats.FramesDropped.Should().Be(2); + } +}