using System.Threading.Channels; using Microsoft.Extensions.Logging; using TeamsISO.Engine.Domain; using TeamsISO.Engine.Interop; namespace TeamsISO.Engine.Pipeline; /// /// Per-ISO unit. Owns one capture loop, one frame processor, one send loop, and the /// supervisor that restarts the inner pipeline with exponential backoff on failure. /// public sealed class IsoPipeline : IAsyncDisposable { private readonly Func _runInner; private readonly ExponentialBackoff _backoff; private readonly Func _delay; private readonly ILogger _logger; private CancellationTokenSource? _cts; private Task? _supervisorTask; private int _consecutiveFailures; public Guid ParticipantId { get; } public IsoState State { get; private set; } = IsoState.Idle; public int ConsecutiveFailures => _consecutiveFailures; /// /// Test ctor. The caller supplies the inner runner directly so failures and lifetimes /// can be controlled from a unit test. /// internal IsoPipeline( Guid participantId, Func runInner, ExponentialBackoff backoff, Func delay, ILoggerFactory loggerFactory) { ParticipantId = participantId; _runInner = runInner; _backoff = backoff; _delay = delay; _logger = loggerFactory.CreateLogger(); } /// /// Production ctor. Builds the inner runner from the engine dependencies. /// public IsoPipeline( IsoPipelineConfig config, INdiInterop interop, IFrameScaler scaler, IFrameClock frameClock, ExponentialBackoff backoff, Func delay, ILoggerFactory loggerFactory) : this(config.ParticipantId, ct => RunInnerPipelineAsync(config, interop, scaler, frameClock, loggerFactory, ct), backoff, delay, loggerFactory) { } /// Starts the supervisor. Returns immediately; pipeline runs in the background. public Task StartAsync() { if (_supervisorTask is not null) throw new InvalidOperationException("Pipeline already started."); _cts = new CancellationTokenSource(); State = IsoState.Receiving; _supervisorTask = SupervisorLoopAsync(_cts.Token); return Task.CompletedTask; } /// Stops the pipeline and awaits supervisor completion. public async Task StopAsync() { if (_cts is null) return; _cts.Cancel(); if (_supervisorTask is not null) { try { await _supervisorTask; } catch (OperationCanceledException) { /* expected */ } } State = IsoState.Idle; _cts.Dispose(); _cts = null; _supervisorTask = null; } public async ValueTask DisposeAsync() { await StopAsync(); } private async Task SupervisorLoopAsync(CancellationToken ct) { _consecutiveFailures = 0; while (!ct.IsCancellationRequested) { try { await _runInner(ct); // Inner exited normally (typically only on cancellation) — leave the loop. break; } catch (OperationCanceledException) { break; } catch (Exception ex) { _consecutiveFailures++; _logger.LogWarning(ex, "Pipeline {ParticipantId} failed (consecutive failure #{N}).", ParticipantId, _consecutiveFailures); if (_backoff.ShouldGiveUp(_consecutiveFailures)) { State = IsoState.Error; _logger.LogError("Pipeline {ParticipantId} giving up after {N} consecutive failures.", ParticipantId, _consecutiveFailures); return; } var wait = _backoff.NextDelay(_consecutiveFailures); _logger.LogInformation("Pipeline {ParticipantId} retrying in {Delay}.", ParticipantId, wait); try { await _delay(wait, ct); } catch (OperationCanceledException) { break; } State = IsoState.Receiving; } } } /// /// Default inner pipeline: spins up receiver → processor → sender on bounded channels /// and awaits all three. Throws if any of them throws. /// private static async Task RunInnerPipelineAsync( IsoPipelineConfig config, INdiInterop interop, IFrameScaler scaler, IFrameClock frameClock, ILoggerFactory loggerFactory, CancellationToken ct) { var rawChannel = Channel.CreateBounded(new BoundedChannelOptions(config.RawChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest, SingleReader = true, SingleWriter = true, }); var processedChannel = Channel.CreateBounded(new BoundedChannelOptions(config.ProcessedChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest, SingleReader = true, SingleWriter = true, }); using var receiver = new NdiReceiver( interop, config.SourceName, rawChannel.Writer, loggerFactory.CreateLogger()); using var sender = new NdiSender( interop, config.OutputName, processedChannel.Reader, loggerFactory.CreateLogger()); var processor = new FrameProcessor( config.Settings, scaler, new SolidFrameRenderer(), frameClock, rawChannel.Reader, processedChannel.Writer, config.SlateThreshold, loggerFactory.CreateLogger()); var receiverTask = receiver.RunAsync(ct); var senderTask = sender.RunAsync(ct); var processorTask = ProcessorLoopAsync(processor, frameClock, ct); try { await Task.WhenAll(receiverTask, senderTask, processorTask); } finally { rawChannel.Writer.TryComplete(); processedChannel.Writer.TryComplete(); } } private static async Task ProcessorLoopAsync(FrameProcessor processor, IFrameClock clock, CancellationToken ct) { while (!ct.IsCancellationRequested) { var advanced = await clock.WaitForNextTickAsync(ct); if (!advanced) break; await processor.ProcessOnceAsync(ct); } } }