feat(pipeline): add FrameProcessor with closest-frame timing and slate fallback
Some checks failed
CI / build-and-test (push) Failing after 24s

This commit is contained in:
Zac Gaetano 2026-05-07 15:15:19 +00:00
parent 970f04861d
commit 5c039025fd
2 changed files with 209 additions and 0 deletions

View file

@ -0,0 +1,100 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Domain;
namespace TeamsISO.Engine.Pipeline;
/// <summary>
/// Per-ISO frame timing engine. Implements closest-frame strategy: at each tick,
/// pick the newest available raw frame (dropping older queued frames), scale and emit it.
/// If no new frame is available, re-emit the last frame. If no frame has arrived for
/// <see cref="_slateThreshold"/>, emit a no-signal slate instead.
/// </summary>
public sealed class FrameProcessor
{
private readonly FrameProcessingSettings _settings;
private readonly IFrameScaler _scaler;
private readonly SolidFrameRenderer _slateRenderer;
private readonly IFrameClock _clock;
private readonly ChannelReader<RawFrame> _input;
private readonly ChannelWriter<ProcessedFrame> _output;
private readonly TimeSpan _slateThreshold;
private readonly ILogger<FrameProcessor> _logger;
private RawFrame? _lastRawFrame;
private long _lastFrameTickTicks;
private long _framesIn;
private long _framesOut;
private long _framesDropped;
private long _framesDuplicated;
private long _framesSlated;
public FrameProcessor(
FrameProcessingSettings settings,
IFrameScaler scaler,
SolidFrameRenderer slateRenderer,
IFrameClock clock,
ChannelReader<RawFrame> input,
ChannelWriter<ProcessedFrame> output,
TimeSpan slateThreshold,
ILogger<FrameProcessor> logger)
{
_settings = settings;
_scaler = scaler;
_slateRenderer = slateRenderer;
_clock = clock;
_input = input;
_output = output;
_slateThreshold = slateThreshold;
_logger = logger;
}
public IsoHealthStats Stats =>
new(
FramesIn: Interlocked.Read(ref _framesIn),
FramesOut: Interlocked.Read(ref _framesOut),
FramesDropped: Interlocked.Read(ref _framesDropped),
FramesDuplicated: Interlocked.Read(ref _framesDuplicated),
LastFrameAt: _lastFrameTickTicks == 0 ? null : new DateTimeOffset(_lastFrameTickTicks, TimeSpan.Zero),
IncomingFps: 0,
IncomingWidth: _lastRawFrame?.Width ?? 0,
IncomingHeight: _lastRawFrame?.Height ?? 0);
public Task ProcessOnceAsync(CancellationToken cancellationToken)
{
// Drain the input channel non-blockingly, keeping only the newest frame.
RawFrame? newest = null;
while (_input.TryRead(out var frame))
{
if (newest is not null)
Interlocked.Increment(ref _framesDropped);
newest = frame;
Interlocked.Increment(ref _framesIn);
}
var (targetW, targetH) = _settings.ResolutionSize;
var nowTicks = _clock.NowTicks;
ProcessedFrame toEmit;
if (newest is not null)
{
_lastRawFrame = newest;
_lastFrameTickTicks = nowTicks;
toEmit = _scaler.Scale(newest, targetW, targetH, _settings.Aspect, nowTicks);
}
else if (_lastRawFrame is not null && (nowTicks - _lastFrameTickTicks) <= _slateThreshold.Ticks)
{
Interlocked.Increment(ref _framesDuplicated);
toEmit = _scaler.Scale(_lastRawFrame, targetW, targetH, _settings.Aspect, nowTicks);
}
else
{
Interlocked.Increment(ref _framesSlated);
toEmit = _slateRenderer.Render(targetW, targetH, b: 0x80, g: 0x80, r: 0x80, a: 0xFF, nowTicks);
}
Interlocked.Increment(ref _framesOut);
_output.TryWrite(toEmit);
return Task.CompletedTask;
}
}

View file

@ -0,0 +1,109 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using TeamsISO.Engine.Domain;
using TeamsISO.Engine.Pipeline;
using TeamsISO.Engine.Tests.Fakes;
namespace TeamsISO.Engine.Tests.Pipeline;
public class FrameProcessorTests
{
private static readonly FrameProcessingSettings Settings1080p30 =
new(TargetFramerate.Fps30, TargetResolution.R1080p, AspectMode.Pillarbox, AudioMode.Auto);
private static RawFrame MakeFrame(int width, int height, long ts) =>
new(width, height, ts, new byte[width * height * 4], PixelFormat.Bgra);
private static FrameProcessor NewProcessor(
FakeFrameClock clock,
Channel<RawFrame> input,
Channel<ProcessedFrame> output,
FrameProcessingSettings? settings = null)
=> new(
settings: settings ?? Settings1080p30,
scaler: new PassthroughFrameScaler(),
slateRenderer: new SolidFrameRenderer(),
clock: clock,
input: input.Reader,
output: output.Writer,
slateThreshold: TimeSpan.FromSeconds(2.5),
logger: NullLogger<FrameProcessor>.Instance);
[Fact]
public async Task ProcessOnce_NewFrameAvailable_EmitsScaledFrame()
{
var clock = new FakeFrameClock();
var input = Channel.CreateBounded<RawFrame>(4);
var output = Channel.CreateUnbounded<ProcessedFrame>();
var proc = NewProcessor(clock, input, output);
input.Writer.TryWrite(MakeFrame(640, 360, ts: 100));
clock.Advance(TimeSpan.FromMilliseconds(34));
await proc.ProcessOnceAsync(CancellationToken.None);
output.Reader.TryRead(out var frame).Should().BeTrue();
frame!.Width.Should().Be(1920);
frame.Height.Should().Be(1080);
}
[Fact]
public async Task ProcessOnce_NoNewFrame_ReEmitsLastFrame()
{
var clock = new FakeFrameClock();
var input = Channel.CreateBounded<RawFrame>(4);
var output = Channel.CreateUnbounded<ProcessedFrame>();
var proc = NewProcessor(clock, input, output);
input.Writer.TryWrite(MakeFrame(640, 360, ts: 100));
clock.Advance(TimeSpan.FromMilliseconds(34));
await proc.ProcessOnceAsync(CancellationToken.None);
output.Reader.TryRead(out _).Should().BeTrue();
clock.Advance(TimeSpan.FromMilliseconds(34));
await proc.ProcessOnceAsync(CancellationToken.None);
output.Reader.TryRead(out var second).Should().BeTrue();
second!.Width.Should().Be(1920);
proc.Stats.FramesDuplicated.Should().Be(1);
}
[Fact]
public async Task ProcessOnce_NoFrameForLongerThanSlateThreshold_EmitsSlate()
{
var clock = new FakeFrameClock();
var input = Channel.CreateBounded<RawFrame>(4);
var output = Channel.CreateUnbounded<ProcessedFrame>();
var proc = NewProcessor(clock, input, output);
input.Writer.TryWrite(MakeFrame(640, 360, ts: 100));
clock.Advance(TimeSpan.FromMilliseconds(34));
await proc.ProcessOnceAsync(CancellationToken.None);
output.Reader.TryRead(out _);
clock.Advance(TimeSpan.FromSeconds(3));
await proc.ProcessOnceAsync(CancellationToken.None);
output.Reader.TryRead(out var slate).Should().BeTrue();
slate!.Width.Should().Be(1920);
slate.Height.Should().Be(1080);
slate.Pixels.Span[0].Should().Be(0x80);
}
[Fact]
public async Task ProcessOnce_PicksNewestFrame_DropsOlder()
{
var clock = new FakeFrameClock();
var input = Channel.CreateBounded<RawFrame>(4);
var output = Channel.CreateUnbounded<ProcessedFrame>();
var proc = NewProcessor(clock, input, output);
input.Writer.TryWrite(MakeFrame(640, 360, ts: 100));
input.Writer.TryWrite(MakeFrame(640, 360, ts: 200));
input.Writer.TryWrite(MakeFrame(640, 360, ts: 300));
clock.Advance(TimeSpan.FromMilliseconds(34));
await proc.ProcessOnceAsync(CancellationToken.None);
proc.Stats.FramesIn.Should().Be(3);
proc.Stats.FramesDropped.Should().Be(2);
}
}