From 49b6dfb9edf79dcced778c7e3e07a273091c60bc Mon Sep 17 00:00:00 2001 From: Zac Gaetano Date: Thu, 7 May 2026 15:26:54 +0000 Subject: [PATCH] feat(pipeline): add IsoPipeline with lifecycle and restart supervisor --- src/TeamsISO.Engine/Pipeline/IsoPipeline.cs | 198 ++++++++++++++++++ .../Pipeline/IsoPipelineConfig.cs | 23 ++ src/TeamsISO.Engine/TeamsISO.Engine.csproj | 24 ++- .../Pipeline/IsoPipelineTests.cs | 99 +++++++++ 4 files changed, 333 insertions(+), 11 deletions(-) create mode 100644 src/TeamsISO.Engine/Pipeline/IsoPipeline.cs create mode 100644 src/TeamsISO.Engine/Pipeline/IsoPipelineConfig.cs create mode 100644 src/tests/TeamsISO.Engine.Tests/Pipeline/IsoPipelineTests.cs diff --git a/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs b/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs new file mode 100644 index 0000000..511fb21 --- /dev/null +++ b/src/TeamsISO.Engine/Pipeline/IsoPipeline.cs @@ -0,0 +1,198 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using TeamsISO.Engine.Domain; +using TeamsISO.Engine.Interop; + +namespace TeamsISO.Engine.Pipeline; + +/// +/// Per-ISO unit. Owns one capture loop, one frame processor, one send loop, and the +/// supervisor that restarts the inner pipeline with exponential backoff on failure. +/// +public sealed class IsoPipeline : IAsyncDisposable +{ + private readonly Func _runInner; + private readonly ExponentialBackoff _backoff; + private readonly Func _delay; + private readonly ILogger _logger; + + private CancellationTokenSource? _cts; + private Task? _supervisorTask; + private int _consecutiveFailures; + + public Guid ParticipantId { get; } + public IsoState State { get; private set; } = IsoState.Idle; + public int ConsecutiveFailures => _consecutiveFailures; + + /// + /// Test ctor. The caller supplies the inner runner directly so failures and lifetimes + /// can be controlled from a unit test. + /// + internal IsoPipeline( + Guid participantId, + Func runInner, + ExponentialBackoff backoff, + Func delay, + ILoggerFactory loggerFactory) + { + ParticipantId = participantId; + _runInner = runInner; + _backoff = backoff; + _delay = delay; + _logger = loggerFactory.CreateLogger(); + } + + /// + /// Production ctor. Builds the inner runner from the engine dependencies. + /// + public IsoPipeline( + IsoPipelineConfig config, + INdiInterop interop, + IFrameScaler scaler, + IFrameClock frameClock, + ExponentialBackoff backoff, + Func delay, + ILoggerFactory loggerFactory) + : this(config.ParticipantId, + ct => RunInnerPipelineAsync(config, interop, scaler, frameClock, loggerFactory, ct), + backoff, + delay, + loggerFactory) { } + + /// Starts the supervisor. Returns immediately; pipeline runs in the background. + public Task StartAsync() + { + if (_supervisorTask is not null) + throw new InvalidOperationException("Pipeline already started."); + _cts = new CancellationTokenSource(); + State = IsoState.Receiving; + _supervisorTask = SupervisorLoopAsync(_cts.Token); + return Task.CompletedTask; + } + + /// Stops the pipeline and awaits supervisor completion. + public async Task StopAsync() + { + if (_cts is null) return; + _cts.Cancel(); + if (_supervisorTask is not null) + { + try { await _supervisorTask; } + catch (OperationCanceledException) { /* expected */ } + } + State = IsoState.Idle; + _cts.Dispose(); + _cts = null; + _supervisorTask = null; + } + + public async ValueTask DisposeAsync() + { + await StopAsync(); + } + + private async Task SupervisorLoopAsync(CancellationToken ct) + { + _consecutiveFailures = 0; + while (!ct.IsCancellationRequested) + { + try + { + await _runInner(ct); + // Inner exited normally (typically only on cancellation) — leave the loop. + break; + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + _consecutiveFailures++; + _logger.LogWarning(ex, + "Pipeline {ParticipantId} failed (consecutive failure #{N}).", + ParticipantId, _consecutiveFailures); + + if (_backoff.ShouldGiveUp(_consecutiveFailures)) + { + State = IsoState.Error; + _logger.LogError("Pipeline {ParticipantId} giving up after {N} consecutive failures.", + ParticipantId, _consecutiveFailures); + return; + } + + var wait = _backoff.NextDelay(_consecutiveFailures); + _logger.LogInformation("Pipeline {ParticipantId} retrying in {Delay}.", ParticipantId, wait); + try + { + await _delay(wait, ct); + } + catch (OperationCanceledException) + { + break; + } + State = IsoState.Receiving; + } + } + } + + /// + /// Default inner pipeline: spins up receiver → processor → sender on bounded channels + /// and awaits all three. Throws if any of them throws. + /// + private static async Task RunInnerPipelineAsync( + IsoPipelineConfig config, + INdiInterop interop, + IFrameScaler scaler, + IFrameClock frameClock, + ILoggerFactory loggerFactory, + CancellationToken ct) + { + var rawChannel = Channel.CreateBounded(new BoundedChannelOptions(config.RawChannelCapacity) + { + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = true, + }); + var processedChannel = Channel.CreateBounded(new BoundedChannelOptions(config.ProcessedChannelCapacity) + { + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = true, + }); + + using var receiver = new NdiReceiver( + interop, config.SourceName, rawChannel.Writer, loggerFactory.CreateLogger()); + using var sender = new NdiSender( + interop, config.OutputName, processedChannel.Reader, loggerFactory.CreateLogger()); + + var processor = new FrameProcessor( + config.Settings, scaler, new SolidFrameRenderer(), + frameClock, rawChannel.Reader, processedChannel.Writer, + config.SlateThreshold, loggerFactory.CreateLogger()); + + var receiverTask = receiver.RunAsync(ct); + var senderTask = sender.RunAsync(ct); + var processorTask = ProcessorLoopAsync(processor, frameClock, ct); + + try + { + await Task.WhenAll(receiverTask, senderTask, processorTask); + } + finally + { + rawChannel.Writer.TryComplete(); + processedChannel.Writer.TryComplete(); + } + } + + private static async Task ProcessorLoopAsync(FrameProcessor processor, IFrameClock clock, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + var advanced = await clock.WaitForNextTickAsync(ct); + if (!advanced) break; + await processor.ProcessOnceAsync(ct); + } + } +} diff --git a/src/TeamsISO.Engine/Pipeline/IsoPipelineConfig.cs b/src/TeamsISO.Engine/Pipeline/IsoPipelineConfig.cs new file mode 100644 index 0000000..272e57c --- /dev/null +++ b/src/TeamsISO.Engine/Pipeline/IsoPipelineConfig.cs @@ -0,0 +1,23 @@ +using TeamsISO.Engine.Domain; + +namespace TeamsISO.Engine.Pipeline; + +/// +/// Per-pipeline configuration — identifies the participant, the source it captures, +/// the output it emits, and the global processing settings to apply. +/// +public sealed record IsoPipelineConfig( + Guid ParticipantId, + string SourceName, + string OutputName, + FrameProcessingSettings Settings) +{ + /// Default no-signal threshold per spec §4. + public TimeSpan SlateThreshold { get; init; } = TimeSpan.FromSeconds(2.5); + + /// Bounded raw-frame channel capacity (drop-oldest backpressure). + public int RawChannelCapacity { get; init; } = 4; + + /// Bounded processed-frame channel capacity. + public int ProcessedChannelCapacity { get; init; } = 2; +} diff --git a/src/TeamsISO.Engine/TeamsISO.Engine.csproj b/src/TeamsISO.Engine/TeamsISO.Engine.csproj index d6fa014..310a2dd 100644 --- a/src/TeamsISO.Engine/TeamsISO.Engine.csproj +++ b/src/TeamsISO.Engine/TeamsISO.Engine.csproj @@ -1,17 +1,19 @@ - - - - net8.0 - enable - enable - - + + + + net8.0 + + - - - + + + + + + + diff --git a/src/tests/TeamsISO.Engine.Tests/Pipeline/IsoPipelineTests.cs b/src/tests/TeamsISO.Engine.Tests/Pipeline/IsoPipelineTests.cs new file mode 100644 index 0000000..765d844 --- /dev/null +++ b/src/tests/TeamsISO.Engine.Tests/Pipeline/IsoPipelineTests.cs @@ -0,0 +1,99 @@ +using Microsoft.Extensions.Logging.Abstractions; +using TeamsISO.Engine.Domain; +using TeamsISO.Engine.Pipeline; + +namespace TeamsISO.Engine.Tests.Pipeline; + +public class IsoPipelineTests +{ + private static ExponentialBackoff FastBackoff() => + new(maxAttempts: 5, initial: TimeSpan.FromMilliseconds(1), cap: TimeSpan.FromMilliseconds(10)); + + private static Func NoDelay() => + (_, _) => Task.CompletedTask; + + [Fact] + public async Task StartAsync_TransitionsIdleToReceiving() + { + var participant = Guid.NewGuid(); + var blocker = new TaskCompletionSource(); + var pipeline = new IsoPipeline( + participant, + ct => blocker.Task.WaitAsync(ct), + FastBackoff(), + NoDelay(), + NullLoggerFactory.Instance); + + await pipeline.StartAsync(); + + pipeline.State.Should().Be(IsoState.Receiving); + + blocker.SetResult(); + await pipeline.StopAsync(); + } + + [Fact] + public async Task StopAsync_TransitionsBackToIdle() + { + var blocker = new TaskCompletionSource(); + var pipeline = new IsoPipeline( + Guid.NewGuid(), + ct => blocker.Task.WaitAsync(ct), + FastBackoff(), + NoDelay(), + NullLoggerFactory.Instance); + + await pipeline.StartAsync(); + await pipeline.StopAsync(); + + pipeline.State.Should().Be(IsoState.Idle); + } + + [Fact] + public async Task Supervisor_FailsOnce_ThenRecovers_RemainsReceiving() + { + int callCount = 0; + var done = new TaskCompletionSource(); + var pipeline = new IsoPipeline( + Guid.NewGuid(), + ct => + { + callCount++; + if (callCount == 1) throw new InvalidOperationException("boom"); + return done.Task.WaitAsync(ct); + }, + FastBackoff(), + NoDelay(), + NullLoggerFactory.Instance); + + await pipeline.StartAsync(); + await Task.Delay(50); // let the supervisor cycle through the failure + callCount.Should().BeGreaterOrEqualTo(2); + pipeline.ConsecutiveFailures.Should().Be(1); + pipeline.State.Should().Be(IsoState.Receiving); + + done.SetResult(); + await pipeline.StopAsync(); + } + + [Fact] + public async Task Supervisor_FailsRepeatedly_TransitionsToError() + { + var pipeline = new IsoPipeline( + Guid.NewGuid(), + _ => throw new InvalidOperationException("permanent"), + FastBackoff(), + NoDelay(), + NullLoggerFactory.Instance); + + await pipeline.StartAsync(); + + // Wait for supervisor to give up (max 5 attempts × ~1ms delay) + var deadline = DateTime.UtcNow.AddSeconds(2); + while (pipeline.State != IsoState.Error && DateTime.UtcNow < deadline) + await Task.Delay(20); + + pipeline.State.Should().Be(IsoState.Error); + pipeline.ConsecutiveFailures.Should().Be(6); // 5 retries allowed, give up on 6th + } +}