using System.Threading.Channels; using Microsoft.Extensions.Logging; using TeamsISO.Engine.Domain; using TeamsISO.Engine.Interop; namespace TeamsISO.Engine.Pipeline; /// /// Per-ISO unit. Owns one capture loop, one frame processor, one send loop, and the /// supervisor that restarts the inner pipeline with exponential backoff on failure. /// public sealed class IsoPipeline : IAsyncDisposable { private Func _runInner; private readonly ExponentialBackoff _backoff; private readonly Func _delay; private readonly ILogger _logger; private CancellationTokenSource? _cts; private Task? _supervisorTask; private int _consecutiveFailures; // Refs to the currently-live receiver, sender, and frame processor, set by the // inner loop on each restart. Reads via Volatile.Read are safe from any thread // (UI's stats poll). private NdiReceiver? _liveReceiver; private NdiSender? _liveSender; private FrameProcessor? _liveProcessor; // Last-frame metadata, snapshotted out of the RawFrame on capture so we don't // hold a reference to the frame's pixel buffer past its useful life. Two ints // and a DateTimeOffset are atomically writable on x64; we accept tearing on x86 // (purely advisory stats display). private int _lastWidth; private int _lastHeight; private DateTimeOffset? _lastReceivedAt; // Ring buffer of the last 30 incoming-frame timestamps for live fps display. // Updated on the receiver's capture thread (single writer) and read by the UI // poll thread (single reader); we use a lock for the snapshot path because // an array-of-ticks can't be torn-read atomically. private readonly long[] _frameTimes = new long[30]; private int _frameTimesHead; private int _frameTimesCount; private readonly object _frameTimesGate = new(); public Guid ParticipantId { get; } public IsoState State { get; private set; } = IsoState.Idle; public int ConsecutiveFailures => _consecutiveFailures; /// /// Snapshot of the pipeline's current health. Safe to call from any thread; values /// are inherently a moment-in-time view and may change immediately. Returns /// when no inner pipeline is currently /// running (e.g. between supervisor restarts or after final failure). /// public Domain.IsoHealthStats GetStats() { var receiver = Volatile.Read(ref _liveReceiver); var sender = Volatile.Read(ref _liveSender); var processor = Volatile.Read(ref _liveProcessor); var w = Volatile.Read(ref _lastWidth); var h = Volatile.Read(ref _lastHeight); var lastAt = _lastReceivedAt; if (receiver is null || sender is null) return Domain.IsoHealthStats.Empty; // FrameProcessor.Stats already aggregates FramesDropped (older frames dropped // by the closest-frame strategy when the input channel had backlog) and // FramesDuplicated (last-frame re-emits when no new frame arrived this tick). var procStats = processor?.Stats; return new Domain.IsoHealthStats( FramesIn: receiver.FramesCaptured, FramesOut: sender.FramesSent, FramesDropped: procStats?.FramesDropped ?? 0, FramesDuplicated: procStats?.FramesDuplicated ?? 0, LastFrameAt: lastAt, IncomingFps: ComputeFps(), IncomingWidth: w, IncomingHeight: h); } /// /// Computes a moving-average incoming framerate from the last N frame timestamps. /// Rate = (count - 1) / (newest - oldest). Returns 0 if fewer than 2 frames are /// recorded or if the window is degenerate (clock skew, all-equal timestamps). /// private double ComputeFps() { long oldest, newest; int count; lock (_frameTimesGate) { count = _frameTimesCount; if (count < 2) return 0; // Oldest is at the slot AFTER head when buffer is full; otherwise at index 0. var oldestIdx = count < _frameTimes.Length ? 0 : _frameTimesHead; var newestIdx = (_frameTimesHead - 1 + _frameTimes.Length) % _frameTimes.Length; oldest = _frameTimes[oldestIdx]; newest = _frameTimes[newestIdx]; } var deltaTicks = newest - oldest; if (deltaTicks <= 0) return 0; var seconds = deltaTicks / (double)TimeSpan.TicksPerSecond; return (count - 1) / seconds; } private void RecordFrameTimestamp(long ticks) { lock (_frameTimesGate) { _frameTimes[_frameTimesHead] = ticks; _frameTimesHead = (_frameTimesHead + 1) % _frameTimes.Length; if (_frameTimesCount < _frameTimes.Length) _frameTimesCount++; } } private void ResetFrameTimestamps() { lock (_frameTimesGate) { _frameTimesHead = 0; _frameTimesCount = 0; } } /// /// Test ctor. The caller supplies the inner runner directly so failures and lifetimes /// can be controlled from a unit test. /// internal IsoPipeline( Guid participantId, Func runInner, ExponentialBackoff backoff, Func delay, ILoggerFactory loggerFactory) { ParticipantId = participantId; _runInner = runInner; _backoff = backoff; _delay = delay; _logger = loggerFactory.CreateLogger(); } /// /// Production ctor. Builds the inner runner from the engine dependencies. /// public IsoPipeline( IsoPipelineConfig config, INdiInterop interop, IFrameScaler scaler, IFrameClock frameClock, ExponentialBackoff backoff, Func delay, ILoggerFactory loggerFactory) : this(config.ParticipantId, // The inner-runner closure captures `this` so the receiver/sender // wired by RunInnerPipelineAsync can be published to instance fields // for stats reads. default(Func)!, backoff, delay, loggerFactory) { _runInner = ct => RunInnerPipelineAsync( config, interop, scaler, frameClock, loggerFactory, ct, onLive: (recv, send, proc) => { Volatile.Write(ref _liveReceiver, recv); Volatile.Write(ref _liveSender, send); Volatile.Write(ref _liveProcessor, proc); ResetFrameTimestamps(); // fresh window on every supervisor restart }, onClear: () => { Volatile.Write(ref _liveReceiver, null); Volatile.Write(ref _liveSender, null); Volatile.Write(ref _liveProcessor, null); ResetFrameTimestamps(); }, onFrame: frame => { // Snapshot dimensions only — don't hold the RawFrame reference past // the channel write so the GC can reclaim it on schedule, and so a // late stats read can never resurrect a dropped frame's pixel buffer. Volatile.Write(ref _lastWidth, frame.Width); Volatile.Write(ref _lastHeight, frame.Height); var nowTicks = DateTimeOffset.UtcNow.UtcTicks; _lastReceivedAt = new DateTimeOffset(nowTicks, TimeSpan.Zero); RecordFrameTimestamp(nowTicks); }); } /// Starts the supervisor. Returns immediately; pipeline runs in the background. public Task StartAsync() { if (_supervisorTask is not null) throw new InvalidOperationException("Pipeline already started."); _cts = new CancellationTokenSource(); State = IsoState.Receiving; _supervisorTask = SupervisorLoopAsync(_cts.Token); return Task.CompletedTask; } /// Stops the pipeline and awaits supervisor completion. public async Task StopAsync() { if (_cts is null) return; _cts.Cancel(); if (_supervisorTask is not null) { try { await _supervisorTask; } catch (OperationCanceledException) { /* expected */ } } State = IsoState.Idle; _cts.Dispose(); _cts = null; _supervisorTask = null; } public async ValueTask DisposeAsync() { await StopAsync(); } private async Task SupervisorLoopAsync(CancellationToken ct) { _consecutiveFailures = 0; while (!ct.IsCancellationRequested) { try { await _runInner(ct); // Inner exited normally (typically only on cancellation) — leave the loop. break; } catch (OperationCanceledException) { break; } catch (Exception ex) { _consecutiveFailures++; _logger.LogWarning(ex, "Pipeline {ParticipantId} failed (consecutive failure #{N}).", ParticipantId, _consecutiveFailures); if (_backoff.ShouldGiveUp(_consecutiveFailures)) { State = IsoState.Error; _logger.LogError("Pipeline {ParticipantId} giving up after {N} consecutive failures.", ParticipantId, _consecutiveFailures); return; } var wait = _backoff.NextDelay(_consecutiveFailures); _logger.LogInformation("Pipeline {ParticipantId} retrying in {Delay}.", ParticipantId, wait); try { await _delay(wait, ct); } catch (OperationCanceledException) { break; } State = IsoState.Receiving; } } } /// /// Default inner pipeline: spins up receiver → processor → sender on bounded channels /// and awaits all three. Throws if any of them throws. /// /// The optional / / /// callbacks let the outer publish references to the live /// receiver and sender (so it can read counters from any thread for health stats) /// and observe the most recent received frame (so source resolution / last-seen-at /// can be surfaced in the UI). All three are no-ops by default. /// private static async Task RunInnerPipelineAsync( IsoPipelineConfig config, INdiInterop interop, IFrameScaler scaler, IFrameClock frameClock, ILoggerFactory loggerFactory, CancellationToken ct, Action? onLive = null, Action? onClear = null, Action? onFrame = null) { var rawChannel = Channel.CreateBounded(new BoundedChannelOptions(config.RawChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest, SingleReader = true, SingleWriter = true, }); var processedChannel = Channel.CreateBounded(new BoundedChannelOptions(config.ProcessedChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest, SingleReader = true, SingleWriter = true, }); // Tap the raw frames as they flow into the channel so the host can show "last // frame at" / source resolution without us re-implementing a probe. var rawWriter = onFrame is null ? rawChannel.Writer : new TappedChannelWriter(rawChannel.Writer, onFrame); using var receiver = new NdiReceiver( interop, config.SourceName, rawWriter, loggerFactory.CreateLogger()); using var sender = new NdiSender( interop, config.OutputName, processedChannel.Reader, loggerFactory.CreateLogger(), config.OutputGroups); var processor = new FrameProcessor( config.Settings, scaler, new SolidFrameRenderer(), frameClock, rawChannel.Reader, processedChannel.Writer, config.SlateThreshold, loggerFactory.CreateLogger()); onLive?.Invoke(receiver, sender, processor); var receiverTask = receiver.RunAsync(ct); var senderTask = sender.RunAsync(ct); var processorTask = ProcessorLoopAsync(processor, frameClock, ct); try { await Task.WhenAll(receiverTask, senderTask, processorTask); } finally { rawChannel.Writer.TryComplete(); processedChannel.Writer.TryComplete(); onClear?.Invoke(); } } /// /// Channel-writer wrapper that fires a callback on every successful write but /// otherwise behaves identically to the inner writer. Used to tap the raw-frame /// stream for stats without entangling the receiver with the stats API. /// private sealed class TappedChannelWriter : ChannelWriter { private readonly ChannelWriter _inner; private readonly Action _onWrite; public TappedChannelWriter(ChannelWriter inner, Action onWrite) { _inner = inner; _onWrite = onWrite; } public override bool TryWrite(T item) { if (_inner.TryWrite(item)) { _onWrite(item); return true; } return false; } public override ValueTask WaitToWriteAsync(CancellationToken ct = default) => _inner.WaitToWriteAsync(ct); public override bool TryComplete(Exception? error = null) => _inner.TryComplete(error); } private static async Task ProcessorLoopAsync(FrameProcessor processor, IFrameClock clock, CancellationToken ct) { while (!ct.IsCancellationRequested) { var advanced = await clock.WaitForNextTickAsync(ct); if (!advanced) break; await processor.ProcessOnceAsync(ct); } } }