using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using TeamsISO.Engine.Domain;
using TeamsISO.Engine.Interop;

namespace TeamsISO.Engine.Pipeline;

/// <summary>
/// Per-ISO unit. Owns one capture loop, one frame processor, one send loop, and the
/// supervisor that restarts the inner pipeline with exponential backoff on failure.
/// </summary>
public sealed class IsoPipeline : IAsyncDisposable
{
    // Only ever assigned from a constructor (the production ctor assigns it in its body
    // because the closure needs `this`), so it can be readonly.
    private readonly Func<CancellationToken, Task> _runInner;
    private readonly ExponentialBackoff _backoff;
    private readonly Func<TimeSpan, CancellationToken, Task> _delay;
    private readonly ILogger<IsoPipeline> _logger;

    private CancellationTokenSource? _cts;
    private Task? _supervisorTask;
    private int _consecutiveFailures;

    // Refs to the currently-live receiver and sender, set by the inner loop on each
    // restart. Reads via Volatile.Read are safe from any thread (UI's stats poll).
    private NdiReceiver? _liveReceiver;
    private NdiSender? _liveSender;
    private RawFrame? _lastReceivedFrame;

    // UTC ticks of the most recent received frame; 0 means "no frame seen yet".
    // Kept as a long accessed through Interlocked because a DateTimeOffset? is a
    // multi-word struct and could be observed half-written by the stats reader.
    private long _lastReceivedAtUtcTicks;

    /// <summary>Identifier of the participant this pipeline serves.</summary>
    public Guid ParticipantId { get; }

    /// <summary>Current lifecycle state, updated by start/stop and by the supervisor loop.</summary>
    public IsoState State { get; private set; } = IsoState.Idle;

    /// <summary>Number of inner-pipeline failures counted by the supervisor since it was started.</summary>
    public int ConsecutiveFailures => _consecutiveFailures;

    /// <summary>
    /// Snapshot of the pipeline's current health. Safe to call from any thread; values
    /// are inherently a moment-in-time view and may change immediately. Returns
    /// <see cref="Domain.IsoHealthStats.Empty"/> when no inner pipeline is currently
    /// running (e.g. between supervisor restarts or after final failure).
    /// </summary>
    public Domain.IsoHealthStats GetStats()
    {
        var receiver = Volatile.Read(ref _liveReceiver);
        var sender = Volatile.Read(ref _liveSender);
        if (receiver is null || sender is null)
        {
            return Domain.IsoHealthStats.Empty;
        }

        var lastFrame = Volatile.Read(ref _lastReceivedFrame);
        var lastTicks = Interlocked.Read(ref _lastReceivedAtUtcTicks);
        var lastAt = lastTicks == 0
            ? (DateTimeOffset?)null
            : new DateTimeOffset(lastTicks, TimeSpan.Zero);

        return new Domain.IsoHealthStats(
            FramesIn: receiver.FramesCaptured,
            FramesOut: sender.FramesSent,
            FramesDropped: 0,    // FrameProcessor currently doesn't surface drops; wire later
            FramesDuplicated: 0, // same — last-frame re-emits aren't counted yet
            LastFrameAt: lastAt,
            IncomingFps: 0,      // running rate computation is a follow-up
            IncomingWidth: lastFrame?.Width ?? 0,
            IncomingHeight: lastFrame?.Height ?? 0);
    }

    /// <summary>
    /// Test ctor. The caller supplies the inner runner directly so failures and lifetimes
    /// can be controlled from a unit test.
    /// </summary>
    /// <param name="participantId">Value exposed through <see cref="ParticipantId"/>.</param>
    /// <param name="runInner">Delegate the supervisor awaits for each pipeline attempt.</param>
    /// <param name="backoff">Decides the retry delay and when to give up.</param>
    /// <param name="delay">Delegate awaited between attempts with the delay chosen by <paramref name="backoff"/>.</param>
    /// <param name="loggerFactory">Factory used to create this instance's logger.</param>
    internal IsoPipeline(
        Guid participantId,
        Func<CancellationToken, Task> runInner,
        ExponentialBackoff backoff,
        Func<TimeSpan, CancellationToken, Task> delay,
        ILoggerFactory loggerFactory)
    {
        ParticipantId = participantId;
        _runInner = runInner;
        _backoff = backoff;
        _delay = delay;
        _logger = loggerFactory.CreateLogger<IsoPipeline>();
    }

    /// <summary>
    /// Production ctor. Builds the inner runner from the engine dependencies.
    /// </summary>
    public IsoPipeline(
        IsoPipelineConfig config,
        INdiInterop interop,
        IFrameScaler scaler,
        IFrameClock frameClock,
        ExponentialBackoff backoff,
        Func<TimeSpan, CancellationToken, Task> delay,
        ILoggerFactory loggerFactory)
        // The real runner is assigned in the body below: its closure captures `this`
        // so the receiver/sender wired by RunInnerPipelineAsync can be published to
        // instance fields for stats reads, and `this` is not available in the
        // constructor initializer.
        : this(config.ParticipantId, null!, backoff, delay, loggerFactory)
    {
        _runInner = ct => RunInnerPipelineAsync(
            config,
            interop,
            scaler,
            frameClock,
            loggerFactory,
            ct,
            onLive: (recv, send) =>
            {
                Volatile.Write(ref _liveReceiver, recv);
                Volatile.Write(ref _liveSender, send);
            },
            onClear: () =>
            {
                Volatile.Write(ref _liveReceiver, null);
                Volatile.Write(ref _liveSender, null);
            },
            onFrame: frame =>
            {
                Volatile.Write(ref _lastReceivedFrame, frame);
                Interlocked.Exchange(ref _lastReceivedAtUtcTicks, DateTimeOffset.UtcNow.UtcTicks);
            });
    }

    /// <summary>Starts the supervisor. Returns immediately; pipeline runs in the background.</summary>
    /// <exception cref="InvalidOperationException">The pipeline has already been started.</exception>
    public Task StartAsync()
    {
        if (_supervisorTask is not null)
        {
            throw new InvalidOperationException("Pipeline already started.");
        }

        _cts = new CancellationTokenSource();
        State = IsoState.Receiving;
        _supervisorTask = SupervisorLoopAsync(_cts.Token);
        return Task.CompletedTask;
    }

    /// <summary>Stops the pipeline and awaits supervisor completion.</summary>
    public async Task StopAsync()
    {
        var cts = _cts;
        if (cts is null)
        {
            return;
        }

        cts.Cancel();
        try
        {
            if (_supervisorTask is { } supervisor)
            {
                try
                {
                    await supervisor;
                }
                catch (OperationCanceledException)
                {
                    /* expected */
                }
            }
        }
        finally
        {
            // Reset even if the supervisor surfaced an unexpected exception, so the
            // token source is always disposed and the instance can be started again.
            State = IsoState.Idle;
            cts.Dispose();
            _cts = null;
            _supervisorTask = null;
        }
    }

    /// <summary>Disposes the pipeline by stopping it.</summary>
    public async ValueTask DisposeAsync()
    {
        await StopAsync();
    }

    /// <summary>
    /// Runs the inner pipeline, retrying after each failure with the delay chosen by the
    /// backoff policy. Exits when the inner run returns, when cancellation is observed,
    /// or (with <see cref="State"/> set to <see cref="IsoState.Error"/>) when the backoff
    /// policy says to give up.
    /// </summary>
    private async Task SupervisorLoopAsync(CancellationToken ct)
    {
        _consecutiveFailures = 0;

        while (!ct.IsCancellationRequested)
        {
            try
            {
                await _runInner(ct);

                // Inner exited normally (typically only on cancellation) — leave the loop.
                break;
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _consecutiveFailures++;
                _logger.LogWarning(ex, "Pipeline {ParticipantId} failed (consecutive failure #{N}).",
                    ParticipantId, _consecutiveFailures);

                if (_backoff.ShouldGiveUp(_consecutiveFailures))
                {
                    State = IsoState.Error;
                    _logger.LogError("Pipeline {ParticipantId} giving up after {N} consecutive failures.",
                        ParticipantId, _consecutiveFailures);
                    return;
                }

                var wait = _backoff.NextDelay(_consecutiveFailures);
                _logger.LogInformation("Pipeline {ParticipantId} retrying in {Delay}.", ParticipantId, wait);

                try
                {
                    await _delay(wait, ct);
                }
                catch (OperationCanceledException)
                {
                    break;
                }

                State = IsoState.Receiving;
            }
        }
    }

    /// <summary>
    /// Default inner pipeline: spins up receiver → processor → sender on bounded channels
    /// and awaits all three. Throws if any of them throws; the first stage to fault
    /// cancels the other two so the failure reaches the caller instead of being masked
    /// by stages that would otherwise keep running.
    ///
    /// The optional <paramref name="onLive"/> / <paramref name="onClear"/> /
    /// <paramref name="onFrame"/> callbacks let the outer <see cref="IsoPipeline"/> publish
    /// references to the live receiver and sender (so it can read counters from any thread
    /// for health stats) and observe the most recent received frame (so source resolution /
    /// last-seen-at can be surfaced in the UI). All three are no-ops by default.
    /// </summary>
    private static async Task RunInnerPipelineAsync(
        IsoPipelineConfig config,
        INdiInterop interop,
        IFrameScaler scaler,
        IFrameClock frameClock,
        ILoggerFactory loggerFactory,
        CancellationToken ct,
        Action<NdiReceiver, NdiSender>? onLive = null,
        Action? onClear = null,
        Action<RawFrame>? onFrame = null)
    {
        var rawChannel = Channel.CreateBounded<RawFrame>(
            new BoundedChannelOptions(config.RawChannelCapacity)
            {
                FullMode = BoundedChannelFullMode.DropOldest,
                SingleReader = true,
                SingleWriter = true,
            });

        // NOTE(review): the element type of the processed channel is assumed to be
        // ProcessedFrame — confirm against NdiSender / FrameProcessor.
        var processedChannel = Channel.CreateBounded<ProcessedFrame>(
            new BoundedChannelOptions(config.ProcessedChannelCapacity)
            {
                FullMode = BoundedChannelFullMode.DropOldest,
                SingleReader = true,
                SingleWriter = true,
            });

        // Tap the raw frames as they flow into the channel so the host can show "last
        // frame at" / source resolution without us re-implementing a probe.
        ChannelWriter<RawFrame> rawWriter = onFrame is null
            ? rawChannel.Writer
            : new TappedChannelWriter<RawFrame>(rawChannel.Writer, onFrame);

        // Linked to the caller's token so external cancellation still stops every stage,
        // while a fault in one stage can stop its siblings without touching the caller's token.
        using var stageCts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        using var receiver = new NdiReceiver(
            interop,
            config.SourceName,
            rawWriter,
            loggerFactory.CreateLogger<NdiReceiver>());

        using var sender = new NdiSender(
            interop,
            config.OutputName,
            processedChannel.Reader,
            loggerFactory.CreateLogger<NdiSender>(),
            config.OutputGroups);

        try
        {
            // Published inside the try so onClear always runs once the refs are visible,
            // even if building or starting a stage throws.
            onLive?.Invoke(receiver, sender);

            var processor = new FrameProcessor(
                config.Settings,
                scaler,
                new SolidFrameRenderer(),
                frameClock,
                rawChannel.Reader,
                processedChannel.Writer,
                config.SlateThreshold,
                loggerFactory.CreateLogger<FrameProcessor>());

            var stageToken = stageCts.Token;
            var receiverTask = receiver.RunAsync(stageToken);
            var senderTask = sender.RunAsync(stageToken);
            var processorTask = ProcessorLoopAsync(processor, frameClock, stageToken);

            await Task.WhenAll(
                CancelSiblingsOnFaultAsync(receiverTask),
                CancelSiblingsOnFaultAsync(senderTask),
                CancelSiblingsOnFaultAsync(processorTask));
        }
        finally
        {
            // Stops any stage that was already started if we are leaving because a
            // later stage failed to start; a no-op when every stage has completed.
            stageCts.Cancel();
            rawChannel.Writer.TryComplete();
            processedChannel.Writer.TryComplete();
            onClear?.Invoke();
        }

        // Awaits one stage; on a genuine fault (not cancellation) cancels the other
        // stages and rethrows so Task.WhenAll completes with the original exception.
        async Task CancelSiblingsOnFaultAsync(Task stage)
        {
            try
            {
                await stage;
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                stageCts.Cancel();
                throw;
            }
        }
    }

    /// <summary>
    /// Channel-writer wrapper that fires a callback on every successful write but
    /// otherwise behaves identically to the inner writer. Used to tap the raw-frame
    /// stream for stats without entangling the receiver with the stats API.
    /// </summary>
    private sealed class TappedChannelWriter<T> : ChannelWriter<T>
    {
        private readonly ChannelWriter<T> _inner;
        private readonly Action<T> _onWrite;

        public TappedChannelWriter(ChannelWriter<T> inner, Action<T> onWrite)
        {
            _inner = inner;
            _onWrite = onWrite;
        }

        /// <summary>Writes to the inner writer and invokes the callback only when the write was accepted.</summary>
        public override bool TryWrite(T item)
        {
            if (!_inner.TryWrite(item))
            {
                return false;
            }

            _onWrite(item);
            return true;
        }

        public override ValueTask<bool> WaitToWriteAsync(CancellationToken ct = default)
            => _inner.WaitToWriteAsync(ct);

        public override bool TryComplete(Exception? error = null)
            => _inner.TryComplete(error);
    }

    /// <summary>
    /// Drives the processor: waits for each clock tick and processes once per tick until
    /// cancellation is requested or the clock reports that it did not advance.
    /// </summary>
    private static async Task ProcessorLoopAsync(FrameProcessor processor, IFrameClock clock, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested && await clock.WaitForNextTickAsync(ct))
        {
            await processor.ProcessOnceAsync(ct);
        }
    }
}